car-server-core 0.15.1

Transport-neutral library for the CAR daemon JSON-RPC dispatcher (used by car-server and tokhn-daemon)
Documentation
//! FFI wrapper around `car_engine::dispatch_voice_turn`.
//!
//! This module surfaces the engine-side two-track dispatch through
//! every binding (NAPI / PyO3 / UniFFI / WebSocket) without each
//! repeating the event-fanout glue. The bindings provide a
//! [`VoiceEventSink`] (typically the same one they already use for
//! transcripts); this module spawns a task that drives the
//! [`VoiceTurnHandle`] and emits `voice.turn.*` JSON events into
//! that sink.
//!
//! Why this is *not* the orchestrator: `VoiceOrchestrator` (in
//! `car-voice`) embeds TTS, sentence-buffering, mixer cancellation,
//! and bridge phrase synthesis. FFI consumers play audio host-side —
//! they don't want CAR to own the speaker. So the FFI path is a thin
//! engine wrapper: caller gets `voice.turn.fast_delta` / `bridge` /
//! `sidecar` / `error` / `cancelled` events and decides what to do.
//!
//! See `docs/proposals/voice-sidecar-orchestration-plan.md` §B.7-B.8.

use std::sync::{Arc, OnceLock};

use car_engine::{
    dispatch_voice_turn, dispatch_voice_turn_sidecar_only, SidecarResult, VoiceTurnControl,
    VoiceTurnError, VoiceTurnHandle,
};
use car_inference::{
    intent::IntentHint, GenerateParams, GenerateRequest, InferenceEngine, StreamEvent,
};
use car_voice::{
    bridge_phrase, classify_utterance, compose_voice_context, ToolKind, UtteranceClass,
    VoiceConfig, VoiceEventSink,
};
use serde::{Deserialize, Serialize};
use tokio::sync::Mutex;

/// Request payload for [`dispatch`], decoded from the opaque JSON string
/// each binding forwards across the FFI boundary.
///
/// Only `utterance` is required; every other field may be omitted.
#[derive(Clone, Debug, Default, Deserialize)]
pub struct DispatchVoiceTurnRequest {
    /// Finalized utterance text, typically produced by STT.
    pub utterance: String,
    /// Voice session this turn belongs to. Emitted events are sent under
    /// this id; `"voice-turn"` is used when it is absent.
    #[serde(default)]
    pub session_id: Option<String>,
    /// Override for the voice-context prompt overlay: `None` keeps
    /// `DEFAULT_VOICE_PROMPT_OVERLAY`, `Some("")` disables the overlay, and
    /// any other value replaces it.
    #[serde(default)]
    pub config_overlay: Option<String>,
    /// How long, in milliseconds, to wait for the sidecar result once the
    /// fast track has finished. Falls back to 30_000 when absent.
    #[serde(default)]
    pub sidecar_timeout_ms: Option<u64>,
}

/// Immediate result of [`dispatch`]: the id the engine minted for the new
/// turn. Every `voice.turn.*` event emitted for that turn carries the same id.
///
/// Bindings return this serialized as JSON so the FFI surface stays a
/// stable string ABI.
#[derive(Clone, Debug, Serialize)]
pub struct DispatchVoiceTurnResponse {
    /// Engine-minted turn id; matches the `turn_id` field of emitted events.
    pub turn_id: u64,
}

/// Sidecar wait, in milliseconds, applied when a request omits
/// `sidecar_timeout_ms` (thirty seconds).
const DEFAULT_SIDECAR_TIMEOUT_MS: u64 = 30 * 1_000;

/// Process-wide cancellation registry. Holds the most recently
/// dispatched turn's control surface so `cancel_voice_turn` can find
/// it without callers passing handles back through the FFI.
/// Process-wide cancellation registry.
///
/// The slot is lazily initialised on first use and starts out empty. It holds
/// the control surface of the most recently registered voice turn. This lets
/// `cancel` reach that turn without the bindings threading handles back
/// through the FFI.
fn current_handle_slot() -> &'static Mutex<Option<VoiceTurnControl>> {
    static REGISTRY: OnceLock<Mutex<Option<VoiceTurnControl>>> = OnceLock::new();
    let registry = REGISTRY.get_or_init(|| Mutex::new(None));
    registry
}

