moq_transfork/model/
track.rs

//! A track is a collection of semi-reliable and semi-ordered streams, split into a [TrackProducer] and [TrackConsumer] handle.
//!
//! A [TrackProducer] creates streams with a sequence number and priority.
//! The sequence number is used to determine the order of streams, while the priority is used to determine which stream to transmit first.
//! This may seem counter-intuitive, but is designed for live streaming where the newest streams may be higher priority.
//! A cloned [TrackProducer] can be used to create streams in parallel, but will error if a duplicate sequence number is used.
//!
//! A [TrackConsumer] may not receive all streams in order or at all.
//! These streams are meant to be transmitted over congested networks and the key to MoQ Transport is to not block on them.
//! Streams will be cached for a potentially limited duration, which adds to the unreliable nature.
//! A cloned [TrackConsumer] will receive a copy of all new streams going forward (fanout).
//!
//! The track is closed with [Error] when all writers or readers are dropped.
14
15use tokio::sync::watch;
16
17use super::{Group, GroupConsumer, GroupProducer};
18pub use crate::message::GroupOrder;
19use crate::Error;
20
21use std::{cmp::Ordering, ops, sync::Arc};
22
23/// A track, a collection of indepedent groups (streams) with a specified order/priority.
24#[derive(Clone, PartialEq, Eq, Debug)]
25pub struct Track {
26	/// The path of the track.
27	pub path: String,
28
29	/// The priority of the track, relative to other tracks in the same session/broadcast.
30	pub priority: i8,
31
32	/// The preferred order to deliver groups in the track.
33	pub order: GroupOrder,
34}
35
36impl Track {
37	pub fn new<S: ToString>(path: S) -> Self {
38		Self {
39			path: path.to_string(),
40			..Default::default()
41		}
42	}
43
44	pub fn build() -> TrackBuilder {
45		TrackBuilder::new()
46	}
47
48	pub fn produce(self) -> (TrackProducer, TrackConsumer) {
49		let (send, recv) = watch::channel(TrackState::default());
50		let info = Arc::new(self);
51
52		let writer = TrackProducer::new(send, info.clone());
53		let reader = TrackConsumer::new(recv, info);
54
55		(writer, reader)
56	}
57}
58
59impl Default for Track {
60	fn default() -> Self {
61		Self {
62			path: Default::default(),
63			priority: 0,
64			order: GroupOrder::Desc,
65		}
66	}
67}
68
69/// Build a track with optional parameters.
70pub struct TrackBuilder {
71	track: Track,
72}
73
74impl Default for TrackBuilder {
75	fn default() -> Self {
76		Self::new()
77	}
78}
79
80impl TrackBuilder {
81	pub fn new() -> Self {
82		Self {
83			track: Default::default(),
84		}
85	}
86
87	pub fn path<T: ToString>(mut self, path: T) -> Self {
88		self.track.path = path.to_string();
89		self
90	}
91
92	pub fn priority(mut self, priority: i8) -> Self {
93		self.track.priority = priority;
94		self
95	}
96
97	pub fn group_order(mut self, order: GroupOrder) -> Self {
98		self.track.order = order;
99		self
100	}
101
102	pub fn produce(self) -> (TrackProducer, TrackConsumer) {
103		self.track.produce()
104	}
105
106	// I don't know why From isn't sufficient, but this prevents annoying Rust errors.
107	pub fn into(self) -> Track {
108		self.track
109	}
110}
111
112impl From<TrackBuilder> for Track {
113	fn from(builder: TrackBuilder) -> Self {
114		builder.track
115	}
116}
117
118#[derive(Debug)]
119struct TrackState {
120	latest: Option<GroupConsumer>,
121	closed: Result<(), Error>,
122}
123
124impl Default for TrackState {
125	fn default() -> Self {
126		Self {
127			latest: None,
128			closed: Ok(()),
129		}
130	}
131}
132
133/// A producer for a track, used to create new groups.
134#[derive(Clone, Debug)]
135pub struct TrackProducer {
136	pub info: Arc<Track>,
137	state: watch::Sender<TrackState>,
138}
139
140impl TrackProducer {
141	fn new(state: watch::Sender<TrackState>, info: Arc<Track>) -> Self {
142		Self { info, state }
143	}
144
145	/// Build a new group with the given sequence number.
146	pub fn create_group(&mut self, sequence: u64) -> GroupProducer {
147		let group = Group::new(sequence);
148		let (writer, reader) = group.produce();
149
150		self.state.send_if_modified(|state| {
151			if let Some(latest) = &state.latest {
152				match reader.sequence.cmp(&latest.sequence) {
153					Ordering::Less => return false,  // Not modified,
154					Ordering::Equal => return false, // TODO error?
155					Ordering::Greater => (),
156				}
157			}
158
159			state.latest = Some(reader);
160			true
161		});
162
163		writer
164	}
165
166	/// Build a new group with the next sequence number.
167	pub fn append_group(&mut self) -> GroupProducer {
168		// TODO remove this extra lock
169		let sequence = self
170			.state
171			.borrow()
172			.latest
173			.as_ref()
174			.map_or(0, |group| group.sequence + 1);
175
176		self.create_group(sequence)
177	}
178
179	/// Close the track with an error.
180	pub fn close(self, err: Error) {
181		self.state.send_modify(|state| {
182			state.closed = Err(err);
183		});
184	}
185
186	/// Create a new consumer for the track.
187	pub fn subscribe(&self) -> TrackConsumer {
188		TrackConsumer::new(self.state.subscribe(), self.info.clone())
189	}
190
191	/// Block until there are no active consumers.
192	pub async fn unused(&self) {
193		self.state.closed().await
194	}
195}
196
197impl ops::Deref for TrackProducer {
198	type Target = Track;
199
200	fn deref(&self) -> &Self::Target {
201		&self.info
202	}
203}
204
205/// A consumer for a track, used to read groups.
206#[derive(Clone, Debug)]
207pub struct TrackConsumer {
208	pub info: Arc<Track>,
209	state: watch::Receiver<TrackState>,
210	prev: Option<u64>, // The previous sequence number
211}
212
213impl TrackConsumer {
214	fn new(state: watch::Receiver<TrackState>, info: Arc<Track>) -> Self {
215		Self {
216			state,
217			info,
218			prev: None,
219		}
220	}
221
222	pub fn get_group(&self, sequence: u64) -> Result<GroupConsumer, Error> {
223		let state = self.state.borrow();
224
225		// TODO support more than just the latest group
226		if let Some(latest) = &state.latest {
227			if latest.sequence == sequence {
228				return Ok(latest.clone());
229			}
230		}
231
232		state.closed.clone()?;
233		Err(Error::NotFound)
234	}
235
236	// NOTE: This can return groups out of order.
237	// TODO obey order
238	pub async fn next_group(&mut self) -> Result<Option<GroupConsumer>, Error> {
239		// Wait until there's a new latest group or the track is closed.
240		let state = match self
241			.state
242			.wait_for(|state| state.latest.as_ref().map(|group| group.sequence) != self.prev || state.closed.is_err())
243			.await
244		{
245			Ok(state) => state,
246			Err(_) => return Ok(None),
247		};
248
249		// If there's a new latest group, return it.
250		if let Some(group) = state.latest.as_ref() {
251			if Some(group.sequence) != self.prev {
252				self.prev = Some(group.sequence);
253				return Ok(Some(group.clone()));
254			}
255		}
256
257		// Otherwise the track is closed.
258		Err(state.closed.clone().unwrap_err())
259	}
260
261	// Returns the largest group
262	pub fn latest_group(&self) -> u64 {
263		let state = self.state.borrow();
264		state.latest.as_ref().map(|group| group.sequence).unwrap_or_default()
265	}
266
267	pub async fn closed(&self) -> Result<(), Error> {
268		match self.state.clone().wait_for(|state| state.closed.is_err()).await {
269			Ok(state) => state.closed.clone(),
270			Err(_) => Ok(()),
271		}
272	}
273}
274
275impl ops::Deref for TrackConsumer {
276	type Target = Track;
277
278	fn deref(&self) -> &Self::Target {
279		&self.info
280	}
281}