car-voice 0.15.1

Voice I/O capability for CAR — mic capture, VAD, listener/speaker traits
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
//! FFI-friendly session layer over [`Listener`].
//!
//! A [`VoiceSession`] owns a [`Listener`] plus a tokio task that drains
//! [`VoiceEvent`]s and forwards them to a [`VoiceEventSink`] as JSON
//! strings. The sink abstraction lets each FFI binding (NAPI, PyO3,
//! WebSocket server) plug in its own delivery mechanism without
//! car-voice knowing anything about napi-rs / pyo3 / tungstenite.
//!
//! Multiple concurrent sessions are first-class — a meeting captures
//! mic and system audio as two sessions sharing a single sink.
//!
//! ## JSON event shape
//!
//! Events are emitted as `{"type": "...", ...}` objects. The taxonomy
//! mirrors [`VoiceEvent`] so consumers can map directly:
//!
//! | event variant         | type field            |
//! | --------------------- | --------------------- |
//! | `SpeechStart`         | `"speech_start"`      |
//! | `SpeechEnd`           | `"speech_end"`        |
//! | `Partial {..}`        | `"partial"`           |
//! | `Transcript {..}`     | `"transcript"`        |
//! | `AudioChunk {..}`     | `"audio_chunk"` (meta only — samples omitted) |
//! | `BargeIn`             | `"barge_in"`          |
//! | `EnrollmentCaptured`  | `"enrollment_captured"` |
//! | `EnrollmentFailed`    | `"enrollment_failed"` |
//!
//! Plus a synthetic `"done"` event when the session ends (listener
//! channel closed or [`VoiceSession::stop`] called).

use crate::enrollment::TranscriptRole;
use crate::{Listener, Result, VoiceConfig, VoiceError, VoiceEvent};
use dashmap::DashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Weak};
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::sync::{oneshot, Mutex};
use tokio::task::JoinHandle;
use tracing::info;

/// Current wall-clock time as whole seconds since the UNIX epoch.
///
/// If the system clock reports a time before 1970 (`duration_since`
/// fails), this returns `0` instead of panicking.
fn now_secs() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    }
}

/// Destination for the JSON events produced by a [`VoiceSession`].
///
/// The session's drain task calls [`send`](Self::send) once per event,
/// on the tokio runtime. Implementations must return promptly and never
/// block: while `send` is stuck, the drain task cannot empty the
/// listener's bounded event channel, which back-pressures the audio
/// pipeline.
///
/// The trait requires `Send + Sync` because a single sink may be shared
/// (behind an `Arc`) by several concurrent sessions.
pub trait VoiceEventSink: Send + Sync {
    /// Hand one serialized event to the consumer.
    ///
    /// `session_id` names the session that produced `event_json`.
    fn send(&self, session_id: &str, event_json: String);
}

/// An active voice capture + transcription session.
///
/// Construct with a configured [`Listener`] (mic, system audio, etc.),
/// then [`start`](Self::start) it with a [`VoiceConfig`] and a sink.
/// The session spawns a tokio task that drains the listener and
/// forwards events to the sink as JSON until the listener closes or
/// [`stop`](Self::stop) is called.
///
/// The listener is held behind a [`tokio::sync::Mutex`] so [`VoiceSession`]
/// itself is `Send + Sync` — required for the [`VoiceSessionRegistry`]
/// which stores sessions in a [`DashMap`]. We can't add `Sync` to the
/// [`Listener`] trait because [`crate::voice_processing_listener`] holds
/// a `Box<dyn FnOnce + Send>` that isn't `Sync`, and the trait predates
/// the registry.
///
/// This module defines no `Drop` impl for `VoiceSession`. Dropping a
/// session without calling [`stop`](Self::stop) drops `stop_tx`. If the
/// drain task is still running, its stop receiver then resolves, and the
/// task flushes any buffered events, emits `"done"`, and exits.
/// `Listener::stop` is not called on that path. Fields are dropped in
/// declaration order.
pub struct VoiceSession {
    /// Caller-chosen id, stamped on every event forwarded to the sink.
    session_id: String,
    /// Locked by `start`/`stop` across their `.await`s. The drain task
    /// never touches it; it only owns the receiver returned by
    /// `Listener::start`.
    listener: Arc<Mutex<Box<dyn Listener>>>,
    /// Drain task spawned by `start`; taken and awaited by `stop`.
    drain_handle: Mutex<Option<JoinHandle<()>>>,
    /// One-shot that `stop` fires to make the drain task flush and exit.
    /// Installed by `start`; `None` before the first start and after a stop.
    stop_tx: Mutex<Option<oneshot::Sender<()>>>,
    /// Wall-clock seconds since UNIX epoch of the last activity on
    /// this session. Bumped by the drain task on every event the
    /// session produces. The registry's sweep task uses this to find
    /// idle sessions whose owners forgot to call `stop()`.
    /// `Arc` so the drain task can share the counter.
    last_active_at: Arc<AtomicU64>,
}

