Skip to main content

ebur128_stream/
sink.rs

1//! Optional `futures::Sink<Vec<f32>>` adapter for the analyzer.
2//!
3//! Enabled by the `tokio` Cargo feature. Pass owned `Vec<f32>` chunks
4//! through the sink and the analyzer ingests them as interleaved
5//! samples. The sink is always ready — pushing samples is synchronous
6//! and never blocks — but the trait conformance lets you slot the
7//! analyzer into a Tokio / Futures pipeline as a stream consumer:
8//!
9//! ```ignore
10//! use ebur128_stream::{AnalyzerBuilder, Channel, Mode, AnalyzerSink};
11//! use futures_util::SinkExt;
12//!
13//! let analyzer = AnalyzerBuilder::new()
14//!     .sample_rate(48_000)
15//!     .channels(&[Channel::Left, Channel::Right])
16//!     .modes(Mode::Integrated | Mode::TruePeak)
17//!     .build()?;
18//! let mut sink = AnalyzerSink::new(analyzer);
19//!
20//! let chunk: Vec<f32> = vec![0.0; 9_600];
21//! sink.send(chunk).await?;
22//! let report = sink.into_inner().unwrap().finalize();
23//! # Ok::<(), Box<dyn std::error::Error>>(())
24//! ```
25//!
26//! The crate does not depend on `tokio` itself — only `futures-sink`
27//! for the `Sink` trait. Use any executor that drives the sink.
28
29use core::pin::Pin;
30use core::task::{Context, Poll};
31
32use crate::analyzer::Analyzer;
33use crate::error::Error;
34
35/// A [`futures_sink::Sink`] that ingests `Vec<f32>` interleaved sample
36/// chunks into the wrapped [`Analyzer`].
37///
38/// The sink is permanently in the ready state; `send` and friends never
39/// block waiting for capacity. Errors from
40/// [`Analyzer::push_interleaved`] surface as `Sink::Error`.
41#[derive(Debug)]
42pub struct AnalyzerSink {
43    analyzer: Option<Analyzer>,
44}
45
46impl AnalyzerSink {
47    /// Wrap the analyzer.
48    #[must_use]
49    pub fn new(analyzer: Analyzer) -> Self {
50        Self {
51            analyzer: Some(analyzer),
52        }
53    }
54
55    /// Recover the inner analyzer, e.g. to call
56    /// [`Analyzer::finalize`].
57    ///
58    /// Returns `None` if the sink was already consumed.
59    pub fn into_inner(mut self) -> Option<Analyzer> {
60        self.analyzer.take()
61    }
62
63    /// Borrow the inner analyzer for snapshotting.
64    pub fn analyzer_mut(&mut self) -> Option<&mut Analyzer> {
65        self.analyzer.as_mut()
66    }
67}
68
69impl futures_sink::Sink<Vec<f32>> for AnalyzerSink {
70    type Error = Error;
71
72    fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
73        Poll::Ready(Ok(()))
74    }
75
76    fn start_send(self: Pin<&mut Self>, item: Vec<f32>) -> Result<(), Self::Error> {
77        let this = self.get_mut();
78        if let Some(a) = this.analyzer.as_mut() {
79            a.push_interleaved::<f32>(&item)?;
80        }
81        Ok(())
82    }
83
84    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
85        Poll::Ready(Ok(()))
86    }
87
88    fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
89        Poll::Ready(Ok(()))
90    }
91}
92
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{Channel, Mode};
    use core::task::Waker;
    use futures_sink::Sink;

    /// Drives the sink by hand (no executor) and checks that one chunk
    /// shows up in the analyzer's programme duration.
    #[test]
    fn sink_ingests_chunks() {
        let mut sink = AnalyzerSink::new(
            crate::AnalyzerBuilder::new()
                .sample_rate(48_000)
                .channels(&[Channel::Left, Channel::Right])
                .modes(Mode::Momentary)
                .build()
                .unwrap(),
        );

        // `Waker::noop` is stable since 1.85 — no unsafe needed.
        let mut cx = Context::from_waker(Waker::noop());

        {
            let mut pinned = Pin::new(&mut sink);
            let Poll::Ready(Ok(())) = pinned.as_mut().poll_ready(&mut cx) else {
                panic!("AnalyzerSink is always ready");
            };
            // 9_600 interleaved samples across 2 channels are 4_800
            // frames, i.e. 0.1 s at 48 kHz.
            pinned.as_mut().start_send(vec![0.1f32; 9_600]).unwrap();
        }

        let mut analyzer = sink.into_inner().unwrap();
        let snapshot = analyzer.snapshot();
        assert!((snapshot.programme_duration_seconds() - 0.1).abs() < 1e-9);
    }
}