1use std::collections::HashMap;
29use std::sync::atomic::{AtomicU64, Ordering};
30use std::sync::{Arc, Mutex};
31
32use car_eventlog::{EventKind, EventLog};
33use car_inference::{GenerateRequest, InferenceEngine, StreamEvent};
34use serde_json::Value;
35use tokio::sync::{mpsc, oneshot};
36use tokio_util::sync::CancellationToken;
37
/// Returns a process-unique, monotonically increasing turn identifier.
///
/// `Relaxed` ordering suffices: callers only need each id to be unique,
/// not any happens-before relationship with other memory operations.
fn next_turn_id() -> u64 {
    static NEXT_ID: AtomicU64 = AtomicU64::new(1);
    NEXT_ID.fetch_add(1, Ordering::Relaxed)
}
43
/// Final, complete answer produced by the sidecar (full-quality) track of a
/// voice turn.
#[derive(Debug, Clone)]
pub struct SidecarResult {
    /// Identifier of the turn this result belongs to; matches the
    /// `turn_id` on the dispatching `VoiceTurnHandle`.
    pub turn_id: u64,
    /// Full response text for the turn.
    pub text: String,
    /// Optional structured payload accompanying the text. Always `None` on
    /// the code paths visible in this file; reserved for richer results.
    pub data: Option<serde_json::Value>,
}
58
/// Error outcomes for the sidecar track of a voice turn.
#[derive(Debug, thiserror::Error)]
pub enum VoiceTurnError {
    /// The underlying inference engine reported a failure.
    #[error("inference failed: {0}")]
    Inference(String),
    /// The turn's cancellation token fired before a result was produced
    /// (user barge-in, or a newer turn superseding this one).
    #[error("turn cancelled (barge-in or supersession)")]
    Cancelled,
}
72
/// Cheap, cloneable handle for observing and triggering cancellation of a
/// single voice turn. Clones share the same underlying token, so cancelling
/// any clone is visible to all of them.
#[derive(Clone)]
pub struct VoiceTurnControl {
    /// Process-unique identifier of the turn (see `next_turn_id`).
    pub turn_id: u64,
    // Shared token distributed to the fast and sidecar tasks at dispatch time.
    cancel: CancellationToken,
}
85
impl VoiceTurnControl {
    /// Cancels the turn. Idempotent; every clone of this control (and every
    /// task holding a clone of the token) observes the cancellation.
    pub fn cancel(&self) {
        self.cancel.cancel();
    }

    /// Returns `true` once `cancel` has been called on any clone.
    pub fn is_cancelled(&self) -> bool {
        self.cancel.is_cancelled()
    }
}
97
/// Caller-facing bundle returned by the dispatch functions: cancellation
/// control plus the two result channels of a single voice turn.
pub struct VoiceTurnHandle {
    /// Control for cancelling the turn or querying its cancellation state.
    pub control: VoiceTurnControl,
    /// Streaming events from the fast track. For sidecar-only dispatch this
    /// channel is created pre-closed, so `recv()` yields `None` immediately.
    pub fast: mpsc::Receiver<StreamEvent>,
    /// One-shot result of the sidecar track.
    pub sidecar: oneshot::Receiver<Result<SidecarResult, VoiceTurnError>>,
}
115
impl VoiceTurnHandle {
    /// Identifier of the dispatched turn (delegates to `control`).
    pub fn turn_id(&self) -> u64 {
        self.control.turn_id
    }

