1use std::{fmt::Debug, future::Future, iter::FusedIterator, sync::Arc, time::Duration};
2
3use anyhow::anyhow;
4use async_ringbuf::{
5 AsyncHeapRb,
6 traits::{AsyncConsumer, AsyncProducer, Consumer, Observer, Split},
7};
8use parking_lot::RwLock;
9use rodio::Source;
10use symphonia::core::audio::SignalSpec;
11use tokio::{
12 runtime::Handle,
13 sync::{mpsc, oneshot},
14};
15
16use messages::{
17 MessageDataActual, MessageDataFirst, MessageDataValue, MessageSpec, MessageSpecResult,
18 RingMessages, RingMsgParse2, RingMsgWrite2,
19};
20use wrap::{ConsWrap, ProdWrap};
21
22use super::SampleType;
23
24mod messages;
25mod wrap;
26
/// Payload sent over the seek channel from the consumer to the producer.
///
/// The first element is the position to seek to. The second is a one-shot sender on which
/// the producer answers with a `usize`: in this file `process_seek` sends the number of
/// bytes currently occupied in the ring, and `try_seek` skips that many bytes.
pub type SeekData = (Duration, oneshot::Sender<usize>);
30
/// Smallest ring buffer capacity, in bytes, that `AsyncRingSource::new` will allocate;
/// smaller requested sizes are raised to this value.
///
/// Expressed as room for 192 000 `MessageDataValue` payloads.
// NOTE(review): 192_000 presumably corresponds to one second of samples at 192 kHz — confirm.
pub const MIN_RING_SIZE: usize = 192_000 * MessageDataValue::MESSAGE_SIZE;
36
/// Producer half returned by `AsyncRingSource::new`.
///
/// Writes framed messages (spec changes, sample data, end-of-stream) into the shared ring
/// buffer and receives seek requests sent by the consumer.
#[derive(Debug)]
pub struct AsyncRingSourceProvider {
    /// Wrapped producer end of the ring buffer.
    inner: ProdWrap,
    /// Receiving end of the seek channel; kept behind `Arc<RwLock<..>>` so that every
    /// `WaitSeek` future can hold its own handle to the same receiver.
    seek_rx: Arc<RwLock<mpsc::Receiver<SeekData>>>,