/// Dispatch a voice turn and fan events through `sink`. Returns the
/// freshly minted turn id immediately; events arrive asynchronously
/// through the sink as the fast and sidecar tasks resolve.
///
/// The caller (binding) is responsible for routing events to the
/// host's voice event handler (the same one transcripts already use).
pub async fn dispatch(
    engine: Arc<InferenceEngine>,
    req: DispatchVoiceTurnRequest,
    sink: Arc<dyn VoiceEventSink>,
) -> Result<DispatchVoiceTurnResponse, String> {
    let session_id = req
        .session_id
        .clone()
        .unwrap_or_else(|| "voice-turn".to_string());
    let timeout_ms = req.sidecar_timeout_ms.unwrap_or(DEFAULT_SIDECAR_TIMEOUT_MS);

    // Compose voice context honouring the optional override.
    let mut config = VoiceConfig::default();
    if let Some(overlay) = req.config_overlay.clone() {
        config.voice_prompt_overlay = Some(overlay);
    }
    let context = compose_voice_context(&config, None);

    let class = classify_utterance(&req.utterance);
    let handle = match class {
        UtteranceClass::ToolLikely(kind) => {
            // Bridge phrase becomes an event the host plays through
            // its own TTS — we don't synthesize audio here.
            let phrase = bridge_phrase(kind);
            // Use a placeholder turn id for the bridge event so the
            // host can correlate. We use the handle's turn id below
            // once the engine mints it; the bridge event uses that
            // same id.
            let sidecar_req = GenerateRequest {
                prompt: req.utterance.clone(),
                context: context.clone(),
                ..Default::default()
            };
            let h = dispatch_voice_turn_sidecar_only(
                engine.clone(),
                req.utterance.clone(),
                sidecar_req,
            );
            send_bridge_event(&sink, &session_id, h.turn_id(), kind, phrase);
            h
        }
        UtteranceClass::Conversational => {
            let fast_req = GenerateRequest {
                prompt: req.utterance.clone(),
                context: context.clone(),
                intent: Some(IntentHint {
                    prefer_fast: true,
                    ..IntentHint::default()
                }),
                ..Default::default()
            };
            let sidecar_req = GenerateRequest {
                prompt: req.utterance.clone(),
                context,
                ..Default::default()
            };
            dispatch_voice_turn(engine, req.utterance.clone(), fast_req, sidecar_req)
        }
    };

    let turn_id = handle.turn_id();
    // Stash the control surface so `cancel_voice_turn` can reach it.
    {
        let slot = current_handle_slot().lock().await;
        // Cancel any previous in-flight turn — caller-side semantics
        // mirror the orchestrator: a new dispatch supersedes whatever
        // came before.
        if let Some(prev) = slot.as_ref() {
            prev.cancel();
        }
        drop(slot);
        let mut slot = current_handle_slot().lock().await;
        *slot = Some(handle.control.clone());
    }

    // Spawn the drain task.
    tokio::spawn(drive_handle(handle, sink, session_id, timeout_ms));

    Ok(DispatchVoiceTurnResponse { turn_id })
}

/// Cancel the in-flight voice turn, if any. Idempotent.
pub async fn cancel() {
    // Hold the guard for the whole operation so the slot is emptied and the
    // taken turn is cancelled as one step relative to other slot users.
    let mut guard = current_handle_slot().lock().await;
    if let Some(control) = guard.take() {
        control.cancel();
    }
}

/// Issue a 1-token probe with `prefer_fast: true` so the fast model
/// is loaded into memory before the first user turn. Best-effort —
/// errors are logged and swallowed. Idempotent.
pub async fn prewarm(engine: Arc<InferenceEngine>) {
    let req = GenerateRequest {
        prompt: ".".to_string(),
        params: GenerateParams {
            max_tokens: 1,
            ..Default::default()
        },
        intent: Some(IntentHint {
            prefer_fast: true,
            ..IntentHint::default()
        }),
        ..Default::default()
    };
    match engine.generate(req).await {
        Ok(_) => tracing::debug!("voice fast model prewarmed"),
        Err(e) => tracing::warn!(error = %e, "voice fast model prewarm failed"),
    }
}

