Skip to main content

moq_lite/model/
track.rs

1//! A track is a collection of semi-reliable and semi-ordered streams, split into a [TrackProducer] and [TrackConsumer] handle.
2//!
3//! A [TrackProducer] creates streams with a sequence number and priority.
4//! The sequest number is used to determine the order of streams, while the priority is used to determine which stream to transmit first.
5//! This may seem counter-intuitive, but is designed for live streaming where the newest streams may be higher priority.
6//! A cloned [TrackProducer] can be used to create streams in parallel, but will error if a duplicate sequence number is used.
7//!
8//! A [TrackConsumer] may not receive all streams in order or at all.
9//! These streams are meant to be transmitted over congested networks and the key to MoQ Tranport is to not block on them.
10//! streams will be cached for a potentially limited duration added to the unreliable nature.
11//! A cloned [TrackConsumer] will receive a copy of all new stream going forward (fanout).
12//!
13//! The track is closed with [Error] when all writers or readers are dropped.
14
15use crate::{Error, Result, coding};
16
17use super::{Group, GroupConsumer, GroupProducer};
18
19use std::{
20	collections::{HashSet, VecDeque},
21	task::{Poll, ready},
22	time::Duration,
23};
24
25/// Groups older than this are evicted from the track cache (unless they are the max_sequence group).
26// TODO: Replace with a configurable cache size.
27const MAX_GROUP_AGE: Duration = Duration::from_secs(30);
28
29/// A track is a collection of groups, delivered out-of-order until expired.
30#[derive(Clone, Debug, PartialEq, Eq)]
31#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
32pub struct Track {
33	pub name: String,
34	pub priority: u8,
35}
36
37impl Track {
38	pub fn new<T: Into<String>>(name: T) -> Self {
39		Self {
40			name: name.into(),
41			priority: 0,
42		}
43	}
44
45	pub fn produce(self) -> TrackProducer {
46		TrackProducer::new(self)
47	}
48}
49
50#[derive(Default)]
51struct State {
52	/// Groups in arrival order. `None` entries are tombstones for evicted groups.
53	groups: VecDeque<Option<(GroupProducer, tokio::time::Instant)>>,
54	duplicates: HashSet<u64>,
55	offset: usize,
56	max_sequence: Option<u64>,
57	final_sequence: Option<u64>,
58	abort: Option<Error>,
59}
60
61impl State {
62	/// Find the next non-tombstoned group at or after `index` in arrival order.
63	///
64	/// Returns the group and its absolute index so the consumer can advance past it.
65	fn poll_recv_group(&self, index: usize, min_sequence: u64) -> Poll<Result<Option<(GroupConsumer, usize)>>> {
66		let start = index.saturating_sub(self.offset);
67		for (i, slot) in self.groups.iter().enumerate().skip(start) {
68			if let Some((group, _)) = slot
69				&& group.info.sequence >= min_sequence
70			{
71				return Poll::Ready(Ok(Some((group.consume(), self.offset + i))));
72			}
73		}
74
75		// TODO once we have drop notifications, check if index == final_sequence.
76		if self.final_sequence.is_some() {
77			Poll::Ready(Ok(None))
78		} else if let Some(err) = &self.abort {
79			Poll::Ready(Err(err.clone()))
80		} else {
81			Poll::Pending
82		}
83	}
84
85	/// Scan groups at or after `index` in arrival order, looking for the first with sequence
86	/// `>= next_sequence` that has a fully-buffered next frame. Returns the frame plus the
87	/// winning slot's absolute index and sequence so the consumer can advance past it.
88	fn poll_read_frame(
89		&self,
90		index: usize,
91		next_sequence: u64,
92		waiter: &conducer::Waiter,
93	) -> Poll<Result<Option<(bytes::Bytes, usize, u64)>>> {
94		let start = index.saturating_sub(self.offset);
95		let mut pending_seen = false;
96		for (i, slot) in self.groups.iter().enumerate().skip(start) {
97			let Some((group, _)) = slot else { continue };
98			if group.info.sequence < next_sequence {
99				continue;
100			}
101
102			let mut consumer = group.consume();
103			match consumer.poll_read_frame(waiter) {
104				Poll::Ready(Ok(Some(frame))) => {
105					return Poll::Ready(Ok(Some((frame, self.offset + i, group.info.sequence))));
106				}
107				Poll::Ready(Ok(None)) => continue,
108				Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
109				Poll::Pending => {
110					pending_seen = true;
111					continue;
112				}
113			}
114		}
115
116		// A pending group can still produce a frame even after finish() — finish only
117		// blocks new groups at/above final_sequence, not frames on existing groups.
118		if pending_seen {
119			Poll::Pending
120		} else if self.final_sequence.is_some() {
121			Poll::Ready(Ok(None))
122		} else if let Some(err) = &self.abort {
123			Poll::Ready(Err(err.clone()))
124		} else {
125			Poll::Pending
126		}
127	}
128
129	fn poll_get_group(&self, sequence: u64) -> Poll<Result<Option<GroupConsumer>>> {
130		// Search for the group with the matching sequence, skipping tombstones.
131		for (group, _) in self.groups.iter().flatten() {
132			if group.info.sequence == sequence {
133				return Poll::Ready(Ok(Some(group.consume())));
134			}
135		}
136
137		// Once final_sequence is set, groups at or past it can never exist.
138		if let Some(fin) = self.final_sequence
139			&& sequence >= fin
140		{
141			return Poll::Ready(Ok(None));
142		}
143
144		if let Some(err) = &self.abort {
145			return Poll::Ready(Err(err.clone()));
146		}
147
148		Poll::Pending
149	}
150
151	fn poll_closed(&self) -> Poll<Result<()>> {
152		if self.final_sequence.is_some() {
153			Poll::Ready(Ok(()))
154		} else if let Some(err) = &self.abort {
155			Poll::Ready(Err(err.clone()))
156		} else {
157			Poll::Pending
158		}
159	}
160
161	/// Evict groups older than MAX_GROUP_AGE, never evicting the max_sequence group.
162	///
163	/// Groups are in arrival order, so we can stop early when we hit a non-expired,
164	/// non-max_sequence group (everything after it arrived even later).
165	/// When max_sequence is at the front, we skip past it and tombstone expired groups
166	/// behind it.
167	fn evict_expired(&mut self, now: tokio::time::Instant) {
168		for slot in self.groups.iter_mut() {
169			let Some((group, created_at)) = slot else { continue };
170
171			if Some(group.info.sequence) == self.max_sequence {
172				continue;
173			}
174
175			if now.duration_since(*created_at) <= MAX_GROUP_AGE {
176				break;
177			}
178
179			self.duplicates.remove(&group.info.sequence);
180			*slot = None;
181		}
182
183		// Trim leading tombstones to advance the offset.
184		while let Some(None) = self.groups.front() {
185			self.groups.pop_front();
186			self.offset += 1;
187		}
188	}
189
190	fn poll_finished(&self) -> Poll<Result<u64>> {
191		if let Some(fin) = self.final_sequence {
192			Poll::Ready(Ok(fin))
193		} else if let Some(err) = &self.abort {
194			Poll::Ready(Err(err.clone()))
195		} else {
196			Poll::Pending
197		}
198	}
199}
200
201/// A producer for a track, used to create new groups.
202pub struct TrackProducer {
203	pub info: Track,
204	state: conducer::Producer<State>,
205}
206
207impl TrackProducer {
208	pub fn new(info: Track) -> Self {
209		Self {
210			info,
211			state: conducer::Producer::default(),
212		}
213	}
214
215	/// Create a new group with the given sequence number.
216	pub fn create_group(&mut self, info: Group) -> Result<GroupProducer> {
217		let group = info.produce();
218
219		let mut state = self.modify()?;
220		if let Some(fin) = state.final_sequence
221			&& group.info.sequence >= fin
222		{
223			return Err(Error::Closed);
224		}
225
226		if !state.duplicates.insert(group.info.sequence) {
227			return Err(Error::Duplicate);
228		}
229
230		let now = tokio::time::Instant::now();
231		state.max_sequence = Some(state.max_sequence.unwrap_or(0).max(group.info.sequence));
232		state.groups.push_back(Some((group.clone(), now)));
233		state.evict_expired(now);
234
235		Ok(group)
236	}
237
238	/// Create a new group with the next sequence number.
239	pub fn append_group(&mut self) -> Result<GroupProducer> {
240		let mut state = self.modify()?;
241		let sequence = match state.max_sequence {
242			Some(s) => s.checked_add(1).ok_or(coding::BoundsExceeded)?,
243			None => 0,
244		};
245		if let Some(fin) = state.final_sequence
246			&& sequence >= fin
247		{
248			return Err(Error::Closed);
249		}
250
251		let group = Group { sequence }.produce();
252
253		let now = tokio::time::Instant::now();
254		state.duplicates.insert(sequence);
255		state.max_sequence = Some(sequence);
256		state.groups.push_back(Some((group.clone(), now)));
257		state.evict_expired(now);
258
259		Ok(group)
260	}
261
262	/// Create a group with a single frame.
263	pub fn write_frame<B: Into<bytes::Bytes>>(&mut self, frame: B) -> Result<()> {
264		let mut group = self.append_group()?;
265		group.write_frame(frame.into())?;
266		group.finish()?;
267		Ok(())
268	}
269
270	/// Mark the track as finished after the last appended group.
271	///
272	/// Sets the final sequence to one past the current max_sequence.
273	/// No new groups at or above this sequence can be appended.
274	/// NOTE: Old groups with lower sequence numbers can still arrive.
275	pub fn finish(&mut self) -> Result<()> {
276		let mut state = self.modify()?;
277		if state.final_sequence.is_some() {
278			return Err(Error::Closed);
279		}
280		state.final_sequence = Some(match state.max_sequence {
281			Some(max) => max.checked_add(1).ok_or(coding::BoundsExceeded)?,
282			None => 0,
283		});
284		Ok(())
285	}
286
287	/// Mark the track as finished after the last appended group.
288	///
289	/// Deprecated: use [`Self::finish`] for this behavior, or
290	/// [`Self::finish_at`] to set an explicit final sequence.
291	#[deprecated(note = "use finish() or finish_at(sequence) instead")]
292	pub fn close(&mut self) -> Result<()> {
293		self.finish()
294	}
295
296	/// Mark the track as finished at an exact final sequence.
297	///
298	/// The caller must pass the current max_sequence exactly.
299	/// Freezes the final boundary at one past the current max_sequence.
300	/// No new groups at or above that sequence can be created.
301	/// NOTE: Old groups with lower sequence numbers can still arrive.
302	pub fn finish_at(&mut self, sequence: u64) -> Result<()> {
303		let mut state = self.modify()?;
304		let max = state.max_sequence.ok_or(Error::Closed)?;
305		if state.final_sequence.is_some() || sequence != max {
306			return Err(Error::Closed);
307		}
308		state.final_sequence = Some(max.checked_add(1).ok_or(coding::BoundsExceeded)?);
309		Ok(())
310	}
311
312	/// Abort the track with the given error.
313	pub fn abort(&mut self, err: Error) -> Result<()> {
314		let mut guard = self.modify()?;
315
316		// Abort all groups still in progress.
317		for (group, _) in guard.groups.iter_mut().flatten() {
318			// Ignore errors, we don't care if the group was already closed.
319			group.abort(err.clone()).ok();
320		}
321
322		guard.abort = Some(err);
323		guard.close();
324		Ok(())
325	}
326
327	/// Create a new consumer for the track, starting at the beginning.
328	pub fn consume(&self) -> TrackConsumer {
329		TrackConsumer {
330			info: self.info.clone(),
331			state: self.state.consume(),
332			index: 0,
333			min_sequence: 0,
334			next_sequence: 0,
335		}
336	}
337
338	/// Block until there are no active consumers.
339	pub async fn unused(&self) -> Result<()> {
340		self.state
341			.unused()
342			.await
343			.map_err(|r| r.abort.clone().unwrap_or(Error::Dropped))
344	}
345
346	/// Block until there is at least one active consumer.
347	pub async fn used(&self) -> Result<()> {
348		self.state
349			.used()
350			.await
351			.map_err(|r| r.abort.clone().unwrap_or(Error::Dropped))
352	}
353
354	/// Return true if the track has been closed.
355	pub fn is_closed(&self) -> bool {
356		self.state.read().is_closed()
357	}
358
359	/// Return true if this is the same track.
360	pub fn is_clone(&self, other: &Self) -> bool {
361		self.state.same_channel(&other.state)
362	}
363
364	/// Create a weak reference that doesn't prevent auto-close.
365	pub(crate) fn weak(&self) -> TrackWeak {
366		TrackWeak {
367			info: self.info.clone(),
368			state: self.state.weak(),
369		}
370	}
371
372	fn modify(&self) -> Result<conducer::Mut<'_, State>> {
373		self.state
374			.write()
375			.map_err(|r| r.abort.clone().unwrap_or(Error::Dropped))
376	}
377}
378
379impl Clone for TrackProducer {
380	fn clone(&self) -> Self {
381		Self {
382			info: self.info.clone(),
383			state: self.state.clone(),
384		}
385	}
386}
387
388impl From<Track> for TrackProducer {
389	fn from(info: Track) -> Self {
390		TrackProducer::new(info)
391	}
392}
393
394/// A weak reference to a track that doesn't prevent auto-close.
395#[derive(Clone)]
396pub(crate) struct TrackWeak {
397	pub info: Track,
398	state: conducer::Weak<State>,
399}
400
401impl TrackWeak {
402	pub fn abort(&self, err: Error) {
403		let Ok(mut guard) = self.state.write() else { return };
404
405		// Cascade abort to all groups.
406		for (group, _) in guard.groups.iter_mut().flatten() {
407			group.abort(err.clone()).ok();
408		}
409
410		guard.abort = Some(err);
411		guard.close();
412	}
413
414	pub fn is_closed(&self) -> bool {
415		self.state.is_closed()
416	}
417
418	pub fn consume(&self) -> TrackConsumer {
419		TrackConsumer {
420			info: self.info.clone(),
421			state: self.state.consume(),
422			index: 0,
423			min_sequence: 0,
424			next_sequence: 0,
425		}
426	}
427
428	pub async fn unused(&self) -> crate::Result<()> {
429		self.state
430			.unused()
431			.await
432			.map_err(|r| r.abort.clone().unwrap_or(Error::Dropped))
433	}
434
435	pub fn is_clone(&self, other: &Self) -> bool {
436		self.state.same_channel(&other.state)
437	}
438}
439
440/// A consumer for a track, used to read groups.
441#[derive(Clone)]
442pub struct TrackConsumer {
443	pub info: Track,
444	state: conducer::Consumer<State>,
445	/// Arrival-order cursor used by [`Self::recv_group`].
446	index: usize,
447	/// Minimum sequence to return from any `recv` method. Set by [`Self::start_at`].
448	min_sequence: u64,
449	/// One past the highest sequence returned by [`Self::next_group_ordered`].
450	/// Used only by that method to skip late arrivals; does not affect [`Self::recv_group`].
451	next_sequence: u64,
452}
453
454impl TrackConsumer {
455	// A helper to automatically apply Dropped if the state is closed without an error.
456	fn poll<F, R>(&self, waiter: &conducer::Waiter, f: F) -> Poll<Result<R>>
457	where
458		F: Fn(&conducer::Ref<'_, State>) -> Poll<Result<R>>,
459	{
460		Poll::Ready(match ready!(self.state.poll(waiter, f)) {
461			Ok(res) => res,
462			// We try to clone abort just in case the function forgot to check for terminal state.
463			Err(state) => Err(state.abort.clone().unwrap_or(Error::Dropped)),
464		})
465	}
466
467	/// Poll for the next group received over the network, in arrival order, without blocking.
468	///
469	/// Groups may arrive out of order or with gaps due to network conditions.
470	/// Use [`Self::next_group_ordered`] if you need groups in sequence order,
471	/// skipping those that arrive too late.
472	///
473	/// Returns `Poll::Ready(Ok(Some(group)))` when a group is available,
474	/// `Poll::Ready(Ok(None))` when the track is finished,
475	/// `Poll::Ready(Err(e))` when the track has been aborted, or
476	/// `Poll::Pending` when no group is available yet.
477	pub fn poll_recv_group(&mut self, waiter: &conducer::Waiter) -> Poll<Result<Option<GroupConsumer>>> {
478		let Some((consumer, found_index)) =
479			ready!(self.poll(waiter, |state| state.poll_recv_group(self.index, self.min_sequence))?)
480		else {
481			return Poll::Ready(Ok(None));
482		};
483
484		self.index = found_index + 1;
485		Poll::Ready(Ok(Some(consumer)))
486	}
487
488	/// Receive the next group available on this track, in arrival order.
489	///
490	/// Groups may arrive out of order or with gaps due to network conditions.
491	/// Use [`Self::next_group_ordered`] if you need groups in sequence order,
492	/// skipping those that arrive too late.
493	pub async fn recv_group(&mut self) -> Result<Option<GroupConsumer>> {
494		conducer::wait(|waiter| self.poll_recv_group(waiter)).await
495	}
496
497	/// Deprecated alias for [`Self::poll_recv_group`].
498	#[deprecated(note = "use poll_recv_group for arrival order, or poll_next_group_ordered for sequence order")]
499	pub fn poll_next_group(&mut self, waiter: &conducer::Waiter) -> Poll<Result<Option<GroupConsumer>>> {
500		self.poll_recv_group(waiter)
501	}
502
503	/// Deprecated alias for [`Self::recv_group`].
504	#[deprecated(note = "use recv_group for arrival order, or next_group_ordered for sequence order")]
505	pub async fn next_group(&mut self) -> Result<Option<GroupConsumer>> {
506		self.recv_group().await
507	}
508
509	/// A helper that calls [`Self::poll_recv_group`] but only returns groups with a sequence number higher than any previously returned.
510	///
511	/// NOTE: This will be renamed to `poll_next_group` in the next major version.
512	pub fn poll_next_group_ordered(&mut self, waiter: &conducer::Waiter) -> Poll<Result<Option<GroupConsumer>>> {
513		loop {
514			let Some(group) = ready!(self.poll_recv_group(waiter)?) else {
515				return Poll::Ready(Ok(None));
516			};
517			if group.info.sequence < self.next_sequence {
518				// Late arrival; discard and keep looking.
519				continue;
520			}
521			self.next_sequence = group.info.sequence.saturating_add(1);
522			return Poll::Ready(Ok(Some(group)));
523		}
524	}
525
526	/// Return the next group with a strictly-greater sequence number than the last returned.
527	///
528	/// Groups that arrive late (with a sequence number at or below the last one returned)
529	/// are silently skipped.
530	///
531	/// NOTE: This will be renamed to `next_group` in the next major version.
532	pub async fn next_group_ordered(&mut self) -> Result<Option<GroupConsumer>> {
533		conducer::wait(|waiter| self.poll_next_group_ordered(waiter)).await
534	}
535
536	/// A helper that calls [`Self::poll_next_group_ordered`] and returns its first frame,
537	/// skipping the rest of the group. Intended for single-frame groups (see
538	/// [`TrackProducer::write_frame`]).
539	pub fn poll_read_frame(&mut self, waiter: &conducer::Waiter) -> Poll<Result<Option<bytes::Bytes>>> {
540		let lower = self.min_sequence.max(self.next_sequence);
541		let Some((frame, found_index, sequence)) =
542			ready!(self.poll(waiter, |state| { state.poll_read_frame(self.index, lower, waiter) })?)
543		else {
544			return Poll::Ready(Ok(None));
545		};
546
547		self.index = found_index + 1;
548		self.next_sequence = sequence.saturating_add(1);
549		Poll::Ready(Ok(Some(frame)))
550	}
551
552	/// Read a single full frame from the next group in sequence order.
553	///
554	/// See [`Self::poll_read_frame`] for semantics.
555	pub async fn read_frame(&mut self) -> Result<Option<bytes::Bytes>> {
556		conducer::wait(|waiter| self.poll_read_frame(waiter)).await
557	}
558
559	/// Poll for the group with the given sequence, without blocking.
560	pub fn poll_get_group(&self, waiter: &conducer::Waiter, sequence: u64) -> Poll<Result<Option<GroupConsumer>>> {
561		self.poll(waiter, |state| state.poll_get_group(sequence))
562	}
563
564	/// Block until the group with the given sequence is available.
565	///
566	/// Returns None if the group is not in the cache and a newer group exists.
567	pub async fn get_group(&self, sequence: u64) -> Result<Option<GroupConsumer>> {
568		conducer::wait(|waiter| self.poll_get_group(waiter, sequence)).await
569	}
570
571	/// Poll for track closure, without blocking.
572	pub fn poll_closed(&self, waiter: &conducer::Waiter) -> Poll<Result<()>> {
573		self.poll(waiter, |state| state.poll_closed())
574	}
575
576	/// Block until the track is closed.
577	///
578	/// Returns Ok() is the track was cleanly finished.
579	pub async fn closed(&self) -> Result<()> {
580		conducer::wait(|waiter| self.poll_closed(waiter)).await
581	}
582
583	pub fn is_clone(&self, other: &Self) -> bool {
584		self.state.same_channel(&other.state)
585	}
586
587	/// Poll for the total number of groups in the track.
588	pub fn poll_finished(&mut self, waiter: &conducer::Waiter) -> Poll<Result<u64>> {
589		self.poll(waiter, |state| state.poll_finished())
590	}
591
592	/// Block until the track is finished, returning the total number of groups.
593	pub async fn finished(&mut self) -> Result<u64> {
594		conducer::wait(|waiter| self.poll_finished(waiter)).await
595	}
596
597	/// Start the consumer at the specified sequence.
598	pub fn start_at(&mut self, sequence: u64) {
599		self.min_sequence = sequence;
600	}
601
602	/// Return the latest sequence number in the track.
603	pub fn latest(&self) -> Option<u64> {
604		self.state.read().max_sequence
605	}
606
607	/// Upgrade this consumer back to a [TrackProducer] sharing the same state.
608	///
609	/// This enables zero-copy track sharing between broadcasts: subscribe to a
610	/// track, then [`crate::BroadcastProducer::insert_track`] the producer into another
611	/// broadcast. Both broadcasts serve the same underlying track data with no
612	/// forwarding overhead.
613	///
614	/// # Shared Ownership
615	///
616	/// The returned producer shares state with the original track. Mutations
617	/// (appending groups, finishing, aborting) through either producer affect
618	/// all consumers of the track. The returned producer keeps the track alive
619	/// (prevents auto-close) as long as it exists, even if the original producer
620	/// is dropped.
621	pub fn produce(&self) -> Result<TrackProducer> {
622		let state = self
623			.state
624			.produce()
625			.ok_or_else(|| self.state.read().abort.clone().unwrap_or(Error::Dropped))?;
626		Ok(TrackProducer {
627			info: self.info.clone(),
628			state,
629		})
630	}
631}
632
633#[cfg(test)]
634use futures::FutureExt;
635
636#[cfg(test)]
637impl TrackConsumer {
638	pub fn assert_group(&mut self) -> GroupConsumer {
639		self.recv_group()
640			.now_or_never()
641			.expect("group would have blocked")
642			.expect("would have errored")
643			.expect("track was closed")
644	}
645
646	pub fn assert_no_group(&mut self) {
647		assert!(
648			self.recv_group().now_or_never().is_none(),
649			"recv_group would not have blocked"
650		);
651	}
652
653	pub fn assert_not_closed(&self) {
654		assert!(self.closed().now_or_never().is_none(), "should not be closed");
655	}
656
657	pub fn assert_closed(&self) {
658		assert!(self.closed().now_or_never().is_some(), "should be closed");
659	}
660
661	// TODO assert specific errors after implementing PartialEq
662	pub fn assert_error(&self) {
663		assert!(
664			self.closed().now_or_never().expect("should not block").is_err(),
665			"should be error"
666		);
667	}
668
669	pub fn assert_is_clone(&self, other: &Self) {
670		assert!(self.is_clone(other), "should be clone");
671	}
672
673	pub fn assert_not_clone(&self, other: &Self) {
674		assert!(!self.is_clone(other), "should not be clone");
675	}
676}
677
678#[cfg(test)]
679mod test {
680	use super::*;
681
682	/// Helper: count non-tombstoned groups in state.
683	fn live_groups(state: &State) -> usize {
684		state.groups.iter().flatten().count()
685	}
686
687	/// Helper: get the sequence number of the first live group.
688	fn first_live_sequence(state: &State) -> u64 {
689		state.groups.iter().flatten().next().unwrap().0.info.sequence
690	}
691
692	#[tokio::test]
693	async fn evict_expired_groups() {
694		tokio::time::pause();
695
696		let mut producer = Track::new("test").produce();
697
698		// Create 3 groups at time 0.
699		producer.append_group().unwrap(); // seq 0
700		producer.append_group().unwrap(); // seq 1
701		producer.append_group().unwrap(); // seq 2
702
703		{
704			let state = producer.state.read();
705			assert_eq!(live_groups(&state), 3);
706			assert_eq!(state.offset, 0);
707		}
708
709		// Advance time past the eviction threshold.
710		tokio::time::advance(MAX_GROUP_AGE + Duration::from_secs(1)).await;
711
712		// Append a new group to trigger eviction.
713		producer.append_group().unwrap(); // seq 3
714
715		// Groups 0, 1, 2 are expired but seq 3 (max_sequence) is kept.
716		// Leading tombstones are trimmed, so only seq 3 remains.
717		{
718			let state = producer.state.read();
719			assert_eq!(live_groups(&state), 1);
720			assert_eq!(first_live_sequence(&state), 3);
721			assert_eq!(state.offset, 3);
722			assert!(!state.duplicates.contains(&0));
723			assert!(!state.duplicates.contains(&1));
724			assert!(!state.duplicates.contains(&2));
725			assert!(state.duplicates.contains(&3));
726		}
727	}
728
729	#[tokio::test]
730	async fn evict_keeps_max_sequence() {
731		tokio::time::pause();
732
733		let mut producer = Track::new("test").produce();
734		producer.append_group().unwrap(); // seq 0
735
736		// Advance time past threshold.
737		tokio::time::advance(MAX_GROUP_AGE + Duration::from_secs(1)).await;
738
739		// Append another group; seq 0 is expired and evicted.
740		producer.append_group().unwrap(); // seq 1
741
742		{
743			let state = producer.state.read();
744			assert_eq!(live_groups(&state), 1);
745			assert_eq!(first_live_sequence(&state), 1);
746			assert_eq!(state.offset, 1);
747		}
748	}
749
750	#[tokio::test]
751	async fn no_eviction_when_fresh() {
752		tokio::time::pause();
753
754		let mut producer = Track::new("test").produce();
755		producer.append_group().unwrap(); // seq 0
756		producer.append_group().unwrap(); // seq 1
757		producer.append_group().unwrap(); // seq 2
758
759		{
760			let state = producer.state.read();
761			assert_eq!(live_groups(&state), 3);
762			assert_eq!(state.offset, 0);
763		}
764	}
765
766	#[tokio::test]
767	async fn consumer_skips_evicted_groups() {
768		tokio::time::pause();
769
770		let mut producer = Track::new("test").produce();
771		producer.append_group().unwrap(); // seq 0
772
773		let mut consumer = producer.consume();
774
775		tokio::time::advance(MAX_GROUP_AGE + Duration::from_secs(1)).await;
776		producer.append_group().unwrap(); // seq 1
777
778		// Group 0 was evicted. Consumer should get group 1.
779		let group = consumer.assert_group();
780		assert_eq!(group.info.sequence, 1);
781	}
782
783	#[tokio::test]
784	async fn out_of_order_max_sequence_at_front() {
785		tokio::time::pause();
786
787		let mut producer = Track::new("test").produce();
788
789		// Arrive out of order: seq 5 first, then 3, then 4.
790		producer.create_group(Group { sequence: 5 }).unwrap();
791		producer.create_group(Group { sequence: 3 }).unwrap();
792		producer.create_group(Group { sequence: 4 }).unwrap();
793
794		// max_sequence = 5, which is at the front of the VecDeque.
795		{
796			let state = producer.state.read();
797			assert_eq!(state.max_sequence, Some(5));
798		}
799
800		// Expire all three groups.
801		tokio::time::advance(MAX_GROUP_AGE + Duration::from_secs(1)).await;
802
803		// Append seq 6 (becomes new max_sequence).
804		producer.append_group().unwrap(); // seq 6
805
806		// Seq 3, 4, 5 are all expired. Seq 5 was the old max_sequence but now 6 is.
807		// All old groups are evicted.
808		{
809			let state = producer.state.read();
810			assert_eq!(live_groups(&state), 1);
811			assert_eq!(first_live_sequence(&state), 6);
812			assert!(!state.duplicates.contains(&3));
813			assert!(!state.duplicates.contains(&4));
814			assert!(!state.duplicates.contains(&5));
815			assert!(state.duplicates.contains(&6));
816		}
817	}
818
819	#[tokio::test]
820	async fn max_sequence_at_front_blocks_trim() {
821		tokio::time::pause();
822
823		let mut producer = Track::new("test").produce();
824
825		// Arrive: seq 5, then seq 3.
826		producer.create_group(Group { sequence: 5 }).unwrap();
827
828		tokio::time::advance(MAX_GROUP_AGE + Duration::from_secs(1)).await;
829
830		// Seq 3 arrives late; max_sequence is still 5 (at front).
831		producer.create_group(Group { sequence: 3 }).unwrap();
832
833		// Seq 5 is max_sequence (protected). Seq 3 is not expired (just created).
834		// Nothing should be evicted.
835		{
836			let state = producer.state.read();
837			assert_eq!(live_groups(&state), 2);
838			assert_eq!(state.offset, 0);
839		}
840
841		// Expire seq 3 as well.
842		tokio::time::advance(MAX_GROUP_AGE + Duration::from_secs(1)).await;
843
844		// Seq 2 arrives late, triggering eviction.
845		producer.create_group(Group { sequence: 2 }).unwrap();
846
847		// Seq 5 is still max_sequence (protected, at front, blocks trim).
848		// Seq 3 is expired → tombstoned.
849		// Seq 2 is fresh → kept.
850		// VecDeque: [Some(5), None, Some(2)]. Leading entry is Some, so offset stays.
851		{
852			let state = producer.state.read();
853			assert_eq!(live_groups(&state), 2);
854			assert_eq!(state.offset, 0);
855			assert!(state.duplicates.contains(&5));
856			assert!(!state.duplicates.contains(&3));
857			assert!(state.duplicates.contains(&2));
858		}
859
860		// Consumer should still be able to read through the hole.
861		let mut consumer = producer.consume();
862		let group = consumer.assert_group();
863		// consume() starts at index 0, first non-tombstoned group is seq 5.
864		assert_eq!(group.info.sequence, 5);
865	}
866
867	#[test]
868	fn append_finish_cannot_be_rewritten() {
869		let mut producer = Track::new("test").produce();
870
871		// Finishing an empty track is valid (fin = 0, total groups = 0).
872		assert!(producer.finish().is_ok());
873		assert!(producer.finish().is_err());
874		assert!(producer.append_group().is_err());
875	}
876
877	#[test]
878	fn finish_after_groups() {
879		let mut producer = Track::new("test").produce();
880
881		producer.append_group().unwrap();
882		assert!(producer.finish().is_ok());
883		assert!(producer.finish().is_err());
884		assert!(producer.append_group().is_err());
885	}
886
887	#[test]
888	fn insert_finish_validates_sequence_and_freezes_to_max() {
889		let mut producer = Track::new("test").produce();
890		producer.create_group(Group { sequence: 5 }).unwrap();
891
892		assert!(producer.finish_at(4).is_err());
893		assert!(producer.finish_at(10).is_err());
894		assert!(producer.finish_at(5).is_ok());
895
896		{
897			let state = producer.state.read();
898			assert_eq!(state.final_sequence, Some(6));
899		}
900
901		assert!(producer.finish_at(5).is_err());
902		assert!(producer.create_group(Group { sequence: 4 }).is_ok());
903		assert!(producer.create_group(Group { sequence: 5 }).is_err());
904	}
905
906	#[tokio::test]
907	async fn recv_group_finishes_without_waiting_for_gaps() {
908		let mut producer = Track::new("test").produce();
909		producer.create_group(Group { sequence: 1 }).unwrap();
910		producer.finish_at(1).unwrap();
911
912		let mut consumer = producer.consume();
913		assert_eq!(consumer.assert_group().info.sequence, 1);
914
915		let done = consumer
916			.recv_group()
917			.now_or_never()
918			.expect("should not block")
919			.expect("would have errored");
920		assert!(done.is_none(), "track should finish without waiting for gaps");
921	}
922
923	#[tokio::test]
924	async fn next_group_ordered_skips_late_arrivals() {
925		let mut producer = Track::new("test").produce();
926		let mut consumer = producer.consume();
927
928		// Seq 5 arrives first.
929		producer.create_group(Group { sequence: 5 }).unwrap();
930		let group = consumer
931			.next_group_ordered()
932			.now_or_never()
933			.expect("should not block")
934			.expect("would have errored")
935			.expect("track should not be closed");
936		assert_eq!(group.info.sequence, 5);
937
938		// Seq 3 arrives late — skipped because 3 <= 5.
939		producer.create_group(Group { sequence: 3 }).unwrap();
940		// Seq 4 arrives late — also skipped.
941		producer.create_group(Group { sequence: 4 }).unwrap();
942		// Seq 7 arrives — returned.
943		producer.create_group(Group { sequence: 7 }).unwrap();
944
945		let group = consumer
946			.next_group_ordered()
947			.now_or_never()
948			.expect("should not block")
949			.expect("would have errored")
950			.expect("track should not be closed");
951		assert_eq!(group.info.sequence, 7);
952
953		// No more groups — would block.
954		assert!(
955			consumer.next_group_ordered().now_or_never().is_none(),
956			"should block waiting for a higher sequence"
957		);
958	}
959
960	#[tokio::test]
961	async fn next_group_ordered_returns_arrivals_in_order() {
962		let mut producer = Track::new("test").produce();
963		let mut consumer = producer.consume();
964
965		// Seq 3 arrives first, then seq 5 — both should be returned in arrival order.
966		producer.create_group(Group { sequence: 3 }).unwrap();
967		producer.create_group(Group { sequence: 5 }).unwrap();
968
969		let group = consumer
970			.next_group_ordered()
971			.now_or_never()
972			.expect("should not block")
973			.expect("would have errored")
974			.expect("track should not be closed");
975		assert_eq!(group.info.sequence, 3);
976
977		let group = consumer
978			.next_group_ordered()
979			.now_or_never()
980			.expect("should not block")
981			.expect("would have errored")
982			.expect("track should not be closed");
983		assert_eq!(group.info.sequence, 5);
984	}
985
986	#[tokio::test]
987	async fn recv_group_after_next_group_ordered_sees_late_arrivals() {
988		let mut producer = Track::new("test").produce();
989		let mut consumer = producer.consume();
990
991		producer.create_group(Group { sequence: 5 }).unwrap();
992		producer.create_group(Group { sequence: 3 }).unwrap();
993
994		// Ordered returns seq 5 and advances its internal cursor past it.
995		let group = consumer
996			.next_group_ordered()
997			.now_or_never()
998			.expect("should not block")
999			.expect("would have errored")
1000			.expect("track should not be closed");
1001		assert_eq!(group.info.sequence, 5);
1002
1003		// Intermixing: recv_group on the same consumer still returns the late seq 3.
1004		// The ordered cursor is separate from the recv_group filter.
1005		assert_eq!(consumer.assert_group().info.sequence, 3);
1006	}
1007
1008	#[tokio::test]
1009	async fn read_frame_returns_single_frame_per_group() {
1010		let mut producer = Track::new("test").produce();
1011		let mut consumer = producer.consume();
1012
1013		producer.write_frame(b"hello".as_slice()).unwrap();
1014		producer.write_frame(b"world".as_slice()).unwrap();
1015
1016		let frame = consumer
1017			.read_frame()
1018			.now_or_never()
1019			.expect("should not block")
1020			.expect("would have errored")
1021			.expect("track should not be closed");
1022		assert_eq!(&frame[..], b"hello");
1023
1024		let frame = consumer
1025			.read_frame()
1026			.now_or_never()
1027			.expect("should not block")
1028			.expect("would have errored")
1029			.expect("track should not be closed");
1030		assert_eq!(&frame[..], b"world");
1031	}
1032
1033	#[tokio::test]
1034	async fn read_frame_skips_stalled_group_for_newer_ready_frame() {
1035		let mut producer = Track::new("test").produce();
1036		let mut consumer = producer.consume();
1037
1038		// Seq 3: group open, no frame yet (stalled).
1039		let _stalled = producer.create_group(Group { sequence: 3 }).unwrap();
1040		// Seq 5: fully-written group with a frame.
1041		let mut g5 = producer.create_group(Group { sequence: 5 }).unwrap();
1042		g5.write_frame(bytes::Bytes::from_static(b"later")).unwrap();
1043		g5.finish().unwrap();
1044
1045		// read_frame should not block on the stalled seq 3 — it returns seq 5's frame.
1046		let frame = consumer
1047			.read_frame()
1048			.now_or_never()
1049			.expect("should not block on stalled earlier group")
1050			.expect("would have errored")
1051			.expect("track should not be closed");
1052		assert_eq!(&frame[..], b"later");
1053	}
1054
1055	#[tokio::test]
1056	async fn read_frame_discards_rest_of_multi_frame_group() {
1057		let mut producer = Track::new("test").produce();
1058		let mut consumer = producer.consume();
1059
1060		// Group 0 has two frames; only the first is returned.
1061		let mut g0 = producer.create_group(Group { sequence: 0 }).unwrap();
1062		g0.write_frame(bytes::Bytes::from_static(b"one")).unwrap();
1063		g0.write_frame(bytes::Bytes::from_static(b"two")).unwrap();
1064		g0.finish().unwrap();
1065
1066		// Group 1 is a normal single-frame group.
1067		producer.write_frame(b"next".as_slice()).unwrap();
1068
1069		let frame = consumer
1070			.read_frame()
1071			.now_or_never()
1072			.expect("should not block")
1073			.expect("would have errored")
1074			.expect("track should not be closed");
1075		assert_eq!(&frame[..], b"one");
1076
1077		// The second frame of group 0 is discarded; the next read jumps to group 1.
1078		let frame = consumer
1079			.read_frame()
1080			.now_or_never()
1081			.expect("should not block")
1082			.expect("would have errored")
1083			.expect("track should not be closed");
1084		assert_eq!(&frame[..], b"next");
1085	}
1086
1087	#[tokio::test]
1088	async fn read_frame_waits_for_pending_group_after_finish() {
1089		// finish() sets final_sequence, but groups already created with lower sequences
1090		// can still produce frames. read_frame must not return None prematurely.
1091		let mut producer = Track::new("test").produce();
1092		let mut consumer = producer.consume();
1093
1094		let mut g0 = producer.create_group(Group { sequence: 0 }).unwrap();
1095		producer.finish().unwrap();
1096
1097		// Track is finished but group 0 has no frame yet — must block, not return None.
1098		assert!(
1099			consumer.read_frame().now_or_never().is_none(),
1100			"read_frame must block on a pending group even after finish()"
1101		);
1102
1103		// A late frame on the pending group is still delivered.
1104		g0.write_frame(bytes::Bytes::from_static(b"late")).unwrap();
1105		let frame = consumer
1106			.read_frame()
1107			.now_or_never()
1108			.expect("should not block once a frame is written")
1109			.expect("would have errored")
1110			.expect("track should not be closed");
1111		assert_eq!(&frame[..], b"late");
1112	}
1113
1114	#[tokio::test]
1115	async fn read_frame_respects_start_at() {
1116		// start_at sets min_sequence; read_frame must skip groups below it even though
1117		// next_sequence is still 0.
1118		let mut producer = Track::new("test").produce();
1119		let mut consumer = producer.consume();
1120		consumer.start_at(5);
1121
1122		// Seq 3 has a frame but is below min_sequence — must be skipped.
1123		let mut g3 = producer.create_group(Group { sequence: 3 }).unwrap();
1124		g3.write_frame(bytes::Bytes::from_static(b"skip-me")).unwrap();
1125		g3.finish().unwrap();
1126
1127		let mut g5 = producer.create_group(Group { sequence: 5 }).unwrap();
1128		g5.write_frame(bytes::Bytes::from_static(b"keep")).unwrap();
1129		g5.finish().unwrap();
1130
1131		let frame = consumer
1132			.read_frame()
1133			.now_or_never()
1134			.expect("should not block")
1135			.expect("would have errored")
1136			.expect("track should not be closed");
1137		assert_eq!(&frame[..], b"keep");
1138	}
1139
1140	#[tokio::test]
1141	async fn read_frame_returns_none_when_finished() {
1142		let mut producer = Track::new("test").produce();
1143		let mut consumer = producer.consume();
1144
1145		producer.write_frame(b"only".as_slice()).unwrap();
1146		producer.finish().unwrap();
1147
1148		let frame = consumer
1149			.read_frame()
1150			.now_or_never()
1151			.expect("should not block")
1152			.expect("would have errored")
1153			.expect("track should not be closed");
1154		assert_eq!(&frame[..], b"only");
1155
1156		let done = consumer
1157			.read_frame()
1158			.now_or_never()
1159			.expect("should not block")
1160			.expect("would have errored");
1161		assert!(done.is_none());
1162	}
1163
1164	#[tokio::test]
1165	async fn get_group_finishes_without_waiting_for_gaps() {
1166		let mut producer = Track::new("test").produce();
1167		producer.create_group(Group { sequence: 1 }).unwrap();
1168		producer.finish_at(1).unwrap();
1169
1170		let consumer = producer.consume();
1171		// get_group(0) blocks because group 0 is below final_sequence and could still arrive.
1172		assert!(
1173			consumer.get_group(0).now_or_never().is_none(),
1174			"sequence below fin should block (group could still arrive)"
1175		);
1176		assert!(
1177			consumer
1178				.get_group(2)
1179				.now_or_never()
1180				.expect("sequence at-or-after fin should resolve")
1181				.expect("should not error")
1182				.is_none(),
1183			"sequence at-or-after fin should not exist"
1184		);
1185	}
1186
1187	#[test]
1188	fn append_group_returns_bounds_exceeded_on_sequence_overflow() {
1189		let mut producer = Track::new("test").produce();
1190		{
1191			let mut state = producer.state.write().ok().unwrap();
1192			state.max_sequence = Some(u64::MAX);
1193		}
1194
1195		assert!(matches!(producer.append_group(), Err(Error::BoundsExceeded(_))));
1196	}
1197
1198	#[tokio::test]
1199	async fn consumer_produce() {
1200		let mut producer = Track::new("test").produce();
1201		producer.append_group().unwrap();
1202
1203		let consumer = producer.consume();
1204
1205		// Upgrade consumer back to producer — shared state.
1206		let got = consumer.produce().expect("should produce");
1207		assert!(got.is_clone(&producer), "should be the same track");
1208
1209		// Writing through the upgraded producer is visible to new consumers.
1210		got.clone().append_group().unwrap();
1211		let mut sub = producer.consume();
1212		sub.assert_group(); // group 0
1213		sub.assert_group(); // group 1, written via upgraded producer
1214	}
1215
1216	#[tokio::test]
1217	async fn consumer_produce_after_drop() {
1218		let producer = Track::new("test").produce();
1219		let consumer = producer.consume();
1220		drop(producer);
1221
1222		// Original producer dropped. Consumer's produce() should fail
1223		// because there are no remaining producers.
1224		let err = consumer.produce();
1225		assert!(matches!(err, Err(Error::Dropped)), "expected Dropped");
1226	}
1227
1228	#[tokio::test]
1229	async fn consumer_produce_after_abort() {
1230		let mut producer = Track::new("test").produce();
1231		let consumer = producer.consume();
1232		producer.abort(Error::Cancel).unwrap();
1233		drop(producer);
1234
1235		// Track was aborted — produce() should return the abort error, not Dropped.
1236		let err = consumer.produce();
1237		assert!(matches!(err, Err(Error::Cancel)), "expected Cancel");
1238	}
1239
1240	#[tokio::test]
1241	async fn consumer_produce_keeps_alive() {
1242		let producer = Track::new("test").produce();
1243		let consumer = producer.consume();
1244		let upgraded = consumer.produce().expect("should produce");
1245		drop(producer);
1246
1247		// Channel still open because upgraded producer exists.
1248		assert!(consumer.closed().now_or_never().is_none(), "should not be closed");
1249		drop(upgraded);
1250
1251		// Now it should close.
1252		assert!(consumer.closed().now_or_never().is_some(), "should be closed");
1253	}
1254}