Skip to main content

car_engine/
voice_turn.rs

//! Two-track voice turn orchestration.
//!
//! Quip's `SidecarReasoningService` + `MediaPlatformService.HandleUtteranceAsync`
//! ported to CAR. See [`docs/proposals/voice-sidecar-orchestration.md`] and
//! [`docs/proposals/voice-sidecar-orchestration-plan.md`] for the design
//! rationale and implementation plan.
//!
//! The two-track shape:
//!
//! ```text
//!                      user utterance
//!                            │
//!                            ▼
//!           ┌────────────────────────────────────┐
//!           │                                    │
//!        FAST_TURN_1 (<500ms first audio)    SIDECAR
//!        streaming LLM, voice-context        full LLM with tools (2-8s)
//!           │                                    │
//!           ▼                                    │
//!        TTS streaming → play              oneshot::Sender<SidecarResult>
//! ```
//!
//! The caller obtains a [`VoiceTurnHandle`], drains the fast stream into
//! TTS immediately, awaits the sidecar with a timeout, and plays the
//! result if it arrives in time. [`VoiceTurnControl`] is a cheap clonable
//! handle for cross-task cancellation (see plan §6.8).
27
28use std::collections::HashMap;
29use std::sync::atomic::{AtomicU64, Ordering};
30use std::sync::{Arc, Mutex};
31
32use car_eventlog::{EventKind, EventLog};
33use car_inference::{GenerateRequest, InferenceEngine, StreamEvent};
34use serde_json::Value;
35use tokio::sync::{mpsc, oneshot};
36use tokio_util::sync::CancellationToken;
37
/// Mint a fresh turn id: process-wide, strictly increasing, never reused.
fn next_turn_id() -> u64 {
    // Relaxed suffices — we only need each fetch_add to hand out a unique
    // value, not any ordering with respect to other memory operations.
    static NEXT: AtomicU64 = AtomicU64::new(1);
    NEXT.fetch_add(1, Ordering::Relaxed)
}
43
/// Result delivered by the sidecar track once the substantive answer
/// has been produced (or a fast-data path has hit).
#[derive(Debug, Clone)]
pub struct SidecarResult {
    /// The turn this result belongs to. Callers MUST gate playback
    /// on `result.turn_id == current_turn_id` so stale results from
    /// superseded turns are dropped after a barge-in.
    pub turn_id: u64,
    /// Substantive answer to play through TTS.
    pub text: String,
    /// Optional structured data (tool results, etc.) for callers that
    /// want to render something richer than text-to-speech. The spawn
    /// helpers in this module always set this to `None`; it exists for
    /// callers that construct results themselves.
    pub data: Option<serde_json::Value>,
}
58
/// Failure modes distinguishable for telemetry / fallback policy.
#[derive(Debug, thiserror::Error)]
pub enum VoiceTurnError {
    /// The underlying inference call failed. The string carries the
    /// upstream error's display form (we don't bubble the concrete
    /// `InferenceError` to keep this enum FFI-friendly). Emitted as
    /// `VoiceSidecarFailed` in telemetry.
    #[error("inference failed: {0}")]
    Inference(String),
    /// The turn was cancelled — typically a barge-in or supersession
    /// by a newer utterance. Produced both when cancellation beats the
    /// LLM call and when it interrupts a direct-fetch attempt.
    #[error("turn cancelled (barge-in or supersession)")]
    Cancelled,
}
72
/// Cheap, clonable control surface for an in-flight voice turn.
///
/// Stored separately from the receivers so an orchestrator can keep a
/// copy in `Mutex<Option<VoiceTurnControl>>` and cancel from a different
/// task than the one driving the streams. See plan §6.8.
///
/// Clones observe the same cancellation state: cancelling any clone is
/// visible through every other clone's `is_cancelled()`.
#[derive(Clone)]
pub struct VoiceTurnControl {
    /// Unique id for this turn; threaded through [`SidecarResult`] and
    /// telemetry. Use it to drop stale results after barge-in.
    pub turn_id: u64,
    // Shared token that both spawned tracks select on.
    cancel: CancellationToken,
}
85
86impl VoiceTurnControl {
87    /// Cancel the in-flight turn. Idempotent.
88    pub fn cancel(&self) {
89        self.cancel.cancel();
90    }
91
92    /// Has the turn been cancelled?
93    pub fn is_cancelled(&self) -> bool {
94        self.cancel.is_cancelled()
95    }
96}
97
/// Two-track handle returned by [`dispatch_voice_turn`].
///
/// `fast` streams [`StreamEvent`]s for the fast track (text deltas,
/// tool-call markers, usage, done). `sidecar` resolves once with the
/// substantive answer or an error. `control` is a cheap clonable
/// surface that can be cached by the orchestrator for cancellation
/// from another task.
///
/// Caller MUST drive both receivers. Dropping `fast` cancels the fast
/// task; dropping `sidecar` cancels the sidecar task. To cancel both
/// without driving them, call [`Self::cancel`] (or
/// [`VoiceTurnControl::cancel`] on a stored clone).
pub struct VoiceTurnHandle {
    /// Clonable cancellation surface shared with both spawned tracks.
    pub control: VoiceTurnControl,
    /// Fast-track stream; pre-closed in the sidecar-only variants.
    pub fast: mpsc::Receiver<StreamEvent>,
    /// Resolves exactly once with the sidecar's answer or error.
    pub sidecar: oneshot::Receiver<Result<SidecarResult, VoiceTurnError>>,
}
115
116impl VoiceTurnHandle {
117    /// Convenience accessor for `self.control.turn_id`.
118    pub fn turn_id(&self) -> u64 {
119        self.control.turn_id
120    }
121
122    /// Cancel both tracks. Idempotent.
123    pub fn cancel(&self) {
124        self.control.cancel();
125    }
126}
127
/// Foreign-implemented data fetcher. When attached, the sidecar task
/// gives this fetcher the utterance first; on `Some(Ok(text))` the
/// LLM is skipped entirely and the text becomes the sidecar's answer.
///
/// On `None` (no match) or `Some(Err(e))` (match but fetch failed),
/// the sidecar falls through to the LLM path. Use this for the
/// "fast data path" optimization in quip's playbook: email/calendar
/// queries that are pure data lookups don't need an LLM round trip.
///
/// Implementations are typically host-side (calendar API, mail API)
/// and should already format their output for voice narration —
/// short, no markdown. See `car_voice::format_for_voice` for a
/// minimal helper.
#[async_trait::async_trait]
pub trait DirectDataFetcher: Send + Sync {
    /// Try to satisfy the utterance directly without invoking the LLM.
    ///
    /// - `Some(Ok(text))` → use this as the sidecar answer.
    /// - `Some(Err(msg))` → match recognised but fetch failed; fall
    ///   through to LLM (msg is logged, not surfaced).
    /// - `None` → not a fast-data candidate; fall through to LLM.
    ///
    /// The sidecar task races this call against turn cancellation; if
    /// the turn is cancelled first, the outcome is discarded and no
    /// LLM call is started.
    async fn try_fetch(&self, utterance: &str) -> Option<Result<String, String>>;
}
151
/// Optional telemetry sink for voice turn events. Wraps an
/// `Arc<Mutex<EventLog>>` so emissions are thread-safe across the
/// fast and sidecar tasks. Pass `None` to disable.
#[derive(Clone)]
pub struct VoiceTelemetry {
    // Shared log; a poisoned lock disables emission rather than panicking.
    log: Arc<Mutex<EventLog>>,
}
159
160impl VoiceTelemetry {
161    /// Wrap an existing event log for voice telemetry.
162    pub fn new(log: Arc<Mutex<EventLog>>) -> Self {
163        Self { log }
164    }
165
166    /// Append a voice eventlog entry. Callers in `car-voice`'s
167    /// orchestrator emit `VoiceBridgePlayed` and `VoiceSidecarTimedOut`
168    /// through this; the engine-side spawn helpers emit the rest.
169    pub fn emit(&self, kind: EventKind, turn_id: u64, extra: Vec<(&str, Value)>) {
170        let mut data: HashMap<String, Value> = HashMap::new();
171        data.insert("turn_id".to_string(), Value::from(turn_id));
172        for (k, v) in extra {
173            data.insert(k.to_string(), v);
174        }
175        if let Ok(mut guard) = self.log.lock() {
176            guard.append(kind, None, None, data);
177        }
178    }
179}
180
181/// Dispatch a voice-turn utterance with a fast-and-sidecar split.
182///
183/// `engine` runs both inference calls. `fast_request` is streamed
184/// through the returned `VoiceTurnHandle::fast` channel; `sidecar_request`
185/// runs to completion and resolves the `sidecar` oneshot once.
186///
187/// Both inference requests must be pre-composed by the caller — including
188/// any voice-context overlay (use [`car_voice::compose_voice_context`]).
189/// This crate stays neutral on prompt construction.
190///
191/// Wraps [`dispatch_voice_turn_with_telemetry`] with no telemetry — use
192/// the `_with_telemetry` variant when you want eventlog emissions.
193pub fn dispatch_voice_turn(
194    engine: Arc<InferenceEngine>,
195    utterance: String,
196    fast_request: GenerateRequest,
197    sidecar_request: GenerateRequest,
198) -> VoiceTurnHandle {
199    dispatch_voice_turn_with_telemetry(engine, utterance, fast_request, sidecar_request, None)
200}
201
202/// Same as [`dispatch_voice_turn`] but emits voice eventlog kinds
203/// (`VoiceFastTurnStarted`, `VoiceFastTurnEnded`, `VoiceSidecarResolved`,
204/// `VoiceSidecarFailed`, `VoiceSidecarTimedOut`, `VoiceTurnCancelled`)
205/// to the supplied telemetry sink as the turn progresses.
206pub fn dispatch_voice_turn_with_telemetry(
207    engine: Arc<InferenceEngine>,
208    _utterance: String,
209    fast_request: GenerateRequest,
210    sidecar_request: GenerateRequest,
211    telemetry: Option<VoiceTelemetry>,
212) -> VoiceTurnHandle {
213    let turn_id = next_turn_id();
214    let cancel = CancellationToken::new();
215    let (fast_tx, fast_rx) = mpsc::channel::<StreamEvent>(64);
216    let (sidecar_tx, sidecar_rx) = oneshot::channel();
217
218    if let Some(t) = telemetry.as_ref() {
219        t.emit(EventKind::VoiceFastTurnStarted, turn_id, vec![]);
220    }
221
222    spawn_fast_task(
223        engine.clone(),
224        fast_request,
225        fast_tx,
226        cancel.clone(),
227        turn_id,
228        telemetry.clone(),
229    );
230    spawn_sidecar_task(
231        engine,
232        sidecar_request,
233        sidecar_tx,
234        cancel.clone(),
235        turn_id,
236        telemetry,
237    );
238
239    VoiceTurnHandle {
240        control: VoiceTurnControl { turn_id, cancel },
241        fast: fast_rx,
242        sidecar: sidecar_rx,
243    }
244}
245
246/// Variant of [`dispatch_voice_turn`] that skips the fast inference call.
247///
248/// Use when the caller has already played a hardcoded bridge phrase
249/// (the "STRUCTURAL HALLUCINATION FIX" in quip's terms) and only needs
250/// the sidecar's substantive answer. The returned handle's `fast`
251/// channel is pre-closed so callers don't have to special-case the
252/// shape.
253pub fn dispatch_voice_turn_sidecar_only(
254    engine: Arc<InferenceEngine>,
255    utterance: String,
256    sidecar_request: GenerateRequest,
257) -> VoiceTurnHandle {
258    dispatch_voice_turn_sidecar_only_with_telemetry(engine, utterance, sidecar_request, None)
259}
260
261/// Same as [`dispatch_voice_turn_sidecar_only`] but emits voice
262/// eventlog kinds for the sidecar half of the turn.
263pub fn dispatch_voice_turn_sidecar_only_with_telemetry(
264    engine: Arc<InferenceEngine>,
265    utterance: String,
266    sidecar_request: GenerateRequest,
267    telemetry: Option<VoiceTelemetry>,
268) -> VoiceTurnHandle {
269    dispatch_voice_turn_sidecar_only_with_classifier(
270        engine,
271        utterance,
272        sidecar_request,
273        None,
274        telemetry,
275    )
276}
277
278/// Variant of [`dispatch_voice_turn_sidecar_only`] that consults a
279/// [`DirectDataFetcher`] before invoking the LLM. On a fetcher hit
280/// the LLM call is skipped entirely — the fetcher's text becomes
281/// the sidecar's `SidecarResult`. On miss, falls through to the
282/// LLM-based sidecar exactly like the bare `_with_telemetry` variant.
283pub fn dispatch_voice_turn_sidecar_only_with_classifier(
284    engine: Arc<InferenceEngine>,
285    utterance: String,
286    sidecar_request: GenerateRequest,
287    fetcher: Option<Arc<dyn DirectDataFetcher>>,
288    telemetry: Option<VoiceTelemetry>,
289) -> VoiceTurnHandle {
290    let turn_id = next_turn_id();
291    let cancel = CancellationToken::new();
292    // Closed channel — recv() returns None immediately. Capacity 1 is
293    // arbitrary; we never send anything.
294    let (fast_tx, fast_rx) = mpsc::channel::<StreamEvent>(1);
295    drop(fast_tx);
296    let (sidecar_tx, sidecar_rx) = oneshot::channel();
297
298    spawn_sidecar_task_classified(
299        engine,
300        utterance,
301        sidecar_request,
302        sidecar_tx,
303        cancel.clone(),
304        turn_id,
305        fetcher,
306        telemetry,
307    );
308
309    VoiceTurnHandle {
310        control: VoiceTurnControl { turn_id, cancel },
311        fast: fast_rx,
312        sidecar: sidecar_rx,
313    }
314}
315
/// Spawn the fast track: start the streaming inference call and relay
/// its events to `out` until the stream ends, the caller drops the
/// receiving end, or `cancel` fires.
///
/// Telemetry: emits `VoiceTurnCancelled` (track=fast) when cancellation
/// happened before or during the relay, `VoiceFastTurnEnded` otherwise —
/// including the inference-error case, which is only traced here.
fn spawn_fast_task(
    engine: Arc<InferenceEngine>,
    request: GenerateRequest,
    out: mpsc::Sender<StreamEvent>,
    cancel: CancellationToken,
    turn_id: u64,
    telemetry: Option<VoiceTelemetry>,
) {
    tokio::spawn(async move {
        // `biased` makes the cancel arm win when both futures are ready,
        // so an already-cancelled turn never starts inference.
        let cancelled_during = tokio::select! {
            biased;
            _ = cancel.cancelled() => {
                tracing::debug!(turn_id, "fast task cancelled before inference start");
                true
            }
            res = engine.generate_tracked_stream(request) => {
                match res {
                    Ok(mut rx) => {
                        relay_fast_stream(&mut rx, &out, &cancel, turn_id).await;
                        // The relay exits on cancel, stream end, or receiver
                        // drop — re-check the token to tell cancellation apart.
                        cancel.is_cancelled()
                    }
                    Err(e) => {
                        // Inference failure is not cancellation: log it and
                        // fall through to the normal "ended" emission.
                        tracing::error!(turn_id, error=%e, "fast turn inference failed");
                        false
                    }
                }
            }
        };
        if let Some(t) = telemetry {
            if cancelled_during {
                t.emit(EventKind::VoiceTurnCancelled, turn_id, vec![("track", "fast".into())]);
            } else {
                t.emit(EventKind::VoiceFastTurnEnded, turn_id, vec![]);
            }
        }
    });
}
353
354async fn relay_fast_stream(
355    rx: &mut mpsc::Receiver<StreamEvent>,
356    out: &mpsc::Sender<StreamEvent>,
357    cancel: &CancellationToken,
358    turn_id: u64,
359) {
360    loop {
361        tokio::select! {
362            biased;
363            _ = cancel.cancelled() => {
364                tracing::debug!(turn_id, "fast stream cancelled mid-relay");
365                break;
366            }
367            evt = rx.recv() => match evt {
368                Some(e) => {
369                    if out.send(e).await.is_err() {
370                        // Receiver dropped — caller has stopped listening.
371                        break;
372                    }
373                }
374                None => break,
375            }
376        }
377    }
378}
379
380fn spawn_sidecar_task_classified(
381    engine: Arc<InferenceEngine>,
382    utterance: String,
383    request: GenerateRequest,
384    sender: oneshot::Sender<Result<SidecarResult, VoiceTurnError>>,
385    cancel: CancellationToken,
386    turn_id: u64,
387    fetcher: Option<Arc<dyn DirectDataFetcher>>,
388    telemetry: Option<VoiceTelemetry>,
389) {
390    tokio::spawn(async move {
391        // Try the direct fetcher first. On hit, skip the LLM entirely.
392        if let Some(f) = fetcher.as_ref() {
393            let fetch_outcome = tokio::select! {
394                biased;
395                _ = cancel.cancelled() => None,
396                outcome = f.try_fetch(&utterance) => outcome,
397            };
398            match fetch_outcome {
399                Some(Ok(text)) => {
400                    let result = Ok(SidecarResult {
401                        turn_id,
402                        text: text.clone(),
403                        data: None,
404                    });
405                    if let Some(t) = telemetry {
406                        t.emit(
407                            EventKind::VoiceSidecarResolved,
408                            turn_id,
409                            vec![
410                                ("text_len", Value::from(text.len())),
411                                ("source", "direct_fetch".into()),
412                            ],
413                        );
414                    }
415                    let _ = sender.send(result);
416                    return;
417                }
418                Some(Err(e)) => {
419                    tracing::debug!(turn_id, error=%e, "DirectDataFetcher errored; falling through to LLM");
420                }
421                None => { /* no match — fall through */ }
422            }
423            // If the fetch attempt was cancelled (the cancel branch ran),
424            // honour that immediately rather than starting an LLM call.
425            if cancel.is_cancelled() {
426                let _ = sender.send(Err(VoiceTurnError::Cancelled));
427                if let Some(t) = telemetry {
428                    t.emit(
429                        EventKind::VoiceTurnCancelled,
430                        turn_id,
431                        vec![("track", "sidecar".into())],
432                    );
433                }
434                return;
435            }
436        }
437        run_llm_sidecar(engine, request, sender, cancel, turn_id, telemetry).await;
438    });
439}
440
441async fn run_llm_sidecar(
442    engine: Arc<InferenceEngine>,
443    request: GenerateRequest,
444    sender: oneshot::Sender<Result<SidecarResult, VoiceTurnError>>,
445    cancel: CancellationToken,
446    turn_id: u64,
447    telemetry: Option<VoiceTelemetry>,
448) {
449    let result = tokio::select! {
450        biased;
451        _ = cancel.cancelled() => Err(VoiceTurnError::Cancelled),
452        res = engine.generate(request) => {
453            res.map(|text| SidecarResult { turn_id, text, data: None })
454               .map_err(|e| VoiceTurnError::Inference(e.to_string()))
455        }
456    };
457    if let Some(t) = telemetry {
458        match &result {
459            Ok(r) => t.emit(
460                EventKind::VoiceSidecarResolved,
461                turn_id,
462                vec![("text_len", Value::from(r.text.len()))],
463            ),
464            Err(VoiceTurnError::Cancelled) => {
465                t.emit(EventKind::VoiceTurnCancelled, turn_id, vec![("track", "sidecar".into())]);
466            }
467            Err(VoiceTurnError::Inference(e)) => {
468                t.emit(
469                    EventKind::VoiceSidecarFailed,
470                    turn_id,
471                    vec![("error", Value::from(e.clone()))],
472                );
473            }
474        }
475    }
476    let _ = sender.send(result);
477}
478
479fn spawn_sidecar_task(
480    engine: Arc<InferenceEngine>,
481    request: GenerateRequest,
482    sender: oneshot::Sender<Result<SidecarResult, VoiceTurnError>>,
483    cancel: CancellationToken,
484    turn_id: u64,
485    telemetry: Option<VoiceTelemetry>,
486) {
487    tokio::spawn(run_llm_sidecar(
488        engine, request, sender, cancel, turn_id, telemetry,
489    ));
490}
491
#[cfg(test)]
mod tests {
    use super::*;