/// Drain a dispatched turn and translate its outcome into `voice.turn.*`
/// events on `sink`.
///
/// 1. Stream the fast track as `fast_delta` events. When the engine reports
///    `Done`, emit `fast_done`. For sidecar-only handles the fast channel was
///    pre-closed at construction, so this loop exits immediately.
/// 2. Wait up to `timeout_ms` for the sidecar result and emit exactly one of
///    `sidecar`, `cancelled`, or `error`. The timeout clock starts only after
///    the fast track has finished. On timeout the turn is cancelled so the
///    engine stops work whose result nobody will read.
/// 3. Clear the process-wide current-turn slot, but only if it still refers
///    to this turn; a newer dispatch may already have replaced it.
async fn drive_handle(
    mut handle: VoiceTurnHandle,
    sink: Arc<dyn VoiceEventSink>,
    session_id: String,
    timeout_ms: u64,
) {
    let turn_id = handle.turn_id();

    while let Some(evt) = handle.fast.recv().await {
        match evt {
            StreamEvent::TextDelta(t) => {
                send_event(&sink, &session_id, &json_fast_delta(turn_id, &t));
            }
            StreamEvent::Done { .. } => {
                send_event(&sink, &session_id, &json_fast_done(turn_id));
                break;
            }
            // Other stream event kinds are not surfaced on the FFI path.
            _ => {}
        }
    }

    let timeout = tokio::time::Duration::from_millis(timeout_ms);
    let payload = match tokio::time::timeout(timeout, handle.sidecar).await {
        Ok(Ok(Ok(SidecarResult { text, .. }))) => json_sidecar(turn_id, &text),
        Ok(Ok(Err(VoiceTurnError::Cancelled))) => json_cancelled(turn_id),
        Ok(Ok(Err(VoiceTurnError::Inference(e)))) => json_error(turn_id, &e),
        Ok(Err(_recv)) => json_error(turn_id, "sidecar channel dropped"),
        Err(_elapsed) => {
            // The sidecar receiver has just been dropped, so the result can
            // never be delivered. Cancel the turn so the engine stops
            // generating instead of burning compute in the background.
            handle.control.cancel();
            json_error(turn_id, &format!("sidecar timed out after {timeout_ms}ms"))
        }
    };
    send_event(&sink, &session_id, &payload);

    let mut slot = current_handle_slot().lock().await;
    if slot.as_ref().is_some_and(|c| c.turn_id == turn_id) {
        *slot = None;
    }
}

/// Forward a pre-serialized JSON `payload` to `sink` under `session_id`.
fn send_event(sink: &Arc<dyn VoiceEventSink>, session_id: &str, payload: &str) {
    let owned = payload.to_owned();
    sink.send(session_id, owned);
}

/// Emit a `voice.turn.bridge` event for `turn_id` on `sink`.
///
/// The payload carries the tool category as a lowercase label plus the
/// bridge `phrase`. The host may speak the phrase through its own TTS while
/// the sidecar resolves.
fn send_bridge_event(
    sink: &Arc<dyn VoiceEventSink>,
    session_id: &str,
    turn_id: u64,
    kind: ToolKind,
    phrase: &str,
) {
    sink.send(session_id, json_bridge(turn_id, tool_kind_label(kind), phrase));
}

/// Stable lowercase wire label for a [`ToolKind`].
fn tool_kind_label(kind: ToolKind) -> &'static str {
    match kind {
        ToolKind::Email => "email",
        ToolKind::Calendar => "calendar",
        ToolKind::Search => "search",
        ToolKind::Unknown => "unknown",
    }
}

/// Serialize a `voice.turn.bridge` event.
///
/// This builder mirrors the other `json_*` builders, so every event payload
/// comes from a pure, unit-testable function. Both string fields are
/// JSON-escaped.
fn json_bridge(turn_id: u64, kind: &str, phrase: &str) -> String {
    format!(
        r#"{{"type":"voice.turn.bridge","turn_id":{turn_id},"kind":{kind},"phrase":{phrase}}}"#,
        kind = serde_json::to_string(kind).unwrap_or_else(|_| "\"\"".into()),
        phrase = serde_json::to_string(phrase).unwrap_or_else(|_| "\"\"".into()),
    )
}