    /// State of the data message currently being written, if any. Set by `new_data` and
    /// cleared when `write_data` finishes or when a seek is processed.
    data: Option<MessageDataActual>,
}
47
48impl AsyncRingSourceProvider {
49 fn new(wrap: ProdWrap, seek_rx: mpsc::Receiver<SeekData>) -> Self {
50 AsyncRingSourceProvider {
51 inner: wrap,
52 seek_rx: Arc::new(RwLock::new(seek_rx)),
53 data: None,
54 }
55 }
56
57 #[allow(dead_code)] #[must_use]
60 pub fn is_closed(&self) -> bool {
61 self.inner.is_closed()
62 }
63
64 #[allow(clippy::missing_panics_doc)] pub async fn new_spec(
73 &mut self,
74 spec: SignalSpec,
75 current_span_len: usize,
76 ) -> Result<usize, ()> {
77 let mut msg_buf = [0; RingMsgWrite2::get_msg_size(MessageSpec::MESSAGE_SIZE)];
78 let _ = RingMsgWrite2::try_write_spec(spec, current_span_len, &mut msg_buf).unwrap();
84
85 self.inner.push_exact(&msg_buf).await.map_err(|_| ())?;
86
87 Ok(msg_buf.len())
88 }
89
90 #[allow(clippy::missing_panics_doc)] async fn new_data(&mut self, length: usize) -> Result<usize, ()> {
99 let mut msg_buf = [0; RingMsgWrite2::get_msg_size(MessageDataFirst::MESSAGE_SIZE)];
100 let (data, _written) = RingMsgWrite2::try_write_data_first(length, &mut msg_buf).unwrap();
106
107 self.inner.push_exact(&msg_buf).await.map_err(|_| ())?;
108
109 self.data = Some(data);
110
111 Ok(msg_buf.len())
112 }
113
114 async fn write_data_inner(&mut self, data: &[u8]) -> Result<usize, ()> {
124 let Some(msg) = &mut self.data else {
125 unimplemented!("This should be checked outside of the function");
126 };
127
128 let buf = &data[msg.get_range()];
129 self.inner.push_exact(buf).await.map_err(|_| ())?;
130 msg.advance_read(buf.len());
131
132 Ok(buf.len())
133 }
134
135 #[allow(clippy::missing_panics_doc)] pub async fn write_data(&mut self, data: &[u8]) -> Result<usize, ()> {
144 if data.is_empty() {
145 return Err(());
146 }
147
148 let mut written = 0;
149 if self.data.is_none() {
150 written += self.new_data(data.len()).await?;
151 }
152
153 while !self.data.as_mut().unwrap().is_done() && !self.inner.is_closed() {
158 written += self.write_data_inner(data).await?;
159 }
160
161 self.data.take();
162
163 Ok(written)
164 }
165
166 #[allow(clippy::missing_panics_doc)] pub async fn new_eos(&mut self) -> Result<usize, ()> {
175 let mut msg_buf = [0; RingMsgWrite2::get_msg_size(0)];
176 let _ = RingMsgWrite2::try_write_eos(&mut msg_buf).unwrap();
182
183 self.inner.push_exact(&msg_buf).await.map_err(|_| ())?;
184
185 Ok(msg_buf.len())
186 }
187
188 pub fn wait_seek(&mut self) -> WaitSeek {
190 WaitSeek(self.seek_rx.clone())
191 }
192
193 pub async fn process_seek(
198 &mut self,
199 spec: SignalSpec,
200 current_span_len: usize,
201 cb: oneshot::Sender<usize>,
202 ) {
203 self.data.take();
204 let bytes_to_skip = self.inner.occupied_len();
205 let _ = cb.send(bytes_to_skip);
206 let _ = self.new_spec(spec, current_span_len).await;
207 }
208}
209
210#[derive(Debug)]
212pub struct WaitSeek(Arc<RwLock<mpsc::Receiver<SeekData>>>);
213
214impl Future for WaitSeek {
215 type Output = Option<SeekData>;
216
217 fn poll(
218 self: std::pin::Pin<&mut Self>,
219 cx: &mut std::task::Context<'_>,
220 ) -> std::task::Poll<Self::Output> {
221 self.0.write().poll_recv(cx)
222 }
223}
224
/// Consumer half of the ring: a synchronous iterator / `Source` that reads framed messages
/// from the ring buffer and yields the samples contained in data messages.
#[derive(Debug)]
pub struct AsyncRingSource {
    /// Wrapped consumer end of the ring buffer.
    inner: ConsWrap,
    /// Sender for seek requests to the producer; set to `None` once an EOS message is read.
    seek_tx: Option<mpsc::Sender<SeekData>>,

    /// Local staging buffer; bytes popped from the ring are parsed from here.
    buf: StaticBuf<32768>,
    /// Data message currently being read sample by sample, if any.
    last_msg: Option<MessageDataActual>,
    /// Tokio runtime handle used by `load_more_data` to block on the ring's
    /// `wait_occupied` future.
    handle: Handle,

