moq_lite/model/
track.rs

1//! A track is a collection of semi-reliable and semi-ordered streams, split into a [TrackProducer] and [TrackConsumer] handle.
2//!
3//! A [TrackProducer] creates streams with a sequence number and priority.
4//! The sequest number is used to determine the order of streams, while the priority is used to determine which stream to transmit first.
5//! This may seem counter-intuitive, but is designed for live streaming where the newest streams may be higher priority.
6//! A cloned [Producer] can be used to create streams in parallel, but will error if a duplicate sequence number is used.
7//!
8//! A [TrackConsumer] may not receive all streams in order or at all.
9//! These streams are meant to be transmitted over congested networks and the key to MoQ Tranport is to not block on them.
10//! streams will be cached for a potentially limited duration added to the unreliable nature.
11//! A cloned [Consumer] will receive a copy of all new stream going forward (fanout).
12//!
13//! The track is closed with [Error] when all writers or readers are dropped.
14
15use tokio::sync::watch;
16
17use crate::{Error, Result};
18
19use super::{Group, GroupConsumer, GroupProducer};
20
21use std::{cmp::Ordering, future::Future};
22
23#[derive(Clone, Debug, PartialEq, Eq)]
24#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
25pub struct Track {
26	pub name: String,
27	pub priority: u8,
28}
29
30impl Track {
31	pub fn new<T: Into<String>>(name: T) -> Self {
32		Self {
33			name: name.into(),
34			priority: 0,
35		}
36	}
37
38	pub fn produce(self) -> TrackProducer {
39		TrackProducer::new(self)
40	}
41}
42
43#[derive(Default)]
44struct TrackState {
45	latest: Option<GroupConsumer>,
46	closed: Option<Result<()>>,
47}
48
49/// A producer for a track, used to create new groups.
50#[derive(Clone)]
51pub struct TrackProducer {
52	pub info: Track,
53	state: watch::Sender<TrackState>,
54}
55
56impl TrackProducer {
57	pub fn new(info: Track) -> Self {
58		Self {
59			info,
60			state: Default::default(),
61		}
62	}
63
64	/// Insert a group into the track, returning true if this is the latest group.
65	pub fn insert_group(&mut self, group: GroupConsumer) -> bool {
66		self.state.send_if_modified(|state| {
67			assert!(state.closed.is_none());
68
69			if let Some(latest) = &state.latest {
70				match group.info.cmp(&latest.info) {
71					Ordering::Less => return false,
72					Ordering::Equal => return false,
73					Ordering::Greater => (),
74				}
75			}
76
77			state.latest = Some(group.clone());
78			true
79		})
80	}
81
82	/// Create a new group with the given sequence number.
83	///
84	/// If the sequence number is not the latest, this method will return None.
85	pub fn create_group(&mut self, info: Group) -> Option<GroupProducer> {
86		let group = GroupProducer::new(info);
87		self.insert_group(group.consume()).then_some(group)
88	}
89
90	/// Create a new group with the next sequence number.
91	pub fn append_group(&mut self) -> GroupProducer {
92		// TODO remove this extra lock
93		let sequence = self
94			.state
95			.borrow()
96			.latest
97			.as_ref()
98			.map_or(0, |group| group.info.sequence + 1);
99
100		let group = Group { sequence };
101		self.create_group(group).unwrap()
102	}
103
104	pub fn finish(self) {
105		self.state.send_modify(|state| state.closed = Some(Ok(())));
106	}
107
108	pub fn abort(self, err: Error) {
109		self.state.send_modify(|state| state.closed = Some(Err(err)));
110	}
111
112	/// Create a new consumer for the track.
113	pub fn consume(&self) -> TrackConsumer {
114		TrackConsumer {
115			info: self.info.clone(),
116			state: self.state.subscribe(),
117			prev: None,
118		}
119	}
120
121	/// Block until there are no active consumers.
122	pub fn unused(&self) -> impl Future<Output = ()> {
123		let state = self.state.clone();
124		async move {
125			state.closed().await;
126		}
127	}
128
129	/// Return true if this is the same track.
130	pub fn is_clone(&self, other: &Self) -> bool {
131		self.state.same_channel(&other.state)
132	}
133}
134
135impl From<Track> for TrackProducer {
136	fn from(info: Track) -> Self {
137		TrackProducer::new(info)
138	}
139}
140
141/// A consumer for a track, used to read groups.
142#[derive(Clone)]
143pub struct TrackConsumer {
144	pub info: Track,
145	state: watch::Receiver<TrackState>,
146	prev: Option<u64>, // The previous sequence number
147}
148
149impl TrackConsumer {
150	/// Return the next group in order.
151	///
152	/// NOTE: This can have gaps if the reader is too slow or there were network slowdowns.
153	pub async fn next_group(&mut self) -> Result<Option<GroupConsumer>> {
154		// Wait until there's a new latest group or the track is closed.
155		let state = match self
156			.state
157			.wait_for(|state| {
158				state.latest.as_ref().map(|group| group.info.sequence) > self.prev || state.closed.is_some()
159			})
160			.await
161		{
162			Ok(state) => state,
163			Err(_) => return Err(Error::Cancel),
164		};
165
166		match &state.closed {
167			Some(Ok(_)) => return Ok(None),
168			Some(Err(err)) => return Err(err.clone()),
169			_ => {}
170		}
171
172		// If there's a new latest group, return it.
173		let group = state.latest.clone().unwrap();
174		self.prev = Some(group.info.sequence);
175
176		Ok(Some(group))
177	}
178
179	/// Block until the track is closed.
180	pub async fn closed(&self) -> Result<()> {
181		match self.state.clone().wait_for(|state| state.closed.is_some()).await {
182			Ok(state) => state.closed.clone().unwrap(),
183			Err(_) => Err(Error::Cancel),
184		}
185	}
186
187	pub fn is_clone(&self, other: &Self) -> bool {
188		self.state.same_channel(&other.state)
189	}
190}
191
192#[cfg(test)]
193use futures::FutureExt;
194
195#[cfg(test)]
196impl TrackConsumer {
197	pub fn assert_group(&mut self) -> GroupConsumer {
198		self.next_group()
199			.now_or_never()
200			.expect("group would have blocked")
201			.expect("would have errored")
202			.expect("track was closed")
203	}
204
205	pub fn assert_no_group(&mut self) {
206		assert!(
207			self.next_group().now_or_never().is_none(),
208			"next group would not have blocked"
209		);
210	}
211
212	pub fn assert_not_closed(&self) {
213		assert!(self.closed().now_or_never().is_none(), "should not be closed");
214	}
215
216	pub fn assert_closed(&self) {
217		assert!(self.closed().now_or_never().is_some(), "should be closed");
218	}
219
220	// TODO assert specific errors after implementing PartialEq
221	pub fn assert_error(&self) {
222		assert!(
223			self.closed().now_or_never().expect("should not block").is_err(),
224			"should be error"
225		);
226	}
227
228	pub fn assert_is_clone(&self, other: &Self) {
229		assert!(self.is_clone(other), "should be clone");
230	}
231
232	pub fn assert_not_clone(&self, other: &Self) {
233		assert!(!self.is_clone(other), "should not be clone");
234	}
235}