// car_engine/voice_turn.rs
//! Two-track voice turn orchestration.
//!
//! Quip's `SidecarReasoningService` + `MediaPlatformService.HandleUtteranceAsync`
//! ported to CAR. See [`docs/proposals/voice-sidecar-orchestration.md`] and
//! [`docs/proposals/voice-sidecar-orchestration-plan.md`] for the design
//! rationale and implementation plan.
//!
//! The two-track shape:
//!
//! ```text
//!                      user utterance
//!                            │
//!                            ▼
//!           ┌────────────────────────────────────┐
//!           │                                    │
//!        FAST_TURN_1 (<500ms first audio)    SIDECAR
//!        streaming LLM, voice-context        full LLM with tools (2-8s)
//!           │                                    │
//!           ▼                                    │
//!        TTS streaming → play              oneshot::Sender<SidecarResult>
//! ```
//!
//! The caller obtains a [`VoiceTurnHandle`], drains the fast stream into
//! TTS immediately, awaits the sidecar with a timeout, and plays the
//! result if it arrives in time. [`VoiceTurnControl`] is a cheap clonable
//! handle for cross-task cancellation (see plan §6.8).
27
28use std::collections::HashMap;
29use std::sync::atomic::{AtomicU64, Ordering};
30use std::sync::{Arc, Mutex};
31
32use car_eventlog::{EventKind, EventLog};
33use car_inference::{GenerateRequest, InferenceEngine, StreamEvent};
34use serde_json::Value;
35use tokio::sync::{mpsc, oneshot};
36use tokio_util::sync::CancellationToken;
37
/// Mint a new turn id. Process-wide, monotonic, never reused.
fn next_turn_id() -> u64 {
    // Single process-wide counter; Relaxed is sufficient because only
    // uniqueness/monotonicity of the fetched value matters, not ordering
    // relative to other memory operations.
    static NEXT: AtomicU64 = AtomicU64::new(1);
    NEXT.fetch_add(1, Ordering::Relaxed)
}
43
44/// Result delivered by the sidecar track once the substantive answer
45/// has been produced (or a fast-data path has hit).
46#[derive(Debug, Clone)]
47pub struct SidecarResult {
48    /// The turn this result belongs to. Callers MUST gate playback
49    /// on `result.turn_id == current_turn_id` so stale results from
50    /// superseded turns are dropped after a barge-in.
51    pub turn_id: u64,
52    /// Substantive answer to play through TTS.
53    pub text: String,
54    /// Optional structured data (tool results, etc.) for callers that
55    /// want to render something richer than text-to-speech.
56    pub data: Option<serde_json::Value>,
57}
58
59/// Failure modes distinguishable for telemetry / fallback policy.
60#[derive(Debug, thiserror::Error)]
61pub enum VoiceTurnError {
62    /// The underlying inference call failed. The string carries the
63    /// upstream error's display form (we don't bubble the concrete
64    /// `InferenceError` to keep this enum FFI-friendly).
65    #[error("inference failed: {0}")]
66    Inference(String),
67    /// The turn was cancelled — typically a barge-in or supersession
68    /// by a newer utterance.
69    #[error("turn cancelled (barge-in or supersession)")]
70    Cancelled,
71}
72
73/// Cheap, clonable control surface for an in-flight voice turn.
74///
75/// Stored separately from the receivers so an orchestrator can keep a
76/// copy in `Mutex<Option<VoiceTurnControl>>` and cancel from a different
77/// task than the one driving the streams. See plan §6.8.
78#[derive(Clone)]
79pub struct VoiceTurnControl {
80    /// Unique id for this turn; threaded through [`SidecarResult`] and
81    /// telemetry. Use it to drop stale results after barge-in.
82    pub turn_id: u64,
83    cancel: CancellationToken,
84}
85
86impl VoiceTurnControl {
87    /// Cancel the in-flight turn. Idempotent.
88    pub fn cancel(&self) {
89        self.cancel.cancel();
90    }
91
92    /// Has the turn been cancelled?
93    pub fn is_cancelled(&self) -> bool {
94        self.cancel.is_cancelled()
95    }
96}
97
98/// Two-track handle returned by [`dispatch_voice_turn`].
99///
100/// `fast` streams [`StreamEvent`]s for the fast track (text deltas,
101/// tool-call markers, usage, done). `sidecar` resolves once with the
102/// substantive answer or an error. `control` is a cheap clonable
103/// surface that can be cached by the orchestrator for cancellation
104/// from another task.
105///
106/// Caller MUST drive both receivers. Dropping `fast` cancels the fast
107/// task; dropping `sidecar` cancels the sidecar task. To cancel both
108/// without driving them, call [`Self::cancel`] (or
109/// [`VoiceTurnControl::cancel`] on a stored clone).
110pub struct VoiceTurnHandle {
111    pub control: VoiceTurnControl,
112    pub fast: mpsc::Receiver<StreamEvent>,
113    pub sidecar: oneshot::Receiver<Result<SidecarResult, VoiceTurnError>>,
114}
115
116impl VoiceTurnHandle {
117    /// Convenience accessor for `self.control.turn_id`.
118    pub fn turn_id(&self) -> u64 {
119        self.control.turn_id
120    }
121
122    /// Cancel both tracks. Idempotent.
123    pub fn cancel(&self) {
124        self.control.cancel();
125    }
126}
127
128/// Foreign-implemented data fetcher. When attached, the sidecar task
129/// gives this fetcher the utterance first; on `Some(Ok(text))` the
130/// LLM is skipped entirely and the text becomes the sidecar's answer.
131///
132/// On `None` (no match) or `Some(Err(e))` (match but fetch failed),
133/// the sidecar falls through to the LLM path. Use this for the
134/// "fast data path" optimization in quip's playbook: email/calendar
135/// queries that are pure data lookups don't need an LLM round trip.
136///
137/// Implementations are typically host-side (calendar API, mail API)
138/// and should already format their output for voice narration —
139/// short, no markdown. See `car_voice::format_for_voice` for a
140/// minimal helper.
141#[async_trait::async_trait]
142pub trait DirectDataFetcher: Send + Sync {
143    /// Try to satisfy the utterance directly without invoking the LLM.
144    ///
145    /// - `Some(Ok(text))` → use this as the sidecar answer.
146    /// - `Some(Err(msg))` → match recognised but fetch failed; fall
147    ///   through to LLM (msg is logged, not surfaced).
148    /// - `None` → not a fast-data candidate; fall through to LLM.
149    async fn try_fetch(&self, utterance: &str) -> Option<Result<String, String>>;
150}
151
152/// Optional telemetry sink for voice turn events. Wraps an
153/// `Arc<Mutex<EventLog>>` so emissions are thread-safe across the
154/// fast and sidecar tasks. Pass `None` to disable.
155#[derive(Clone)]
156pub struct VoiceTelemetry {
157    log: Arc<Mutex<EventLog>>,
158}
159
160impl VoiceTelemetry {
161    /// Wrap an existing event log for voice telemetry.
162    pub fn new(log: Arc<Mutex<EventLog>>) -> Self {
163        Self { log }
164    }
165
166    /// Append a voice eventlog entry. Callers in `car-voice`'s
167    /// orchestrator emit `VoiceBridgePlayed` and `VoiceSidecarTimedOut`
168    /// through this; the engine-side spawn helpers emit the rest.
169    pub fn emit(&self, kind: EventKind, turn_id: u64, extra: Vec<(&str, Value)>) {
170        let mut data: HashMap<String, Value> = HashMap::new();
171        data.insert("turn_id".to_string(), Value::from(turn_id));
172        for (k, v) in extra {
173            data.insert(k.to_string(), v);
174        }
175        if let Ok(mut guard) = self.log.lock() {
176            guard.append(kind, None, None, data);
177        }
178    }
179}
180
181/// Dispatch a voice-turn utterance with a fast-and-sidecar split.
182///
183/// `engine` runs both inference calls. `fast_request` is streamed
184/// through the returned `VoiceTurnHandle::fast` channel; `sidecar_request`
185/// runs to completion and resolves the `sidecar` oneshot once.
186///
187/// Both inference requests must be pre-composed by the caller — including
188/// any voice-context overlay (use [`car_voice::compose_voice_context`]).
189/// This crate stays neutral on prompt construction.
190///
191/// Wraps [`dispatch_voice_turn_with_telemetry`] with no telemetry — use
192/// the `_with_telemetry` variant when you want eventlog emissions.
193pub fn dispatch_voice_turn(
194    engine: Arc<InferenceEngine>,
195    utterance: String,
196    fast_request: GenerateRequest,
197    sidecar_request: GenerateRequest,
198) -> VoiceTurnHandle {
199    dispatch_voice_turn_with_telemetry(engine, utterance, fast_request, sidecar_request, None)
200}
201
202/// Same as [`dispatch_voice_turn`] but emits voice eventlog kinds
203/// (`VoiceFastTurnStarted`, `VoiceFastTurnEnded`, `VoiceSidecarResolved`,
204/// `VoiceSidecarFailed`, `VoiceSidecarTimedOut`, `VoiceTurnCancelled`)
205/// to the supplied telemetry sink as the turn progresses.
206pub fn dispatch_voice_turn_with_telemetry(
207    engine: Arc<InferenceEngine>,
208    _utterance: String,
209    fast_request: GenerateRequest,
210    sidecar_request: GenerateRequest,
211    telemetry: Option<VoiceTelemetry>,
212) -> VoiceTurnHandle {
213    let turn_id = next_turn_id();
214    let cancel = CancellationToken::new();
215    let (fast_tx, fast_rx) = mpsc::channel::<StreamEvent>(64);
216    let (sidecar_tx, sidecar_rx) = oneshot::channel();
217
218    if let Some(t) = telemetry.as_ref() {
219        t.emit(EventKind::VoiceFastTurnStarted, turn_id, vec![]);
220    }
221
222    spawn_fast_task(
223        engine.clone(),
224        fast_request,
225        fast_tx,
226        cancel.clone(),
227        turn_id,
228        telemetry.clone(),
229    );
230    spawn_sidecar_task(
231        engine,
232        sidecar_request,
233        sidecar_tx,
234        cancel.clone(),
235        turn_id,
236        telemetry,
237    );
238
239    VoiceTurnHandle {
240        control: VoiceTurnControl { turn_id, cancel },
241        fast: fast_rx,
242        sidecar: sidecar_rx,
243    }
244}
245
246/// Variant of [`dispatch_voice_turn`] that skips the fast inference call.
247///
248/// Use when the caller has already played a hardcoded bridge phrase
249/// (the "STRUCTURAL HALLUCINATION FIX" in quip's terms) and only needs
250/// the sidecar's substantive answer. The returned handle's `fast`
251/// channel is pre-closed so callers don't have to special-case the
252/// shape.
253pub fn dispatch_voice_turn_sidecar_only(
254    engine: Arc<InferenceEngine>,
255    utterance: String,
256    sidecar_request: GenerateRequest,
257) -> VoiceTurnHandle {
258    dispatch_voice_turn_sidecar_only_with_telemetry(engine, utterance, sidecar_request, None)
259}
260
261/// Same as [`dispatch_voice_turn_sidecar_only`] but emits voice
262/// eventlog kinds for the sidecar half of the turn.
263pub fn dispatch_voice_turn_sidecar_only_with_telemetry(
264    engine: Arc<InferenceEngine>,
265    utterance: String,
266    sidecar_request: GenerateRequest,
267    telemetry: Option<VoiceTelemetry>,
268) -> VoiceTurnHandle {
269    dispatch_voice_turn_sidecar_only_with_classifier(
270        engine,
271        utterance,
272        sidecar_request,
273        None,
274        telemetry,
275    )
276}
277
278/// Variant of [`dispatch_voice_turn_sidecar_only`] that consults a
279/// [`DirectDataFetcher`] before invoking the LLM. On a fetcher hit
280/// the LLM call is skipped entirely — the fetcher's text becomes
281/// the sidecar's `SidecarResult`. On miss, falls through to the
282/// LLM-based sidecar exactly like the bare `_with_telemetry` variant.
283pub fn dispatch_voice_turn_sidecar_only_with_classifier(
284    engine: Arc<InferenceEngine>,
285    utterance: String,
286    sidecar_request: GenerateRequest,
287    fetcher: Option<Arc<dyn DirectDataFetcher>>,
288    telemetry: Option<VoiceTelemetry>,
289) -> VoiceTurnHandle {
290    let turn_id = next_turn_id();
291    let cancel = CancellationToken::new();
292    // Closed channel — recv() returns None immediately. Capacity 1 is
293    // arbitrary; we never send anything.
294    let (fast_tx, fast_rx) = mpsc::channel::<StreamEvent>(1);
295    drop(fast_tx);
296    let (sidecar_tx, sidecar_rx) = oneshot::channel();
297
298    spawn_sidecar_task_classified(
299        engine,
300        utterance,
301        sidecar_request,
302        sidecar_tx,
303        cancel.clone(),
304        turn_id,
305        fetcher,
306        telemetry,
307    );
308
309    VoiceTurnHandle {
310        control: VoiceTurnControl { turn_id, cancel },
311        fast: fast_rx,
312        sidecar: sidecar_rx,
313    }
314}
315
/// Spawn the fast-track task: start a tracked streaming inference and
/// relay its events into `out` until the stream ends, the turn is
/// cancelled, or the caller drops the receiver. With telemetry attached,
/// emits `VoiceTurnCancelled` (track=fast) or `VoiceFastTurnEnded`.
fn spawn_fast_task(
    engine: Arc<InferenceEngine>,
    request: GenerateRequest,
    out: mpsc::Sender<StreamEvent>,
    cancel: CancellationToken,
    turn_id: u64,
    telemetry: Option<VoiceTelemetry>,
) {
    tokio::spawn(async move {
        // `biased` makes the cancel branch win when both futures are
        // ready, so a barge-in racing inference startup is honoured.
        let cancelled_during = tokio::select! {
            biased;
            _ = cancel.cancelled() => {
                tracing::debug!(turn_id, "fast task cancelled before inference start");
                true
            }
            res = engine.generate_tracked_stream(request) => {
                match res {
                    Ok(mut rx) => {
                        relay_fast_stream(&mut rx, &out, &cancel, turn_id).await;
                        // The relay exits on cancel, stream end, or a
                        // dropped receiver; re-check the token to decide
                        // which telemetry event applies.
                        cancel.is_cancelled()
                    }
                    Err(e) => {
                        // Inference failure is logged but reported below as
                        // a normal end (not cancelled) — no dedicated
                        // fast-turn-failed event kind is emitted here.
                        tracing::error!(turn_id, error=%e, "fast turn inference failed");
                        false
                    }
                }
            }
        };
        if let Some(t) = telemetry {
            if cancelled_during {
                t.emit(
                    EventKind::VoiceTurnCancelled,
                    turn_id,
                    vec![("track", "fast".into())],
                );
            } else {
                t.emit(EventKind::VoiceFastTurnEnded, turn_id, vec![]);
            }
        }
    });
}
357
358async fn relay_fast_stream(
359    rx: &mut mpsc::Receiver<StreamEvent>,
360    out: &mpsc::Sender<StreamEvent>,
361    cancel: &CancellationToken,
362    turn_id: u64,
363) {
364    loop {
365        tokio::select! {
366            biased;
367            _ = cancel.cancelled() => {
368                tracing::debug!(turn_id, "fast stream cancelled mid-relay");
369                break;
370            }
371            evt = rx.recv() => match evt {
372                Some(e) => {
373                    if out.send(e).await.is_err() {
374                        // Receiver dropped — caller has stopped listening.
375                        break;
376                    }
377                }
378                None => break,
379            }
380        }
381    }
382}
383
384fn spawn_sidecar_task_classified(
385    engine: Arc<InferenceEngine>,
386    utterance: String,
387    request: GenerateRequest,
388    sender: oneshot::Sender<Result<SidecarResult, VoiceTurnError>>,
389    cancel: CancellationToken,
390    turn_id: u64,
391    fetcher: Option<Arc<dyn DirectDataFetcher>>,
392    telemetry: Option<VoiceTelemetry>,
393) {
394    tokio::spawn(async move {
395        // Try the direct fetcher first. On hit, skip the LLM entirely.
396        if let Some(f) = fetcher.as_ref() {
397            let fetch_outcome = tokio::select! {
398                biased;
399                _ = cancel.cancelled() => None,
400                outcome = f.try_fetch(&utterance) => outcome,
401            };
402            match fetch_outcome {
403                Some(Ok(text)) => {
404                    let result = Ok(SidecarResult {
405                        turn_id,
406                        text: text.clone(),
407                        data: None,
408                    });
409                    if let Some(t) = telemetry {
410                        t.emit(
411                            EventKind::VoiceSidecarResolved,
412                            turn_id,
413                            vec![
414                                ("text_len", Value::from(text.len())),
415                                ("source", "direct_fetch".into()),
416                            ],
417                        );
418                    }
419                    let _ = sender.send(result);
420                    return;
421                }
422                Some(Err(e)) => {
423                    tracing::debug!(turn_id, error=%e, "DirectDataFetcher errored; falling through to LLM");
424                }
425                None => { /* no match — fall through */ }
426            }
427            // If the fetch attempt was cancelled (the cancel branch ran),
428            // honour that immediately rather than starting an LLM call.
429            if cancel.is_cancelled() {
430                let _ = sender.send(Err(VoiceTurnError::Cancelled));
431                if let Some(t) = telemetry {
432                    t.emit(
433                        EventKind::VoiceTurnCancelled,
434                        turn_id,
435                        vec![("track", "sidecar".into())],
436                    );
437                }
438                return;
439            }
440        }
441        run_llm_sidecar(engine, request, sender, cancel, turn_id, telemetry).await;
442    });
443}
444
445async fn run_llm_sidecar(
446    engine: Arc<InferenceEngine>,
447    request: GenerateRequest,
448    sender: oneshot::Sender<Result<SidecarResult, VoiceTurnError>>,
449    cancel: CancellationToken,
450    turn_id: u64,
451    telemetry: Option<VoiceTelemetry>,
452) {
453    let result = tokio::select! {
454        biased;
455        _ = cancel.cancelled() => Err(VoiceTurnError::Cancelled),
456        res = engine.generate(request) => {
457            res.map(|text| SidecarResult { turn_id, text, data: None })
458               .map_err(|e| VoiceTurnError::Inference(e.to_string()))
459        }
460    };
461    if let Some(t) = telemetry {
462        match &result {
463            Ok(r) => t.emit(
464                EventKind::VoiceSidecarResolved,
465                turn_id,
466                vec![("text_len", Value::from(r.text.len()))],
467            ),
468            Err(VoiceTurnError::Cancelled) => {
469                t.emit(
470                    EventKind::VoiceTurnCancelled,
471                    turn_id,
472                    vec![("track", "sidecar".into())],
473                );
474            }
475            Err(VoiceTurnError::Inference(e)) => {
476                t.emit(
477                    EventKind::VoiceSidecarFailed,
478                    turn_id,
479                    vec![("error", Value::from(e.clone()))],
480                );
481            }
482        }
483    }
484    let _ = sender.send(result);
485}
486
487fn spawn_sidecar_task(
488    engine: Arc<InferenceEngine>,
489    request: GenerateRequest,
490    sender: oneshot::Sender<Result<SidecarResult, VoiceTurnError>>,
491    cancel: CancellationToken,
492    turn_id: u64,
493    telemetry: Option<VoiceTelemetry>,
494) {
495    tokio::spawn(run_llm_sidecar(
496        engine, request, sender, cancel, turn_id, telemetry,
497    ));
498}
499
#[cfg(test)]
mod tests {
    use super::*;