    /// Convenience for `self.control.cancel()`.
    pub fn cancel(&self) {
        self.control.cancel();
    }
}
127
/// Hook that lets a caller short-circuit the LLM sidecar with a direct data
/// lookup keyed off the raw utterance.
#[async_trait::async_trait]
pub trait DirectDataFetcher: Send + Sync {
    /// Returns `None` when this fetcher does not apply to the utterance
    /// (the sidecar then falls through to the LLM), `Some(Ok(text))` to
    /// resolve the turn directly without invoking the LLM, or
    /// `Some(Err(msg))` on a fetch failure (which also falls through to
    /// the LLM — see `spawn_sidecar_task_classified`).
    async fn try_fetch(&self, utterance: &str) -> Option<Result<String, String>>;
}
151
/// Cloneable wrapper around a shared `EventLog` used to record per-turn
/// lifecycle events (started / resolved / cancelled / failed).
#[derive(Clone)]
pub struct VoiceTelemetry {
    // Shared, mutex-guarded event sink; clones share the same log.
    log: Arc<Mutex<EventLog>>,
}
159
160impl VoiceTelemetry {
161 pub fn new(log: Arc<Mutex<EventLog>>) -> Self {
163 Self { log }
164 }
165
166 pub fn emit(&self, kind: EventKind, turn_id: u64, extra: Vec<(&str, Value)>) {
170 let mut data: HashMap<String, Value> = HashMap::new();
171 data.insert("turn_id".to_string(), Value::from(turn_id));
172 for (k, v) in extra {
173 data.insert(k.to_string(), v);
174 }
175 if let Ok(mut guard) = self.log.lock() {
176 guard.append(kind, None, None, data);
177 }
178 }
179}
180
/// Dispatches a full voice turn (fast streaming track + sidecar track)
/// without telemetry. Thin forwarder to `dispatch_voice_turn_with_telemetry`.
pub fn dispatch_voice_turn(
    engine: Arc<InferenceEngine>,
    utterance: String,
    fast_request: GenerateRequest,
    sidecar_request: GenerateRequest,
) -> VoiceTurnHandle {
    dispatch_voice_turn_with_telemetry(engine, utterance, fast_request, sidecar_request, None)
}
201
202pub fn dispatch_voice_turn_with_telemetry(
207 engine: Arc<InferenceEngine>,
208 _utterance: String,
209 fast_request: GenerateRequest,
210 sidecar_request: GenerateRequest,
211 telemetry: Option<VoiceTelemetry>,
212) -> VoiceTurnHandle {
213 let turn_id = next_turn_id();
214 let cancel = CancellationToken::new();
215 let (fast_tx, fast_rx) = mpsc::channel::<StreamEvent>(64);
216 let (sidecar_tx, sidecar_rx) = oneshot::channel();
217
218 if let Some(t) = telemetry.as_ref() {
219 t.emit(EventKind::VoiceFastTurnStarted, turn_id, vec![]);
220 }
221
222 spawn_fast_task(
223 engine.clone(),
224 fast_request,
225 fast_tx,
226 cancel.clone(),
227 turn_id,
228 telemetry.clone(),
229 );
230 spawn_sidecar_task(
231 engine,
232 sidecar_request,
233 sidecar_tx,
234 cancel.clone(),
235 turn_id,
236 telemetry,
237 );
238
239 VoiceTurnHandle {
240 control: VoiceTurnControl { turn_id, cancel },
241 fast: fast_rx,
242 sidecar: sidecar_rx,
243 }
244}
245
/// Dispatches a sidecar-only voice turn (no fast track) without telemetry.
/// Thin forwarder to `dispatch_voice_turn_sidecar_only_with_telemetry`.
pub fn dispatch_voice_turn_sidecar_only(
    engine: Arc<InferenceEngine>,
    utterance: String,
    sidecar_request: GenerateRequest,
) -> VoiceTurnHandle {
    dispatch_voice_turn_sidecar_only_with_telemetry(engine, utterance, sidecar_request, None)
}
260
/// Sidecar-only dispatch with telemetry but no direct-data classifier.
/// Equivalent to `dispatch_voice_turn_sidecar_only_with_classifier` with
/// `fetcher = None`.
pub fn dispatch_voice_turn_sidecar_only_with_telemetry(
    engine: Arc<InferenceEngine>,
    utterance: String,
    sidecar_request: GenerateRequest,
    telemetry: Option<VoiceTelemetry>,
) -> VoiceTurnHandle {
    dispatch_voice_turn_sidecar_only_with_classifier(
        engine,
        utterance,
        sidecar_request,
        None,
        telemetry,
    )
}
277
278pub fn dispatch_voice_turn_sidecar_only_with_classifier(
284 engine: Arc<InferenceEngine>,
285 utterance: String,
286 sidecar_request: GenerateRequest,
287 fetcher: Option<Arc<dyn DirectDataFetcher>>,
288 telemetry: Option<VoiceTelemetry>,
289) -> VoiceTurnHandle {
290 let turn_id = next_turn_id();
291 let cancel = CancellationToken::new();
292 let (fast_tx, fast_rx) = mpsc::channel::<StreamEvent>(1);
295 drop(fast_tx);
296 let (sidecar_tx, sidecar_rx) = oneshot::channel();
297
298 spawn_sidecar_task_classified(
299 engine,
300 utterance,
301 sidecar_request,
302 sidecar_tx,
303 cancel.clone(),
304 turn_id,
305 fetcher,
306 telemetry,
307 );
308
309 VoiceTurnHandle {
310 control: VoiceTurnControl { turn_id, cancel },
311 fast: fast_rx,
312 sidecar: sidecar_rx,
313 }
314}
315
/// Spawns the fast-track task: starts a tracked streaming generation on
/// `engine` and relays its events to `out`, aborting promptly when `cancel`
/// fires.
///
/// Telemetry: emits `VoiceTurnCancelled` (track="fast") when the token was
/// cancelled before or during the stream, otherwise `VoiceFastTurnEnded`.
/// NOTE(review): an error starting inference also ends up emitting
/// `VoiceFastTurnEnded` (no dedicated fast-failure event on this path) —
/// confirm that is intended.
fn spawn_fast_task(
    engine: Arc<InferenceEngine>,
    request: GenerateRequest,
    out: mpsc::Sender<StreamEvent>,
    cancel: CancellationToken,
    turn_id: u64,
    telemetry: Option<VoiceTelemetry>,
) {
    tokio::spawn(async move {
        let cancelled_during = tokio::select! {
            // `biased` makes cancellation win when both branches are ready.
            biased;
            _ = cancel.cancelled() => {
                tracing::debug!(turn_id, "fast task cancelled before inference start");
                true
            }
            res = engine.generate_tracked_stream(request) => {
                match res {
                    Ok(mut rx) => {
                        // Pump events until the stream ends, the consumer
                        // drops its receiver, or the token fires mid-stream.
                        relay_fast_stream(&mut rx, &out, &cancel, turn_id).await;
                        // Distinguish "stream finished" from "cancelled
                        // mid-relay" for the telemetry below.
                        cancel.is_cancelled()
                    }
                    Err(e) => {
                        tracing::error!(turn_id, error=%e, "fast turn inference failed");
                        false
                    }
                }
            }
        };
        if let Some(t) = telemetry {
            if cancelled_during {
                t.emit(
                    EventKind::VoiceTurnCancelled,
                    turn_id,
                    vec![("track", "fast".into())],
                );
            } else {
                t.emit(EventKind::VoiceFastTurnEnded, turn_id, vec![]);
            }
        }
    });
}
357
/// Forwards events from the engine-side receiver `rx` to the caller-facing
/// sender `out` until one of three things happens: the source stream ends
/// (`rx.recv()` yields `None`), the caller drops its receiver (`out.send`
/// errors), or `cancel` fires.
async fn relay_fast_stream(
    rx: &mut mpsc::Receiver<StreamEvent>,
    out: &mpsc::Sender<StreamEvent>,
    cancel: &CancellationToken,
    turn_id: u64,
) {
    loop {
        tokio::select! {
            // `biased` ensures a pending cancellation is observed before the
            // next event is pulled, bounding post-cancel output.
            biased;
            _ = cancel.cancelled() => {
                tracing::debug!(turn_id, "fast stream cancelled mid-relay");
                break;
            }
            evt = rx.recv() => match evt {
                Some(e) => {
                    // Send error means the consumer dropped the receiver;
                    // stop relaying rather than burn cycles.
                    if out.send(e).await.is_err() {
                        break;
                    }
                }
                None => break,
            }
        }
    }
}
383
/// Spawns the sidecar task with an optional direct-data short-circuit.
///
/// When a `fetcher` is supplied it is consulted first: a `Some(Ok(text))`
/// outcome resolves the turn immediately (source="direct_fetch" telemetry)
/// without touching the LLM; a `Some(Err(_))` or `None` outcome falls
/// through to the normal LLM sidecar. Cancellation is honoured both during
/// the fetch and afterwards.
fn spawn_sidecar_task_classified(
    engine: Arc<InferenceEngine>,
    utterance: String,
    request: GenerateRequest,
    sender: oneshot::Sender<Result<SidecarResult, VoiceTurnError>>,
    cancel: CancellationToken,
    turn_id: u64,
    fetcher: Option<Arc<dyn DirectDataFetcher>>,
    telemetry: Option<VoiceTelemetry>,
) {
    tokio::spawn(async move {
        if let Some(f) = fetcher.as_ref() {
            // Cancellation during the fetch maps to `None`, which looks like
            // a fetcher miss; the explicit `is_cancelled` check below
            // disambiguates the two cases.
            let fetch_outcome = tokio::select! {
                biased;
                _ = cancel.cancelled() => None,
                outcome = f.try_fetch(&utterance) => outcome,
            };
            match fetch_outcome {
                Some(Ok(text)) => {
                    // Direct hit: resolve the turn without the LLM.
                    let result = Ok(SidecarResult {
                        turn_id,
                        text: text.clone(),
                        data: None,
                    });
                    if let Some(t) = telemetry {
                        t.emit(
                            EventKind::VoiceSidecarResolved,
                            turn_id,
                            vec![
                                ("text_len", Value::from(text.len())),
                                ("source", "direct_fetch".into()),
                            ],
                        );
                    }
                    // Receiver may already be gone (turn superseded); ignore.
                    let _ = sender.send(result);
                    return;
                }
                Some(Err(e)) => {
                    // Best-effort fetch failed; the LLM is the fallback.
                    tracing::debug!(turn_id, error=%e, "DirectDataFetcher errored; falling through to LLM");
                }
                None => { }
            }
            if cancel.is_cancelled() {
                let _ = sender.send(Err(VoiceTurnError::Cancelled));
                if let Some(t) = telemetry {
                    t.emit(
                        EventKind::VoiceTurnCancelled,
                        turn_id,
                        vec![("track", "sidecar".into())],
                    );
                }
                return;
            }
        }
        // No fetcher, fetcher miss, or fetcher error: run the LLM sidecar
        // (which performs its own cancellation handling and telemetry).
        run_llm_sidecar(engine, request, sender, cancel, turn_id, telemetry).await;
    });
}
444
/// Runs a single (non-streaming) LLM generation for the sidecar track,
/// racing it against cancellation, emits the matching telemetry event, and
/// delivers the outcome over `sender`.
async fn run_llm_sidecar(
    engine: Arc<InferenceEngine>,
    request: GenerateRequest,
    sender: oneshot::Sender<Result<SidecarResult, VoiceTurnError>>,
    cancel: CancellationToken,
    turn_id: u64,
    telemetry: Option<VoiceTelemetry>,
) {
    let result = tokio::select! {
        // `biased`: an already-fired token wins over starting generation.
        biased;
        _ = cancel.cancelled() => Err(VoiceTurnError::Cancelled),
        res = engine.generate(request) => {
            res.map(|text| SidecarResult { turn_id, text, data: None })
                .map_err(|e| VoiceTurnError::Inference(e.to_string()))
        }
    };
    // One telemetry event per outcome: resolved / cancelled / failed.
    if let Some(t) = telemetry {
        match &result {
            Ok(r) => t.emit(
                EventKind::VoiceSidecarResolved,
                turn_id,
                vec![("text_len", Value::from(r.text.len()))],
            ),
            Err(VoiceTurnError::Cancelled) => {
                t.emit(
                    EventKind::VoiceTurnCancelled,
                    turn_id,
                    vec![("track", "sidecar".into())],
                );
            }
            Err(VoiceTurnError::Inference(e)) => {
                t.emit(
                    EventKind::VoiceSidecarFailed,
                    turn_id,
                    vec![("error", Value::from(e.clone()))],
                );
            }
        }
    }
    // Receiver may have been dropped (turn superseded); delivery is
    // best-effort by design.
    let _ = sender.send(result);
}
486
487fn spawn_sidecar_task(
488 engine: Arc<InferenceEngine>,
489 request: GenerateRequest,
490 sender: oneshot::Sender<Result<SidecarResult, VoiceTurnError>>,
491 cancel: CancellationToken,
492 turn_id: u64,
493 telemetry: Option<VoiceTelemetry>,
494) {
495 tokio::spawn(run_llm_sidecar(
496 engine, request, sender, cancel, turn_id, telemetry,
497 ));
498}
499
#[cfg(test)]
mod tests {
    use super::*;

