Skip to main content

moq_lite/model/
track.rs

//! A track is a collection of semi-reliable and semi-ordered streams, split into a [TrackProducer] and [TrackConsumer] handle.
//!
//! A [TrackProducer] creates streams with a sequence number and priority.
//! The sequence number is used to determine the order of streams, while the priority is used to determine which stream to transmit first.
//! This may seem counter-intuitive, but is designed for live streaming where the newest streams may be higher priority.
//! A cloned [TrackProducer] can be used to create streams in parallel, but will error if a duplicate sequence number is used.
//!
//! A [TrackConsumer] may not receive all streams in order or at all.
//! These streams are meant to be transmitted over congested networks and the key to MoQ Transport is to not block on them.
//! Streams will be cached for a potentially limited duration, adding to the unreliable nature.
//! A cloned [TrackConsumer] will receive a copy of all new streams going forward (fanout).
//!
//! The track is closed with [Error] when all writers or readers are dropped.
14
15use tokio::sync::watch;
16
17use crate::{Error, Result};
18
19use super::{Group, GroupConsumer, GroupProducer};
20
21use std::{collections::VecDeque, future::Future};
22
/// Maximum age of a cached group.
///
/// `TrackState::trim` evicts groups from the front of the cache once more than
/// this much time has passed since they were inserted.
const MAX_CACHE: std::time::Duration = std::time::Duration::from_secs(30);
24
25#[derive(Clone, Debug, PartialEq, Eq)]
26#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
27pub struct Track {
28	pub name: String,
29	pub priority: u8,
30}
31
32impl Track {
33	pub fn new<T: Into<String>>(name: T) -> Self {
34		Self {
35			name: name.into(),
36			priority: 0,
37		}
38	}
39
40	pub fn produce(self) -> TrackProducer {
41		TrackProducer::new(self)
42	}
43}
44
45#[derive(Default)]
46struct TrackState {
47	groups: VecDeque<(tokio::time::Instant, GroupConsumer)>,
48	closed: Option<Result<()>>,
49	offset: usize,
50
51	max_sequence: Option<u64>,
52
53	// The largest sequence number that has been dropped.
54	drop_sequence: Option<u64>,
55}
56
57impl TrackState {
58	fn trim(&mut self, now: tokio::time::Instant) {
59		while let Some((timestamp, _)) = self.groups.front() {
60			if now.saturating_duration_since(*timestamp) > MAX_CACHE {
61				let (_, group) = self.groups.pop_front().unwrap();
62				self.drop_sequence = Some(self.drop_sequence.unwrap_or(0).max(group.info.sequence));
63				self.offset += 1;
64			} else {
65				break;
66			}
67		}
68	}
69}
70
71/// A producer for a track, used to create new groups.
72#[derive(Clone)]
73pub struct TrackProducer {
74	pub info: Track,
75	state: watch::Sender<TrackState>,
76}
77
78impl TrackProducer {
79	pub fn new(info: Track) -> Self {
80		Self {
81			info,
82			state: Default::default(),
83		}
84	}
85
86	/// Insert a group into the track, returning true if this is the latest group.
87	pub fn insert_group(&mut self, group: GroupConsumer) -> bool {
88		self.state.send_if_modified(|state| {
89			assert!(state.closed.is_none());
90			let now = tokio::time::Instant::now();
91			state.trim(now);
92			state.groups.push_back((now, group.clone()));
93			state.max_sequence = Some(state.max_sequence.unwrap_or(0).max(group.info.sequence));
94			true
95		})
96	}
97
98	/// Create a new group with the given sequence number.
99	///
100	/// If the sequence number is not the latest, this method will return None.
101	pub fn create_group(&mut self, info: Group) -> Option<GroupProducer> {
102		let group = info.produce();
103		self.insert_group(group.consume()).then_some(group)
104	}
105
106	/// Create a new group with the next sequence number.
107	pub fn append_group(&mut self) -> GroupProducer {
108		let mut producer = None;
109
110		self.state.send_if_modified(|state| {
111			assert!(state.closed.is_none());
112
113			let now = tokio::time::Instant::now();
114			state.trim(now);
115
116			let sequence = state.max_sequence.map_or(0, |sequence| sequence + 1);
117			let group = Group { sequence }.produce();
118			state.groups.push_back((now, group.consume()));
119			state.max_sequence = Some(sequence);
120
121			producer = Some(group);
122			true
123		});
124
125		producer.unwrap()
126	}
127
128	/// Create a group with a single frame.
129	pub fn write_frame<B: Into<bytes::Bytes>>(&mut self, frame: B) {
130		let mut group = self.append_group();
131		group.write_frame(frame.into());
132		group.close();
133	}
134
135	pub fn close(self) {
136		self.state.send_modify(|state| state.closed = Some(Ok(())));
137	}
138
139	pub fn abort(self, err: Error) {
140		self.state.send_modify(|state| state.closed = Some(Err(err)));
141	}
142
143	/// Create a new consumer for the track.
144	pub fn consume(&self) -> TrackConsumer {
145		let state = self.state.borrow();
146		TrackConsumer {
147			info: self.info.clone(),
148			state: self.state.subscribe(),
149			// Start at the latest group
150			index: state.offset + state.groups.len().saturating_sub(1),
151		}
152	}
153
154	/// Block until there are no active consumers.
155	pub fn unused(&self) -> impl Future<Output = ()> + use<> {
156		let state = self.state.clone();
157		async move {
158			state.closed().await;
159		}
160	}
161
162	/// Return true if this is the same track.
163	pub fn is_clone(&self, other: &Self) -> bool {
164		self.state.same_channel(&other.state)
165	}
166}
167
168impl From<Track> for TrackProducer {
169	fn from(info: Track) -> Self {
170		TrackProducer::new(info)
171	}
172}
173
174/// A consumer for a track, used to read groups.
175#[derive(Clone)]
176pub struct TrackConsumer {
177	pub info: Track,
178	state: watch::Receiver<TrackState>,
179	index: usize,
180}
181
182impl TrackConsumer {
183	/// Return the next group in order.
184	///
185	/// NOTE: This can have gaps if the reader is too slow or there were network slowdowns.
186	pub async fn next_group(&mut self) -> Result<Option<GroupConsumer>> {
187		// Wait until there's a new latest group or the track is closed.
188		let Ok(state) = self
189			.state
190			.wait_for(|state| {
191				let index = self.index.saturating_sub(state.offset);
192				state.groups.get(index).is_some() || state.closed.is_some()
193			})
194			.await
195		else {
196			return Err(Error::Cancel);
197		};
198
199		let index = self.index.saturating_sub(state.offset);
200		if let Some(group) = state.groups.get(index) {
201			self.index = state.offset + index + 1;
202			return Ok(Some(group.1.clone()));
203		}
204
205		match &state.closed {
206			Some(Ok(_)) => Ok(None),
207			Some(Err(err)) => Err(err.clone()),
208			_ => unreachable!(),
209		}
210	}
211
212	/// Block until the group is available.
213	///
214	/// NOTE: This can block indefinitely if the requested group is dropped.
215	pub async fn get_group(&self, sequence: u64) -> Result<Option<GroupConsumer>> {
216		let mut state = self.state.clone();
217
218		let Ok(state) = state
219			.wait_for(|state| {
220				if state.closed.is_some() {
221					return true;
222				}
223
224				if let Some(drop_sequence) = state.drop_sequence
225					&& drop_sequence >= sequence
226				{
227					return true;
228				}
229
230				state.groups.iter().any(|(_, group)| group.info.sequence == sequence)
231			})
232			.await
233		else {
234			return Err(Error::Cancel);
235		};
236
237		if let Some((_, group)) = state.groups.iter().find(|(_, group)| group.info.sequence == sequence) {
238			return Ok(Some(group.clone()));
239		}
240
241		match &state.closed {
242			Some(Ok(_)) => Ok(None), // end of stream
243			Some(Err(err)) => Err(err.clone()),
244			None => Ok(None), // Dropped
245		}
246	}
247
248	/// Block until the track is closed.
249	pub async fn closed(&self) -> Result<()> {
250		match self.state.clone().wait_for(|state| state.closed.is_some()).await {
251			Ok(state) => state.closed.clone().unwrap(),
252			Err(_) => Err(Error::Cancel),
253		}
254	}
255
256	pub fn is_clone(&self, other: &Self) -> bool {
257		self.state.same_channel(&other.state)
258	}
259}
260
261#[cfg(test)]
262use futures::FutureExt;
263
#[cfg(test)]
impl TrackConsumer {
	/// Return the next group, panicking if it would block, error, or the track is closed.
	pub fn assert_group(&mut self) -> GroupConsumer {
		let polled = self.next_group().now_or_never();
		let result = polled.expect("group would have blocked");
		let group = result.expect("would have errored");
		group.expect("track was closed")
	}

	/// Assert that asking for the next group would block.
	pub fn assert_no_group(&mut self) {
		let would_block = self.next_group().now_or_never().is_none();
		assert!(would_block, "next group would not have blocked");
	}

	/// Assert that the track has not been closed yet.
	pub fn assert_not_closed(&self) {
		let still_open = self.closed().now_or_never().is_none();
		assert!(still_open, "should not be closed");
	}

	/// Assert that the track has been closed (cleanly or with an error).
	pub fn assert_closed(&self) {
		let finished = self.closed().now_or_never().is_some();
		assert!(finished, "should be closed");
	}

	// TODO assert specific errors after implementing PartialEq
	/// Assert that the track has been closed with an error.
	pub fn assert_error(&self) {
		let outcome = self.closed().now_or_never().expect("should not block");
		assert!(outcome.is_err(), "should be error");
	}

	/// Assert that `other` is attached to the same track.
	pub fn assert_is_clone(&self, other: &Self) {
		let same = self.is_clone(other);
		assert!(same, "should be clone");
	}

	/// Assert that `other` is attached to a different track.
	pub fn assert_not_clone(&self, other: &Self) {
		let same = self.is_clone(other);
		assert!(!same, "should not be clone");
	}
}