    // Ids come from a process-global atomic counter, so they only ever grow.
    #[test]
    fn turn_ids_are_monotonic_and_unique() {
        let a = next_turn_id();
        let b = next_turn_id();
        let c = next_turn_id();
        assert!(b > a);
        assert!(c > b);
    }

    // Cancelling a clone must be visible through the original control —
    // the CancellationToken is shared, not copied.
    #[test]
    fn control_cancel_is_observable() {
        let control = VoiceTurnControl {
            turn_id: 42,
            cancel: CancellationToken::new(),
        };
        assert!(!control.is_cancelled());
        let clone = control.clone();
        clone.cancel();
        assert!(control.is_cancelled());
    }

    // Handle convenience methods are pure delegation to `control`.
    #[test]
    fn handle_turn_id_delegates_to_control() {
        let (_tx, fast_rx) = mpsc::channel::<StreamEvent>(1);
        let (_stx, sidecar_rx) = oneshot::channel();
        let handle = VoiceTurnHandle {
            control: VoiceTurnControl {
                turn_id: 7,
                cancel: CancellationToken::new(),
            },
            fast: fast_rx,
            sidecar: sidecar_rx,
        };
        assert_eq!(handle.turn_id(), 7);
        assert!(!handle.control.is_cancelled());
        handle.cancel();
        assert!(handle.control.is_cancelled());
    }

