moq_lite/model/
track.rs

1//! A track is a collection of semi-reliable and semi-ordered streams, split into a [TrackProducer] and [TrackConsumer] handle.
2//!
//! A [TrackProducer] creates streams with a sequence number and priority.
//! The sequence number is used to determine the order of streams, while the priority is used to determine which stream to transmit first.
//! This may seem counter-intuitive, but is designed for live streaming where the newest streams may be higher priority.
//! A cloned [TrackProducer] can be used to create streams in parallel, but a duplicate (or older) sequence number is rejected.
7//!
//! A [TrackConsumer] may not receive all streams in order or at all.
//! These streams are meant to be transmitted over congested networks and the key to MoQ Transport is to not block on them.
//! Streams will be cached for a potentially limited duration, which adds to the unreliable nature.
//! A cloned [TrackConsumer] will receive a copy of all new streams going forward (fanout).
12//!
13//! The track is closed with [Error] when all writers or readers are dropped.
14
15use tokio::sync::watch;
16
17use crate::{Error, Produce, Result};
18
19use super::{Group, GroupConsumer, GroupProducer};
20
21use std::{cmp::Ordering, future::Future};
22
/// Static description of a track: its name and its priority.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct Track {
	/// The name of the track.
	pub name: String,
	/// The priority of the track; [`Track::new`] initialises it to `0`.
	pub priority: u8,
}
29
30impl Track {
31	pub fn new<T: Into<String>>(name: T) -> Self {
32		Self {
33			name: name.into(),
34			priority: 0,
35		}
36	}
37
38	pub fn produce(self) -> Produce<TrackProducer, TrackConsumer> {
39		let producer = TrackProducer::new(self);
40		let consumer = producer.consume();
41		Produce { producer, consumer }
42	}
43}
44
45#[derive(Default)]
46struct TrackState {
47	latest: Option<GroupConsumer>,
48	closed: Option<Result<()>>,
49}
50
51/// A producer for a track, used to create new groups.
52#[derive(Clone)]
53pub struct TrackProducer {
54	pub info: Track,
55	state: watch::Sender<TrackState>,
56}
57
58impl TrackProducer {
59	fn new(info: Track) -> Self {
60		Self {
61			info,
62			state: Default::default(),
63		}
64	}
65
66	/// Insert a group into the track, returning true if this is the latest group.
67	pub fn insert_group(&mut self, group: GroupConsumer) -> bool {
68		self.state.send_if_modified(|state| {
69			assert!(state.closed.is_none());
70
71			if let Some(latest) = &state.latest {
72				match group.info.cmp(&latest.info) {
73					Ordering::Less => return false,
74					Ordering::Equal => return false,
75					Ordering::Greater => (),
76				}
77			}
78
79			state.latest = Some(group.clone());
80			true
81		})
82	}
83
84	/// Create a new group with the given sequence number.
85	///
86	/// If the sequence number is not the latest, this method will return None.
87	pub fn create_group(&mut self, info: Group) -> Option<GroupProducer> {
88		let group = Group::produce(info);
89		self.insert_group(group.consumer).then_some(group.producer)
90	}
91
92	/// Create a new group with the next sequence number.
93	pub fn append_group(&mut self) -> GroupProducer {
94		// TODO remove this extra lock
95		let sequence = self
96			.state
97			.borrow()
98			.latest
99			.as_ref()
100			.map_or(0, |group| group.info.sequence + 1);
101
102		let group = Group { sequence };
103		self.create_group(group).unwrap()
104	}
105
106	pub fn close(self) {
107		self.state.send_modify(|state| state.closed = Some(Ok(())));
108	}
109
110	pub fn abort(self, err: Error) {
111		self.state.send_modify(|state| state.closed = Some(Err(err)));
112	}
113
114	/// Create a new consumer for the track.
115	pub fn consume(&self) -> TrackConsumer {
116		TrackConsumer {
117			info: self.info.clone(),
118			state: self.state.subscribe(),
119			prev: None,
120		}
121	}
122
123	/// Block until there are no active consumers.
124	pub fn unused(&self) -> impl Future<Output = ()> {
125		let state = self.state.clone();
126		async move {
127			state.closed().await;
128		}
129	}
130
131	/// Return true if this is the same track.
132	pub fn is_clone(&self, other: &Self) -> bool {
133		self.state.same_channel(&other.state)
134	}
135}
136
137impl From<Track> for TrackProducer {
138	fn from(info: Track) -> Self {
139		TrackProducer::new(info)
140	}
141}
142
143/// A consumer for a track, used to read groups.
144#[derive(Clone)]
145pub struct TrackConsumer {
146	pub info: Track,
147	state: watch::Receiver<TrackState>,
148	prev: Option<u64>, // The previous sequence number
149}
150
151impl TrackConsumer {
152	/// Return the next group in order.
153	///
154	/// NOTE: This can have gaps if the reader is too slow or there were network slowdowns.
155	pub async fn next_group(&mut self) -> Result<Option<GroupConsumer>> {
156		// Wait until there's a new latest group or the track is closed.
157		let state = match self
158			.state
159			.wait_for(|state| {
160				state.latest.as_ref().map(|group| group.info.sequence) > self.prev || state.closed.is_some()
161			})
162			.await
163		{
164			Ok(state) => state,
165			Err(_) => return Err(Error::Cancel),
166		};
167
168		match &state.closed {
169			Some(Ok(_)) => return Ok(None),
170			Some(Err(err)) => return Err(err.clone()),
171			_ => {}
172		}
173
174		// If there's a new latest group, return it.
175		let group = state.latest.clone().unwrap();
176		self.prev = Some(group.info.sequence);
177
178		Ok(Some(group))
179	}
180
181	/// Block until the track is closed.
182	pub async fn closed(&self) -> Result<()> {
183		match self.state.clone().wait_for(|state| state.closed.is_some()).await {
184			Ok(state) => state.closed.clone().unwrap(),
185			Err(_) => Err(Error::Cancel),
186		}
187	}
188
189	pub fn is_clone(&self, other: &Self) -> bool {
190		self.state.same_channel(&other.state)
191	}
192}
193
194#[cfg(test)]
195use futures::FutureExt;
196
/// Test-only helpers that poll the consumer once, without blocking, and assert on the outcome.
#[cfg(test)]
impl TrackConsumer {
	/// Return the next group, panicking if it is not immediately available,
	/// if the track errored, or if the track was closed.
	pub fn assert_group(&mut self) -> GroupConsumer {
		let polled = self.next_group().now_or_never();
		let result = polled.expect("group would have blocked");
		let group = result.expect("would have errored");
		group.expect("track was closed")
	}

	/// Assert that `next_group` would currently block.
	pub fn assert_no_group(&mut self) {
		let would_block = self.next_group().now_or_never().is_none();
		assert!(would_block, "next group would not have blocked");
	}

	/// Assert that `closed` would currently block.
	pub fn assert_not_closed(&self) {
		let pending = self.closed().now_or_never().is_none();
		assert!(pending, "should not be closed");
	}

	/// Assert that `closed` resolves immediately.
	pub fn assert_closed(&self) {
		let resolved = self.closed().now_or_never().is_some();
		assert!(resolved, "should be closed");
	}

	// TODO assert specific errors after implementing PartialEq
	/// Assert that `closed` resolves immediately with an error.
	pub fn assert_error(&self) {
		let status = self.closed().now_or_never().expect("should not block");
		assert!(status.is_err(), "should be error");
	}

	/// Assert that `other` is attached to the same track.
	pub fn assert_is_clone(&self, other: &Self) {
		let same = self.is_clone(other);
		assert!(same, "should be clone");
	}

	/// Assert that `other` is attached to a different track.
	pub fn assert_not_clone(&self, other: &Self) {
		let same = self.is_clone(other);
		assert!(!same, "should not be clone");
	}
}