rustvani 0.1.2

Voice AI framework for Rust — real-time speech pipelines with STT, LLM, TTS, and Dhara conversation flows
use std::sync::{Arc, Mutex};

use async_trait::async_trait;
use serde_json::Value;

use crate::context::LLMContext;
use crate::error::Result;
use crate::frames::{
    Frame, FrameDirection, FrameHandler, FrameInner, FrameProcessor, SystemFrame,
};

use super::models::{self, ClientReadyData, SendTextData};
use super::observer::{RaviObserver, RaviObserverParams};

// ---------------------------------------------------------------------------
// RaviParams
// ---------------------------------------------------------------------------

#[derive(Debug, Clone)]
pub struct RaviParams {
    pub about:            Option<Value>,
    pub auto_bot_ready:   bool,
    pub protocol_version: String,
    /// If set, `send-text` client messages will inject turns into this context
    /// and push an `LLMContextFrame` downstream. If `None`, `send-text` is
    /// logged as a warning and ignored.
    pub context:          Option<Arc<Mutex<LLMContext>>>,
}

impl Default for RaviParams {
    fn default() -> Self {
        Self {
            about:            Some(serde_json::json!({ "library": "rustvani" })),
            auto_bot_ready:   true,
            protocol_version: models::PROTOCOL_VERSION.to_string(),
            context:          None,
        }
    }
}

// ---------------------------------------------------------------------------
// State
// ---------------------------------------------------------------------------

struct State {
    client_ready:    bool,
    client_ready_id: String,
    bot_ready:       bool,
}

// ---------------------------------------------------------------------------
// RaviHandler
// ---------------------------------------------------------------------------

struct RaviHandler {
    params:  RaviParams,
    state:   Mutex<State>,
}

impl RaviHandler {
    fn new(params: RaviParams) -> Self {
        Self {
            params,
            state: Mutex::new(State {
                client_ready:    false,
                client_ready_id: String::new(),
                bot_ready:       false,
            }),
        }
    }

    async fn handle_client_ready(
        &self,
        processor: &FrameProcessor,
        msg_id: &str,
        data_raw: Option<&str>,
    ) -> Result<()> {
        let client_data: Option<ClientReadyData> =
            data_raw.and_then(|s| serde_json::from_str(s).ok());

        let version = client_data.as_ref().map(|d| d.version.as_str()).unwrap_or("unknown");
        log::info!("RaviProcessor: client-ready (version={})", version);

        let server_major = self.params.protocol_version
            .split('.')
            .next()
            .and_then(|s| s.parse::<u32>().ok())
            .unwrap_or(1);

        if let Some(client_major) = version.split('.').next().and_then(|s| s.parse::<u32>().ok()) {
            if client_major != server_major {
                let err = format!(
                    "RAVI version {} is not compatible with server protocol {}. \
                     Compatibility issues may occur.",
                    version, self.params.protocol_version
                );
                log::warn!("RaviProcessor: {}", err);
                let payload = models::msg_error_response(msg_id, &err);
                processor
                    .push_frame(
                        Frame::ravi_server_response(msg_id, payload),
                        FrameDirection::Downstream,
                    )
                    .await?;
            }
        }

        {
            let mut s = self.state.lock().unwrap();
            s.client_ready    = true;
            s.client_ready_id = msg_id.to_string();
        }

        if self.params.auto_bot_ready {
            self.send_bot_ready(processor).await?;
        }

        Ok(())
    }

    async fn send_bot_ready(&self, processor: &FrameProcessor) -> Result<()> {
        let (msg_id, already_sent) = {
            let mut s = self.state.lock().unwrap();
            let id   = s.client_ready_id.clone();
            let sent = s.bot_ready;
            s.bot_ready = true;
            (id, sent)
        };

        if already_sent {
            log::warn!("RaviProcessor: send_bot_ready called more than once — ignoring");
            return Ok(());
        }

        let payload = models::msg_bot_ready(&msg_id, self.params.about.clone());
        processor
            .push_frame(Frame::ravi_server_message(payload), FrameDirection::Downstream)
            .await?;

        log::info!("RaviProcessor: bot-ready sent");
        Ok(())
    }