    #[tokio::test]
    async fn closed_fast_channel_recv_is_none() {
        // Mirrors the channel shape that `dispatch_voice_turn_sidecar_only`
        // produces — a pre-closed mpsc so callers can `.recv()` without
        // special-casing the sidecar-only flavour.
        let (fast_tx, mut fast_rx) = mpsc::channel::<StreamEvent>(1);
        drop(fast_tx);
        assert!(fast_rx.recv().await.is_none());
    }

    #[tokio::test]
    async fn cancellation_propagates_to_relay_fast_stream() {
        // Wire a producer rx, an out channel, and a cancellation token.
        // Cancelling the token should stop the relay even while the
        // producer is still alive.
        let (in_tx, mut in_rx) = mpsc::channel::<StreamEvent>(8);
        let (out_tx, mut out_rx) = mpsc::channel::<StreamEvent>(8);
        let cancel = CancellationToken::new();

        // Producer pushes until the channel fills. Bounded mpsc back-pressure
        // means it will block; we cancel the relay before it drains.
        let producer = tokio::spawn(async move {
            for i in 0..100u32 {
                if in_tx.send(StreamEvent::TextDelta(format!("d{i}"))).await.is_err() {
                    break;
                }
            }
        });

        let cancel_clone = cancel.clone();
        let relay = tokio::spawn(async move {
            relay_fast_stream(&mut in_rx, &out_tx, &cancel_clone, 1).await;
        });

        // Drain at least one event, then cancel.
        let first = out_rx.recv().await.expect("first event");
        match first {
            StreamEvent::TextDelta(_) => {}
            other => panic!("unexpected event: {other:?}"),
        }
        cancel.cancel();

        // Relay should exit promptly.
        tokio::time::timeout(std::time::Duration::from_secs(1), relay)
            .await
            .expect("relay did not exit after cancel")
            .expect("relay panicked");

        producer.abort();
    }