impl VoiceSession {
    /// Wrap a configured [`Listener`] under the given session id.
    ///
    /// Nothing is started yet; call [`start`](Self::start). The activity
    /// timestamp is initialised to the current time, so a freshly created
    /// session is not immediately considered idle.
    pub fn new(session_id: impl Into<String>, listener: Box<dyn Listener>) -> Self {
        Self {
            session_id: session_id.into(),
            listener: Arc::new(Mutex::new(listener)),
            drain_handle: Mutex::new(None),
            stop_tx: Mutex::new(None),
            last_active_at: Arc::new(AtomicU64::new(now_secs())),
        }
    }

    /// Seconds since UNIX epoch of the last event observed on this
    /// session (or of its creation, if no event has arrived yet). Used
    /// by the sweep task to detect idle sessions.
    pub fn last_active_at(&self) -> u64 {
        self.last_active_at.load(Ordering::Relaxed)
    }

    /// Cloneable handle on the activity timestamp for the drain task.
    fn shareable_last_active(&self) -> Arc<AtomicU64> {
        self.last_active_at.clone()
    }

    /// The id stamped on every event this session forwards to its sink.
    pub fn session_id(&self) -> &str {
        &self.session_id
    }

    /// Start the listener and spawn the drain task.
    ///
    /// Returns once the listener has acquired its capture device and the
    /// drain task is running. Events flow asynchronously to `sink`. The
    /// drain task ends with a synthetic `{"type":"done"}` event when the
    /// listener closes its channel or [`stop`](Self::stop) is called.
    ///
    /// Calling `start` again after a successful start replaces the stored
    /// stop sender and drain handle. Dropping the old sender resolves the
    /// previous drain task's stop receiver, so that task exits.
    ///
    /// # Errors
    ///
    /// Propagates any error from `Listener::start`. In that case no drain
    /// task is spawned, and the stop sender and drain handle are left
    /// untouched.
    pub async fn start(&self, config: VoiceConfig, sink: Arc<dyn VoiceEventSink>) -> Result<()> {
        let mut rx = {
            let mut listener = self.listener.lock().await;
            listener.start(config).await?
        };
        let session_id = self.session_id.clone();
        let last_active = self.shareable_last_active();
        let (stop_tx, mut stop_rx) = oneshot::channel::<()>();
        *self.stop_tx.lock().await = Some(stop_tx);

        let handle = tokio::spawn(async move {
            loop {
                tokio::select! {
                    // Check the stop signal before the event channel on every
                    // poll, so a pending stop wins over a busy stream.
                    biased;
                    // Fires on an explicit `stop` and also if the sender is
                    // dropped (session dropped or restarted).
                    _ = &mut stop_rx => break,
                    maybe_evt = rx.recv() => {
                        match maybe_evt {
                            Some(evt) => {
                                last_active.store(now_secs(), Ordering::Relaxed);
                                sink.send(&session_id, voice_event_to_json(&evt));
                            }
                            None => break, // listener closed its sender
                        }
                    }
                }
            }
            // Drain anything buffered after the stop signal so callers
            // don't lose a final transcript that arrived between the
            // VAD's SpeechEnd and the JS-side `stop` round-trip.
            while let Ok(evt) = rx.try_recv() {
                last_active.store(now_secs(), Ordering::Relaxed);
                sink.send(&session_id, voice_event_to_json(&evt));
            }
            sink.send(&session_id, r#"{"type":"done"}"#.to_string());
        });
        *self.drain_handle.lock().await = Some(handle);
        Ok(())
    }

    /// Signal the drain task, stop the listener, and await the drain
    /// task's exit.
    ///
    /// Safe to call after the listener has already closed its sender:
    /// the drain task will have exited on its own, and joining it just
    /// returns. Also safe to call twice, because `NotRunning` from the
    /// listener is treated as success.
    ///
    /// # Errors
    ///
    /// Returns the listener's stop error, except `NotRunning`. The drain
    /// task is still joined in that case, so when `stop` returns (on any
    /// path) the final flush and the `"done"` event have already been
    /// handed to the sink.
    pub async fn stop(&self) -> Result<()> {
        if let Some(tx) = self.stop_tx.lock().await.take() {
            // Receiver may already be gone if the loop exited via channel close.
            let _ = tx.send(());
        }
        let stop_result = {
            let mut listener = self.listener.lock().await;
            listener.stop().await
        };
        // Join the drain task before reporting the listener's result. It has
        // already been signalled, so this terminates. Take the handle out
        // first so the `drain_handle` lock is not held across the join.
        let drain = self.drain_handle.lock().await.take();
        if let Some(h) = drain {
            let _ = h.await;
        }
        match stop_result {
            // Ignore NotRunning so a double-stop is idempotent from the
            // registry's perspective.
            Ok(()) | Err(VoiceError::NotRunning) => Ok(()),
            Err(e) => Err(e),
        }
    }
}

/// Translate an internal [`VoiceEvent`] to the FFI JSON event shape.
///
/// Every payload is a JSON object whose `"type"` field names the variant:
/// `"speech_start"`, `"speech_end"`, `"transcript"`, `"partial"`,
/// `"audio_chunk"`, `"barge_in"`, `"enrollment_captured"`, or
/// `"enrollment_failed"`.
///
/// We use a hand-rolled mapping rather than `serde_json::to_string` on
/// `VoiceEvent` directly so we can:
///   - flatten `TranscriptRole` to a stable string form
///     (`"enrolled_user"` / `"other:<id>"` / `"unknown"`, as produced by
///     [`role_to_str`]),
///   - drop `AudioChunk::samples` from the default stream. Only
///     `sample_rate` and `frame_count` (= `samples.len()`) are sent.
///     Consumers that want raw audio can opt in with a future option flag
///     rather than paying the JSON-encoding cost on every frame.
///
/// `EnrollmentCaptured::save_path` is rendered with `display()`, which may
/// replace non-UTF-8 path bytes lossily.
fn voice_event_to_json(evt: &VoiceEvent) -> String {
    match evt {
        VoiceEvent::SpeechStart => r#"{"type":"speech_start"}"#.to_string(),
        VoiceEvent::SpeechEnd => r#"{"type":"speech_end"}"#.to_string(),
        VoiceEvent::Transcript {
            text,
            duration_ms,
            role,
        } => serde_json::json!({
            "type": "transcript",
            "text": text,
            "duration_ms": duration_ms,
            "role": role_to_str(role),
        })
        .to_string(),
        VoiceEvent::Partial { text, duration_ms } => serde_json::json!({
            "type": "partial",
            "text": text,
            "duration_ms": duration_ms,
        })
        .to_string(),
        VoiceEvent::AudioChunk {
            samples,
            sample_rate,
        } => serde_json::json!({
            "type": "audio_chunk",
            "sample_rate": sample_rate,
            "frame_count": samples.len(),
        })
        .to_string(),
        VoiceEvent::BargeIn => r#"{"type":"barge_in"}"#.to_string(),
        VoiceEvent::EnrollmentCaptured { label, save_path } => serde_json::json!({
            "type": "enrollment_captured",
            "label": label,
            "save_path": save_path.display().to_string(),
        })
        .to_string(),
        VoiceEvent::EnrollmentFailed { reason } => serde_json::json!({
            "type": "enrollment_failed",
            "reason": reason,
        })
        .to_string(),
    }
}

/// Render a [`TranscriptRole`] as the stable string used in the `"role"`
/// field of transcript events.
///
/// Produces `"enrolled_user"`, `"other:<local_id>"`, or `"unknown"`.
fn role_to_str(role: &TranscriptRole) -> String {
    match role {
        TranscriptRole::OtherSpeaker { local_id } => format!("other:{}", local_id),
        TranscriptRole::EnrolledUser => String::from("enrolled_user"),
        TranscriptRole::Unknown => String::from("unknown"),
    }
}

/// Concurrent registry of active voice sessions, keyed by session id.
///
/// FFI bindings hold a single registry instance (e.g. behind a static
/// `OnceLock`) and use it to start, look up, and stop sessions across
/// the FFI boundary. Designed to support multiple concurrent sessions
/// — meeting capture wants two (mic + system audio) sharing one
/// [`VoiceEventSink`].
///
/// ## Idle-session sweep
///
/// Constructing a registry does not start any background work. Once
/// [`start_sweeper`](Self::start_sweeper) is called on an
/// `Arc<VoiceSessionRegistry>`, a background task sweeps every
/// `sweep_interval_secs` (default 60s). Each sweep reaps any session whose
/// `last_active_at` is older than `idle_timeout_secs` (default 300s).
///
/// The sweep task holds only a `Weak` reference back to the registry and
/// upgrades it once per tick. After the last `Arc` drops, the task exits
/// at its next tick, and the registry — including any sessions still
/// holding listener handles — drops normally.
///
/// With the sweeper running, misbehaving callers that drop a WebSocket /
/// FFI binding without calling `stop(session_id)` no longer leak
/// listeners forever. Without it, idle sessions are only reaped when
/// [`reap_idle`](Self::reap_idle) is called explicitly.
pub struct VoiceSessionRegistry {
    /// Registered sessions, keyed by their session id.
    sessions: DashMap<String, VoiceSession>,
    /// Handle of the sweep task once `start_sweeper` has spawned it. This is
    /// a std (not tokio) mutex because `start_sweeper` is synchronous and
    /// never holds the guard across an `.await`.
    sweep_handle: std::sync::Mutex<Option<JoinHandle<()>>>,
    /// Sessions whose last activity is older than this many seconds are
    /// treated as idle by `reap_idle`.
    idle_timeout_secs: u64,
    /// Seconds between passes of the background sweep task.
    sweep_interval_secs: u64,
}

/// Idle window, in seconds, after which a silent session is reaped by default.
const DEFAULT_IDLE_TIMEOUT_SECS: u64 = 300;
/// Default period, in seconds, between background sweeps.
const DEFAULT_SWEEP_INTERVAL_SECS: u64 = 60;

impl Default for VoiceSessionRegistry {
    /// Empty registry with the default idle timeout and sweep interval.
    /// No sweeper task is running yet.
    fn default() -> Self {
        let sweep_handle = std::sync::Mutex::new(None);
        Self {
            idle_timeout_secs: DEFAULT_IDLE_TIMEOUT_SECS,
            sweep_interval_secs: DEFAULT_SWEEP_INTERVAL_SECS,
            sessions: DashMap::new(),
            sweep_handle,
        }
    }
}

impl VoiceSessionRegistry {
    /// Build a registry with default sweep settings (60s sweep
    /// interval, 300s idle timeout). The sweep task starts when
    /// [`Self::start_sweeper`] is called on the resulting `Arc`.
    pub fn new() -> Self {
        Self::default()
    }

