creek_core/read/
read_stream.rs

1use rtrb::{Consumer, Producer, RingBuffer};
2use std::path::PathBuf;
3
4use super::data::{DataBlockCacheEntry, DataBlockEntry};
5use super::error::{FatalReadError, ReadError};
6use super::{
7    ClientToServerMsg, DataBlock, Decoder, HeapData, ReadData, ReadServer, ReadStreamOptions,
8    ServerToClientMsg,
9};
10use crate::read::server::ReadServerOptions;
11use crate::{FileInfo, SERVER_WAIT_TIME};
12
/// Strategy used by [`ReadDiskStream::seek`] to look for a cache that already
/// covers the position being seeked to.
///
/// When a suitable cache is found, reading can resume immediately. Otherwise the
/// stream has to buffer before it can deliver data; until then you may either keep
/// reading (which returns silence) or pause playback temporarily.
///
/// [`ReadDiskStream`]: ReadDiskStream
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum SeekMode {
    /// Search every cache for a suitable one. This is the default mode.
    #[default]
    Auto,
    /// Check only the cache with the given index. When you already know which
    /// cache is suitable, this can be cheaper than searching each cache.
    TryOne(usize),
    /// Check the cache with the given index first and, if it is not suitable,
    /// fall back to searching every cache. When you already know which cache is
    /// suitable, this can be cheaper than searching each cache.
    TryOneThenAuto(usize),
    /// Seek without looking for a cache at all. This **will** cause the stream
    /// to buffer.
    NoCache,
}
37
38struct ReadDiskStreamOptions<D: Decoder> {
39    start_frame: usize,
40    num_cache_blocks: usize,
41    num_look_ahead_blocks: usize,
42    max_num_caches: usize,
43    block_size: usize,
44    file_info: FileInfo<D::FileParams>,
45}
46
47/// A realtime-safe disk-streaming reader of audio files.
48pub struct ReadDiskStream<D: Decoder> {
49    to_server_tx: Producer<ClientToServerMsg<D>>,
50    from_server_rx: Consumer<ServerToClientMsg<D>>,
51    close_signal_tx: Producer<Option<HeapData<D::T>>>,
52
53    heap_data: Option<HeapData<D::T>>,
54
55    current_block_index: usize,
56    next_block_index: usize,
57    current_block_start_frame: usize,
58    current_frame_in_block: usize,
59
60    temp_cache_index: usize,
61    temp_seek_cache_index: usize,
62
63    num_prefetch_blocks: usize,
64    prefetch_size: usize,
65    cache_size: usize,
66    block_size: usize,
67
68    file_info: FileInfo<D::FileParams>,
69    fatal_error: bool,
70}
71
72impl<D: Decoder> ReadDiskStream<D> {
73    /// Open a new realtime-safe disk-streaming reader.
74    ///
75    /// * `file` - The path to the file to open.
76    /// * `start_frame` - The frame in the file to start reading from.
77    /// * `stream_opts` - Additional stream options.
78    ///
79    /// # Panics
80    ///
81    /// This will panic if `stream_block_size`, `stream_num_look_ahead_blocks`,
82    /// or `stream_server_msg_channel_size` is `0`.
83    pub fn new<P: Into<PathBuf>>(
84        file: P,
85        start_frame: usize,
86        stream_opts: ReadStreamOptions<D>,
87    ) -> Result<ReadDiskStream<D>, D::OpenError> {
88        let ReadStreamOptions {
89            num_cache_blocks,
90            num_caches,
91            additional_opts,
92            num_look_ahead_blocks,
93            block_size,
94            server_msg_channel_size,
95        } = stream_opts;
96
97        assert_ne!(block_size, 0);
98        assert_ne!(num_look_ahead_blocks, 0);
99        assert_ne!(server_msg_channel_size, Some(0));
100
101        // Reserve ample space for the message channels.
102        let msg_channel_size = server_msg_channel_size
103            .unwrap_or(((num_cache_blocks + num_look_ahead_blocks) * 4) + (num_caches * 4) + 8);
104
105        let (to_server_tx, from_client_rx) =
106            RingBuffer::<ClientToServerMsg<D>>::new(msg_channel_size);
107        let (to_client_tx, from_server_rx) =
108            RingBuffer::<ServerToClientMsg<D>>::new(msg_channel_size);
109
110        // Create dedicated close signal.
111        let (close_signal_tx, close_signal_rx) = RingBuffer::<Option<HeapData<D::T>>>::new(1);
112
113        let file: PathBuf = file.into();
114
115        match ReadServer::spawn(
116            ReadServerOptions {
117                file,
118                start_frame,
119                num_prefetch_blocks: num_cache_blocks + num_look_ahead_blocks,
120                block_size,
121                additional_opts,
122            },
123            to_client_tx,
124            from_client_rx,
125            close_signal_rx,
126        ) {
127            Ok(file_info) => {
128                let client = ReadDiskStream::create(
129                    ReadDiskStreamOptions {
130                        start_frame,
131                        num_cache_blocks,
132                        num_look_ahead_blocks,
133                        max_num_caches: num_caches,
134                        block_size,
135                        file_info,
136                    },
137                    to_server_tx,
138                    from_server_rx,
139                    close_signal_tx,
140                );
141
142                Ok(client)
143            }
144            Err(e) => Err(e),
145        }
146    }
147
148    fn create(
149        opts: ReadDiskStreamOptions<D>,
150        to_server_tx: Producer<ClientToServerMsg<D>>,
151        from_server_rx: Consumer<ServerToClientMsg<D>>,
152        close_signal_tx: Producer<Option<HeapData<D::T>>>,
153    ) -> Self {
154        let ReadDiskStreamOptions {
155            start_frame,
156            num_cache_blocks,
157            num_look_ahead_blocks,
158            max_num_caches,
159            block_size,
160            file_info,
161        } = opts;
162
163        let num_prefetch_blocks = num_cache_blocks + num_look_ahead_blocks;
164
165        let read_buffer = DataBlock::new(usize::from(file_info.num_channels), block_size);
166
167        // Reserve the last two caches as temporary caches.
168        let max_num_caches = max_num_caches + 2;
169
170        let mut caches: Vec<DataBlockCacheEntry<D::T>> = Vec::with_capacity(max_num_caches);
171        for _ in 0..max_num_caches {
172            caches.push(DataBlockCacheEntry {
173                cache: None,
174                wanted_start_frame: 0,
175            });
176        }
177
178        let temp_cache_index = max_num_caches - 1;
179        let temp_seek_cache_index = max_num_caches - 2;
180
181        let mut prefetch_buffer: Vec<DataBlockEntry<D::T>> =
182            Vec::with_capacity(num_prefetch_blocks);
183        let mut wanted_start_frame = start_frame;
184        for _ in 0..num_prefetch_blocks {
185            prefetch_buffer.push(DataBlockEntry {
186                use_cache_index: None,
187                block: None,
188                wanted_start_frame,
189            });
190
191            wanted_start_frame += block_size;
192        }
193
194        let heap_data = Some(HeapData {
195            read_buffer,
196            prefetch_buffer,
197            caches,
198        });
199
200        Self {
201            to_server_tx,
202            from_server_rx,
203            close_signal_tx,
204
205            heap_data,
206
207            current_block_index: 0,
208            next_block_index: 1,
209            current_block_start_frame: start_frame,
210            current_frame_in_block: 0,
211
212            temp_cache_index,
213            temp_seek_cache_index,
214
215            num_prefetch_blocks,
216            prefetch_size: num_prefetch_blocks * block_size,
217            cache_size: num_cache_blocks * block_size,
218            block_size,
219
220            file_info,
221            fatal_error: false,
222        }
223    }
224
225    /// Return the total number of caches available in this stream.
226    ///
227    /// This is realtime-safe.
228    pub fn num_caches(&self) -> usize {
229        // This check should never fail because it can only be `None` in the destructor.
230        if let Some(heap) = &self.heap_data {
231            heap.caches.len() - 2
232        } else {
233            0
234        }
235    }
236
237    /// Returns whether a cache can be moved seamlessly without silencing current playback (true)
238    /// or not (false).
239    ///
240    /// This is realtime-safe.
241    ///
242    /// If the position of a cache is changed while the playback stream is currently relying on it,
243    /// then it will attempt to store the cache in a temporary buffer to allow playback to resume
244    /// seamlessly.
245    ///
246    /// However, in the case where the cache is moved multiple times in quick succession while being
247    /// relied on, then any blocks relying on the oldest cache will be silenced. In this case, (false)
248    /// will be returned.
249    pub fn can_move_cache(&mut self, cache_index: usize) -> bool {
250        let Some(heap) = self.heap_data.as_ref() else {
251            // This will never return here because `heap_data` can only be `None` in the destructor.
252            return false;
253        };
254
255        let mut using_cache = false;
256        let mut using_temp_cache = false;
257        for block in &heap.prefetch_buffer {
258            if let Some(index) = block.use_cache_index {
259                if index == cache_index {
260                    using_cache = true;
261                } else if index == self.temp_cache_index {
262                    using_temp_cache = true;
263                }
264            }
265        }
266
267        !(using_cache && using_temp_cache)
268    }
269
270    /// Request to cache a new area in the file.
271    ///
272    /// This is realtime-safe.
273    ///
274    /// * `cache_index` - The index of the cache to use. Use `ReadDiskStream::num_caches()` to see
275    ///    how many caches have been assigned to this stream.
276    /// * `start_frame` - The frame in the file to start filling in the cache from. If any portion lies
277    ///    outside the end of the file, then that portion will be ignored.
278    ///
279    /// If the cache already exists, then it will be overwritten. If the cache already starts from this
280    /// position, then nothing will be done and (false) will be returned. Otherwise, (true) will be
281    /// returned.
282    ///
283    /// In the case where the position of a cache is changed while the playback stream is currently
284    /// relying on it, then it will attempt to store the cache in a temporary buffer to allow playback
285    /// to resume seamlessly.
286    ///
287    /// However, in the case where the cache is moved multiple times in quick succession while being
288    /// relied on, then any blocks relying on the oldest cache will be silenced. See
289    /// `ReadDiskStream::can_move_cache()` to check if a cache can be seamlessly moved first.
290    pub fn cache(
291        &mut self,
292        cache_index: usize,
293        start_frame: usize,
294    ) -> Result<bool, ReadError<D::FatalError>> {
295        if self.fatal_error {
296            return Err(ReadError::FatalError(FatalReadError::StreamClosed));
297        }
298
299        let Some(heap) = self.heap_data.as_mut() else {
300            // This will never return here because `heap_data` can only be `None` in the destructor.
301            return Ok(false);
302        };
303
304        if cache_index >= heap.caches.len() - 2 {
305            return Err(ReadError::CacheIndexOutOfRange {
306                index: cache_index,
307                num_caches: heap.caches.len() - 2,
308            });
309        }
310
311        if start_frame != heap.caches[cache_index].wanted_start_frame
312            || heap.caches[cache_index].cache.is_none()
313        {
314            // Check that at-least two message slots are open.
315            if self.to_server_tx.slots() < 2 + self.num_prefetch_blocks {
316                return Err(ReadError::IOServerChannelFull);
317            }
318
319            heap.caches[cache_index].wanted_start_frame = start_frame;
320            let mut cache = heap.caches[cache_index].cache.take();
321
322            // If any blocks are currently using this cache, then set this cache as the
323            // temporary cache and tell each block to use that instead.
324            let mut using_cache = false;
325            let mut using_temp_cache = false;
326            for block in heap.prefetch_buffer.iter_mut() {
327                if let Some(index) = block.use_cache_index {
328                    if index == cache_index {
329                        block.use_cache_index = Some(self.temp_cache_index);
330                        using_cache = true;
331                    } else if index == self.temp_cache_index {
332                        using_temp_cache = true;
333                    }
334                }
335            }
336            if using_cache {
337                if let Some(old_cache) = heap.caches[self.temp_cache_index].cache.take() {
338                    // If any blocks are currently using the old temporary cache, dispose those blocks.
339                    if using_temp_cache {
340                        for block in heap.prefetch_buffer.iter_mut() {
341                            if let Some(index) = block.use_cache_index {
342                                if index == self.temp_cache_index {
343                                    block.use_cache_index = None;
344                                    if let Some(block) = block.block.take() {
345                                        // Tell the server to deallocate the old block.
346                                        // This cannot fail because we made sure that a slot is available in
347                                        // the previous step.
348                                        let _ = self
349                                            .to_server_tx
350                                            .push(ClientToServerMsg::DisposeBlock { block });
351                                    }
352                                }
353                            }
354                        }
355                    }
356
357                    // Tell the server to deallocate the old temporary cache.
358                    // This cannot fail because we made sure that a slot is available in
359                    // the previous step.
360                    let _ = self
361                        .to_server_tx
362                        .push(ClientToServerMsg::DisposeCache { cache: old_cache });
363                }
364
365                heap.caches[self.temp_cache_index].cache = cache.take();
366            }
367
368            // This cannot fail because we made sure that a slot is available in
369            // the previous step.
370            let _ = self.to_server_tx.push(ClientToServerMsg::Cache {
371                cache_index,
372                cache,
373                start_frame,
374            });
375
376            return Ok(true);
377        }
378
379        Ok(false)
380    }
381
382    /// Request to seek playback to a new position in the file.
383    ///
384    /// This is realtime-safe.
385    ///
386    /// * `frame` - The position in the file to seek to. If this lies outside of the end of
387    ///    the file, then playback will return silence.
388    /// * `seek_mode` - Describes how to search for a suitable cache to use.
389    ///
390    /// If a suitable cache is found, then (true) is returned meaning that playback can resume immediately
391    /// without any buffering. Otherwise (false) is returned meaning that playback will need to
392    /// buffer first. In this case, you may choose to continue reading (which will return silence), or
393    /// to pause playback temporarily.
394    pub fn seek(
395        &mut self,
396        frame: usize,
397        seek_mode: SeekMode,
398    ) -> Result<bool, ReadError<D::FatalError>> {
399        if self.fatal_error {
400            return Err(ReadError::FatalError(FatalReadError::StreamClosed));
401        }
402
403        // Check that enough message slots are open.
404        if self.to_server_tx.slots() < 3 + self.num_prefetch_blocks {
405            return Err(ReadError::IOServerChannelFull);
406        }
407
408        let Some(heap) = self.heap_data.as_mut() else {
409            // This will never return here because `heap_data` can only be `None` in the destructor.
410            return Ok(false);
411        };
412
413        let mut found_cache = None;
414
415        if let Some(cache_index) = match seek_mode {
416            SeekMode::TryOne(cache_index) => Some(cache_index),
417            SeekMode::TryOneThenAuto(cache_index) => Some(cache_index),
418            _ => None,
419        } {
420            if heap.caches[cache_index].cache.is_some() {
421                let cache_start_frame = heap.caches[cache_index].wanted_start_frame;
422                if frame == cache_start_frame
423                    || (frame > cache_start_frame && frame < cache_start_frame + self.cache_size)
424                {
425                    found_cache = Some(cache_index);
426                }
427            }
428        }
429
430        if found_cache.is_none() {
431            let auto_search = match seek_mode {
432                SeekMode::Auto | SeekMode::TryOneThenAuto(_) => true,
433                SeekMode::NoCache | SeekMode::TryOne(_) => false,
434            };
435
436            if auto_search {
437                // Check previous caches.
438                for i in 0..heap.caches.len() - 2 {
439                    if heap.caches[i].cache.is_some() {
440                        let cache_start_frame = heap.caches[i].wanted_start_frame;
441                        if frame == cache_start_frame
442                            || (frame > cache_start_frame
443                                && frame < cache_start_frame + self.cache_size)
444                        {
445                            found_cache = Some(i);
446                            break;
447                        }
448                    }
449                }
450            }
451        }
452
453        if let Some(cache_index) = found_cache {
454            // Find the position in the old cache.
455            let cache_start_frame = heap.caches[cache_index].wanted_start_frame;
456            let mut delta = frame - cache_start_frame;
457            let mut block_i = 0;
458            while delta >= self.block_size {
459                block_i += 1;
460                delta -= self.block_size
461            }
462
463            self.current_block_start_frame = cache_start_frame + (block_i * self.block_size);
464            self.current_frame_in_block = delta;
465            self.current_block_index = block_i;
466            self.next_block_index = block_i + 1;
467            if self.next_block_index >= self.num_prefetch_blocks {
468                self.next_block_index = 0;
469            }
470
471            // Tell remaining blocks to use the cache.
472            for i in block_i..heap.prefetch_buffer.len() {
473                heap.prefetch_buffer[i].use_cache_index = Some(cache_index);
474            }
475
476            // Request the server to start fetching blocks ahead of the cache.
477            // This cannot fail because we made sure that a slot is available in
478            // the previous step.
479            let mut wanted_start_frame = cache_start_frame + self.prefetch_size;
480            let _ = self.to_server_tx.push(ClientToServerMsg::SeekTo {
481                frame: wanted_start_frame,
482            });
483
484            // Fetch remaining blocks.
485            for i in 0..block_i {
486                // This cannot fail because we made sure there are enough slots available
487                // in the previous step.
488                let _ = self.to_server_tx.push(ClientToServerMsg::ReadIntoBlock {
489                    block_index: i,
490                    block: heap.prefetch_buffer[i].block.take(),
491                    start_frame: wanted_start_frame,
492                });
493                heap.prefetch_buffer[i].use_cache_index = None;
494                heap.prefetch_buffer[i].wanted_start_frame = wanted_start_frame;
495                wanted_start_frame += self.block_size;
496            }
497
498            Ok(true)
499        } else {
500            // Create a new temporary seek cache.
501            // This cannot fail because we made sure that a slot is available in
502            // the previous step.
503            heap.caches[self.temp_seek_cache_index].wanted_start_frame = frame;
504            let _ = self.to_server_tx.push(ClientToServerMsg::Cache {
505                cache_index: self.temp_seek_cache_index,
506                cache: heap.caches[self.temp_seek_cache_index].cache.take(),
507                start_frame: frame,
508            });
509
510            // Start from beginning of new cache.
511            self.current_block_start_frame = frame;
512            self.current_frame_in_block = 0;
513            self.current_block_index = 0;
514            self.next_block_index = 1;
515
516            // Request the server to start fetching blocks ahead of the cache.
517            // This cannot fail because we made sure that a slot is available in
518            // the previous step.
519            let _ = self.to_server_tx.push(ClientToServerMsg::SeekTo {
520                frame: self.current_block_start_frame + self.prefetch_size,
521            });
522
523            // Tell each prefetch block to use the cache.
524            for block in heap.prefetch_buffer.iter_mut() {
525                block.use_cache_index = Some(self.temp_seek_cache_index);
526            }
527
528            Ok(false)
529        }
530    }
531
532    /// Returns true if the stream is finished buffering and there is data can be read
533    /// right now, false otherwise.
534    ///
535    /// This is realtime-safe.
536    ///
537    /// In the case where `false` is returned, then you may choose to continue reading
538    /// (which will return silence), or to pause playback temporarily.
539    pub fn is_ready(&mut self) -> Result<bool, ReadError<D::FatalError>> {
540        self.poll()?;
541
542        if self.to_server_tx.is_full() {
543            return Ok(false);
544        }
545
546        let Some(heap) = self.heap_data.as_mut() else {
547            // This will never return here because `heap_data` can only be `None` in the destructor.
548            return Ok(false);
549        };
550
551        // Check if the next two blocks are ready.
552
553        if let Some(cache_index) = heap.prefetch_buffer[self.current_block_index].use_cache_index {
554            // This check should never fail because it can only be `None` in the destructor.
555            if heap.caches[cache_index].cache.is_none() {
556                // Cache has not been received yet.
557                return Ok(false);
558            }
559        } else if heap.prefetch_buffer[self.current_block_index]
560            .block
561            .is_none()
562        {
563            // Block has not been received yet.
564            return Ok(false);
565        }
566
567        if let Some(cache_index) = heap.prefetch_buffer[self.next_block_index].use_cache_index {
568            // This check should never fail because it can only be `None` in the destructor.
569            if heap.caches[cache_index].cache.is_none() {
570                // Cache has not been received yet.
571                return Ok(false);
572            }
573        } else if heap.prefetch_buffer[self.next_block_index].block.is_none() {
574            // Block has not been received yet.
575            return Ok(false);
576        }
577
578        Ok(true)
579    }
580
581    /// Blocks the current thread until the stream is done buffering.
582    ///
583    /// NOTE: This is ***not*** realtime-safe. This is only useful
584    /// for making sure a stream is ready before sending it to a realtime thread.
585    pub fn block_until_ready(&mut self) -> Result<(), ReadError<D::FatalError>> {
586        loop {
587            if self.is_ready()? {
588                break;
589            }
590
591            std::thread::sleep(SERVER_WAIT_TIME);
592        }
593
594        Ok(())
595    }
596
597    /// Blocks the current thread until the given buffer is filled.
598    ///
599    /// NOTE: This is ***not*** realtime-safe.
600    ///
601    /// This will start reading from the stream's current playhead (this can be changed
602    /// beforehand with `ReadDiskStream::seek()`). This is streaming, meaning the next call to
603    /// `fill_buffer_blocking()` or `ReadDiskStream::read()` will pick up from where the previous
604    /// call ended.
605    ///
606    /// ## Returns
607    /// This will return the number of frames that were written to the buffer. This may be less
608    /// than the length of the buffer if the end of the file was reached, so use this as a check
609    /// if the entire buffer was filled or not.
610    ///
611    /// ## Error
612    /// This will return an error if the number of channels in the buffer does not equal the number
613    /// of channels in the stream, if the length of each channel is not the same, or if there was
614    /// an internal error with reading the stream.
615    pub fn fill_buffer_blocking(
616        &mut self,
617        buffer: &mut [Vec<D::T>],
618    ) -> Result<usize, ReadError<D::FatalError>> {
619        if buffer.len() != usize::from(self.file_info.num_channels) {
620            return Err(ReadError::InvalidBuffer);
621        }
622
623        let buffer_len = buffer[0].len();
624
625        // Sanity check that all channels are the same length.
626        for ch in buffer.iter().skip(1) {
627            if ch.len() != buffer_len {
628                return Err(ReadError::InvalidBuffer);
629            }
630        }
631
632        let mut frames_written = 0;
633        while frames_written < buffer_len {
634            let mut reached_end_of_file = false;
635
636            while self.is_ready()? {
637                let read_frames = (buffer_len - frames_written).min(self.block_size);
638
639                let read_data = self.read(read_frames)?;
640                for (i, ch) in buffer.iter_mut().enumerate() {
641                    (*ch)[frames_written..frames_written + read_data.num_frames()]
642                        .copy_from_slice(read_data.read_channel(i));
643                }
644
645                frames_written += read_data.num_frames();
646
647                if read_data.reached_end_of_file() {
648                    reached_end_of_file = true;
649                    break;
650                }
651            }
652
653            if reached_end_of_file {
654                break;
655            }
656
657            std::thread::sleep(SERVER_WAIT_TIME);
658        }
659
660        Ok(frames_written)
661    }
662
663    fn poll(&mut self) -> Result<(), ReadError<D::FatalError>> {
664        if self.fatal_error {
665            return Err(ReadError::FatalError(FatalReadError::StreamClosed));
666        }
667
668        // Retrieve any data sent from the server.
669
670        let Some(heap) = self.heap_data.as_mut() else {
671            // This will never return here because `heap_data` can only be `None` in the destructor.
672            return Ok(());
673        };
674
675        loop {
676            // Check that there is at-least one slot open before popping the next message.
677            if self.to_server_tx.is_full() {
678                return Err(ReadError::IOServerChannelFull);
679            }
680
681            if let Ok(msg) = self.from_server_rx.pop() {
682                match msg {
683                    ServerToClientMsg::ReadIntoBlockRes {
684                        block_index,
685                        block,
686                        wanted_start_frame,
687                    } => {
688                        let prefetch_block = &mut heap.prefetch_buffer[block_index];
689
690                        // Only use results from the latest request.
691                        if wanted_start_frame == prefetch_block.wanted_start_frame {
692                            if let Some(prefetch_block) = prefetch_block.block.take() {
693                                // Tell the IO server to deallocate the old block.
694                                // This cannot fail because we made sure that a slot is available in
695                                // a previous step.
696                                let _ = self.to_server_tx.push(ClientToServerMsg::DisposeBlock {
697                                    block: prefetch_block,
698                                });
699                            }
700
701                            // Store the new block into the prefetch buffer.
702                            prefetch_block.block = Some(block);
703                        } else {
704                            // Tell the server to deallocate the block.
705                            // This cannot fail because we made sure that a slot is available in
706                            // a previous step.
707                            let _ = self
708                                .to_server_tx
709                                .push(ClientToServerMsg::DisposeBlock { block });
710                        }
711                    }
712                    ServerToClientMsg::CacheRes {
713                        cache_index,
714                        cache,
715                        wanted_start_frame,
716                    } => {
717                        let cache_entry = &mut heap.caches[cache_index];
718
719                        // Only use results from the latest request.
720                        if wanted_start_frame == cache_entry.wanted_start_frame {
721                            if let Some(cache_entry) = cache_entry.cache.take() {
722                                // Tell the IO server to deallocate the old cache.
723                                // This cannot fail because we made sure that a slot is available in
724                                // a previous step.
725                                let _ = self
726                                    .to_server_tx
727                                    .push(ClientToServerMsg::DisposeCache { cache: cache_entry });
728                            }
729
730                            // Store the new cache.
731                            cache_entry.cache = Some(cache);
732                        } else {
733                            // Tell the server to deallocate the cache.
734                            // This cannot fail because we made sure that a slot is available in
735                            // a previous step.
736                            let _ = self
737                                .to_server_tx
738                                .push(ClientToServerMsg::DisposeCache { cache });
739                        }
740                    }
741                    ServerToClientMsg::FatalError(e) => {
742                        self.fatal_error = true;
743                        return Err(ReadError::FatalError(FatalReadError::DecoderError(e)));
744                    }
745                }
746            } else {
747                break;
748            }
749        }
750
751        Ok(())
752    }
753
754    /// Read the next chunk of `frames` in the stream from the current playhead position.
755    ///
756    /// This is realtime-safe.
757    ///
758    /// This is *streaming*, meaning the next call to `read()` will pick up where the
759    /// previous call left off.
760    ///
761    /// If the stream is currently buffering, (false) will be returned, and the playhead will still
762    /// advance but will output silence. Otherwise, data can be read and (true) is returned. To check
763    /// if the stream is ready beforehand, call `ReadDiskStream::is_ready()`.
764    ///
765    /// If the end of a file is reached, then only the amount of frames up to the end will be returned,
766    /// and playback will return silence on each subsequent call to `read()`.
767    ///
768    /// NOTE: If the number of `frames` exceeds the block size of the decoder, then that block size
769    /// will be used instead. This can be retrieved using `ReadDiskStream::block_size()`.
770    pub fn read(
771        &mut self,
772        mut frames: usize,
773    ) -> Result<ReadData<'_, D::T>, ReadError<D::FatalError>> {
774        if self.fatal_error {
775            return Err(ReadError::FatalError(FatalReadError::StreamClosed));
776        }
777
778        frames = frames.min(self.block_size);
779
780        self.poll()?;
781
782        // Check that there is at-least one slot open for when `advance_to_next_block()` is called.
783        if self.to_server_tx.is_full() {
784            return Err(ReadError::IOServerChannelFull);
785        }
786
787        // Check if the end of the file was reached.
788        if self.playhead() >= self.file_info.num_frames {
789            return Err(ReadError::EndOfFile);
790        }
791        let mut reached_end_of_file = false;
792        if self.playhead() + frames >= self.file_info.num_frames {
793            frames = self.file_info.num_frames - self.playhead();
794            reached_end_of_file = true;
795        }
796
797        let end_frame_in_block = self.current_frame_in_block + frames;
798        if end_frame_in_block > self.block_size {
799            // Data spans between two blocks, so two copies need to be performed.
800
801            let first_len = self.block_size - self.current_frame_in_block;
802            let second_len = frames - first_len;
803
804            // Copy from first block.
805            {
806                let Some(heap) = self.heap_data.as_mut() else {
807                    // This will never return here because `heap_data` can only be `None` in the destructor.
808                    return Err(ReadError::IOServerChannelFull);
809                };
810
811                heap.read_buffer.clear();
812
813                copy_block_into_read_buffer(
814                    heap,
815                    self.current_block_index,
816                    self.current_frame_in_block,
817                    first_len,
818                );
819            }
820
821            self.advance_to_next_block()?;
822
823            // Copy from second block
824            {
825                let Some(heap) = self.heap_data.as_mut() else {
826                    // This will never return here because `heap_data` can only be `None` in the destructor.
827                    return Err(ReadError::IOServerChannelFull);
828                };
829
830                copy_block_into_read_buffer(heap, self.current_block_index, 0, second_len);
831            }
832
833            self.current_frame_in_block = second_len;
834        } else {
835            // Only need to copy from current block.
836            {
837                let Some(heap) = self.heap_data.as_mut() else {
838                    // This will never return here because `heap_data` can only be `None` in the destructor.
839                    return Err(ReadError::IOServerChannelFull);
840                };
841
842                heap.read_buffer.clear();
843
844                copy_block_into_read_buffer(
845                    heap,
846                    self.current_block_index,
847                    self.current_frame_in_block,
848                    frames,
849                );
850            }
851
852            self.current_frame_in_block = end_frame_in_block;
853            if self.current_frame_in_block == self.block_size {
854                self.advance_to_next_block()?;
855                self.current_frame_in_block = 0;
856            }
857        }
858
859        let Some(heap) = self.heap_data.as_mut() else {
860            // This will never return here because `heap_data` can only be `None` in the destructor.
861            return Err(ReadError::IOServerChannelFull);
862        };
863
864        // This check should never fail because it can only be `None` in the destructor.
865        Ok(ReadData::new(
866            &heap.read_buffer,
867            frames,
868            reached_end_of_file,
869        ))
870    }
871
872    fn advance_to_next_block(&mut self) -> Result<(), ReadError<D::FatalError>> {
873        let Some(heap) = self.heap_data.as_mut() else {
874            // This will never return here because `heap_data` can only be `None` in the destructor.
875            return Ok(());
876        };
877
878        let entry = &mut heap.prefetch_buffer[self.current_block_index];
879
880        // Request a new block of data that is one block ahead of the
881        // latest block in the prefetch buffer.
882        let wanted_start_frame = self.current_block_start_frame + (self.prefetch_size);
883
884        entry.use_cache_index = None;
885        entry.wanted_start_frame = wanted_start_frame;
886
887        // This cannot fail because the caller function `read` makes sure there
888        // is at-least one slot open before calling this function.
889        let _ = self.to_server_tx.push(ClientToServerMsg::ReadIntoBlock {
890            block_index: self.current_block_index,
891            // Send block to be re-used by the IO server.
892            block: entry.block.take(),
893            start_frame: wanted_start_frame,
894        });
895
896        self.current_block_index += 1;
897        if self.current_block_index >= self.num_prefetch_blocks {
898            self.current_block_index = 0;
899        }
900
901        self.next_block_index += 1;
902        if self.next_block_index >= self.num_prefetch_blocks {
903            self.next_block_index = 0;
904        }
905
906        self.current_block_start_frame += self.block_size;
907
908        Ok(())
909    }
910
911    /// Return the current frame of the playhead.
912    ///
913    /// This is realtime-safe.
914    pub fn playhead(&self) -> usize {
915        self.current_block_start_frame + self.current_frame_in_block
916    }
917
918    /// Return info about the file.
919    ///
920    /// This is realtime-safe.
921    pub fn info(&self) -> &FileInfo<D::FileParams> {
922        &self.file_info
923    }
924
925    /// Return the block size used by this decoder.
926    ///
927    /// This is realtime-safe.
928    pub fn block_size(&self) -> usize {
929        self.block_size
930    }
931}
932
933impl<D: Decoder> Drop for ReadDiskStream<D> {
934    fn drop(&mut self) {
935        // Tell the server to deallocate any heap data.
936        // This cannot fail because this is the only place the signal is ever sent.
937        let _ = self.close_signal_tx.push(self.heap_data.take());
938    }
939}
940
941fn copy_block_into_read_buffer<T: Copy + Default + Send>(
942    heap: &mut HeapData<T>,
943    block_index: usize,
944    start_frame_in_block: usize,
945    frames: usize,
946) {
947    let block_entry = &heap.prefetch_buffer[block_index];
948
949    let maybe_block = match block_entry.use_cache_index {
950        Some(cache_index) => heap.caches[cache_index]
951            .cache
952            .as_ref()
953            .map(|cache| &cache.blocks[block_index]),
954        None => {
955            block_entry.block.as_ref()
956
957            // TODO: warn of buffer underflow.
958        }
959    };
960
961    let Some(block) = maybe_block else {
962        // If no block exists, output silence.
963        for buffer_ch in heap.read_buffer.block.iter_mut() {
964            buffer_ch.resize(buffer_ch.len() + frames, Default::default());
965        }
966
967        return;
968    };
969
970    for (buffer_ch, block_ch) in heap.read_buffer.block.iter_mut().zip(block.block.iter()) {
971        // If for some reason the decoder did not fill this block fully,
972        // fill the rest with zeros.
973        if block_ch.len() < start_frame_in_block + frames {
974            if block_ch.len() <= start_frame_in_block {
975                // The block has no more data to copy, fill all frames with zeros.
976                buffer_ch.resize(buffer_ch.len() + frames, Default::default());
977            } else {
978                let copy_frames = block_ch.len() - start_frame_in_block;
979
980                buffer_ch.extend_from_slice(
981                    &block_ch[start_frame_in_block..start_frame_in_block + copy_frames],
982                );
983
984                buffer_ch.resize(buffer_ch.len() + frames - copy_frames, Default::default());
985            }
986        } else {
987            buffer_ch
988                .extend_from_slice(&block_ch[start_frame_in_block..start_frame_in_block + frames]);
989        };
990    }
991}