/// Serialize a `voice.turn.fast_delta` event carrying one chunk of fast-track
/// `text` for `turn_id`; the text is JSON-escaped.
fn json_fast_delta(turn_id: u64, text: &str) -> String {
    let quoted = serde_json::to_string(text).unwrap_or_else(|_| String::from("\"\""));
    format!(r#"{{"type":"voice.turn.fast_delta","turn_id":{turn_id},"text":{quoted}}}"#)
}

/// Serialize the `voice.turn.fast_done` event that marks the end of the fast
/// track for `turn_id`.
fn json_fast_done(turn_id: u64) -> String {
    let mut out = String::from(r#"{"type":"voice.turn.fast_done","turn_id":"#);
    out.push_str(&turn_id.to_string());
    out.push('}');
    out
}

/// Serialize a `voice.turn.sidecar` event carrying the sidecar's final
/// `text` for `turn_id`; the text is JSON-escaped.
fn json_sidecar(turn_id: u64, text: &str) -> String {
    let escaped_text = match serde_json::to_string(text) {
        Ok(s) => s,
        Err(_) => String::from("\"\""),
    };
    format!(r#"{{"type":"voice.turn.sidecar","turn_id":{turn_id},"text":{escaped_text}}}"#)
}

/// Serialize a `voice.turn.error` event for `turn_id` whose `error` field is
/// the JSON-escaped `message`.
fn json_error(turn_id: u64, message: &str) -> String {
    let encoded: String = serde_json::to_string(message)
        .ok()
        .unwrap_or_else(|| "\"\"".into());
    format!(r#"{{"type":"voice.turn.error","turn_id":{turn_id},"error":{encoded}}}"#)
}

/// Serialize the `voice.turn.cancelled` event for `turn_id`.
fn json_cancelled(turn_id: u64) -> String {
    let id = turn_id.to_string();
    [r#"{"type":"voice.turn.cancelled","turn_id":"#, id.as_str(), "}"].concat()
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Parse an event payload, panicking with the offending text on failure.
    fn parse(s: &str) -> serde_json::Value {
        serde_json::from_str(s).unwrap_or_else(|e| panic!("invalid JSON {s:?}: {e}"))
    }

    #[test]
    fn request_round_trips_through_serde() {
        let json = r#"{"utterance":"hello","session_id":"abc","sidecar_timeout_ms":5000}"#;
        let req: DispatchVoiceTurnRequest = serde_json::from_str(json).unwrap();
        assert_eq!(req.utterance, "hello");
        assert_eq!(req.session_id.as_deref(), Some("abc"));
        assert_eq!(req.sidecar_timeout_ms, Some(5000));
        assert!(req.config_overlay.is_none());
    }

    #[test]
    fn request_minimal_form_works() {
        let json = r#"{"utterance":"hi"}"#;
        let req: DispatchVoiceTurnRequest = serde_json::from_str(json).unwrap();
        assert_eq!(req.utterance, "hi");
        assert!(req.session_id.is_none());
        assert!(req.config_overlay.is_none());
        assert!(req.sidecar_timeout_ms.is_none());
    }

    #[test]
    fn request_requires_utterance() {
        assert!(serde_json::from_str::<DispatchVoiceTurnRequest>("{}").is_err());
    }

    #[test]
    fn request_tolerates_unknown_fields() {
        // Forward compatibility: newer bindings may send fields we don't know.
        let json = r#"{"utterance":"hi","future_field":true}"#;
        let req: DispatchVoiceTurnRequest = serde_json::from_str(json).unwrap();
        assert_eq!(req.utterance, "hi");
    }

    #[test]
    fn request_keeps_empty_overlay_distinct_from_absent() {
        // `Some("")` means "disable the overlay" and must not collapse to `None`.
        let json = r#"{"utterance":"hi","config_overlay":""}"#;
        let req: DispatchVoiceTurnRequest = serde_json::from_str(json).unwrap();
        assert_eq!(req.config_overlay.as_deref(), Some(""));
    }

    #[test]
    fn json_event_payloads_are_valid_json() {
        for s in [
            json_fast_delta(1, "hello \"world\""),
            json_fast_done(2),
            json_sidecar(3, "answer\nwith newline"),
            json_error(4, "oops"),
            json_cancelled(5),
        ] {
            let _ = parse(&s);
        }
    }

    #[test]
    fn json_event_payloads_carry_expected_fields() {
        let v = parse(&json_fast_delta(1, "hello \"world\""));
        assert_eq!(v["type"], "voice.turn.fast_delta");
        assert_eq!(v["turn_id"], 1u64);
        assert_eq!(v["text"], "hello \"world\"");

        let v = parse(&json_fast_done(2));
        assert_eq!(v["type"], "voice.turn.fast_done");
        assert_eq!(v["turn_id"], 2u64);

        let v = parse(&json_sidecar(3, "answer\nwith newline"));
        assert_eq!(v["type"], "voice.turn.sidecar");
        assert_eq!(v["turn_id"], 3u64);
        assert_eq!(v["text"], "answer\nwith newline");

        let v = parse(&json_error(4, "bad \\ path\t\u{1}"));
        assert_eq!(v["type"], "voice.turn.error");
        assert_eq!(v["turn_id"], 4u64);
        assert_eq!(v["error"], "bad \\ path\t\u{1}");

        let v = parse(&json_cancelled(5));
        assert_eq!(v["type"], "voice.turn.cancelled");
        assert_eq!(v["turn_id"], 5u64);
    }

    #[test]
    fn json_turn_id_survives_full_u64_range() {
        let v = parse(&json_cancelled(u64::MAX));
        assert_eq!(v["turn_id"], u64::MAX);
    }
}