    // Pins that the global counter hands out strictly increasing ids.
    #[test]
    fn turn_ids_are_monotonic_and_unique() {
        let a = next_turn_id();
        let b = next_turn_id();
        let c = next_turn_id();
        assert!(b > a);
        assert!(c > b);
    }

    // Cancelling through any clone must be visible on every clone.
    #[test]
    fn control_cancel_is_observable() {
        let control = VoiceTurnControl {
            turn_id: 42,
            cancel: CancellationToken::new(),
        };
        assert!(!control.is_cancelled());
        let clone = control.clone();
        clone.cancel();
        assert!(control.is_cancelled());
    }

    // The handle's convenience methods must forward to its `control`.
    #[test]
    fn handle_turn_id_delegates_to_control() {
        let (_tx, fast_rx) = mpsc::channel::<StreamEvent>(1);
        let (_stx, sidecar_rx) = oneshot::channel();
        let handle = VoiceTurnHandle {
            control: VoiceTurnControl {
                turn_id: 7,
                cancel: CancellationToken::new(),
            },
            fast: fast_rx,
            sidecar: sidecar_rx,
        };
        assert_eq!(handle.turn_id(), 7);
        assert!(!handle.control.is_cancelled());
        handle.cancel();
        assert!(handle.control.is_cancelled());
    }

