1use crate::{Error, Result, coding};
16
17use super::{Group, GroupConsumer, GroupProducer};
18
19use std::{
20 collections::{HashSet, VecDeque},
21 task::{Poll, ready},
22 time::Duration,
23};
24
/// Maximum age a cached group may reach before it becomes eligible for eviction.
///
/// `State::evict_expired` drops groups older than this, except for the group
/// holding the highest sequence number seen so far.
const MAX_GROUP_AGE: Duration = Duration::from_secs(30);
28
/// Static description of a track: its name and its priority.
///
/// With the `serde` feature enabled the type can also be serialized and
/// deserialized.
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct Track {
    /// Name identifying the track.
    pub name: String,
    /// Priority of the track; [`Track::new`] initializes it to `0`.
    // NOTE(review): whether a larger value means "more important" is not
    // visible in this file — confirm against the code that reads it.
    pub priority: u8,
}
36
37impl Track {
38 pub fn new<T: Into<String>>(name: T) -> Self {
39 Self {
40 name: name.into(),
41 priority: 0,
42 }
43 }
44
45 pub fn produce(self) -> TrackProducer {
46 TrackProducer::new(self)
47 }
48}
49
50#[derive(Default)]
51struct State {
52 groups: VecDeque<Option<(GroupProducer, tokio::time::Instant)>>,
54 duplicates: HashSet<u64>,
55 offset: usize,
56 max_sequence: Option<u64>,
57 final_sequence: Option<u64>,
58 abort: Option<Error>,
59}
60
61impl State {
62 fn poll_recv_group(&self, index: usize, min_sequence: u64) -> Poll<Result<Option<(GroupConsumer, usize)>>> {
66 let start = index.saturating_sub(self.offset);
67 for (i, slot) in self.groups.iter().enumerate().skip(start) {
68 if let Some((group, _)) = slot
69 && group.info.sequence >= min_sequence
70 {
71 return Poll::Ready(Ok(Some((group.consume(), self.offset + i))));
72 }
73 }
74
75 if self.final_sequence.is_some() {
77 Poll::Ready(Ok(None))
78 } else if let Some(err) = &self.abort {
79 Poll::Ready(Err(err.clone()))
80 } else {
81 Poll::Pending
82 }
83 }
84
85 fn poll_read_frame(
89 &self,
90 index: usize,
91 next_sequence: u64,
92 waiter: &conducer::Waiter,
93 ) -> Poll<Result<Option<(bytes::Bytes, usize, u64)>>> {
94 let start = index.saturating_sub(self.offset);
95 let mut pending_seen = false;
96 for (i, slot) in self.groups.iter().enumerate().skip(start) {
97 let Some((group, _)) = slot else { continue };
98 if group.info.sequence < next_sequence {
99 continue;
100 }
101
102 let mut consumer = group.consume();
103 match consumer.poll_read_frame(waiter) {
104 Poll::Ready(Ok(Some(frame))) => {
105 return Poll::Ready(Ok(Some((frame, self.offset + i, group.info.sequence))));
106 }
107 Poll::Ready(Ok(None)) => continue,
108 Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
109 Poll::Pending => {
110 pending_seen = true;
111 continue;
112 }
113 }
114 }
115
116 if pending_seen {
119 Poll::Pending
120 } else if self.final_sequence.is_some() {
121 Poll::Ready(Ok(None))
122 } else if let Some(err) = &self.abort {
123 Poll::Ready(Err(err.clone()))
124 } else {
125 Poll::Pending
126 }
127 }
128
129 fn poll_get_group(&self, sequence: u64) -> Poll<Result<Option<GroupConsumer>>> {
130 for (group, _) in self.groups.iter().flatten() {
132 if group.info.sequence == sequence {
133 return Poll::Ready(Ok(Some(group.consume())));
134 }
135 }
136
137 if let Some(fin) = self.final_sequence
139 && sequence >= fin
140 {
141 return Poll::Ready(Ok(None));
142 }
143
144 if let Some(err) = &self.abort {
145 return Poll::Ready(Err(err.clone()));
146 }
147
148 Poll::Pending
149 }
150
151 fn poll_closed(&self) -> Poll<Result<()>> {
152 if self.final_sequence.is_some() {
153 Poll::Ready(Ok(()))
154 } else if let Some(err) = &self.abort {
155 Poll::Ready(Err(err.clone()))
156 } else {
157 Poll::Pending
158 }
159 }
160
161 fn evict_expired(&mut self, now: tokio::time::Instant) {
168 for slot in self.groups.iter_mut() {
169 let Some((group, created_at)) = slot else { continue };
170
171 if Some(group.info.sequence) == self.max_sequence {
172 continue;
173 }
174
175 if now.duration_since(*created_at) <= MAX_GROUP_AGE {
176 break;
177 }
178
179 self.duplicates.remove(&group.info.sequence);
180 *slot = None;
181 }
182
183 while let Some(None) = self.groups.front() {
185 self.groups.pop_front();
186 self.offset += 1;
187 }
188 }
189
190 fn poll_finished(&self) -> Poll<Result<u64>> {
191 if let Some(fin) = self.final_sequence {
192 Poll::Ready(Ok(fin))
193 } else if let Some(err) = &self.abort {
194 Poll::Ready(Err(err.clone()))
195 } else {
196 Poll::Pending
197 }
198 }
199}
200
201pub struct TrackProducer {
203 pub info: Track,
204 state: conducer::Producer<State>,
205}
206
207impl TrackProducer {
208 pub fn new(info: Track) -> Self {
209 Self {
210 info,
211 state: conducer::Producer::default(),
212 }
213 }
214
215 pub fn create_group(&mut self, info: Group) -> Result<GroupProducer> {
217 let group = info.produce();
218
219 let mut state = self.modify()?;
220 if let Some(fin) = state.final_sequence
221 && group.info.sequence >= fin
222 {
223 return Err(Error::Closed);
224 }
225
226 if !state.duplicates.insert(group.info.sequence) {
227 return Err(Error::Duplicate);
228 }
229
230 let now = tokio::time::Instant::now();
231 state.max_sequence = Some(state.max_sequence.unwrap_or(0).max(group.info.sequence));
232 state.groups.push_back(Some((group.clone(), now)));
233 state.evict_expired(now);
234
235 Ok(group)
236 }
237
238 pub fn append_group(&mut self) -> Result<GroupProducer> {
240 let mut state = self.modify()?;
241 let sequence = match state.max_sequence {
242 Some(s) => s.checked_add(1).ok_or(coding::BoundsExceeded)?,
243 None => 0,
244 };
245 if let Some(fin) = state.final_sequence
246 && sequence >= fin
247 {
248 return Err(Error::Closed);
249 }
250
251 let group = Group { sequence }.produce();
252
253 let now = tokio::time::Instant::now();
254 state.duplicates.insert(sequence);
255 state.max_sequence = Some(sequence);
256 state.groups.push_back(Some((group.clone(), now)));
257 state.evict_expired(now);
258
259 Ok(group)
260 }
261
262 pub fn write_frame<B: Into<bytes::Bytes>>(&mut self, frame: B) -> Result<()> {
264 let mut group = self.append_group()?;
265 group.write_frame(frame.into())?;
266 group.finish()?;
267 Ok(())
268 }
269
270 pub fn finish(&mut self) -> Result<()> {
276 let mut state = self.modify()?;
277 if state.final_sequence.is_some() {
278 return Err(Error::Closed);
279 }
280 state.final_sequence = Some(match state.max_sequence {
281 Some(max) => max.checked_add(1).ok_or(coding::BoundsExceeded)?,
282 None => 0,
283 });
284 Ok(())
285 }
286
287 #[deprecated(note = "use finish() or finish_at(sequence) instead")]
292 pub fn close(&mut self) -> Result<()> {
293 self.finish()
294 }
295
296 pub fn finish_at(&mut self, sequence: u64) -> Result<()> {
303 let mut state = self.modify()?;
304 let max = state.max_sequence.ok_or(Error::Closed)?;
305 if state.final_sequence.is_some() || sequence != max {
306 return Err(Error::Closed);
307 }
308 state.final_sequence = Some(max.checked_add(1).ok_or(coding::BoundsExceeded)?);
309 Ok(())
310 }
311
312 pub fn abort(&mut self, err: Error) -> Result<()> {
314 let mut guard = self.modify()?;
315
316 for (group, _) in guard.groups.iter_mut().flatten() {
318 group.abort(err.clone()).ok();
320 }
321
322 guard.abort = Some(err);
323 guard.close();
324 Ok(())
325 }
326
327 pub fn consume(&self) -> TrackConsumer {
329 TrackConsumer {
330 info: self.info.clone(),
331 state: self.state.consume(),
332 index: 0,
333 min_sequence: 0,
334 next_sequence: 0,
335 }
336 }
337
338 pub async fn unused(&self) -> Result<()> {
340 self.state
341 .unused()
342 .await
343 .map_err(|r| r.abort.clone().unwrap_or(Error::Dropped))
344 }
345
346 pub async fn used(&self) -> Result<()> {
348 self.state
349 .used()
350 .await
351 .map_err(|r| r.abort.clone().unwrap_or(Error::Dropped))
352 }
353
354 pub fn is_closed(&self) -> bool {
356 self.state.read().is_closed()
357 }
358
359 pub fn is_clone(&self, other: &Self) -> bool {
361 self.state.same_channel(&other.state)
362 }
363
364 pub(crate) fn weak(&self) -> TrackWeak {
366 TrackWeak {
367 info: self.info.clone(),
368 state: self.state.weak(),
369 }
370 }
371
372 fn modify(&self) -> Result<conducer::Mut<'_, State>> {
373 self.state
374 .write()
375 .map_err(|r| r.abort.clone().unwrap_or(Error::Dropped))
376 }
377}
378
379impl Clone for TrackProducer {
380 fn clone(&self) -> Self {
381 Self {
382 info: self.info.clone(),
383 state: self.state.clone(),
384 }
385 }
386}
387
388impl From<Track> for TrackProducer {
389 fn from(info: Track) -> Self {
390 TrackProducer::new(info)
391 }
392}
393
394#[derive(Clone)]
396pub(crate) struct TrackWeak {
397 pub info: Track,
398 state: conducer::Weak<State>,
399}
400
401impl TrackWeak {
402 pub fn abort(&self, err: Error) {
403 let Ok(mut guard) = self.state.write() else { return };
404
405 for (group, _) in guard.groups.iter_mut().flatten() {
407 group.abort(err.clone()).ok();
408 }
409
410 guard.abort = Some(err);
411 guard.close();
412 }
413
414 pub fn is_closed(&self) -> bool {
415 self.state.is_closed()
416 }
417
418 pub fn consume(&self) -> TrackConsumer {
419 TrackConsumer {
420 info: self.info.clone(),
421 state: self.state.consume(),
422 index: 0,
423 min_sequence: 0,
424 next_sequence: 0,
425 }
426 }
427
428 pub async fn unused(&self) -> crate::Result<()> {
429 self.state
430 .unused()
431 .await
432 .map_err(|r| r.abort.clone().unwrap_or(Error::Dropped))
433 }
434
435 pub fn is_clone(&self, other: &Self) -> bool {
436 self.state.same_channel(&other.state)
437 }
438}
439
440#[derive(Clone)]
442pub struct TrackConsumer {
443 pub info: Track,
444 state: conducer::Consumer<State>,
445 index: usize,
447 min_sequence: u64,
449 next_sequence: u64,
452}
453
454impl TrackConsumer {
455 fn poll<F, R>(&self, waiter: &conducer::Waiter, f: F) -> Poll<Result<R>>
457 where
458 F: Fn(&conducer::Ref<'_, State>) -> Poll<Result<R>>,
459 {
460 Poll::Ready(match ready!(self.state.poll(waiter, f)) {
461 Ok(res) => res,
462 Err(state) => Err(state.abort.clone().unwrap_or(Error::Dropped)),
464 })
465 }
466
467 pub fn poll_recv_group(&mut self, waiter: &conducer::Waiter) -> Poll<Result<Option<GroupConsumer>>> {
478 let Some((consumer, found_index)) =
479 ready!(self.poll(waiter, |state| state.poll_recv_group(self.index, self.min_sequence))?)
480 else {
481 return Poll::Ready(Ok(None));
482 };
483
484 self.index = found_index + 1;
485 Poll::Ready(Ok(Some(consumer)))
486 }
487
488 pub async fn recv_group(&mut self) -> Result<Option<GroupConsumer>> {
494 conducer::wait(|waiter| self.poll_recv_group(waiter)).await
495 }
496
497 #[deprecated(note = "use poll_recv_group for arrival order, or poll_next_group_ordered for sequence order")]
499 pub fn poll_next_group(&mut self, waiter: &conducer::Waiter) -> Poll<Result<Option<GroupConsumer>>> {
500 self.poll_recv_group(waiter)
501 }
502
503 #[deprecated(note = "use recv_group for arrival order, or next_group_ordered for sequence order")]
505 pub async fn next_group(&mut self) -> Result<Option<GroupConsumer>> {
506 self.recv_group().await
507 }
508
509 pub fn poll_next_group_ordered(&mut self, waiter: &conducer::Waiter) -> Poll<Result<Option<GroupConsumer>>> {
513 loop {
514 let Some(group) = ready!(self.poll_recv_group(waiter)?) else {
515 return Poll::Ready(Ok(None));
516 };
517 if group.info.sequence < self.next_sequence {
518 continue;
520 }
521 self.next_sequence = group.info.sequence.saturating_add(1);
522 return Poll::Ready(Ok(Some(group)));
523 }
524 }
525
526 pub async fn next_group_ordered(&mut self) -> Result<Option<GroupConsumer>> {
533 conducer::wait(|waiter| self.poll_next_group_ordered(waiter)).await
534 }
535
536 pub fn poll_read_frame(&mut self, waiter: &conducer::Waiter) -> Poll<Result<Option<bytes::Bytes>>> {
540 let lower = self.min_sequence.max(self.next_sequence);
541 let Some((frame, found_index, sequence)) =
542 ready!(self.poll(waiter, |state| { state.poll_read_frame(self.index, lower, waiter) })?)
543 else {
544 return Poll::Ready(Ok(None));
545 };
546
547 self.index = found_index + 1;
548 self.next_sequence = sequence.saturating_add(1);
549 Poll::Ready(Ok(Some(frame)))
550 }
551
552 pub async fn read_frame(&mut self) -> Result<Option<bytes::Bytes>> {
556 conducer::wait(|waiter| self.poll_read_frame(waiter)).await
557 }
558
559 pub fn poll_get_group(&self, waiter: &conducer::Waiter, sequence: u64) -> Poll<Result<Option<GroupConsumer>>> {
561 self.poll(waiter, |state| state.poll_get_group(sequence))
562 }
563
564 pub async fn get_group(&self, sequence: u64) -> Result<Option<GroupConsumer>> {
568 conducer::wait(|waiter| self.poll_get_group(waiter, sequence)).await
569 }
570
571 pub fn poll_closed(&self, waiter: &conducer::Waiter) -> Poll<Result<()>> {
573 self.poll(waiter, |state| state.poll_closed())
574 }
575
576 pub async fn closed(&self) -> Result<()> {
580 conducer::wait(|waiter| self.poll_closed(waiter)).await
581 }
582
583 pub fn is_clone(&self, other: &Self) -> bool {
584 self.state.same_channel(&other.state)
585 }
586
587 pub fn poll_finished(&mut self, waiter: &conducer::Waiter) -> Poll<Result<u64>> {
589 self.poll(waiter, |state| state.poll_finished())
590 }
591
592 pub async fn finished(&mut self) -> Result<u64> {
594 conducer::wait(|waiter| self.poll_finished(waiter)).await
595 }
596
597 pub fn start_at(&mut self, sequence: u64) {
599 self.min_sequence = sequence;
600 }
601
602 pub fn latest(&self) -> Option<u64> {
604 self.state.read().max_sequence
605 }
606
607 pub fn produce(&self) -> Result<TrackProducer> {
622 let state = self
623 .state
624 .produce()
625 .ok_or_else(|| self.state.read().abort.clone().unwrap_or(Error::Dropped))?;
626 Ok(TrackProducer {
627 info: self.info.clone(),
628 state,
629 })
630 }
631}
632
633#[cfg(test)]
634use futures::FutureExt;
635
#[cfg(test)]
impl TrackConsumer {
    /// Returns the next group, panicking if `recv_group` would block, fail,
    /// or report a closed track.
    pub fn assert_group(&mut self) -> GroupConsumer {
        let polled = self.recv_group().now_or_never();
        polled
            .expect("group would have blocked")
            .expect("would have errored")
            .expect("track was closed")
    }

    /// Panics unless `recv_group` would block.
    pub fn assert_no_group(&mut self) {
        let polled = self.recv_group().now_or_never();
        assert!(polled.is_none(), "recv_group would not have blocked");
    }

    /// Panics if `closed` has already resolved.
    pub fn assert_not_closed(&self) {
        let polled = self.closed().now_or_never();
        assert!(polled.is_none(), "should not be closed");
    }

    /// Panics unless `closed` has already resolved (with either outcome).
    pub fn assert_closed(&self) {
        let polled = self.closed().now_or_never();
        assert!(polled.is_some(), "should be closed");
    }

    /// Panics unless `closed` has already resolved with an error.
    pub fn assert_error(&self) {
        let outcome = self.closed().now_or_never().expect("should not block");
        assert!(outcome.is_err(), "should be error");
    }

    /// Panics unless `other` shares this consumer's state.
    pub fn assert_is_clone(&self, other: &Self) {
        assert!(self.is_clone(other), "should be clone");
    }

    /// Panics if `other` shares this consumer's state.
    pub fn assert_not_clone(&self, other: &Self) {
        assert!(!self.is_clone(other), "should not be clone");
    }
}
677
#[cfg(test)]
mod test {
    use super::*;

    /// Number of slots that still hold a group (tombstones excluded).
    fn live_groups(state: &State) -> usize {
        state.groups.iter().filter(|slot| slot.is_some()).count()
    }

    /// Sequence of the first slot that still holds a group; panics if none does.
    fn first_live_sequence(state: &State) -> u64 {
        let (group, _) = state.groups.iter().flatten().next().unwrap();
        group.info.sequence
    }

    /// Appends `count` groups, dropping each returned producer right away.
    fn append_groups(producer: &mut TrackProducer, count: usize) {
        for _ in 0..count {
            producer.append_group().unwrap();
        }
    }

    /// Advances the paused clock just beyond `MAX_GROUP_AGE`.
    async fn expire_groups() {
        tokio::time::advance(MAX_GROUP_AGE + Duration::from_secs(1)).await;
    }

    /// Polls `next_group_ordered` once and unwraps the group it must yield.
    fn ordered_group(consumer: &mut TrackConsumer) -> GroupConsumer {
        consumer
            .next_group_ordered()
            .now_or_never()
            .expect("should not block")
            .expect("would have errored")
            .expect("track should not be closed")
    }

    /// Polls `read_frame` once and unwraps the frame it must yield; `blocked`
    /// is the panic message used when the read would block.
    fn ready_frame(consumer: &mut TrackConsumer, blocked: &str) -> bytes::Bytes {
        consumer
            .read_frame()
            .now_or_never()
            .expect(blocked)
            .expect("would have errored")
            .expect("track should not be closed")
    }

    #[tokio::test]
    async fn evict_expired_groups() {
        tokio::time::pause();

        let mut producer = Track::new("test").produce();

        // Three fresh groups: nothing is evicted yet.
        append_groups(&mut producer, 3);
        {
            let state = producer.state.read();
            assert_eq!(live_groups(&state), 3);
            assert_eq!(state.offset, 0);
        }

        expire_groups().await;

        // The next append sweeps the three expired groups.
        append_groups(&mut producer, 1);
        {
            let state = producer.state.read();
            assert_eq!(live_groups(&state), 1);
            assert_eq!(first_live_sequence(&state), 3);
            assert_eq!(state.offset, 3);
            for evicted in 0..3u64 {
                assert!(!state.duplicates.contains(&evicted));
            }
            assert!(state.duplicates.contains(&3));
        }
    }

    #[tokio::test]
    async fn evict_keeps_max_sequence() {
        tokio::time::pause();

        let mut producer = Track::new("test").produce();
        append_groups(&mut producer, 1);
        expire_groups().await;

        // Group 0 stops being the max once group 1 exists, so it is evicted.
        append_groups(&mut producer, 1);
        {
            let state = producer.state.read();
            assert_eq!(live_groups(&state), 1);
            assert_eq!(first_live_sequence(&state), 1);
            assert_eq!(state.offset, 1);
        }
    }

    #[tokio::test]
    async fn no_eviction_when_fresh() {
        tokio::time::pause();

        let mut producer = Track::new("test").produce();
        append_groups(&mut producer, 3);
        {
            let state = producer.state.read();
            assert_eq!(live_groups(&state), 3);
            assert_eq!(state.offset, 0);
        }
    }

    #[tokio::test]
    async fn consumer_skips_evicted_groups() {
        tokio::time::pause();

        let mut producer = Track::new("test").produce();
        append_groups(&mut producer, 1);
        let mut consumer = producer.consume();

        expire_groups().await;
        append_groups(&mut producer, 1);

        // The consumer never read group 0; it is gone, so group 1 comes first.
        let group = consumer.assert_group();
        assert_eq!(group.info.sequence, 1);
    }

    #[tokio::test]
    async fn out_of_order_max_sequence_at_front() {
        tokio::time::pause();

        let mut producer = Track::new("test").produce();

        // The max sequence arrives first, followed by lower sequences.
        for sequence in [5, 3, 4] {
            producer.create_group(Group { sequence }).unwrap();
        }

        {
            let state = producer.state.read();
            assert_eq!(state.max_sequence, Some(5));
        }

        expire_groups().await;

        // Group 6 becomes the max, so 5, 3 and 4 are all evictable.
        append_groups(&mut producer, 1);
        {
            let state = producer.state.read();
            assert_eq!(live_groups(&state), 1);
            assert_eq!(first_live_sequence(&state), 6);
            for evicted in [3u64, 4, 5] {
                assert!(!state.duplicates.contains(&evicted));
            }
            assert!(state.duplicates.contains(&6));
        }
    }

    #[tokio::test]
    async fn max_sequence_at_front_blocks_trim() {
        tokio::time::pause();

        let mut producer = Track::new("test").produce();

        producer.create_group(Group { sequence: 5 }).unwrap();

        expire_groups().await;

        producer.create_group(Group { sequence: 3 }).unwrap();

        // Group 5 is expired but protected as the max; group 3 is fresh.
        {
            let state = producer.state.read();
            assert_eq!(live_groups(&state), 2);
            assert_eq!(state.offset, 0);
        }

        expire_groups().await;

        producer.create_group(Group { sequence: 2 }).unwrap();

        // Group 3 is evicted, but its tombstone sits behind the protected
        // group 5, so nothing is trimmed and the offset stays at 0.
        {
            let state = producer.state.read();
            assert_eq!(live_groups(&state), 2);
            assert_eq!(state.offset, 0);
            assert!(state.duplicates.contains(&5));
            assert!(!state.duplicates.contains(&3));
            assert!(state.duplicates.contains(&2));
        }

        let mut consumer = producer.consume();
        let group = consumer.assert_group();
        assert_eq!(group.info.sequence, 5);
    }

    #[test]
    fn append_finish_cannot_be_rewritten() {
        let mut producer = Track::new("test").produce();

        assert!(producer.finish().is_ok());
        assert!(producer.finish().is_err());
        assert!(producer.append_group().is_err());
    }

    #[test]
    fn finish_after_groups() {
        let mut producer = Track::new("test").produce();

        producer.append_group().unwrap();
        assert!(producer.finish().is_ok());
        assert!(producer.finish().is_err());
        assert!(producer.append_group().is_err());
    }

    #[test]
    fn insert_finish_validates_sequence_and_freezes_to_max() {
        let mut producer = Track::new("test").produce();
        producer.create_group(Group { sequence: 5 }).unwrap();

        // Only the current max sequence is accepted as the finishing point.
        assert!(producer.finish_at(4).is_err());
        assert!(producer.finish_at(10).is_err());
        assert!(producer.finish_at(5).is_ok());

        {
            let state = producer.state.read();
            assert_eq!(state.final_sequence, Some(6));
        }

        // Finishing twice fails; gaps below the max may still be filled.
        assert!(producer.finish_at(5).is_err());
        assert!(producer.create_group(Group { sequence: 4 }).is_ok());
        assert!(producer.create_group(Group { sequence: 5 }).is_err());
    }

    #[tokio::test]
    async fn recv_group_finishes_without_waiting_for_gaps() {
        let mut producer = Track::new("test").produce();
        producer.create_group(Group { sequence: 1 }).unwrap();
        producer.finish_at(1).unwrap();

        let mut consumer = producer.consume();
        assert_eq!(consumer.assert_group().info.sequence, 1);

        let done = consumer
            .recv_group()
            .now_or_never()
            .expect("should not block")
            .expect("would have errored");
        assert!(done.is_none(), "track should finish without waiting for gaps");
    }

    #[tokio::test]
    async fn next_group_ordered_skips_late_arrivals() {
        let mut producer = Track::new("test").produce();
        let mut consumer = producer.consume();

        producer.create_group(Group { sequence: 5 }).unwrap();
        let group = ordered_group(&mut consumer);
        assert_eq!(group.info.sequence, 5);

        // 3 and 4 arrive after 5 was returned; only 7 is still in order.
        for sequence in [3, 4, 7] {
            producer.create_group(Group { sequence }).unwrap();
        }

        let group = ordered_group(&mut consumer);
        assert_eq!(group.info.sequence, 7);

        assert!(
            consumer.next_group_ordered().now_or_never().is_none(),
            "should block waiting for a higher sequence"
        );
    }

    #[tokio::test]
    async fn next_group_ordered_returns_arrivals_in_order() {
        let mut producer = Track::new("test").produce();
        let mut consumer = producer.consume();

        producer.create_group(Group { sequence: 3 }).unwrap();
        producer.create_group(Group { sequence: 5 }).unwrap();

        let group = ordered_group(&mut consumer);
        assert_eq!(group.info.sequence, 3);

        let group = ordered_group(&mut consumer);
        assert_eq!(group.info.sequence, 5);
    }

    #[tokio::test]
    async fn recv_group_after_next_group_ordered_sees_late_arrivals() {
        let mut producer = Track::new("test").produce();
        let mut consumer = producer.consume();

        producer.create_group(Group { sequence: 5 }).unwrap();
        producer.create_group(Group { sequence: 3 }).unwrap();

        let group = ordered_group(&mut consumer);
        assert_eq!(group.info.sequence, 5);

        // The arrival-order reader applies no ordering filter, so 3 is visible.
        assert_eq!(consumer.assert_group().info.sequence, 3);
    }

    #[tokio::test]
    async fn read_frame_returns_single_frame_per_group() {
        let mut producer = Track::new("test").produce();
        let mut consumer = producer.consume();

        producer.write_frame(b"hello".as_slice()).unwrap();
        producer.write_frame(b"world".as_slice()).unwrap();

        let frame = ready_frame(&mut consumer, "should not block");
        assert_eq!(&frame[..], b"hello");

        let frame = ready_frame(&mut consumer, "should not block");
        assert_eq!(&frame[..], b"world");
    }

    #[tokio::test]
    async fn read_frame_skips_stalled_group_for_newer_ready_frame() {
        let mut producer = Track::new("test").produce();
        let mut consumer = producer.consume();

        // Group 3 stays open without frames; group 5 has one ready.
        let _stalled = producer.create_group(Group { sequence: 3 }).unwrap();
        let mut g5 = producer.create_group(Group { sequence: 5 }).unwrap();
        g5.write_frame(bytes::Bytes::from_static(b"later")).unwrap();
        g5.finish().unwrap();

        let frame = ready_frame(&mut consumer, "should not block on stalled earlier group");
        assert_eq!(&frame[..], b"later");
    }

    #[tokio::test]
    async fn read_frame_discards_rest_of_multi_frame_group() {
        let mut producer = Track::new("test").produce();
        let mut consumer = producer.consume();

        let mut g0 = producer.create_group(Group { sequence: 0 }).unwrap();
        g0.write_frame(bytes::Bytes::from_static(b"one")).unwrap();
        g0.write_frame(bytes::Bytes::from_static(b"two")).unwrap();
        g0.finish().unwrap();

        producer.write_frame(b"next".as_slice()).unwrap();

        let frame = ready_frame(&mut consumer, "should not block");
        assert_eq!(&frame[..], b"one");

        // Only the first frame of a group is read; "two" is never returned.
        let frame = ready_frame(&mut consumer, "should not block");
        assert_eq!(&frame[..], b"next");
    }

    #[tokio::test]
    async fn read_frame_waits_for_pending_group_after_finish() {
        let mut producer = Track::new("test").produce();
        let mut consumer = producer.consume();

        let mut g0 = producer.create_group(Group { sequence: 0 }).unwrap();
        producer.finish().unwrap();

        assert!(
            consumer.read_frame().now_or_never().is_none(),
            "read_frame must block on a pending group even after finish()"
        );

        g0.write_frame(bytes::Bytes::from_static(b"late")).unwrap();
        let frame = ready_frame(&mut consumer, "should not block once a frame is written");
        assert_eq!(&frame[..], b"late");
    }

    #[tokio::test]
    async fn read_frame_respects_start_at() {
        let mut producer = Track::new("test").produce();
        let mut consumer = producer.consume();
        consumer.start_at(5);

        let mut g3 = producer.create_group(Group { sequence: 3 }).unwrap();
        g3.write_frame(bytes::Bytes::from_static(b"skip-me")).unwrap();
        g3.finish().unwrap();

        let mut g5 = producer.create_group(Group { sequence: 5 }).unwrap();
        g5.write_frame(bytes::Bytes::from_static(b"keep")).unwrap();
        g5.finish().unwrap();

        let frame = ready_frame(&mut consumer, "should not block");
        assert_eq!(&frame[..], b"keep");
    }

    #[tokio::test]
    async fn read_frame_returns_none_when_finished() {
        let mut producer = Track::new("test").produce();
        let mut consumer = producer.consume();

        producer.write_frame(b"only".as_slice()).unwrap();
        producer.finish().unwrap();

        let frame = ready_frame(&mut consumer, "should not block");
        assert_eq!(&frame[..], b"only");

        let done = consumer
            .read_frame()
            .now_or_never()
            .expect("should not block")
            .expect("would have errored");
        assert!(done.is_none());
    }

    #[tokio::test]
    async fn get_group_finishes_without_waiting_for_gaps() {
        let mut producer = Track::new("test").produce();
        producer.create_group(Group { sequence: 1 }).unwrap();
        producer.finish_at(1).unwrap();

        let consumer = producer.consume();
        assert!(
            consumer.get_group(0).now_or_never().is_none(),
            "sequence below fin should block (group could still arrive)"
        );

        let beyond = consumer
            .get_group(2)
            .now_or_never()
            .expect("sequence at-or-after fin should resolve")
            .expect("should not error");
        assert!(beyond.is_none(), "sequence at-or-after fin should not exist");
    }

    #[test]
    fn append_group_returns_bounds_exceeded_on_sequence_overflow() {
        let mut producer = Track::new("test").produce();
        {
            // Force the state to the edge of the sequence space.
            let mut state = producer.state.write().ok().unwrap();
            state.max_sequence = Some(u64::MAX);
        }

        assert!(matches!(producer.append_group(), Err(Error::BoundsExceeded(_))));
    }

    #[tokio::test]
    async fn consumer_produce() {
        let mut producer = Track::new("test").produce();
        producer.append_group().unwrap();

        let consumer = producer.consume();

        let got = consumer.produce().expect("should produce");
        assert!(got.is_clone(&producer), "should be the same track");

        // A group written through the upgraded handle is visible to new consumers.
        got.clone().append_group().unwrap();
        let mut sub = producer.consume();
        sub.assert_group();
        sub.assert_group();
    }

    #[tokio::test]
    async fn consumer_produce_after_drop() {
        let producer = Track::new("test").produce();
        let consumer = producer.consume();
        drop(producer);

        let err = consumer.produce();
        assert!(matches!(err, Err(Error::Dropped)), "expected Dropped");
    }

    #[tokio::test]
    async fn consumer_produce_after_abort() {
        let mut producer = Track::new("test").produce();
        let consumer = producer.consume();
        producer.abort(Error::Cancel).unwrap();
        drop(producer);

        let err = consumer.produce();
        assert!(matches!(err, Err(Error::Cancel)), "expected Cancel");
    }

    #[tokio::test]
    async fn consumer_produce_keeps_alive() {
        let producer = Track::new("test").produce();
        let consumer = producer.consume();
        let upgraded = consumer.produce().expect("should produce");
        drop(producer);

        // The upgraded handle still keeps the track open.
        assert!(consumer.closed().now_or_never().is_none(), "should not be closed");
        drop(upgraded);

        assert!(consumer.closed().now_or_never().is_some(), "should be closed");
    }
}