use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use car_eventlog::{EventKind, EventLog};
use car_inference::{GenerateRequest, InferenceEngine, StreamEvent};
use serde_json::Value;
use tokio::sync::{mpsc, oneshot};
use tokio_util::sync::CancellationToken;
/// Returns a process-unique, monotonically increasing turn identifier.
///
/// Ids start at 1. `Relaxed` ordering is sufficient because callers only
/// need uniqueness, not ordering with respect to other memory operations.
fn next_turn_id() -> u64 {
    static NEXT_ID: AtomicU64 = AtomicU64::new(1);
    NEXT_ID.fetch_add(1, Ordering::Relaxed)
}
/// Final outcome of the "sidecar" (slower, authoritative) track of a voice turn.
#[derive(Debug, Clone)]
pub struct SidecarResult {
    // Identifier of the turn this result belongs to (allocated by `next_turn_id`).
    pub turn_id: u64,
    // Response text, either LLM-generated or returned by a `DirectDataFetcher`.
    pub text: String,
    // Optional structured payload; every constructor in this module currently
    // sets it to `None`.
    pub data: Option<serde_json::Value>,
}
/// Ways a sidecar turn can fail to produce a `SidecarResult`.
#[derive(Debug, thiserror::Error)]
pub enum VoiceTurnError {
    // The inference engine returned an error; the message is stringified.
    #[error("inference failed: {0}")]
    Inference(String),
    // The turn's `CancellationToken` fired (barge-in or a newer turn superseded it).
    #[error("turn cancelled (barge-in or supersession)")]
    Cancelled,
}
/// Cheap, cloneable handle used to cancel a turn and observe its cancellation
/// state. Clones share the same underlying `CancellationToken`.
#[derive(Clone)]
pub struct VoiceTurnControl {
    // Id of the turn this control belongs to.
    pub turn_id: u64,
    // Shared token watched by both the fast and sidecar tasks.
    cancel: CancellationToken,
}
impl VoiceTurnControl {
    /// Requests cancellation of this turn. Idempotent; all clones of the
    /// underlying token observe the cancellation.
    pub fn cancel(&self) {
        self.cancel.cancel();
    }

    /// Reports whether cancellation has been requested for this turn.
    pub fn is_cancelled(&self) -> bool {
        self.cancel.is_cancelled()
    }
}
/// Caller-facing handle for one dispatched voice turn: a control for
/// cancellation, the streaming fast-track events, and a oneshot for the
/// final sidecar result.
pub struct VoiceTurnHandle {
    // Cancellation/inspection control shared with the spawned tasks.
    pub control: VoiceTurnControl,
    // Stream of fast-track events; closes when the fast task finishes
    // (for sidecar-only dispatch the sender is dropped up front, so this
    // yields `None` immediately).
    pub fast: mpsc::Receiver<StreamEvent>,
    // Resolves exactly once with the sidecar outcome.
    pub sidecar: oneshot::Receiver<Result<SidecarResult, VoiceTurnError>>,
}
impl VoiceTurnHandle {
    /// Id of the turn this handle tracks.
    pub fn turn_id(&self) -> u64 {
        self.control.turn_id
    }

    /// Cancels the turn via the shared control (idempotent).
    pub fn cancel(&self) {
        self.control.cancel();
    }
}
/// Optional fast path that can answer an utterance without invoking the LLM.
///
/// Return protocol (as consumed by `spawn_sidecar_task_classified`):
/// - `None`          — the fetcher has no answer; fall through to the LLM.
/// - `Some(Ok(s))`   — `s` is the final answer text; the LLM is skipped.
/// - `Some(Err(e))`  — the fetch itself failed; logged, then falls through
///                     to the LLM.
#[async_trait::async_trait]
pub trait DirectDataFetcher: Send + Sync {
    async fn try_fetch(&self, utterance: &str) -> Option<Result<String, String>>;
}
/// Cloneable wrapper around a shared `EventLog` for emitting turn-scoped
/// telemetry events.
#[derive(Clone)]
pub struct VoiceTelemetry {
    // Shared, mutex-guarded event sink; clones share the same log.
    log: Arc<Mutex<EventLog>>,
}
impl VoiceTelemetry {
    /// Wraps an existing shared event log.
    pub fn new(log: Arc<Mutex<EventLog>>) -> Self {
        Self { log }
    }