    // These tests exercise id allocation, cancellation plumbing, channel
    // wiring, the relay loop, the fetcher fast path, and telemetry — none
    // of them drive a real inference backend.

    #[test]
    fn turn_ids_are_monotonic_and_unique() {
        let a = next_turn_id();
        let b = next_turn_id();
        let c = next_turn_id();
        assert!(b > a);
        assert!(c > b);
    }

    #[test]
    fn control_cancel_is_observable() {
        let control = VoiceTurnControl {
            turn_id: 42,
            cancel: CancellationToken::new(),
        };
        assert!(!control.is_cancelled());
        // Cancelling through a clone must be visible on the original —
        // clones share the same underlying token.
        let clone = control.clone();
        clone.cancel();
        assert!(control.is_cancelled());
    }

    #[test]
    fn handle_turn_id_delegates_to_control() {
        let (_tx, fast_rx) = mpsc::channel::<StreamEvent>(1);
        let (_stx, sidecar_rx) = oneshot::channel();
        let handle = VoiceTurnHandle {
            control: VoiceTurnControl {
                turn_id: 7,
                cancel: CancellationToken::new(),
            },
            fast: fast_rx,
            sidecar: sidecar_rx,
        };
        assert_eq!(handle.turn_id(), 7);
        assert!(!handle.control.is_cancelled());
        // Handle-level cancel must reach the shared control token.
        handle.cancel();
        assert!(handle.control.is_cancelled());
    }