    // A fetcher hit must resolve the oneshot without touching the engine,
    // and the telemetry event must carry the direct_fetch source tag.
    #[tokio::test]
    async fn direct_fetcher_hit_skips_llm_and_resolves_sidecar() {
        struct Hit;
        #[async_trait::async_trait]
        impl DirectDataFetcher for Hit {
            async fn try_fetch(&self, _u: &str) -> Option<Result<String, String>> {
                Some(Ok("3 emails: Bob, Alice, Carol".to_string()))
            }
        }
        let cancel = CancellationToken::new();
        let (tx, rx) = oneshot::channel();
        let log = Arc::new(Mutex::new(EventLog::new()));
        let telemetry = VoiceTelemetry::new(log.clone());
        // We never reach the engine — its generate() would panic on a
        // bare InferenceEngine without backends. The fetcher hit path
        // must short-circuit before that.
        let dummy_engine = Arc::new(car_inference::InferenceEngine::new(
            car_inference::InferenceConfig::default(),
        ));
        spawn_sidecar_task_classified(
            dummy_engine,
            "any new email today".to_string(),
            GenerateRequest::default(),
            tx,
            cancel,
            99,
            Some(Arc::new(Hit)),
            Some(telemetry),
        );
        let r = rx.await.expect("oneshot delivered").expect("ok");
        assert_eq!(r.turn_id, 99);
        assert_eq!(r.text, "3 emails: Bob, Alice, Carol");
        // Resolved event tagged source=direct_fetch.
        let g = log.lock().unwrap();
        let evt = g.events().last().expect("event emitted");
        assert_eq!(evt.kind, EventKind::VoiceSidecarResolved);
        assert_eq!(evt.data.get("source"), Some(&Value::from("direct_fetch")));
    }

