Skip to main content

moq_lite/model/
track.rs

//! A track is a collection of semi-reliable and semi-ordered streams, split into a [TrackProducer] and [TrackConsumer] handle.
//!
//! A [TrackProducer] creates streams with a sequence number and priority.
//! The sequence number is used to determine the order of streams, while the priority is used to determine which stream to transmit first.
//! This may seem counter-intuitive, but is designed for live streaming where the newest streams may be higher priority.
//! A cloned [TrackProducer] can be used to create streams in parallel, but will error if a duplicate sequence number is used.
//!
//! A [TrackConsumer] may not receive all streams in order or at all.
//! These streams are meant to be transmitted over congested networks and the key to MoQ Transport is to not block on them.
//! Streams will be cached for a potentially limited duration, adding to the unreliable nature.
//! A cloned [TrackConsumer] will receive a copy of all new streams going forward (fanout).
//!
//! The track is closed with [Error] when all writers or readers are dropped.
14
15use crate::{Error, Result};
16
17use super::{Group, GroupConsumer, GroupProducer};
18
19use std::{
20	collections::{HashSet, VecDeque},
21	task::{Poll, ready},
22	time::Duration,
23};
24
/// Maximum age of a cached group before it becomes eligible for eviction.
///
/// The group holding the current `max_sequence` is exempt and is kept regardless of its age.
// TODO: Replace with a configurable cache size.
const MAX_GROUP_AGE: Duration = Duration::from_secs(30);
28
/// Description of a track: its name and its priority.
///
/// A track is a collection of groups, delivered out-of-order until expired.
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct Track {
	/// Name of the track.
	pub name: String,
	/// Priority of the track; [`Track::new`] initialises it to 0.
	pub priority: u8,
}
36
37impl Track {
38	pub fn new<T: Into<String>>(name: T) -> Self {
39		Self {
40			name: name.into(),
41			priority: 0,
42		}
43	}
44
45	pub fn produce(self) -> TrackProducer {
46		TrackProducer::new(self)
47	}
48}
49
50#[derive(Default)]
51struct State {
52	/// Groups in arrival order. `None` entries are tombstones for evicted groups.
53	groups: VecDeque<Option<(GroupProducer, tokio::time::Instant)>>,
54	duplicates: HashSet<u64>,
55	offset: usize,
56	max_sequence: Option<u64>,
57	final_sequence: Option<u64>,
58	abort: Option<Error>,
59}
60
61impl State {
62	/// Find the next non-tombstoned group at or after `index`.
63	///
64	/// Returns the group and its absolute index so the consumer can advance past it.
65	fn poll_next_group(&self, index: usize, min_sequence: u64) -> Poll<Result<Option<(GroupConsumer, usize)>>> {
66		let start = index.saturating_sub(self.offset);
67		for (i, slot) in self.groups.iter().enumerate().skip(start) {
68			if let Some((group, _)) = slot
69				&& group.info.sequence >= min_sequence
70			{
71				return Poll::Ready(Ok(Some((group.consume(), self.offset + i))));
72			}
73		}
74
75		// TODO once we have drop notifications, check if index == final_sequence.
76		if self.final_sequence.is_some() {
77			Poll::Ready(Ok(None))
78		} else if let Some(err) = &self.abort {
79			Poll::Ready(Err(err.clone()))
80		} else {
81			Poll::Pending
82		}
83	}
84
85	fn poll_get_group(&self, sequence: u64) -> Poll<Result<Option<GroupConsumer>>> {
86		// Search for the group with the matching sequence, skipping tombstones.
87		for (group, _) in self.groups.iter().flatten() {
88			if group.info.sequence == sequence {
89				return Poll::Ready(Ok(Some(group.consume())));
90			}
91		}
92
93		// Once final_sequence is set, groups at or past it can never exist.
94		if let Some(fin) = self.final_sequence
95			&& sequence >= fin
96		{
97			return Poll::Ready(Ok(None));
98		}
99
100		if let Some(err) = &self.abort {
101			return Poll::Ready(Err(err.clone()));
102		}
103
104		Poll::Pending
105	}
106
107	fn poll_closed(&self) -> Poll<Result<()>> {
108		if self.final_sequence.is_some() {
109			Poll::Ready(Ok(()))
110		} else if let Some(err) = &self.abort {
111			Poll::Ready(Err(err.clone()))
112		} else {
113			Poll::Pending
114		}
115	}
116
117	/// Evict groups older than MAX_GROUP_AGE, never evicting the max_sequence group.
118	///
119	/// Groups are in arrival order, so we can stop early when we hit a non-expired,
120	/// non-max_sequence group (everything after it arrived even later).
121	/// When max_sequence is at the front, we skip past it and tombstone expired groups
122	/// behind it.
123	fn evict_expired(&mut self, now: tokio::time::Instant) {
124		for slot in self.groups.iter_mut() {
125			let Some((group, created_at)) = slot else { continue };
126
127			if Some(group.info.sequence) == self.max_sequence {
128				continue;
129			}
130
131			if now.duration_since(*created_at) <= MAX_GROUP_AGE {
132				break;
133			}
134
135			self.duplicates.remove(&group.info.sequence);
136			*slot = None;
137		}
138
139		// Trim leading tombstones to advance the offset.
140		while let Some(None) = self.groups.front() {
141			self.groups.pop_front();
142			self.offset += 1;
143		}
144	}
145
146	fn poll_finished(&self) -> Poll<Result<u64>> {
147		if let Some(fin) = self.final_sequence {
148			Poll::Ready(Ok(fin))
149		} else if let Some(err) = &self.abort {
150			Poll::Ready(Err(err.clone()))
151		} else {
152			Poll::Pending
153		}
154	}
155}
156
157/// A producer for a track, used to create new groups.
158pub struct TrackProducer {
159	pub info: Track,
160	state: conducer::Producer<State>,
161}
162
163impl TrackProducer {
164	pub fn new(info: Track) -> Self {
165		Self {
166			info,
167			state: conducer::Producer::default(),
168		}
169	}
170
171	/// Create a new group with the given sequence number.
172	pub fn create_group(&mut self, info: Group) -> Result<GroupProducer> {
173		let group = info.produce();
174
175		let mut state = self.modify()?;
176		if let Some(fin) = state.final_sequence
177			&& group.info.sequence >= fin
178		{
179			return Err(Error::Closed);
180		}
181
182		if !state.duplicates.insert(group.info.sequence) {
183			return Err(Error::Duplicate);
184		}
185
186		let now = tokio::time::Instant::now();
187		state.max_sequence = Some(state.max_sequence.unwrap_or(0).max(group.info.sequence));
188		state.groups.push_back(Some((group.clone(), now)));
189		state.evict_expired(now);
190
191		Ok(group)
192	}
193
194	/// Create a new group with the next sequence number.
195	pub fn append_group(&mut self) -> Result<GroupProducer> {
196		let mut state = self.modify()?;
197		let sequence = match state.max_sequence {
198			Some(s) => s.checked_add(1).ok_or(Error::BoundsExceeded)?,
199			None => 0,
200		};
201		if let Some(fin) = state.final_sequence
202			&& sequence >= fin
203		{
204			return Err(Error::Closed);
205		}
206
207		let group = Group { sequence }.produce();
208
209		let now = tokio::time::Instant::now();
210		state.duplicates.insert(sequence);
211		state.max_sequence = Some(sequence);
212		state.groups.push_back(Some((group.clone(), now)));
213		state.evict_expired(now);
214
215		Ok(group)
216	}
217
218	/// Create a group with a single frame.
219	pub fn write_frame<B: Into<bytes::Bytes>>(&mut self, frame: B) -> Result<()> {
220		let mut group = self.append_group()?;
221		group.write_frame(frame.into())?;
222		group.finish()?;
223		Ok(())
224	}
225
226	/// Mark the track as finished after the last appended group.
227	///
228	/// Sets the final sequence to one past the current max_sequence.
229	/// No new groups at or above this sequence can be appended.
230	/// NOTE: Old groups with lower sequence numbers can still arrive.
231	pub fn finish(&mut self) -> Result<()> {
232		let mut state = self.modify()?;
233		if state.final_sequence.is_some() {
234			return Err(Error::Closed);
235		}
236		state.final_sequence = Some(match state.max_sequence {
237			Some(max) => max.checked_add(1).ok_or(Error::BoundsExceeded)?,
238			None => 0,
239		});
240		Ok(())
241	}
242
243	/// Mark the track as finished after the last appended group.
244	///
245	/// Deprecated: use [`Self::finish`] for this behavior, or
246	/// [`Self::finish_at`] to set an explicit final sequence.
247	#[deprecated(note = "use finish() or finish_at(sequence) instead")]
248	pub fn close(&mut self) -> Result<()> {
249		self.finish()
250	}
251
252	/// Mark the track as finished at an exact final sequence.
253	///
254	/// The caller must pass the current max_sequence exactly.
255	/// Freezes the final boundary at one past the current max_sequence.
256	/// No new groups at or above that sequence can be created.
257	/// NOTE: Old groups with lower sequence numbers can still arrive.
258	pub fn finish_at(&mut self, sequence: u64) -> Result<()> {
259		let mut state = self.modify()?;
260		let max = state.max_sequence.ok_or(Error::Closed)?;
261		if state.final_sequence.is_some() || sequence != max {
262			return Err(Error::Closed);
263		}
264		state.final_sequence = Some(max.checked_add(1).ok_or(Error::BoundsExceeded)?);
265		Ok(())
266	}
267
268	/// Abort the track with the given error.
269	pub fn abort(&mut self, err: Error) -> Result<()> {
270		let mut guard = self.modify()?;
271
272		// Abort all groups still in progress.
273		for (group, _) in guard.groups.iter_mut().flatten() {
274			// Ignore errors, we don't care if the group was already closed.
275			group.abort(err.clone()).ok();
276		}
277
278		guard.abort = Some(err);
279		guard.close();
280		Ok(())
281	}
282
283	/// Create a new consumer for the track, starting at the beginning.
284	pub fn consume(&self) -> TrackConsumer {
285		TrackConsumer {
286			info: self.info.clone(),
287			state: self.state.consume(),
288			index: 0,
289			min_sequence: 0,
290		}
291	}
292
293	/// Block until there are no active consumers.
294	pub async fn unused(&self) -> Result<()> {
295		self.state
296			.unused()
297			.await
298			.map_err(|r| r.abort.clone().unwrap_or(Error::Dropped))
299	}
300
301	/// Return true if the track has been closed.
302	pub fn is_closed(&self) -> bool {
303		self.state.read().is_closed()
304	}
305
306	/// Return true if this is the same track.
307	pub fn is_clone(&self, other: &Self) -> bool {
308		self.state.same_channel(&other.state)
309	}
310
311	/// Create a weak reference that doesn't prevent auto-close.
312	pub(crate) fn weak(&self) -> TrackWeak {
313		TrackWeak {
314			info: self.info.clone(),
315			state: self.state.weak(),
316		}
317	}
318
319	fn modify(&self) -> Result<conducer::Mut<'_, State>> {
320		self.state
321			.write()
322			.map_err(|r| r.abort.clone().unwrap_or(Error::Dropped))
323	}
324}
325
326impl Clone for TrackProducer {
327	fn clone(&self) -> Self {
328		Self {
329			info: self.info.clone(),
330			state: self.state.clone(),
331		}
332	}
333}
334
335impl From<Track> for TrackProducer {
336	fn from(info: Track) -> Self {
337		TrackProducer::new(info)
338	}
339}
340
341/// A weak reference to a track that doesn't prevent auto-close.
342#[derive(Clone)]
343pub(crate) struct TrackWeak {
344	pub info: Track,
345	state: conducer::Weak<State>,
346}
347
348impl TrackWeak {
349	pub fn abort(&self, err: Error) {
350		let Ok(mut guard) = self.state.write() else { return };
351
352		// Cascade abort to all groups.
353		for (group, _) in guard.groups.iter_mut().flatten() {
354			group.abort(err.clone()).ok();
355		}
356
357		guard.abort = Some(err);
358		guard.close();
359	}
360
361	pub fn is_closed(&self) -> bool {
362		self.state.is_closed()
363	}
364
365	pub fn consume(&self) -> TrackConsumer {
366		TrackConsumer {
367			info: self.info.clone(),
368			state: self.state.consume(),
369			index: 0,
370			min_sequence: 0,
371		}
372	}
373
374	pub async fn unused(&self) -> crate::Result<()> {
375		self.state
376			.unused()
377			.await
378			.map_err(|r| r.abort.clone().unwrap_or(Error::Dropped))
379	}
380
381	pub fn is_clone(&self, other: &Self) -> bool {
382		self.state.same_channel(&other.state)
383	}
384}
385
386/// A consumer for a track, used to read groups.
387#[derive(Clone)]
388pub struct TrackConsumer {
389	pub info: Track,
390	state: conducer::Consumer<State>,
391	index: usize,
392
393	min_sequence: u64,
394}
395
396impl TrackConsumer {
397	// A helper to automatically apply Dropped if the state is closed without an error.
398	fn poll<F, R>(&self, waiter: &conducer::Waiter, f: F) -> Poll<Result<R>>
399	where
400		F: Fn(&conducer::Ref<'_, State>) -> Poll<Result<R>>,
401	{
402		Poll::Ready(match ready!(self.state.poll(waiter, f)) {
403			Ok(res) => res,
404			// We try to clone abort just in case the function forgot to check for terminal state.
405			Err(state) => Err(state.abort.clone().unwrap_or(Error::Dropped)),
406		})
407	}
408
409	/// Poll for the next group without blocking.
410	///
411	/// Returns `Poll::Ready(Some(Ok(group)))` when a group is available,
412	/// `Poll::Ready(None)` when the track is finished,
413	/// `Poll::Ready(Some(Err(e)))` when the track has been aborted, or
414	/// `Poll::Pending` when no group is available yet.
415	pub fn poll_next_group(&mut self, waiter: &conducer::Waiter) -> Poll<Result<Option<GroupConsumer>>> {
416		let Some((consumer, found_index)) =
417			ready!(self.poll(waiter, |state| state.poll_next_group(self.index, self.min_sequence))?)
418		else {
419			return Poll::Ready(Ok(None));
420		};
421
422		self.index = found_index + 1;
423		Poll::Ready(Ok(Some(consumer)))
424	}
425
426	/// Return the next group in order.
427	///
428	/// NOTE: This can have gaps if the reader is too slow or there were network slowdowns.
429	pub async fn next_group(&mut self) -> Result<Option<GroupConsumer>> {
430		conducer::wait(|waiter| self.poll_next_group(waiter)).await
431	}
432
433	/// Poll for the group with the given sequence, without blocking.
434	pub fn poll_get_group(&self, waiter: &conducer::Waiter, sequence: u64) -> Poll<Result<Option<GroupConsumer>>> {
435		self.poll(waiter, |state| state.poll_get_group(sequence))
436	}
437
438	/// Block until the group with the given sequence is available.
439	///
440	/// Returns None if the group is not in the cache and a newer group exists.
441	pub async fn get_group(&self, sequence: u64) -> Result<Option<GroupConsumer>> {
442		conducer::wait(|waiter| self.poll_get_group(waiter, sequence)).await
443	}
444
445	/// Poll for track closure, without blocking.
446	pub fn poll_closed(&self, waiter: &conducer::Waiter) -> Poll<Result<()>> {
447		self.poll(waiter, |state| state.poll_closed())
448	}
449
450	/// Block until the track is closed.
451	///
452	/// Returns Ok() is the track was cleanly finished.
453	pub async fn closed(&self) -> Result<()> {
454		conducer::wait(|waiter| self.poll_closed(waiter)).await
455	}
456
457	pub fn is_clone(&self, other: &Self) -> bool {
458		self.state.same_channel(&other.state)
459	}
460
461	/// Poll for the total number of groups in the track.
462	pub fn poll_finished(&mut self, waiter: &conducer::Waiter) -> Poll<Result<u64>> {
463		self.poll(waiter, |state| state.poll_finished())
464	}
465
466	/// Block until the track is finished, returning the total number of groups.
467	pub async fn finished(&mut self) -> Result<u64> {
468		conducer::wait(|waiter| self.poll_finished(waiter)).await
469	}
470
471	/// Start the consumer at the specified sequence.
472	pub fn start_at(&mut self, sequence: u64) {
473		self.min_sequence = sequence;
474	}
475
476	/// Return the latest sequence number in the track.
477	pub fn latest(&self) -> Option<u64> {
478		self.state.read().max_sequence
479	}
480}
481
482#[cfg(test)]
483use futures::FutureExt;
484
#[cfg(test)]
impl TrackConsumer {
	/// Return the next group, panicking if it would block, error, or the track is closed.
	pub fn assert_group(&mut self) -> GroupConsumer {
		let ready = self.next_group().now_or_never().expect("group would have blocked");
		let maybe_group = ready.expect("would have errored");
		maybe_group.expect("track was closed")
	}

	/// Panic unless asking for the next group would block.
	pub fn assert_no_group(&mut self) {
		let polled = self.next_group().now_or_never();
		assert!(polled.is_none(), "next group would not have blocked");
	}

	/// Panic unless waiting for closure would block.
	pub fn assert_not_closed(&self) {
		let polled = self.closed().now_or_never();
		assert!(polled.is_none(), "should not be closed");
	}

	/// Panic unless waiting for closure resolves immediately.
	pub fn assert_closed(&self) {
		let polled = self.closed().now_or_never();
		assert!(polled.is_some(), "should be closed");
	}

	// TODO assert specific errors after implementing PartialEq
	/// Panic unless waiting for closure resolves immediately with an error.
	pub fn assert_error(&self) {
		let outcome = self.closed().now_or_never().expect("should not block");
		assert!(outcome.is_err(), "should be error");
	}

	/// Panic unless `other` refers to the same track.
	pub fn assert_is_clone(&self, other: &Self) {
		let same = self.is_clone(other);
		assert!(same, "should be clone");
	}

	/// Panic if `other` refers to the same track.
	pub fn assert_not_clone(&self, other: &Self) {
		let same = self.is_clone(other);
		assert!(!same, "should not be clone");
	}
}
526
#[cfg(test)]
mod test {
	use super::*;

	/// Helper: a fresh producer for a track named "test".
	fn test_producer() -> TrackProducer {
		Track::new("test").produce()
	}

	/// Helper: move the (paused) clock just beyond the eviction threshold.
	async fn advance_past_expiry() {
		tokio::time::advance(MAX_GROUP_AGE + Duration::from_secs(1)).await;
	}

	/// Helper: count non-tombstoned groups in state.
	fn live_groups(state: &State) -> usize {
		state.groups.iter().filter(|slot| slot.is_some()).count()
	}

	/// Helper: get the sequence number of the first live group.
	fn first_live_sequence(state: &State) -> u64 {
		let (group, _) = state.groups.iter().find_map(Option::as_ref).unwrap();
		group.info.sequence
	}

	#[tokio::test]
	async fn evict_expired_groups() {
		tokio::time::pause();

		let mut producer = test_producer();

		// Three groups (seq 0, 1, 2), all created at time 0.
		for _ in 0..3 {
			producer.append_group().unwrap();
		}

		{
			let state = producer.state.read();
			assert_eq!(live_groups(&state), 3);
			assert_eq!(state.offset, 0);
		}

		// Let every existing group expire, then append seq 3 to trigger eviction.
		advance_past_expiry().await;
		producer.append_group().unwrap();

		// Seq 0, 1, 2 are expired; seq 3 (max_sequence) is kept.
		// The leading tombstones are trimmed, leaving only seq 3.
		{
			let state = producer.state.read();
			assert_eq!(live_groups(&state), 1);
			assert_eq!(first_live_sequence(&state), 3);
			assert_eq!(state.offset, 3);
			for evicted in 0..3 {
				assert!(!state.duplicates.contains(&evicted));
			}
			assert!(state.duplicates.contains(&3));
		}
	}

	#[tokio::test]
	async fn evict_keeps_max_sequence() {
		tokio::time::pause();

		let mut producer = test_producer();
		producer.append_group().unwrap(); // seq 0

		// Seq 0 expires before seq 1 is appended, so seq 0 gets evicted.
		advance_past_expiry().await;
		producer.append_group().unwrap(); // seq 1

		{
			let state = producer.state.read();
			assert_eq!(live_groups(&state), 1);
			assert_eq!(first_live_sequence(&state), 1);
			assert_eq!(state.offset, 1);
		}
	}

	#[tokio::test]
	async fn no_eviction_when_fresh() {
		tokio::time::pause();

		let mut producer = test_producer();

		// Seq 0, 1, 2 with no time passing in between.
		for _ in 0..3 {
			producer.append_group().unwrap();
		}

		{
			let state = producer.state.read();
			assert_eq!(live_groups(&state), 3);
			assert_eq!(state.offset, 0);
		}
	}

	#[tokio::test]
	async fn consumer_skips_evicted_groups() {
		tokio::time::pause();

		let mut producer = test_producer();
		producer.append_group().unwrap(); // seq 0

		let mut consumer = producer.consume();

		advance_past_expiry().await;
		producer.append_group().unwrap(); // seq 1

		// Seq 0 has been evicted, so the consumer's first group is seq 1.
		let first = consumer.assert_group();
		assert_eq!(first.info.sequence, 1);
	}

	#[tokio::test]
	async fn out_of_order_max_sequence_at_front() {
		tokio::time::pause();

		let mut producer = test_producer();

		// Arrival order 5, 3, 4: the max_sequence group sits at the front of the VecDeque.
		for sequence in [5, 3, 4] {
			producer.create_group(Group { sequence }).unwrap();
		}

		{
			let state = producer.state.read();
			assert_eq!(state.max_sequence, Some(5));
		}

		// Expire all three, then append seq 6 which becomes the new max_sequence.
		advance_past_expiry().await;
		producer.append_group().unwrap();

		// Seq 5 is no longer protected, so 3, 4 and 5 are all evicted.
		{
			let state = producer.state.read();
			assert_eq!(live_groups(&state), 1);
			assert_eq!(first_live_sequence(&state), 6);
			for evicted in [3, 4, 5] {
				assert!(!state.duplicates.contains(&evicted));
			}
			assert!(state.duplicates.contains(&6));
		}
	}

	#[tokio::test]
	async fn max_sequence_at_front_blocks_trim() {
		tokio::time::pause();

		let mut producer = test_producer();

		// Seq 5 arrives first and then ages past the threshold.
		producer.create_group(Group { sequence: 5 }).unwrap();
		advance_past_expiry().await;

		// Seq 3 arrives late; max_sequence is still 5 (at front).
		producer.create_group(Group { sequence: 3 }).unwrap();

		// Seq 5 is protected as max_sequence and seq 3 is brand new: nothing is evicted.
		{
			let state = producer.state.read();
			assert_eq!(live_groups(&state), 2);
			assert_eq!(state.offset, 0);
		}

		// Now seq 3 expires too, and a late seq 2 triggers eviction.
		advance_past_expiry().await;
		producer.create_group(Group { sequence: 2 }).unwrap();

		// Seq 5 is still max_sequence (protected, at front, blocks trim).
		// Seq 3 is expired → tombstoned.
		// Seq 2 is fresh → kept.
		// VecDeque: [Some(5), None, Some(2)]. Leading entry is Some, so offset stays.
		{
			let state = producer.state.read();
			assert_eq!(live_groups(&state), 2);
			assert_eq!(state.offset, 0);
			assert!(state.duplicates.contains(&5));
			assert!(!state.duplicates.contains(&3));
			assert!(state.duplicates.contains(&2));
		}

		// A new consumer starts at index 0; the first non-tombstoned group is seq 5.
		let mut consumer = producer.consume();
		let first = consumer.assert_group();
		assert_eq!(first.info.sequence, 5);
	}

	#[test]
	fn append_finish_cannot_be_rewritten() {
		let mut producer = test_producer();

		// Finishing an empty track is valid (fin = 0, total groups = 0).
		assert!(producer.finish().is_ok());

		// A second finish and any further append are rejected.
		assert!(producer.finish().is_err());
		assert!(producer.append_group().is_err());
	}

	#[test]
	fn finish_after_groups() {
		let mut producer = test_producer();
		producer.append_group().unwrap();

		assert!(producer.finish().is_ok());
		assert!(producer.finish().is_err());
		assert!(producer.append_group().is_err());
	}

	#[test]
	fn insert_finish_validates_sequence_and_freezes_to_max() {
		let mut producer = test_producer();
		producer.create_group(Group { sequence: 5 }).unwrap();

		// Only the exact max_sequence is accepted.
		assert!(producer.finish_at(4).is_err());
		assert!(producer.finish_at(10).is_err());
		assert!(producer.finish_at(5).is_ok());

		// The boundary is frozen at one past max_sequence.
		assert_eq!(producer.state.read().final_sequence, Some(6));

		// Finishing twice fails; older sequences may still arrive, duplicates may not.
		assert!(producer.finish_at(5).is_err());
		assert!(producer.create_group(Group { sequence: 4 }).is_ok());
		assert!(producer.create_group(Group { sequence: 5 }).is_err());
	}

	#[tokio::test]
	async fn next_group_finishes_without_waiting_for_gaps() {
		let mut producer = test_producer();
		producer.create_group(Group { sequence: 1 }).unwrap();
		producer.finish_at(1).unwrap();

		let mut consumer = producer.consume();
		let first = consumer.assert_group();
		assert_eq!(first.info.sequence, 1);

		let polled = consumer.next_group().now_or_never();
		let done = polled.expect("should not block").expect("would have errored");
		assert!(done.is_none(), "track should finish without waiting for gaps");
	}

	#[tokio::test]
	async fn get_group_finishes_without_waiting_for_gaps() {
		let mut producer = test_producer();
		producer.create_group(Group { sequence: 1 }).unwrap();
		producer.finish_at(1).unwrap();

		let consumer = producer.consume();

		// get_group(0) blocks because group 0 is below final_sequence and could still arrive.
		let below_fin = consumer.get_group(0).now_or_never();
		assert!(
			below_fin.is_none(),
			"sequence below fin should block (group could still arrive)"
		);

		let past_fin = consumer
			.get_group(2)
			.now_or_never()
			.expect("sequence at-or-after fin should resolve")
			.expect("should not error");
		assert!(past_fin.is_none(), "sequence at-or-after fin should not exist");
	}

	#[test]
	fn append_group_returns_bounds_exceeded_on_sequence_overflow() {
		let mut producer = test_producer();

		// Force max_sequence to the largest value so the next append overflows.
		{
			let mut state = producer.state.write().ok().unwrap();
			state.max_sequence = Some(u64::MAX);
		}

		let appended = producer.append_group();
		assert!(matches!(appended, Err(Error::BoundsExceeded)));
	}
}