    #[tokio::test]
    async fn closed_fast_channel_recv_is_none() {
        // Mirrors the channel shape that `dispatch_voice_turn_sidecar_only`
        // produces — a pre-closed mpsc so callers can `.recv()` without
        // special-casing the sidecar-only flavour.
        let (fast_tx, mut fast_rx) = mpsc::channel::<StreamEvent>(1);
        drop(fast_tx);
        assert!(fast_rx.recv().await.is_none());
    }

    #[tokio::test]
    async fn cancellation_propagates_to_relay_fast_stream() {
        // Wire a producer rx, an out channel, and a cancellation token.
        // Cancelling the token should stop the relay even while the
        // producer is still alive.
        let (in_tx, mut in_rx) = mpsc::channel::<StreamEvent>(8);
        let (out_tx, mut out_rx) = mpsc::channel::<StreamEvent>(8);
        let cancel = CancellationToken::new();

        // Producer pushes until the channel fills. Bounded mpsc back-pressure
        // means it will block; we cancel the relay before it drains.
        let producer = tokio::spawn(async move {
            for i in 0..100u32 {
                if in_tx
                    .send(StreamEvent::TextDelta(format!("d{i}")))
                    .await
                    .is_err()
                {
                    break;
                }
            }
        });

        let cancel_clone = cancel.clone();
        let relay = tokio::spawn(async move {
            relay_fast_stream(&mut in_rx, &out_tx, &cancel_clone, 1).await;
        });

        // Drain at least one event, then cancel.
        let first = out_rx.recv().await.expect("first event");
        match first {
            StreamEvent::TextDelta(_) => {}
            other => panic!("unexpected event: {other:?}"),
        }
        cancel.cancel();

        // Relay should exit promptly.
        tokio::time::timeout(std::time::Duration::from_secs(1), relay)
            .await
            .expect("relay did not exit after cancel")
            .expect("relay panicked");

        producer.abort();
    }