    /// Override the idle timeout. Sessions older than this are reaped
    /// by the sweep task.
    pub fn with_idle_timeout_secs(mut self, secs: u64) -> Self {
        self.idle_timeout_secs = secs;
        self
    }

    /// Override the sweep interval. Lower = more responsive cleanup
    /// at the cost of background CPU.
    pub fn with_sweep_interval_secs(mut self, secs: u64) -> Self {
        self.sweep_interval_secs = secs;
        self
    }

    /// Start the background sweep task. Idempotent — calling twice
    /// is a no-op (the existing handle is kept).
    ///
    /// The task takes a `Weak` reference back so the registry's
    /// `Arc` can drop normally and end the loop. Embedders typically
    /// call this once at startup right after wrapping the registry
    /// in `Arc`.
    ///
    /// Must be called from within a tokio runtime context — uses
    /// `Handle::try_current()` to spawn. If no runtime is available,
    /// emits a warning and skips spawning; the caller can retry from
    /// a runtime context. This makes the call safe (no panic) from
    /// any FFI init path that might run before the runtime is set up.
    pub fn start_sweeper(self: &Arc<Self>) {
        let mut guard = match self.sweep_handle.lock() {
            Ok(g) => g,
            Err(p) => p.into_inner(),
        };
        if guard.is_some() {
            return;
        }
        let rt = match tokio::runtime::Handle::try_current() {
            Ok(h) => h,
            Err(_) => {
                tracing::warn!(
                    "VoiceSessionRegistry::start_sweeper called outside a tokio runtime; \
                     sweeper not started — idle sessions will not be reaped automatically"
                );
                return;
            }
        };
        let weak = Arc::downgrade(self);
        let interval_secs = self.sweep_interval_secs;
        let handle = rt.spawn(async move {
            run_sweeper(weak, interval_secs).await;
        });
        *guard = Some(handle);
    }