    #[tokio::test]
    async fn direct_fetcher_miss_falls_through_but_we_observe_no_short_circuit() {
        // We only verify that the fetch path enters the LLM branch by
        // confirming the spawn task hasn't resolved the oneshot via the
        // fetcher hit path. We can't drive a real LLM here, so we
        // cancel quickly and observe the Cancelled error instead of a
        // SidecarResult — proves the fetcher's None did not produce a
        // direct-resolve.
        struct Miss;
        #[async_trait::async_trait]
        impl DirectDataFetcher for Miss {
            async fn try_fetch(&self, _u: &str) -> Option<Result<String, String>> {
                None
            }
        }
        let cancel = CancellationToken::new();
        let (tx, rx) = oneshot::channel();
        let dummy_engine = Arc::new(car_inference::InferenceEngine::new(
            car_inference::InferenceConfig::default(),
        ));
        spawn_sidecar_task_classified(
            dummy_engine,
            "what's the weather".to_string(),
            GenerateRequest::default(),
            tx,
            cancel.clone(),
            100,
            Some(Arc::new(Miss)),
            None,
        );
        // Cancel before the LLM has a chance to fail; we expect Cancelled.
        cancel.cancel();
        match rx.await.expect("oneshot delivered") {
            Err(VoiceTurnError::Cancelled) => {}
            other => panic!("expected Cancelled after fetcher miss + cancel, got {other:?}"),
        }
    }

