audio_engine_core/decoder/
streaming.rs1use std::path::Path;
2
3use symphonia::core::audio::SampleBuffer;
4use symphonia::core::codecs::{DecoderOptions, CODEC_TYPE_NULL};
5use symphonia::core::formats::FormatOptions;
6use symphonia::core::meta::MetadataOptions;
7
8use super::error::{DecodeCancelToken, DecoderError};
9use super::metadata::{extract_metadata, merge_metadata_revision, AudioInfo};
10use super::source::{
11 bytes_to_mib, configured_decode_memory_limit, open_media_source, HttpCredentials,
12 F64_SAMPLE_BYTES,
13};
14
15pub struct StreamingDecoder {
17 format_reader: Box<dyn symphonia::core::formats::FormatReader>,
18 decoder: Box<dyn symphonia::core::codecs::Decoder>,
19 track_id: u32,
20 pub info: AudioInfo,
21 sample_buf: Option<SampleBuffer<f64>>,
22 samples_output: u64,
23 finished: bool,
24 cancel_token: Option<DecodeCancelToken>,
25}
26
27impl StreamingDecoder {
28 pub fn open<P: AsRef<Path>>(path: P) -> Result<Self, DecoderError> {
29 Self::open_with_credentials(path, None)
30 }
31
32 pub fn open_with_credentials<P: AsRef<Path>>(
33 path: P,
34 credentials: Option<&HttpCredentials>,
35 ) -> Result<Self, DecoderError> {
36 Self::open_with_credentials_and_cancel(path, credentials, None)
37 }
38
39 pub fn open_with_credentials_and_cancel<P: AsRef<Path>>(
40 path: P,
41 credentials: Option<&HttpCredentials>,
42 cancel_token: Option<DecodeCancelToken>,
43 ) -> Result<Self, DecoderError> {
44 let (mss, hint) = open_media_source(path.as_ref(), credentials, cancel_token.clone())?;
45
46 let format_opts = FormatOptions::default();
47 let metadata_opts = MetadataOptions::default();
48 let mut probed = symphonia::default::get_probe()
49 .format(&hint, mss, &format_opts, &metadata_opts)
50 .map_err(|e| DecoderError::Probe(e.to_string()))?;
51
52 let mut metadata = extract_metadata(&mut probed);
53
54 let mut format_reader = probed.format;
55 if let Some(revision) = format_reader.metadata().current() {
56 merge_metadata_revision(&mut metadata, revision);
57 }
58
59 let track = format_reader
60 .tracks()
61 .iter()
62 .find(|t| t.codec_params.codec != CODEC_TYPE_NULL)
63 .ok_or(DecoderError::NoAudioTrack)?;
64
65 let track_id = track.id;
66 let codec_params = &track.codec_params;
67 let sample_rate = codec_params.sample_rate.unwrap_or(44100);
68 let channels = codec_params.channels.map(|c| c.count()).unwrap_or(2);
69 let bits_per_sample = codec_params.bits_per_sample;
70 let total_frames = codec_params.n_frames;
71 let duration_secs = total_frames.map(|f| f as f64 / sample_rate as f64);
72 let encoder_delay = codec_params.delay.unwrap_or(0);
73 let end_padding = codec_params.padding.unwrap_or(0);
74
75 if encoder_delay > 0 || end_padding > 0 {
76 log::debug!(
77 "Codec delay compensation: delay={}, padding={} samples",
78 encoder_delay,
79 end_padding
80 );
81 }
82
83 let info = AudioInfo {
84 sample_rate,
85 channels,
86 bits_per_sample,
87 total_frames,
88 duration_secs,
89 encoder_delay,
90 end_padding,
91 metadata,
92 };
93
94 let decoder_opts = DecoderOptions::default();
95 let decoder = symphonia::default::get_codecs()
96 .make(codec_params, &decoder_opts)
97 .map_err(|e| DecoderError::Decoder(e.to_string()))?;
98
99 log::info!(
100 "Opened audio source: {} Hz, {} ch, {:?}s",
101 sample_rate,
102 channels,
103 duration_secs
104 );
105
106 Ok(Self {
107 format_reader,
108 decoder,
109 track_id,
110 info,
111 sample_buf: None,
112 samples_output: 0,
113 finished: false,
114 cancel_token,
115 })
116 }
117
118 pub fn decode_next_into(&mut self, out: &mut Vec<f64>) -> Result<Option<usize>, DecoderError> {
119 if self.finished {
120 return Ok(None);
121 }
122 if self
123 .cancel_token
124 .as_ref()
125 .is_some_and(DecodeCancelToken::is_cancelled)
126 {
127 return Err(DecoderError::Canceled);
128 }
129
130 loop {
131 if self
132 .cancel_token
133 .as_ref()
134 .is_some_and(DecodeCancelToken::is_cancelled)
135 {
136 return Err(DecoderError::Canceled);
137 }
138 let packet = match self.format_reader.next_packet() {
139 Ok(p) => p,
140 Err(symphonia::core::errors::Error::IoError(e))
141 if e.kind() == std::io::ErrorKind::UnexpectedEof =>
142 {
143 self.finished = true;
144 return Ok(None);
145 }
146 Err(symphonia::core::errors::Error::IoError(e))
147 if e.kind() == std::io::ErrorKind::Interrupted =>
148 {
149 return Err(DecoderError::Canceled);
150 }
151 Err(e) => return Err(DecoderError::Decoder(e.to_string())),
152 };
153
154 if packet.track_id() != self.track_id {
155 continue;
156 }
157
158 let decoded = match self.decoder.decode(&packet) {
159 Ok(d) => d,
160 Err(symphonia::core::errors::Error::DecodeError(_)) => continue,
161 Err(e) => return Err(DecoderError::Decoder(e.to_string())),
162 };
163
164 let spec = *decoded.spec();
165 let duration = decoded.capacity();
166
167 if self
168 .sample_buf
169 .as_ref()
170 .is_none_or(|buffer| buffer.capacity() < duration)
171 {
172 self.sample_buf = Some(SampleBuffer::new(duration as u64, spec));
173 }
174
175 let Some(sample_buf) = self.sample_buf.as_mut() else {
176 return Err(DecoderError::Decoder(
177 "Failed to allocate decoder sample buffer".to_string(),
178 ));
179 };
180 sample_buf.copy_interleaved_ref(decoded);
181
182 let samples = sample_buf.samples();
183 let channels = self.info.channels;
184 let mut start = 0;
185 let mut end = samples.len();
186
187 let delay_frames = self.info.encoder_delay as u64;
188 let delay_samples = delay_frames * channels as u64;
189 if self.samples_output < delay_samples {
190 let skip = (delay_samples - self.samples_output).min(end as u64) as usize;
191 start += skip;
192 self.samples_output += skip as u64;
193 if start == end {
194 continue;
195 }
196 }
197
198 let total_frames = self.info.total_frames.unwrap_or(u64::MAX);
199 let padding_frames = self.info.end_padding as u64;
200 let effective_total = total_frames.saturating_sub(padding_frames);
201 let current_frame = self.samples_output / channels as u64;
202 let frames_in_chunk = (end - start) / channels;
203
204 if current_frame + frames_in_chunk as u64 > effective_total {
205 let frames_to_keep = effective_total.saturating_sub(current_frame) as usize;
206 if frames_to_keep == 0 {
207 self.finished = true;
208 return Ok(None);
209 }
210 end = start + frames_to_keep * channels;
211 }
212
213 let appended = end - start;
214 out.extend_from_slice(&samples[start..end]);
215 self.samples_output += appended as u64;
216 return Ok(Some(appended));
217 }
218 }
219
220 pub fn decode_next(&mut self) -> Result<Option<Vec<f64>>, DecoderError> {
221 let mut samples = Vec::new();
222 match self.decode_next_into(&mut samples)? {
223 Some(_) => Ok(Some(samples)),
224 None => Ok(None),
225 }
226 }
227
228 pub fn decode_all(&mut self) -> Result<Vec<f64>, DecoderError> {
229 let (max_memory_mb, max_memory_bytes) = configured_decode_memory_limit();
230
231 let initial_capacity = if let Some(total_frames) = self.info.total_frames {
232 let estimated_bytes = total_frames as usize * self.info.channels * F64_SAMPLE_BYTES;
233 if estimated_bytes > max_memory_bytes {
234 let estimated_mb = bytes_to_mib(estimated_bytes);
235 return Err(DecoderError::Decoder(format!(
236 "File too large to decode into memory: estimated {} MB (limit: {} MB). \
237 Use streaming mode instead or increase DECODE_MAX_MEMORY_MB env var.",
238 estimated_mb, max_memory_mb
239 )));
240 }
241
242 let total_samples = total_frames as usize * self.info.channels;
243 log::info!(
244 "Pre-allocating buffer for {} samples (~{} MB)",
245 total_samples,
246 bytes_to_mib(total_samples * F64_SAMPLE_BYTES)
247 );
248 total_samples
249 } else {
250 0
251 };
252
253 let mut all_samples = Vec::with_capacity(initial_capacity);
254 while self.decode_next_into(&mut all_samples)?.is_some() {
255 let current_bytes = all_samples.len() * F64_SAMPLE_BYTES;
256 if current_bytes > max_memory_bytes {
257 let current_mb = bytes_to_mib(current_bytes);
258 return Err(DecoderError::Decoder(format!(
259 "Memory limit exceeded during decode: {} MB (limit: {} MB). \
260 File may be corrupted or extremely long.",
261 current_mb, max_memory_mb
262 )));
263 }
264 }
265
266 let delay_trimmed = self.info.encoder_delay;
267 let padding_trimmed = self.info.end_padding;
268
269 if delay_trimmed > 0 || padding_trimmed > 0 {
270 log::info!(
271 "Decoded {} samples (trimmed {} delay + {} padding for gapless)",
272 all_samples.len(),
273 delay_trimmed,
274 padding_trimmed
275 );
276 } else {
277 log::info!("Decoded {} total samples (f64)", all_samples.len());
278 }
279
280 Ok(all_samples)
281 }
282
283 pub fn seek(&mut self, time_secs: f64) -> Result<(), DecoderError> {
284 use symphonia::core::formats::SeekTo;
285 use symphonia::core::units::Time;
286
287 let seek_to = SeekTo::Time {
288 time: Time::from(time_secs),
289 track_id: Some(self.track_id),
290 };
291
292 self.format_reader
293 .seek(symphonia::core::formats::SeekMode::Coarse, seek_to)
294 .map_err(|e| DecoderError::Decoder(e.to_string()))?;
295
296 self.decoder.reset();
297 self.finished = false;
298 self.samples_output = 0;
299
300 Ok(())
301 }
302}