1use crate::{Error, Result, coding};
16
17use super::{Group, GroupConsumer, GroupProducer};
18
19use std::{
20 collections::{HashSet, VecDeque},
21 task::{Poll, ready},
22 time::Duration,
23};
24
/// Age beyond which `State::evict_expired` drops a group.
///
/// The group holding the highest sequence seen so far is exempt and is kept regardless of age.
const MAX_GROUP_AGE: Duration = Duration::from_secs(30);
28
/// Static description of a track: its name and its priority.
///
/// Turn it into a [`TrackProducer`] with [`Track::produce`].
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct Track {
    /// Name of the track.
    pub name: String,
    /// Priority of the track; [`Track::new`] initialises it to `0`.
    // NOTE(review): how priorities are ordered/interpreted is not visible in this file — confirm.
    pub priority: u8,
}
38
39impl Track {
40 pub fn new<T: Into<String>>(name: T) -> Self {
42 Self {
43 name: name.into(),
44 priority: 0,
45 }
46 }
47
48 pub fn produce(self) -> TrackProducer {
50 TrackProducer::new(self)
51 }
52}
53
/// Shared state of a track, accessed by producers and consumers through `conducer`.
#[derive(Default)]
struct State {
    /// Group slots in insertion order, each with the instant it was inserted.
    /// A `None` slot is a group that has been evicted but not yet trimmed from the front.
    groups: VecDeque<Option<(GroupProducer, tokio::time::Instant)>>,
    /// Sequences of the groups currently tracked; used to reject duplicate sequences.
    duplicates: HashSet<u64>,
    /// Number of slots trimmed from the front of `groups`; a slot's absolute index is
    /// `offset` plus its position in the deque.
    offset: usize,
    /// Highest sequence created so far, if any.
    max_sequence: Option<u64>,
    /// Exclusive upper bound on sequences, set once the track is finished.
    final_sequence: Option<u64>,
    /// Error recorded when the track is aborted.
    abort: Option<Error>,
}
64
65impl State {
66 fn poll_recv_group(&self, index: usize, min_sequence: u64) -> Poll<Result<Option<(GroupConsumer, usize)>>> {
70 let start = index.saturating_sub(self.offset);
71 for (i, slot) in self.groups.iter().enumerate().skip(start) {
72 if let Some((group, _)) = slot
73 && group.sequence >= min_sequence
74 {
75 return Poll::Ready(Ok(Some((group.consume(), self.offset + i))));
76 }
77 }
78
79 if self.final_sequence.is_some() {
81 Poll::Ready(Ok(None))
82 } else if let Some(err) = &self.abort {
83 Poll::Ready(Err(err.clone()))
84 } else {
85 Poll::Pending
86 }
87 }
88
89 fn poll_read_frame(
93 &self,
94 index: usize,
95 next_sequence: u64,
96 waiter: &conducer::Waiter,
97 ) -> Poll<Result<Option<(bytes::Bytes, usize, u64)>>> {
98 let start = index.saturating_sub(self.offset);
99 let mut pending_seen = false;
100 for (i, slot) in self.groups.iter().enumerate().skip(start) {
101 let Some((group, _)) = slot else { continue };
102 if group.sequence < next_sequence {
103 continue;
104 }
105
106 let mut consumer = group.consume();
107 match consumer.poll_read_frame(waiter) {
108 Poll::Ready(Ok(Some(frame))) => {
109 return Poll::Ready(Ok(Some((frame, self.offset + i, group.sequence))));
110 }
111 Poll::Ready(Ok(None)) => continue,
112 Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
113 Poll::Pending => {
114 pending_seen = true;
115 continue;
116 }
117 }
118 }
119
120 if pending_seen {
123 Poll::Pending
124 } else if self.final_sequence.is_some() {
125 Poll::Ready(Ok(None))
126 } else if let Some(err) = &self.abort {
127 Poll::Ready(Err(err.clone()))
128 } else {
129 Poll::Pending
130 }
131 }
132
133 fn poll_get_group(&self, sequence: u64) -> Poll<Result<Option<GroupConsumer>>> {
134 for (group, _) in self.groups.iter().flatten() {
136 if group.sequence == sequence {
137 return Poll::Ready(Ok(Some(group.consume())));
138 }
139 }
140
141 if let Some(fin) = self.final_sequence
143 && sequence >= fin
144 {
145 return Poll::Ready(Ok(None));
146 }
147
148 if let Some(err) = &self.abort {
149 return Poll::Ready(Err(err.clone()));
150 }
151
152 Poll::Pending
153 }
154
155 fn poll_closed(&self) -> Poll<Result<()>> {
156 if self.final_sequence.is_some() {
157 Poll::Ready(Ok(()))
158 } else if let Some(err) = &self.abort {
159 Poll::Ready(Err(err.clone()))
160 } else {
161 Poll::Pending
162 }
163 }
164
165 fn evict_expired(&mut self, now: tokio::time::Instant) {
172 for slot in self.groups.iter_mut() {
173 let Some((group, created_at)) = slot else { continue };
174
175 if Some(group.sequence) == self.max_sequence {
176 continue;
177 }
178
179 if now.duration_since(*created_at) <= MAX_GROUP_AGE {
180 break;
181 }
182
183 self.duplicates.remove(&group.sequence);
184 *slot = None;
185 }
186
187 while let Some(None) = self.groups.front() {
189 self.groups.pop_front();
190 self.offset += 1;
191 }
192 }
193
194 fn poll_finished(&self) -> Poll<Result<u64>> {
195 if let Some(fin) = self.final_sequence {
196 Poll::Ready(Ok(fin))
197 } else if let Some(err) = &self.abort {
198 Poll::Ready(Err(err.clone()))
199 } else {
200 Poll::Pending
201 }
202 }
203}
204
/// Producing side of a track: creates groups and finishes or aborts the track.
///
/// Dereferences to the underlying [`Track`] description.
pub struct TrackProducer {
    /// Description of the track being produced.
    info: Track,
    /// Producer handle to the state shared with consumers.
    state: conducer::Producer<State>,
}
210
211impl std::ops::Deref for TrackProducer {
212 type Target = Track;
213
214 fn deref(&self) -> &Self::Target {
215 &self.info
216 }
217}
218
219impl TrackProducer {
220 pub fn new(info: Track) -> Self {
222 Self {
223 info,
224 state: conducer::Producer::default(),
225 }
226 }
227
228 pub fn create_group(&mut self, info: Group) -> Result<GroupProducer> {
230 let group = info.produce();
231
232 let mut state = self.modify()?;
233 if let Some(fin) = state.final_sequence
234 && group.sequence >= fin
235 {
236 return Err(Error::Closed);
237 }
238
239 if !state.duplicates.insert(group.sequence) {
240 return Err(Error::Duplicate);
241 }
242
243 let now = tokio::time::Instant::now();
244 state.max_sequence = Some(state.max_sequence.unwrap_or(0).max(group.sequence));
245 state.groups.push_back(Some((group.clone(), now)));
246 state.evict_expired(now);
247
248 Ok(group)
249 }
250
251 pub fn append_group(&mut self) -> Result<GroupProducer> {
253 let mut state = self.modify()?;
254 let sequence = match state.max_sequence {
255 Some(s) => s.checked_add(1).ok_or(coding::BoundsExceeded)?,
256 None => 0,
257 };
258 if let Some(fin) = state.final_sequence
259 && sequence >= fin
260 {
261 return Err(Error::Closed);
262 }
263
264 let group = Group { sequence }.produce();
265
266 let now = tokio::time::Instant::now();
267 state.duplicates.insert(sequence);
268 state.max_sequence = Some(sequence);
269 state.groups.push_back(Some((group.clone(), now)));
270 state.evict_expired(now);
271
272 Ok(group)
273 }
274
275 pub fn write_frame<B: Into<bytes::Bytes>>(&mut self, frame: B) -> Result<()> {
277 let mut group = self.append_group()?;
278 group.write_frame(frame.into())?;
279 group.finish()?;
280 Ok(())
281 }
282
283 pub fn finish(&mut self) -> Result<()> {
289 let mut state = self.modify()?;
290 if state.final_sequence.is_some() {
291 return Err(Error::Closed);
292 }
293 state.final_sequence = Some(match state.max_sequence {
294 Some(max) => max.checked_add(1).ok_or(coding::BoundsExceeded)?,
295 None => 0,
296 });
297 Ok(())
298 }
299
300 pub fn finish_at(&mut self, sequence: u64) -> Result<()> {
307 let mut state = self.modify()?;
308 let max = state.max_sequence.ok_or(Error::Closed)?;
309 if state.final_sequence.is_some() || sequence != max {
310 return Err(Error::Closed);
311 }
312 state.final_sequence = Some(max.checked_add(1).ok_or(coding::BoundsExceeded)?);
313 Ok(())
314 }
315
316 pub fn abort(&mut self, err: Error) -> Result<()> {
322 let mut guard = self.modify()?;
323 guard.abort = Some(err);
324 guard.close();
325 Ok(())
326 }
327
328 pub fn consume(&self) -> TrackConsumer {
330 TrackConsumer {
331 info: self.info.clone(),
332 state: self.state.consume(),
333 index: 0,
334 min_sequence: 0,
335 next_sequence: 0,
336 }
337 }
338
339 pub async fn unused(&self) -> Result<()> {
341 self.state
342 .unused()
343 .await
344 .map_err(|r| r.abort.clone().unwrap_or(Error::Dropped))
345 }
346
347 pub async fn used(&self) -> Result<()> {
349 self.state
350 .used()
351 .await
352 .map_err(|r| r.abort.clone().unwrap_or(Error::Dropped))
353 }
354
355 pub async fn closed(&self) -> Error {
357 self.state.closed().await;
358 self.state.read().abort.clone().unwrap_or(Error::Dropped)
359 }
360
361 pub fn is_closed(&self) -> bool {
363 self.state.read().is_closed()
364 }
365
366 pub fn is_clone(&self, other: &Self) -> bool {
368 self.state.same_channel(&other.state)
369 }
370
371 pub(crate) fn weak(&self) -> TrackWeak {
373 TrackWeak {
374 info: self.info.clone(),
375 state: self.state.weak(),
376 }
377 }
378
379 fn modify(&self) -> Result<conducer::Mut<'_, State>> {
380 self.state
381 .write()
382 .map_err(|r| r.abort.clone().unwrap_or(Error::Dropped))
383 }
384}
385
386impl Clone for TrackProducer {
387 fn clone(&self) -> Self {
388 Self {
389 info: self.info.clone(),
390 state: self.state.clone(),
391 }
392 }
393}
394
395impl From<Track> for TrackProducer {
396 fn from(info: Track) -> Self {
397 TrackProducer::new(info)
398 }
399}
400
/// Crate-internal handle pairing a track description with a `conducer::Weak` reference to
/// the shared state.
// NOTE(review): presumably does not keep the track alive on its own — confirm against `conducer`.
#[derive(Clone)]
pub(crate) struct TrackWeak {
    /// Description of the track this handle refers to.
    pub(crate) info: Track,
    /// Weak handle to the shared track state.
    state: conducer::Weak<State>,
}
407
408impl TrackWeak {
409 pub fn is_closed(&self) -> bool {
410 self.state.is_closed()
411 }
412
413 pub fn consume(&self) -> TrackConsumer {
414 TrackConsumer {
415 info: self.info.clone(),
416 state: self.state.consume(),
417 index: 0,
418 min_sequence: 0,
419 next_sequence: 0,
420 }
421 }
422
423 pub async fn unused(&self) -> crate::Result<()> {
424 self.state
425 .unused()
426 .await
427 .map_err(|r| r.abort.clone().unwrap_or(Error::Dropped))
428 }
429
430 pub fn is_clone(&self, other: &Self) -> bool {
431 self.state.same_channel(&other.state)
432 }
433}
434
/// Consuming side of a track: receives groups and frames produced by a [`TrackProducer`].
///
/// Dereferences to the underlying [`Track`] description.
#[derive(Clone)]
pub struct TrackConsumer {
    /// Description of the track being consumed.
    info: Track,
    /// Consumer handle to the state shared with the producer.
    state: conducer::Consumer<State>,
    /// Absolute slot index at which the next scan for a group starts.
    index: usize,
    /// Lowest sequence this consumer accepts; set through `start_at`.
    min_sequence: u64,
    /// One past the sequence last returned by `next_group` / `read_frame`; groups below it
    /// are skipped by those methods.
    next_sequence: u64,
}
448
449impl std::ops::Deref for TrackConsumer {
450 type Target = Track;
451
452 fn deref(&self) -> &Self::Target {
453 &self.info
454 }
455}
456
457impl TrackConsumer {
458 fn poll<F, R>(&self, waiter: &conducer::Waiter, f: F) -> Poll<Result<R>>
460 where
461 F: Fn(&conducer::Ref<'_, State>) -> Poll<Result<R>>,
462 {
463 Poll::Ready(match ready!(self.state.poll(waiter, f)) {
464 Ok(res) => res,
465 Err(state) => Err(state.abort.clone().unwrap_or(Error::Dropped)),
467 })
468 }
469
470 pub fn poll_recv_group(&mut self, waiter: &conducer::Waiter) -> Poll<Result<Option<GroupConsumer>>> {
481 let Some((consumer, found_index)) =
482 ready!(self.poll(waiter, |state| state.poll_recv_group(self.index, self.min_sequence))?)
483 else {
484 return Poll::Ready(Ok(None));
485 };
486
487 self.index = found_index + 1;
488 Poll::Ready(Ok(Some(consumer)))
489 }
490
491 pub async fn recv_group(&mut self) -> Result<Option<GroupConsumer>> {
497 conducer::wait(|waiter| self.poll_recv_group(waiter)).await
498 }
499
500 pub fn poll_next_group(&mut self, waiter: &conducer::Waiter) -> Poll<Result<Option<GroupConsumer>>> {
506 loop {
507 let Some(group) = ready!(self.poll_recv_group(waiter)?) else {
508 return Poll::Ready(Ok(None));
509 };
510 if group.sequence < self.next_sequence {
511 continue;
513 }
514 self.next_sequence = group.sequence.saturating_add(1);
515 return Poll::Ready(Ok(Some(group)));
516 }
517 }
518
519 pub async fn next_group(&mut self) -> Result<Option<GroupConsumer>> {
525 conducer::wait(|waiter| self.poll_next_group(waiter)).await
526 }
527
528 pub fn poll_read_frame(&mut self, waiter: &conducer::Waiter) -> Poll<Result<Option<bytes::Bytes>>> {
532 let lower = self.min_sequence.max(self.next_sequence);
533 let Some((frame, found_index, sequence)) =
534 ready!(self.poll(waiter, |state| { state.poll_read_frame(self.index, lower, waiter) })?)
535 else {
536 return Poll::Ready(Ok(None));
537 };
538
539 self.index = found_index + 1;
540 self.next_sequence = sequence.saturating_add(1);
541 Poll::Ready(Ok(Some(frame)))
542 }
543
544 pub async fn read_frame(&mut self) -> Result<Option<bytes::Bytes>> {
548 conducer::wait(|waiter| self.poll_read_frame(waiter)).await
549 }
550
551 pub fn poll_get_group(&self, waiter: &conducer::Waiter, sequence: u64) -> Poll<Result<Option<GroupConsumer>>> {
553 self.poll(waiter, |state| state.poll_get_group(sequence))
554 }
555
556 pub async fn get_group(&self, sequence: u64) -> Result<Option<GroupConsumer>> {
564 conducer::wait(|waiter| self.poll_get_group(waiter, sequence)).await
565 }
566
567 pub fn poll_closed(&self, waiter: &conducer::Waiter) -> Poll<Result<()>> {
569 self.poll(waiter, |state| state.poll_closed())
570 }
571
572 pub async fn closed(&self) -> Result<()> {
576 conducer::wait(|waiter| self.poll_closed(waiter)).await
577 }
578
579 pub fn is_clone(&self, other: &Self) -> bool {
581 self.state.same_channel(&other.state)
582 }
583
584 pub fn poll_finished(&mut self, waiter: &conducer::Waiter) -> Poll<Result<u64>> {
586 self.poll(waiter, |state| state.poll_finished())
587 }
588
589 pub async fn finished(&mut self) -> Result<u64> {
591 conducer::wait(|waiter| self.poll_finished(waiter)).await
592 }
593
594 pub fn start_at(&mut self, sequence: u64) {
596 self.min_sequence = sequence;
597 }
598
599 pub fn latest(&self) -> Option<u64> {
601 self.state.read().max_sequence
602 }
603
604 pub(crate) fn weak(&self) -> TrackWeak {
606 TrackWeak {
607 info: self.info.clone(),
608 state: self.state.weak(),
609 }
610 }
611}
612
613#[cfg(test)]
614use futures::FutureExt;
615
/// Assertion helpers used by tests; each polls the consumer once without blocking.
#[cfg(test)]
impl TrackConsumer {
    /// Returns the next group, panicking if receiving would block, error, or hit end of track.
    pub fn assert_group(&mut self) -> GroupConsumer {
        let polled = self.recv_group().now_or_never();
        let received = polled.expect("group would have blocked");
        let group = received.expect("would have errored");
        group.expect("track was closed")
    }

    /// Panics unless receiving the next group would block.
    pub fn assert_no_group(&mut self) {
        let polled = self.recv_group().now_or_never();
        assert!(polled.is_none(), "recv_group would not have blocked");
    }

    /// Panics if `closed()` resolves immediately.
    pub fn assert_not_closed(&self) {
        let polled = self.closed().now_or_never();
        assert!(polled.is_none(), "should not be closed");
    }

    /// Panics unless `closed()` resolves immediately.
    pub fn assert_closed(&self) {
        let polled = self.closed().now_or_never();
        assert!(polled.is_some(), "should be closed");
    }

    /// Panics unless `closed()` resolves immediately with an error.
    pub fn assert_error(&self) {
        let outcome = self.closed().now_or_never().expect("should not block");
        assert!(outcome.is_err(), "should be error");
    }

    /// Panics unless `other` shares this consumer's channel.
    pub fn assert_is_clone(&self, other: &Self) {
        let same = self.is_clone(other);
        assert!(same, "should be clone");
    }

    /// Panics if `other` shares this consumer's channel.
    pub fn assert_not_clone(&self, other: &Self) {
        let same = self.is_clone(other);
        assert!(!same, "should not be clone");
    }
}
657
#[cfg(test)]
mod test {
    use super::*;

    /// Number of slots that still hold a group.
    fn live_groups(state: &State) -> usize {
        state.groups.iter().filter(|slot| slot.is_some()).count()
    }

    /// Sequence of the first slot that still holds a group; panics if there is none.
    fn first_live_sequence(state: &State) -> u64 {
        let (group, _) = state.groups.iter().find_map(|slot| slot.as_ref()).unwrap();
        group.sequence
    }

    /// Advances the paused tokio clock to one second past [`MAX_GROUP_AGE`].
    async fn advance_past_expiry() {
        tokio::time::advance(MAX_GROUP_AGE + Duration::from_secs(1)).await;
    }

    /// Groups older than `MAX_GROUP_AGE` are evicted when a newer group is appended.
    #[tokio::test]
    async fn evict_expired_groups() {
        tokio::time::pause();

        let mut producer = Track::new("test").produce();

        // Sequences 0, 1 and 2, all created at the same instant.
        for _ in 0..3 {
            producer.append_group().unwrap();
        }
        {
            let state = producer.state.read();
            assert_eq!(live_groups(&state), 3);
            assert_eq!(state.offset, 0);
        }

        advance_past_expiry().await;

        // Sequence 3: appending triggers eviction of the three stale groups.
        producer.append_group().unwrap();
        {
            let state = producer.state.read();
            assert_eq!(live_groups(&state), 1);
            assert_eq!(first_live_sequence(&state), 3);
            assert_eq!(state.offset, 3);
            for evicted in 0..3u64 {
                assert!(!state.duplicates.contains(&evicted));
            }
            assert!(state.duplicates.contains(&3));
        }
    }

    /// After eviction only the newest group (the max sequence) remains.
    #[tokio::test]
    async fn evict_keeps_max_sequence() {
        tokio::time::pause();

        let mut producer = Track::new("test").produce();
        producer.append_group().unwrap();

        advance_past_expiry().await;

        producer.append_group().unwrap();
        {
            let state = producer.state.read();
            assert_eq!(live_groups(&state), 1);
            assert_eq!(first_live_sequence(&state), 1);
            assert_eq!(state.offset, 1);
        }
    }

    /// Nothing is evicted while every group is still fresh.
    #[tokio::test]
    async fn no_eviction_when_fresh() {
        tokio::time::pause();

        let mut producer = Track::new("test").produce();
        for _ in 0..3 {
            producer.append_group().unwrap();
        }
        {
            let state = producer.state.read();
            assert_eq!(live_groups(&state), 3);
            assert_eq!(state.offset, 0);
        }
    }

    /// A consumer created before eviction resumes at the first surviving group.
    #[tokio::test]
    async fn consumer_skips_evicted_groups() {
        tokio::time::pause();

        let mut producer = Track::new("test").produce();
        producer.append_group().unwrap();
        let mut consumer = producer.consume();

        advance_past_expiry().await;
        producer.append_group().unwrap();

        let group = consumer.assert_group();
        assert_eq!(group.sequence, 1);
    }

    /// A former max sequence sitting at the front is evicted once a higher sequence exists.
    #[tokio::test]
    async fn out_of_order_max_sequence_at_front() {
        tokio::time::pause();

        let mut producer = Track::new("test").produce();

        for sequence in [5, 3, 4] {
            producer.create_group(Group { sequence }).unwrap();
        }

        {
            let state = producer.state.read();
            assert_eq!(state.max_sequence, Some(5));
        }

        advance_past_expiry().await;

        // Sequence 6 becomes the new max; 5, 3 and 4 are all stale now.
        producer.append_group().unwrap();
        {
            let state = producer.state.read();
            assert_eq!(live_groups(&state), 1);
            assert_eq!(first_live_sequence(&state), 6);
            for evicted in [3u64, 4, 5] {
                assert!(!state.duplicates.contains(&evicted));
            }
            assert!(state.duplicates.contains(&6));
        }
    }

    /// While the max sequence occupies the front slot, evicted slots behind it cannot be trimmed.
    #[tokio::test]
    async fn max_sequence_at_front_blocks_trim() {
        tokio::time::pause();

        let mut producer = Track::new("test").produce();

        producer.create_group(Group { sequence: 5 }).unwrap();

        advance_past_expiry().await;

        producer.create_group(Group { sequence: 3 }).unwrap();

        {
            // 5 is stale but is the max sequence; 3 is fresh.
            let state = producer.state.read();
            assert_eq!(live_groups(&state), 2);
            assert_eq!(state.offset, 0);
        }

        advance_past_expiry().await;

        producer.create_group(Group { sequence: 2 }).unwrap();

        {
            // 3 has been evicted, but its slot sits behind 5 and so stays in the deque.
            let state = producer.state.read();
            assert_eq!(live_groups(&state), 2);
            assert_eq!(state.offset, 0);
            assert!(state.duplicates.contains(&5));
            assert!(!state.duplicates.contains(&3));
            assert!(state.duplicates.contains(&2));
        }

        let mut consumer = producer.consume();
        let group = consumer.assert_group();
        assert_eq!(group.sequence, 5);
    }

    /// Finishing an empty track is final: no second finish, no further appends.
    #[test]
    fn append_finish_cannot_be_rewritten() {
        let mut producer = Track::new("test").produce();

        assert!(producer.finish().is_ok());
        assert!(producer.finish().is_err());
        assert!(producer.append_group().is_err());
    }

    /// Finishing after a group is final as well.
    #[test]
    fn finish_after_groups() {
        let mut producer = Track::new("test").produce();

        producer.append_group().unwrap();
        assert!(producer.finish().is_ok());
        assert!(producer.finish().is_err());
        assert!(producer.append_group().is_err());
    }

    /// `finish_at` only accepts the current max sequence and fixes the final sequence at max + 1.
    #[test]
    fn insert_finish_validates_sequence_and_freezes_to_max() {
        let mut producer = Track::new("test").produce();
        producer.create_group(Group { sequence: 5 }).unwrap();

        assert!(producer.finish_at(4).is_err());
        assert!(producer.finish_at(10).is_err());
        assert!(producer.finish_at(5).is_ok());

        {
            let state = producer.state.read();
            assert_eq!(state.final_sequence, Some(6));
        }

        assert!(producer.finish_at(5).is_err());
        // Gaps below the final sequence may still be filled; duplicates may not.
        assert!(producer.create_group(Group { sequence: 4 }).is_ok());
        assert!(producer.create_group(Group { sequence: 5 }).is_err());
    }

    /// `recv_group` ends the stream after a finish even though sequence 0 never arrived.
    #[tokio::test]
    async fn recv_group_finishes_without_waiting_for_gaps() {
        let mut producer = Track::new("test").produce();
        producer.create_group(Group { sequence: 1 }).unwrap();
        producer.finish_at(1).unwrap();

        let mut consumer = producer.consume();
        assert_eq!(consumer.assert_group().sequence, 1);

        let done = consumer
            .recv_group()
            .now_or_never()
            .expect("should not block")
            .expect("would have errored");
        assert!(done.is_none(), "track should finish without waiting for gaps");
    }

    /// `next_group` drops groups that arrive with a sequence below one already returned.
    #[tokio::test]
    async fn next_group_skips_late_arrivals() {
        let mut producer = Track::new("test").produce();
        let mut consumer = producer.consume();

        producer.create_group(Group { sequence: 5 }).unwrap();
        let first = consumer
            .next_group()
            .now_or_never()
            .expect("should not block")
            .expect("would have errored")
            .expect("track should not be closed");
        assert_eq!(first.sequence, 5);

        // 3 and 4 are late; only 7 is above the high-water mark.
        for sequence in [3, 4, 7] {
            producer.create_group(Group { sequence }).unwrap();
        }

        let second = consumer
            .next_group()
            .now_or_never()
            .expect("should not block")
            .expect("would have errored")
            .expect("track should not be closed");
        assert_eq!(second.sequence, 7);

        assert!(
            consumer.next_group().now_or_never().is_none(),
            "should block waiting for a higher sequence"
        );
    }

    /// `next_group` yields ascending arrivals one after another.
    #[tokio::test]
    async fn next_group_returns_arrivals_in_order() {
        let mut producer = Track::new("test").produce();
        let mut consumer = producer.consume();

        for sequence in [3, 5] {
            producer.create_group(Group { sequence }).unwrap();
        }

        let first = consumer
            .next_group()
            .now_or_never()
            .expect("should not block")
            .expect("would have errored")
            .expect("track should not be closed");
        assert_eq!(first.sequence, 3);

        let second = consumer
            .next_group()
            .now_or_never()
            .expect("should not block")
            .expect("would have errored")
            .expect("track should not be closed");
        assert_eq!(second.sequence, 5);
    }

    /// `recv_group` is not subject to the high-water mark maintained by `next_group`.
    #[tokio::test]
    async fn recv_group_after_next_group_sees_late_arrivals() {
        let mut producer = Track::new("test").produce();
        let mut consumer = producer.consume();

        for sequence in [5, 3] {
            producer.create_group(Group { sequence }).unwrap();
        }

        let first = consumer
            .next_group()
            .now_or_never()
            .expect("should not block")
            .expect("would have errored")
            .expect("track should not be closed");
        assert_eq!(first.sequence, 5);

        assert_eq!(consumer.assert_group().sequence, 3);
    }

    /// Each `write_frame` produces its own group, read back one frame at a time.
    #[tokio::test]
    async fn read_frame_returns_single_frame_per_group() {
        let mut producer = Track::new("test").produce();
        let mut consumer = producer.consume();

        producer.write_frame(b"hello".as_slice()).unwrap();
        producer.write_frame(b"world".as_slice()).unwrap();

        let first = consumer
            .read_frame()
            .now_or_never()
            .expect("should not block")
            .expect("would have errored")
            .expect("track should not be closed");
        assert_eq!(&first[..], b"hello");

        let second = consumer
            .read_frame()
            .now_or_never()
            .expect("should not block")
            .expect("would have errored")
            .expect("track should not be closed");
        assert_eq!(&second[..], b"world");
    }

    /// A group with no frame yet does not hold up a later group that has one.
    #[tokio::test]
    async fn read_frame_skips_stalled_group_for_newer_ready_frame() {
        let mut producer = Track::new("test").produce();
        let mut consumer = producer.consume();

        // Kept alive but never written to.
        let _stalled = producer.create_group(Group { sequence: 3 }).unwrap();
        let mut g5 = producer.create_group(Group { sequence: 5 }).unwrap();
        g5.write_frame(bytes::Bytes::from_static(b"later")).unwrap();
        g5.finish().unwrap();

        let frame = consumer
            .read_frame()
            .now_or_never()
            .expect("should not block on stalled earlier group")
            .expect("would have errored")
            .expect("track should not be closed");
        assert_eq!(&frame[..], b"later");
    }

    /// Only the first frame of a multi-frame group is returned; the rest is skipped.
    #[tokio::test]
    async fn read_frame_discards_rest_of_multi_frame_group() {
        let mut producer = Track::new("test").produce();
        let mut consumer = producer.consume();

        let mut g0 = producer.create_group(Group { sequence: 0 }).unwrap();
        g0.write_frame(bytes::Bytes::from_static(b"one")).unwrap();
        g0.write_frame(bytes::Bytes::from_static(b"two")).unwrap();
        g0.finish().unwrap();

        producer.write_frame(b"next".as_slice()).unwrap();

        let first = consumer
            .read_frame()
            .now_or_never()
            .expect("should not block")
            .expect("would have errored")
            .expect("track should not be closed");
        assert_eq!(&first[..], b"one");

        // "two" belongs to the group already consumed, so the next group's frame follows.
        let second = consumer
            .read_frame()
            .now_or_never()
            .expect("should not block")
            .expect("would have errored")
            .expect("track should not be closed");
        assert_eq!(&second[..], b"next");
    }

    /// A finished track still waits for a group that has not produced its frame yet.
    #[tokio::test]
    async fn read_frame_waits_for_pending_group_after_finish() {
        let mut producer = Track::new("test").produce();
        let mut consumer = producer.consume();

        let mut g0 = producer.create_group(Group { sequence: 0 }).unwrap();
        producer.finish().unwrap();

        assert!(
            consumer.read_frame().now_or_never().is_none(),
            "read_frame must block on a pending group even after finish()"
        );

        g0.write_frame(bytes::Bytes::from_static(b"late")).unwrap();
        let frame = consumer
            .read_frame()
            .now_or_never()
            .expect("should not block once a frame is written")
            .expect("would have errored")
            .expect("track should not be closed");
        assert_eq!(&frame[..], b"late");
    }

    /// Groups below the `start_at` bound are ignored by `read_frame`.
    #[tokio::test]
    async fn read_frame_respects_start_at() {
        let mut producer = Track::new("test").produce();
        let mut consumer = producer.consume();
        consumer.start_at(5);

        let mut g3 = producer.create_group(Group { sequence: 3 }).unwrap();
        g3.write_frame(bytes::Bytes::from_static(b"skip-me")).unwrap();
        g3.finish().unwrap();

        let mut g5 = producer.create_group(Group { sequence: 5 }).unwrap();
        g5.write_frame(bytes::Bytes::from_static(b"keep")).unwrap();
        g5.finish().unwrap();

        let frame = consumer
            .read_frame()
            .now_or_never()
            .expect("should not block")
            .expect("would have errored")
            .expect("track should not be closed");
        assert_eq!(&frame[..], b"keep");
    }

    /// After the last frame of a finished track, `read_frame` yields `None`.
    #[tokio::test]
    async fn read_frame_returns_none_when_finished() {
        let mut producer = Track::new("test").produce();
        let mut consumer = producer.consume();

        producer.write_frame(b"only".as_slice()).unwrap();
        producer.finish().unwrap();

        let frame = consumer
            .read_frame()
            .now_or_never()
            .expect("should not block")
            .expect("would have errored")
            .expect("track should not be closed");
        assert_eq!(&frame[..], b"only");

        let done = consumer
            .read_frame()
            .now_or_never()
            .expect("should not block")
            .expect("would have errored");
        assert!(done.is_none());
    }

    /// `get_group` blocks for sequences below the final sequence and resolves to `None` beyond it.
    #[tokio::test]
    async fn get_group_finishes_without_waiting_for_gaps() {
        let mut producer = Track::new("test").produce();
        producer.create_group(Group { sequence: 1 }).unwrap();
        producer.finish_at(1).unwrap();

        let consumer = producer.consume();
        assert!(
            consumer.get_group(0).now_or_never().is_none(),
            "sequence below fin should block (group could still arrive)"
        );

        let beyond = consumer
            .get_group(2)
            .now_or_never()
            .expect("sequence at-or-after fin should resolve")
            .expect("should not error");
        assert!(beyond.is_none(), "sequence at-or-after fin should not exist");
    }

    /// Appending past `u64::MAX` reports a bounds error instead of wrapping.
    #[test]
    fn append_group_returns_bounds_exceeded_on_sequence_overflow() {
        let mut producer = Track::new("test").produce();
        {
            let mut state = producer.state.write().ok().unwrap();
            state.max_sequence = Some(u64::MAX);
        }

        let result = producer.append_group();
        assert!(matches!(result, Err(Error::BoundsExceeded(_))));
    }
}