    /// Appends an event of `kind`, tagged with `turn_id` plus any `extra`
    /// key/value pairs. If the log mutex is poisoned the event is silently
    /// dropped — telemetry must never take down a voice turn.
    pub fn emit(&self, kind: EventKind, turn_id: u64, extra: Vec<(&str, Value)>) {
        let mut payload: HashMap<String, Value> = HashMap::new();
        payload.insert("turn_id".to_string(), Value::from(turn_id));
        // `extend` overwrites on key collision, matching the original
        // insert-in-order semantics (an `extra` "turn_id" wins).
        payload.extend(extra.into_iter().map(|(key, value)| (key.to_string(), value)));
        if let Ok(mut sink) = self.log.lock() {
            sink.append(kind, None, None, payload);
        }
    }
}
/// Dispatches a dual-track voice turn (fast stream + sidecar) with no
/// telemetry. Convenience wrapper over `dispatch_voice_turn_with_telemetry`.
pub fn dispatch_voice_turn(
    engine: Arc<InferenceEngine>,
    utterance: String,
    fast_request: GenerateRequest,
    sidecar_request: GenerateRequest,
) -> VoiceTurnHandle {
    dispatch_voice_turn_with_telemetry(engine, utterance, fast_request, sidecar_request, None)
}
/// Dispatches a dual-track voice turn: a streaming "fast" task and a oneshot
/// "sidecar" task, both sharing one cancellation token and turn id.
///
/// Emits `VoiceFastTurnStarted` before spawning when telemetry is provided.
/// The `_utterance` parameter is currently unused on this path (kept for
/// signature parity with the classifier variants).
pub fn dispatch_voice_turn_with_telemetry(
    engine: Arc<InferenceEngine>,
    _utterance: String,
    fast_request: GenerateRequest,
    sidecar_request: GenerateRequest,
    telemetry: Option<VoiceTelemetry>,
) -> VoiceTurnHandle {
    let turn_id = next_turn_id();
    let cancel = CancellationToken::new();

    // Fast track streams up to 64 buffered events; sidecar resolves once.
    let (fast_tx, fast_rx) = mpsc::channel::<StreamEvent>(64);
    let (sidecar_tx, sidecar_rx) = oneshot::channel();

    if let Some(t) = &telemetry {
        t.emit(EventKind::VoiceFastTurnStarted, turn_id, Vec::new());
    }

    spawn_fast_task(
        engine.clone(),
        fast_request,
        fast_tx,
        cancel.clone(),
        turn_id,
        telemetry.clone(),
    );
    spawn_sidecar_task(
        engine,
        sidecar_request,
        sidecar_tx,
        cancel.clone(),
        turn_id,
        telemetry,
    );

    VoiceTurnHandle {
        control: VoiceTurnControl { turn_id, cancel },
        fast: fast_rx,
        sidecar: sidecar_rx,
    }
}
/// Dispatches a sidecar-only voice turn (no fast track, no telemetry).
/// Convenience wrapper over the telemetry-aware variant.
pub fn dispatch_voice_turn_sidecar_only(
    engine: Arc<InferenceEngine>,
    utterance: String,
    sidecar_request: GenerateRequest,
) -> VoiceTurnHandle {
    dispatch_voice_turn_sidecar_only_with_telemetry(engine, utterance, sidecar_request, None)
}
/// Dispatches a sidecar-only voice turn with telemetry but without a
/// `DirectDataFetcher`. Delegates to the classifier-aware variant with
/// `fetcher = None`.
pub fn dispatch_voice_turn_sidecar_only_with_telemetry(
    engine: Arc<InferenceEngine>,
    utterance: String,
    sidecar_request: GenerateRequest,
    telemetry: Option<VoiceTelemetry>,
) -> VoiceTurnHandle {
    dispatch_voice_turn_sidecar_only_with_classifier(engine, utterance, sidecar_request, None, telemetry)
}
/// Dispatches a sidecar-only voice turn, optionally short-circuiting the LLM
/// through a `DirectDataFetcher`.
///
/// No fast task is spawned; the handle's `fast` receiver is backed by a
/// channel whose sender is dropped immediately, so `recv()` yields `None`
/// right away and callers can treat the fast track uniformly.
pub fn dispatch_voice_turn_sidecar_only_with_classifier(
    engine: Arc<InferenceEngine>,
    utterance: String,
    sidecar_request: GenerateRequest,
    fetcher: Option<Arc<dyn DirectDataFetcher>>,
    telemetry: Option<VoiceTelemetry>,
) -> VoiceTurnHandle {
    let turn_id = next_turn_id();
    let cancel = CancellationToken::new();

    // Intentionally closed fast channel: the receiver terminates immediately.
    let (closed_tx, fast_rx) = mpsc::channel::<StreamEvent>(1);
    drop(closed_tx);

    let (sidecar_tx, sidecar_rx) = oneshot::channel();
    spawn_sidecar_task_classified(
        engine,
        utterance,
        sidecar_request,
        sidecar_tx,
        cancel.clone(),
        turn_id,
        fetcher,
        telemetry,
    );

    VoiceTurnHandle {
        control: VoiceTurnControl { turn_id, cancel },
        fast: fast_rx,
        sidecar: sidecar_rx,
    }
}
/// Spawns the fast-track task: starts a tracked stream on the engine and
/// relays its events to `out`, racing against `cancel` at every step.
///
/// Telemetry: emits `VoiceTurnCancelled` (track = "fast") when the turn was
/// cancelled before or during streaming, otherwise `VoiceFastTurnEnded`.
fn spawn_fast_task(
    engine: Arc<InferenceEngine>,
    request: GenerateRequest,
    out: mpsc::Sender<StreamEvent>,
    cancel: CancellationToken,
    turn_id: u64,
    telemetry: Option<VoiceTelemetry>,
) {
    tokio::spawn(async move {
        // `biased` so an already-fired cancellation wins over starting inference.
        let cancelled_during = tokio::select! {
            biased;
            _ = cancel.cancelled() => {
                tracing::debug!(turn_id, "fast task cancelled before inference start");
                true
            }
            res = engine.generate_tracked_stream(request) => {
                match res {
                    Ok(mut rx) => {
                        relay_fast_stream(&mut rx, &out, &cancel, turn_id).await;
                        cancel.is_cancelled()
                    }
                    Err(e) => {
                        tracing::error!(turn_id, error=%e, "fast turn inference failed");
                        // Bug fix: this arm previously hardcoded `false`, so a
                        // cancellation racing an inference error was reported
                        // as VoiceFastTurnEnded. Mirror the Ok arm and report
                        // the token's actual state instead.
                        cancel.is_cancelled()
                    }
                }
            }
        };
        if let Some(t) = telemetry {
            if cancelled_during {
                t.emit(
                    EventKind::VoiceTurnCancelled,
                    turn_id,
                    vec![("track", "fast".into())],
                );
            } else {
                t.emit(EventKind::VoiceFastTurnEnded, turn_id, vec![]);
            }
        }
    });
}
/// Forwards events from the engine's stream `rx` to the caller-facing `out`
/// channel until one of: the token is cancelled, the engine closes `rx`, or
/// the caller drops the `out` receiver.
async fn relay_fast_stream(
    rx: &mut mpsc::Receiver<StreamEvent>,
    out: &mpsc::Sender<StreamEvent>,
    cancel: &CancellationToken,
    turn_id: u64,
) {
    loop {
        tokio::select! {
            // `biased` gives a pending cancellation priority over more events.
            biased;
            _ = cancel.cancelled() => {
                tracing::debug!(turn_id, "fast stream cancelled mid-relay");
                return;
            }
            maybe_event = rx.recv() => {
                // `None` means the engine finished its stream.
                let Some(event) = maybe_event else { return };
                // A send error means the caller dropped the receiver; stop relaying.
                if out.send(event).await.is_err() {
                    return;
                }
            }
        }
    }
}
/// Spawns the sidecar task with an optional `DirectDataFetcher` short-circuit.
///
/// Flow: if a fetcher is present, race `try_fetch` against cancellation.
/// A `Some(Ok(text))` hit resolves the turn immediately (LLM skipped, with a
/// `VoiceSidecarResolved` event tagged `source = "direct_fetch"`). A
/// `Some(Err(_))` or `None` falls through to the LLM via `run_llm_sidecar`,
/// unless the turn was cancelled in the meantime.
fn spawn_sidecar_task_classified(
    engine: Arc<InferenceEngine>,
    utterance: String,
    request: GenerateRequest,
    sender: oneshot::Sender<Result<SidecarResult, VoiceTurnError>>,
    cancel: CancellationToken,
    turn_id: u64,
    fetcher: Option<Arc<dyn DirectDataFetcher>>,
    telemetry: Option<VoiceTelemetry>,
) {
    tokio::spawn(async move {
        if let Some(f) = fetcher.as_ref() {
            // Race the direct fetch against cancellation; a cancelled turn
            // collapses to `None` here and is disambiguated below.
            let fetch_outcome = tokio::select! {
                biased;
                _ = cancel.cancelled() => None,
                outcome = f.try_fetch(&utterance) => outcome,
            };
            match fetch_outcome {
                Some(Ok(text)) => {
                    // Capture the length before moving `text` into the result
                    // (the previous code cloned the whole string just to call
                    // `.len()` afterwards).
                    let text_len = text.len();
                    let result = Ok(SidecarResult {
                        turn_id,
                        text,
                        data: None,
                    });
                    if let Some(t) = telemetry {
                        t.emit(
                            EventKind::VoiceSidecarResolved,
                            turn_id,
                            vec![
                                ("text_len", Value::from(text_len)),
                                ("source", "direct_fetch".into()),
                            ],
                        );
                    }
                    let _ = sender.send(result);
                    return;
                }
                Some(Err(e)) => {
                    tracing::debug!(turn_id, error=%e, "DirectDataFetcher errored; falling through to LLM");
                }
                None => {}
            }
            // Both "fetcher had no answer" and "turn cancelled" arrive here as
            // `None`; re-check the token to tell them apart.
            if cancel.is_cancelled() {
                let _ = sender.send(Err(VoiceTurnError::Cancelled));
                if let Some(t) = telemetry {
                    t.emit(
                        EventKind::VoiceTurnCancelled,
                        turn_id,
                        vec![("track", "sidecar".into())],
                    );
                }
                return;
            }
        }
        run_llm_sidecar(engine, request, sender, cancel, turn_id, telemetry).await;
    });
}
/// Runs the LLM sidecar to completion: one non-streaming `generate` call
/// raced against cancellation, with a telemetry event for each outcome
/// (resolved / cancelled / failed), then delivers the result on `sender`.
async fn run_llm_sidecar(
    engine: Arc<InferenceEngine>,
    request: GenerateRequest,
    sender: oneshot::Sender<Result<SidecarResult, VoiceTurnError>>,
    cancel: CancellationToken,
    turn_id: u64,
    telemetry: Option<VoiceTelemetry>,
) {
    // `biased` gives an already-cancelled token priority over generation.
    let outcome = tokio::select! {
        biased;
        _ = cancel.cancelled() => Err(VoiceTurnError::Cancelled),
        res = engine.generate(request) => {
            res.map(|text| SidecarResult { turn_id, text, data: None })
                .map_err(|e| VoiceTurnError::Inference(e.to_string()))
        }
    };
    if let Some(t) = telemetry {
        match &outcome {
            Ok(r) => {
                t.emit(
                    EventKind::VoiceSidecarResolved,
                    turn_id,
                    vec![("text_len", Value::from(r.text.len()))],
                );
            }
            Err(VoiceTurnError::Cancelled) => {
                t.emit(
                    EventKind::VoiceTurnCancelled,
                    turn_id,
                    vec![("track", "sidecar".into())],
                );
            }
            Err(VoiceTurnError::Inference(e)) => {
                t.emit(
                    EventKind::VoiceSidecarFailed,
                    turn_id,
                    vec![("error", Value::from(e.clone()))],
                );
            }
        }
    }
    // Receiver may already be gone (handle dropped); that's fine.
    let _ = sender.send(outcome);
}
/// Spawns the plain (no-classifier) sidecar task onto the runtime.
fn spawn_sidecar_task(
    engine: Arc<InferenceEngine>,
    request: GenerateRequest,
    sender: oneshot::Sender<Result<SidecarResult, VoiceTurnError>>,
    cancel: CancellationToken,
    turn_id: u64,
    telemetry: Option<VoiceTelemetry>,
) {
    tokio::spawn(async move {
        run_llm_sidecar(engine, request, sender, cancel, turn_id, telemetry).await;
    });
}
// Unit tests cover the engine-free pieces: id allocation, cancellation
// plumbing, stream relaying, telemetry, and the DirectDataFetcher paths.
#[cfg(test)]
mod tests {
    use super::*;