    /// Channel count reported through `Source::channels`; updated by spec messages.
    channels: u16,
    /// Sample rate reported through `Source::sample_rate`; updated by spec messages.
    rate: u32,
    /// Value reported through `Source::current_span_len`; set to 0 when EOS is read.
    current_span_len: usize,
    /// Value reported through `Source::total_duration`.
    total_duration: Option<Duration>,
}
247
248impl AsyncRingSource {
249 #[must_use]
255 pub fn new(
256 spec: SignalSpec,
257 total_duration: Option<Duration>,
258 current_span_len: usize,
259 size: usize,
260 handle: Handle,
261 ) -> (AsyncRingSourceProvider, Self) {
262 let size = size.max(MIN_RING_SIZE);
263 let ringbuf = AsyncHeapRb::<u8>::new(size);
264 let (prod, cons) = ringbuf.split();
265 let (tx, rx) = mpsc::channel(1);
267
268 let async_prod = AsyncRingSourceProvider::new(ProdWrap::new(prod), rx);
269 let async_cons = Self {
270 inner: ConsWrap::new(cons),
271 seek_tx: Some(tx),
272 channels: u16::try_from(spec.channels.count())
274 .expect("Channel size to be within u16::MAX"),
275 rate: spec.rate,
276 total_duration,
277 current_span_len,
278 last_msg: None,
279 buf: StaticBuf::new(),
280 handle,
281 };
282
283 (async_prod, async_cons)
284 }
285
286 #[must_use]
290 fn read_msg(&mut self) -> Option<RingMsgParse2> {
291 self.load_more_data(1)?;
294
295 assert!(!self.buf.is_empty());
296
297 let detected_type = {
298 let detect_byte = {
299 let byte = self.buf.get_ref()[0];
301 self.buf.advance_beginning(1);
302 byte
303 };
304
305 RingMessages::from_u8(detect_byte)
306 };
307
308 if detected_type == RingMessages::Eos {
310 return Some(RingMsgParse2::Eos);
311 }
312
313 let mut wait_for_bytes = 1;
314 let mut total = 0;
315 loop {
316 total += 1;
317 if self.inner.is_closed() && self.inner.is_empty() && self.buf.is_empty() {
319 return None;
320 }
321
322 self.load_more_data(wait_for_bytes)?;
323
324 assert!(total < 10);
326
327 let (msg, read) = match detected_type {
328 RingMessages::Data => {
329 let (data_res, read) = match MessageDataFirst::try_read_buf(self.buf.get_ref())
330 {
331 Ok(v) => v,
332 Err(wait_for) => {
333 wait_for_bytes = wait_for + self.buf.len();
334 continue;
335 }
336 };
337
338 (RingMsgParse2::Data(data_res), read)
339 }
340 RingMessages::Spec => {
341 let (spec_res, read) = match MessageSpec::try_read_buf(self.buf.get_ref()) {
342 Ok(v) => v,
343 Err(wait_for) => {
344 wait_for_bytes = wait_for + self.buf.len();
345 continue;
346 }
347 };
348
349 self.apply_spec_msg(spec_res);
350
351 (RingMsgParse2::Spec, read)
352 }
353 RingMessages::Eos => unreachable!("Message EOS is returned earlier"),
354 };
355
356 assert!(read > 0);
357
358 self.buf.advance_beginning(read);
359
360 return Some(msg);
361 }
362 }
363
364 fn load_more_data(&mut self, wait_for_bytes: usize) -> Option<()> {
370 if self.buf.len() >= wait_for_bytes {
371 return Some(());
372 }
373
374 self.buf.maybe_need_move();
375
376 if self.inner.occupied_len() < wait_for_bytes {
384 self.handle
389 .block_on(self.inner.wait_occupied(wait_for_bytes));
390 }
391
392 if self.inner.is_closed() && self.inner.is_empty() {
393 return None;
394 }
395
396 let written = self.inner.pop_slice(self.buf.get_spare_mut());
397 self.buf.advance_len(written);
398
399 debug_assert!(written > 0);
401
402 Some(())
403 }
404
405 fn apply_spec_msg(&mut self, new_spec: MessageSpecResult) {
409 self.channels = new_spec.channels;
410 self.rate = new_spec.rate;
411 }
412
413 #[must_use]
417 fn read_data(&mut self) -> Option<SampleType> {
418 self.load_more_data(MessageDataValue::MESSAGE_SIZE)?;
422
423 assert!(self.buf.len() >= MessageDataValue::MESSAGE_SIZE);
424
425 let msg = self.last_msg.as_mut().unwrap();
426
427 #[allow(clippy::missing_panics_doc)]
429 let (sample, read) = msg.try_read_buf(self.buf.get_ref()).unwrap();
430 self.buf.advance_beginning(read);
431
432 if msg.is_done() {
433 self.last_msg.take();
434 }
435
436 Some(sample)
437 }
438
439 fn is_seek_channel_active(&self) -> bool {
441 self.seek_tx.as_ref().is_some_and(|v| !v.is_closed())
442 }
443}
444
445impl Source for AsyncRingSource {
446 #[inline]
447 fn current_span_len(&self) -> Option<usize> {
448 if self.current_span_len == 0 && self.is_seek_channel_active() {
451 return Some(1);
452 }
453
454 Some(self.current_span_len)
455 }
456
457 #[inline]
458 #[allow(clippy::cast_possible_truncation)]
459 fn channels(&self) -> u16 {
460 self.channels
461 }
462
463 #[inline]
464 fn sample_rate(&self) -> u32 {
465 self.rate
466 }
467
468 #[inline]
469 fn total_duration(&self) -> Option<Duration> {
470 self.total_duration
471 }
472
473 #[inline]
474 fn try_seek(&mut self, pos: Duration) -> Result<(), rodio::source::SeekError> {
475 trace!("Consumer Seek");
476
477 self.inner.clear();
480 self.last_msg.take();
481 self.buf.clear();
482
483 let (cb_tx, cb_rx) = oneshot::channel();
484 let _ = self.seek_tx.as_mut().unwrap().blocking_send((pos, cb_tx));
485
486 let to_skip = cb_rx.blocking_recv().map_err(|_| {
488 rodio::source::SeekError::Other(
489 anyhow!("Seek Callback channel exited unexpectedly").into(),
490 )
491 })?;
492
493 let _ = self.inner.skip(to_skip);
495 trace!("Consumer Seek Done");
496
497 Ok(())
498 }
499}
500
501impl Iterator for AsyncRingSource {
502 type Item = SampleType;
503
504 #[inline]
505 fn next(&mut self) -> Option<Self::Item> {
506 if self.current_span_len == 0 && !self.is_seek_channel_active() {
511 return None;
512 }
513
514 loop {
515 if self.last_msg.is_some() {
516 let sample = self.read_data();
517
518 return sample;
519 }
520
521 if self.inner.is_empty() && self.inner.is_closed() {
522 debug!("DONE");
523 return None;
524 }
525
526 let msg = self.read_msg()?;
527
528 match msg {
529 RingMsgParse2::Spec => {}
530 RingMsgParse2::Data(message_data_actual) => {
531 self.last_msg = Some(message_data_actual);
532 }
533 RingMsgParse2::Eos => {
534 trace!("Reached EOS message");
535 let _ = self.seek_tx.take();
536 self.current_span_len = 0;
539 return None;
540 }
541 }
542 }
543 }
544}
545
546impl FusedIterator for AsyncRingSource {}
548
/// Fixed-capacity byte buffer with a movable read position.
///
/// Unread data lives in `buf[data_start_idx..used_len]`; the spare (writable) region is
/// `buf[used_len..]`.
#[derive(Debug, Clone, Copy)]
struct StaticBuf<const N: usize> {
    /// Backing storage.
    buf: [u8; N],
    /// Index one past the last written byte.
    used_len: usize,
    /// Index of the first unread byte.
    data_start_idx: usize,
}

