Skip to main content

wavekat_core/
audio_io.rs

1//! Audio source/sink traits.
2//!
3//! These are the seam the WaveKat audio pipeline is drawn against:
4//! whatever produces audio (a microphone, a TTS engine, a WAV file)
5//! implements [`AudioSource`]; whatever consumes it (a speaker, an
6//! RTP encoder, an ASR worker) implements [`AudioSink`]. Concrete
7//! impls live in the consuming crates — cpal-backed mic/speaker in
8//! `wavekat-voice`, a future agent-driven impl in `wavekat-agent`,
9//! and so on — so that adding a new producer or consumer is "implement
10//! the trait" rather than "rewrite the RTP path."
11//!
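//! The traits speak in [`AudioFrame`]s, which carry their own sample
//! rate: sources yield owned `AudioFrame<'static>` values, and sinks accept
//! any `AudioFrame<'_>`. Because every frame is rate-tagged, consumers can
//! resample to whatever rate the codec wants without either side of the
//! trait having to know the codec exists.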
15
16use core::future::Future;
17
18use crate::AudioFrame;
19
20/// Produces owned [`AudioFrame`]s. `next_frame().await` returns the
21/// next frame when one is available, or `None` once the source has
22/// run out (file ended, device closed, dialogue terminated). Each
23/// frame's [`AudioFrame::sample_rate`] is set by the implementation —
24/// consumers resample as needed.
25pub trait AudioSource: Send {
26    fn next_frame(&mut self) -> impl Future<Output = Option<AudioFrame<'static>>> + Send;
27}
28
29/// Consumes audio frames. Implementations may drop frames on
30/// backpressure rather than block the caller; the alternative —
31/// stalling — is worse on the RTP receive path, where it stalls the
32/// whole pipeline.
33pub trait AudioSink: Send {
34    fn write_frame(&mut self, frame: AudioFrame<'_>) -> impl Future<Output = ()> + Send;
35}
36
#[cfg(test)]
mod tests {
    use super::*;

    /// In-memory sink: collects frames in a Vec, converting each to an
    /// owned frame so it can outlive the `write_frame` call.
    #[derive(Default)]
    struct VecSink {
        frames: Vec<AudioFrame<'static>>,
    }

    impl AudioSink for VecSink {
        async fn write_frame(&mut self, frame: AudioFrame<'_>) {
            self.frames.push(frame.into_owned());
        }
    }

    /// Source that yields a single frame then ends.
    struct OnceSource {
        frame: Option<AudioFrame<'static>>,
    }

    impl AudioSource for OnceSource {
        async fn next_frame(&mut self) -> Option<AudioFrame<'static>> {
            // `take` leaves `None` behind, so every later call returns `None`.
            self.frame.take()
        }
    }

    /// Source that drains a queue of pre-loaded frames, then signals
    /// exhaustion with `None`. Mirrors what a file-backed or
    /// agent-backed source looks like in practice.
    struct QueueSource {
        frames: std::collections::VecDeque<AudioFrame<'static>>,
    }

    impl AudioSource for QueueSource {
        async fn next_frame(&mut self) -> Option<AudioFrame<'static>> {
            self.frames.pop_front()
        }
    }

    /// Sink that drops every frame past `cap`. Models the "drop on
    /// backpressure" pattern the trait docs call out.
    struct CapSink {
        cap: usize,
        frames: Vec<AudioFrame<'static>>,
        dropped: usize,
    }

    impl AudioSink for CapSink {
        async fn write_frame(&mut self, frame: AudioFrame<'_>) {
            if self.frames.len() >= self.cap {
                self.dropped += 1;
                return;
            }
            self.frames.push(frame.into_owned());
        }
    }

    #[tokio::test]
    async fn traits_compose_end_to_end() {
        let mut source = OnceSource {
            frame: Some(AudioFrame::from_vec(vec![0.5, -0.5], 8000)),
        };
        let mut sink = VecSink::default();

        let frame = source.next_frame().await.expect("frame");
        sink.write_frame(frame).await;
        assert!(source.next_frame().await.is_none());

        assert_eq!(sink.frames.len(), 1);
        assert_eq!(sink.frames[0].samples(), &[0.5, -0.5]);
        assert_eq!(sink.frames[0].sample_rate(), 8000);
    }

    #[tokio::test]
    async fn source_drains_in_order_then_returns_none() {
        let mut source = QueueSource {
            frames: vec![
                AudioFrame::from_vec(vec![0.1], 8000),
                AudioFrame::from_vec(vec![0.2], 8000),
                AudioFrame::from_vec(vec![0.3], 8000),
            ]
            .into(),
        };
        let mut sink = VecSink::default();
        while let Some(f) = source.next_frame().await {
            sink.write_frame(f).await;
        }
        assert_eq!(sink.frames.len(), 3);
        assert_eq!(sink.frames[0].samples(), &[0.1]);
        assert_eq!(sink.frames[1].samples(), &[0.2]);
        assert_eq!(sink.frames[2].samples(), &[0.3]);

        // Past exhaustion the source must keep returning None — callers
        // rely on this to unblock their drain loop.
        assert!(source.next_frame().await.is_none());
        assert!(source.next_frame().await.is_none());
    }

    #[tokio::test]
    async fn sink_with_capacity_drops_overflow() {
        let mut sink = CapSink {
            cap: 2,
            frames: Vec::new(),
            dropped: 0,
        };
        for i in 0..5 {
            sink.write_frame(AudioFrame::from_vec(vec![i as f32], 8000))
                .await;
        }
        assert_eq!(sink.frames.len(), 2);
        assert_eq!(sink.dropped, 3);
        // Cap policy is FIFO-keep / tail-drop here; the first two went
        // through, the rest got dropped.
        assert_eq!(sink.frames[0].samples(), &[0.0]);
        assert_eq!(sink.frames[1].samples(), &[1.0]);
    }

    #[tokio::test]
    async fn frame_sample_rate_round_trips_through_sink() {
        // Different impls emit at their native rates — the sink must
        // preserve `sample_rate` so the consumer can resample
        // downstream without losing the original rate.
        let mut sink = VecSink::default();
        sink.write_frame(AudioFrame::from_vec(vec![0.0], 8000))
            .await;
        sink.write_frame(AudioFrame::from_vec(vec![0.0], 16_000))
            .await;
        sink.write_frame(AudioFrame::from_vec(vec![0.0], 48_000))
            .await;
        let rates: Vec<u32> = sink.frames.iter().map(|f| f.sample_rate()).collect();
        assert_eq!(rates, vec![8000, 16_000, 48_000]);
    }

    /// Compile-time check that the trait bounds compose: `S: AudioSource`
    /// and `S: AudioSink` alone must imply `S: Send`, and the futures the
    /// trait methods return must be `Send` for every impl.
    ///
    /// The checks go through generic `S: Trait` bounds rather than concrete
    /// types. A concrete struct is `Send` regardless of the trait's bounds,
    /// so checking it would keep compiling even if `: Send` or `+ Send` were
    /// dropped from a trait. With generic bounds, this test stops compiling
    /// in that case. That is the intent, because the RTP path holds these
    /// impls across `.await` points in a multi-threaded runtime.
    #[test]
    fn impls_are_send() {
        fn assert_send<T: Send>(_: &T) {}

        // Only compiles while `AudioSource: Send` / `AudioSink: Send`.
        fn source_requires_send<S: AudioSource>(source: &S) {
            assert_send(source);
        }
        fn sink_requires_send<S: AudioSink>(sink: &S) {
            assert_send(sink);
        }

        // Only compiles while the trait methods' futures are `+ Send`.
        // The futures are built but never polled, so nothing actually runs.
        fn source_future_is_send<S: AudioSource>(source: &mut S) {
            let fut = source.next_frame();
            assert_send(&fut);
        }
        fn sink_future_is_send<S: AudioSink>(sink: &mut S, frame: AudioFrame<'_>) {
            let fut = sink.write_frame(frame);
            assert_send(&fut);
        }

        let mut source = OnceSource { frame: None };
        let mut sink = VecSink::default();
        source_requires_send(&source);
        sink_requires_send(&sink);
        source_future_is_send(&mut source);
        sink_future_is_send(&mut sink, AudioFrame::from_vec(vec![0.0], 8000));
    }
}