    // Ids from the shared atomic counter strictly increase across calls.
    #[test]
    fn turn_ids_are_monotonic_and_unique() {
        let a = next_turn_id();
        let b = next_turn_id();
        let c = next_turn_id();
        assert!(b > a);
        assert!(c > b);
    }

    // Cancelling through a clone is visible on the original control, since
    // CancellationToken clones share the same underlying state.
    #[test]
    fn control_cancel_is_observable() {
        let control = VoiceTurnControl {
            turn_id: 42,
            cancel: CancellationToken::new(),
        };
        assert!(!control.is_cancelled());
        let clone = control.clone();
        clone.cancel();
        assert!(control.is_cancelled());
    }

    // turn_id() and cancel() on the handle are thin delegates to `control`.
    #[test]
    fn handle_turn_id_delegates_to_control() {
        let (_tx, fast_rx) = mpsc::channel::<StreamEvent>(1);
        let (_stx, sidecar_rx) = oneshot::channel();
        let handle = VoiceTurnHandle {
            control: VoiceTurnControl {
                turn_id: 7,
                cancel: CancellationToken::new(),
            },
            fast: fast_rx,
            sidecar: sidecar_rx,
        };
        assert_eq!(handle.turn_id(), 7);
        assert!(!handle.control.is_cancelled());
        handle.cancel();
        assert!(handle.control.is_cancelled());
    }

