Skip to main content

moq_lite/model/
track.rs

//! A track is a collection of semi-reliable and semi-ordered streams, split into a [TrackProducer] and [TrackConsumer] handle.
//!
//! A [TrackProducer] creates streams with a sequence number and priority.
//! The sequence number is used to determine the order of streams, while the priority is used to determine which stream to transmit first.
//! This may seem counter-intuitive, but is designed for live streaming where the newest streams may be higher priority.
//! A cloned [TrackProducer] can be used to create streams in parallel, but will error if a duplicate sequence number is used.
//!
//! A [TrackConsumer] may not receive all streams in order or at all.
//! These streams are meant to be transmitted over congested networks and the key to MoQ Transport is to not block on them.
//! Streams are cached for a potentially limited duration, which adds to their unreliable nature.
//! A cloned [TrackConsumer] will receive a copy of all new streams going forward (fanout).
//!
//! The track is closed with [Error] when all writers or readers are dropped.
14
15use crate::{Error, Result, coding};
16
17use super::{Group, GroupConsumer, GroupProducer};
18
19use std::{
20	collections::{HashSet, VecDeque},
21	task::{Poll, ready},
22	time::Duration,
23};
24
/// Groups older than this are evicted from the track cache (unless they are the max_sequence group).
///
/// Age is measured from the instant the group was added to the track.
// TODO: Replace with a configurable cache size.
const MAX_GROUP_AGE: Duration = Duration::from_secs(30);
28
/// A track is a collection of groups, delivered out-of-order until expired.
///
/// This type only carries the track's metadata; call [`Track::produce`] to obtain a
/// [`TrackProducer`] that can create groups.
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct Track {
	/// Identifier within a broadcast. Unique per [`crate::Broadcast`].
	pub name: String,
	/// Delivery priority. Higher values preempt lower ones when bandwidth is constrained.
	pub priority: u8,
}
38
39impl Track {
40	/// Create a track with the given name and default priority (`0`).
41	pub fn new<T: Into<String>>(name: T) -> Self {
42		Self {
43			name: name.into(),
44			priority: 0,
45		}
46	}
47
48	/// Consume this [`Track`] to create a producer that owns its metadata.
49	pub fn produce(self) -> TrackProducer {
50		TrackProducer::new(self)
51	}
52}
53
/// Shared state behind a track: the cached groups plus the bookkeeping needed for
/// duplicate detection, eviction, and termination.
#[derive(Default)]
struct State {
	/// Groups in arrival order. `None` entries are tombstones for evicted groups.
	/// Each live entry also records the instant it was added, which drives eviction.
	groups: VecDeque<Option<(GroupProducer, tokio::time::Instant)>>,
	/// Sequence numbers of the groups currently held in `groups`; used to reject duplicates.
	/// An entry is removed again when its group is evicted.
	duplicates: HashSet<u64>,
	/// Number of leading entries trimmed from `groups` so far.
	/// The absolute index of a slot is `offset` plus its position in `groups`.
	offset: usize,
	/// Highest sequence number created so far, or `None` before the first group.
	max_sequence: Option<u64>,
	/// Set once the track is finished; groups with a sequence at or above it are rejected.
	final_sequence: Option<u64>,
	/// Error recorded when the track is aborted; handed to readers that find nothing to return.
	abort: Option<Error>,
}
64
65impl State {
66	/// Find the next non-tombstoned group at or after `index` in arrival order.
67	///
68	/// Returns the group and its absolute index so the consumer can advance past it.
69	fn poll_recv_group(&self, index: usize, min_sequence: u64) -> Poll<Result<Option<(GroupConsumer, usize)>>> {
70		let start = index.saturating_sub(self.offset);
71		for (i, slot) in self.groups.iter().enumerate().skip(start) {
72			if let Some((group, _)) = slot
73				&& group.sequence >= min_sequence
74			{
75				return Poll::Ready(Ok(Some((group.consume(), self.offset + i))));
76			}
77		}
78
79		// TODO once we have drop notifications, check if index == final_sequence.
80		if self.final_sequence.is_some() {
81			Poll::Ready(Ok(None))
82		} else if let Some(err) = &self.abort {
83			Poll::Ready(Err(err.clone()))
84		} else {
85			Poll::Pending
86		}
87	}
88
89	/// Scan groups at or after `index` in arrival order, looking for the first with sequence
90	/// `>= next_sequence` that has a fully-buffered next frame. Returns the frame plus the
91	/// winning slot's absolute index and sequence so the consumer can advance past it.
92	fn poll_read_frame(
93		&self,
94		index: usize,
95		next_sequence: u64,
96		waiter: &conducer::Waiter,
97	) -> Poll<Result<Option<(bytes::Bytes, usize, u64)>>> {
98		let start = index.saturating_sub(self.offset);
99		let mut pending_seen = false;
100		for (i, slot) in self.groups.iter().enumerate().skip(start) {
101			let Some((group, _)) = slot else { continue };
102			if group.sequence < next_sequence {
103				continue;
104			}
105
106			let mut consumer = group.consume();
107			match consumer.poll_read_frame(waiter) {
108				Poll::Ready(Ok(Some(frame))) => {
109					return Poll::Ready(Ok(Some((frame, self.offset + i, group.sequence))));
110				}
111				Poll::Ready(Ok(None)) => continue,
112				Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
113				Poll::Pending => {
114					pending_seen = true;
115					continue;
116				}
117			}
118		}
119
120		// A pending group can still produce a frame even after finish() — finish only
121		// blocks new groups at/above final_sequence, not frames on existing groups.
122		if pending_seen {
123			Poll::Pending
124		} else if self.final_sequence.is_some() {
125			Poll::Ready(Ok(None))
126		} else if let Some(err) = &self.abort {
127			Poll::Ready(Err(err.clone()))
128		} else {
129			Poll::Pending
130		}
131	}
132
133	fn poll_get_group(&self, sequence: u64) -> Poll<Result<Option<GroupConsumer>>> {
134		// Search for the group with the matching sequence, skipping tombstones.
135		for (group, _) in self.groups.iter().flatten() {
136			if group.sequence == sequence {
137				return Poll::Ready(Ok(Some(group.consume())));
138			}
139		}
140
141		// Once final_sequence is set, groups at or past it can never exist.
142		if let Some(fin) = self.final_sequence
143			&& sequence >= fin
144		{
145			return Poll::Ready(Ok(None));
146		}
147
148		if let Some(err) = &self.abort {
149			return Poll::Ready(Err(err.clone()));
150		}
151
152		Poll::Pending
153	}
154
155	fn poll_closed(&self) -> Poll<Result<()>> {
156		if self.final_sequence.is_some() {
157			Poll::Ready(Ok(()))
158		} else if let Some(err) = &self.abort {
159			Poll::Ready(Err(err.clone()))
160		} else {
161			Poll::Pending
162		}
163	}
164
165	/// Evict groups older than MAX_GROUP_AGE, never evicting the max_sequence group.
166	///
167	/// Groups are in arrival order, so we can stop early when we hit a non-expired,
168	/// non-max_sequence group (everything after it arrived even later).
169	/// When max_sequence is at the front, we skip past it and tombstone expired groups
170	/// behind it.
171	fn evict_expired(&mut self, now: tokio::time::Instant) {
172		for slot in self.groups.iter_mut() {
173			let Some((group, created_at)) = slot else { continue };
174
175			if Some(group.sequence) == self.max_sequence {
176				continue;
177			}
178
179			if now.duration_since(*created_at) <= MAX_GROUP_AGE {
180				break;
181			}
182
183			self.duplicates.remove(&group.sequence);
184			*slot = None;
185		}
186
187		// Trim leading tombstones to advance the offset.
188		while let Some(None) = self.groups.front() {
189			self.groups.pop_front();
190			self.offset += 1;
191		}
192	}
193
194	fn poll_finished(&self) -> Poll<Result<u64>> {
195		if let Some(fin) = self.final_sequence {
196			Poll::Ready(Ok(fin))
197		} else if let Some(err) = &self.abort {
198			Poll::Ready(Err(err.clone()))
199		} else {
200			Poll::Pending
201		}
202	}
203}
204
/// A producer for a track, used to create new groups.
///
/// Dereferences to the [`Track`] metadata.
pub struct TrackProducer {
	/// Metadata of the track being produced.
	info: Track,
	/// Producer side of the shared track state.
	state: conducer::Producer<State>,
}
210
211impl std::ops::Deref for TrackProducer {
212	type Target = Track;
213
214	fn deref(&self) -> &Self::Target {
215		&self.info
216	}
217}
218
219impl TrackProducer {
220	/// Build a producer for the given track metadata. Prefer [`Track::produce`].
221	pub fn new(info: Track) -> Self {
222		Self {
223			info,
224			state: conducer::Producer::default(),
225		}
226	}
227
228	/// Create a new group with the given sequence number.
229	pub fn create_group(&mut self, info: Group) -> Result<GroupProducer> {
230		let group = info.produce();
231
232		let mut state = self.modify()?;
233		if let Some(fin) = state.final_sequence
234			&& group.sequence >= fin
235		{
236			return Err(Error::Closed);
237		}
238
239		if !state.duplicates.insert(group.sequence) {
240			return Err(Error::Duplicate);
241		}
242
243		let now = tokio::time::Instant::now();
244		state.max_sequence = Some(state.max_sequence.unwrap_or(0).max(group.sequence));
245		state.groups.push_back(Some((group.clone(), now)));
246		state.evict_expired(now);
247
248		Ok(group)
249	}
250
251	/// Create a new group with the next sequence number.
252	pub fn append_group(&mut self) -> Result<GroupProducer> {
253		let mut state = self.modify()?;
254		let sequence = match state.max_sequence {
255			Some(s) => s.checked_add(1).ok_or(coding::BoundsExceeded)?,
256			None => 0,
257		};
258		if let Some(fin) = state.final_sequence
259			&& sequence >= fin
260		{
261			return Err(Error::Closed);
262		}
263
264		let group = Group { sequence }.produce();
265
266		let now = tokio::time::Instant::now();
267		state.duplicates.insert(sequence);
268		state.max_sequence = Some(sequence);
269		state.groups.push_back(Some((group.clone(), now)));
270		state.evict_expired(now);
271
272		Ok(group)
273	}
274
275	/// Create a group with a single frame.
276	pub fn write_frame<B: Into<bytes::Bytes>>(&mut self, frame: B) -> Result<()> {
277		let mut group = self.append_group()?;
278		group.write_frame(frame.into())?;
279		group.finish()?;
280		Ok(())
281	}
282
283	/// Mark the track as finished after the last appended group.
284	///
285	/// Sets the final sequence to one past the current max_sequence.
286	/// No new groups at or above this sequence can be appended.
287	/// NOTE: Old groups with lower sequence numbers can still arrive.
288	pub fn finish(&mut self) -> Result<()> {
289		let mut state = self.modify()?;
290		if state.final_sequence.is_some() {
291			return Err(Error::Closed);
292		}
293		state.final_sequence = Some(match state.max_sequence {
294			Some(max) => max.checked_add(1).ok_or(coding::BoundsExceeded)?,
295			None => 0,
296		});
297		Ok(())
298	}
299
300	/// Mark the track as finished at an exact final sequence.
301	///
302	/// The caller must pass the current max_sequence exactly.
303	/// Freezes the final boundary at one past the current max_sequence.
304	/// No new groups at or above that sequence can be created.
305	/// NOTE: Old groups with lower sequence numbers can still arrive.
306	pub fn finish_at(&mut self, sequence: u64) -> Result<()> {
307		let mut state = self.modify()?;
308		let max = state.max_sequence.ok_or(Error::Closed)?;
309		if state.final_sequence.is_some() || sequence != max {
310			return Err(Error::Closed);
311		}
312		state.final_sequence = Some(max.checked_add(1).ok_or(coding::BoundsExceeded)?);
313		Ok(())
314	}
315
316	/// Abort the track with the given error.
317	///
318	/// Child groups are independent and must be aborted separately if desired;
319	/// existing group consumers can still finish reading any groups that were
320	/// already created.
321	pub fn abort(&mut self, err: Error) -> Result<()> {
322		let mut guard = self.modify()?;
323		guard.abort = Some(err);
324		guard.close();
325		Ok(())
326	}
327
328	/// Create a new consumer for the track, starting at the beginning.
329	pub fn consume(&self) -> TrackConsumer {
330		TrackConsumer {
331			info: self.info.clone(),
332			state: self.state.consume(),
333			index: 0,
334			min_sequence: 0,
335			next_sequence: 0,
336		}
337	}
338
339	/// Block until there are no active consumers.
340	pub async fn unused(&self) -> Result<()> {
341		self.state
342			.unused()
343			.await
344			.map_err(|r| r.abort.clone().unwrap_or(Error::Dropped))
345	}
346
347	/// Block until there is at least one active consumer.
348	pub async fn used(&self) -> Result<()> {
349		self.state
350			.used()
351			.await
352			.map_err(|r| r.abort.clone().unwrap_or(Error::Dropped))
353	}
354
355	/// Block until the track is closed or aborted, returning the cause.
356	pub async fn closed(&self) -> Error {
357		self.state.closed().await;
358		self.state.read().abort.clone().unwrap_or(Error::Dropped)
359	}
360
361	/// Return true if the track has been closed.
362	pub fn is_closed(&self) -> bool {
363		self.state.read().is_closed()
364	}
365
366	/// Return true if this is the same track.
367	pub fn is_clone(&self, other: &Self) -> bool {
368		self.state.same_channel(&other.state)
369	}
370
371	/// Create a weak reference that doesn't prevent auto-close.
372	pub(crate) fn weak(&self) -> TrackWeak {
373		TrackWeak {
374			info: self.info.clone(),
375			state: self.state.weak(),
376		}
377	}
378
379	fn modify(&self) -> Result<conducer::Mut<'_, State>> {
380		self.state
381			.write()
382			.map_err(|r| r.abort.clone().unwrap_or(Error::Dropped))
383	}
384}
385
386impl Clone for TrackProducer {
387	fn clone(&self) -> Self {
388		Self {
389			info: self.info.clone(),
390			state: self.state.clone(),
391		}
392	}
393}
394
395impl From<Track> for TrackProducer {
396	fn from(info: Track) -> Self {
397		TrackProducer::new(info)
398	}
399}
400
/// A weak reference to a track that doesn't prevent auto-close.
#[derive(Clone)]
pub(crate) struct TrackWeak {
	/// Copy of the track metadata.
	pub(crate) info: Track,
	/// Weak handle to the shared track state.
	state: conducer::Weak<State>,
}
407
408impl TrackWeak {
409	pub fn is_closed(&self) -> bool {
410		self.state.is_closed()
411	}
412
413	pub fn consume(&self) -> TrackConsumer {
414		TrackConsumer {
415			info: self.info.clone(),
416			state: self.state.consume(),
417			index: 0,
418			min_sequence: 0,
419			next_sequence: 0,
420		}
421	}
422
423	pub async fn unused(&self) -> crate::Result<()> {
424		self.state
425			.unused()
426			.await
427			.map_err(|r| r.abort.clone().unwrap_or(Error::Dropped))
428	}
429
430	pub fn is_clone(&self, other: &Self) -> bool {
431		self.state.same_channel(&other.state)
432	}
433}
434
/// A consumer for a track, used to read groups.
///
/// The cursor fields below are per-consumer; a clone starts with a copy of them.
#[derive(Clone)]
pub struct TrackConsumer {
	/// Copy of the track metadata.
	info: Track,
	/// Consumer side of the shared track state.
	state: conducer::Consumer<State>,
	/// Arrival-order cursor: absolute index of the next slot to inspect.
	/// Advanced by [`Self::recv_group`] (and therefore [`Self::next_group`]) and by
	/// [`Self::read_frame`].
	index: usize,
	/// Minimum sequence to return from any `recv` method. Set by [`Self::start_at`].
	min_sequence: u64,
	/// One past the highest sequence returned by [`Self::next_group`] or [`Self::read_frame`].
	/// Used by those methods to skip late arrivals; it is not consulted by [`Self::recv_group`].
	next_sequence: u64,
}
448
449impl std::ops::Deref for TrackConsumer {
450	type Target = Track;
451
452	fn deref(&self) -> &Self::Target {
453		&self.info
454	}
455}
456
457impl TrackConsumer {
458	// A helper to automatically apply Dropped if the state is closed without an error.
459	fn poll<F, R>(&self, waiter: &conducer::Waiter, f: F) -> Poll<Result<R>>
460	where
461		F: Fn(&conducer::Ref<'_, State>) -> Poll<Result<R>>,
462	{
463		Poll::Ready(match ready!(self.state.poll(waiter, f)) {
464			Ok(res) => res,
465			// We try to clone abort just in case the function forgot to check for terminal state.
466			Err(state) => Err(state.abort.clone().unwrap_or(Error::Dropped)),
467		})
468	}
469
470	/// Poll for the next group in arrival order, without blocking.
471	///
472	/// Returns every group exactly once in the order it landed on the wire — which may be
473	/// out of sequence due to network reordering or loss. Use [`Self::poll_next_group`] if
474	/// you only want groups whose sequence number is higher than any previously returned.
475	///
476	/// Returns `Poll::Ready(Ok(Some(group)))` when a group is available,
477	/// `Poll::Ready(Ok(None))` when the track is finished,
478	/// `Poll::Ready(Err(e))` when the track has been aborted, or
479	/// `Poll::Pending` when no group is available yet.
480	pub fn poll_recv_group(&mut self, waiter: &conducer::Waiter) -> Poll<Result<Option<GroupConsumer>>> {
481		let Some((consumer, found_index)) =
482			ready!(self.poll(waiter, |state| state.poll_recv_group(self.index, self.min_sequence))?)
483		else {
484			return Poll::Ready(Ok(None));
485		};
486
487		self.index = found_index + 1;
488		Poll::Ready(Ok(Some(consumer)))
489	}
490
491	/// Receive the next group in arrival order.
492	///
493	/// Every group is returned exactly once, in the order it landed on the wire — which may
494	/// be out of sequence due to network reordering or loss. Use [`Self::next_group`] if you
495	/// only want groups whose sequence number is higher than any previously returned.
496	pub async fn recv_group(&mut self) -> Result<Option<GroupConsumer>> {
497		conducer::wait(|waiter| self.poll_recv_group(waiter)).await
498	}
499
500	/// Poll for the next group with a higher sequence number than any previously returned.
501	///
502	/// Late arrivals (sequence at or below the last returned) are silently skipped, so this
503	/// produces a monotonically increasing sequence at the cost of dropping out-of-order
504	/// groups. Use [`Self::poll_recv_group`] to see every group in arrival order instead.
505	pub fn poll_next_group(&mut self, waiter: &conducer::Waiter) -> Poll<Result<Option<GroupConsumer>>> {
506		loop {
507			let Some(group) = ready!(self.poll_recv_group(waiter)?) else {
508				return Poll::Ready(Ok(None));
509			};
510			if group.sequence < self.next_sequence {
511				// Late arrival; discard and keep looking.
512				continue;
513			}
514			self.next_sequence = group.sequence.saturating_add(1);
515			return Poll::Ready(Ok(Some(group)));
516		}
517	}
518
519	/// Return the next group with a higher sequence number than any previously returned.
520	///
521	/// Late arrivals (sequence at or below the last returned) are silently skipped, so this
522	/// produces a monotonically increasing sequence at the cost of dropping out-of-order
523	/// groups. Use [`Self::recv_group`] to see every group in arrival order instead.
524	pub async fn next_group(&mut self) -> Result<Option<GroupConsumer>> {
525		conducer::wait(|waiter| self.poll_next_group(waiter)).await
526	}
527
528	/// A helper that calls [`Self::poll_next_group`] and returns its first frame,
529	/// skipping the rest of the group. Intended for single-frame groups (see
530	/// [`TrackProducer::write_frame`]).
531	pub fn poll_read_frame(&mut self, waiter: &conducer::Waiter) -> Poll<Result<Option<bytes::Bytes>>> {
532		let lower = self.min_sequence.max(self.next_sequence);
533		let Some((frame, found_index, sequence)) =
534			ready!(self.poll(waiter, |state| { state.poll_read_frame(self.index, lower, waiter) })?)
535		else {
536			return Poll::Ready(Ok(None));
537		};
538
539		self.index = found_index + 1;
540		self.next_sequence = sequence.saturating_add(1);
541		Poll::Ready(Ok(Some(frame)))
542	}
543
544	/// Read a single full frame from the next group in sequence order.
545	///
546	/// See [`Self::poll_read_frame`] for semantics.
547	pub async fn read_frame(&mut self) -> Result<Option<bytes::Bytes>> {
548		conducer::wait(|waiter| self.poll_read_frame(waiter)).await
549	}
550
551	/// Poll for the group with the given sequence, without blocking.
552	pub fn poll_get_group(&self, waiter: &conducer::Waiter, sequence: u64) -> Poll<Result<Option<GroupConsumer>>> {
553		self.poll(waiter, |state| state.poll_get_group(sequence))
554	}
555
556	/// Wait until the group with the given sequence becomes available.
557	///
558	/// Resolves to `Some(GroupConsumer)` once the group is in the cache.
559	/// Resolves to `None` only when `sequence` is at or past the track's
560	/// `final_sequence` (set by `finish()` / `finish_at()`), since such a
561	/// group can never be produced. Sequences below `final_sequence` still
562	/// wait, since older groups may still arrive out of order.
563	pub async fn get_group(&self, sequence: u64) -> Result<Option<GroupConsumer>> {
564		conducer::wait(|waiter| self.poll_get_group(waiter, sequence)).await
565	}
566
567	/// Poll for track closure, without blocking.
568	pub fn poll_closed(&self, waiter: &conducer::Waiter) -> Poll<Result<()>> {
569		self.poll(waiter, |state| state.poll_closed())
570	}
571
572	/// Block until the track is closed.
573	///
574	/// Returns Ok() is the track was cleanly finished.
575	pub async fn closed(&self) -> Result<()> {
576		conducer::wait(|waiter| self.poll_closed(waiter)).await
577	}
578
579	/// Whether `other` was cloned from this consumer (shares the same underlying state).
580	pub fn is_clone(&self, other: &Self) -> bool {
581		self.state.same_channel(&other.state)
582	}
583
584	/// Poll for the total number of groups in the track.
585	pub fn poll_finished(&mut self, waiter: &conducer::Waiter) -> Poll<Result<u64>> {
586		self.poll(waiter, |state| state.poll_finished())
587	}
588
589	/// Block until the track is finished, returning the total number of groups.
590	pub async fn finished(&mut self) -> Result<u64> {
591		conducer::wait(|waiter| self.poll_finished(waiter)).await
592	}
593
594	/// Start the consumer at the specified sequence.
595	pub fn start_at(&mut self, sequence: u64) {
596		self.min_sequence = sequence;
597	}
598
599	/// Return the latest sequence number in the track.
600	pub fn latest(&self) -> Option<u64> {
601		self.state.read().max_sequence
602	}
603
604	/// Create a weak reference that doesn't prevent auto-close.
605	pub(crate) fn weak(&self) -> TrackWeak {
606		TrackWeak {
607			info: self.info.clone(),
608			state: self.state.weak(),
609		}
610	}
611}
612
613#[cfg(test)]
614use futures::FutureExt;
615
#[cfg(test)]
impl TrackConsumer {
	/// Return the next group in arrival order, panicking if that would block, error,
	/// or report the end of the track.
	pub fn assert_group(&mut self) -> GroupConsumer {
		let polled = self.recv_group().now_or_never().expect("group would have blocked");
		polled.expect("would have errored").expect("track was closed")
	}

	/// Panic unless receiving the next group would block.
	pub fn assert_no_group(&mut self) {
		let polled = self.recv_group().now_or_never();
		assert!(polled.is_none(), "recv_group would not have blocked");
	}

	/// Panic if the track already reports closure.
	pub fn assert_not_closed(&self) {
		let polled = self.closed().now_or_never();
		assert!(polled.is_none(), "should not be closed");
	}

	/// Panic unless the track already reports closure (cleanly or with an error).
	pub fn assert_closed(&self) {
		let polled = self.closed().now_or_never();
		assert!(polled.is_some(), "should be closed");
	}

	// TODO assert specific errors after implementing PartialEq
	/// Panic unless the track is already closed with an error.
	pub fn assert_error(&self) {
		let outcome = self.closed().now_or_never().expect("should not block");
		assert!(outcome.is_err(), "should be error");
	}

	/// Panic unless `other` shares this consumer's underlying state.
	pub fn assert_is_clone(&self, other: &Self) {
		let same = self.is_clone(other);
		assert!(same, "should be clone");
	}

	/// Panic if `other` shares this consumer's underlying state.
	pub fn assert_not_clone(&self, other: &Self) {
		let same = self.is_clone(other);
		assert!(!same, "should not be clone");
	}
}
657
658#[cfg(test)]
659mod test {
660	use super::*;
661
662	/// Helper: count non-tombstoned groups in state.
663	fn live_groups(state: &State) -> usize {
664		state.groups.iter().flatten().count()
665	}
666
667	/// Helper: get the sequence number of the first live group.
668	fn first_live_sequence(state: &State) -> u64 {
669		state.groups.iter().flatten().next().unwrap().0.sequence
670	}
671
672	#[tokio::test]
673	async fn evict_expired_groups() {
674		tokio::time::pause();
675
676		let mut producer = Track::new("test").produce();
677
678		// Create 3 groups at time 0.
679		producer.append_group().unwrap(); // seq 0
680		producer.append_group().unwrap(); // seq 1
681		producer.append_group().unwrap(); // seq 2
682
683		{
684			let state = producer.state.read();
685			assert_eq!(live_groups(&state), 3);
686			assert_eq!(state.offset, 0);
687		}
688
689		// Advance time past the eviction threshold.
690		tokio::time::advance(MAX_GROUP_AGE + Duration::from_secs(1)).await;
691
692		// Append a new group to trigger eviction.
693		producer.append_group().unwrap(); // seq 3
694
695		// Groups 0, 1, 2 are expired but seq 3 (max_sequence) is kept.
696		// Leading tombstones are trimmed, so only seq 3 remains.
697		{
698			let state = producer.state.read();
699			assert_eq!(live_groups(&state), 1);
700			assert_eq!(first_live_sequence(&state), 3);
701			assert_eq!(state.offset, 3);
702			assert!(!state.duplicates.contains(&0));
703			assert!(!state.duplicates.contains(&1));
704			assert!(!state.duplicates.contains(&2));
705			assert!(state.duplicates.contains(&3));
706		}
707	}
708
709	#[tokio::test]
710	async fn evict_keeps_max_sequence() {
711		tokio::time::pause();
712
713		let mut producer = Track::new("test").produce();
714		producer.append_group().unwrap(); // seq 0
715
716		// Advance time past threshold.
717		tokio::time::advance(MAX_GROUP_AGE + Duration::from_secs(1)).await;
718
719		// Append another group; seq 0 is expired and evicted.
720		producer.append_group().unwrap(); // seq 1
721
722		{
723			let state = producer.state.read();
724			assert_eq!(live_groups(&state), 1);
725			assert_eq!(first_live_sequence(&state), 1);
726			assert_eq!(state.offset, 1);
727		}
728	}
729
730	#[tokio::test]
731	async fn no_eviction_when_fresh() {
732		tokio::time::pause();
733
734		let mut producer = Track::new("test").produce();
735		producer.append_group().unwrap(); // seq 0
736		producer.append_group().unwrap(); // seq 1
737		producer.append_group().unwrap(); // seq 2
738
739		{
740			let state = producer.state.read();
741			assert_eq!(live_groups(&state), 3);
742			assert_eq!(state.offset, 0);
743		}
744	}
745
746	#[tokio::test]
747	async fn consumer_skips_evicted_groups() {
748		tokio::time::pause();
749
750		let mut producer = Track::new("test").produce();
751		producer.append_group().unwrap(); // seq 0
752
753		let mut consumer = producer.consume();
754
755		tokio::time::advance(MAX_GROUP_AGE + Duration::from_secs(1)).await;
756		producer.append_group().unwrap(); // seq 1
757
758		// Group 0 was evicted. Consumer should get group 1.
759		let group = consumer.assert_group();
760		assert_eq!(group.sequence, 1);
761	}
762
763	#[tokio::test]
764	async fn out_of_order_max_sequence_at_front() {
765		tokio::time::pause();
766
767		let mut producer = Track::new("test").produce();
768
769		// Arrive out of order: seq 5 first, then 3, then 4.
770		producer.create_group(Group { sequence: 5 }).unwrap();
771		producer.create_group(Group { sequence: 3 }).unwrap();
772		producer.create_group(Group { sequence: 4 }).unwrap();
773
774		// max_sequence = 5, which is at the front of the VecDeque.
775		{
776			let state = producer.state.read();
777			assert_eq!(state.max_sequence, Some(5));
778		}
779
780		// Expire all three groups.
781		tokio::time::advance(MAX_GROUP_AGE + Duration::from_secs(1)).await;
782
783		// Append seq 6 (becomes new max_sequence).
784		producer.append_group().unwrap(); // seq 6
785
786		// Seq 3, 4, 5 are all expired. Seq 5 was the old max_sequence but now 6 is.
787		// All old groups are evicted.
788		{
789			let state = producer.state.read();
790			assert_eq!(live_groups(&state), 1);
791			assert_eq!(first_live_sequence(&state), 6);
792			assert!(!state.duplicates.contains(&3));
793			assert!(!state.duplicates.contains(&4));
794			assert!(!state.duplicates.contains(&5));
795			assert!(state.duplicates.contains(&6));
796		}
797	}
798
799	#[tokio::test]
800	async fn max_sequence_at_front_blocks_trim() {
801		tokio::time::pause();
802
803		let mut producer = Track::new("test").produce();
804
805		// Arrive: seq 5, then seq 3.
806		producer.create_group(Group { sequence: 5 }).unwrap();
807
808		tokio::time::advance(MAX_GROUP_AGE + Duration::from_secs(1)).await;
809
810		// Seq 3 arrives late; max_sequence is still 5 (at front).
811		producer.create_group(Group { sequence: 3 }).unwrap();
812
813		// Seq 5 is max_sequence (protected). Seq 3 is not expired (just created).
814		// Nothing should be evicted.
815		{
816			let state = producer.state.read();
817			assert_eq!(live_groups(&state), 2);
818			assert_eq!(state.offset, 0);
819		}
820
821		// Expire seq 3 as well.
822		tokio::time::advance(MAX_GROUP_AGE + Duration::from_secs(1)).await;
823
824		// Seq 2 arrives late, triggering eviction.
825		producer.create_group(Group { sequence: 2 }).unwrap();
826
827		// Seq 5 is still max_sequence (protected, at front, blocks trim).
828		// Seq 3 is expired → tombstoned.
829		// Seq 2 is fresh → kept.
830		// VecDeque: [Some(5), None, Some(2)]. Leading entry is Some, so offset stays.
831		{
832			let state = producer.state.read();
833			assert_eq!(live_groups(&state), 2);
834			assert_eq!(state.offset, 0);
835			assert!(state.duplicates.contains(&5));
836			assert!(!state.duplicates.contains(&3));
837			assert!(state.duplicates.contains(&2));
838		}
839
840		// Consumer should still be able to read through the hole.
841		let mut consumer = producer.consume();
842		let group = consumer.assert_group();
843		// consume() starts at index 0, first non-tombstoned group is seq 5.
844		assert_eq!(group.sequence, 5);
845	}
846
847	#[test]
848	fn append_finish_cannot_be_rewritten() {
849		let mut producer = Track::new("test").produce();
850
851		// Finishing an empty track is valid (fin = 0, total groups = 0).
852		assert!(producer.finish().is_ok());
853		assert!(producer.finish().is_err());
854		assert!(producer.append_group().is_err());
855	}
856
857	#[test]
858	fn finish_after_groups() {
859		let mut producer = Track::new("test").produce();
860
861		producer.append_group().unwrap();
862		assert!(producer.finish().is_ok());
863		assert!(producer.finish().is_err());
864		assert!(producer.append_group().is_err());
865	}
866
867	#[test]
868	fn insert_finish_validates_sequence_and_freezes_to_max() {
869		let mut producer = Track::new("test").produce();
870		producer.create_group(Group { sequence: 5 }).unwrap();
871
872		assert!(producer.finish_at(4).is_err());
873		assert!(producer.finish_at(10).is_err());
874		assert!(producer.finish_at(5).is_ok());
875
876		{
877			let state = producer.state.read();
878			assert_eq!(state.final_sequence, Some(6));
879		}
880
881		assert!(producer.finish_at(5).is_err());
882		assert!(producer.create_group(Group { sequence: 4 }).is_ok());
883		assert!(producer.create_group(Group { sequence: 5 }).is_err());
884	}
885
886	#[tokio::test]
887	async fn recv_group_finishes_without_waiting_for_gaps() {
888		let mut producer = Track::new("test").produce();
889		producer.create_group(Group { sequence: 1 }).unwrap();
890		producer.finish_at(1).unwrap();
891
892		let mut consumer = producer.consume();
893		assert_eq!(consumer.assert_group().sequence, 1);
894
895		let done = consumer
896			.recv_group()
897			.now_or_never()
898			.expect("should not block")
899			.expect("would have errored");
900		assert!(done.is_none(), "track should finish without waiting for gaps");
901	}
902
903	#[tokio::test]
904	async fn next_group_skips_late_arrivals() {
905		let mut producer = Track::new("test").produce();
906		let mut consumer = producer.consume();
907
908		// Seq 5 arrives first.
909		producer.create_group(Group { sequence: 5 }).unwrap();
910		let group = consumer
911			.next_group()
912			.now_or_never()
913			.expect("should not block")
914			.expect("would have errored")
915			.expect("track should not be closed");
916		assert_eq!(group.sequence, 5);
917
918		// Seq 3 arrives late — skipped because 3 <= 5.
919		producer.create_group(Group { sequence: 3 }).unwrap();
920		// Seq 4 arrives late — also skipped.
921		producer.create_group(Group { sequence: 4 }).unwrap();
922		// Seq 7 arrives — returned.
923		producer.create_group(Group { sequence: 7 }).unwrap();
924
925		let group = consumer
926			.next_group()
927			.now_or_never()
928			.expect("should not block")
929			.expect("would have errored")
930			.expect("track should not be closed");
931		assert_eq!(group.sequence, 7);
932
933		// No more groups — would block.
934		assert!(
935			consumer.next_group().now_or_never().is_none(),
936			"should block waiting for a higher sequence"
937		);
938	}
939
940	#[tokio::test]
941	async fn next_group_returns_arrivals_in_order() {
942		let mut producer = Track::new("test").produce();
943		let mut consumer = producer.consume();
944
945		// Seq 3 arrives first, then seq 5 — both should be returned in arrival order.
946		producer.create_group(Group { sequence: 3 }).unwrap();
947		producer.create_group(Group { sequence: 5 }).unwrap();
948
949		let group = consumer
950			.next_group()
951			.now_or_never()
952			.expect("should not block")
953			.expect("would have errored")
954			.expect("track should not be closed");
955		assert_eq!(group.sequence, 3);
956
957		let group = consumer
958			.next_group()
959			.now_or_never()
960			.expect("should not block")
961			.expect("would have errored")
962			.expect("track should not be closed");
963		assert_eq!(group.sequence, 5);
964	}
965
966	#[tokio::test]
967	async fn recv_group_after_next_group_sees_late_arrivals() {
968		let mut producer = Track::new("test").produce();
969		let mut consumer = producer.consume();
970
971		producer.create_group(Group { sequence: 5 }).unwrap();
972		producer.create_group(Group { sequence: 3 }).unwrap();
973
974		// Ordered returns seq 5 and advances its internal cursor past it.
975		let group = consumer
976			.next_group()
977			.now_or_never()
978			.expect("should not block")
979			.expect("would have errored")
980			.expect("track should not be closed");
981		assert_eq!(group.sequence, 5);
982
983		// Intermixing: recv_group on the same consumer still returns the late seq 3.
984		// The ordered cursor is separate from the recv_group filter.
985		assert_eq!(consumer.assert_group().sequence, 3);
986	}
987
988	#[tokio::test]
989	async fn read_frame_returns_single_frame_per_group() {
990		let mut producer = Track::new("test").produce();
991		let mut consumer = producer.consume();
992
993		producer.write_frame(b"hello".as_slice()).unwrap();
994		producer.write_frame(b"world".as_slice()).unwrap();
995
996		let frame = consumer
997			.read_frame()
998			.now_or_never()
999			.expect("should not block")
1000			.expect("would have errored")
1001			.expect("track should not be closed");
1002		assert_eq!(&frame[..], b"hello");
1003
1004		let frame = consumer
1005			.read_frame()
1006			.now_or_never()
1007			.expect("should not block")
1008			.expect("would have errored")
1009			.expect("track should not be closed");
1010		assert_eq!(&frame[..], b"world");
1011	}
1012
1013	#[tokio::test]
1014	async fn read_frame_skips_stalled_group_for_newer_ready_frame() {
1015		let mut producer = Track::new("test").produce();
1016		let mut consumer = producer.consume();
1017
1018		// Seq 3: group open, no frame yet (stalled).
1019		let _stalled = producer.create_group(Group { sequence: 3 }).unwrap();
1020		// Seq 5: fully-written group with a frame.
1021		let mut g5 = producer.create_group(Group { sequence: 5 }).unwrap();
1022		g5.write_frame(bytes::Bytes::from_static(b"later")).unwrap();
1023		g5.finish().unwrap();
1024
1025		// read_frame should not block on the stalled seq 3 — it returns seq 5's frame.
1026		let frame = consumer
1027			.read_frame()
1028			.now_or_never()
1029			.expect("should not block on stalled earlier group")
1030			.expect("would have errored")
1031			.expect("track should not be closed");
1032		assert_eq!(&frame[..], b"later");
1033	}
1034
1035	#[tokio::test]
1036	async fn read_frame_discards_rest_of_multi_frame_group() {
1037		let mut producer = Track::new("test").produce();
1038		let mut consumer = producer.consume();
1039
1040		// Group 0 has two frames; only the first is returned.
1041		let mut g0 = producer.create_group(Group { sequence: 0 }).unwrap();
1042		g0.write_frame(bytes::Bytes::from_static(b"one")).unwrap();
1043		g0.write_frame(bytes::Bytes::from_static(b"two")).unwrap();
1044		g0.finish().unwrap();
1045
1046		// Group 1 is a normal single-frame group.
1047		producer.write_frame(b"next".as_slice()).unwrap();
1048
1049		let frame = consumer
1050			.read_frame()
1051			.now_or_never()
1052			.expect("should not block")
1053			.expect("would have errored")
1054			.expect("track should not be closed");
1055		assert_eq!(&frame[..], b"one");
1056
1057		// The second frame of group 0 is discarded; the next read jumps to group 1.
1058		let frame = consumer
1059			.read_frame()
1060			.now_or_never()
1061			.expect("should not block")
1062			.expect("would have errored")
1063			.expect("track should not be closed");
1064		assert_eq!(&frame[..], b"next");
1065	}
1066
1067	#[tokio::test]
1068	async fn read_frame_waits_for_pending_group_after_finish() {
1069		// finish() sets final_sequence, but groups already created with lower sequences
1070		// can still produce frames. read_frame must not return None prematurely.
1071		let mut producer = Track::new("test").produce();
1072		let mut consumer = producer.consume();
1073
1074		let mut g0 = producer.create_group(Group { sequence: 0 }).unwrap();
1075		producer.finish().unwrap();
1076
1077		// Track is finished but group 0 has no frame yet — must block, not return None.
1078		assert!(
1079			consumer.read_frame().now_or_never().is_none(),
1080			"read_frame must block on a pending group even after finish()"
1081		);
1082
1083		// A late frame on the pending group is still delivered.
1084		g0.write_frame(bytes::Bytes::from_static(b"late")).unwrap();
1085		let frame = consumer
1086			.read_frame()
1087			.now_or_never()
1088			.expect("should not block once a frame is written")
1089			.expect("would have errored")
1090			.expect("track should not be closed");
1091		assert_eq!(&frame[..], b"late");
1092	}
1093
1094	#[tokio::test]
1095	async fn read_frame_respects_start_at() {
1096		// start_at sets min_sequence; read_frame must skip groups below it even though
1097		// next_sequence is still 0.
1098		let mut producer = Track::new("test").produce();
1099		let mut consumer = producer.consume();
1100		consumer.start_at(5);
1101
1102		// Seq 3 has a frame but is below min_sequence — must be skipped.
1103		let mut g3 = producer.create_group(Group { sequence: 3 }).unwrap();
1104		g3.write_frame(bytes::Bytes::from_static(b"skip-me")).unwrap();
1105		g3.finish().unwrap();
1106
1107		let mut g5 = producer.create_group(Group { sequence: 5 }).unwrap();
1108		g5.write_frame(bytes::Bytes::from_static(b"keep")).unwrap();
1109		g5.finish().unwrap();
1110
1111		let frame = consumer
1112			.read_frame()
1113			.now_or_never()
1114			.expect("should not block")
1115			.expect("would have errored")
1116			.expect("track should not be closed");
1117		assert_eq!(&frame[..], b"keep");
1118	}
1119
1120	#[tokio::test]
1121	async fn read_frame_returns_none_when_finished() {
1122		let mut producer = Track::new("test").produce();
1123		let mut consumer = producer.consume();
1124
1125		producer.write_frame(b"only".as_slice()).unwrap();
1126		producer.finish().unwrap();
1127
1128		let frame = consumer
1129			.read_frame()
1130			.now_or_never()
1131			.expect("should not block")
1132			.expect("would have errored")
1133			.expect("track should not be closed");
1134		assert_eq!(&frame[..], b"only");
1135
1136		let done = consumer
1137			.read_frame()
1138			.now_or_never()
1139			.expect("should not block")
1140			.expect("would have errored");
1141		assert!(done.is_none());
1142	}
1143
1144	#[tokio::test]
1145	async fn get_group_finishes_without_waiting_for_gaps() {
1146		let mut producer = Track::new("test").produce();
1147		producer.create_group(Group { sequence: 1 }).unwrap();
1148		producer.finish_at(1).unwrap();
1149
1150		let consumer = producer.consume();
1151		// get_group(0) blocks because group 0 is below final_sequence and could still arrive.
1152		assert!(
1153			consumer.get_group(0).now_or_never().is_none(),
1154			"sequence below fin should block (group could still arrive)"
1155		);
1156		assert!(
1157			consumer
1158				.get_group(2)
1159				.now_or_never()
1160				.expect("sequence at-or-after fin should resolve")
1161				.expect("should not error")
1162				.is_none(),
1163			"sequence at-or-after fin should not exist"
1164		);
1165	}
1166
1167	#[test]
1168	fn append_group_returns_bounds_exceeded_on_sequence_overflow() {
1169		let mut producer = Track::new("test").produce();
1170		{
1171			let mut state = producer.state.write().ok().unwrap();
1172			state.max_sequence = Some(u64::MAX);
1173		}
1174
1175		assert!(matches!(producer.append_group(), Err(Error::BoundsExceeded(_))));
1176	}
1177}