    /// Insert a session, refusing if the id is already taken.
    pub fn insert(&self, session: VoiceSession) -> Result<()> {
        let id = session.session_id.clone();
        if self.sessions.contains_key(&id) {
            return Err(VoiceError::AlreadyRunning);
        }
        self.sessions.insert(id, session);
        Ok(())
    }

    /// Stop and remove the session with the given id.
    pub async fn stop(&self, session_id: &str) -> Result<()> {
        let session = self
            .sessions
            .remove(session_id)
            .ok_or(VoiceError::NotRunning)?
            .1;
        session.stop().await
    }

    /// True if a session with this id is currently registered.
    pub fn contains(&self, session_id: &str) -> bool {
        self.sessions.contains_key(session_id)
    }

    /// IDs of all currently registered sessions.
    pub fn list(&self) -> Vec<String> {
        self.sessions.iter().map(|e| e.key().clone()).collect()
    }

    /// Look up a session and apply a closure with shared access.
    /// Returns `NotRunning` if the id is unknown.
    pub fn with<F, R>(&self, session_id: &str, f: F) -> Result<R>
    where
        F: FnOnce(&VoiceSession) -> R,
    {
        let entry = self
            .sessions
            .get(session_id)
            .ok_or(VoiceError::NotRunning)?;
        Ok(f(entry.value()))
    }