    // Documents the pre-closed-channel trick used by sidecar-only dispatch:
    // dropping the sender makes `recv()` resolve to `None` immediately.
    #[tokio::test]
    async fn closed_fast_channel_recv_is_none() {
        let (fast_tx, mut fast_rx) = mpsc::channel::<StreamEvent>(1);
        drop(fast_tx);
        assert!(fast_rx.recv().await.is_none());
    }

    // relay_fast_stream must terminate promptly once the token fires, even
    // while a producer keeps pushing events.
    #[tokio::test]
    async fn cancellation_propagates_to_relay_fast_stream() {
        let (in_tx, mut in_rx) = mpsc::channel::<StreamEvent>(8);
        let (out_tx, mut out_rx) = mpsc::channel::<StreamEvent>(8);
        let cancel = CancellationToken::new();

        // Producer keeps feeding deltas until its channel closes.
        let producer = tokio::spawn(async move {
            for i in 0..100u32 {
                if in_tx
                    .send(StreamEvent::TextDelta(format!("d{i}")))
                    .await
                    .is_err()
                {
                    break;
                }
            }
        });

        let cancel_clone = cancel.clone();
        let relay = tokio::spawn(async move {
            relay_fast_stream(&mut in_rx, &out_tx, &cancel_clone, 1).await;
        });

        // Wait for at least one relayed event before cancelling, so we know
        // the relay loop is actually running.
        let first = out_rx.recv().await.expect("first event");
        match first {
            StreamEvent::TextDelta(_) => {}
            other => panic!("unexpected event: {other:?}"),
        }
        cancel.cancel();

        tokio::time::timeout(std::time::Duration::from_secs(1), relay)
            .await
            .expect("relay did not exit after cancel")
            .expect("relay panicked");

        producer.abort();
    }

