moq_lite/model/
track.rs

1//! A track is a collection of semi-reliable and semi-ordered streams, split into a [TrackProducer] and [TrackConsumer] handle.
2//!
//! A [TrackProducer] creates streams with a sequence number and priority.
//! The sequence number is used to determine the order of streams, while the priority is used to determine which stream to transmit first.
//! This may seem counter-intuitive, but is designed for live streaming where the newest streams may be higher priority.
//! A cloned [TrackProducer] can be used to create streams in parallel, but will error if a duplicate sequence number is used.
7//!
//! A [TrackConsumer] may not receive all streams in order or at all.
//! These streams are meant to be transmitted over congested networks and the key to MoQ Transport is to not block on them.
//! Streams will be cached for a potentially limited duration, adding to the unreliable nature.
//! A cloned [TrackConsumer] will receive a copy of all new streams going forward (fanout).
12//!
13//! The track is closed with [Error] when all writers or readers are dropped.
14
15use tokio::sync::watch;
16
17use crate::{Error, Result};
18
19use super::{Group, GroupConsumer, GroupProducer};
20
21use std::cmp::Ordering;
22
/// Static description of a track: its name plus a priority value.
///
/// Call [`Track::produce`] to turn the description into a [`TrackProducer`].
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct Track {
	/// Name identifying the track.
	pub name: String,
	/// Priority of the track; how the value is interpreted is decided outside this file.
	pub priority: i8,
}
29
30impl Track {
31	pub fn produce(self) -> TrackProducer {
32		TrackProducer::new(self)
33	}
34}
35
36#[derive(Default)]
37struct TrackState {
38	latest: Option<GroupConsumer>,
39	closed: Option<Result<()>>,
40}
41
42/// A producer for a track, used to create new groups.
43#[derive(Clone)]
44pub struct TrackProducer {
45	pub info: Track,
46	state: watch::Sender<TrackState>,
47}
48
49impl TrackProducer {
50	pub fn new(info: Track) -> Self {
51		Self {
52			info,
53			state: Default::default(),
54		}
55	}
56
57	/// Insert a group into the track, returning true if this is the latest group.
58	pub fn insert_group(&mut self, group: GroupConsumer) -> bool {
59		self.state.send_if_modified(|state| {
60			assert!(state.closed.is_none());
61
62			if let Some(latest) = &state.latest {
63				match group.info.cmp(&latest.info) {
64					Ordering::Less => return false,
65					Ordering::Equal => return false,
66					Ordering::Greater => (),
67				}
68			}
69
70			state.latest = Some(group.clone());
71			true
72		})
73	}
74
75	/// Create a new group with the given sequence number.
76	///
77	/// If the sequence number is not the latest, this method will return None.
78	pub fn create_group(&mut self, info: Group) -> Option<GroupProducer> {
79		let group = GroupProducer::new(info);
80		self.insert_group(group.consume()).then_some(group)
81	}
82
83	/// Create a new group with the next sequence number.
84	pub fn append_group(&mut self) -> GroupProducer {
85		// TODO remove this extra lock
86		let sequence = self
87			.state
88			.borrow()
89			.latest
90			.as_ref()
91			.map_or(0, |group| group.info.sequence + 1);
92
93		let group = Group { sequence };
94		self.create_group(group).unwrap()
95	}
96
97	pub fn finish(&mut self) {
98		self.state.send_modify(|state| state.closed = Some(Ok(())));
99	}
100
101	pub fn abort(&mut self, err: Error) {
102		self.state.send_modify(|state| state.closed = Some(Err(err)));
103	}
104
105	/// Create a new consumer for the track.
106	pub fn consume(&self) -> TrackConsumer {
107		TrackConsumer {
108			info: self.info.clone(),
109			state: self.state.subscribe(),
110			prev: None,
111		}
112	}
113
114	/// Block until there are no active consumers.
115	pub async fn unused(&self) {
116		self.state.closed().await
117	}
118}
119
120impl From<Track> for TrackProducer {
121	fn from(info: Track) -> Self {
122		TrackProducer::new(info)
123	}
124}
125
126/// A consumer for a track, used to read groups.
127#[derive(Clone)]
128pub struct TrackConsumer {
129	pub info: Track,
130	state: watch::Receiver<TrackState>,
131	prev: Option<u64>, // The previous sequence number
132}
133
134impl TrackConsumer {
135	/// Return the next group in order.
136	///
137	/// NOTE: This can have gaps if the reader is too slow or there were network slowdowns.
138	pub async fn next_group(&mut self) -> Result<Option<GroupConsumer>> {
139		// Wait until there's a new latest group or the track is closed.
140		let state = match self
141			.state
142			.wait_for(|state| {
143				state.latest.as_ref().map(|group| group.info.sequence) > self.prev || state.closed.is_some()
144			})
145			.await
146		{
147			Ok(state) => state,
148			Err(_) => return Err(Error::Cancel),
149		};
150
151		match &state.closed {
152			Some(Ok(_)) => return Ok(None),
153			Some(Err(err)) => return Err(err.clone()),
154			_ => {}
155		}
156
157		// If there's a new latest group, return it.
158		let group = state.latest.clone().unwrap();
159		self.prev = Some(group.info.sequence);
160
161		Ok(Some(group))
162	}
163
164	/// Block until the track is closed.
165	pub async fn closed(&self) -> Result<()> {
166		match self.state.clone().wait_for(|state| state.closed.is_some()).await {
167			Ok(state) => match &state.closed {
168				Some(Ok(_)) => Ok(()),
169				Some(Err(err)) => Err(err.clone()),
170				_ => unreachable!(),
171			},
172			Err(_) => Err(Error::Cancel),
173		}
174	}
175}