Skip to main content

moq_net/model/
track.rs

1//! A track is a collection of semi-reliable and semi-ordered streams, split into a [TrackProducer] and [TrackConsumer] handle.
2//!
3//! A [TrackProducer] creates streams with a sequence number and priority.
4//! The sequence number is used to determine the order of streams, while the priority is used to determine which stream to transmit first.
5//! This may seem counter-intuitive, but is designed for live streaming where the newest streams may be higher priority.
6//! A cloned [TrackProducer] can be used to create streams in parallel, but will error if a duplicate sequence number is used.
7//!
8//! A [TrackConsumer] may not receive all streams in order or at all.
9//! These streams are meant to be transmitted over congested networks and the key to MoQ Transport is to not block on them.
//! Streams are only cached for a limited duration, which adds to the unreliable nature of delivery.
11//! A cloned [TrackConsumer] will receive a copy of all new streams going forward (fanout).
12//!
13//! The track is closed with [Error] when all writers or readers are dropped.
14
15use crate::{Error, Result, coding};
16
17use super::{Group, GroupConsumer, GroupProducer};
18
19use std::{
20	collections::{HashSet, VecDeque},
21	task::{Poll, ready},
22	time::Duration,
23};
24
/// Groups older than this are evicted from the track cache (unless they are the max_sequence group).
///
/// Age is measured from the instant the group was added to the track. In this file,
/// eviction only runs as part of creating or appending a group.
// TODO: Replace with a configurable cache size.
const MAX_GROUP_AGE: Duration = Duration::from_secs(5);
28
/// A track is a collection of groups, delivered out-of-order until expired.
///
/// This type only carries the track's metadata. Use [`Track::produce`] to obtain a
/// [`TrackProducer`] that can publish groups for it.
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct Track {
	/// Identifier within a broadcast. Unique per [`crate::Broadcast`].
	pub name: String,
	/// Delivery priority. Higher values preempt lower ones when bandwidth is constrained.
	///
	/// [`Track::new`] initializes this to `0`.
	pub priority: u8,
}
38
39impl Track {
40	/// Create a track with the given name and default priority (`0`).
41	pub fn new<T: Into<String>>(name: T) -> Self {
42		Self {
43			name: name.into(),
44			priority: 0,
45		}
46	}
47
48	/// Consume this [`Track`] to create a producer that owns its metadata.
49	pub fn produce(self) -> TrackProducer {
50		TrackProducer::new(self)
51	}
52}
53
/// State shared by every producer and consumer handle of a single track.
#[derive(Default)]
struct State {
	/// Groups in arrival order. `None` entries are tombstones for evicted groups.
	///
	/// Each live entry also records the instant the group was added, which drives
	/// age-based eviction.
	groups: VecDeque<Option<(GroupProducer, tokio::time::Instant)>>,
	/// Sequence numbers of the groups currently cached, used to reject duplicates.
	/// A sequence is removed again when its group is evicted.
	duplicates: HashSet<u64>,
	/// Number of leading slots that have been popped from `groups`.
	/// The absolute (arrival-order) index of `groups[i]` is `offset + i`.
	offset: usize,
	/// Highest sequence number created so far, if any group was created.
	max_sequence: Option<u64>,
	/// Exclusive upper bound for sequence numbers, set once the track is finished.
	final_sequence: Option<u64>,
	/// The error handed to [`TrackProducer::abort`], if the track was aborted.
	abort: Option<Error>,
}
64
65impl State {
66	/// Find the next non-tombstoned group at or after `index` in arrival order.
67	///
68	/// Returns the group and its absolute index so the consumer can advance past it.
69	fn poll_recv_group(&self, index: usize, min_sequence: u64) -> Poll<Result<Option<(GroupConsumer, usize)>>> {
70		let start = index.saturating_sub(self.offset);
71		for (i, slot) in self.groups.iter().enumerate().skip(start) {
72			if let Some((group, _)) = slot
73				&& group.sequence >= min_sequence
74			{
75				return Poll::Ready(Ok(Some((group.consume(), self.offset + i))));
76			}
77		}
78
79		// TODO once we have drop notifications, check if index == final_sequence.
80		if self.final_sequence.is_some() {
81			Poll::Ready(Ok(None))
82		} else if let Some(err) = &self.abort {
83			Poll::Ready(Err(err.clone()))
84		} else {
85			Poll::Pending
86		}
87	}
88
89	/// Scan groups at or after `index` in arrival order, looking for the first with sequence
90	/// `>= next_sequence` that has a fully-buffered next frame. Returns the frame plus the
91	/// winning slot's absolute index and sequence so the consumer can advance past it.
92	fn poll_read_frame(
93		&self,
94		index: usize,
95		next_sequence: u64,
96		waiter: &kio::Waiter,
97	) -> Poll<Result<Option<(bytes::Bytes, usize, u64)>>> {
98		let start = index.saturating_sub(self.offset);
99		let mut pending_seen = false;
100		for (i, slot) in self.groups.iter().enumerate().skip(start) {
101			let Some((group, _)) = slot else { continue };
102			if group.sequence < next_sequence {
103				continue;
104			}
105
106			let mut consumer = group.consume();
107			match consumer.poll_read_frame(waiter) {
108				Poll::Ready(Ok(Some(frame))) => {
109					return Poll::Ready(Ok(Some((frame, self.offset + i, group.sequence))));
110				}
111				Poll::Ready(Ok(None)) => continue,
112				Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
113				Poll::Pending => {
114					pending_seen = true;
115					continue;
116				}
117			}
118		}
119
120		// A pending group can still produce a frame even after finish() — finish only
121		// blocks new groups at/above final_sequence, not frames on existing groups.
122		if pending_seen {
123			Poll::Pending
124		} else if self.final_sequence.is_some() {
125			Poll::Ready(Ok(None))
126		} else if let Some(err) = &self.abort {
127			Poll::Ready(Err(err.clone()))
128		} else {
129			Poll::Pending
130		}
131	}
132
133	fn poll_get_group(&self, sequence: u64) -> Poll<Result<Option<GroupConsumer>>> {
134		// Search for the group with the matching sequence, skipping tombstones.
135		for (group, _) in self.groups.iter().flatten() {
136			if group.sequence == sequence {
137				return Poll::Ready(Ok(Some(group.consume())));
138			}
139		}
140
141		// Once final_sequence is set, groups at or past it can never exist.
142		if let Some(fin) = self.final_sequence
143			&& sequence >= fin
144		{
145			return Poll::Ready(Ok(None));
146		}
147
148		if let Some(err) = &self.abort {
149			return Poll::Ready(Err(err.clone()));
150		}
151
152		Poll::Pending
153	}
154
155	fn poll_closed(&self) -> Poll<Result<()>> {
156		if self.final_sequence.is_some() {
157			Poll::Ready(Ok(()))
158		} else if let Some(err) = &self.abort {
159			Poll::Ready(Err(err.clone()))
160		} else {
161			Poll::Pending
162		}
163	}
164
165	/// Evict groups older than MAX_GROUP_AGE, never evicting the max_sequence group.
166	///
167	/// Groups are in arrival order, so we can stop early when we hit a non-expired,
168	/// non-max_sequence group (everything after it arrived even later).
169	/// When max_sequence is at the front, we skip past it and tombstone expired groups
170	/// behind it.
171	fn evict_expired(&mut self, now: tokio::time::Instant) {
172		for slot in self.groups.iter_mut() {
173			let Some((group, created_at)) = slot else { continue };
174
175			if Some(group.sequence) == self.max_sequence {
176				continue;
177			}
178
179			if now.duration_since(*created_at) <= MAX_GROUP_AGE {
180				break;
181			}
182
183			self.duplicates.remove(&group.sequence);
184			*slot = None;
185		}
186
187		// Trim leading tombstones to advance the offset.
188		while let Some(None) = self.groups.front() {
189			self.groups.pop_front();
190			self.offset += 1;
191		}
192	}
193
194	fn poll_finished(&self) -> Poll<Result<u64>> {
195		if let Some(fin) = self.final_sequence {
196			Poll::Ready(Ok(fin))
197		} else if let Some(err) = &self.abort {
198			Poll::Ready(Err(err.clone()))
199		} else {
200			Poll::Pending
201		}
202	}
203}
204
/// A producer for a track, used to create new groups.
///
/// Dereferences to the [`Track`] metadata. A clone is another handle to the same
/// track (see [`TrackProducer::is_clone`]).
pub struct TrackProducer {
	/// Track metadata (name and priority).
	info: Track,
	/// Producer-side handle to the shared track state.
	state: kio::Producer<State>,
}
210
211impl std::ops::Deref for TrackProducer {
212	type Target = Track;
213
214	fn deref(&self) -> &Self::Target {
215		&self.info
216	}
217}
218
219impl TrackProducer {
220	/// Build a producer for the given track metadata. Prefer [`Track::produce`].
221	pub fn new(info: Track) -> Self {
222		Self {
223			info,
224			state: kio::Producer::default(),
225		}
226	}
227
228	/// Create a new group with the given sequence number.
229	pub fn create_group(&mut self, info: Group) -> Result<GroupProducer> {
230		let group = info.produce();
231
232		let mut state = self.modify()?;
233		if let Some(fin) = state.final_sequence
234			&& group.sequence >= fin
235		{
236			return Err(Error::Closed);
237		}
238
239		if !state.duplicates.insert(group.sequence) {
240			return Err(Error::Duplicate);
241		}
242
243		let now = tokio::time::Instant::now();
244		state.max_sequence = Some(state.max_sequence.unwrap_or(0).max(group.sequence));
245		state.groups.push_back(Some((group.clone(), now)));
246		state.evict_expired(now);
247
248		Ok(group)
249	}
250
251	/// Create a new group with the next sequence number.
252	pub fn append_group(&mut self) -> Result<GroupProducer> {
253		let mut state = self.modify()?;
254		let sequence = match state.max_sequence {
255			Some(s) => s.checked_add(1).ok_or(coding::BoundsExceeded)?,
256			None => 0,
257		};
258		if let Some(fin) = state.final_sequence
259			&& sequence >= fin
260		{
261			return Err(Error::Closed);
262		}
263
264		let group = Group { sequence }.produce();
265
266		let now = tokio::time::Instant::now();
267		state.duplicates.insert(sequence);
268		state.max_sequence = Some(sequence);
269		state.groups.push_back(Some((group.clone(), now)));
270		state.evict_expired(now);
271
272		Ok(group)
273	}
274
275	/// Create a group with a single frame.
276	pub fn write_frame<B: Into<bytes::Bytes>>(&mut self, frame: B) -> Result<()> {
277		let mut group = self.append_group()?;
278		group.write_frame(frame.into())?;
279		group.finish()?;
280		Ok(())
281	}
282
283	/// Mark the track as finished after the last appended group.
284	///
285	/// Sets the final sequence to one past the current max_sequence.
286	/// No new groups at or above this sequence can be appended.
287	/// NOTE: Old groups with lower sequence numbers can still arrive.
288	pub fn finish(&mut self) -> Result<()> {
289		let mut state = self.modify()?;
290		if state.final_sequence.is_some() {
291			return Err(Error::Closed);
292		}
293		state.final_sequence = Some(match state.max_sequence {
294			Some(max) => max.checked_add(1).ok_or(coding::BoundsExceeded)?,
295			None => 0,
296		});
297		Ok(())
298	}
299
300	/// Mark the track as finished at an exact final sequence.
301	///
302	/// The caller must pass the current max_sequence exactly.
303	/// Freezes the final boundary at one past the current max_sequence.
304	/// No new groups at or above that sequence can be created.
305	/// NOTE: Old groups with lower sequence numbers can still arrive.
306	pub fn finish_at(&mut self, sequence: u64) -> Result<()> {
307		let mut state = self.modify()?;
308		let max = state.max_sequence.ok_or(Error::Closed)?;
309		if state.final_sequence.is_some() || sequence != max {
310			return Err(Error::Closed);
311		}
312		state.final_sequence = Some(max.checked_add(1).ok_or(coding::BoundsExceeded)?);
313		Ok(())
314	}
315
316	/// Abort the track with the given error.
317	///
318	/// Drops the cached groups so a stale [`TrackConsumer`] can't pin them (and
319	/// their frame buffers) in memory forever. Consumers that haven't drained yet
320	/// surface the abort error instead of the leftover cache. Child groups are
321	/// independent: a consumer that already pulled a [`GroupConsumer`] keeps its
322	/// own handle and can finish reading it.
323	pub fn abort(&mut self, err: Error) -> Result<()> {
324		let mut guard = self.modify()?;
325		guard.abort = Some(err);
326		guard.groups.clear();
327		guard.duplicates.clear();
328		guard.close();
329		Ok(())
330	}
331
332	/// Create a new consumer for the track, starting at the beginning.
333	pub fn consume(&self) -> TrackConsumer {
334		TrackConsumer {
335			info: self.info.clone(),
336			state: self.state.consume(),
337			index: 0,
338			min_sequence: 0,
339			next_sequence: 0,
340		}
341	}
342
343	/// The track name.
344	pub fn name(&self) -> &str {
345		&self.info.name
346	}
347
348	/// A cloneable watch-only handle to subscriber demand.
349	pub fn demand(&self) -> TrackDemand {
350		TrackDemand {
351			name: self.info.name.clone(),
352			state: self.state.weak(),
353		}
354	}
355
356	/// Block until there are no active consumers.
357	pub async fn unused(&self) -> Result<()> {
358		self.state
359			.unused()
360			.await
361			.map_err(|r| r.abort.clone().unwrap_or(Error::Dropped))
362	}
363
364	/// Block until there is at least one active consumer.
365	pub async fn used(&self) -> Result<()> {
366		self.state
367			.used()
368			.await
369			.map_err(|r| r.abort.clone().unwrap_or(Error::Dropped))
370	}
371
372	/// Block until the track is closed or aborted, returning the cause.
373	pub async fn closed(&self) -> Error {
374		self.state.closed().await;
375		self.state.read().abort.clone().unwrap_or(Error::Dropped)
376	}
377
378	/// Return true if the track has been closed.
379	pub fn is_closed(&self) -> bool {
380		self.state.read().is_closed()
381	}
382
383	/// Return true if this is the same track.
384	pub fn is_clone(&self, other: &Self) -> bool {
385		self.state.same_channel(&other.state)
386	}
387
388	/// Create a weak reference that doesn't prevent auto-close.
389	pub(crate) fn weak(&self) -> TrackWeak {
390		TrackWeak {
391			info: self.info.clone(),
392			state: self.state.weak(),
393		}
394	}
395
396	fn modify(&self) -> Result<kio::Mut<'_, State>> {
397		self.state
398			.write()
399			.map_err(|r| r.abort.clone().unwrap_or(Error::Dropped))
400	}
401}
402
403impl Clone for TrackProducer {
404	fn clone(&self) -> Self {
405		Self {
406			info: self.info.clone(),
407			state: self.state.clone(),
408		}
409	}
410}
411
412impl Drop for TrackProducer {
413	fn drop(&mut self) {
414		// The last producer going away without finishing is an abrupt teardown:
415		// release the cached groups so a stale consumer can't pin them (and their
416		// frame buffers) forever, the same as an explicit abort. A cleanly
417		// finished track keeps its cache so consumers can still drain it.
418		if !self.state.is_last() {
419			return;
420		}
421		if let Ok(mut state) = self.state.write()
422			&& state.final_sequence.is_none()
423		{
424			state.groups.clear();
425			state.duplicates.clear();
426		}
427	}
428}
429
430impl From<Track> for TrackProducer {
431	fn from(info: Track) -> Self {
432		TrackProducer::new(info)
433	}
434}
435
/// A weak reference to a track that doesn't prevent auto-close.
#[derive(Clone)]
pub(crate) struct TrackWeak {
	/// Track metadata, cloned from the handle this reference was created from.
	pub(crate) info: Track,
	/// Weak handle to the shared track state.
	state: kio::Weak<State>,
}
442
/// A cloneable, watch-only handle to a track's subscriber demand.
///
/// Created by [`TrackProducer::demand`].
#[derive(Clone)]
pub struct TrackDemand {
	/// Name of the track this handle was created for.
	name: String,
	/// Weak handle to the shared track state.
	state: kio::Weak<State>,
}
449
450impl TrackDemand {
451	/// The track name this handle is bound to.
452	pub fn name(&self) -> &str {
453		&self.name
454	}
455
456	/// Block until there is at least one active consumer.
457	pub async fn used(&self) -> Result<()> {
458		self.state
459			.used()
460			.await
461			.map_err(|r| r.abort.clone().unwrap_or(Error::Dropped))
462	}
463
464	/// Block until there are no active consumers.
465	pub async fn unused(&self) -> Result<()> {
466		self.state
467			.unused()
468			.await
469			.map_err(|r| r.abort.clone().unwrap_or(Error::Dropped))
470	}
471
472	/// Block until the track is closed or aborted, returning the cause.
473	pub async fn closed(&self) -> Error {
474		if let Some(state) = self.state.produce() {
475			state.closed().await;
476		}
477
478		self.state.read().abort.clone().unwrap_or(Error::Dropped)
479	}
480}
481
482impl TrackWeak {
483	pub fn is_closed(&self) -> bool {
484		self.state.is_closed()
485	}
486
487	pub fn consume(&self) -> TrackConsumer {
488		TrackConsumer {
489			info: self.info.clone(),
490			state: self.state.consume(),
491			index: 0,
492			min_sequence: 0,
493			next_sequence: 0,
494		}
495	}
496
497	pub async fn unused(&self) -> crate::Result<()> {
498		self.state
499			.unused()
500			.await
501			.map_err(|r| r.abort.clone().unwrap_or(Error::Dropped))
502	}
503
504	pub fn is_clone(&self, other: &Self) -> bool {
505		self.state.same_channel(&other.state)
506	}
507}
508
/// A consumer for a track, used to read groups.
///
/// Dereferences to the [`Track`] metadata. A clone copies the read cursors, so it
/// continues from the same position independently of the original.
#[derive(Clone)]
pub struct TrackConsumer {
	/// Track metadata (name and priority).
	info: Track,
	/// Consumer-side handle to the shared track state.
	state: kio::Consumer<State>,
	/// Arrival-order cursor used by [`Self::recv_group`].
	index: usize,
	/// Minimum sequence to return from any `recv` method. Set by [`Self::start_at`].
	min_sequence: u64,
	/// One past the highest sequence returned by [`Self::next_group`].
	/// Used only by that method to skip late arrivals; does not affect [`Self::recv_group`].
	next_sequence: u64,
}
522
523impl std::ops::Deref for TrackConsumer {
524	type Target = Track;
525
526	fn deref(&self) -> &Self::Target {
527		&self.info
528	}
529}
530
531impl TrackConsumer {
532	/// The track name this handle is bound to.
533	pub fn name(&self) -> &str {
534		&self.info.name
535	}
536
537	// A helper to automatically apply Dropped if the state is closed without an error.
538	fn poll<F, R>(&self, waiter: &kio::Waiter, f: F) -> Poll<Result<R>>
539	where
540		F: Fn(&kio::Ref<'_, State>) -> Poll<Result<R>>,
541	{
542		Poll::Ready(match ready!(self.state.poll(waiter, f)) {
543			Ok(res) => res,
544			// We try to clone abort just in case the function forgot to check for terminal state.
545			Err(state) => Err(state.abort.clone().unwrap_or(Error::Dropped)),
546		})
547	}
548
549	/// Poll for the next group in arrival order, without blocking.
550	///
551	/// Returns every group exactly once in the order it landed on the wire — which may be
552	/// out of sequence due to network reordering or loss. Use [`Self::poll_next_group`] if
553	/// you only want groups whose sequence number is higher than any previously returned.
554	///
555	/// Returns `Poll::Ready(Ok(Some(group)))` when a group is available,
556	/// `Poll::Ready(Ok(None))` when the track is finished,
557	/// `Poll::Ready(Err(e))` when the track has been aborted, or
558	/// `Poll::Pending` when no group is available yet.
559	pub fn poll_recv_group(&mut self, waiter: &kio::Waiter) -> Poll<Result<Option<GroupConsumer>>> {
560		let Some((consumer, found_index)) =
561			ready!(self.poll(waiter, |state| state.poll_recv_group(self.index, self.min_sequence))?)
562		else {
563			return Poll::Ready(Ok(None));
564		};
565
566		self.index = found_index + 1;
567		Poll::Ready(Ok(Some(consumer)))
568	}
569
570	/// Receive the next group in arrival order.
571	///
572	/// Every group is returned exactly once, in the order it landed on the wire — which may
573	/// be out of sequence due to network reordering or loss. Use [`Self::next_group`] if you
574	/// only want groups whose sequence number is higher than any previously returned.
575	pub async fn recv_group(&mut self) -> Result<Option<GroupConsumer>> {
576		kio::wait(|waiter| self.poll_recv_group(waiter)).await
577	}
578
579	/// Poll for the next group with a higher sequence number than any previously returned.
580	///
581	/// Late arrivals (sequence at or below the last returned) are silently skipped, so this
582	/// produces a monotonically increasing sequence at the cost of dropping out-of-order
583	/// groups. Use [`Self::poll_recv_group`] to see every group in arrival order instead.
584	pub fn poll_next_group(&mut self, waiter: &kio::Waiter) -> Poll<Result<Option<GroupConsumer>>> {
585		loop {
586			let Some(group) = ready!(self.poll_recv_group(waiter)?) else {
587				return Poll::Ready(Ok(None));
588			};
589			if group.sequence < self.next_sequence {
590				// Late arrival; discard and keep looking.
591				continue;
592			}
593			self.next_sequence = group.sequence.saturating_add(1);
594			return Poll::Ready(Ok(Some(group)));
595		}
596	}
597
598	/// Return the next group with a higher sequence number than any previously returned.
599	///
600	/// Late arrivals (sequence at or below the last returned) are silently skipped, so this
601	/// produces a monotonically increasing sequence at the cost of dropping out-of-order
602	/// groups. Use [`Self::recv_group`] to see every group in arrival order instead.
603	pub async fn next_group(&mut self) -> Result<Option<GroupConsumer>> {
604		kio::wait(|waiter| self.poll_next_group(waiter)).await
605	}
606
607	/// A helper that calls [`Self::poll_next_group`] and returns its first frame,
608	/// skipping the rest of the group. Intended for single-frame groups (see
609	/// [`TrackProducer::write_frame`]).
610	pub fn poll_read_frame(&mut self, waiter: &kio::Waiter) -> Poll<Result<Option<bytes::Bytes>>> {
611		let lower = self.min_sequence.max(self.next_sequence);
612		let Some((frame, found_index, sequence)) =
613			ready!(self.poll(waiter, |state| { state.poll_read_frame(self.index, lower, waiter) })?)
614		else {
615			return Poll::Ready(Ok(None));
616		};
617
618		self.index = found_index + 1;
619		self.next_sequence = sequence.saturating_add(1);
620		Poll::Ready(Ok(Some(frame)))
621	}
622
623	/// Read a single full frame from the next group in sequence order.
624	///
625	/// See [`Self::poll_read_frame`] for semantics.
626	pub async fn read_frame(&mut self) -> Result<Option<bytes::Bytes>> {
627		kio::wait(|waiter| self.poll_read_frame(waiter)).await
628	}
629
630	/// Poll for the group with the given sequence, without blocking.
631	pub fn poll_get_group(&self, waiter: &kio::Waiter, sequence: u64) -> Poll<Result<Option<GroupConsumer>>> {
632		self.poll(waiter, |state| state.poll_get_group(sequence))
633	}
634
635	/// Wait until the group with the given sequence becomes available.
636	///
637	/// Resolves to `Some(GroupConsumer)` once the group is in the cache.
638	/// Resolves to `None` only when `sequence` is at or past the track's
639	/// `final_sequence` (set by `finish()` / `finish_at()`), since such a
640	/// group can never be produced. Sequences below `final_sequence` still
641	/// wait, since older groups may still arrive out of order.
642	pub async fn get_group(&self, sequence: u64) -> Result<Option<GroupConsumer>> {
643		kio::wait(|waiter| self.poll_get_group(waiter, sequence)).await
644	}
645
646	/// Poll for track closure, without blocking.
647	pub fn poll_closed(&self, waiter: &kio::Waiter) -> Poll<Result<()>> {
648		self.poll(waiter, |state| state.poll_closed())
649	}
650
651	/// Block until the track is closed.
652	///
653	/// Returns Ok() is the track was cleanly finished.
654	pub async fn closed(&self) -> Result<()> {
655		kio::wait(|waiter| self.poll_closed(waiter)).await
656	}
657
658	/// Whether `other` was cloned from this consumer (shares the same underlying state).
659	pub fn is_clone(&self, other: &Self) -> bool {
660		self.state.same_channel(&other.state)
661	}
662
663	/// Poll for the total number of groups in the track.
664	pub fn poll_finished(&mut self, waiter: &kio::Waiter) -> Poll<Result<u64>> {
665		self.poll(waiter, |state| state.poll_finished())
666	}
667
668	/// Block until the track is finished, returning the total number of groups.
669	pub async fn finished(&mut self) -> Result<u64> {
670		kio::wait(|waiter| self.poll_finished(waiter)).await
671	}
672
673	/// Start the consumer at the specified sequence.
674	pub fn start_at(&mut self, sequence: u64) {
675		self.min_sequence = sequence;
676	}
677
678	/// Return the latest sequence number in the track.
679	pub fn latest(&self) -> Option<u64> {
680		self.state.read().max_sequence
681	}
682
683	/// Create a weak reference that doesn't prevent auto-close.
684	pub(crate) fn weak(&self) -> TrackWeak {
685		TrackWeak {
686			info: self.info.clone(),
687			state: self.state.weak(),
688		}
689	}
690}
691
692#[cfg(test)]
693use futures::FutureExt;
694
#[cfg(test)]
impl TrackConsumer {
	/// Return the next group, panicking if none is immediately available.
	pub fn assert_group(&mut self) -> GroupConsumer {
		let polled = self.recv_group().now_or_never();
		polled
			.expect("group would have blocked")
			.expect("would have errored")
			.expect("track was closed")
	}

	/// Panic unless `recv_group` would block right now.
	pub fn assert_no_group(&mut self) {
		let polled = self.recv_group().now_or_never();
		assert!(polled.is_none(), "recv_group would not have blocked");
	}

	/// Panic if `closed` resolves right now.
	pub fn assert_not_closed(&self) {
		let polled = self.closed().now_or_never();
		assert!(polled.is_none(), "should not be closed");
	}

	/// Panic unless `closed` resolves right now.
	pub fn assert_closed(&self) {
		let polled = self.closed().now_or_never();
		assert!(polled.is_some(), "should be closed");
	}

	// TODO assert specific errors after implementing PartialEq
	/// Panic unless `closed` resolves right now with an error.
	pub fn assert_error(&self) {
		let outcome = self.closed().now_or_never().expect("should not block");
		assert!(outcome.is_err(), "should be error");
	}

	/// Panic unless `other` shares this consumer's underlying state.
	pub fn assert_is_clone(&self, other: &Self) {
		let shared = self.is_clone(other);
		assert!(shared, "should be clone");
	}

	/// Panic if `other` shares this consumer's underlying state.
	pub fn assert_not_clone(&self, other: &Self) {
		let shared = self.is_clone(other);
		assert!(!shared, "should not be clone");
	}
}
736
737#[cfg(test)]
738mod test {
739	use super::*;
740
741	/// Helper: count non-tombstoned groups in state.
742	fn live_groups(state: &State) -> usize {
743		state.groups.iter().flatten().count()
744	}
745
746	/// Helper: get the sequence number of the first live group.
747	fn first_live_sequence(state: &State) -> u64 {
748		state.groups.iter().flatten().next().unwrap().0.sequence
749	}
750
751	#[tokio::test]
752	async fn evict_expired_groups() {
753		tokio::time::pause();
754
755		let mut producer = Track::new("test").produce();
756
757		// Create 3 groups at time 0.
758		producer.append_group().unwrap(); // seq 0
759		producer.append_group().unwrap(); // seq 1
760		producer.append_group().unwrap(); // seq 2
761
762		{
763			let state = producer.state.read();
764			assert_eq!(live_groups(&state), 3);
765			assert_eq!(state.offset, 0);
766		}
767
768		// Advance time past the eviction threshold.
769		tokio::time::advance(MAX_GROUP_AGE + Duration::from_secs(1)).await;
770
771		// Append a new group to trigger eviction.
772		producer.append_group().unwrap(); // seq 3
773
774		// Groups 0, 1, 2 are expired but seq 3 (max_sequence) is kept.
775		// Leading tombstones are trimmed, so only seq 3 remains.
776		{
777			let state = producer.state.read();
778			assert_eq!(live_groups(&state), 1);
779			assert_eq!(first_live_sequence(&state), 3);
780			assert_eq!(state.offset, 3);
781			assert!(!state.duplicates.contains(&0));
782			assert!(!state.duplicates.contains(&1));
783			assert!(!state.duplicates.contains(&2));
784			assert!(state.duplicates.contains(&3));
785		}
786	}
787
788	#[tokio::test]
789	async fn evict_keeps_max_sequence() {
790		tokio::time::pause();
791
792		let mut producer = Track::new("test").produce();
793		producer.append_group().unwrap(); // seq 0
794
795		// Advance time past threshold.
796		tokio::time::advance(MAX_GROUP_AGE + Duration::from_secs(1)).await;
797
798		// Append another group; seq 0 is expired and evicted.
799		producer.append_group().unwrap(); // seq 1
800
801		{
802			let state = producer.state.read();
803			assert_eq!(live_groups(&state), 1);
804			assert_eq!(first_live_sequence(&state), 1);
805			assert_eq!(state.offset, 1);
806		}
807	}
808
809	#[tokio::test]
810	async fn no_eviction_when_fresh() {
811		tokio::time::pause();
812
813		let mut producer = Track::new("test").produce();
814		producer.append_group().unwrap(); // seq 0
815		producer.append_group().unwrap(); // seq 1
816		producer.append_group().unwrap(); // seq 2
817
818		{
819			let state = producer.state.read();
820			assert_eq!(live_groups(&state), 3);
821			assert_eq!(state.offset, 0);
822		}
823	}
824
825	#[tokio::test]
826	async fn consumer_skips_evicted_groups() {
827		tokio::time::pause();
828
829		let mut producer = Track::new("test").produce();
830		producer.append_group().unwrap(); // seq 0
831
832		let mut consumer = producer.consume();
833
834		tokio::time::advance(MAX_GROUP_AGE + Duration::from_secs(1)).await;
835		producer.append_group().unwrap(); // seq 1
836
837		// Group 0 was evicted. Consumer should get group 1.
838		let group = consumer.assert_group();
839		assert_eq!(group.sequence, 1);
840	}
841
842	#[tokio::test]
843	async fn out_of_order_max_sequence_at_front() {
844		tokio::time::pause();
845
846		let mut producer = Track::new("test").produce();
847
848		// Arrive out of order: seq 5 first, then 3, then 4.
849		producer.create_group(Group { sequence: 5 }).unwrap();
850		producer.create_group(Group { sequence: 3 }).unwrap();
851		producer.create_group(Group { sequence: 4 }).unwrap();
852
853		// max_sequence = 5, which is at the front of the VecDeque.
854		{
855			let state = producer.state.read();
856			assert_eq!(state.max_sequence, Some(5));
857		}
858
859		// Expire all three groups.
860		tokio::time::advance(MAX_GROUP_AGE + Duration::from_secs(1)).await;
861
862		// Append seq 6 (becomes new max_sequence).
863		producer.append_group().unwrap(); // seq 6
864
865		// Seq 3, 4, 5 are all expired. Seq 5 was the old max_sequence but now 6 is.
866		// All old groups are evicted.
867		{
868			let state = producer.state.read();
869			assert_eq!(live_groups(&state), 1);
870			assert_eq!(first_live_sequence(&state), 6);
871			assert!(!state.duplicates.contains(&3));
872			assert!(!state.duplicates.contains(&4));
873			assert!(!state.duplicates.contains(&5));
874			assert!(state.duplicates.contains(&6));
875		}
876	}
877
878	#[tokio::test]
879	async fn max_sequence_at_front_blocks_trim() {
880		tokio::time::pause();
881
882		let mut producer = Track::new("test").produce();
883
884		// Arrive: seq 5, then seq 3.
885		producer.create_group(Group { sequence: 5 }).unwrap();
886
887		tokio::time::advance(MAX_GROUP_AGE + Duration::from_secs(1)).await;
888
889		// Seq 3 arrives late; max_sequence is still 5 (at front).
890		producer.create_group(Group { sequence: 3 }).unwrap();
891
892		// Seq 5 is max_sequence (protected). Seq 3 is not expired (just created).
893		// Nothing should be evicted.
894		{
895			let state = producer.state.read();
896			assert_eq!(live_groups(&state), 2);
897			assert_eq!(state.offset, 0);
898		}
899
900		// Expire seq 3 as well.
901		tokio::time::advance(MAX_GROUP_AGE + Duration::from_secs(1)).await;
902
903		// Seq 2 arrives late, triggering eviction.
904		producer.create_group(Group { sequence: 2 }).unwrap();
905
906		// Seq 5 is still max_sequence (protected, at front, blocks trim).
907		// Seq 3 is expired → tombstoned.
908		// Seq 2 is fresh → kept.
909		// VecDeque: [Some(5), None, Some(2)]. Leading entry is Some, so offset stays.
910		{
911			let state = producer.state.read();
912			assert_eq!(live_groups(&state), 2);
913			assert_eq!(state.offset, 0);
914			assert!(state.duplicates.contains(&5));
915			assert!(!state.duplicates.contains(&3));
916			assert!(state.duplicates.contains(&2));
917		}
918
919		// Consumer should still be able to read through the hole.
920		let mut consumer = producer.consume();
921		let group = consumer.assert_group();
922		// consume() starts at index 0, first non-tombstoned group is seq 5.
923		assert_eq!(group.sequence, 5);
924	}
925
926	#[tokio::test]
927	async fn abort_clears_cached_groups() {
928		let mut producer = Track::new("test").produce();
929		producer.append_group().unwrap();
930		producer.append_group().unwrap();
931
932		// A stale consumer that never drains must not pin the cached groups.
933		let mut consumer = producer.consume();
934		assert_eq!(live_groups(&producer.state.read()), 2);
935
936		producer.abort(Error::Cancel).unwrap();
937
938		{
939			let state = producer.state.read();
940			assert!(state.groups.is_empty(), "cached groups should be dropped on abort");
941			assert!(state.duplicates.is_empty());
942		}
943
944		// The consumer now surfaces the abort error rather than the leftover cache.
945		let result = consumer.recv_group().now_or_never().expect("should not block");
946		assert!(matches!(result, Err(Error::Cancel)));
947	}
948
949	#[tokio::test]
950	async fn drop_unfinished_clears_cached_groups() {
951		let producer = Track::new("test").produce();
952		let mut writer = producer.clone();
953		writer.append_group().unwrap();
954
955		// A stale consumer keeps the channel (and thus the cache) alive.
956		let mut consumer = producer.consume();
957		assert_eq!(live_groups(&producer.state.read()), 1);
958
959		// Drop every producer without finishing: the cache is released.
960		drop(writer);
961		drop(producer);
962
963		let result = consumer.recv_group().now_or_never().expect("should not block");
964		assert!(matches!(result, Err(Error::Dropped)));
965	}
966
967	#[tokio::test]
968	async fn drop_finished_keeps_cached_groups() {
969		let mut producer = Track::new("test").produce();
970		producer.append_group().unwrap();
971		producer.finish().unwrap();
972
973		let mut consumer = producer.consume();
974		drop(producer);
975
976		// A cleanly finished track keeps its cache so the consumer can still drain.
977		assert_eq!(consumer.assert_group().sequence, 0);
978		let done = consumer.recv_group().now_or_never().expect("should not block").unwrap();
979		assert!(done.is_none(), "consumer should drain then see clean finish");
980	}
981
982	#[test]
983	fn append_finish_cannot_be_rewritten() {
984		let mut producer = Track::new("test").produce();
985
986		// Finishing an empty track is valid (fin = 0, total groups = 0).
987		assert!(producer.finish().is_ok());
988		assert!(producer.finish().is_err());
989		assert!(producer.append_group().is_err());
990	}
991
992	#[test]
993	fn finish_after_groups() {
994		let mut producer = Track::new("test").produce();
995
996		producer.append_group().unwrap();
997		assert!(producer.finish().is_ok());
998		assert!(producer.finish().is_err());
999		assert!(producer.append_group().is_err());
1000	}
1001
1002	#[test]
1003	fn insert_finish_validates_sequence_and_freezes_to_max() {
1004		let mut producer = Track::new("test").produce();
1005		producer.create_group(Group { sequence: 5 }).unwrap();
1006
1007		assert!(producer.finish_at(4).is_err());
1008		assert!(producer.finish_at(10).is_err());
1009		assert!(producer.finish_at(5).is_ok());
1010
1011		{
1012			let state = producer.state.read();
1013			assert_eq!(state.final_sequence, Some(6));
1014		}
1015
1016		assert!(producer.finish_at(5).is_err());
1017		assert!(producer.create_group(Group { sequence: 4 }).is_ok());
1018		assert!(producer.create_group(Group { sequence: 5 }).is_err());
1019	}
1020
1021	#[tokio::test]
1022	async fn recv_group_finishes_without_waiting_for_gaps() {
1023		let mut producer = Track::new("test").produce();
1024		producer.create_group(Group { sequence: 1 }).unwrap();
1025		producer.finish_at(1).unwrap();
1026
1027		let mut consumer = producer.consume();
1028		assert_eq!(consumer.assert_group().sequence, 1);
1029
1030		let done = consumer
1031			.recv_group()
1032			.now_or_never()
1033			.expect("should not block")
1034			.expect("would have errored");
1035		assert!(done.is_none(), "track should finish without waiting for gaps");
1036	}
1037
1038	#[tokio::test]
1039	async fn next_group_skips_late_arrivals() {
1040		let mut producer = Track::new("test").produce();
1041		let mut consumer = producer.consume();
1042
1043		// Seq 5 arrives first.
1044		producer.create_group(Group { sequence: 5 }).unwrap();
1045		let group = consumer
1046			.next_group()
1047			.now_or_never()
1048			.expect("should not block")
1049			.expect("would have errored")
1050			.expect("track should not be closed");
1051		assert_eq!(group.sequence, 5);
1052
1053		// Seq 3 arrives late — skipped because 3 <= 5.
1054		producer.create_group(Group { sequence: 3 }).unwrap();
1055		// Seq 4 arrives late — also skipped.
1056		producer.create_group(Group { sequence: 4 }).unwrap();
1057		// Seq 7 arrives — returned.
1058		producer.create_group(Group { sequence: 7 }).unwrap();
1059
1060		let group = consumer
1061			.next_group()
1062			.now_or_never()
1063			.expect("should not block")
1064			.expect("would have errored")
1065			.expect("track should not be closed");
1066		assert_eq!(group.sequence, 7);
1067
1068		// No more groups — would block.
1069		assert!(
1070			consumer.next_group().now_or_never().is_none(),
1071			"should block waiting for a higher sequence"
1072		);
1073	}
1074
1075	#[tokio::test]
1076	async fn next_group_returns_arrivals_in_order() {
1077		let mut producer = Track::new("test").produce();
1078		let mut consumer = producer.consume();
1079
1080		// Seq 3 arrives first, then seq 5 — both should be returned in arrival order.
1081		producer.create_group(Group { sequence: 3 }).unwrap();
1082		producer.create_group(Group { sequence: 5 }).unwrap();
1083
1084		let group = consumer
1085			.next_group()
1086			.now_or_never()
1087			.expect("should not block")
1088			.expect("would have errored")
1089			.expect("track should not be closed");
1090		assert_eq!(group.sequence, 3);
1091
1092		let group = consumer
1093			.next_group()
1094			.now_or_never()
1095			.expect("should not block")
1096			.expect("would have errored")
1097			.expect("track should not be closed");
1098		assert_eq!(group.sequence, 5);
1099	}
1100
1101	#[tokio::test]
1102	async fn recv_group_after_next_group_sees_late_arrivals() {
1103		let mut producer = Track::new("test").produce();
1104		let mut consumer = producer.consume();
1105
1106		producer.create_group(Group { sequence: 5 }).unwrap();
1107		producer.create_group(Group { sequence: 3 }).unwrap();
1108
1109		// Ordered returns seq 5 and advances its internal cursor past it.
1110		let group = consumer
1111			.next_group()
1112			.now_or_never()
1113			.expect("should not block")
1114			.expect("would have errored")
1115			.expect("track should not be closed");
1116		assert_eq!(group.sequence, 5);
1117
1118		// Intermixing: recv_group on the same consumer still returns the late seq 3.
1119		// The ordered cursor is separate from the recv_group filter.
1120		assert_eq!(consumer.assert_group().sequence, 3);
1121	}
1122
1123	#[tokio::test]
1124	async fn read_frame_returns_single_frame_per_group() {
1125		let mut producer = Track::new("test").produce();
1126		let mut consumer = producer.consume();
1127
1128		producer.write_frame(b"hello".as_slice()).unwrap();
1129		producer.write_frame(b"world".as_slice()).unwrap();
1130
1131		let frame = consumer
1132			.read_frame()
1133			.now_or_never()
1134			.expect("should not block")
1135			.expect("would have errored")
1136			.expect("track should not be closed");
1137		assert_eq!(&frame[..], b"hello");
1138
1139		let frame = consumer
1140			.read_frame()
1141			.now_or_never()
1142			.expect("should not block")
1143			.expect("would have errored")
1144			.expect("track should not be closed");
1145		assert_eq!(&frame[..], b"world");
1146	}
1147
1148	#[tokio::test]
1149	async fn read_frame_skips_stalled_group_for_newer_ready_frame() {
1150		let mut producer = Track::new("test").produce();
1151		let mut consumer = producer.consume();
1152
1153		// Seq 3: group open, no frame yet (stalled).
1154		let _stalled = producer.create_group(Group { sequence: 3 }).unwrap();
1155		// Seq 5: fully-written group with a frame.
1156		let mut g5 = producer.create_group(Group { sequence: 5 }).unwrap();
1157		g5.write_frame(bytes::Bytes::from_static(b"later")).unwrap();
1158		g5.finish().unwrap();
1159
1160		// read_frame should not block on the stalled seq 3 — it returns seq 5's frame.
1161		let frame = consumer
1162			.read_frame()
1163			.now_or_never()
1164			.expect("should not block on stalled earlier group")
1165			.expect("would have errored")
1166			.expect("track should not be closed");
1167		assert_eq!(&frame[..], b"later");
1168	}
1169
1170	#[tokio::test]
1171	async fn read_frame_discards_rest_of_multi_frame_group() {
1172		let mut producer = Track::new("test").produce();
1173		let mut consumer = producer.consume();
1174
1175		// Group 0 has two frames; only the first is returned.
1176		let mut g0 = producer.create_group(Group { sequence: 0 }).unwrap();
1177		g0.write_frame(bytes::Bytes::from_static(b"one")).unwrap();
1178		g0.write_frame(bytes::Bytes::from_static(b"two")).unwrap();
1179		g0.finish().unwrap();
1180
1181		// Group 1 is a normal single-frame group.
1182		producer.write_frame(b"next".as_slice()).unwrap();
1183
1184		let frame = consumer
1185			.read_frame()
1186			.now_or_never()
1187			.expect("should not block")
1188			.expect("would have errored")
1189			.expect("track should not be closed");
1190		assert_eq!(&frame[..], b"one");
1191
1192		// The second frame of group 0 is discarded; the next read jumps to group 1.
1193		let frame = consumer
1194			.read_frame()
1195			.now_or_never()
1196			.expect("should not block")
1197			.expect("would have errored")
1198			.expect("track should not be closed");
1199		assert_eq!(&frame[..], b"next");
1200	}
1201
1202	#[tokio::test]
1203	async fn read_frame_waits_for_pending_group_after_finish() {
1204		// finish() sets final_sequence, but groups already created with lower sequences
1205		// can still produce frames. read_frame must not return None prematurely.
1206		let mut producer = Track::new("test").produce();
1207		let mut consumer = producer.consume();
1208
1209		let mut g0 = producer.create_group(Group { sequence: 0 }).unwrap();
1210		producer.finish().unwrap();
1211
1212		// Track is finished but group 0 has no frame yet — must block, not return None.
1213		assert!(
1214			consumer.read_frame().now_or_never().is_none(),
1215			"read_frame must block on a pending group even after finish()"
1216		);
1217
1218		// A late frame on the pending group is still delivered.
1219		g0.write_frame(bytes::Bytes::from_static(b"late")).unwrap();
1220		let frame = consumer
1221			.read_frame()
1222			.now_or_never()
1223			.expect("should not block once a frame is written")
1224			.expect("would have errored")
1225			.expect("track should not be closed");
1226		assert_eq!(&frame[..], b"late");
1227	}
1228
1229	#[tokio::test]
1230	async fn read_frame_respects_start_at() {
1231		// start_at sets min_sequence; read_frame must skip groups below it even though
1232		// next_sequence is still 0.
1233		let mut producer = Track::new("test").produce();
1234		let mut consumer = producer.consume();
1235		consumer.start_at(5);
1236
1237		// Seq 3 has a frame but is below min_sequence — must be skipped.
1238		let mut g3 = producer.create_group(Group { sequence: 3 }).unwrap();
1239		g3.write_frame(bytes::Bytes::from_static(b"skip-me")).unwrap();
1240		g3.finish().unwrap();
1241
1242		let mut g5 = producer.create_group(Group { sequence: 5 }).unwrap();
1243		g5.write_frame(bytes::Bytes::from_static(b"keep")).unwrap();
1244		g5.finish().unwrap();
1245
1246		let frame = consumer
1247			.read_frame()
1248			.now_or_never()
1249			.expect("should not block")
1250			.expect("would have errored")
1251			.expect("track should not be closed");
1252		assert_eq!(&frame[..], b"keep");
1253	}
1254
1255	#[tokio::test]
1256	async fn read_frame_returns_none_when_finished() {
1257		let mut producer = Track::new("test").produce();
1258		let mut consumer = producer.consume();
1259
1260		producer.write_frame(b"only".as_slice()).unwrap();
1261		producer.finish().unwrap();
1262
1263		let frame = consumer
1264			.read_frame()
1265			.now_or_never()
1266			.expect("should not block")
1267			.expect("would have errored")
1268			.expect("track should not be closed");
1269		assert_eq!(&frame[..], b"only");
1270
1271		let done = consumer
1272			.read_frame()
1273			.now_or_never()
1274			.expect("should not block")
1275			.expect("would have errored");
1276		assert!(done.is_none());
1277	}
1278
1279	#[tokio::test]
1280	async fn get_group_finishes_without_waiting_for_gaps() {
1281		let mut producer = Track::new("test").produce();
1282		producer.create_group(Group { sequence: 1 }).unwrap();
1283		producer.finish_at(1).unwrap();
1284
1285		let consumer = producer.consume();
1286		// get_group(0) blocks because group 0 is below final_sequence and could still arrive.
1287		assert!(
1288			consumer.get_group(0).now_or_never().is_none(),
1289			"sequence below fin should block (group could still arrive)"
1290		);
1291		assert!(
1292			consumer
1293				.get_group(2)
1294				.now_or_never()
1295				.expect("sequence at-or-after fin should resolve")
1296				.expect("should not error")
1297				.is_none(),
1298			"sequence at-or-after fin should not exist"
1299		);
1300	}
1301
1302	#[test]
1303	fn append_group_returns_bounds_exceeded_on_sequence_overflow() {
1304		let mut producer = Track::new("test").produce();
1305		{
1306			let mut state = producer.state.write().ok().unwrap();
1307			state.max_sequence = Some(u64::MAX);
1308		}
1309
1310		assert!(matches!(producer.append_group(), Err(Error::BoundsExceeded(_))));
1311	}
1312}