    // Mirrors the sidecar-only dispatch trick: dropping the sender makes the
    // fast receiver terminate immediately with `None`.
    #[tokio::test]
    async fn closed_fast_channel_recv_is_none() {
        let (fast_tx, mut fast_rx) = mpsc::channel::<StreamEvent>(1);
        drop(fast_tx);
        assert!(fast_rx.recv().await.is_none());
    }

    // After at least one event has been relayed, firing the token must make
    // relay_fast_stream exit promptly even while the producer keeps sending.
    #[tokio::test]
    async fn cancellation_propagates_to_relay_fast_stream() {
        let (in_tx, mut in_rx) = mpsc::channel::<StreamEvent>(8);
        let (out_tx, mut out_rx) = mpsc::channel::<StreamEvent>(8);
        let cancel = CancellationToken::new();
        // Producer floods the input side; stops once the relay drops in_rx.
        let producer = tokio::spawn(async move {
            for i in 0..100u32 {
                if in_tx
                    .send(StreamEvent::TextDelta(format!("d{i}")))
                    .await
                    .is_err()
                {
                    break;
                }
            }
        });
        let cancel_clone = cancel.clone();
        let relay = tokio::spawn(async move {
            relay_fast_stream(&mut in_rx, &out_tx, &cancel_clone, 1).await;
        });
        // Wait for proof the relay is actually running before cancelling.
        let first = out_rx.recv().await.expect("first event");
        match first {
            StreamEvent::TextDelta(_) => {}
            other => panic!("unexpected event: {other:?}"),
        }
        cancel.cancel();
        // The relay must exit well within a second of cancellation.
        tokio::time::timeout(std::time::Duration::from_secs(1), relay)
            .await
            .expect("relay did not exit after cancel")
            .expect("relay panicked");
        producer.abort();
    }