    // A fetcher hit must resolve the sidecar directly (never touching the
    // engine) and tag the telemetry event with source="direct_fetch".
    #[tokio::test]
    async fn direct_fetcher_hit_skips_llm_and_resolves_sidecar() {
        struct Hit;
        #[async_trait::async_trait]
        impl DirectDataFetcher for Hit {
            async fn try_fetch(&self, _u: &str) -> Option<Result<String, String>> {
                Some(Ok("3 emails: Bob, Alice, Carol".to_string()))
            }
        }
        let cancel = CancellationToken::new();
        let (tx, rx) = oneshot::channel();
        let log = Arc::new(Mutex::new(EventLog::new()));
        let telemetry = VoiceTelemetry::new(log.clone());
        // The engine is never invoked on the hit path, so a default-config
        // instance is sufficient.
        let dummy_engine = Arc::new(car_inference::InferenceEngine::new(
            car_inference::InferenceConfig::default(),
        ));
        spawn_sidecar_task_classified(
            dummy_engine,
            "any new email today".to_string(),
            GenerateRequest::default(),
            tx,
            cancel,
            99,
            Some(Arc::new(Hit)),
            Some(telemetry),
        );
        let r = rx.await.expect("oneshot delivered").expect("ok");
        assert_eq!(r.turn_id, 99);
        assert_eq!(r.text, "3 emails: Bob, Alice, Carol");
        let g = log.lock().unwrap();
        let evt = g.events().last().expect("event emitted");
        assert_eq!(evt.kind, EventKind::VoiceSidecarResolved);
        assert_eq!(evt.data.get("source"), Some(&Value::from("direct_fetch")));
    }

