termusicplayback/backends/rusty/source/async_ring/
mod.rs

use std::{fmt::Debug, future::Future, iter::FusedIterator, sync::Arc, time::Duration};

use anyhow::anyhow;
use async_ringbuf::{
    AsyncHeapRb,
    traits::{AsyncConsumer, AsyncProducer, Consumer, Observer, Split},
};
use parking_lot::RwLock;
use rodio::Source;
use symphonia::core::audio::SignalSpec;
use tokio::{
    runtime::Handle,
    sync::{mpsc, oneshot},
};

use messages::{
    MessageDataActual, MessageDataFirst, MessageDataValue, MessageSpec, MessageSpecResult,
    RingMessages, RingMsgParse2, RingMsgWrite2,
};
use wrap::{ConsWrap, ProdWrap};

use super::SampleType;

mod messages;
mod wrap;
/// Seek channel data: the first element is the position to seek to in the decoder,
/// the second is a callback reporting that the seek is done and how many elements to skip until new data.
pub type SeekData = (Duration, oneshot::Sender<usize>);
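
// A minimal sketch of the seek handshake from the consumer side (illustrative only;
// `seek_tx` here is a hypothetical `mpsc::Sender<SeekData>`): send the target
// position plus a oneshot callback, then block until the producer reports how many
// already-queued bytes must be skipped before post-seek data begins.
//
// let (cb_tx, cb_rx) = oneshot::channel();
// seek_tx.blocking_send((Duration::from_secs(30), cb_tx))?;
// let to_skip: usize = cb_rx.blocking_recv()?;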

/// The minimal size a decode-ringbuffer should have.
///
/// Currently the size is based on 192kHz * 1 channel * 1 second, or 2 seconds of 48kHz stereo (2-channel) audio.
// NOTE: try to keep this in-sync with the config's `RINGBUF_SIZE_DEFAULT`
pub const MIN_RING_SIZE: usize = 192_000 * MessageDataValue::MESSAGE_SIZE;
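
// A compile-time sanity check of the sizing rationale above (illustrative only):
// one second of 192kHz mono covers exactly two seconds of 48kHz stereo.
const _: () = assert!(MIN_RING_SIZE == 48_000 * 2 * 2 * MessageDataValue::MESSAGE_SIZE);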

/// A ringbuffer Producer meant for wrapping [`Source`] to make decode & playback async and keep the buffer filled without audible gaps.
///
/// The Producer implementation is meant for async code, awaiting until data is fully written to the ringbuffer OR a seek event arrives.
#[derive(Debug)]
pub struct AsyncRingSourceProvider {
    inner: ProdWrap,
    seek_rx: Arc<RwLock<mpsc::Receiver<SeekData>>>,

    data: Option<MessageDataActual>,
}

impl AsyncRingSourceProvider {
    fn new(wrap: ProdWrap, seek_rx: mpsc::Receiver<SeekData>) -> Self {
        AsyncRingSourceProvider {
            inner: wrap,
            seek_rx: Arc::new(RwLock::new(seek_rx)),
            data: None,
        }
    }

    /// Check if the consumer ([`AsyncRingSource`]) is still connected and writes are possible.
    #[allow(dead_code)] // can't use expect as this function is used in tests
    #[must_use]
    pub fn is_closed(&self) -> bool {
        self.inner.is_closed()
    }

    /// Write a new spec.
    ///
    /// Returns [`Ok(count)`](Ok) with the number of bytes written, or [`Err`] if closed.
    ///
    /// # Errors
    ///
    /// Ringbuffer closed and no more data available.
    #[allow(clippy::missing_panics_doc)] // https://github.com/rust-lang/rust-clippy/issues/14534
    pub async fn new_spec(
        &mut self,
        spec: SignalSpec,
        current_span_len: usize,
    ) -> Result<usize, ()> {
        let mut msg_buf = [0; RingMsgWrite2::get_msg_size(MessageSpec::MESSAGE_SIZE)];
        // SAFETY: we allocated the exact necessary size, this can never fail
        // #[expect(
        //     clippy::missing_panics_doc,
        //     reason = "static buffer with exact size required created above"
        // )]
        let _ = RingMsgWrite2::try_write_spec(spec, current_span_len, &mut msg_buf).unwrap();

        self.inner.push_exact(&msg_buf).await.map_err(|_| ())?;

        Ok(msg_buf.len())
    }

    /// Write a new data message, without the buffer yet.
    ///
    /// Returns [`Ok(count)`](Ok) with the number of bytes written, or [`Err`] if closed.
    ///
    /// # Errors
    ///
    /// Ringbuffer closed and no more data available.
    #[allow(clippy::missing_panics_doc)] // https://github.com/rust-lang/rust-clippy/issues/14534
    async fn new_data(&mut self, length: usize) -> Result<usize, ()> {
        let mut msg_buf = [0; RingMsgWrite2::get_msg_size(MessageDataFirst::MESSAGE_SIZE)];
        // SAFETY: we allocated the exact necessary size, this can never fail
        // #[expect(
        //     clippy::missing_panics_doc,
        //     reason = "static buffer with exact size required created above"
        // )]
        let (data, _written) = RingMsgWrite2::try_write_data_first(length, &mut msg_buf).unwrap();

        self.inner.push_exact(&msg_buf).await.map_err(|_| ())?;

        self.data = Some(data);

        Ok(msg_buf.len())
    }

    /// Write a buffer's content.
    ///
    /// This function expects a data message to be active.
    ///
    /// Returns [`Ok(count)`](Ok) with the number of bytes written, or [`Err`] if closed.
    ///
    /// # Errors
    ///
    /// Ringbuffer closed and no more data available.
    async fn write_data_inner(&mut self, data: &[u8]) -> Result<usize, ()> {
        let Some(msg) = &mut self.data else {
            unimplemented!("This should be checked outside of the function");
        };

        let buf = &data[msg.get_range()];
        self.inner.push_exact(buf).await.map_err(|_| ())?;
        msg.advance_read(buf.len());

        Ok(buf.len())
    }

    /// Write a given buffer as a data message.
    ///
    /// Returns [`Ok(count)`](Ok) with the number of bytes written, or [`Err`] if closed.
    ///
    /// # Errors
    ///
    /// Ringbuffer closed and no more data available.
    #[allow(clippy::missing_panics_doc)] // https://github.com/rust-lang/rust-clippy/issues/14534
    pub async fn write_data(&mut self, data: &[u8]) -> Result<usize, ()> {
        if data.is_empty() {
            return Err(());
        }

        let mut written = 0;
        if self.data.is_none() {
            written += self.new_data(data.len()).await?;
        }

        // #[expect(
        //     clippy::missing_panics_doc,
        //     reason = "it is ensured to be Some via if and `new_data` above"
        // )]
        while !self.data.as_mut().unwrap().is_done() && !self.inner.is_closed() {
            written += self.write_data_inner(data).await?;
        }

        self.data.take();

        Ok(written)
    }

    /// Write an EOS message.
    ///
    /// Returns [`Ok(count)`](Ok) with the number of bytes written, or [`Err`] if closed.
    ///
    /// # Errors
    ///
    /// Ringbuffer closed and no more data available.
    #[allow(clippy::missing_panics_doc)] // https://github.com/rust-lang/rust-clippy/issues/14534
    pub async fn new_eos(&mut self) -> Result<usize, ()> {
        let mut msg_buf = [0; RingMsgWrite2::get_msg_size(0)];
        // SAFETY: we allocated the exact necessary size, this can never fail
        // #[expect(
        //     clippy::missing_panics_doc,
        //     reason = "static buffer with exact size required created above"
        // )]
        let _ = RingMsgWrite2::try_write_eos(&mut msg_buf).unwrap();

        self.inner.push_exact(&msg_buf).await.map_err(|_| ())?;

        Ok(msg_buf.len())
    }

    /// Wait until the seek channel is dropped ([`None`]) or a seek is requested ([`Some`]).
    pub fn wait_seek(&mut self) -> WaitSeek {
        WaitSeek(self.seek_rx.clone())
    }

    /// Process a seek and call the Consumer to resume.
    ///
    /// This clears all data state, calls back the consumer with how many bytes to skip,
    /// and sends the new spec (in case it changed during the seek).
    pub async fn process_seek(
        &mut self,
        spec: SignalSpec,
        current_span_len: usize,
        cb: oneshot::Sender<usize>,
    ) {
        self.data.take();
        let bytes_to_skip = self.inner.occupied_len();
        let _ = cb.send(bytes_to_skip);
        let _ = self.new_spec(spec, current_span_len).await;
    }
}
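
// A minimal sketch of how a decoder task might drive the provider above
// (illustrative only; `decode_next_packet` and `seek_decoder` are hypothetical
// stand-ins for the real decoder plumbing): push decoded PCM until the stream
// ends, then keep answering seek requests until the consumer drops the channel.
//
// async fn decode_loop(mut prod: AsyncRingSourceProvider, spec: SignalSpec, span_len: usize) {
//     while let Some(pcm) = decode_next_packet() {
//         // Err(()) means the consumer hung up; nothing more to do.
//         if prod.write_data(&pcm).await.is_err() {
//             return;
//         }
//     }
//     let _ = prod.new_eos().await;
//     // Stay alive to service seeks until the consumer closes the channel.
//     while let Some((pos, cb)) = prod.wait_seek().await {
//         seek_decoder(pos);
//         prod.process_seek(spec, span_len, cb).await;
//     }
// }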

/// Struct to wait on a channel, without the compiler complaining that `self` can't be borrowed mutably multiple times.
#[derive(Debug)]
pub struct WaitSeek(Arc<RwLock<mpsc::Receiver<SeekData>>>);

impl Future for WaitSeek {
    type Output = Option<SeekData>;

    fn poll(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Self::Output> {
        self.0.write().poll_recv(cx)
    }
}

/// A ringbuffer Consumer meant for wrapping [`Source`] to make decode & playback async and keep the buffer filled without audible gaps.
///
/// The Consumer implementation is meant for sync code, only blocking when having to wait for more data.
#[derive(Debug)]
pub struct AsyncRingSource {
    inner: ConsWrap,
    /// Send a seek-to value to the producer.
    /// Also used to indicate to the producer that it should stay active.
    seek_tx: Option<mpsc::Sender<SeekData>>,

    // random default size, 1024*32
    buf: StaticBuf<32768>,
    last_msg: Option<MessageDataActual>,
    handle: Handle,

    // cached information on how to treat current data until an update
    channels: u16,
    rate: u32,
    /// Always above 0, unless EOS has been reached.
    current_span_len: usize,
    total_duration: Option<Duration>,
}

impl AsyncRingSource {
    /// Create a new ringbuffer, with initial channel spec and at least [`MIN_RING_SIZE`].
    ///
    /// # Panics
    ///
    /// If the given channels in `spec` cannot be converted to a u16 (which should never happen as there are only 26 defined channels)
    #[must_use]
    pub fn new(
        spec: SignalSpec,
        total_duration: Option<Duration>,
        current_span_len: usize,
        size: usize,
        handle: Handle,
    ) -> (AsyncRingSourceProvider, Self) {
        let size = size.max(MIN_RING_SIZE);
        let ringbuf = AsyncHeapRb::<u8>::new(size);
        let (prod, cons) = ringbuf.split();
        // Channel is exactly 1 message in size, as seeks should not happen often, and if they do, only one can be processed at once.
        let (tx, rx) = mpsc::channel(1);

        let async_prod = AsyncRingSourceProvider::new(ProdWrap::new(prod), rx);
        let async_cons = Self {
            inner: ConsWrap::new(cons),
            seek_tx: Some(tx),
            // SAFETY: as of symphonia 0.5.4, there can only be at most 26 channels (Channels::all().count())
            channels: u16::try_from(spec.channels.count())
                .expect("Channel size to be within u16::MAX"),
            rate: spec.rate,
            total_duration,
            current_span_len,
            last_msg: None,
            buf: StaticBuf::new(),
            handle,
        };

        (async_prod, async_cons)
    }
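
    // A minimal usage sketch (illustrative only; `spec`, `handle`, `decode_task`
    // and `sink` are hypothetical stand-ins): create the pair, move the provider
    // into an async decode task, and hand the consumer to rodio as a regular
    // [`Source`].
    //
    // let (prod, source) = AsyncRingSource::new(spec, None, 1024, MIN_RING_SIZE, handle);
    // tokio::spawn(decode_task(prod));
    // sink.append(source);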

    /// Ensure there is a complete message in `last_msg`.
    ///
    /// This function assumes there is no current message.
    #[must_use]
    fn read_msg(&mut self) -> Option<RingMsgParse2> {
        // trace!("Reading a message from the ringbuffer");

        self.load_more_data(1)?;

        assert!(!self.buf.is_empty());

        let detected_type = {
            let detect_byte = {
                // SAFETY: we loaded and asserted that there is at least one byte
                let byte = self.buf.get_ref()[0];
                self.buf.advance_beginning(1);
                byte
            };

            RingMessages::from_u8(detect_byte)
        };

        // An Eos event does not have more than the id itself
        if detected_type == RingMessages::Eos {
            return Some(RingMsgParse2::Eos);
        }

        let mut wait_for_bytes = 1;
        let mut total = 0;
        loop {
            total += 1;
            // "buf.is_empty" is safe here as all messages fully consume the buffer.
            if self.inner.is_closed() && self.inner.is_empty() && self.buf.is_empty() {
                return None;
            }

            self.load_more_data(wait_for_bytes)?;

            // Sanity check against infinite loop
            assert!(total < 10);

            let (msg, read) = match detected_type {
                RingMessages::Data => {
                    let (data_res, read) = match MessageDataFirst::try_read_buf(self.buf.get_ref())
                    {
                        Ok(v) => v,
                        Err(wait_for) => {
                            wait_for_bytes = wait_for + self.buf.len();
                            continue;
                        }
                    };

                    (RingMsgParse2::Data(data_res), read)
                }
                RingMessages::Spec => {
                    let (spec_res, read) = match MessageSpec::try_read_buf(self.buf.get_ref()) {
                        Ok(v) => v,
                        Err(wait_for) => {
                            wait_for_bytes = wait_for + self.buf.len();
                            continue;
                        }
                    };

                    self.apply_spec_msg(spec_res);

                    (RingMsgParse2::Spec, read)
                }
                RingMessages::Eos => unreachable!("Message EOS is returned earlier"),
            };

            assert!(read > 0);

            self.buf.advance_beginning(read);

            return Some(msg);
        }
    }

    /// Load more data into the current buffer, if the current buffer does not have at least `wait_for_bytes` bytes.
    ///
    /// This function will **not** block when there are `wait_for_bytes` available, but **will** block otherwise.
    ///
    /// Returns [`Some`] if the current buffer now has at least `wait_for_bytes` buffered, [`None`] if the ringbuffer is closed and not enough data can be loaded anymore.
    fn load_more_data(&mut self, wait_for_bytes: usize) -> Option<()> {
        if self.buf.len() >= wait_for_bytes {
            return Some(());
        }

        self.buf.maybe_need_move();

        // wait for at least one element being occupied,
        // more elements would mean to wait until they all are there, regardless of whether they are part of the message or not

        // Avoid calling into the async runtime and async code if the ringbuffer knowingly already contains enough bytes,
        // as reading can be done sync and non-blocking for buffer copies.
        // Also SAFETY: "occupied_len" says it "could be more or less", but we are in the consumer here, so we know it won't *decrease*
        // between here and actually reading it.
        if self.inner.occupied_len() < wait_for_bytes {
            // Avoid having to call async code for as long as possible, as that can heavily increase CPU load in a hot path.
            // When not doing this, cpu load can be 1.0~1.4 on average.
            // When doing it the current way, the load is ~0.5~0.6 on average, the same as if running the decoder directly
            // as a source instead of using this ringbuffer.
            self.handle
                .block_on(self.inner.wait_occupied(wait_for_bytes));
        }

        if self.inner.is_closed() && self.inner.is_empty() {
            return None;
        }

        let written = self.inner.pop_slice(self.buf.get_spare_mut());
        self.buf.advance_len(written);

        // Sanity: this point should never be reached if the buffer contains enough data or there is no more data to read
        debug_assert!(written > 0);

        Some(())
    }

    /// Apply a new spec from the current message.
    ///
    /// This function assumes the current message is a [`MessageSpec`].
    fn apply_spec_msg(&mut self, new_spec: MessageSpecResult) {
        self.channels = new_spec.channels;
        self.rate = new_spec.rate;
    }

    /// Read data from a Data Message.
    ///
    /// This function assumes the current message is a non-finished [`MessageDataActual`].
    #[must_use]
    fn read_data(&mut self) -> Option<SampleType> {
        // trace!("Reading Data");

        // wait until we have enough data to parse a value
        self.load_more_data(MessageDataValue::MESSAGE_SIZE)?;

        assert!(self.buf.len() >= MessageDataValue::MESSAGE_SIZE);

        let msg = self.last_msg.as_mut().unwrap();

        // unwrap: should never panic as we explicitly load at least the required amount above.
        #[allow(clippy::missing_panics_doc)]
        let (sample, read) = msg.try_read_buf(self.buf.get_ref()).unwrap();
        self.buf.advance_beginning(read);

        if msg.is_done() {
            self.last_msg.take();
        }

        Some(sample)
    }

    /// Get whether the Seek channel is still open and available.
    fn is_seek_channel_active(&self) -> bool {
        self.seek_tx.as_ref().is_some_and(|v| !v.is_closed())
    }
}

impl Source for AsyncRingSource {
    #[inline]
    fn current_span_len(&self) -> Option<usize> {
        // this workaround might not be great, but on "None" rodio *never* retries reading this value,
        // and if we return "0", it assumes the stream is done.
        if self.current_span_len == 0 && self.is_seek_channel_active() {
            return Some(1);
        }

        Some(self.current_span_len)
    }

    #[inline]
    #[allow(clippy::cast_possible_truncation)]
    fn channels(&self) -> u16 {
        self.channels
    }

    #[inline]
    fn sample_rate(&self) -> u32 {
        self.rate
    }

    #[inline]
    fn total_duration(&self) -> Option<Duration> {
        self.total_duration
    }

    #[inline]
    fn try_seek(&mut self, pos: Duration) -> Result<(), rodio::source::SeekError> {
        trace!("Consumer Seek");

        // clear the ringbuffer before sending, in case it is full, and to more quickly unblock the producer,
        // though this should not be relied upon
        self.inner.clear();
        self.last_msg.take();
        self.buf.clear();

        let (cb_tx, cb_rx) = oneshot::channel();
        let _ = self.seek_tx.as_mut().unwrap().blocking_send((pos, cb_tx));

        // Wait for the Producer to have processed the seek and get the final number of elements to skip
        let to_skip = cb_rx.blocking_recv().map_err(|_| {
            rodio::source::SeekError::Other(
                anyhow!("Seek Callback channel exited unexpectedly").into(),
            )
        })?;

        // skip possible new elements
        let _ = self.inner.skip(to_skip);
        trace!("Consumer Seek Done");

        Ok(())
    }
}

impl Iterator for AsyncRingSource {
    type Item = SampleType;

    #[inline]
    fn next(&mut self) -> Option<Self::Item> {
        // This uses the seek channel as an indicator to see if the decoder is still active.
        // This initial "0" may happen in symphonia for whatever reason in ogg-vorbis, see https://github.com/tramhao/termusic/issues/566
        // The seek channel can be used this way, as it only gets closed by the consumer (see the EOS path in the match below)
        // or if the decoder has exited (via error or otherwise).
        if self.current_span_len == 0 && !self.is_seek_channel_active() {
            return None;
        }

        loop {
            if self.last_msg.is_some() {
                let sample = self.read_data();

                return sample;
            }

            if self.inner.is_empty() && self.inner.is_closed() {
                debug!("DONE");
                return None;
            }

            let msg = self.read_msg()?;

            match msg {
                RingMsgParse2::Spec => {}
                RingMsgParse2::Data(message_data_actual) => {
                    self.last_msg = Some(message_data_actual);
                }
                RingMsgParse2::Eos => {
                    trace!("Reached EOS message");
                    let _ = self.seek_tx.take();
                    // this indicates to rodio via Source::current_span_len that there is no more data,
                    // and we also use it to uphold the contract with FusedIterator.
                    self.current_span_len = 0;
                    return None;
                }
            }
        }
    }
}

// Contract: once reaching an EOS or the ringbuffer being closed & having read all data, it will continue outputting None
impl FusedIterator for AsyncRingSource {}

/// Static Buffer allocated on the stack, which can have a moving area of actual data within
#[derive(Debug, Clone, Copy)]
struct StaticBuf<const N: usize> {
    buf: [u8; N],
    /// The length of the good data. 0 means 0 good elements (like `.len()`)
    used_len: usize,
    /// The length of data to skip until the next clear / [`make_beginning`](Self::make_beginning) call.
    data_start_idx: usize,
}
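
// A minimal sketch of the moving-window invariant (illustrative only): the good
// data is `buf[data_start_idx..used_len]`; writes advance `used_len` via
// `advance_len`, reads advance `data_start_idx` via `advance_beginning`, and
// `maybe_need_move` compacts once the dead prefix grows past half the capacity.
//
// let mut b = StaticBuf::<8>::new();
// b.get_spare_mut()[..3].copy_from_slice(&[1, 2, 3]);
// b.advance_len(3);       // good data: [1, 2, 3]
// b.advance_beginning(1); // good data: [2, 3]
// assert_eq!(b.get_ref(), &[2, 3]);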

impl<const N: usize> StaticBuf<N> {
    const CAPACITY: usize = N;

    /// Create a new buffer.
    ///
    /// Size must be above 0 and divisible by 2.
    fn new() -> Self {
        const {
            assert!(N > 0);
            assert!(N % 2 == 0);
        }
        Self {
            buf: [0; N],
            used_len: 0,
            data_start_idx: 0,
        }
    }

    /// The length of actual data in the buffer
    #[inline]
    fn len(&self) -> usize {
        self.get_ref().len()
    }

    /// Returns `true` if there is currently no good data in the buffer
    #[inline]
    fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Reset good data length.
    #[inline]
    fn clear(&mut self) {
        self.data_start_idx = 0;
        self.used_len = 0;

        // // DEBUG: this should not be necessary, but for debugging the buffer
        // self.buf.fill(u8::MAX);
    }

    /// Get a mutable reference to the slice from `data_start` until the end.
    ///
    /// May contain bad data.
    /// And [`advance_len`](Self::advance_len) needs to be called afterward with the written size.
    #[inline]
    #[allow(unused)]
    fn get_mut(&mut self) -> &mut [u8] {
        &mut self.buf[self.data_start_idx..]
    }

    /// Get a mutable reference to the unused portion of the buffer (starting from `len`)
    ///
    /// May contain bad data.
    /// And [`advance_len`](Self::advance_len) needs to be called afterward with the written size.
    #[inline]
    fn get_spare_mut(&mut self) -> &mut [u8] {
        &mut self.buf[self.used_len..]
    }

    /// Get a reference to the slice which contains good data
    #[inline]
    fn get_ref(&self) -> &[u8] {
        &self.buf[self.data_start_idx..self.used_len]
    }

    /// Move the data to the beginning, if the start is above half the capacity
    #[inline]
    fn maybe_need_move(&mut self) {
        // Fast-path: clear if start idx is above 0 and there are no good elements
        if self.data_start_idx > 0 && self.is_empty() {
            self.clear();
            return;
        }

        // Only move to the beginning if the start idx is above half the buffer size.
        if self.data_start_idx > Self::CAPACITY / 2 {
            self.make_beginning();
        }
    }

    /// Move the data to the beginning if it is not already.
    fn make_beginning(&mut self) {
        let range = self.data_start_idx..self.used_len;
        let range_len = range.len();
        self.buf.copy_within(range, 0);
        self.used_len = range_len;
        self.data_start_idx = 0;

        // // DEBUG: this should not be necessary, but for debugging the buffer
        // self.buf[range_len..].fill(u8::MAX);
    }

    /// Advance the initialized data length by the given count.
    ///
    /// # Panics
    ///
    /// If the given `by` count plus the current `length` results in more than `capacity`.
    #[inline]
    fn advance_len(&mut self, by: usize) {
        self.set_len(self.len() + by);
    }

    /// Set the length of the buffer to the written size plus data start, i.e. how the buffer was given from [`get_mut`](Self::get_mut).
    ///
    /// # Panics
    ///
    /// If the given `written` plus the current `start_idx` results in more than `capacity`.
    #[inline]
    fn set_len(&mut self, written: usize) {
        self.used_len = self.data_start_idx + written;
        assert!(self.used_len <= self.buf.len());
    }

    /// Advance the start index.
    ///
    /// Use [`make_beginning`](Self::make_beginning) to move all data to the front again.
    ///
    /// # Panics
    ///
    /// If the given `by` count plus the current `start_idx` results in more than `capacity`.
    #[inline]
    fn advance_beginning(&mut self, by: usize) {
        self.data_start_idx += by;
        assert!(self.data_start_idx <= self.buf.len());

        // // DEBUG: this should not be necessary, but for debugging the buffer
        // self.buf[0..self.data_start_idx].fill(u8::MAX);
    }
}

#[cfg(test)]
#[allow(clippy::assertions_on_constants)] // make sure that tests fail if expectations change
mod tests {

    mod static_buffer {
        use crate::backends::rusty::source::async_ring::StaticBuf;

        #[test]
        fn should_work() {
            let mut buf = StaticBuf::<32>::new();
            assert_eq!(buf.len(), 0);
            assert_eq!(buf.get_ref().len(), 0);
            assert_eq!(buf.len(), buf.used_len);
            assert_eq!(buf.get_mut().len(), 32);
            assert_eq!(buf.get_ref(), &[0u8; 0]);

            buf.get_mut()[0] = u8::MAX;
            buf.set_len(1);

            assert_eq!(buf.len(), 1);
            assert_eq!(buf.get_ref().len(), 1);
            assert_eq!(buf.len(), buf.used_len);
            assert_eq!(buf.get_mut().len(), 32);
            assert_eq!(buf.get_ref(), &[u8::MAX; 1]);

            buf.advance_beginning(1);
            assert_eq!(buf.len(), 0);
            assert_eq!(buf.used_len, 1);
            assert_ne!(buf.len(), buf.used_len);
            assert_eq!(buf.get_mut().len(), 31);
            assert_eq!(buf.get_ref(), &[0u8; 0]);

            buf.get_mut().fill(u8::MAX);
            buf.set_len(31);

            assert_eq!(buf.len(), 31);
            assert_eq!(buf.used_len, 32);
            assert_ne!(buf.len(), buf.used_len);
            assert_eq!(buf.get_mut().len(), 31);
            assert_eq!(buf.get_ref(), &[u8::MAX; 31]);

            buf.make_beginning();

            assert_eq!(buf.len(), 31);
            assert_eq!(buf.len(), buf.used_len);
            assert_eq!(buf.get_mut().len(), 32);
            assert_eq!(buf.get_ref(), &[u8::MAX; 31]);

            buf.advance_beginning(15);

            assert_eq!(buf.len(), 16);
            assert_ne!(buf.len(), buf.used_len);
            assert_eq!(buf.used_len, 31);
            assert_eq!(buf.get_mut().len(), 17);
            assert_eq!(buf.get_ref(), &[u8::MAX; 16]);

            buf.clear();

            assert_eq!(buf.len(), 0);
            assert_eq!(buf.len(), buf.used_len);
            assert_eq!(buf.get_ref().len(), 0);
            assert_eq!(buf.get_mut().len(), 32);
            assert_eq!(buf.get_ref(), &[0u8; 0]);
        }

        #[test]
        #[should_panic(expected = "self.used_len <= self.buf.len()")]
        fn length_capacity() {
            let mut buf = StaticBuf::<16>::new();
            buf.set_len(17);
        }

        #[test]
        #[should_panic(expected = "self.data_start_idx <= self.buf.len()")]
        fn beginning_capacity() {
            let mut buf = StaticBuf::<16>::new();
            buf.advance_beginning(17);
        }

        #[test]
        fn advance_length() {
            let mut buf = StaticBuf::<32>::new();
            assert_eq!(buf.len(), 0);
            assert_eq!(buf.get_ref().len(), 0);
            assert_eq!(buf.get_mut().len(), 32);
            assert_eq!(buf.get_spare_mut().len(), 32);
            assert_eq!(buf.get_ref(), &[0u8; 0]);

            buf.get_mut()[..4].fill(4);
            buf.advance_len(4);

            assert_eq!(buf.len(), 4);
            assert_eq!(buf.get_ref().len(), 4);
            assert_eq!(buf.get_mut().len(), 32);
            assert_eq!(buf.get_spare_mut().len(), 28);
            let expected = &[4u8; 4];
            assert_eq!(buf.get_ref(), expected);

            buf.get_spare_mut()[..4].fill(6);
            buf.advance_len(4);

            assert_eq!(buf.len(), 8);
            assert_eq!(buf.get_ref().len(), 8);
            assert_eq!(buf.get_mut().len(), 32);
            assert_eq!(buf.get_spare_mut().len(), 24);
            let expected: Vec<_> = [4u8; 4].into_iter().chain([6u8; 4]).collect();
            assert_eq!(buf.get_ref(), &expected);

            buf.advance_beginning(5);

            assert_eq!(buf.len(), 3);
            assert_eq!(buf.get_ref().len(), 3);
            assert_eq!(buf.get_mut().len(), 27);
            assert_eq!(buf.get_spare_mut().len(), 24);
            let expected = &[6u8; 3];
            assert_eq!(buf.get_ref(), expected);
        }
    }

    mod ringbuffer {
        use std::{sync::Arc, time::Duration};

        use async_ringbuf::traits::Observer;
        use parking_lot::Mutex;
        use symphonia::core::audio::{Channels, SignalSpec};

        use crate::backends::rusty::source::{
            SampleType,
            async_ring::{
                AsyncRingSource, MIN_RING_SIZE, MessageDataFirst, MessageSpec, RingMsgWrite2,
            },
        };

        #[tokio::test]
        async fn should_work() {
            let mut send = Vec::new();
            let recv = Arc::new(Mutex::new(Vec::new()));

            let (mut prod, mut cons) = AsyncRingSource::new(
                SignalSpec::new(44000, Channels::FRONT_LEFT | Channels::FRONT_RIGHT),
                None,
                1024,
                0,
                tokio::runtime::Handle::current(),
            );

            assert_eq!(prod.inner.capacity().get(), MIN_RING_SIZE);

            let recv_c = recv.clone();
            let handle = tokio::task::spawn_blocking(move || {
                let mut lock = recv_c.lock();
                for num in cons.by_ref() {
                    lock.extend_from_slice(&num.to_ne_bytes());
                }
                assert_eq!(cons.inner.occupied_len(), 0);
            });

            let first_data = 1f32.to_le_bytes().repeat(1024);
            let written = prod.write_data(&first_data).await.unwrap();
            assert_eq!(
                written,
                RingMsgWrite2::get_msg_size(MessageDataFirst::MESSAGE_SIZE + first_data.len())
            );
            send.extend_from_slice(&first_data);

            let new_spec = SignalSpec::new(48000, Channels::FRONT_LEFT | Channels::FRONT_RIGHT);
            let written = prod.new_spec(new_spec, 1024).await.unwrap();
            assert_eq!(
                written,
                RingMsgWrite2::get_msg_size(MessageSpec::MESSAGE_SIZE)
            );

            let second_data = 2f32.to_le_bytes().repeat(1024);
            let written = prod.write_data(&second_data).await.unwrap();
            assert_eq!(
                written,
                RingMsgWrite2::get_msg_size(MessageDataFirst::MESSAGE_SIZE + second_data.len())
            );
            send.extend_from_slice(&second_data);

            let written = prod.new_eos().await.unwrap();
            assert_eq!(written, RingMsgWrite2::get_msg_size(0));

            prod.write_data(&[]).await.unwrap_err();

            // just to prevent an infinitely running test due to a deadlock
            let res = tokio::time::timeout(Duration::from_secs(3), handle)
                .await
                .is_ok();
            assert!(res, "Read Task did not complete within 3 seconds");

            assert!(prod.is_closed());
            assert!(prod.inner.is_empty());

            let recv_lock = recv.lock();
            let value_size = size_of::<SampleType>();
            assert_eq!(send.len(), value_size * 1024 * 2);
            assert_eq!(recv_lock.len(), value_size * 1024 * 2);

            assert_eq!(*send, *recv_lock);
        }

        // the producer should not exit before the consumer in an actual use-case,
        // as the producer may still need to process and answer a seek request
        #[tokio::test]
        async fn prod_should_not_exit_before_cons() {
            let order = Arc::new(Mutex::new(Vec::new()));

            let (mut prod, mut cons) = AsyncRingSource::new(
                SignalSpec::new(44000, Channels::FRONT_LEFT | Channels::FRONT_RIGHT),
                None,
                1024,
                0,
                tokio::runtime::Handle::current(),
            );

            let obsv = prod.inner.observe();

            assert!(obsv.read_is_held());
            assert!(obsv.write_is_held());
            let order_c = order.clone();

            let cons_handle = tokio::task::spawn_blocking(move || {
                for num in cons.by_ref() {
                    let _ = num;
                }
                order_c.lock().push("recv_eos");
                assert_eq!(cons.inner.occupied_len(), 0);
                assert!(cons.seek_tx.is_none());
            });

            let obsv_c = obsv.clone();
            let order_c = order.clone();

            let prod_handle = tokio::task::spawn(async move {
                let first_data = 1f32.to_le_bytes().repeat(1024);
                let written = prod.write_data(&first_data).await.unwrap();
                assert_eq!(
                    written,
                    RingMsgWrite2::get_msg_size(MessageDataFirst::MESSAGE_SIZE + first_data.len())
                );

                assert!(obsv_c.read_is_held());
                assert!(obsv_c.write_is_held());

                order_c.lock().push("send_eos");
                let written = prod.new_eos().await.unwrap();
                assert_eq!(written, RingMsgWrite2::get_msg_size(0));

                // "wait_seek" can only return here if the channel gets closed, which happens when the consumer reaches the EOS message (not just on drop)
                let _ = prod.wait_seek().await;
                assert!(prod.seek_rx.read().is_closed());
                order_c.lock().push("prod");
            });

            // just to prevent an infinitely running test due to a deadlock
            let res = tokio::time::timeout(Duration::from_secs(3), cons_handle)
                .await
                .is_ok();
            assert!(res, "Read Task did not complete within 3 seconds");

            assert!(!obsv.read_is_held());

            // just to prevent an infinitely running test due to a deadlock
            let res = tokio::time::timeout(Duration::from_secs(3), prod_handle)
                .await
                .is_ok();
            assert!(res, "Write Task did not complete within 3 seconds");

            assert!(!obsv.write_is_held());

            // the consumer should always exit first (unless explicitly tested otherwise), because the consumer may signal a seek which the producer needs to respond to, even if an EOS was already sent.
            let lock = order.lock();
            // "send_eos" is always ensured to happen before the other events
            assert_eq!(&lock[..1], &["send_eos"]);
            // but the others depend on the scheduling, unless the actual code is modified to order those events (which is not done)
            assert!(lock[1..].contains(&"recv_eos"));
            assert!(lock[1..].contains(&"prod"));
            assert_eq!(lock.len(), 3);
        }

        // even if the producer (due to some error or otherwise) exits after sending an EOS, the consumer should still consume everything available
        #[tokio::test]
        async fn should_consume_on_prod_exit_eos() {
            let recv = Arc::new(Mutex::new(Vec::new()));

            let (mut prod, mut cons) = AsyncRingSource::new(
                SignalSpec::new(44000, Channels::FRONT_LEFT | Channels::FRONT_RIGHT),
                None,
                1024,
                0,
                tokio::runtime::Handle::current(),
            );

            let recv_c = recv.clone();
            let handle = tokio::task::spawn_blocking(move || {
                let mut lock = recv_c.lock();
                for num in cons.by_ref() {
                    lock.extend_from_slice(&num.to_ne_bytes());
                }
                assert_eq!(cons.inner.occupied_len(), 0);
                assert!(!cons.inner.write_is_held());
            });

            let first_data = 1f32.to_le_bytes().repeat(1024);
            let written = prod.write_data(&first_data).await.unwrap();
            assert_eq!(
                written,
                RingMsgWrite2::get_msg_size(MessageDataFirst::MESSAGE_SIZE + first_data.len())
            );

            let written = prod.new_eos().await.unwrap();
            assert_eq!(written, RingMsgWrite2::get_msg_size(0));

            let obsv = prod.inner.observe();
            drop(prod);

            assert!(!obsv.write_is_held());
            // dont check read as that *could* have consumed and exited already
            // assert_eq!(obsv.read_is_held(), true);

            // just to prevent an infinitely running test due to a deadlock
            let res = tokio::time::timeout(Duration::from_secs(3), handle)
                .await
                .is_ok();
            assert!(res, "Read Task did not complete within 3 seconds");

            assert!(!obsv.write_is_held());
            assert!(!obsv.read_is_held());

            let recv_lock = recv.lock();
            let value_size = size_of::<SampleType>();
            assert_eq!(recv_lock.len(), value_size * 1024);

            assert_eq!(*recv_lock, first_data.as_slice());
        }

        // even if the producer (due to some error or otherwise) exits without sending an EOS, the consumer should still consume everything available
        #[tokio::test]
        async fn should_consume_on_prod_exit() {
            let recv = Arc::new(Mutex::new(Vec::new()));

            let (mut prod, mut cons) = AsyncRingSource::new(
                SignalSpec::new(44000, Channels::FRONT_LEFT | Channels::FRONT_RIGHT),
                None,
                1024,
                0,
                tokio::runtime::Handle::current(),
            );

            let recv_c = recv.clone();
            let handle = tokio::task::spawn_blocking(move || {
                let mut lock = recv_c.lock();
                for num in cons.by_ref() {
                    lock.extend_from_slice(&num.to_ne_bytes());
                }
                assert_eq!(cons.inner.occupied_len(), 0);
                assert!(!cons.inner.write_is_held());
            });

            let first_data = 1f32.to_le_bytes().repeat(1024);
            let written = prod.write_data(&first_data).await.unwrap();
            assert_eq!(
                written,
                RingMsgWrite2::get_msg_size(MessageDataFirst::MESSAGE_SIZE + first_data.len())
            );

            let obsv = prod.inner.observe();
            drop(prod);

            assert!(!obsv.write_is_held());
            // dont check read as that *could* have consumed and exited already
            // assert_eq!(obsv.read_is_held(), true);

            // just to prevent an infinitely running test due to a deadlock
            let res = tokio::time::timeout(Duration::from_secs(3), handle)
                .await
                .is_ok();
            assert!(res, "Read Task did not complete within 3 seconds");

            assert!(!obsv.write_is_held());
            assert!(!obsv.read_is_held());

            let recv_lock = recv.lock();
            let value_size = size_of::<SampleType>();
            assert_eq!(recv_lock.len(), value_size * 1024);

            assert_eq!(*recv_lock, first_data.as_slice());
        }
    }
}