    #[tokio::test]
    async fn direct_fetcher_hit_skips_llm_and_resolves_sidecar() {
        // Fetcher that always hits with a canned answer.
        struct Hit;
        #[async_trait::async_trait]
        impl DirectDataFetcher for Hit {
            async fn try_fetch(&self, _u: &str) -> Option<Result<String, String>> {
                Some(Ok("3 emails: Bob, Alice, Carol".to_string()))
            }
        }
        let cancel = CancellationToken::new();
        let (tx, rx) = oneshot::channel();
        let log = Arc::new(Mutex::new(EventLog::new()));
        let telemetry = VoiceTelemetry::new(log.clone());
        // We never reach the engine — its generate() would panic on a
        // bare InferenceEngine without backends. The fetcher hit path
        // must short-circuit before that.
        let dummy_engine = Arc::new(car_inference::InferenceEngine::new(
            car_inference::InferenceConfig::default(),
        ));
        spawn_sidecar_task_classified(
            dummy_engine,
            "any new email today".to_string(),
            GenerateRequest::default(),
            tx,
            cancel,
            99,
            Some(Arc::new(Hit)),
            Some(telemetry),
        );
        let r = rx.await.expect("oneshot delivered").expect("ok");
        assert_eq!(r.turn_id, 99);
        assert_eq!(r.text, "3 emails: Bob, Alice, Carol");
        // Resolved event tagged source=direct_fetch.
        let g = log.lock().unwrap();
        let evt = g.events().last().expect("event emitted");
        assert_eq!(evt.kind, EventKind::VoiceSidecarResolved);
        assert_eq!(evt.data.get("source"), Some(&Value::from("direct_fetch")));
    }

