Skip to main content

moq_lite/model/
track.rs

1//! A track is a collection of semi-reliable and semi-ordered streams, split into a [TrackProducer] and [TrackConsumer] handle.
2//!
3//! A [TrackProducer] creates streams with a sequence number and priority.
4//! The sequest number is used to determine the order of streams, while the priority is used to determine which stream to transmit first.
5//! This may seem counter-intuitive, but is designed for live streaming where the newest streams may be higher priority.
6//! A cloned [TrackProducer] can be used to create streams in parallel, but will error if a duplicate sequence number is used.
7//!
8//! A [TrackConsumer] may not receive all streams in order or at all.
9//! These streams are meant to be transmitted over congested networks and the key to MoQ Tranport is to not block on them.
10//! streams will be cached for a potentially limited duration added to the unreliable nature.
11//! A cloned [TrackConsumer] will receive a copy of all new stream going forward (fanout).
12//!
13//! The track is closed with [Error] when all writers or readers are dropped.
14
15use tokio::sync::watch;
16
17use crate::{Error, Result};
18
19use super::{Group, GroupConsumer, GroupProducer};
20
21use std::{collections::VecDeque, future::Future};
22
23const MAX_CACHE: std::time::Duration = std::time::Duration::from_secs(30);
24
25/// A track is a collection of groups, delivered out-of-order until expired.
26#[derive(Clone, Debug, PartialEq, Eq)]
27#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
28pub struct Track {
29	pub name: String,
30	pub priority: u8,
31}
32
33impl Track {
34	pub fn new<T: Into<String>>(name: T) -> Self {
35		Self {
36			name: name.into(),
37			priority: 0,
38		}
39	}
40
41	pub fn produce(self) -> TrackProducer {
42		TrackProducer::new(self)
43	}
44}
45
46#[derive(Default)]
47struct TrackState {
48	groups: VecDeque<(tokio::time::Instant, GroupConsumer)>,
49	closed: Option<Result<()>>,
50	offset: usize,
51
52	max_sequence: Option<u64>,
53
54	// The largest sequence number that has been dropped.
55	drop_sequence: Option<u64>,
56}
57
58impl TrackState {
59	fn trim(&mut self, now: tokio::time::Instant) {
60		while let Some((timestamp, _)) = self.groups.front() {
61			if now.saturating_duration_since(*timestamp) > MAX_CACHE {
62				let (_, group) = self.groups.pop_front().unwrap();
63				self.drop_sequence = Some(self.drop_sequence.unwrap_or(0).max(group.info.sequence));
64				self.offset += 1;
65			} else {
66				break;
67			}
68		}
69	}
70}
71
72/// A producer for a track, used to create new groups.
73#[derive(Clone)]
74pub struct TrackProducer {
75	pub info: Track,
76	state: watch::Sender<TrackState>,
77}
78
79impl TrackProducer {
80	pub fn new(info: Track) -> Self {
81		Self {
82			info,
83			state: Default::default(),
84		}
85	}
86
87	/// Insert a group into the track, returning true if this is the latest group.
88	pub fn insert_group(&mut self, group: GroupConsumer) -> bool {
89		self.state.send_if_modified(|state| {
90			assert!(state.closed.is_none());
91			let now = tokio::time::Instant::now();
92			state.trim(now);
93			state.groups.push_back((now, group.clone()));
94			state.max_sequence = Some(state.max_sequence.unwrap_or(0).max(group.info.sequence));
95			true
96		})
97	}
98
99	/// Create a new group with the given sequence number.
100	///
101	/// If the sequence number is not the latest, this method will return None.
102	pub fn create_group(&mut self, info: Group) -> Option<GroupProducer> {
103		let group = info.produce();
104		self.insert_group(group.consume()).then_some(group)
105	}
106
107	/// Create a new group with the next sequence number.
108	pub fn append_group(&mut self) -> GroupProducer {
109		let mut producer = None;
110
111		self.state.send_if_modified(|state| {
112			assert!(state.closed.is_none());
113
114			let now = tokio::time::Instant::now();
115			state.trim(now);
116
117			let sequence = state.max_sequence.map_or(0, |sequence| sequence + 1);
118			let group = Group { sequence }.produce();
119			state.groups.push_back((now, group.consume()));
120			state.max_sequence = Some(sequence);
121
122			producer = Some(group);
123			true
124		});
125
126		producer.unwrap()
127	}
128
129	/// Create a group with a single frame.
130	pub fn write_frame<B: Into<bytes::Bytes>>(&mut self, frame: B) {
131		let mut group = self.append_group();
132		group.write_frame(frame.into());
133		group.close();
134	}
135
136	pub fn close(self) {
137		self.state.send_modify(|state| state.closed = Some(Ok(())));
138	}
139
140	pub fn abort(self, err: Error) {
141		self.state.send_modify(|state| state.closed = Some(Err(err)));
142	}
143
144	/// Create a new consumer for the track.
145	pub fn consume(&self) -> TrackConsumer {
146		let state = self.state.borrow();
147		TrackConsumer {
148			info: self.info.clone(),
149			state: self.state.subscribe(),
150			// Start at the latest group
151			index: state.offset + state.groups.len().saturating_sub(1),
152		}
153	}
154
155	/// Block until there are no active consumers.
156	pub fn unused(&self) -> impl Future<Output = ()> + use<> {
157		let state = self.state.clone();
158		async move {
159			state.closed().await;
160		}
161	}
162
163	/// Return true if the track has been closed or aborted.
164	pub fn is_closed(&self) -> bool {
165		self.state.borrow().closed.is_some()
166	}
167
168	/// Return true if this is the same track.
169	pub fn is_clone(&self, other: &Self) -> bool {
170		self.state.same_channel(&other.state)
171	}
172}
173
174impl From<Track> for TrackProducer {
175	fn from(info: Track) -> Self {
176		TrackProducer::new(info)
177	}
178}
179
180/// A consumer for a track, used to read groups.
181#[derive(Clone)]
182pub struct TrackConsumer {
183	pub info: Track,
184	state: watch::Receiver<TrackState>,
185	index: usize,
186}
187
188impl TrackConsumer {
189	/// Return the next group in order.
190	///
191	/// NOTE: This can have gaps if the reader is too slow or there were network slowdowns.
192	pub async fn next_group(&mut self) -> Result<Option<GroupConsumer>> {
193		// Wait until there's a new latest group or the track is closed.
194		let Ok(state) = self
195			.state
196			.wait_for(|state| {
197				let index = self.index.saturating_sub(state.offset);
198				state.groups.get(index).is_some() || state.closed.is_some()
199			})
200			.await
201		else {
202			return Err(Error::Cancel);
203		};
204
205		let index = self.index.saturating_sub(state.offset);
206		if let Some(group) = state.groups.get(index) {
207			self.index = state.offset + index + 1;
208			return Ok(Some(group.1.clone()));
209		}
210
211		match &state.closed {
212			Some(Ok(_)) => Ok(None),
213			Some(Err(err)) => Err(err.clone()),
214			_ => unreachable!(),
215		}
216	}
217
218	/// Block until the group is available.
219	///
220	/// NOTE: This can block indefinitely if the requested group is dropped.
221	pub async fn get_group(&self, sequence: u64) -> Result<Option<GroupConsumer>> {
222		let mut state = self.state.clone();
223
224		let Ok(state) = state
225			.wait_for(|state| {
226				if state.closed.is_some() {
227					return true;
228				}
229
230				if let Some(drop_sequence) = state.drop_sequence
231					&& drop_sequence >= sequence
232				{
233					return true;
234				}
235
236				state.groups.iter().any(|(_, group)| group.info.sequence == sequence)
237			})
238			.await
239		else {
240			return Err(Error::Cancel);
241		};
242
243		if let Some((_, group)) = state.groups.iter().find(|(_, group)| group.info.sequence == sequence) {
244			return Ok(Some(group.clone()));
245		}
246
247		match &state.closed {
248			Some(Ok(_)) => Ok(None), // end of stream
249			Some(Err(err)) => Err(err.clone()),
250			None => Ok(None), // Dropped
251		}
252	}
253
254	/// Block until the track is closed.
255	pub async fn closed(&self) -> Result<()> {
256		match self.state.clone().wait_for(|state| state.closed.is_some()).await {
257			Ok(state) => state.closed.clone().unwrap(),
258			Err(_) => Err(Error::Cancel),
259		}
260	}
261
262	pub fn is_clone(&self, other: &Self) -> bool {
263		self.state.same_channel(&other.state)
264	}
265}
266
267#[cfg(test)]
268use futures::FutureExt;
269
270#[cfg(test)]
271impl TrackConsumer {
272	pub fn assert_group(&mut self) -> GroupConsumer {
273		self.next_group()
274			.now_or_never()
275			.expect("group would have blocked")
276			.expect("would have errored")
277			.expect("track was closed")
278	}
279
280	pub fn assert_no_group(&mut self) {
281		assert!(
282			self.next_group().now_or_never().is_none(),
283			"next group would not have blocked"
284		);
285	}
286
287	pub fn assert_not_closed(&self) {
288		assert!(self.closed().now_or_never().is_none(), "should not be closed");
289	}
290
291	pub fn assert_closed(&self) {
292		assert!(self.closed().now_or_never().is_some(), "should be closed");
293	}
294
295	// TODO assert specific errors after implementing PartialEq
296	pub fn assert_error(&self) {
297		assert!(
298			self.closed().now_or_never().expect("should not block").is_err(),
299			"should be error"
300		);
301	}
302
303	pub fn assert_is_clone(&self, other: &Self) {
304		assert!(self.is_clone(other), "should be clone");
305	}
306
307	pub fn assert_not_clone(&self, other: &Self) {
308		assert!(!self.is_clone(other), "should not be clone");
309	}
310}