1use crate::{Error, Result, coding};
16
17use super::{Group, GroupConsumer, GroupProducer};
18
19use std::{
20 collections::{HashSet, VecDeque},
21 task::{Poll, ready},
22 time::Duration,
23};
24
25const MAX_GROUP_AGE: Duration = Duration::from_secs(5);
28
29#[derive(Clone, Debug, PartialEq, Eq)]
31#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
32pub struct Track {
33 pub name: String,
35 pub priority: u8,
37}
38
39impl Track {
40 pub fn new<T: Into<String>>(name: T) -> Self {
42 Self {
43 name: name.into(),
44 priority: 0,
45 }
46 }
47
48 pub fn produce(self) -> TrackProducer {
50 TrackProducer::new(self)
51 }
52}
53
54#[derive(Default)]
55struct State {
56 groups: VecDeque<Option<(GroupProducer, tokio::time::Instant)>>,
58 duplicates: HashSet<u64>,
59 offset: usize,
60 max_sequence: Option<u64>,
61 final_sequence: Option<u64>,
62 abort: Option<Error>,
63}
64
65impl State {
66 fn poll_recv_group(&self, index: usize, min_sequence: u64) -> Poll<Result<Option<(GroupConsumer, usize)>>> {
70 let start = index.saturating_sub(self.offset);
71 for (i, slot) in self.groups.iter().enumerate().skip(start) {
72 if let Some((group, _)) = slot
73 && group.sequence >= min_sequence
74 {
75 return Poll::Ready(Ok(Some((group.consume(), self.offset + i))));
76 }
77 }
78
79 if self.final_sequence.is_some() {
81 Poll::Ready(Ok(None))
82 } else if let Some(err) = &self.abort {
83 Poll::Ready(Err(err.clone()))
84 } else {
85 Poll::Pending
86 }
87 }
88
89 fn poll_read_frame(
93 &self,
94 index: usize,
95 next_sequence: u64,
96 waiter: &kio::Waiter,
97 ) -> Poll<Result<Option<(bytes::Bytes, usize, u64)>>> {
98 let start = index.saturating_sub(self.offset);
99 let mut pending_seen = false;
100 for (i, slot) in self.groups.iter().enumerate().skip(start) {
101 let Some((group, _)) = slot else { continue };
102 if group.sequence < next_sequence {
103 continue;
104 }
105
106 let mut consumer = group.consume();
107 match consumer.poll_read_frame(waiter) {
108 Poll::Ready(Ok(Some(frame))) => {
109 return Poll::Ready(Ok(Some((frame, self.offset + i, group.sequence))));
110 }
111 Poll::Ready(Ok(None)) => continue,
112 Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
113 Poll::Pending => {
114 pending_seen = true;
115 continue;
116 }
117 }
118 }
119
120 if pending_seen {
123 Poll::Pending
124 } else if self.final_sequence.is_some() {
125 Poll::Ready(Ok(None))
126 } else if let Some(err) = &self.abort {
127 Poll::Ready(Err(err.clone()))
128 } else {
129 Poll::Pending
130 }
131 }
132
133 fn poll_get_group(&self, sequence: u64) -> Poll<Result<Option<GroupConsumer>>> {
134 for (group, _) in self.groups.iter().flatten() {
136 if group.sequence == sequence {
137 return Poll::Ready(Ok(Some(group.consume())));
138 }
139 }
140
141 if let Some(fin) = self.final_sequence
143 && sequence >= fin
144 {
145 return Poll::Ready(Ok(None));
146 }
147
148 if let Some(err) = &self.abort {
149 return Poll::Ready(Err(err.clone()));
150 }
151
152 Poll::Pending
153 }
154
155 fn poll_closed(&self) -> Poll<Result<()>> {
156 if self.final_sequence.is_some() {
157 Poll::Ready(Ok(()))
158 } else if let Some(err) = &self.abort {
159 Poll::Ready(Err(err.clone()))
160 } else {
161 Poll::Pending
162 }
163 }
164
165 fn evict_expired(&mut self, now: tokio::time::Instant) {
172 for slot in self.groups.iter_mut() {
173 let Some((group, created_at)) = slot else { continue };
174
175 if Some(group.sequence) == self.max_sequence {
176 continue;
177 }
178
179 if now.duration_since(*created_at) <= MAX_GROUP_AGE {
180 break;
181 }
182
183 self.duplicates.remove(&group.sequence);
184 *slot = None;
185 }
186
187 while let Some(None) = self.groups.front() {
189 self.groups.pop_front();
190 self.offset += 1;
191 }
192 }
193
194 fn poll_finished(&self) -> Poll<Result<u64>> {
195 if let Some(fin) = self.final_sequence {
196 Poll::Ready(Ok(fin))
197 } else if let Some(err) = &self.abort {
198 Poll::Ready(Err(err.clone()))
199 } else {
200 Poll::Pending
201 }
202 }
203}
204
205pub struct TrackProducer {
207 info: Track,
208 state: kio::Producer<State>,
209}
210
211impl std::ops::Deref for TrackProducer {
212 type Target = Track;
213
214 fn deref(&self) -> &Self::Target {
215 &self.info
216 }
217}
218
219impl TrackProducer {
220 pub fn new(info: Track) -> Self {
222 Self {
223 info,
224 state: kio::Producer::default(),
225 }
226 }
227
228 pub fn create_group(&mut self, info: Group) -> Result<GroupProducer> {
230 let group = info.produce();
231
232 let mut state = self.modify()?;
233 if let Some(fin) = state.final_sequence
234 && group.sequence >= fin
235 {
236 return Err(Error::Closed);
237 }
238
239 if !state.duplicates.insert(group.sequence) {
240 return Err(Error::Duplicate);
241 }
242
243 let now = tokio::time::Instant::now();
244 state.max_sequence = Some(state.max_sequence.unwrap_or(0).max(group.sequence));
245 state.groups.push_back(Some((group.clone(), now)));
246 state.evict_expired(now);
247
248 Ok(group)
249 }
250
251 pub fn append_group(&mut self) -> Result<GroupProducer> {
253 let mut state = self.modify()?;
254 let sequence = match state.max_sequence {
255 Some(s) => s.checked_add(1).ok_or(coding::BoundsExceeded)?,
256 None => 0,
257 };
258 if let Some(fin) = state.final_sequence
259 && sequence >= fin
260 {
261 return Err(Error::Closed);
262 }
263
264 let group = Group { sequence }.produce();
265
266 let now = tokio::time::Instant::now();
267 state.duplicates.insert(sequence);
268 state.max_sequence = Some(sequence);
269 state.groups.push_back(Some((group.clone(), now)));
270 state.evict_expired(now);
271
272 Ok(group)
273 }
274
275 pub fn write_frame<B: Into<bytes::Bytes>>(&mut self, frame: B) -> Result<()> {
277 let mut group = self.append_group()?;
278 group.write_frame(frame.into())?;
279 group.finish()?;
280 Ok(())
281 }
282
283 pub fn finish(&mut self) -> Result<()> {
289 let mut state = self.modify()?;
290 if state.final_sequence.is_some() {
291 return Err(Error::Closed);
292 }
293 state.final_sequence = Some(match state.max_sequence {
294 Some(max) => max.checked_add(1).ok_or(coding::BoundsExceeded)?,
295 None => 0,
296 });
297 Ok(())
298 }
299
300 pub fn finish_at(&mut self, sequence: u64) -> Result<()> {
307 let mut state = self.modify()?;
308 let max = state.max_sequence.ok_or(Error::Closed)?;
309 if state.final_sequence.is_some() || sequence != max {
310 return Err(Error::Closed);
311 }
312 state.final_sequence = Some(max.checked_add(1).ok_or(coding::BoundsExceeded)?);
313 Ok(())
314 }
315
316 pub fn abort(&mut self, err: Error) -> Result<()> {
324 let mut guard = self.modify()?;
325 guard.abort = Some(err);
326 guard.groups.clear();
327 guard.duplicates.clear();
328 guard.close();
329 Ok(())
330 }
331
332 pub fn consume(&self) -> TrackConsumer {
334 TrackConsumer {
335 info: self.info.clone(),
336 state: self.state.consume(),
337 index: 0,
338 min_sequence: 0,
339 next_sequence: 0,
340 }
341 }
342
343 pub fn name(&self) -> &str {
345 &self.info.name
346 }
347
348 pub fn demand(&self) -> TrackDemand {
350 TrackDemand {
351 name: self.info.name.clone(),
352 state: self.state.weak(),
353 }
354 }
355
356 pub async fn unused(&self) -> Result<()> {
358 self.state
359 .unused()
360 .await
361 .map_err(|r| r.abort.clone().unwrap_or(Error::Dropped))
362 }
363
364 pub async fn used(&self) -> Result<()> {
366 self.state
367 .used()
368 .await
369 .map_err(|r| r.abort.clone().unwrap_or(Error::Dropped))
370 }
371
372 pub async fn closed(&self) -> Error {
374 self.state.closed().await;
375 self.state.read().abort.clone().unwrap_or(Error::Dropped)
376 }
377
378 pub fn is_closed(&self) -> bool {
380 self.state.read().is_closed()
381 }
382
383 pub fn is_clone(&self, other: &Self) -> bool {
385 self.state.same_channel(&other.state)
386 }
387
388 pub(crate) fn weak(&self) -> TrackWeak {
390 TrackWeak {
391 info: self.info.clone(),
392 state: self.state.weak(),
393 }
394 }
395
396 fn modify(&self) -> Result<kio::Mut<'_, State>> {
397 self.state
398 .write()
399 .map_err(|r| r.abort.clone().unwrap_or(Error::Dropped))
400 }
401}
402
403impl Clone for TrackProducer {
404 fn clone(&self) -> Self {
405 Self {
406 info: self.info.clone(),
407 state: self.state.clone(),
408 }
409 }
410}
411
412impl Drop for TrackProducer {
413 fn drop(&mut self) {
414 if !self.state.is_last() {
419 return;
420 }
421 if let Ok(mut state) = self.state.write()
422 && state.final_sequence.is_none()
423 {
424 state.groups.clear();
425 state.duplicates.clear();
426 }
427 }
428}
429
430impl From<Track> for TrackProducer {
431 fn from(info: Track) -> Self {
432 TrackProducer::new(info)
433 }
434}
435
436#[derive(Clone)]
438pub(crate) struct TrackWeak {
439 pub(crate) info: Track,
440 state: kio::Weak<State>,
441}
442
443#[derive(Clone)]
445pub struct TrackDemand {
446 name: String,
447 state: kio::Weak<State>,
448}
449
450impl TrackDemand {
451 pub fn name(&self) -> &str {
453 &self.name
454 }
455
456 pub async fn used(&self) -> Result<()> {
458 self.state
459 .used()
460 .await
461 .map_err(|r| r.abort.clone().unwrap_or(Error::Dropped))
462 }
463
464 pub async fn unused(&self) -> Result<()> {
466 self.state
467 .unused()
468 .await
469 .map_err(|r| r.abort.clone().unwrap_or(Error::Dropped))
470 }
471
472 pub async fn closed(&self) -> Error {
474 if let Some(state) = self.state.produce() {
475 state.closed().await;
476 }
477
478 self.state.read().abort.clone().unwrap_or(Error::Dropped)
479 }
480}
481
482impl TrackWeak {
483 pub fn is_closed(&self) -> bool {
484 self.state.is_closed()
485 }
486
487 pub fn consume(&self) -> TrackConsumer {
488 TrackConsumer {
489 info: self.info.clone(),
490 state: self.state.consume(),
491 index: 0,
492 min_sequence: 0,
493 next_sequence: 0,
494 }
495 }
496
497 pub async fn unused(&self) -> crate::Result<()> {
498 self.state
499 .unused()
500 .await
501 .map_err(|r| r.abort.clone().unwrap_or(Error::Dropped))
502 }
503
504 pub fn is_clone(&self, other: &Self) -> bool {
505 self.state.same_channel(&other.state)
506 }
507}
508
509#[derive(Clone)]
511pub struct TrackConsumer {
512 info: Track,
513 state: kio::Consumer<State>,
514 index: usize,
516 min_sequence: u64,
518 next_sequence: u64,
521}
522
523impl std::ops::Deref for TrackConsumer {
524 type Target = Track;
525
526 fn deref(&self) -> &Self::Target {
527 &self.info
528 }
529}
530
531impl TrackConsumer {
532 pub fn name(&self) -> &str {
534 &self.info.name
535 }
536
537 fn poll<F, R>(&self, waiter: &kio::Waiter, f: F) -> Poll<Result<R>>
539 where
540 F: Fn(&kio::Ref<'_, State>) -> Poll<Result<R>>,
541 {
542 Poll::Ready(match ready!(self.state.poll(waiter, f)) {
543 Ok(res) => res,
544 Err(state) => Err(state.abort.clone().unwrap_or(Error::Dropped)),
546 })
547 }
548
549 pub fn poll_recv_group(&mut self, waiter: &kio::Waiter) -> Poll<Result<Option<GroupConsumer>>> {
560 let Some((consumer, found_index)) =
561 ready!(self.poll(waiter, |state| state.poll_recv_group(self.index, self.min_sequence))?)
562 else {
563 return Poll::Ready(Ok(None));
564 };
565
566 self.index = found_index + 1;
567 Poll::Ready(Ok(Some(consumer)))
568 }
569
570 pub async fn recv_group(&mut self) -> Result<Option<GroupConsumer>> {
576 kio::wait(|waiter| self.poll_recv_group(waiter)).await
577 }
578
579 pub fn poll_next_group(&mut self, waiter: &kio::Waiter) -> Poll<Result<Option<GroupConsumer>>> {
585 loop {
586 let Some(group) = ready!(self.poll_recv_group(waiter)?) else {
587 return Poll::Ready(Ok(None));
588 };
589 if group.sequence < self.next_sequence {
590 continue;
592 }
593 self.next_sequence = group.sequence.saturating_add(1);
594 return Poll::Ready(Ok(Some(group)));
595 }
596 }
597
598 pub async fn next_group(&mut self) -> Result<Option<GroupConsumer>> {
604 kio::wait(|waiter| self.poll_next_group(waiter)).await
605 }
606
607 pub fn poll_read_frame(&mut self, waiter: &kio::Waiter) -> Poll<Result<Option<bytes::Bytes>>> {
611 let lower = self.min_sequence.max(self.next_sequence);
612 let Some((frame, found_index, sequence)) =
613 ready!(self.poll(waiter, |state| { state.poll_read_frame(self.index, lower, waiter) })?)
614 else {
615 return Poll::Ready(Ok(None));
616 };
617
618 self.index = found_index + 1;
619 self.next_sequence = sequence.saturating_add(1);
620 Poll::Ready(Ok(Some(frame)))
621 }
622
623 pub async fn read_frame(&mut self) -> Result<Option<bytes::Bytes>> {
627 kio::wait(|waiter| self.poll_read_frame(waiter)).await
628 }
629
630 pub fn poll_get_group(&self, waiter: &kio::Waiter, sequence: u64) -> Poll<Result<Option<GroupConsumer>>> {
632 self.poll(waiter, |state| state.poll_get_group(sequence))
633 }
634
635 pub async fn get_group(&self, sequence: u64) -> Result<Option<GroupConsumer>> {
643 kio::wait(|waiter| self.poll_get_group(waiter, sequence)).await
644 }
645
646 pub fn poll_closed(&self, waiter: &kio::Waiter) -> Poll<Result<()>> {
648 self.poll(waiter, |state| state.poll_closed())
649 }
650
651 pub async fn closed(&self) -> Result<()> {
655 kio::wait(|waiter| self.poll_closed(waiter)).await
656 }
657
658 pub fn is_clone(&self, other: &Self) -> bool {
660 self.state.same_channel(&other.state)
661 }
662
663 pub fn poll_finished(&mut self, waiter: &kio::Waiter) -> Poll<Result<u64>> {
665 self.poll(waiter, |state| state.poll_finished())
666 }
667
668 pub async fn finished(&mut self) -> Result<u64> {
670 kio::wait(|waiter| self.poll_finished(waiter)).await
671 }
672
673 pub fn start_at(&mut self, sequence: u64) {
675 self.min_sequence = sequence;
676 }
677
678 pub fn latest(&self) -> Option<u64> {
680 self.state.read().max_sequence
681 }
682
683 pub(crate) fn weak(&self) -> TrackWeak {
685 TrackWeak {
686 info: self.info.clone(),
687 state: self.state.weak(),
688 }
689 }
690}
691
692#[cfg(test)]
693use futures::FutureExt;
694
695#[cfg(test)]
696impl TrackConsumer {
697 pub fn assert_group(&mut self) -> GroupConsumer {
698 self.recv_group()
699 .now_or_never()
700 .expect("group would have blocked")
701 .expect("would have errored")
702 .expect("track was closed")
703 }
704
705 pub fn assert_no_group(&mut self) {
706 assert!(
707 self.recv_group().now_or_never().is_none(),
708 "recv_group would not have blocked"
709 );
710 }
711
712 pub fn assert_not_closed(&self) {
713 assert!(self.closed().now_or_never().is_none(), "should not be closed");
714 }
715
716 pub fn assert_closed(&self) {
717 assert!(self.closed().now_or_never().is_some(), "should be closed");
718 }
719
720 pub fn assert_error(&self) {
722 assert!(
723 self.closed().now_or_never().expect("should not block").is_err(),
724 "should be error"
725 );
726 }
727
728 pub fn assert_is_clone(&self, other: &Self) {
729 assert!(self.is_clone(other), "should be clone");
730 }
731
732 pub fn assert_not_clone(&self, other: &Self) {
733 assert!(!self.is_clone(other), "should not be clone");
734 }
735}
736
737#[cfg(test)]
738mod test {
739 use super::*;
740
741 fn live_groups(state: &State) -> usize {
743 state.groups.iter().flatten().count()
744 }
745
746 fn first_live_sequence(state: &State) -> u64 {
748 state.groups.iter().flatten().next().unwrap().0.sequence
749 }
750
751 #[tokio::test]
752 async fn evict_expired_groups() {
753 tokio::time::pause();
754
755 let mut producer = Track::new("test").produce();
756
757 producer.append_group().unwrap(); producer.append_group().unwrap(); producer.append_group().unwrap(); {
763 let state = producer.state.read();
764 assert_eq!(live_groups(&state), 3);
765 assert_eq!(state.offset, 0);
766 }
767
768 tokio::time::advance(MAX_GROUP_AGE + Duration::from_secs(1)).await;
770
771 producer.append_group().unwrap(); {
777 let state = producer.state.read();
778 assert_eq!(live_groups(&state), 1);
779 assert_eq!(first_live_sequence(&state), 3);
780 assert_eq!(state.offset, 3);
781 assert!(!state.duplicates.contains(&0));
782 assert!(!state.duplicates.contains(&1));
783 assert!(!state.duplicates.contains(&2));
784 assert!(state.duplicates.contains(&3));
785 }
786 }
787
788 #[tokio::test]
789 async fn evict_keeps_max_sequence() {
790 tokio::time::pause();
791
792 let mut producer = Track::new("test").produce();
793 producer.append_group().unwrap(); tokio::time::advance(MAX_GROUP_AGE + Duration::from_secs(1)).await;
797
798 producer.append_group().unwrap(); {
802 let state = producer.state.read();
803 assert_eq!(live_groups(&state), 1);
804 assert_eq!(first_live_sequence(&state), 1);
805 assert_eq!(state.offset, 1);
806 }
807 }
808
809 #[tokio::test]
810 async fn no_eviction_when_fresh() {
811 tokio::time::pause();
812
813 let mut producer = Track::new("test").produce();
814 producer.append_group().unwrap(); producer.append_group().unwrap(); producer.append_group().unwrap(); {
819 let state = producer.state.read();
820 assert_eq!(live_groups(&state), 3);
821 assert_eq!(state.offset, 0);
822 }
823 }
824
825 #[tokio::test]
826 async fn consumer_skips_evicted_groups() {
827 tokio::time::pause();
828
829 let mut producer = Track::new("test").produce();
830 producer.append_group().unwrap(); let mut consumer = producer.consume();
833
834 tokio::time::advance(MAX_GROUP_AGE + Duration::from_secs(1)).await;
835 producer.append_group().unwrap(); let group = consumer.assert_group();
839 assert_eq!(group.sequence, 1);
840 }
841
842 #[tokio::test]
843 async fn out_of_order_max_sequence_at_front() {
844 tokio::time::pause();
845
846 let mut producer = Track::new("test").produce();
847
848 producer.create_group(Group { sequence: 5 }).unwrap();
850 producer.create_group(Group { sequence: 3 }).unwrap();
851 producer.create_group(Group { sequence: 4 }).unwrap();
852
853 {
855 let state = producer.state.read();
856 assert_eq!(state.max_sequence, Some(5));
857 }
858
859 tokio::time::advance(MAX_GROUP_AGE + Duration::from_secs(1)).await;
861
862 producer.append_group().unwrap(); {
868 let state = producer.state.read();
869 assert_eq!(live_groups(&state), 1);
870 assert_eq!(first_live_sequence(&state), 6);
871 assert!(!state.duplicates.contains(&3));
872 assert!(!state.duplicates.contains(&4));
873 assert!(!state.duplicates.contains(&5));
874 assert!(state.duplicates.contains(&6));
875 }
876 }
877
878 #[tokio::test]
879 async fn max_sequence_at_front_blocks_trim() {
880 tokio::time::pause();
881
882 let mut producer = Track::new("test").produce();
883
884 producer.create_group(Group { sequence: 5 }).unwrap();
886
887 tokio::time::advance(MAX_GROUP_AGE + Duration::from_secs(1)).await;
888
889 producer.create_group(Group { sequence: 3 }).unwrap();
891
892 {
895 let state = producer.state.read();
896 assert_eq!(live_groups(&state), 2);
897 assert_eq!(state.offset, 0);
898 }
899
900 tokio::time::advance(MAX_GROUP_AGE + Duration::from_secs(1)).await;
902
903 producer.create_group(Group { sequence: 2 }).unwrap();
905
906 {
911 let state = producer.state.read();
912 assert_eq!(live_groups(&state), 2);
913 assert_eq!(state.offset, 0);
914 assert!(state.duplicates.contains(&5));
915 assert!(!state.duplicates.contains(&3));
916 assert!(state.duplicates.contains(&2));
917 }
918
919 let mut consumer = producer.consume();
921 let group = consumer.assert_group();
922 assert_eq!(group.sequence, 5);
924 }
925
926 #[tokio::test]
927 async fn abort_clears_cached_groups() {
928 let mut producer = Track::new("test").produce();
929 producer.append_group().unwrap();
930 producer.append_group().unwrap();
931
932 let mut consumer = producer.consume();
934 assert_eq!(live_groups(&producer.state.read()), 2);
935
936 producer.abort(Error::Cancel).unwrap();
937
938 {
939 let state = producer.state.read();
940 assert!(state.groups.is_empty(), "cached groups should be dropped on abort");
941 assert!(state.duplicates.is_empty());
942 }
943
944 let result = consumer.recv_group().now_or_never().expect("should not block");
946 assert!(matches!(result, Err(Error::Cancel)));
947 }
948
949 #[tokio::test]
950 async fn drop_unfinished_clears_cached_groups() {
951 let producer = Track::new("test").produce();
952 let mut writer = producer.clone();
953 writer.append_group().unwrap();
954
955 let mut consumer = producer.consume();
957 assert_eq!(live_groups(&producer.state.read()), 1);
958
959 drop(writer);
961 drop(producer);
962
963 let result = consumer.recv_group().now_or_never().expect("should not block");
964 assert!(matches!(result, Err(Error::Dropped)));
965 }
966
967 #[tokio::test]
968 async fn drop_finished_keeps_cached_groups() {
969 let mut producer = Track::new("test").produce();
970 producer.append_group().unwrap();
971 producer.finish().unwrap();
972
973 let mut consumer = producer.consume();
974 drop(producer);
975
976 assert_eq!(consumer.assert_group().sequence, 0);
978 let done = consumer.recv_group().now_or_never().expect("should not block").unwrap();
979 assert!(done.is_none(), "consumer should drain then see clean finish");
980 }
981
982 #[test]
983 fn append_finish_cannot_be_rewritten() {
984 let mut producer = Track::new("test").produce();
985
986 assert!(producer.finish().is_ok());
988 assert!(producer.finish().is_err());
989 assert!(producer.append_group().is_err());
990 }
991
992 #[test]
993 fn finish_after_groups() {
994 let mut producer = Track::new("test").produce();
995
996 producer.append_group().unwrap();
997 assert!(producer.finish().is_ok());
998 assert!(producer.finish().is_err());
999 assert!(producer.append_group().is_err());
1000 }
1001
1002 #[test]
1003 fn insert_finish_validates_sequence_and_freezes_to_max() {
1004 let mut producer = Track::new("test").produce();
1005 producer.create_group(Group { sequence: 5 }).unwrap();
1006
1007 assert!(producer.finish_at(4).is_err());
1008 assert!(producer.finish_at(10).is_err());
1009 assert!(producer.finish_at(5).is_ok());
1010
1011 {
1012 let state = producer.state.read();
1013 assert_eq!(state.final_sequence, Some(6));
1014 }
1015
1016 assert!(producer.finish_at(5).is_err());
1017 assert!(producer.create_group(Group { sequence: 4 }).is_ok());
1018 assert!(producer.create_group(Group { sequence: 5 }).is_err());
1019 }
1020
1021 #[tokio::test]
1022 async fn recv_group_finishes_without_waiting_for_gaps() {
1023 let mut producer = Track::new("test").produce();
1024 producer.create_group(Group { sequence: 1 }).unwrap();
1025 producer.finish_at(1).unwrap();
1026
1027 let mut consumer = producer.consume();
1028 assert_eq!(consumer.assert_group().sequence, 1);
1029
1030 let done = consumer
1031 .recv_group()
1032 .now_or_never()
1033 .expect("should not block")
1034 .expect("would have errored");
1035 assert!(done.is_none(), "track should finish without waiting for gaps");
1036 }
1037
1038 #[tokio::test]
1039 async fn next_group_skips_late_arrivals() {
1040 let mut producer = Track::new("test").produce();
1041 let mut consumer = producer.consume();
1042
1043 producer.create_group(Group { sequence: 5 }).unwrap();
1045 let group = consumer
1046 .next_group()
1047 .now_or_never()
1048 .expect("should not block")
1049 .expect("would have errored")
1050 .expect("track should not be closed");
1051 assert_eq!(group.sequence, 5);
1052
1053 producer.create_group(Group { sequence: 3 }).unwrap();
1055 producer.create_group(Group { sequence: 4 }).unwrap();
1057 producer.create_group(Group { sequence: 7 }).unwrap();
1059
1060 let group = consumer
1061 .next_group()
1062 .now_or_never()
1063 .expect("should not block")
1064 .expect("would have errored")
1065 .expect("track should not be closed");
1066 assert_eq!(group.sequence, 7);
1067
1068 assert!(
1070 consumer.next_group().now_or_never().is_none(),
1071 "should block waiting for a higher sequence"
1072 );
1073 }
1074
1075 #[tokio::test]
1076 async fn next_group_returns_arrivals_in_order() {
1077 let mut producer = Track::new("test").produce();
1078 let mut consumer = producer.consume();
1079
1080 producer.create_group(Group { sequence: 3 }).unwrap();
1082 producer.create_group(Group { sequence: 5 }).unwrap();
1083
1084 let group = consumer
1085 .next_group()
1086 .now_or_never()
1087 .expect("should not block")
1088 .expect("would have errored")
1089 .expect("track should not be closed");
1090 assert_eq!(group.sequence, 3);
1091
1092 let group = consumer
1093 .next_group()
1094 .now_or_never()
1095 .expect("should not block")
1096 .expect("would have errored")
1097 .expect("track should not be closed");
1098 assert_eq!(group.sequence, 5);
1099 }
1100
1101 #[tokio::test]
1102 async fn recv_group_after_next_group_sees_late_arrivals() {
1103 let mut producer = Track::new("test").produce();
1104 let mut consumer = producer.consume();
1105
1106 producer.create_group(Group { sequence: 5 }).unwrap();
1107 producer.create_group(Group { sequence: 3 }).unwrap();
1108
1109 let group = consumer
1111 .next_group()
1112 .now_or_never()
1113 .expect("should not block")
1114 .expect("would have errored")
1115 .expect("track should not be closed");
1116 assert_eq!(group.sequence, 5);
1117
1118 assert_eq!(consumer.assert_group().sequence, 3);
1121 }
1122
1123 #[tokio::test]
1124 async fn read_frame_returns_single_frame_per_group() {
1125 let mut producer = Track::new("test").produce();
1126 let mut consumer = producer.consume();
1127
1128 producer.write_frame(b"hello".as_slice()).unwrap();
1129 producer.write_frame(b"world".as_slice()).unwrap();
1130
1131 let frame = consumer
1132 .read_frame()
1133 .now_or_never()
1134 .expect("should not block")
1135 .expect("would have errored")
1136 .expect("track should not be closed");
1137 assert_eq!(&frame[..], b"hello");
1138
1139 let frame = consumer
1140 .read_frame()
1141 .now_or_never()
1142 .expect("should not block")
1143 .expect("would have errored")
1144 .expect("track should not be closed");
1145 assert_eq!(&frame[..], b"world");
1146 }
1147
1148 #[tokio::test]
1149 async fn read_frame_skips_stalled_group_for_newer_ready_frame() {
1150 let mut producer = Track::new("test").produce();
1151 let mut consumer = producer.consume();
1152
1153 let _stalled = producer.create_group(Group { sequence: 3 }).unwrap();
1155 let mut g5 = producer.create_group(Group { sequence: 5 }).unwrap();
1157 g5.write_frame(bytes::Bytes::from_static(b"later")).unwrap();
1158 g5.finish().unwrap();
1159
1160 let frame = consumer
1162 .read_frame()
1163 .now_or_never()
1164 .expect("should not block on stalled earlier group")
1165 .expect("would have errored")
1166 .expect("track should not be closed");
1167 assert_eq!(&frame[..], b"later");
1168 }
1169
1170 #[tokio::test]
1171 async fn read_frame_discards_rest_of_multi_frame_group() {
1172 let mut producer = Track::new("test").produce();
1173 let mut consumer = producer.consume();
1174
1175 let mut g0 = producer.create_group(Group { sequence: 0 }).unwrap();
1177 g0.write_frame(bytes::Bytes::from_static(b"one")).unwrap();
1178 g0.write_frame(bytes::Bytes::from_static(b"two")).unwrap();
1179 g0.finish().unwrap();
1180
1181 producer.write_frame(b"next".as_slice()).unwrap();
1183
1184 let frame = consumer
1185 .read_frame()
1186 .now_or_never()
1187 .expect("should not block")
1188 .expect("would have errored")
1189 .expect("track should not be closed");
1190 assert_eq!(&frame[..], b"one");
1191
1192 let frame = consumer
1194 .read_frame()
1195 .now_or_never()
1196 .expect("should not block")
1197 .expect("would have errored")
1198 .expect("track should not be closed");
1199 assert_eq!(&frame[..], b"next");
1200 }
1201
1202 #[tokio::test]
1203 async fn read_frame_waits_for_pending_group_after_finish() {
1204 let mut producer = Track::new("test").produce();
1207 let mut consumer = producer.consume();
1208
1209 let mut g0 = producer.create_group(Group { sequence: 0 }).unwrap();
1210 producer.finish().unwrap();
1211
1212 assert!(
1214 consumer.read_frame().now_or_never().is_none(),
1215 "read_frame must block on a pending group even after finish()"
1216 );
1217
1218 g0.write_frame(bytes::Bytes::from_static(b"late")).unwrap();
1220 let frame = consumer
1221 .read_frame()
1222 .now_or_never()
1223 .expect("should not block once a frame is written")
1224 .expect("would have errored")
1225 .expect("track should not be closed");
1226 assert_eq!(&frame[..], b"late");
1227 }
1228
1229 #[tokio::test]
1230 async fn read_frame_respects_start_at() {
1231 let mut producer = Track::new("test").produce();
1234 let mut consumer = producer.consume();
1235 consumer.start_at(5);
1236
1237 let mut g3 = producer.create_group(Group { sequence: 3 }).unwrap();
1239 g3.write_frame(bytes::Bytes::from_static(b"skip-me")).unwrap();
1240 g3.finish().unwrap();
1241
1242 let mut g5 = producer.create_group(Group { sequence: 5 }).unwrap();
1243 g5.write_frame(bytes::Bytes::from_static(b"keep")).unwrap();
1244 g5.finish().unwrap();
1245
1246 let frame = consumer
1247 .read_frame()
1248 .now_or_never()
1249 .expect("should not block")
1250 .expect("would have errored")
1251 .expect("track should not be closed");
1252 assert_eq!(&frame[..], b"keep");
1253 }
1254
1255 #[tokio::test]
1256 async fn read_frame_returns_none_when_finished() {
1257 let mut producer = Track::new("test").produce();
1258 let mut consumer = producer.consume();
1259
1260 producer.write_frame(b"only".as_slice()).unwrap();
1261 producer.finish().unwrap();
1262
1263 let frame = consumer
1264 .read_frame()
1265 .now_or_never()
1266 .expect("should not block")
1267 .expect("would have errored")
1268 .expect("track should not be closed");
1269 assert_eq!(&frame[..], b"only");
1270
1271 let done = consumer
1272 .read_frame()
1273 .now_or_never()
1274 .expect("should not block")
1275 .expect("would have errored");
1276 assert!(done.is_none());
1277 }
1278
1279 #[tokio::test]
1280 async fn get_group_finishes_without_waiting_for_gaps() {
1281 let mut producer = Track::new("test").produce();
1282 producer.create_group(Group { sequence: 1 }).unwrap();
1283 producer.finish_at(1).unwrap();
1284
1285 let consumer = producer.consume();
1286 assert!(
1288 consumer.get_group(0).now_or_never().is_none(),
1289 "sequence below fin should block (group could still arrive)"
1290 );
1291 assert!(
1292 consumer
1293 .get_group(2)
1294 .now_or_never()
1295 .expect("sequence at-or-after fin should resolve")
1296 .expect("should not error")
1297 .is_none(),
1298 "sequence at-or-after fin should not exist"
1299 );
1300 }
1301
1302 #[test]
1303 fn append_group_returns_bounds_exceeded_on_sequence_overflow() {
1304 let mut producer = Track::new("test").produce();
1305 {
1306 let mut state = producer.state.write().ok().unwrap();
1307 state.max_sequence = Some(u64::MAX);
1308 }
1309
1310 assert!(matches!(producer.append_group(), Err(Error::BoundsExceeded(_))));
1311 }
1312}