    // A fetcher hit resolves the sidecar directly: the result carries the
    // fetched text and the telemetry event is tagged source=direct_fetch.
    // The engine is never exercised on this path.
    #[tokio::test]
    async fn direct_fetcher_hit_skips_llm_and_resolves_sidecar() {
        struct Hit;
        #[async_trait::async_trait]
        impl DirectDataFetcher for Hit {
            async fn try_fetch(&self, _u: &str) -> Option<Result<String, String>> {
                Some(Ok("3 emails: Bob, Alice, Carol".to_string()))
            }
        }
        let cancel = CancellationToken::new();
        let (tx, rx) = oneshot::channel();
        let log = Arc::new(Mutex::new(EventLog::new()));
        let telemetry = VoiceTelemetry::new(log.clone());
        // Engine is required by the signature but unused when the fetcher hits.
        let dummy_engine = Arc::new(car_inference::InferenceEngine::new(
            car_inference::InferenceConfig::default(),
        ));
        spawn_sidecar_task_classified(
            dummy_engine,
            "any new email today".to_string(),
            GenerateRequest::default(),
            tx,
            cancel,
            99,
            Some(Arc::new(Hit)),
            Some(telemetry),
        );
        let r = rx.await.expect("oneshot delivered").expect("ok");
        assert_eq!(r.turn_id, 99);
        assert_eq!(r.text, "3 emails: Bob, Alice, Carol");
        let g = log.lock().unwrap();
        let evt = g.events().last().expect("event emitted");
        assert_eq!(evt.kind, EventKind::VoiceSidecarResolved);
        assert_eq!(evt.data.get("source"), Some(&Value::from("direct_fetch")));
    }

