moq_lite/model/
group.rs

1//! A group is a stream of frames, split into a [Producer] and [Consumer] handle.
2//!
3//! A [Producer] writes an ordered stream of frames.
4//! Frames can be written all at once, or in chunks.
5//!
6//! A [Consumer] reads an ordered stream of frames.
7//! The reader can be cloned, in which case each reader receives a copy of each frame. (fanout)
8//!
9//! The stream is closed with [ServeError::MoqError] when all writers or readers are dropped.
10use std::future::Future;
11
12use bytes::Bytes;
13use tokio::sync::watch;
14
15use crate::{Error, Produce, Result};
16
17use super::{Frame, FrameConsumer, FrameProducer};
18
19/// A group contains a sequence number because they can arrive out of order.
20///
21/// You can use [crate::TrackProducer::append_group] if you just want to +1 the sequence number.
22#[derive(Clone, Debug, Hash, Eq, PartialEq, Ord, PartialOrd)]
23#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
24pub struct Group {
25	pub sequence: u64,
26}
27
28impl Group {
29	pub fn produce(self) -> Produce<GroupProducer, GroupConsumer> {
30		let producer = GroupProducer::new(self);
31		let consumer = producer.consume();
32		Produce { producer, consumer }
33	}
34}
35
36impl From<usize> for Group {
37	fn from(sequence: usize) -> Self {
38		Self {
39			sequence: sequence as u64,
40		}
41	}
42}
43
44impl From<u64> for Group {
45	fn from(sequence: u64) -> Self {
46		Self { sequence }
47	}
48}
49
50impl From<u32> for Group {
51	fn from(sequence: u32) -> Self {
52		Self {
53			sequence: sequence as u64,
54		}
55	}
56}
57
58impl From<u16> for Group {
59	fn from(sequence: u16) -> Self {
60		Self {
61			sequence: sequence as u64,
62		}
63	}
64}
65
66#[derive(Default)]
67struct GroupState {
68	// The frames that has been written thus far
69	frames: Vec<FrameConsumer>,
70
71	// Whether the group is closed
72	closed: Option<Result<()>>,
73}
74
75/// Create a group, frame-by-frame.
76#[derive(Clone)]
77pub struct GroupProducer {
78	// Mutable stream state.
79	state: watch::Sender<GroupState>,
80
81	// Immutable stream state.
82	pub info: Group,
83}
84
85impl GroupProducer {
86	fn new(info: Group) -> Self {
87		Self {
88			info,
89			state: Default::default(),
90		}
91	}
92
93	/// A helper method to write a frame from a single byte buffer.
94	///
95	/// If you want to write multiple chunks, use [Self::create_frame] to get a frame producer.
96	/// But an upfront size is required.
97	pub fn write_frame<B: Into<Bytes>>(&mut self, frame: B) {
98		let data = frame.into();
99		let frame = Frame {
100			size: data.len() as u64,
101		};
102		let mut frame = self.create_frame(frame);
103		frame.write_chunk(data);
104		frame.close();
105	}
106
107	/// Create a frame with an upfront size
108	pub fn create_frame(&mut self, info: Frame) -> FrameProducer {
109		let frame = Frame::produce(info);
110		self.append_frame(frame.consumer);
111		frame.producer
112	}
113
114	/// Append a frame to the group.
115	pub fn append_frame(&mut self, consumer: FrameConsumer) {
116		self.state.send_modify(|state| {
117			assert!(state.closed.is_none());
118			state.frames.push(consumer)
119		});
120	}
121
122	// Clean termination of the group.
123	pub fn close(self) {
124		self.state.send_modify(|state| state.closed = Some(Ok(())));
125	}
126
127	pub fn abort(self, err: Error) {
128		self.state.send_modify(|state| state.closed = Some(Err(err)));
129	}
130
131	/// Create a new consumer for the group.
132	pub fn consume(&self) -> GroupConsumer {
133		GroupConsumer {
134			info: self.info.clone(),
135			state: self.state.subscribe(),
136			index: 0,
137			active: None,
138		}
139	}
140
141	pub fn unused(&self) -> impl Future<Output = ()> + use<> {
142		let state = self.state.clone();
143		async move {
144			state.closed().await;
145		}
146	}
147}
148
149impl From<Group> for GroupProducer {
150	fn from(info: Group) -> Self {
151		GroupProducer::new(info)
152	}
153}
154
155/// Consume a group, frame-by-frame.
156#[derive(Clone)]
157pub struct GroupConsumer {
158	// Modify the stream state.
159	state: watch::Receiver<GroupState>,
160
161	// Immutable stream state.
162	pub info: Group,
163
164	// The number of frames we've read.
165	// NOTE: Cloned readers inherit this offset, but then run in parallel.
166	index: usize,
167
168	// Used to make read_frame cancel safe.
169	active: Option<FrameConsumer>,
170}
171
172impl GroupConsumer {
173	/// Read the next frame.
174	pub async fn read_frame(&mut self) -> Result<Option<Bytes>> {
175		// In order to be cancel safe, we need to save the active frame.
176		// That way if this method gets cancelled, we can resume where we left off.
177		if self.active.is_none() {
178			self.active = self.next_frame().await?;
179		};
180
181		// Read the frame in one go, which is cancel safe.
182		let Some(frame) = self.active.as_mut() else {
183			return Ok(None);
184		};
185		let frame = frame.read_all().await?;
186
187		self.active = None;
188
189		Ok(Some(frame))
190	}
191
192	/// Return a reader for the next frame.
193	pub async fn next_frame(&mut self) -> Result<Option<FrameConsumer>> {
194		// Just in case someone called read_frame, cancelled it, then called next_frame.
195		if let Some(frame) = self.active.take() {
196			return Ok(Some(frame));
197		}
198
199		loop {
200			{
201				let state = self.state.borrow_and_update();
202
203				if let Some(frame) = state.frames.get(self.index).cloned() {
204					self.index += 1;
205					return Ok(Some(frame));
206				}
207
208				match &state.closed {
209					Some(Ok(_)) => return Ok(None),
210					Some(Err(err)) => return Err(err.clone()),
211					_ => {}
212				}
213			}
214
215			if self.state.changed().await.is_err() {
216				return Err(Error::Cancel);
217			}
218		}
219	}
220}