    /// Sweep one pass: collect ids whose last activity is older than
    /// `idle_timeout_secs`, then stop and remove each. Returns the
    /// reaped ids so callers (mostly tests) can verify behaviour.
    pub async fn reap_idle(&self) -> Vec<String> {
        let cutoff = now_secs().saturating_sub(self.idle_timeout_secs);
        let stale: Vec<String> = self
            .sessions
            .iter()
            .filter(|e| e.value().last_active_at() < cutoff)
            .map(|e| e.key().clone())
            .collect();
        for id in &stale {
            // `stop` removes from the map and stops the listener.
            // Errors here mean the session was already gone (race
            // with explicit stop) — log and continue.
            if let Err(e) = self.stop(id).await {
                info!(session_id = %id, error = %e, "voice session reap: stop returned error");
            } else {
                info!(session_id = %id, "voice session reaped (idle)");
            }
        }
        stale
    }
}

/// Background loop spawned by `VoiceSessionRegistry::start_sweeper`.
///
/// Every `interval_secs` seconds it upgrades `weak` and runs one
/// `reap_idle` pass. Once the registry has been dropped, the upgrade
/// fails and the loop returns. The temporary strong reference is released
/// at the end of each pass, so the sweeper never keeps the registry alive
/// while waiting for the next tick.
async fn run_sweeper(weak: Weak<VoiceSessionRegistry>, interval_secs: u64) {
    // `tokio::time::interval` panics on a zero period, and
    // `with_sweep_interval_secs(0)` is accepted, so clamp to at least 1s.
    let period = std::time::Duration::from_secs(interval_secs.max(1));
    let mut ticker = tokio::time::interval(period);
    // If a reap pass overruns the period, skip the missed ticks rather
    // than firing a burst of back-to-back sweeps.
    ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
    // First tick fires immediately; skip it to honour the configured
    // sweep cadence on startup.
    ticker.tick().await;
    loop {
        ticker.tick().await;
        let registry = match weak.upgrade() {
            Some(reg) => reg,
            None => break, // registry dropped, exit cleanly
        };
        let _ = registry.reap_idle().await;
        // `registry` drops here, before the next wait.
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::Listener;
    use async_trait::async_trait;
    use std::path::PathBuf;
    use std::sync::Mutex;
    use tokio::sync::mpsc;

    /// Listener that emits a fixed script of events then closes.
    ///
    /// The tx half is moved into the spawned task and not retained on
    /// `self` — once the script is exhausted the sender drops and the
    /// receiver sees end-of-stream, which is what the drain loop needs
    /// to emit its synthetic "done" event.
    struct ScriptedListener {
        script: Vec<VoiceEvent>,
        running: bool,
    }

    impl ScriptedListener {
        fn new(script: Vec<VoiceEvent>) -> Self {
            Self {
                script,
                running: false,
            }
        }
    }

    #[async_trait]
    impl Listener for ScriptedListener {
        async fn start(&mut self, _config: VoiceConfig) -> Result<mpsc::Receiver<VoiceEvent>> {
            let (tx, rx) = mpsc::channel(16);
            self.running = true;
            let script = std::mem::take(&mut self.script);
            tokio::spawn(async move {
                for evt in script {
                    if tx.send(evt).await.is_err() {
                        break;
                    }
                }
                // dropping tx here closes the receiver -> drain task exits
            });
            Ok(rx)
        }

        async fn stop(&mut self) -> Result<()> {
            self.running = false;
            Ok(())
        }

        fn is_running(&self) -> bool {
            self.running
        }
    }

    /// Sink that collects events into a Mutex<Vec<(String, String)>>.
    #[derive(Default)]
    struct CollectingSink {
        events: Mutex<Vec<(String, String)>>,
    }

    impl VoiceEventSink for CollectingSink {
        fn send(&self, session_id: &str, event_json: String) {
            self.events
                .lock()
                .unwrap()
                .push((session_id.to_string(), event_json));
        }
    }

    #[tokio::test]
    async fn drains_listener_events_to_sink_as_json() {
        let listener = ScriptedListener::new(vec![
            VoiceEvent::SpeechStart,
            VoiceEvent::Transcript {
                text: "hello world".into(),
                duration_ms: 1100,
                role: TranscriptRole::EnrolledUser,
            },
            VoiceEvent::SpeechEnd,
        ]);
        let sink: Arc<CollectingSink> = Arc::new(CollectingSink::default());
        let session = VoiceSession::new("s1", Box::new(listener));
        session
            .start(VoiceConfig::default(), sink.clone())
            .await
            .unwrap();
        // Wait for the listener to finish emitting and the drain task to
        // see the channel close + emit the synthetic "done" event.
        // The session doesn't need an explicit stop here — the listener's
        // sender drops on its own.
        // Poll up to 1s for the "done" event.
        for _ in 0..50 {
            tokio::time::sleep(std::time::Duration::from_millis(20)).await;
            if sink
                .events
                .lock()
                .unwrap()
                .last()
                .map(|(_, j)| j.contains(r#""done""#))
                .unwrap_or(false)
            {
                break;
            }
        }
        let events = sink.events.lock().unwrap().clone();
        let types: Vec<&str> = events
            .iter()
            .map(|(_, j)| {
                if j.contains(r#""speech_start""#) {
                    "speech_start"
                } else if j.contains(r#""transcript""#) {
                    "transcript"
                } else if j.contains(r#""speech_end""#) {
                    "speech_end"
                } else if j.contains(r#""done""#) {
                    "done"
                } else {
                    "other"
                }
            })
            .collect();
        assert_eq!(
            types,
            vec!["speech_start", "transcript", "speech_end", "done"]
        );
        // Session ids are stamped on every event.
        for (sid, _) in &events {
            assert_eq!(sid, "s1");
        }
        // Transcript JSON contains text and role.
        let transcript_json = events
            .iter()
            .find(|(_, j)| j.contains(r#""transcript""#))
            .map(|(_, j)| j.clone())
            .unwrap();
        assert!(transcript_json.contains(r#""text":"hello world""#));
        assert!(transcript_json.contains(r#""role":"enrolled_user""#));
        assert!(transcript_json.contains(r#""duration_ms":1100"#));
    }

    #[tokio::test]
    async fn registry_rejects_duplicate_session_ids() {
        let registry = VoiceSessionRegistry::new();
        let l1 = Box::new(ScriptedListener::new(vec![]));
        let l2 = Box::new(ScriptedListener::new(vec![]));
        registry.insert(VoiceSession::new("dup", l1)).unwrap();
        let err = registry
            .insert(VoiceSession::new("dup", l2))
            .expect_err("duplicate id should error");
        // `matches!` only yields a bool; it must be asserted to check anything.
        assert!(matches!(err, VoiceError::AlreadyRunning));
        assert_eq!(registry.list(), vec!["dup".to_string()]);
    }

    #[tokio::test]
    async fn reap_idle_reaps_stale_keeps_fresh() {
        // 60s idle window so the "fresh" session is comfortably inside
        // it across any plausible test latency.
        let registry = Arc::new(
            VoiceSessionRegistry::new()
                .with_idle_timeout_secs(60)
                .with_sweep_interval_secs(60),
        );
        let sink: Arc<CollectingSink> = Arc::new(CollectingSink::default());

        let stale = VoiceSession::new("stale", Box::new(ScriptedListener::new(vec![])));
        stale
            .start(VoiceConfig::default(), sink.clone())
            .await
            .unwrap();
        // Backdate the activity stamp by ~1 hour to land outside the window.
        stale
            .last_active_at
            .store(now_secs().saturating_sub(3600), Ordering::Relaxed);

        let fresh = VoiceSession::new("fresh", Box::new(ScriptedListener::new(vec![])));
        fresh
            .start(VoiceConfig::default(), sink.clone())
            .await
            .unwrap();

        registry.insert(stale).unwrap();
        registry.insert(fresh).unwrap();

        let reaped = registry.reap_idle().await;
        assert_eq!(reaped, vec!["stale".to_string()]);
        assert!(!registry.contains("stale"));
        assert!(registry.contains("fresh"));
    }

    #[tokio::test]
    async fn reap_idle_returns_empty_when_no_sessions_are_stale() {
        let registry = Arc::new(VoiceSessionRegistry::new().with_idle_timeout_secs(60));
        let sink: Arc<CollectingSink> = Arc::new(CollectingSink::default());
        let session = VoiceSession::new("s1", Box::new(ScriptedListener::new(vec![])));
        session.start(VoiceConfig::default(), sink).await.unwrap();
        registry.insert(session).unwrap();
        assert!(registry.reap_idle().await.is_empty());
        assert!(registry.contains("s1"));
    }

    #[tokio::test]
    async fn sweeper_exits_when_registry_dropped() {
        let registry = Arc::new(
            VoiceSessionRegistry::new()
                .with_sweep_interval_secs(1)
                .with_idle_timeout_secs(1),
        );
        registry.start_sweeper();
        // Take the spawned handle out of the registry so we can await it
        // after dropping the last strong Arc — proves the loop exited
        // via the Weak::upgrade None branch and not because we forgot
        // to spawn.
        let handle = registry
            .sweep_handle
            .lock()
            .unwrap()
            .take()
            .expect("sweeper should be spawned");
        drop(registry);
        // The sweeper polls upgrade once per interval (1s) and exits
        // when it's None. 5s is plenty.
        tokio::time::timeout(std::time::Duration::from_secs(5), handle)
            .await
            .expect("sweeper task did not exit within 5s after registry drop")
            .expect("sweeper task panicked");
    }

    #[tokio::test]
    async fn start_sweeper_is_idempotent() {
        let registry = Arc::new(VoiceSessionRegistry::new());
        registry.start_sweeper();
        registry.start_sweeper();
        // Two calls produce one handle.
        assert!(registry.sweep_handle.lock().unwrap().is_some());
    }

    #[tokio::test]
    async fn registry_stop_removes_session() {
        let registry = VoiceSessionRegistry::new();
        let listener = Box::new(ScriptedListener::new(vec![VoiceEvent::SpeechStart]));
        let sink: Arc<CollectingSink> = Arc::new(CollectingSink::default());
        let session = VoiceSession::new("s1", listener);
        session
            .start(VoiceConfig::default(), sink.clone())
            .await
            .unwrap();
        registry.insert(session).unwrap();
        assert!(registry.contains("s1"));
        registry.stop("s1").await.unwrap();
        assert!(!registry.contains("s1"));
    }

    #[test]
    fn role_to_str_covers_all_variants() {
        assert_eq!(role_to_str(&TranscriptRole::EnrolledUser), "enrolled_user");
        assert_eq!(role_to_str(&TranscriptRole::Unknown), "unknown");
        assert_eq!(
            role_to_str(&TranscriptRole::OtherSpeaker {
                local_id: "alice".into()
            }),
            "other:alice"
        );
    }

    #[test]
    fn partial_event_serializes_with_text_and_duration() {
        let json = voice_event_to_json(&VoiceEvent::Partial {
            text: "hello wor".to_string(),
            duration_ms: 750,
        });
        assert!(json.contains(r#""type":"partial""#));
        assert!(json.contains(r#""text":"hello wor""#));
        assert!(json.contains(r#""duration_ms":750"#));
    }

    #[test]
    fn audio_chunk_emits_meta_only() {
        let json = voice_event_to_json(&VoiceEvent::AudioChunk {
            samples: vec![0i16; 480],
            sample_rate: 16000,
        });
        assert!(json.contains(r#""sample_rate":16000"#));
        assert!(json.contains(r#""frame_count":480"#));
        assert!(!json.contains("samples"));
    }

    #[test]
    fn enrollment_event_carries_path() {
        let json = voice_event_to_json(&VoiceEvent::EnrollmentCaptured {
            label: "alice".into(),
            save_path: PathBuf::from("/tmp/alice.toml"),
        });
        assert!(json.contains(r#""label":"alice""#));
        assert!(json.contains("/tmp/alice.toml"));
    }
}