impl<const N: usize> StaticBuf<N> {
    const CAPACITY: usize = N;

    /// Create an empty, zero-filled buffer. `N` must be non-zero and even; this is
    /// checked at compile time.
    fn new() -> Self {
        const {
            assert!(N > 0);
            assert!(N % 2 == 0);
        }

        Self {
            data_start_idx: 0,
            used_len: 0,
            buf: [0; N],
        }
    }

    /// Number of unread bytes.
    #[inline]
    fn len(&self) -> usize {
        let unread = self.get_ref();
        unread.len()
    }

    /// Whether there are no unread bytes.
    #[inline]
    fn is_empty(&self) -> bool {
        self.get_ref().is_empty()
    }

    /// Reset both indices to the start; the stored bytes are not overwritten.
    #[inline]
    fn clear(&mut self) {
        self.used_len = 0;
        self.data_start_idx = 0;
    }

    /// Mutable view from the read position to the end of the storage (unread data
    /// followed by the spare region).
    #[inline]
    #[allow(unused)]
    fn get_mut(&mut self) -> &mut [u8] {
        let start = self.data_start_idx;
        &mut self.buf[start..]
    }

    /// Mutable view of the spare region after the written data.
    #[inline]
    fn get_spare_mut(&mut self) -> &mut [u8] {
        let end = self.used_len;
        &mut self.buf[end..]
    }

    /// Shared view of the unread data.
    #[inline]
    fn get_ref(&self) -> &[u8] {
        let (start, end) = (self.data_start_idx, self.used_len);
        &self.buf[start..end]
    }

    /// Reclaim consumed space when it is cheap or worthwhile: reset if everything has been
    /// read, otherwise compact once the read position is past half the capacity.
    #[inline]
    fn maybe_need_move(&mut self) {
        if self.data_start_idx > 0 && self.is_empty() {
            self.clear();
        } else if self.data_start_idx > Self::CAPACITY / 2 {
            self.make_beginning();
        }
    }