    #[tokio::test]
    async fn direct_fetcher_miss_falls_through_but_we_observe_no_short_circuit() {
        // We only verify that the fetch path enters the LLM branch by
        // confirming the spawn task hasn't resolved the oneshot via the
        // fetcher hit path. We can't drive a real LLM here, so we
        // cancel quickly and observe the Cancelled error instead of a
        // SidecarResult — proves the fetcher's None did not produce a
        // direct-resolve.
        struct Miss;
        #[async_trait::async_trait]
        impl DirectDataFetcher for Miss {
            async fn try_fetch(&self, _u: &str) -> Option<Result<String, String>> {
                None
            }
        }
        let cancel = CancellationToken::new();
        let (tx, rx) = oneshot::channel();
        let dummy_engine = Arc::new(car_inference::InferenceEngine::new(
            car_inference::InferenceConfig::default(),
        ));
        spawn_sidecar_task_classified(
            dummy_engine,
            "what's the weather".to_string(),
            GenerateRequest::default(),
            tx,
            cancel.clone(),
            100,
            Some(Arc::new(Miss)),
            None,
        );
        // Cancel before the LLM has a chance to fail; we expect Cancelled.
        cancel.cancel();
        match rx.await.expect("oneshot delivered") {
            Err(VoiceTurnError::Cancelled) => {}
            other => panic!("expected Cancelled after fetcher miss + cancel, got {other:?}"),
        }
    }