    async fn handle_disconnect(&self, processor: &FrameProcessor) -> Result<()> {
        log::info!("RaviProcessor: disconnect-bot received — ending pipeline");
        processor.push_frame(Frame::end(), FrameDirection::Downstream).await
    }

    async fn handle_send_text(
        &self,
        processor: &FrameProcessor,
        data_raw: Option<&str>,
        context: &Arc<Mutex<LLMContext>>,
    ) -> Result<()> {
        let data: SendTextData = match data_raw.and_then(|s| serde_json::from_str(s).ok()) {
            Some(d) => d,
            None => {
                log::warn!("RaviProcessor: send-text missing or invalid data");
                return Ok(());
            }
        };

        let opts = data.options.unwrap_or_default();

        if opts.run_immediately {
            processor.broadcast_interruption().await?;
        }

        context.lock().unwrap().add_user_message(&data.content);
        processor
            .push_frame(Frame::llm_context(context.clone()), FrameDirection::Downstream)
            .await?;

        log::debug!("RaviProcessor: send-text injected: '{}'", data.content);
        Ok(())
    }

    async fn handle_client_message(
        &self,
        processor: &FrameProcessor,
        msg_id: &str,
        msg_type: &str,
        data: Option<String>,
    ) -> Result<()> {
        log::debug!("RaviProcessor: client-message type='{}' id='{}'", msg_type, msg_id);
        processor
            .push_frame(
                Frame::ravi_client_message(msg_id, msg_type, data),
                FrameDirection::Downstream,
            )
            .await
    }
}

// ---------------------------------------------------------------------------
// FrameHandler
// ---------------------------------------------------------------------------

#[async_trait]
impl FrameHandler for RaviHandler {
    async fn on_process_frame(
        &self,
        processor: &FrameProcessor,
        frame: Frame,
        direction: FrameDirection,
    ) -> Result<()> {
        match &frame.inner {
            FrameInner::System(SystemFrame::RaviClientMessage { msg_id, msg_type, data }) => {
                let msg_id   = msg_id.clone();
                let msg_type = msg_type.clone();
                let data     = data.clone();

                match msg_type.as_str() {
                    "client-ready" => {
                        self.handle_client_ready(processor, &msg_id, data.as_deref()).await?;
                    }
                    "disconnect-bot" => {
                        self.handle_disconnect(processor).await?;
                    }
                    "client-message" => {
                        if let Some(raw) = &data {
                            if let Ok(inner) = serde_json::from_str::<serde_json::Value>(raw) {
                                let inner_type = inner.get("t")
                                    .and_then(|v| v.as_str())
                                    .unwrap_or("unknown")
                                    .to_string();
                                let inner_data = inner.get("d").map(|v| v.to_string());
                                self.handle_client_message(
                                    processor, &msg_id, &inner_type, inner_data,
                                ).await?;
                            }
                        }
                    }
                    "send-text" => {
                        match &self.params.context {
                            Some(ctx) => {
                                self.handle_send_text(processor, data.as_deref(), ctx).await?;
                            }
                            None => {
                                log::warn!(
                                    "RaviProcessor: send-text received but no context wired — \
                                     set RaviParams::context to handle this"
                                );
                            }
                        }
                    }
                    unknown => {
                        log::warn!("RaviProcessor: unsupported message type '{}'", unknown);
                        let payload = models::msg_error_response(
                            &msg_id,
                            &format!("Unsupported message type: {}", unknown),
                        );
                        processor
                            .push_frame(
                                Frame::ravi_server_response(&msg_id, payload),
                                FrameDirection::Downstream,
                            )
                            .await?;
                    }
                }
            }
            _ => {
                processor.push_frame(frame, direction).await?;
            }
        }

        Ok(())
    }
}

// ---------------------------------------------------------------------------
// Public API
// ---------------------------------------------------------------------------

pub struct RaviProcessor;

impl RaviProcessor {
    pub fn new(params: RaviParams) -> FrameProcessor {
        FrameProcessor::new("RaviProcessor", Box::new(RaviHandler::new(params)), false)
    }

    pub fn create_observer(proc: &FrameProcessor, params: RaviObserverParams) -> RaviObserver {
        RaviObserver::new(proc.clone(), params)
    }
}