use std::sync::{Arc, Mutex};
use async_trait::async_trait;
use serde_json::Value;
use crate::context::LLMContext;
use crate::error::Result;
use crate::frames::{
Frame, FrameDirection, FrameHandler, FrameInner, FrameProcessor, SystemFrame,
};
use super::models::{self, ClientReadyData, SendTextData};
use super::observer::{RaviObserver, RaviObserverParams};
#[derive(Debug, Clone)]
pub struct RaviParams {
pub about: Option<Value>,
pub auto_bot_ready: bool,
pub protocol_version: String,
pub context: Option<Arc<Mutex<LLMContext>>>,
}
impl Default for RaviParams {
fn default() -> Self {
Self {
about: Some(serde_json::json!({ "library": "rustvani" })),
auto_bot_ready: true,
protocol_version: models::PROTOCOL_VERSION.to_string(),
context: None,
}
}
}
struct State {
client_ready: bool,
client_ready_id: String,
bot_ready: bool,
}
struct RaviHandler {
params: RaviParams,
state: Mutex<State>,
}
impl RaviHandler {
fn new(params: RaviParams) -> Self {
Self {
params,
state: Mutex::new(State {
client_ready: false,
client_ready_id: String::new(),
bot_ready: false,
}),
}
}
async fn handle_client_ready(
&self,
processor: &FrameProcessor,
msg_id: &str,
data_raw: Option<&str>,
) -> Result<()> {
let client_data: Option<ClientReadyData> =
data_raw.and_then(|s| serde_json::from_str(s).ok());
let version = client_data.as_ref().map(|d| d.version.as_str()).unwrap_or("unknown");
log::info!("RaviProcessor: client-ready (version={})", version);
let server_major = self.params.protocol_version
.split('.')
.next()
.and_then(|s| s.parse::<u32>().ok())
.unwrap_or(1);
if let Some(client_major) = version.split('.').next().and_then(|s| s.parse::<u32>().ok()) {
if client_major != server_major {
let err = format!(
"RAVI version {} is not compatible with server protocol {}. \
Compatibility issues may occur.",
version, self.params.protocol_version
);
log::warn!("RaviProcessor: {}", err);
let payload = models::msg_error_response(msg_id, &err);
processor
.push_frame(
Frame::ravi_server_response(msg_id, payload),
FrameDirection::Downstream,
)
.await?;
}
}
{
let mut s = self.state.lock().unwrap();
s.client_ready = true;
s.client_ready_id = msg_id.to_string();
}
if self.params.auto_bot_ready {
self.send_bot_ready(processor).await?;
}
Ok(())
}
async fn send_bot_ready(&self, processor: &FrameProcessor) -> Result<()> {
let (msg_id, already_sent) = {
let mut s = self.state.lock().unwrap();
let id = s.client_ready_id.clone();
let sent = s.bot_ready;
s.bot_ready = true;
(id, sent)
};
if already_sent {
log::warn!("RaviProcessor: send_bot_ready called more than once — ignoring");
return Ok(());
}
let payload = models::msg_bot_ready(&msg_id, self.params.about.clone());
processor
.push_frame(Frame::ravi_server_message(payload), FrameDirection::Downstream)
.await?;
log::info!("RaviProcessor: bot-ready sent");
Ok(())
}
async fn handle_disconnect(&self, processor: &FrameProcessor) -> Result<()> {
log::info!("RaviProcessor: disconnect-bot received — ending pipeline");
processor.push_frame(Frame::end(), FrameDirection::Downstream).await
}
async fn handle_send_text(
&self,
processor: &FrameProcessor,
data_raw: Option<&str>,
context: &Arc<Mutex<LLMContext>>,
) -> Result<()> {
let data: SendTextData = match data_raw.and_then(|s| serde_json::from_str(s).ok()) {
Some(d) => d,
None => {
log::warn!("RaviProcessor: send-text missing or invalid data");
return Ok(());
}
};
let opts = data.options.unwrap_or_default();
if opts.run_immediately {
processor.broadcast_interruption().await?;
}
context.lock().unwrap().add_user_message(&data.content);
processor
.push_frame(Frame::llm_context(context.clone()), FrameDirection::Downstream)
.await?;
log::debug!("RaviProcessor: send-text injected: '{}'", data.content);
Ok(())
}
async fn handle_client_message(
&self,
processor: &FrameProcessor,
msg_id: &str,
msg_type: &str,
data: Option<String>,
) -> Result<()> {
log::debug!("RaviProcessor: client-message type='{}' id='{}'", msg_type, msg_id);
processor
.push_frame(
Frame::ravi_client_message(msg_id, msg_type, data),
FrameDirection::Downstream,
)
.await
}
}
#[async_trait]
impl FrameHandler for RaviHandler {
async fn on_process_frame(
&self,
processor: &FrameProcessor,
frame: Frame,
direction: FrameDirection,
) -> Result<()> {
match &frame.inner {
FrameInner::System(SystemFrame::RaviClientMessage { msg_id, msg_type, data }) => {
let msg_id = msg_id.clone();
let msg_type = msg_type.clone();
let data = data.clone();
match msg_type.as_str() {
"client-ready" => {
self.handle_client_ready(processor, &msg_id, data.as_deref()).await?;
}
"disconnect-bot" => {
self.handle_disconnect(processor).await?;
}
"client-message" => {
if let Some(raw) = &data {
if let Ok(inner) = serde_json::from_str::<serde_json::Value>(raw) {
let inner_type = inner.get("t")
.and_then(|v| v.as_str())
.unwrap_or("unknown")
.to_string();
let inner_data = inner.get("d").map(|v| v.to_string());
self.handle_client_message(
processor, &msg_id, &inner_type, inner_data,
).await?;
}
}
}
"send-text" => {
match &self.params.context {
Some(ctx) => {
self.handle_send_text(processor, data.as_deref(), ctx).await?;
}
None => {
log::warn!(
"RaviProcessor: send-text received but no context wired — \
set RaviParams::context to handle this"
);
}
}
}
unknown => {
log::warn!("RaviProcessor: unsupported message type '{}'", unknown);
let payload = models::msg_error_response(
&msg_id,
&format!("Unsupported message type: {}", unknown),
);
processor
.push_frame(
Frame::ravi_server_response(&msg_id, payload),
FrameDirection::Downstream,
)
.await?;
}
}
}
_ => {
processor.push_frame(frame, direction).await?;
}
}
Ok(())
}
}
pub struct RaviProcessor;
impl RaviProcessor {
pub fn new(params: RaviParams) -> FrameProcessor {
FrameProcessor::new("RaviProcessor", Box::new(RaviHandler::new(params)), false)
}
pub fn create_observer(proc: &FrameProcessor, params: RaviObserverParams) -> RaviObserver {
RaviObserver::new(proc.clone(), params)
}
}