    // `emit` must stamp every entry with turn_id and merge extra pairs.
    #[test]
    fn telemetry_emit_appends_to_eventlog() {
        let log = Arc::new(Mutex::new(EventLog::new()));
        let telemetry = VoiceTelemetry::new(log.clone());
        telemetry.emit(EventKind::VoiceFastTurnStarted, 7, vec![]);
        telemetry.emit(
            EventKind::VoiceSidecarResolved,
            7,
            vec![("text_len", Value::from(42usize))],
        );
        let g = log.lock().unwrap();
        let events = g.events();
        assert_eq!(events.len(), 2);
        assert_eq!(events[0].kind, EventKind::VoiceFastTurnStarted);
        assert_eq!(events[0].data.get("turn_id"), Some(&Value::from(7u64)));
        assert_eq!(events[1].kind, EventKind::VoiceSidecarResolved);
        assert_eq!(events[1].data.get("text_len"), Some(&Value::from(42usize)));
    }

    #[tokio::test]
    async fn dropped_out_channel_stops_relay_without_cancel() {
        let (in_tx, mut in_rx) = mpsc::channel::<StreamEvent>(8);
        let (out_tx, out_rx) = mpsc::channel::<StreamEvent>(8);
        let cancel = CancellationToken::new();

        // Caller drops their receiver — the relay should exit on the
        // next send attempt.
        drop(out_rx);

        let cancel_clone = cancel.clone();
        let relay = tokio::spawn(async move {
            relay_fast_stream(&mut in_rx, &out_tx, &cancel_clone, 1).await;
        });

        // Push one event so the relay's send fails.
        in_tx.send(StreamEvent::TextDelta("x".into())).await.unwrap();

        tokio::time::timeout(std::time::Duration::from_secs(1), relay)
            .await
            .expect("relay did not exit after out_rx drop")
            .expect("relay panicked");
    }
}