moq_lite/model/
track.rs

1//! A track is a collection of semi-reliable and semi-ordered streams, split into a [TrackProducer] and [TrackConsumer] handle.
2//!
3//! A [TrackProducer] creates streams with a sequence number and priority.
4//! The sequence number is used to determine the order of streams, while the priority is used to determine which stream to transmit first.
5//! This may seem counter-intuitive, but is designed for live streaming where the newest streams may be higher priority.
6//! A cloned [TrackProducer] can be used to create streams in parallel, but will error if a duplicate sequence number is used.
7//!
8//! A [TrackConsumer] may not receive all streams in order or at all.
9//! These streams are meant to be transmitted over congested networks and the key to MoQ Transport is to not block on them.
10//! Streams will be cached for a potentially limited duration, adding to the unreliable nature.
11//! A cloned [TrackConsumer] will receive a copy of all new streams going forward (fanout).
12//!
13//! The track is closed with [Error] when all writers or readers are dropped.
14
15use tokio::sync::watch;
16
17use crate::{Error, Produce, Result};
18
19use super::{Group, GroupConsumer, GroupProducer};
20
21use std::{cmp::Ordering, future::Future};
22
23#[derive(Clone, Debug, PartialEq, Eq)]
24#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
25pub struct Track {
26	pub name: String,
27	pub priority: u8,
28}
29
30impl Track {
31	pub fn new<T: Into<String>>(name: T) -> Self {
32		Self {
33			name: name.into(),
34			priority: 0,
35		}
36	}
37
38	pub fn produce(self) -> Produce<TrackProducer, TrackConsumer> {
39		let producer = TrackProducer::new(self);
40		let consumer = producer.consume();
41		Produce { producer, consumer }
42	}
43}
44
45#[derive(Default)]
46struct TrackState {
47	latest: Option<GroupConsumer>,
48	closed: Option<Result<()>>,
49}
50
51/// A producer for a track, used to create new groups.
52#[derive(Clone)]
53pub struct TrackProducer {
54	pub info: Track,
55	state: watch::Sender<TrackState>,
56}
57
58impl TrackProducer {
59	fn new(info: Track) -> Self {
60		Self {
61			info,
62			state: Default::default(),
63		}
64	}
65
66	/// Insert a group into the track, returning true if this is the latest group.
67	pub fn insert_group(&mut self, group: GroupConsumer) -> bool {
68		self.state.send_if_modified(|state| {
69			assert!(state.closed.is_none());
70
71			if let Some(latest) = &state.latest {
72				match group.info.cmp(&latest.info) {
73					Ordering::Less => return false,
74					Ordering::Equal => return false,
75					Ordering::Greater => (),
76				}
77			}
78
79			state.latest = Some(group.clone());
80			true
81		})
82	}
83
84	/// Create a new group with the given sequence number.
85	///
86	/// If the sequence number is not the latest, this method will return None.
87	pub fn create_group(&mut self, info: Group) -> Option<GroupProducer> {
88		let group = info.produce();
89		self.insert_group(group.consumer).then_some(group.producer)
90	}
91
92	/// Create a new group with the next sequence number.
93	pub fn append_group(&mut self) -> GroupProducer {
94		let mut producer = None;
95
96		self.state.send_if_modified(|state| {
97			assert!(state.closed.is_none());
98
99			let sequence = state.latest.as_ref().map_or(0, |group| group.info.sequence + 1);
100			let group = Group { sequence }.produce();
101			state.latest = Some(group.consumer);
102			producer = Some(group.producer);
103
104			true
105		});
106
107		producer.unwrap()
108	}
109
110	/// Create a group with a single frame.
111	pub fn write_frame<B: Into<bytes::Bytes>>(&mut self, frame: B) {
112		let mut group = self.append_group();
113		group.write_frame(frame.into());
114		group.close();
115	}
116
117	pub fn close(self) {
118		self.state.send_modify(|state| state.closed = Some(Ok(())));
119	}
120
121	pub fn abort(self, err: Error) {
122		self.state.send_modify(|state| state.closed = Some(Err(err)));
123	}
124
125	/// Create a new consumer for the track.
126	pub fn consume(&self) -> TrackConsumer {
127		TrackConsumer {
128			info: self.info.clone(),
129			state: self.state.subscribe(),
130			prev: None,
131		}
132	}
133
134	/// Block until there are no active consumers.
135	pub fn unused(&self) -> impl Future<Output = ()> {
136		let state = self.state.clone();
137		async move {
138			state.closed().await;
139		}
140	}
141
142	/// Return true if this is the same track.
143	pub fn is_clone(&self, other: &Self) -> bool {
144		self.state.same_channel(&other.state)
145	}
146}
147
148impl From<Track> for TrackProducer {
149	fn from(info: Track) -> Self {
150		TrackProducer::new(info)
151	}
152}
153
154/// A consumer for a track, used to read groups.
155#[derive(Clone)]
156pub struct TrackConsumer {
157	pub info: Track,
158	state: watch::Receiver<TrackState>,
159	prev: Option<u64>, // The previous sequence number
160}
161
162impl TrackConsumer {
163	/// Return the next group in order.
164	///
165	/// NOTE: This can have gaps if the reader is too slow or there were network slowdowns.
166	pub async fn next_group(&mut self) -> Result<Option<GroupConsumer>> {
167		// Wait until there's a new latest group or the track is closed.
168		let state = match self
169			.state
170			.wait_for(|state| {
171				state.latest.as_ref().map(|group| group.info.sequence) > self.prev || state.closed.is_some()
172			})
173			.await
174		{
175			Ok(state) => state,
176			Err(_) => return Err(Error::Cancel),
177		};
178
179		match &state.closed {
180			Some(Ok(_)) => return Ok(None),
181			Some(Err(err)) => return Err(err.clone()),
182			_ => {}
183		}
184
185		// If there's a new latest group, return it.
186		let group = state.latest.clone().unwrap();
187		self.prev = Some(group.info.sequence);
188
189		Ok(Some(group))
190	}
191
192	/// Block until the track is closed.
193	pub async fn closed(&self) -> Result<()> {
194		match self.state.clone().wait_for(|state| state.closed.is_some()).await {
195			Ok(state) => state.closed.clone().unwrap(),
196			Err(_) => Err(Error::Cancel),
197		}
198	}
199
200	pub fn is_clone(&self, other: &Self) -> bool {
201		self.state.same_channel(&other.state)
202	}
203}
204
205#[cfg(test)]
206use futures::FutureExt;
207
#[cfg(test)]
impl TrackConsumer {
	/// Return the next group, panicking if polling would block, the track
	/// errored, or the track was closed.
	pub fn assert_group(&mut self) -> GroupConsumer {
		let polled = self.next_group().now_or_never();
		let result = polled.expect("group would have blocked");
		let group = result.expect("would have errored");
		group.expect("track was closed")
	}

	/// Assert that asking for the next group would block right now.
	pub fn assert_no_group(&mut self) {
		let polled = self.next_group().now_or_never();
		assert!(polled.is_none(), "next group would not have blocked");
	}

	/// Assert that waiting for the track to close would block right now.
	pub fn assert_not_closed(&self) {
		let polled = self.closed().now_or_never();
		assert!(polled.is_none(), "should not be closed");
	}

	/// Assert that waiting for the track to close resolves immediately.
	pub fn assert_closed(&self) {
		let polled = self.closed().now_or_never();
		assert!(polled.is_some(), "should be closed");
	}

	// TODO assert specific errors after implementing PartialEq
	/// Assert that the track is already closed and that it closed with an error.
	pub fn assert_error(&self) {
		let outcome = self.closed().now_or_never().expect("should not block");
		assert!(outcome.is_err(), "should be error");
	}

	/// Assert that `other` is attached to the same track channel.
	pub fn assert_is_clone(&self, other: &Self) {
		let same = self.is_clone(other);
		assert!(same, "should be clone");
	}

	/// Assert that `other` is attached to a different track channel.
	pub fn assert_not_clone(&self, other: &Self) {
		let same = self.is_clone(other);
		assert!(!same, "should not be clone");
	}
}