creek_core/read/read_stream.rs
1use rtrb::{Consumer, Producer, RingBuffer};
2use std::path::PathBuf;
3
4use super::data::{DataBlockCacheEntry, DataBlockEntry};
5use super::error::{FatalReadError, ReadError};
6use super::{
7 ClientToServerMsg, DataBlock, Decoder, HeapData, ReadData, ReadServer, ReadStreamOptions,
8 ServerToClientMsg,
9};
10use crate::read::server::ReadServerOptions;
11use crate::{FileInfo, SERVER_WAIT_TIME};
12
/// Describes how to search for suitable caches when seeking in a [`ReadDiskStream`].
///
/// A cache is "suitable" when it exists and its cached range contains the requested
/// frame. If a suitable cache is found, then reading can resume immediately. If not,
/// then the stream will need to buffer before it can read data. In this case, you may
/// decide to either continue reading (which will return silence) or to pause
/// playback temporarily.
///
/// [`ReadDiskStream`]: struct.ReadDiskStream.html
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum SeekMode {
    /// Automatically search every cache for a suitable one to use. This is the default mode.
    #[default]
    Auto,
    /// Only try the one cache with the given index. If you already know a suitable cache,
    /// this can be more performant than searching each cache individually.
    TryOne(usize),
    /// Try the cache with the given index first, and if it is not suitable, automatically
    /// search for a suitable one. If you already know a suitable cache, this can be
    /// more performant than searching each cache individually.
    TryOneThenAuto(usize),
    /// Seek without searching for a suitable cache. This **will** cause the stream
    /// to buffer.
    NoCache,
}
37
/// Internal options handed to `ReadDiskStream::create()` after the IO server
/// has been spawned successfully.
struct ReadDiskStreamOptions<D: Decoder> {
    // The frame in the file that playback starts from.
    start_frame: usize,
    // Number of blocks in a cache; together with `block_size` this determines
    // the range of frames a single cache covers.
    num_cache_blocks: usize,
    // Number of additional blocks to prefetch ahead of the cache blocks.
    num_look_ahead_blocks: usize,
    // Number of user-assignable caches (two extra temporary caches are
    // appended internally in `create()`).
    max_num_caches: usize,
    // Number of frames in each block.
    block_size: usize,
    // Information about the opened file, returned by the IO server on spawn.
    file_info: FileInfo<D::FileParams>,
}
46
/// A realtime-safe disk-streaming reader of audio files.
pub struct ReadDiskStream<D: Decoder> {
    // Channel for sending requests (read / cache / dispose) to the IO server.
    to_server_tx: Producer<ClientToServerMsg<D>>,
    // Channel for receiving filled blocks, filled caches, and fatal errors
    // from the IO server.
    from_server_rx: Consumer<ServerToClientMsg<D>>,
    // Used exactly once, in `Drop`, to hand all heap data to the server.
    close_signal_tx: Producer<Option<HeapData<D::T>>>,

    // All heap-allocated state. This is only ever `None` after `Drop` has
    // taken it to send through `close_signal_tx`.
    heap_data: Option<HeapData<D::T>>,

    // Index of the prefetch block currently being read from.
    current_block_index: usize,
    // Index of the prefetch block after the current one (wraps around the ring).
    next_block_index: usize,
    // Frame in the file where the current block starts.
    current_block_start_frame: usize,
    // Playhead offset within the current block.
    current_frame_in_block: usize,

    // Index of the reserved cache slot used when a cache is moved while in use.
    temp_cache_index: usize,
    // Index of the reserved cache slot used when seeking without a suitable cache.
    temp_seek_cache_index: usize,

    // Total number of blocks in the prefetch buffer.
    num_prefetch_blocks: usize,
    // Total length of the prefetch buffer in frames (`num_prefetch_blocks * block_size`).
    prefetch_size: usize,
    // Length in frames that each cache covers (`num_cache_blocks * block_size`).
    cache_size: usize,
    // Number of frames in each block.
    block_size: usize,