    // A fetcher miss plus cancellation yields Cancelled rather than a result.
    // NOTE(review): cancel() races the spawned task here — the Cancelled can
    // come from either the post-fetch check or run_llm_sidecar's select; if
    // the dummy engine ever completed generate() before the token fires this
    // could flake. Confirm the dummy engine blocks or errors.
    #[tokio::test]
    async fn direct_fetcher_miss_falls_through_but_we_observe_no_short_circuit() {
        struct Miss;
        #[async_trait::async_trait]
        impl DirectDataFetcher for Miss {
            async fn try_fetch(&self, _u: &str) -> Option<Result<String, String>> {
                None
            }
        }
        let cancel = CancellationToken::new();
        let (tx, rx) = oneshot::channel();
        let dummy_engine = Arc::new(car_inference::InferenceEngine::new(
            car_inference::InferenceConfig::default(),
        ));
        spawn_sidecar_task_classified(
            dummy_engine,
            "what's the weather".to_string(),
            GenerateRequest::default(),
            tx,
            cancel.clone(),
            100,
            Some(Arc::new(Miss)),
            None,
        );
        cancel.cancel();
        match rx.await.expect("oneshot delivered") {
            Err(VoiceTurnError::Cancelled) => {}
            other => panic!("expected Cancelled after fetcher miss + cancel, got {other:?}"),
        }
    }

    // emit() appends one event per call, always including the turn_id key
    // alongside any extra pairs.
    #[test]
    fn telemetry_emit_appends_to_eventlog() {
        let log = Arc::new(Mutex::new(EventLog::new()));
        let telemetry = VoiceTelemetry::new(log.clone());
        telemetry.emit(EventKind::VoiceFastTurnStarted, 7, vec![]);
        telemetry.emit(
            EventKind::VoiceSidecarResolved,
            7,
            vec![("text_len", Value::from(42usize))],
        );
        let g = log.lock().unwrap();
        let events = g.events();
        assert_eq!(events.len(), 2);
        assert_eq!(events[0].kind, EventKind::VoiceFastTurnStarted);
        assert_eq!(events[0].data.get("turn_id"), Some(&Value::from(7u64)));
        assert_eq!(events[1].kind, EventKind::VoiceSidecarResolved);
        assert_eq!(events[1].data.get("text_len"), Some(&Value::from(42usize)));
    }

    // If the caller drops the output receiver, the relay exits on its next
    // failed send — no cancellation required.
    #[tokio::test]
    async fn dropped_out_channel_stops_relay_without_cancel() {
        let (in_tx, mut in_rx) = mpsc::channel::<StreamEvent>(8);
        let (out_tx, out_rx) = mpsc::channel::<StreamEvent>(8);
        let cancel = CancellationToken::new();
        drop(out_rx);
        let cancel_clone = cancel.clone();
        let relay = tokio::spawn(async move {
            relay_fast_stream(&mut in_rx, &out_tx, &cancel_clone, 1).await;
        });
        in_tx
            .send(StreamEvent::TextDelta("x".into()))
            .await
            .unwrap();
        tokio::time::timeout(std::time::Duration::from_secs(1), relay)
            .await
            .expect("relay did not exit after out_rx drop")
            .expect("relay panicked");
    }
}