Skip to main content

moq_lite/model/
group.rs

//! A group is a stream of frames, split into a [GroupProducer] and [GroupConsumer] handle.
//!
//! A [GroupProducer] writes an ordered stream of frames.
//! Frames can be written all at once, or in chunks.
//!
//! A [GroupConsumer] reads an ordered stream of frames.
//! The consumer can be cloned, in which case each clone receives a copy of each frame. (fanout)
//!
//! Consumers receive [Error::Cancel] if every [GroupProducer] is dropped without closing the group.
//! [GroupProducer::unused] resolves once every [GroupConsumer] has been dropped.
10use std::future::Future;
11
12use bytes::Bytes;
13use tokio::sync::watch;
14
15use crate::{Error, Result};
16
17use super::{Frame, FrameConsumer, FrameProducer};
18
/// Identifies a group by its sequence number.
///
/// Groups can arrive out of order, which is why each one carries its own sequence number.
/// Use [crate::TrackProducer::append_group] if you just want to +1 the sequence number.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct Group {
	/// The sequence number of this group.
	pub sequence: u64,
}
27
28impl Group {
29	pub fn produce(self) -> GroupProducer {
30		GroupProducer::new(self)
31	}
32}
33
34impl From<usize> for Group {
35	fn from(sequence: usize) -> Self {
36		Self {
37			sequence: sequence as u64,
38		}
39	}
40}
41
42impl From<u64> for Group {
43	fn from(sequence: u64) -> Self {
44		Self { sequence }
45	}
46}
47
48impl From<u32> for Group {
49	fn from(sequence: u32) -> Self {
50		Self {
51			sequence: sequence as u64,
52		}
53	}
54}
55
56impl From<u16> for Group {
57	fn from(sequence: u16) -> Self {
58		Self {
59			sequence: sequence as u64,
60		}
61	}
62}
63
64#[derive(Default)]
65struct GroupState {
66	// The frames that has been written thus far
67	frames: Vec<FrameConsumer>,
68
69	// Whether the group is closed
70	closed: Option<Result<()>>,
71}
72
73/// Create a group, frame-by-frame.
74#[derive(Clone)]
75pub struct GroupProducer {
76	// Mutable stream state.
77	state: watch::Sender<GroupState>,
78
79	// Immutable stream state.
80	pub info: Group,
81}
82
83impl GroupProducer {
84	pub fn new(info: Group) -> Self {
85		Self {
86			info,
87			state: Default::default(),
88		}
89	}
90
91	/// A helper method to write a frame from a single byte buffer.
92	///
93	/// If you want to write multiple chunks, use [Self::create_frame] to get a frame producer.
94	/// But an upfront size is required.
95	pub fn write_frame<B: Into<Bytes>>(&mut self, frame: B) {
96		let data = frame.into();
97		let frame = Frame {
98			size: data.len() as u64,
99		};
100		let mut frame = self.create_frame(frame);
101		frame.write_chunk(data);
102		frame.close();
103	}
104
105	/// Create a frame with an upfront size
106	pub fn create_frame(&mut self, info: Frame) -> FrameProducer {
107		let frame = info.produce();
108		self.append_frame(frame.consume());
109		frame
110	}
111
112	/// Append a frame to the group.
113	pub fn append_frame(&mut self, consumer: FrameConsumer) {
114		self.state.send_modify(|state| {
115			assert!(state.closed.is_none());
116			state.frames.push(consumer)
117		});
118	}
119
120	// Clean termination of the group.
121	pub fn close(self) {
122		self.state.send_modify(|state| state.closed = Some(Ok(())));
123	}
124
125	pub fn abort(self, err: Error) {
126		self.state.send_modify(|state| state.closed = Some(Err(err)));
127	}
128
129	/// Create a new consumer for the group.
130	pub fn consume(&self) -> GroupConsumer {
131		GroupConsumer {
132			info: self.info.clone(),
133			state: self.state.subscribe(),
134			index: 0,
135			active: None,
136		}
137	}
138
139	pub fn unused(&self) -> impl Future<Output = ()> + use<> {
140		let state = self.state.clone();
141		async move {
142			state.closed().await;
143		}
144	}
145}
146
147impl From<Group> for GroupProducer {
148	fn from(info: Group) -> Self {
149		GroupProducer::new(info)
150	}
151}
152
153/// Consume a group, frame-by-frame.
154#[derive(Clone)]
155pub struct GroupConsumer {
156	// Modify the stream state.
157	state: watch::Receiver<GroupState>,
158
159	// Immutable stream state.
160	pub info: Group,
161
162	// The number of frames we've read.
163	// NOTE: Cloned readers inherit this offset, but then run in parallel.
164	index: usize,
165
166	// Used to make read_frame cancel safe.
167	active: Option<FrameConsumer>,
168}
169
170impl GroupConsumer {
171	/// Read the next frame.
172	pub async fn read_frame(&mut self) -> Result<Option<Bytes>> {
173		// In order to be cancel safe, we need to save the active frame.
174		// That way if this method gets cancelled, we can resume where we left off.
175		if self.active.is_none() {
176			self.active = self.next_frame().await?;
177		};
178
179		// Read the frame in one go, which is cancel safe.
180		let Some(frame) = self.active.as_mut() else {
181			return Ok(None);
182		};
183		let frame = frame.read_all().await?;
184
185		self.active = None;
186
187		Ok(Some(frame))
188	}
189
190	/// Return a reader for the next frame.
191	pub async fn next_frame(&mut self) -> Result<Option<FrameConsumer>> {
192		// Just in case someone called read_frame, cancelled it, then called next_frame.
193		if let Some(frame) = self.active.take() {
194			return Ok(Some(frame));
195		}
196
197		loop {
198			{
199				let state = self.state.borrow_and_update();
200
201				if let Some(frame) = state.frames.get(self.index).cloned() {
202					self.index += 1;
203					return Ok(Some(frame));
204				}
205
206				match &state.closed {
207					Some(Ok(_)) => return Ok(None),
208					Some(Err(err)) => return Err(err.clone()),
209					_ => {}
210				}
211			}
212
213			if self.state.changed().await.is_err() {
214				return Err(Error::Cancel);
215			}
216		}
217	}
218
219	pub async fn get_frame(&self, index: usize) -> Result<Option<FrameConsumer>> {
220		let mut state = self.state.clone();
221		let Ok(state) = state
222			.wait_for(|state| index < state.frames.len() || state.closed.is_some())
223			.await
224		else {
225			return Err(Error::Cancel);
226		};
227
228		if let Some(frame) = state.frames.get(index).cloned() {
229			return Ok(Some(frame));
230		}
231
232		match &state.closed {
233			Some(Ok(_)) => Ok(None),
234			Some(Err(err)) => Err(err.clone()),
235			_ => unreachable!(),
236		}
237	}
238}