Skip to main content

audio_engine_core/decoder/
streaming.rs

1use std::path::Path;
2
3use symphonia::core::audio::SampleBuffer;
4use symphonia::core::codecs::{DecoderOptions, CODEC_TYPE_NULL};
5use symphonia::core::formats::FormatOptions;
6use symphonia::core::meta::MetadataOptions;
7
8use super::error::{DecodeCancelToken, DecoderError};
9use super::metadata::{extract_metadata, merge_metadata_revision, AudioInfo};
10use super::source::{
11    bytes_to_mib, configured_decode_memory_limit, open_media_source, HttpCredentials,
12    F64_SAMPLE_BYTES,
13};
14
/// Streaming audio decoder using Symphonia.
///
/// Couples a Symphonia format reader with a codec decoder for a single track and
/// produces decoded audio as interleaved `f64` samples.
pub struct StreamingDecoder {
    /// Demuxer that yields packets from the opened media source.
    format_reader: Box<dyn symphonia::core::formats::FormatReader>,
    /// Codec decoder created for the selected track.
    decoder: Box<dyn symphonia::core::codecs::Decoder>,
    /// Id of the selected track; packets belonging to other tracks are skipped.
    track_id: u32,
    /// Stream parameters and metadata gathered when the source was opened.
    pub info: AudioInfo,
    /// Reusable interleaved conversion buffer; starts as `None` and is allocated on demand.
    sample_buf: Option<SampleBuffer<f64>>,
    /// Interleaved samples accounted for so far (skipped encoder-delay samples plus
    /// samples handed to the caller); drives delay and padding trimming.
    samples_output: u64,
    /// Set once decoding has reached the end of the stream or the trimmed end.
    finished: bool,
    /// Optional cancellation token consulted while decoding.
    cancel_token: Option<DecodeCancelToken>,
}
26
27impl StreamingDecoder {
28    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self, DecoderError> {
29        Self::open_with_credentials(path, None)
30    }
31
32    pub fn open_with_credentials<P: AsRef<Path>>(
33        path: P,
34        credentials: Option<&HttpCredentials>,
35    ) -> Result<Self, DecoderError> {
36        Self::open_with_credentials_and_cancel(path, credentials, None)
37    }
38
39    pub fn open_with_credentials_and_cancel<P: AsRef<Path>>(
40        path: P,
41        credentials: Option<&HttpCredentials>,
42        cancel_token: Option<DecodeCancelToken>,
43    ) -> Result<Self, DecoderError> {
44        let (mss, hint) = open_media_source(path.as_ref(), credentials, cancel_token.clone())?;
45
46        let format_opts = FormatOptions::default();
47        let metadata_opts = MetadataOptions::default();
48        let mut probed = symphonia::default::get_probe()
49            .format(&hint, mss, &format_opts, &metadata_opts)
50            .map_err(|e| DecoderError::Probe(e.to_string()))?;
51
52        let mut metadata = extract_metadata(&mut probed);
53
54        let mut format_reader = probed.format;
55        if let Some(revision) = format_reader.metadata().current() {
56            merge_metadata_revision(&mut metadata, revision);
57        }
58
59        let track = format_reader
60            .tracks()
61            .iter()
62            .find(|t| t.codec_params.codec != CODEC_TYPE_NULL)
63            .ok_or(DecoderError::NoAudioTrack)?;
64
65        let track_id = track.id;
66        let codec_params = &track.codec_params;
67        let sample_rate = codec_params.sample_rate.unwrap_or(44100);
68        let channels = codec_params.channels.map(|c| c.count()).unwrap_or(2);
69        let bits_per_sample = codec_params.bits_per_sample;
70        let total_frames = codec_params.n_frames;
71        let duration_secs = total_frames.map(|f| f as f64 / sample_rate as f64);
72        let encoder_delay = codec_params.delay.unwrap_or(0);
73        let end_padding = codec_params.padding.unwrap_or(0);
74
75        if encoder_delay > 0 || end_padding > 0 {
76            log::debug!(
77                "Codec delay compensation: delay={}, padding={} samples",
78                encoder_delay,
79                end_padding
80            );
81        }
82
83        let info = AudioInfo {
84            sample_rate,
85            channels,
86            bits_per_sample,
87            total_frames,
88            duration_secs,
89            encoder_delay,
90            end_padding,
91            metadata,
92        };
93
94        let decoder_opts = DecoderOptions::default();
95        let decoder = symphonia::default::get_codecs()
96            .make(codec_params, &decoder_opts)
97            .map_err(|e| DecoderError::Decoder(e.to_string()))?;
98
99        log::info!(
100            "Opened audio source: {} Hz, {} ch, {:?}s",
101            sample_rate,
102            channels,
103            duration_secs
104        );
105
106        Ok(Self {
107            format_reader,
108            decoder,
109            track_id,
110            info,
111            sample_buf: None,
112            samples_output: 0,
113            finished: false,
114            cancel_token,
115        })
116    }
117
118    pub fn decode_next_into(&mut self, out: &mut Vec<f64>) -> Result<Option<usize>, DecoderError> {
119        if self.finished {
120            return Ok(None);
121        }
122        if self
123            .cancel_token
124            .as_ref()
125            .is_some_and(DecodeCancelToken::is_cancelled)
126        {
127            return Err(DecoderError::Canceled);
128        }
129
130        loop {
131            if self
132                .cancel_token
133                .as_ref()
134                .is_some_and(DecodeCancelToken::is_cancelled)
135            {
136                return Err(DecoderError::Canceled);
137            }
138            let packet = match self.format_reader.next_packet() {
139                Ok(p) => p,
140                Err(symphonia::core::errors::Error::IoError(e))
141                    if e.kind() == std::io::ErrorKind::UnexpectedEof =>
142                {
143                    self.finished = true;
144                    return Ok(None);
145                }
146                Err(symphonia::core::errors::Error::IoError(e))
147                    if e.kind() == std::io::ErrorKind::Interrupted =>
148                {
149                    return Err(DecoderError::Canceled);
150                }
151                Err(e) => return Err(DecoderError::Decoder(e.to_string())),
152            };
153
154            if packet.track_id() != self.track_id {
155                continue;
156            }
157
158            let decoded = match self.decoder.decode(&packet) {
159                Ok(d) => d,
160                Err(symphonia::core::errors::Error::DecodeError(_)) => continue,
161                Err(e) => return Err(DecoderError::Decoder(e.to_string())),
162            };
163
164            let spec = *decoded.spec();
165            let duration = decoded.capacity();
166
167            if self
168                .sample_buf
169                .as_ref()
170                .is_none_or(|buffer| buffer.capacity() < duration)
171            {
172                self.sample_buf = Some(SampleBuffer::new(duration as u64, spec));
173            }
174
175            let Some(sample_buf) = self.sample_buf.as_mut() else {
176                return Err(DecoderError::Decoder(
177                    "Failed to allocate decoder sample buffer".to_string(),
178                ));
179            };
180            sample_buf.copy_interleaved_ref(decoded);
181
182            let samples = sample_buf.samples();
183            let channels = self.info.channels;
184            let mut start = 0;
185            let mut end = samples.len();
186
187            let delay_frames = self.info.encoder_delay as u64;
188            let delay_samples = delay_frames * channels as u64;
189            if self.samples_output < delay_samples {
190                let skip = (delay_samples - self.samples_output).min(end as u64) as usize;
191                start += skip;
192                self.samples_output += skip as u64;
193                if start == end {
194                    continue;
195                }
196            }
197
198            let total_frames = self.info.total_frames.unwrap_or(u64::MAX);
199            let padding_frames = self.info.end_padding as u64;
200            let effective_total = total_frames.saturating_sub(padding_frames);
201            let current_frame = self.samples_output / channels as u64;
202            let frames_in_chunk = (end - start) / channels;
203
204            if current_frame + frames_in_chunk as u64 > effective_total {
205                let frames_to_keep = effective_total.saturating_sub(current_frame) as usize;
206                if frames_to_keep == 0 {
207                    self.finished = true;
208                    return Ok(None);
209                }
210                end = start + frames_to_keep * channels;
211            }
212
213            let appended = end - start;
214            out.extend_from_slice(&samples[start..end]);
215            self.samples_output += appended as u64;
216            return Ok(Some(appended));
217        }
218    }
219
220    pub fn decode_next(&mut self) -> Result<Option<Vec<f64>>, DecoderError> {
221        let mut samples = Vec::new();
222        match self.decode_next_into(&mut samples)? {
223            Some(_) => Ok(Some(samples)),
224            None => Ok(None),
225        }
226    }
227
228    pub fn decode_all(&mut self) -> Result<Vec<f64>, DecoderError> {
229        let (max_memory_mb, max_memory_bytes) = configured_decode_memory_limit();
230
231        let initial_capacity = if let Some(total_frames) = self.info.total_frames {
232            let estimated_bytes = total_frames as usize * self.info.channels * F64_SAMPLE_BYTES;
233            if estimated_bytes > max_memory_bytes {
234                let estimated_mb = bytes_to_mib(estimated_bytes);
235                return Err(DecoderError::Decoder(format!(
236                    "File too large to decode into memory: estimated {} MB (limit: {} MB). \
237                     Use streaming mode instead or increase DECODE_MAX_MEMORY_MB env var.",
238                    estimated_mb, max_memory_mb
239                )));
240            }
241
242            let total_samples = total_frames as usize * self.info.channels;
243            log::info!(
244                "Pre-allocating buffer for {} samples (~{} MB)",
245                total_samples,
246                bytes_to_mib(total_samples * F64_SAMPLE_BYTES)
247            );
248            total_samples
249        } else {
250            0
251        };
252
253        let mut all_samples = Vec::with_capacity(initial_capacity);
254        while self.decode_next_into(&mut all_samples)?.is_some() {
255            let current_bytes = all_samples.len() * F64_SAMPLE_BYTES;
256            if current_bytes > max_memory_bytes {
257                let current_mb = bytes_to_mib(current_bytes);
258                return Err(DecoderError::Decoder(format!(
259                    "Memory limit exceeded during decode: {} MB (limit: {} MB). \
260                     File may be corrupted or extremely long.",
261                    current_mb, max_memory_mb
262                )));
263            }
264        }
265
266        let delay_trimmed = self.info.encoder_delay;
267        let padding_trimmed = self.info.end_padding;
268
269        if delay_trimmed > 0 || padding_trimmed > 0 {
270            log::info!(
271                "Decoded {} samples (trimmed {} delay + {} padding for gapless)",
272                all_samples.len(),
273                delay_trimmed,
274                padding_trimmed
275            );
276        } else {
277            log::info!("Decoded {} total samples (f64)", all_samples.len());
278        }
279
280        Ok(all_samples)
281    }
282
283    pub fn seek(&mut self, time_secs: f64) -> Result<(), DecoderError> {
284        use symphonia::core::formats::SeekTo;
285        use symphonia::core::units::Time;
286
287        let seek_to = SeekTo::Time {
288            time: Time::from(time_secs),
289            track_id: Some(self.track_id),
290        };
291
292        self.format_reader
293            .seek(symphonia::core::formats::SeekMode::Coarse, seek_to)
294            .map_err(|e| DecoderError::Decoder(e.to_string()))?;
295
296        self.decoder.reset();
297        self.finished = false;
298        self.samples_output = 0;
299
300        Ok(())
301    }
302}