ebur128_stream/sink.rs
1//! Optional `futures::Sink<Vec<f32>>` adapter for the analyzer.
2//!
3//! Enabled by the `tokio` Cargo feature. Pass owned `Vec<f32>` chunks
4//! through the sink and the analyzer ingests them as interleaved
5//! samples. The sink is always ready — pushing samples is synchronous
6//! and never blocks — but the trait conformance lets you slot the
7//! analyzer into a Tokio / Futures pipeline as a stream consumer:
8//!
9//! ```ignore
10//! use ebur128_stream::{AnalyzerBuilder, Channel, Mode, AnalyzerSink};
11//! use futures_util::SinkExt;
12//!
13//! let analyzer = AnalyzerBuilder::new()
14//! .sample_rate(48_000)
15//! .channels(&[Channel::Left, Channel::Right])
16//! .modes(Mode::Integrated | Mode::TruePeak)
17//! .build()?;
18//! let mut sink = AnalyzerSink::new(analyzer);
19//!
20//! let chunk: Vec<f32> = vec![0.0; 9_600];
21//! sink.send(chunk).await?;
22//! let report = sink.into_inner().unwrap().finalize();
23//! # Ok::<(), Box<dyn std::error::Error>>(())
24//! ```
25//!
26//! The crate does not depend on `tokio` itself — only `futures-sink`
27//! for the `Sink` trait. Use any executor that drives the sink.
28
29use core::pin::Pin;
30use core::task::{Context, Poll};
31
32use crate::analyzer::Analyzer;
33use crate::error::Error;
34
35/// A [`futures_sink::Sink`] that ingests `Vec<f32>` interleaved sample
36/// chunks into the wrapped [`Analyzer`].
37///
38/// The sink is permanently in the ready state; `send` and friends never
39/// block waiting for capacity. Errors from
40/// [`Analyzer::push_interleaved`] surface as `Sink::Error`.
41#[derive(Debug)]
42pub struct AnalyzerSink {
43 analyzer: Option<Analyzer>,
44}
45
46impl AnalyzerSink {
47 /// Wrap the analyzer.
48 #[must_use]
49 pub fn new(analyzer: Analyzer) -> Self {
50 Self {
51 analyzer: Some(analyzer),
52 }
53 }
54
55 /// Recover the inner analyzer, e.g. to call
56 /// [`Analyzer::finalize`].
57 ///
58 /// Returns `None` if the sink was already consumed.
59 pub fn into_inner(mut self) -> Option<Analyzer> {
60 self.analyzer.take()
61 }
62
63 /// Borrow the inner analyzer for snapshotting.
64 pub fn analyzer_mut(&mut self) -> Option<&mut Analyzer> {
65 self.analyzer.as_mut()
66 }
67}
68
69impl futures_sink::Sink<Vec<f32>> for AnalyzerSink {
70 type Error = Error;
71
72 fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
73 Poll::Ready(Ok(()))
74 }
75
76 fn start_send(self: Pin<&mut Self>, item: Vec<f32>) -> Result<(), Self::Error> {
77 let this = self.get_mut();
78 if let Some(a) = this.analyzer.as_mut() {
79 a.push_interleaved::<f32>(&item)?;
80 }
81 Ok(())
82 }
83
84 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
85 Poll::Ready(Ok(()))
86 }
87
88 fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
89 Poll::Ready(Ok(()))
90 }
91}
92
93#[cfg(test)]
94mod tests {
95 use super::*;
96 use crate::{Channel, Mode};
97 use core::task::Waker;
98 use futures_sink::Sink;
99
100 #[test]
101 fn sink_ingests_chunks() {
102 let analyzer = crate::AnalyzerBuilder::new()
103 .sample_rate(48_000)
104 .channels(&[Channel::Left, Channel::Right])
105 .modes(Mode::Momentary)
106 .build()
107 .unwrap();
108 let mut sink = AnalyzerSink::new(analyzer);
109 // `Waker::noop` is stable since 1.85 — no unsafe needed.
110 let waker: &Waker = Waker::noop();
111 let mut cx = Context::from_waker(waker);
112
113 let mut s = Pin::new(&mut sink);
114 match s.as_mut().poll_ready(&mut cx) {
115 Poll::Ready(Ok(())) => {}
116 _ => panic!("AnalyzerSink is always ready"),
117 }
118 s.as_mut().start_send(vec![0.1f32; 9_600]).unwrap();
119
120 let a = sink.into_inner().unwrap();
121 let mut a = a;
122 let snap = a.snapshot();
123 assert!((snap.programme_duration_seconds() - 0.1).abs() < 1e-9);
124 }
125}