    // Information about the opened file.
    file_info: FileInfo<D::FileParams>,
    // Set to `true` once the server reports a fatal error; all further
    // operations return `FatalReadError::StreamClosed`.
    fatal_error: bool,
}
71
72impl<D: Decoder> ReadDiskStream<D> {
73 /// Open a new realtime-safe disk-streaming reader.
74 ///
75 /// * `file` - The path to the file to open.
76 /// * `start_frame` - The frame in the file to start reading from.
77 /// * `stream_opts` - Additional stream options.
78 ///
79 /// # Panics
80 ///
81 /// This will panic if `stream_block_size`, `stream_num_look_ahead_blocks`,
82 /// or `stream_server_msg_channel_size` is `0`.
83 pub fn new<P: Into<PathBuf>>(
84 file: P,
85 start_frame: usize,
86 stream_opts: ReadStreamOptions<D>,
87 ) -> Result<ReadDiskStream<D>, D::OpenError> {
88 let ReadStreamOptions {
89 num_cache_blocks,
90 num_caches,
91 additional_opts,
92 num_look_ahead_blocks,
93 block_size,
94 server_msg_channel_size,
95 } = stream_opts;
96
97 assert_ne!(block_size, 0);
98 assert_ne!(num_look_ahead_blocks, 0);
99 assert_ne!(server_msg_channel_size, Some(0));
100
101 // Reserve ample space for the message channels.
102 let msg_channel_size = server_msg_channel_size
103 .unwrap_or(((num_cache_blocks + num_look_ahead_blocks) * 4) + (num_caches * 4) + 8);
104
105 let (to_server_tx, from_client_rx) =
106 RingBuffer::<ClientToServerMsg<D>>::new(msg_channel_size);
107 let (to_client_tx, from_server_rx) =
108 RingBuffer::<ServerToClientMsg<D>>::new(msg_channel_size);
109
110 // Create dedicated close signal.
111 let (close_signal_tx, close_signal_rx) = RingBuffer::<Option<HeapData<D::T>>>::new(1);
112
113 let file: PathBuf = file.into();
114
115 match ReadServer::spawn(
116 ReadServerOptions {
117 file,
118 start_frame,
119 num_prefetch_blocks: num_cache_blocks + num_look_ahead_blocks,
120 block_size,
121 additional_opts,
122 },
123 to_client_tx,
124 from_client_rx,
125 close_signal_rx,
126 ) {
127 Ok(file_info) => {
128 let client = ReadDiskStream::create(
129 ReadDiskStreamOptions {
130 start_frame,
131 num_cache_blocks,
132 num_look_ahead_blocks,
133 max_num_caches: num_caches,
134 block_size,
135 file_info,
136 },
137 to_server_tx,
138 from_server_rx,
139 close_signal_tx,
140 );
141
142 Ok(client)
143 }
144 Err(e) => Err(e),
145 }
146 }
147
148 fn create(
149 opts: ReadDiskStreamOptions<D>,
150 to_server_tx: Producer<ClientToServerMsg<D>>,
151 from_server_rx: Consumer<ServerToClientMsg<D>>,
152 close_signal_tx: Producer<Option<HeapData<D::T>>>,
153 ) -> Self {
154 let ReadDiskStreamOptions {
155 start_frame,
156 num_cache_blocks,
157 num_look_ahead_blocks,
158 max_num_caches,
159 block_size,
160 file_info,
161 } = opts;
162
163 let num_prefetch_blocks = num_cache_blocks + num_look_ahead_blocks;
164
165 let read_buffer = DataBlock::new(usize::from(file_info.num_channels), block_size);
166
167 // Reserve the last two caches as temporary caches.
168 let max_num_caches = max_num_caches + 2;
169
170 let mut caches: Vec<DataBlockCacheEntry<D::T>> = Vec::with_capacity(max_num_caches);
171 for _ in 0..max_num_caches {
172 caches.push(DataBlockCacheEntry {
173 cache: None,
174 wanted_start_frame: 0,
175 });
176 }
177
178 let temp_cache_index = max_num_caches - 1;
179 let temp_seek_cache_index = max_num_caches - 2;
180
181 let mut prefetch_buffer: Vec<DataBlockEntry<D::T>> =
182 Vec::with_capacity(num_prefetch_blocks);
183 let mut wanted_start_frame = start_frame;
184 for _ in 0..num_prefetch_blocks {
185 prefetch_buffer.push(DataBlockEntry {
186 use_cache_index: None,
187 block: None,
188 wanted_start_frame,
189 });
190
191 wanted_start_frame += block_size;
192 }
193
194 let heap_data = Some(HeapData {
195 read_buffer,
196 prefetch_buffer,
197 caches,
198 });
199
200 Self {
201 to_server_tx,
202 from_server_rx,
203 close_signal_tx,
204
205 heap_data,
206
207 current_block_index: 0,
208 next_block_index: 1,
209 current_block_start_frame: start_frame,
210 current_frame_in_block: 0,
211
212 temp_cache_index,
213 temp_seek_cache_index,
214
215 num_prefetch_blocks,
216 prefetch_size: num_prefetch_blocks * block_size,
217 cache_size: num_cache_blocks * block_size,
218 block_size,
219
220 file_info,
221 fatal_error: false,
222 }
223 }
224
225 /// Return the total number of caches available in this stream.
226 ///
227 /// This is realtime-safe.
228 pub fn num_caches(&self) -> usize {
229 // This check should never fail because it can only be `None` in the destructor.
230 if let Some(heap) = &self.heap_data {
231 heap.caches.len() - 2
232 } else {
233 0
234 }
235 }
236
237 /// Returns whether a cache can be moved seamlessly without silencing current playback (true)
238 /// or not (false).
239 ///
240 /// This is realtime-safe.
241 ///
242 /// If the position of a cache is changed while the playback stream is currently relying on it,
243 /// then it will attempt to store the cache in a temporary buffer to allow playback to resume
244 /// seamlessly.
245 ///
246 /// However, in the case where the cache is moved multiple times in quick succession while being
247 /// relied on, then any blocks relying on the oldest cache will be silenced. In this case, (false)
248 /// will be returned.
249 pub fn can_move_cache(&mut self, cache_index: usize) -> bool {
250 let Some(heap) = self.heap_data.as_ref() else {
251 // This will never return here because `heap_data` can only be `None` in the destructor.
252 return false;
253 };
254
255 let mut using_cache = false;
256 let mut using_temp_cache = false;
257 for block in &heap.prefetch_buffer {
258 if let Some(index) = block.use_cache_index {
259 if index == cache_index {
260 using_cache = true;
261 } else if index == self.temp_cache_index {
262 using_temp_cache = true;
263 }
264 }
265 }
266
267 !(using_cache && using_temp_cache)
268 }
269
    /// Request to cache a new area in the file.
    ///
    /// This is realtime-safe.
    ///
    /// * `cache_index` - The index of the cache to use. Use `ReadDiskStream::num_caches()` to see
    /// how many caches have been assigned to this stream.
    /// * `start_frame` - The frame in the file to start filling in the cache from. If any portion lies
    /// outside the end of the file, then that portion will be ignored.
    ///
    /// If the cache already exists, then it will be overwritten. If the cache already starts from this
    /// position, then nothing will be done and (false) will be returned. Otherwise, (true) will be
    /// returned.
    ///
    /// In the case where the position of a cache is changed while the playback stream is currently
    /// relying on it, then it will attempt to store the cache in a temporary buffer to allow playback
    /// to resume seamlessly.
    ///
    /// However, in the case where the cache is moved multiple times in quick succession while being
    /// relied on, then any blocks relying on the oldest cache will be silenced. See
    /// `ReadDiskStream::can_move_cache()` to check if a cache can be seamlessly moved first.
    ///
    /// # Errors
    ///
    /// * `ReadError::FatalError` - the stream has already encountered a fatal error.
    /// * `ReadError::CacheIndexOutOfRange` - `cache_index` is not a valid user cache.
    /// * `ReadError::IOServerChannelFull` - not enough free slots in the channel to the IO server.
    pub fn cache(
        &mut self,
        cache_index: usize,
        start_frame: usize,
    ) -> Result<bool, ReadError<D::FatalError>> {
        if self.fatal_error {
            return Err(ReadError::FatalError(FatalReadError::StreamClosed));
        }

        let Some(heap) = self.heap_data.as_mut() else {
            // This will never return here because `heap_data` can only be `None` in the destructor.
            return Ok(false);
        };

        // The last two caches are reserved temporary caches and may not be
        // addressed by the user.
        if cache_index >= heap.caches.len() - 2 {
            return Err(ReadError::CacheIndexOutOfRange {
                index: cache_index,
                num_caches: heap.caches.len() - 2,
            });
        }

        // Only do work if the cache is missing or starts at a different frame.
        if start_frame != heap.caches[cache_index].wanted_start_frame
            || heap.caches[cache_index].cache.is_none()
        {
            // Check that enough message slots are open: one `Cache` message, one
            // possible `DisposeCache` message, and up to one `DisposeBlock`
            // message per prefetch block.
            if self.to_server_tx.slots() < 2 + self.num_prefetch_blocks {
                return Err(ReadError::IOServerChannelFull);
            }

            heap.caches[cache_index].wanted_start_frame = start_frame;
            let mut cache = heap.caches[cache_index].cache.take();

            // If any blocks are currently using this cache, then set this cache as the
            // temporary cache and tell each block to use that instead.
            let mut using_cache = false;
            let mut using_temp_cache = false;
            for block in heap.prefetch_buffer.iter_mut() {
                if let Some(index) = block.use_cache_index {
                    if index == cache_index {
                        block.use_cache_index = Some(self.temp_cache_index);
                        using_cache = true;
                    } else if index == self.temp_cache_index {
                        using_temp_cache = true;
                    }
                }
            }
            if using_cache {
                if let Some(old_cache) = heap.caches[self.temp_cache_index].cache.take() {
                    // If any blocks are currently using the old temporary cache, dispose those blocks.
                    //
                    // NOTE(review): this loop also clears the blocks that were just
                    // redirected to the temporary cache above, not only the ones that
                    // relied on the *old* temporary cache — confirm this is the
                    // intended "oldest cache is silenced" behavior.
                    if using_temp_cache {
                        for block in heap.prefetch_buffer.iter_mut() {
                            if let Some(index) = block.use_cache_index {
                                if index == self.temp_cache_index {
                                    block.use_cache_index = None;
                                    if let Some(block) = block.block.take() {
                                        // Tell the server to deallocate the old block.
                                        // This cannot fail because we made sure that a slot is available in
                                        // the previous step.
                                        let _ = self
                                            .to_server_tx
                                            .push(ClientToServerMsg::DisposeBlock { block });
                                    }
                                }
                            }
                        }
                    }

                    // Tell the server to deallocate the old temporary cache.
                    // This cannot fail because we made sure that a slot is available in
                    // the previous step.
                    let _ = self
                        .to_server_tx
                        .push(ClientToServerMsg::DisposeCache { cache: old_cache });
                }

                // Move the cache being overwritten into the temporary slot so
                // playback relying on it can continue seamlessly.
                heap.caches[self.temp_cache_index].cache = cache.take();
            }

            // Ask the server to fill the (possibly recycled) cache from `start_frame`.
            // This cannot fail because we made sure that a slot is available in
            // the previous step.
            let _ = self.to_server_tx.push(ClientToServerMsg::Cache {
                cache_index,
                cache,
                start_frame,
            });

            return Ok(true);
        }

        Ok(false)
    }
381
382 /// Request to seek playback to a new position in the file.
383 ///
384 /// This is realtime-safe.
385 ///
386 /// * `frame` - The position in the file to seek to. If this lies outside of the end of
387 /// the file, then playback will return silence.
388 /// * `seek_mode` - Describes how to search for a suitable cache to use.
389 ///
390 /// If a suitable cache is found, then (true) is returned meaning that playback can resume immediately
391 /// without any buffering. Otherwise (false) is returned meaning that playback will need to
392 /// buffer first. In this case, you may choose to continue reading (which will return silence), or
393 /// to pause playback temporarily.
394 pub fn seek(
395 &mut self,
396 frame: usize,
397 seek_mode: SeekMode,
398 ) -> Result<bool, ReadError<D::FatalError>> {
399 if self.fatal_error {
400 return Err(ReadError::FatalError(FatalReadError::StreamClosed));
401 }
402
403 // Check that enough message slots are open.
404 if self.to_server_tx.slots() < 3 + self.num_prefetch_blocks {
405 return Err(ReadError::IOServerChannelFull);
406 }
407
408 let Some(heap) = self.heap_data.as_mut() else {
409 // This will never return here because `heap_data` can only be `None` in the destructor.
410 return Ok(false);
411 };
412
413 let mut found_cache = None;
414
415 if let Some(cache_index) = match seek_mode {
416 SeekMode::TryOne(cache_index) => Some(cache_index),
417 SeekMode::TryOneThenAuto(cache_index) => Some(cache_index),
418 _ => None,
419 } {
420 if heap.caches[cache_index].cache.is_some() {
421 let cache_start_frame = heap.caches[cache_index].wanted_start_frame;
422 if frame == cache_start_frame
423 || (frame > cache_start_frame && frame < cache_start_frame + self.cache_size)
424 {
425 found_cache = Some(cache_index);
426 }
427 }
428 }
429
430 if found_cache.is_none() {
431 let auto_search = match seek_mode {
432 SeekMode::Auto | SeekMode::TryOneThenAuto(_) => true,
433 SeekMode::NoCache | SeekMode::TryOne(_) => false,
434 };
435
436 if auto_search {
437 // Check previous caches.
438 for i in 0..heap.caches.len() - 2 {
439 if heap.caches[i].cache.is_some() {
440 let cache_start_frame = heap.caches[i].wanted_start_frame;
441 if frame == cache_start_frame
442 || (frame > cache_start_frame
443 && frame < cache_start_frame + self.cache_size)
444 {
445 found_cache = Some(i);
446 break;
447 }
448 }
449 }
450 }
451 }
452
453 if let Some(cache_index) = found_cache {
454 // Find the position in the old cache.
455 let cache_start_frame = heap.caches[cache_index].wanted_start_frame;
456 let mut delta = frame - cache_start_frame;
457 let mut block_i = 0;
458 while delta >= self.block_size {
459 block_i += 1;
460 delta -= self.block_size
461 }
462
463 self.current_block_start_frame = cache_start_frame + (block_i * self.block_size);
464 self.current_frame_in_block = delta;
465 self.current_block_index = block_i;
466 self.next_block_index = block_i + 1;
467 if self.next_block_index >= self.num_prefetch_blocks {
468 self.next_block_index = 0;
469 }
470
471 // Tell remaining blocks to use the cache.
472 for i in block_i..heap.prefetch_buffer.len() {
473 heap.prefetch_buffer[i].use_cache_index = Some(cache_index);
474 }
475
476 // Request the server to start fetching blocks ahead of the cache.
477 // This cannot fail because we made sure that a slot is available in
478 // the previous step.
479 let mut wanted_start_frame = cache_start_frame + self.prefetch_size;
480 let _ = self.to_server_tx.push(ClientToServerMsg::SeekTo {
481 frame: wanted_start_frame,
482 });
483
484 // Fetch remaining blocks.
485 for i in 0..block_i {
486 // This cannot fail because we made sure there are enough slots available
487 // in the previous step.
488 let _ = self.to_server_tx.push(ClientToServerMsg::ReadIntoBlock {
489 block_index: i,
490 block: heap.prefetch_buffer[i].block.take(),
491 start_frame: wanted_start_frame,
492 });
493 heap.prefetch_buffer[i].use_cache_index = None;
494 heap.prefetch_buffer[i].wanted_start_frame = wanted_start_frame;
495 wanted_start_frame += self.block_size;
496 }
497
498 Ok(true)
499 } else {
500 // Create a new temporary seek cache.
501 // This cannot fail because we made sure that a slot is available in
502 // the previous step.
503 heap.caches[self.temp_seek_cache_index].wanted_start_frame = frame;
504 let _ = self.to_server_tx.push(ClientToServerMsg::Cache {
505 cache_index: self.temp_seek_cache_index,
506 cache: heap.caches[self.temp_seek_cache_index].cache.take(),
507 start_frame: frame,
508 });
509
510 // Start from beginning of new cache.
511 self.current_block_start_frame = frame;
512 self.current_frame_in_block = 0;
513 self.current_block_index = 0;
514 self.next_block_index = 1;
515
516 // Request the server to start fetching blocks ahead of the cache.
517 // This cannot fail because we made sure that a slot is available in
518 // the previous step.
519 let _ = self.to_server_tx.push(ClientToServerMsg::SeekTo {
520 frame: self.current_block_start_frame + self.prefetch_size,
521 });
522
523 // Tell each prefetch block to use the cache.
524 for block in heap.prefetch_buffer.iter_mut() {
525 block.use_cache_index = Some(self.temp_seek_cache_index);
526 }
527
528 Ok(false)
529 }
530 }
531
532 /// Returns true if the stream is finished buffering and there is data can be read
533 /// right now, false otherwise.
534 ///
535 /// This is realtime-safe.
536 ///
537 /// In the case where `false` is returned, then you may choose to continue reading
538 /// (which will return silence), or to pause playback temporarily.
539 pub fn is_ready(&mut self) -> Result<bool, ReadError<D::FatalError>> {
540 self.poll()?;
541
542 if self.to_server_tx.is_full() {
543 return Ok(false);
544 }
545
546 let Some(heap) = self.heap_data.as_mut() else {
547 // This will never return here because `heap_data` can only be `None` in the destructor.
548 return Ok(false);
549 };
550
551 // Check if the next two blocks are ready.
552
553 if let Some(cache_index) = heap.prefetch_buffer[self.current_block_index].use_cache_index {
554 // This check should never fail because it can only be `None` in the destructor.
555 if heap.caches[cache_index].cache.is_none() {
556 // Cache has not been received yet.
557 return Ok(false);
558 }
559 } else if heap.prefetch_buffer[self.current_block_index]
560 .block
561 .is_none()
562 {
563 // Block has not been received yet.
564 return Ok(false);
565 }
566
567 if let Some(cache_index) = heap.prefetch_buffer[self.next_block_index].use_cache_index {
568 // This check should never fail because it can only be `None` in the destructor.
569 if heap.caches[cache_index].cache.is_none() {
570 // Cache has not been received yet.
571 return Ok(false);
572 }
573 } else if heap.prefetch_buffer[self.next_block_index].block.is_none() {
574 // Block has not been received yet.
575 return Ok(false);
576 }
577
578 Ok(true)
579 }
580
581 /// Blocks the current thread until the stream is done buffering.
582 ///
583 /// NOTE: This is ***not*** realtime-safe. This is only useful
584 /// for making sure a stream is ready before sending it to a realtime thread.
585 pub fn block_until_ready(&mut self) -> Result<(), ReadError<D::FatalError>> {
586 loop {
587 if self.is_ready()? {
588 break;
589 }
590
591 std::thread::sleep(SERVER_WAIT_TIME);
592 }
593
594 Ok(())
595 }
596
    /// Blocks the current thread until the given buffer is filled.
    ///
    /// NOTE: This is ***not*** realtime-safe.
    ///
    /// This will start reading from the stream's current playhead (this can be changed
    /// beforehand with `ReadDiskStream::seek()`). This is streaming, meaning the next call to
    /// `fill_buffer_blocking()` or `ReadDiskStream::read()` will pick up from where the previous
    /// call ended.
    ///
    /// ## Returns
    /// This will return the number of frames that were written to the buffer. This may be less
    /// than the length of the buffer if the end of the file was reached, so use this as a check
    /// if the entire buffer was filled or not.
    ///
    /// ## Error
    /// This will return an error if the number of channels in the buffer does not equal the number
    /// of channels in the stream, if the length of each channel is not the same, or if there was
    /// an internal error with reading the stream.
    pub fn fill_buffer_blocking(
        &mut self,
        buffer: &mut [Vec<D::T>],
    ) -> Result<usize, ReadError<D::FatalError>> {
        // The buffer must contain exactly one `Vec` per channel in the file.
        if buffer.len() != usize::from(self.file_info.num_channels) {
            return Err(ReadError::InvalidBuffer);
        }

        // NOTE(review): assumes at least one channel exists — `buffer[0]` would
        // panic for a zero-channel file. Confirm decoders never report 0 channels.
        let buffer_len = buffer[0].len();

        // Sanity check that all channels are the same length.
        for ch in buffer.iter().skip(1) {
            if ch.len() != buffer_len {
                return Err(ReadError::InvalidBuffer);
            }
        }

        let mut frames_written = 0;
        while frames_written < buffer_len {
            let mut reached_end_of_file = false;

            // Drain as much as possible while the stream has buffered data;
            // each `read()` returns at most one block's worth of frames.
            while self.is_ready()? {
                let read_frames = (buffer_len - frames_written).min(self.block_size);

                let read_data = self.read(read_frames)?;
                for (i, ch) in buffer.iter_mut().enumerate() {
                    (*ch)[frames_written..frames_written + read_data.num_frames()]
                        .copy_from_slice(read_data.read_channel(i));
                }

                frames_written += read_data.num_frames();

                if read_data.reached_end_of_file() {
                    reached_end_of_file = true;
                    break;
                }
            }

            if reached_end_of_file {
                // Stop early; `frames_written` tells the caller how much was filled.
                break;
            }

            // The stream is still buffering; wait for the IO server to catch up.
            std::thread::sleep(SERVER_WAIT_TIME);
        }

        Ok(frames_written)
    }
662
663 fn poll(&mut self) -> Result<(), ReadError<D::FatalError>> {
664 if self.fatal_error {
665 return Err(ReadError::FatalError(FatalReadError::StreamClosed));
666 }
667
668 // Retrieve any data sent from the server.
669
670 let Some(heap) = self.heap_data.as_mut() else {
671 // This will never return here because `heap_data` can only be `None` in the destructor.
672 return Ok(());
673 };
674
675 loop {
676 // Check that there is at-least one slot open before popping the next message.
677 if self.to_server_tx.is_full() {
678 return Err(ReadError::IOServerChannelFull);
679 }
680
681 if let Ok(msg) = self.from_server_rx.pop() {
682 match msg {
683 ServerToClientMsg::ReadIntoBlockRes {
684 block_index,
685 block,
686 wanted_start_frame,
687 } => {
688 let prefetch_block = &mut heap.prefetch_buffer[block_index];
689
690 // Only use results from the latest request.
691 if wanted_start_frame == prefetch_block.wanted_start_frame {
692 if let Some(prefetch_block) = prefetch_block.block.take() {
693 // Tell the IO server to deallocate the old block.
694 // This cannot fail because we made sure that a slot is available in
695 // a previous step.
696 let _ = self.to_server_tx.push(ClientToServerMsg::DisposeBlock {
697 block: prefetch_block,
698 });
699 }
700
701 // Store the new block into the prefetch buffer.
702 prefetch_block.block = Some(block);
703 } else {
704 // Tell the server to deallocate the block.
705 // This cannot fail because we made sure that a slot is available in
706 // a previous step.
707 let _ = self
708 .to_server_tx
709 .push(ClientToServerMsg::DisposeBlock { block });
710 }
711 }
712 ServerToClientMsg::CacheRes {
713 cache_index,
714 cache,
715 wanted_start_frame,
716 } => {
717 let cache_entry = &mut heap.caches[cache_index];
718
719 // Only use results from the latest request.
720 if wanted_start_frame == cache_entry.wanted_start_frame {
721 if let Some(cache_entry) = cache_entry.cache.take() {
722 // Tell the IO server to deallocate the old cache.
723 // This cannot fail because we made sure that a slot is available in
724 // a previous step.
725 let _ = self
726 .to_server_tx
727 .push(ClientToServerMsg::DisposeCache { cache: cache_entry });
728 }
729
730 // Store the new cache.
731 cache_entry.cache = Some(cache);
732 } else {
733 // Tell the server to deallocate the cache.
734 // This cannot fail because we made sure that a slot is available in
735 // a previous step.
736 let _ = self
737 .to_server_tx
738 .push(ClientToServerMsg::DisposeCache { cache });
739 }
740 }
741 ServerToClientMsg::FatalError(e) => {
742 self.fatal_error = true;
743 return Err(ReadError::FatalError(FatalReadError::DecoderError(e)));
744 }
745 }
746 } else {
747 break;
748 }
749 }
750
751 Ok(())
752 }
753
    /// Read the next chunk of `frames` in the stream from the current playhead position.
    ///
    /// This is realtime-safe.
    ///
    /// This is *streaming*, meaning the next call to `read()` will pick up where the
    /// previous call left off.
    ///
    /// If the stream is currently buffering, (false) will be returned, and the playhead will still
    /// advance but will output silence. Otherwise, data can be read and (true) is returned. To check
    /// if the stream is ready beforehand, call `ReadDiskStream::is_ready()`.
    ///
    /// If the end of a file is reached, then only the amount of frames up to the end will be returned,
    /// and playback will return silence on each subsequent call to `read()`.
    ///
    /// NOTE: If the number of `frames` exceeds the block size of the decoder, then that block size
    /// will be used instead. This can be retrieved using `ReadDiskStream::block_size()`.
    pub fn read(
        &mut self,
        mut frames: usize,
    ) -> Result<ReadData<'_, D::T>, ReadError<D::FatalError>> {
        if self.fatal_error {
            return Err(ReadError::FatalError(FatalReadError::StreamClosed));
        }

        // A single read never spans more than two adjacent blocks.
        frames = frames.min(self.block_size);

        self.poll()?;

        // Check that there is at-least one slot open for when `advance_to_next_block()` is called.
        if self.to_server_tx.is_full() {
            return Err(ReadError::IOServerChannelFull);
        }

        // Check if the end of the file was reached.
        if self.playhead() >= self.file_info.num_frames {
            return Err(ReadError::EndOfFile);
        }
        let mut reached_end_of_file = false;
        if self.playhead() + frames >= self.file_info.num_frames {
            // Clamp to the remaining frames in the file.
            frames = self.file_info.num_frames - self.playhead();
            reached_end_of_file = true;
        }

        let end_frame_in_block = self.current_frame_in_block + frames;
        if end_frame_in_block > self.block_size {
            // Data spans between two blocks, so two copies need to be performed.

            let first_len = self.block_size - self.current_frame_in_block;
            let second_len = frames - first_len;

            // Copy from first block.
            {
                let Some(heap) = self.heap_data.as_mut() else {
                    // This will never return here because `heap_data` can only be `None` in the destructor.
                    // NOTE(review): the error variant is a placeholder for this unreachable case.
                    return Err(ReadError::IOServerChannelFull);
                };

                heap.read_buffer.clear();

                copy_block_into_read_buffer(
                    heap,
                    self.current_block_index,
                    self.current_frame_in_block,
                    first_len,
                );
            }

            // Hand the exhausted block back to the server and move to the next one.
            self.advance_to_next_block()?;

            // Copy from second block
            {
                let Some(heap) = self.heap_data.as_mut() else {
                    // This will never return here because `heap_data` can only be `None` in the destructor.
                    return Err(ReadError::IOServerChannelFull);
                };

                copy_block_into_read_buffer(heap, self.current_block_index, 0, second_len);
            }

            self.current_frame_in_block = second_len;
        } else {
            // Only need to copy from current block.
            {
                let Some(heap) = self.heap_data.as_mut() else {
                    // This will never return here because `heap_data` can only be `None` in the destructor.
                    return Err(ReadError::IOServerChannelFull);
                };

                heap.read_buffer.clear();

                copy_block_into_read_buffer(
                    heap,
                    self.current_block_index,
                    self.current_frame_in_block,
                    frames,
                );
            }

            self.current_frame_in_block = end_frame_in_block;
            if self.current_frame_in_block == self.block_size {
                // Landed exactly on a block boundary; advance eagerly so the
                // next read starts at offset 0 of the next block.
                self.advance_to_next_block()?;
                self.current_frame_in_block = 0;
            }
        }

        let Some(heap) = self.heap_data.as_mut() else {
            // This will never return here because `heap_data` can only be `None` in the destructor.
            return Err(ReadError::IOServerChannelFull);
        };

        // This check should never fail because it can only be `None` in the destructor.
        Ok(ReadData::new(
            &heap.read_buffer,
            frames,
            reached_end_of_file,
        ))
    }