    #[test]
    fn telemetry_emit_appends_to_eventlog() {
        let log = Arc::new(Mutex::new(EventLog::new()));
        let telemetry = VoiceTelemetry::new(log.clone());
        telemetry.emit(EventKind::VoiceFastTurnStarted, 7, vec![]);
        telemetry.emit(
            EventKind::VoiceSidecarResolved,
            7,
            vec![("text_len", Value::from(42usize))],
        );
        let g = log.lock().unwrap();
        let events = g.events();
        assert_eq!(events.len(), 2);
        // Every emission carries the turn_id plus the caller's extras.
        assert_eq!(events[0].kind, EventKind::VoiceFastTurnStarted);
        assert_eq!(events[0].data.get("turn_id"), Some(&Value::from(7u64)));
        assert_eq!(events[1].kind, EventKind::VoiceSidecarResolved);
        assert_eq!(events[1].data.get("text_len"), Some(&Value::from(42usize)));
    }

    #[tokio::test]
    async fn dropped_out_channel_stops_relay_without_cancel() {
        let (in_tx, mut in_rx) = mpsc::channel::<StreamEvent>(8);
        let (out_tx, out_rx) = mpsc::channel::<StreamEvent>(8);
        let cancel = CancellationToken::new();

        // Caller drops their receiver — the relay should exit on the
        // next send attempt.
        drop(out_rx);

        let cancel_clone = cancel.clone();
        let relay = tokio::spawn(async move {
            relay_fast_stream(&mut in_rx, &out_tx, &cancel_clone, 1).await;
        });

        // Push one event so the relay's send fails.
        in_tx
            .send(StreamEvent::TextDelta("x".into()))
            .await
            .unwrap();

        tokio::time::timeout(std::time::Duration::from_secs(1), relay)
            .await
            .expect("relay did not exit after out_rx drop")
            .expect("relay panicked");
    }
}