moq_lite/model/
group.rs

1//! A group is a stream of frames, split into a [Producer] and [Consumer] handle.
2//!
3//! A [Producer] writes an ordered stream of frames.
4//! Frames can be written all at once, or in chunks.
5//!
6//! A [Consumer] reads an ordered stream of frames.
7//! The reader can be cloned, in which case each reader receives a copy of each frame. (fanout)
8//!
9//! The stream is closed with [ServeError::MoqError] when all writers or readers are dropped.
10use bytes::Bytes;
11use tokio::sync::watch;
12
13use crate::{Error, Result};
14
15use super::{Frame, FrameConsumer, FrameProducer};
16
17#[derive(Clone, Debug, Hash, Eq, PartialEq, Ord, PartialOrd)]
18#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
19pub struct Group {
20	pub sequence: u64,
21}
22
23impl Group {
24	pub fn produce(self) -> GroupProducer {
25		GroupProducer::new(self)
26	}
27}
28
29impl From<usize> for Group {
30	fn from(sequence: usize) -> Self {
31		Self {
32			sequence: sequence as u64,
33		}
34	}
35}
36
37impl From<u64> for Group {
38	fn from(sequence: u64) -> Self {
39		Self { sequence }
40	}
41}
42
43impl From<u32> for Group {
44	fn from(sequence: u32) -> Self {
45		Self {
46			sequence: sequence as u64,
47		}
48	}
49}
50
51impl From<u16> for Group {
52	fn from(sequence: u16) -> Self {
53		Self {
54			sequence: sequence as u64,
55		}
56	}
57}
58
59#[derive(Default)]
60struct GroupState {
61	// The frames that has been written thus far
62	frames: Vec<FrameConsumer>,
63
64	// Whether the group is finished
65	closed: Option<Result<()>>,
66}
67
68/// Create a group, frame-by-frame.
69#[derive(Clone)]
70pub struct GroupProducer {
71	// Mutable stream state.
72	state: watch::Sender<GroupState>,
73
74	// Immutable stream state.
75	pub info: Group,
76}
77
78impl GroupProducer {
79	pub fn new(info: Group) -> Self {
80		Self {
81			info,
82			state: Default::default(),
83		}
84	}
85
86	/// A helper method to write a frame from a single byte buffer.
87	///
88	/// If you want to write multiple chunks, use [Self::create_frame] or [Self::append_frame].
89	/// But an upfront size is required.
90	pub fn write_frame<B: Into<Bytes>>(&mut self, frame: B) {
91		let data = frame.into();
92		let frame = Frame {
93			size: data.len() as u64,
94		};
95		let mut frame = self.create_frame(frame);
96		frame.write(data);
97		frame.finish();
98	}
99
100	/// Create a frame with an upfront size
101	pub fn create_frame(&mut self, info: Frame) -> FrameProducer {
102		let producer = FrameProducer::new(info);
103		self.append_frame(producer.consume());
104		producer
105	}
106
107	/// Append a frame to the group.
108	pub fn append_frame(&mut self, consumer: FrameConsumer) {
109		self.state.send_modify(|state| {
110			assert!(state.closed.is_none());
111			state.frames.push(consumer)
112		});
113	}
114
115	// Clean termination of the group.
116	pub fn finish(&mut self) {
117		self.state.send_modify(|state| state.closed = Some(Ok(())));
118	}
119
120	pub fn abort(&mut self, err: Error) {
121		self.state.send_modify(|state| state.closed = Some(Err(err)));
122	}
123
124	/// Create a new consumer for the group.
125	pub fn consume(&self) -> GroupConsumer {
126		GroupConsumer {
127			info: self.info.clone(),
128			state: self.state.subscribe(),
129			index: 0,
130			active: None,
131		}
132	}
133
134	pub async fn unused(&self) {
135		self.state.closed().await;
136	}
137}
138
139impl From<Group> for GroupProducer {
140	fn from(info: Group) -> Self {
141		GroupProducer::new(info)
142	}
143}
144
145/// Consume a group, frame-by-frame.
146#[derive(Clone)]
147pub struct GroupConsumer {
148	// Modify the stream state.
149	state: watch::Receiver<GroupState>,
150
151	// Immutable stream state.
152	pub info: Group,
153
154	// The number of frames we've read.
155	// NOTE: Cloned readers inherit this offset, but then run in parallel.
156	index: usize,
157
158	// Used to make read_frame cancel safe.
159	active: Option<FrameConsumer>,
160}
161
162impl GroupConsumer {
163	/// Read the next frame.
164	pub async fn read_frame(&mut self) -> Result<Option<Bytes>> {
165		// In order to be cancel safe, we need to save the active frame.
166		// That way if this method gets cancelled, we can resume where we left off.
167		if self.active.is_none() {
168			self.active = self.next_frame().await?;
169		};
170
171		// Read the frame in one go, which is cancel safe.
172		let frame = match self.active.as_mut() {
173			Some(frame) => frame.read_all().await?,
174			None => return Ok(None),
175		};
176
177		self.active = None;
178
179		Ok(Some(frame))
180	}
181
182	/// Return a reader for the next frame.
183	pub async fn next_frame(&mut self) -> Result<Option<FrameConsumer>> {
184		// Just in case someone called read_frame, cancelled it, then called next_frame.
185		if let Some(frame) = self.active.take() {
186			return Ok(Some(frame));
187		}
188
189		loop {
190			{
191				let state = self.state.borrow_and_update();
192
193				if let Some(frame) = state.frames.get(self.index).cloned() {
194					self.index += 1;
195					return Ok(Some(frame));
196				}
197
198				match &state.closed {
199					Some(Ok(_)) => return Ok(None),
200					Some(Err(err)) => return Err(err.clone()),
201					_ => {}
202				}
203			}
204
205			if self.state.changed().await.is_err() {
206				return Err(Error::Cancel);
207			}
208		}
209	}
210}