871
872 fn advance_to_next_block(&mut self) -> Result<(), ReadError<D::FatalError>> {
873 let Some(heap) = self.heap_data.as_mut() else {
874 // This will never return here because `heap_data` can only be `None` in the destructor.
875 return Ok(());
876 };
877
878 let entry = &mut heap.prefetch_buffer[self.current_block_index];
879
880 // Request a new block of data that is one block ahead of the
881 // latest block in the prefetch buffer.
882 let wanted_start_frame = self.current_block_start_frame + (self.prefetch_size);
883
884 entry.use_cache_index = None;
885 entry.wanted_start_frame = wanted_start_frame;
886
887 // This cannot fail because the caller function `read` makes sure there
888 // is at-least one slot open before calling this function.
889 let _ = self.to_server_tx.push(ClientToServerMsg::ReadIntoBlock {
890 block_index: self.current_block_index,
891 // Send block to be re-used by the IO server.
892 block: entry.block.take(),
893 start_frame: wanted_start_frame,
894 });
895
896 self.current_block_index += 1;
897 if self.current_block_index >= self.num_prefetch_blocks {
898 self.current_block_index = 0;
899 }
900
901 self.next_block_index += 1;
902 if self.next_block_index >= self.num_prefetch_blocks {
903 self.next_block_index = 0;
904 }
905
906 self.current_block_start_frame += self.block_size;
907
908 Ok(())
909 }
910
    /// Return the current frame of the playhead.
    ///
    /// This is realtime-safe.
    ///
    /// The returned value is a frame index from the start of the file: the
    /// start frame of the current block plus the offset within that block.
    pub fn playhead(&self) -> usize {
        self.current_block_start_frame + self.current_frame_in_block
    }
