use std::sync::{Arc, OnceLock};
use car_engine::{
dispatch_voice_turn, dispatch_voice_turn_sidecar_only, SidecarResult, VoiceTurnControl,
VoiceTurnError, VoiceTurnHandle,
};
use car_inference::{
intent::IntentHint, GenerateParams, GenerateRequest, InferenceEngine, StreamEvent,
};
use car_voice::{
bridge_phrase, classify_utterance, compose_voice_context, ToolKind, UtteranceClass,
VoiceConfig, VoiceEventSink,
};
use serde::{Deserialize, Serialize};
use tokio::sync::Mutex;
/// Request payload accepted by [`dispatch`].
///
/// Only `utterance` has to be present when deserializing; every other field
/// is optional and deserializes to `None` when it is missing.
#[derive(Debug, Clone, Default, Deserialize)]
pub struct DispatchVoiceTurnRequest {
/// Text of the utterance. `dispatch` classifies it and uses it as the
/// prompt of the generate requests it builds.
pub utterance: String,
/// Session identifier handed to the event sink with every event.
/// `dispatch` substitutes `"voice-turn"` when this is `None`.
#[serde(default)]
pub session_id: Option<String>,
/// When present, stored in `VoiceConfig::voice_prompt_overlay` before the
/// voice context is composed.
#[serde(default)]
pub config_overlay: Option<String>,
/// Upper bound, in milliseconds, on how long the sidecar result is awaited.
/// Falls back to `DEFAULT_SIDECAR_TIMEOUT_MS` when `None`.
#[serde(default)]
pub sidecar_timeout_ms: Option<u64>,
}
/// Response returned by [`dispatch`] once a turn has been started.
#[derive(Debug, Clone, Serialize)]
pub struct DispatchVoiceTurnResponse {
/// Identifier of the started turn; the same value appears as `turn_id` in
/// every event payload emitted for that turn.
pub turn_id: u64,
}
/// Sidecar timeout, in milliseconds, used by `dispatch` when the request does
/// not set `sidecar_timeout_ms`.
const DEFAULT_SIDECAR_TIMEOUT_MS: u64 = 30_000;
/// Returns the process-wide slot holding the control of the turn that is
/// currently registered as active, if any.
///
/// The slot is created empty on first access and lives for the rest of the
/// program.
fn current_handle_slot() -> &'static Mutex<Option<VoiceTurnControl>> {
    static ACTIVE_TURN: OnceLock<Mutex<Option<VoiceTurnControl>>> = OnceLock::new();
    let make_empty = || Mutex::new(Option::None);
    ACTIVE_TURN.get_or_init(make_empty)
}
pub async fn dispatch(
engine: Arc<InferenceEngine>,
req: DispatchVoiceTurnRequest,
sink: Arc<dyn VoiceEventSink>,
) -> Result<DispatchVoiceTurnResponse, String> {
let session_id = req
.session_id
.clone()
.unwrap_or_else(|| "voice-turn".to_string());
let timeout_ms = req.sidecar_timeout_ms.unwrap_or(DEFAULT_SIDECAR_TIMEOUT_MS);
let mut config = VoiceConfig::default();
if let Some(overlay) = req.config_overlay.clone() {
config.voice_prompt_overlay = Some(overlay);
}
let context = compose_voice_context(&config, None);
let class = classify_utterance(&req.utterance);
let handle = match class {
UtteranceClass::ToolLikely(kind) => {
let phrase = bridge_phrase(kind);
let sidecar_req = GenerateRequest {
prompt: req.utterance.clone(),
context: context.clone(),
..Default::default()
};
let h = dispatch_voice_turn_sidecar_only(
engine.clone(),
req.utterance.clone(),
sidecar_req,
);
send_bridge_event(&sink, &session_id, h.turn_id(), kind, phrase);
h
}
UtteranceClass::Conversational => {
let fast_req = GenerateRequest {
prompt: req.utterance.clone(),
context: context.clone(),
intent: Some(IntentHint {
prefer_fast: true,
..IntentHint::default()
}),
..Default::default()
};
let sidecar_req = GenerateRequest {
prompt: req.utterance.clone(),
context,
..Default::default()
};
dispatch_voice_turn(engine, req.utterance.clone(), fast_req, sidecar_req)
}
};
let turn_id = handle.turn_id();
{
let slot = current_handle_slot().lock().await;
if let Some(prev) = slot.as_ref() {
prev.cancel();
}
drop(slot);
let mut slot = current_handle_slot().lock().await;
*slot = Some(handle.control.clone());
}
tokio::spawn(drive_handle(handle, sink, session_id, timeout_ms));
Ok(DispatchVoiceTurnResponse { turn_id })
}
/// Cancels the turn currently registered in the shared slot, if there is one,
/// and leaves the slot empty.
pub async fn cancel() {
    let mut guard = current_handle_slot().lock().await;
    // `take` empties the slot; the cancel call still happens while the lock is held.
    if let Some(control) = guard.take() {
        control.cancel();
    }
}
/// Issues a one-token generation with the `prefer_fast` intent hint and logs
/// whether it succeeded.
///
/// The generated output is discarded; a failure is logged as a warning and is
/// not propagated to the caller.
pub async fn prewarm(engine: Arc<InferenceEngine>) {
    let params = GenerateParams {
        max_tokens: 1,
        ..Default::default()
    };
    let intent = IntentHint {
        prefer_fast: true,
        ..IntentHint::default()
    };
    let warmup = GenerateRequest {
        prompt: ".".to_string(),
        params,
        intent: Some(intent),
        ..Default::default()
    };

    let outcome = engine.generate(warmup).await;
    match outcome {
        Err(e) => tracing::warn!(error = %e, "voice fast model prewarm failed"),
        Ok(_) => tracing::debug!("voice fast model prewarmed"),
    }
}
/// Forwards the events of one voice turn to `sink`, then releases the shared
/// slot if it still belongs to this turn.
///
/// Steps, in order:
/// 1. Drain `handle.fast`: each `TextDelta` becomes a `voice.turn.fast_delta`
///    event; `Done` emits `voice.turn.fast_done` and ends the drain; any other
///    event is ignored. The drain also ends when the stream yields `None`.
/// 2. Await `handle.sidecar` for at most `timeout_ms` milliseconds and emit
///    exactly one event: `voice.turn.sidecar`, `voice.turn.cancelled`, or
///    `voice.turn.error`.
/// 3. Clear the shared slot only when its registered turn id equals this
///    turn's id.
async fn drive_handle(
    mut handle: VoiceTurnHandle,
    sink: Arc<dyn VoiceEventSink>,
    session_id: String,
    timeout_ms: u64,
) {
    let turn_id = handle.turn_id();

    // Step 1: stream the fast channel.
    loop {
        match handle.fast.recv().await {
            Some(StreamEvent::TextDelta(t)) => {
                send_event(&sink, &session_id, &json_fast_delta(turn_id, &t));
            }
            Some(StreamEvent::Done { .. }) => {
                send_event(&sink, &session_id, &json_fast_done(turn_id));
                break;
            }
            Some(_) => {}
            None => break,
        }
    }

    // Step 2: wait (bounded) for the sidecar and turn its outcome into one payload.
    let budget = tokio::time::Duration::from_millis(timeout_ms);
    let final_payload = match tokio::time::timeout(budget, handle.sidecar).await {
        Ok(Ok(Ok(SidecarResult { text, .. }))) => json_sidecar(turn_id, &text),
        Ok(Ok(Err(VoiceTurnError::Cancelled))) => json_cancelled(turn_id),
        Ok(Ok(Err(VoiceTurnError::Inference(e)))) => json_error(turn_id, &e),
        Ok(Err(_recv)) => json_error(turn_id, "sidecar channel dropped"),
        Err(_elapsed) => json_error(
            turn_id,
            &format!("sidecar timed out after {timeout_ms}ms"),
        ),
    };
    send_event(&sink, &session_id, &final_payload);

    // Step 3: a newer turn may already own the slot; only clear our own entry.
    let mut slot = current_handle_slot().lock().await;
    let still_ours = matches!(slot.as_ref(), Some(c) if c.turn_id == turn_id);
    if still_ours {
        *slot = None;
    }
}
/// Sends `payload` to `sink` for `session_id`, handing over an owned copy of
/// the payload text.
fn send_event(sink: &Arc<dyn VoiceEventSink>, session_id: &str, payload: &str) {
    let owned = String::from(payload);
    sink.send(session_id, owned);
}
/// Sends a `voice.turn.bridge` event for `turn_id` to `sink`.
///
/// The payload carries the tool kind as a lowercase label and `phrase`
/// encoded as a JSON string. If encoding the phrase reports an error, an
/// empty JSON string (`""`) is used in its place.
fn send_bridge_event(
    sink: &Arc<dyn VoiceEventSink>,
    session_id: &str,
    turn_id: u64,
    kind: ToolKind,
    phrase: &str,
) {
    let phrase_json = serde_json::to_string(phrase).unwrap_or_else(|_| String::from("\"\""));
    let kind_str = match kind {
        ToolKind::Email => "email",
        ToolKind::Calendar => "calendar",
        ToolKind::Search => "search",
        ToolKind::Unknown => "unknown",
    };
    let payload = format!(
        r#"{{"type":"voice.turn.bridge","turn_id":{turn_id},"kind":"{kind_str}","phrase":{phrase_json}}}"#
    );
    sink.send(session_id, payload);
}
/// Builds the `voice.turn.fast_delta` payload for `turn_id`, with `text`
/// encoded as a JSON string (empty JSON string if encoding reports an error).
fn json_fast_delta(turn_id: u64, text: &str) -> String {
    // Shadow the raw text with its JSON-encoded form before interpolating.
    let text = serde_json::to_string(text).unwrap_or_else(|_| String::from("\"\""));
    format!(r#"{{"type":"voice.turn.fast_delta","turn_id":{turn_id},"text":{text}}}"#)
}
/// Builds the `voice.turn.fast_done` payload for `turn_id`.
///
/// Only the numeric id is interpolated, so no string escaping is involved.
fn json_fast_done(turn_id: u64) -> String {
format!(r#"{{"type":"voice.turn.fast_done","turn_id":{turn_id}}}"#)
}
/// Builds the `voice.turn.sidecar` payload for `turn_id`, with `text` encoded
/// as a JSON string (empty JSON string if encoding reports an error).
fn json_sidecar(turn_id: u64, text: &str) -> String {
    // Shadow the raw text with its JSON-encoded form before interpolating.
    let text = serde_json::to_string(text).unwrap_or_else(|_| String::from("\"\""));
    format!(r#"{{"type":"voice.turn.sidecar","turn_id":{turn_id},"text":{text}}}"#)
}
/// Builds the `voice.turn.error` payload for `turn_id`, with `message`
/// encoded as a JSON string under the `error` key (empty JSON string if
/// encoding reports an error).
fn json_error(turn_id: u64, message: &str) -> String {
    let msg = serde_json::to_string(message).unwrap_or_else(|_| String::from("\"\""));
    format!(r#"{{"type":"voice.turn.error","turn_id":{turn_id},"error":{msg}}}"#)
}
/// Builds the `voice.turn.cancelled` payload for `turn_id`.
///
/// Only the numeric id is interpolated, so no string escaping is involved.
fn json_cancelled(turn_id: u64) -> String {
format!(r#"{{"type":"voice.turn.cancelled","turn_id":{turn_id}}}"#)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A request with `session_id` and `sidecar_timeout_ms` set deserializes
    /// with those values and leaves `config_overlay` unset.
    #[test]
    fn request_round_trips_through_serde() {
        let parsed: DispatchVoiceTurnRequest = serde_json::from_str(
            r#"{"utterance":"hello","session_id":"abc","sidecar_timeout_ms":5000}"#,
        )
        .unwrap();

        assert_eq!(parsed.utterance, "hello");
        assert_eq!(parsed.session_id.as_deref(), Some("abc"));
        assert_eq!(parsed.sidecar_timeout_ms, Some(5000));
        assert!(parsed.config_overlay.is_none());
    }

    /// A request containing only `utterance` deserializes, with every
    /// optional field left as `None`.
    #[test]
    fn request_minimal_form_works() {
        let parsed: DispatchVoiceTurnRequest =
            serde_json::from_str(r#"{"utterance":"hi"}"#).unwrap();

        assert_eq!(parsed.utterance, "hi");
        assert!(parsed.session_id.is_none());
        assert!(parsed.config_overlay.is_none());
        assert!(parsed.sidecar_timeout_ms.is_none());
    }

    /// Every payload builder yields text that parses as JSON, including
    /// inputs containing quotes and newlines.
    #[test]
    fn json_event_payloads_are_valid_json() {
        let payloads = [
            json_fast_delta(1, "hello \"world\""),
            json_fast_done(2),
            json_sidecar(3, "answer\nwith newline"),
            json_error(4, "oops"),
            json_cancelled(5),
        ];
        for s in payloads {
            if let Err(e) = serde_json::from_str::<serde_json::Value>(&s) {
                panic!("invalid JSON {s:?}: {e}");
            }
        }
    }
}