    // A fetcher miss plus cancellation must surface as Cancelled rather than
    // a direct-fetch result.
    // NOTE(review): `cancel()` races the spawned task here; this relies on
    // the cancel check / biased select seeing the token before the dummy
    // engine could produce anything — confirm this cannot flake.
    #[tokio::test]
    async fn direct_fetcher_miss_falls_through_but_we_observe_no_short_circuit() {
        struct Miss;
        #[async_trait::async_trait]
        impl DirectDataFetcher for Miss {
            async fn try_fetch(&self, _u: &str) -> Option<Result<String, String>> {
                None
            }
        }
        let cancel = CancellationToken::new();
        let (tx, rx) = oneshot::channel();
        let dummy_engine = Arc::new(car_inference::InferenceEngine::new(
            car_inference::InferenceConfig::default(),
        ));
        spawn_sidecar_task_classified(
            dummy_engine,
            "what's the weather".to_string(),
            GenerateRequest::default(),
            tx,
            cancel.clone(),
            100,
            Some(Arc::new(Miss)),
            None,
        );
        cancel.cancel();
        match rx.await.expect("oneshot delivered") {
            Err(VoiceTurnError::Cancelled) => {}
            other => panic!("expected Cancelled after fetcher miss + cancel, got {other:?}"),
        }
    }

    // Every emit must append one event carrying the implicit turn_id plus
    // any extra key/value pairs.
    #[test]
    fn telemetry_emit_appends_to_eventlog() {
        let log = Arc::new(Mutex::new(EventLog::new()));
        let telemetry = VoiceTelemetry::new(log.clone());
        telemetry.emit(EventKind::VoiceFastTurnStarted, 7, vec![]);
        telemetry.emit(
            EventKind::VoiceSidecarResolved,
            7,
            vec![("text_len", Value::from(42usize))],
        );
        let g = log.lock().unwrap();
        let events = g.events();
        assert_eq!(events.len(), 2);
        assert_eq!(events[0].kind, EventKind::VoiceFastTurnStarted);
        assert_eq!(events[0].data.get("turn_id"), Some(&Value::from(7u64)));
        assert_eq!(events[1].kind, EventKind::VoiceSidecarResolved);
        assert_eq!(events[1].data.get("text_len"), Some(&Value::from(42usize)));
    }

    // The relay must also terminate when the consumer side disappears,
    // without requiring cancellation.
    #[tokio::test]
    async fn dropped_out_channel_stops_relay_without_cancel() {
        let (in_tx, mut in_rx) = mpsc::channel::<StreamEvent>(8);
        let (out_tx, out_rx) = mpsc::channel::<StreamEvent>(8);
        let cancel = CancellationToken::new();

        // Simulate the consumer going away before any event is relayed.
        drop(out_rx);

        let cancel_clone = cancel.clone();
        let relay = tokio::spawn(async move {
            relay_fast_stream(&mut in_rx, &out_tx, &cancel_clone, 1).await;
        });

        in_tx
            .send(StreamEvent::TextDelta("x".into()))
            .await
            .unwrap();

        tokio::time::timeout(std::time::Duration::from_secs(1), relay)
            .await
            .expect("relay did not exit after out_rx drop")
            .expect("relay panicked");
    }
}