917
    /// Return info about the file.
    ///
    /// This is realtime-safe.
    ///
    /// This returns a reference to the `FileInfo` obtained when the stream was opened.
    pub fn info(&self) -> &FileInfo<D::FileParams> {
        &self.file_info
    }
924
    /// Return the block size used by this decoder.
    ///
    /// This is realtime-safe.
    ///
    /// A single call to `ReadDiskStream::read()` never returns more than this
    /// many frames.
    pub fn block_size(&self) -> usize {
        self.block_size
    }
931}
932
impl<D: Decoder> Drop for ReadDiskStream<D> {
    fn drop(&mut self) {
        // Tell the server to deallocate any heap data.
        // This cannot fail because this is the only place the signal is ever sent
        // and the dedicated close-signal channel has a capacity of one.
        let _ = self.close_signal_tx.push(self.heap_data.take());
    }
}
940
941fn copy_block_into_read_buffer<T: Copy + Default + Send>(
942 heap: &mut HeapData<T>,
943 block_index: usize,
944 start_frame_in_block: usize,
945 frames: usize,
946) {
947 let block_entry = &heap.prefetch_buffer[block_index];
948
949 let maybe_block = match block_entry.use_cache_index {
950 Some(cache_index) => heap.caches[cache_index]
951 .cache
952 .as_ref()
953 .map(|cache| &cache.blocks[block_index]),
954 None => {
955 block_entry.block.as_ref()
956
957 // TODO: warn of buffer underflow.
958 }
959 };
960
961 let Some(block) = maybe_block else {
962 // If no block exists, output silence.
963 for buffer_ch in heap.read_buffer.block.iter_mut() {
964 buffer_ch.resize(buffer_ch.len() + frames, Default::default());
965 }
966
967 return;
968 };
969
970 for (buffer_ch, block_ch) in heap.read_buffer.block.iter_mut().zip(block.block.iter()) {
971 // If for some reason the decoder did not fill this block fully,
972 // fill the rest with zeros.
973 if block_ch.len() < start_frame_in_block + frames {
974 if block_ch.len() <= start_frame_in_block {
975 // The block has no more data to copy, fill all frames with zeros.
976 buffer_ch.resize(buffer_ch.len() + frames, Default::default());
977 } else {
978 let copy_frames = block_ch.len() - start_frame_in_block;
979
980 buffer_ch.extend_from_slice(
981 &block_ch[start_frame_in_block..start_frame_in_block + copy_frames],
982 );
983
984 buffer_ch.resize(buffer_ch.len() + frames - copy_frames, Default::default());
985 }
986 } else {
987 buffer_ch
988 .extend_from_slice(&block_ch[start_frame_in_block..start_frame_in_block + frames]);
989 };
990 }
991}