    /// Move the unread data to the front of the storage.
    fn make_beginning(&mut self) {
        let unread = self.data_start_idx..self.used_len;
        let unread_len = unread.len();
        self.buf.copy_within(unread, 0);
        self.data_start_idx = 0;
        self.used_len = unread_len;
    }

    /// Extend the unread data by `by` bytes that were written into the spare region.
    #[inline]
    fn advance_len(&mut self, by: usize) {
        let new_len = self.len() + by;
        self.set_len(new_len);
    }

    /// Set the unread length to `written` bytes, counted from the read position.
    ///
    /// Panics if the resulting end index exceeds the storage size.
    #[inline]
    fn set_len(&mut self, written: usize) {
        let end = self.data_start_idx + written;
        self.used_len = end;
        assert!(self.used_len <= self.buf.len());
    }

    /// Mark `by` bytes at the front of the unread data as consumed.
    ///
    /// Panics if the read position would move past the storage size.
    #[inline]
    fn advance_beginning(&mut self, by: usize) {
        let start = self.data_start_idx + by;
        self.data_start_idx = start;
        assert!(self.data_start_idx <= self.buf.len());
    }
}
688
689#[cfg(test)]
690#[allow(clippy::assertions_on_constants)] mod tests {
692
693 mod static_buffer {
694 use crate::backends::rusty::source::async_ring::StaticBuf;
695
        /// Walks a 32-byte buffer through writing, consuming, compacting and clearing,
        /// checking the length and the slice views after every step.
        #[test]
        fn should_work() {
            let mut buf = StaticBuf::<32>::new();
            // Fresh buffer: nothing readable, whole storage reachable through `get_mut`.
            assert_eq!(buf.len(), 0);
            assert_eq!(buf.get_ref().len(), 0);
            assert_eq!(buf.len(), buf.used_len);
            assert_eq!(buf.get_mut().len(), 32);
            assert_eq!(buf.get_ref(), &[0u8; 0]);

            // Write one byte and publish it.
            buf.get_mut()[0] = u8::MAX;
            buf.set_len(1);

            assert_eq!(buf.len(), 1);
            assert_eq!(buf.get_ref().len(), 1);
            assert_eq!(buf.len(), buf.used_len);
            assert_eq!(buf.get_mut().len(), 32);
            assert_eq!(buf.get_ref(), &[u8::MAX; 1]);

            // Consume that byte: the read position moves, `used_len` stays.
            buf.advance_beginning(1);
            assert_eq!(buf.len(), 0);
            assert_eq!(buf.used_len, 1);
            assert_ne!(buf.len(), buf.used_len);
            assert_eq!(buf.get_mut().len(), 31);
            assert_eq!(buf.get_ref(), &[0u8; 0]);

            // Fill the remaining 31 bytes; `set_len` counts from the read position.
            buf.get_mut().fill(u8::MAX);
            buf.set_len(31);

            assert_eq!(buf.len(), 31);
            assert_eq!(buf.used_len, 32);
            assert_ne!(buf.len(), buf.used_len);
            assert_eq!(buf.get_mut().len(), 31);
            assert_eq!(buf.get_ref(), &[u8::MAX; 31]);

            // Compacting moves the unread data to index 0 without changing its length.
            buf.make_beginning();

            assert_eq!(buf.len(), 31);
            assert_eq!(buf.len(), buf.used_len);
            assert_eq!(buf.get_mut().len(), 32);
            assert_eq!(buf.get_ref(), &[u8::MAX; 31]);

            buf.advance_beginning(15);

            assert_eq!(buf.len(), 16);
            assert_ne!(buf.len(), buf.used_len);
            assert_eq!(buf.used_len, 31);
            assert_eq!(buf.get_mut().len(), 17);
            assert_eq!(buf.get_ref(), &[u8::MAX; 16]);

            // Clearing resets both indices.
            buf.clear();

            assert_eq!(buf.len(), 0);
            assert_eq!(buf.len(), buf.used_len);
            assert_eq!(buf.get_ref().len(), 0);
            assert_eq!(buf.get_mut().len(), 32);
            assert_eq!(buf.get_ref(), &[0u8; 0]);
        }
753
754 #[test]
755 #[should_panic(expected = "self.used_len <= self.buf.len()")]
756 fn length_capacity() {
757 let mut buf = StaticBuf::<16>::new();
758 buf.set_len(17);
759 }
760
761 #[test]
762 #[should_panic(expected = "self.data_start_idx <= self.buf.len()")]
763 fn beginning_capacity() {
764 let mut buf = StaticBuf::<16>::new();
765 buf.advance_beginning(17);
766 }
767
        /// Checks that `advance_len` grows the unread region, that `get_spare_mut` shrinks
        /// accordingly, and that consuming bytes leaves the spare region unchanged.
        #[test]
        fn advance_length() {
            let mut buf = StaticBuf::<32>::new();
            assert_eq!(buf.len(), 0);
            assert_eq!(buf.get_ref().len(), 0);
            assert_eq!(buf.get_mut().len(), 32);
            assert_eq!(buf.get_spare_mut().len(), 32);
            assert_eq!(buf.get_ref(), &[0u8; 0]);

            // First write of four bytes, via `get_mut` (identical to the spare region here).
            buf.get_mut()[..4].fill(4);
            buf.advance_len(4);

            assert_eq!(buf.len(), 4);
            assert_eq!(buf.get_ref().len(), 4);
            assert_eq!(buf.get_mut().len(), 32);
            assert_eq!(buf.get_spare_mut().len(), 28);
            let expected = &[4u8; 4];
            assert_eq!(buf.get_ref(), expected);

            // Second write goes into the spare region behind the first four bytes.
            buf.get_spare_mut()[..4].fill(6);
            buf.advance_len(4);

            assert_eq!(buf.len(), 8);
            assert_eq!(buf.get_ref().len(), 8);
            assert_eq!(buf.get_mut().len(), 32);
            assert_eq!(buf.get_spare_mut().len(), 24);
            let expected: Vec<_> = [4u8; 4].into_iter().chain([6u8; 4]).collect();
            assert_eq!(buf.get_ref(), &expected);

            // Consuming five bytes leaves the last three; the spare region is unaffected.
            buf.advance_beginning(5);

            assert_eq!(buf.len(), 3);
            assert_eq!(buf.get_ref().len(), 3);
            assert_eq!(buf.get_mut().len(), 27);
            assert_eq!(buf.get_spare_mut().len(), 24);
            let expected = &[6u8; 3];
            assert_eq!(buf.get_ref(), expected);
        }
806 }
807
808 mod ringbuffer {
809 use std::{sync::Arc, time::Duration};
810
811 use async_ringbuf::traits::Observer;
812 use parking_lot::Mutex;
813 use symphonia::core::audio::{Channels, SignalSpec};
814
815 use crate::backends::rusty::source::{
816 SampleType,
817 async_ring::{
818 AsyncRingSource, MIN_RING_SIZE, MessageDataFirst, MessageSpec, RingMsgWrite2,
819 },
820 };
821
        /// Round-trips two data messages separated by a spec change and followed by EOS,
        /// and checks that the consumer yields exactly the bytes that were sent.
        #[tokio::test]
        async fn should_work() {
            let mut send = Vec::new();
            let recv = Arc::new(Mutex::new(Vec::new()));

            // Arguments after the spec: total_duration, current_span_len, size, handle.
            // The requested size of 0 is below the minimum.
            let (mut prod, mut cons) = AsyncRingSource::new(
                SignalSpec::new(44000, Channels::FRONT_LEFT | Channels::FRONT_RIGHT),
                None,
                1024,
                0,
                tokio::runtime::Handle::current(),
            );

            // The undersized request must have been raised to the minimum capacity.
            assert_eq!(prod.inner.capacity().get(), MIN_RING_SIZE);

            // The consumer is synchronous and blocks on the runtime handle, so it runs on
            // a blocking thread.
            let recv_c = recv.clone();
            let handle = tokio::task::spawn_blocking(move || {
                let mut lock = recv_c.lock();
                for num in cons.by_ref() {
                    lock.extend_from_slice(&num.to_ne_bytes());
                }
                assert_eq!(cons.inner.occupied_len(), 0);
            });

            let first_data = 1f32.to_le_bytes().repeat(1024);
            let written = prod.write_data(&first_data).await.unwrap();
            assert_eq!(
                written,
                RingMsgWrite2::get_msg_size(MessageDataFirst::MESSAGE_SIZE + first_data.len())
            );
            send.extend_from_slice(&first_data);

            let new_spec = SignalSpec::new(48000, Channels::FRONT_LEFT | Channels::FRONT_RIGHT);
            let written = prod.new_spec(new_spec, 1024).await.unwrap();
            assert_eq!(
                written,
                RingMsgWrite2::get_msg_size(MessageSpec::MESSAGE_SIZE)
            );

            let second_data = 2f32.to_le_bytes().repeat(1024);
            let written = prod.write_data(&second_data).await.unwrap();
            assert_eq!(
                written,
                RingMsgWrite2::get_msg_size(MessageDataFirst::MESSAGE_SIZE + second_data.len())
            );
            send.extend_from_slice(&second_data);

            let written = prod.new_eos().await.unwrap();
            assert_eq!(written, RingMsgWrite2::get_msg_size(0));

            // Empty input is rejected by `write_data`.
            prod.write_data(&[]).await.unwrap_err();

            // NOTE(review): `is_ok()` only reflects whether the timeout elapsed; a panic
            // inside the spawned task would presumably surface as an inner `JoinError`
            // that is not inspected here — confirm whether that is intended.
            let res = tokio::time::timeout(Duration::from_secs(3), handle)
                .await
                .is_ok();
            assert!(res, "Read Task did not complete within 3 seconds");

            // The consumer was moved into the finished task; the test expects the producer
            // to see a closed, drained ring at this point.
            assert!(prod.is_closed());
            assert!(prod.inner.is_empty());

            let recv_lock = recv.lock();
            let value_size = size_of::<SampleType>();
            assert_eq!(send.len(), value_size * 1024 * 2);
            assert_eq!(recv_lock.len(), value_size * 1024 * 2);

            assert_eq!(*send, *recv_lock);
        }
890
        /// Checks the shutdown ordering around EOS: the producer sends EOS first, and both
        /// the consumer finishing ("recv_eos") and the producer's `wait_seek` resolving
        /// ("prod") are recorded only afterwards.
        #[tokio::test]
        async fn prod_should_not_exist_before_cons() {
            let order = Arc::new(Mutex::new(Vec::new()));

            let (mut prod, mut cons) = AsyncRingSource::new(
                SignalSpec::new(44000, Channels::FRONT_LEFT | Channels::FRONT_RIGHT),
                None,
                1024,
                0,
                tokio::runtime::Handle::current(),
            );

            // Observer used to check which ends of the ring are still held.
            let obsv = prod.inner.observe();

            assert!(obsv.read_is_held());
            assert!(obsv.write_is_held());
            let order_c = order.clone();

            let cons_handle = tokio::task::spawn_blocking(move || {
                // Drain every sample until the iterator ends.
                for num in cons.by_ref() {
                    let _ = num;
                }
                order_c.lock().push("recv_eos");
                assert_eq!(cons.inner.occupied_len(), 0);
                // Reading EOS drops the consumer's seek sender.
                assert!(cons.seek_tx.is_none());
            });

            let obsv_c = obsv.clone();
            let order_c = order.clone();

            let prod_handle = tokio::task::spawn(async move {
                let first_data = 1f32.to_le_bytes().repeat(1024);
                let written = prod.write_data(&first_data).await.unwrap();
                assert_eq!(
                    written,
                    RingMsgWrite2::get_msg_size(MessageDataFirst::MESSAGE_SIZE + first_data.len())
                );

                assert!(obsv_c.read_is_held());
                assert!(obsv_c.write_is_held());

                order_c.lock().push("send_eos");
                let written = prod.new_eos().await.unwrap();
                assert_eq!(written, RingMsgWrite2::get_msg_size(0));

                // Resolves once a seek arrives or the seek channel yields `None`; the
                // assertion below expects the latter (channel closed).
                let _ = prod.wait_seek().await;
                assert!(prod.seek_rx.read().is_closed());
                order_c.lock().push("prod");
            });

            // NOTE(review): `is_ok()` only reflects the timeout, not whether the task
            // panicked — confirm whether the inner join result should be checked too.
            let res = tokio::time::timeout(Duration::from_secs(3), cons_handle)
                .await
                .is_ok();
            assert!(res, "Read Task did not complete within 3 seconds");

            // The consumer task has finished, so the read end is released.
            assert!(!obsv.read_is_held());

            let res = tokio::time::timeout(Duration::from_secs(3), prod_handle)
                .await
                .is_ok();
            assert!(res, "Write Task did not complete within 3 seconds");

            assert!(!obsv.write_is_held());

            let lock = order.lock();
            // "send_eos" must come first; the other two may be recorded in either order.
            assert_eq!(&lock[..1], &["send_eos"]);
            assert!(lock[1..].contains(&"recv_eos"));
            assert!(lock[1..].contains(&"prod"));
            assert_eq!(lock.len(), 3);
        }
969
        /// Checks that the consumer still receives all written data when the producer
        /// writes EOS and is dropped before the consumer task is awaited.
        #[tokio::test]
        async fn should_consume_on_prod_exit_eos() {
            let recv = Arc::new(Mutex::new(Vec::new()));

            let (mut prod, mut cons) = AsyncRingSource::new(
                SignalSpec::new(44000, Channels::FRONT_LEFT | Channels::FRONT_RIGHT),
                None,
                1024,
                0,
                tokio::runtime::Handle::current(),
            );

            let recv_c = recv.clone();
            let handle = tokio::task::spawn_blocking(move || {
                let mut lock = recv_c.lock();
                for num in cons.by_ref() {
                    lock.extend_from_slice(&num.to_ne_bytes());
                }
                assert_eq!(cons.inner.occupied_len(), 0);
                assert!(!cons.inner.write_is_held());
            });

            let first_data = 1f32.to_le_bytes().repeat(1024);
            let written = prod.write_data(&first_data).await.unwrap();
            assert_eq!(
                written,
                RingMsgWrite2::get_msg_size(MessageDataFirst::MESSAGE_SIZE + first_data.len())
            );

            let written = prod.new_eos().await.unwrap();
            assert_eq!(written, RingMsgWrite2::get_msg_size(0));

            // Take an observer first so the ring can still be inspected after the drop.
            let obsv = prod.inner.observe();
            drop(prod);

            assert!(!obsv.write_is_held());
            // NOTE(review): `is_ok()` only reflects the timeout, not whether the task
            // panicked — confirm whether the inner join result should be checked too.
            let res = tokio::time::timeout(Duration::from_secs(3), handle)
                .await
                .is_ok();
            assert!(res, "Read Task did not complete within 3 seconds");

            assert!(!obsv.write_is_held());
            assert!(!obsv.read_is_held());

            let recv_lock = recv.lock();
            let value_size = size_of::<SampleType>();
            assert_eq!(recv_lock.len(), value_size * 1024);

            assert_eq!(*recv_lock, first_data.as_slice());
        }
1025
        /// Checks that the consumer still receives all written data and then terminates
        /// when the producer is dropped without ever writing an EOS message.
        #[tokio::test]
        async fn should_consume_on_prod_exit() {
            let recv = Arc::new(Mutex::new(Vec::new()));

            let (mut prod, mut cons) = AsyncRingSource::new(
                SignalSpec::new(44000, Channels::FRONT_LEFT | Channels::FRONT_RIGHT),
                None,
                1024,
                0,
                tokio::runtime::Handle::current(),
            );

            let recv_c = recv.clone();
            let handle = tokio::task::spawn_blocking(move || {
                let mut lock = recv_c.lock();
                for num in cons.by_ref() {
                    lock.extend_from_slice(&num.to_ne_bytes());
                }
                assert_eq!(cons.inner.occupied_len(), 0);
                assert!(!cons.inner.write_is_held());
            });

            let first_data = 1f32.to_le_bytes().repeat(1024);
            let written = prod.write_data(&first_data).await.unwrap();
            assert_eq!(
                written,
                RingMsgWrite2::get_msg_size(MessageDataFirst::MESSAGE_SIZE + first_data.len())
            );

            // No EOS is written here; dropping the producer is the only end-of-stream signal.
            let obsv = prod.inner.observe();
            drop(prod);

            assert!(!obsv.write_is_held());
            // NOTE(review): `is_ok()` only reflects the timeout, not whether the task
            // panicked — confirm whether the inner join result should be checked too.
            let res = tokio::time::timeout(Duration::from_secs(3), handle)
                .await
                .is_ok();
            assert!(res, "Read Task did not complete within 3 seconds");

            assert!(!obsv.write_is_held());
            assert!(!obsv.read_is_held());

            let recv_lock = recv.lock();
            let value_size = size_of::<SampleType>();
            assert_eq!(recv_lock.len(), value_size * 1024);

            assert_eq!(*recv_lock, first_data.as_slice());
        }
1078 }
1079}