moq_lite/model/
group.rs

//! A group is a stream of frames, split into a [GroupProducer] and [GroupConsumer] handle.
//!
//! A [GroupProducer] writes an ordered stream of frames.
//! Frames can be written all at once, or in chunks.
//!
//! A [GroupConsumer] reads an ordered stream of frames.
//! The consumer can be cloned, in which case each clone receives a copy of each frame. (fanout)
//!
//! If every [GroupProducer] is dropped before the group is closed, consumers receive
//! [Error::Cancel] once they have read the frames already written.
//! [GroupProducer::unused] resolves once every [GroupConsumer] has been dropped.
10use std::future::Future;
11
12use bytes::Bytes;
13use tokio::sync::watch;
14
15use crate::{Error, Produce, Result};
16
17use super::{Frame, FrameConsumer, FrameProducer};
18
/// Static description of a group; currently just its sequence number.
///
/// Call [Group::produce] to obtain the paired [GroupProducer] and [GroupConsumer] handles.
#[derive(Clone, Debug, Hash, Eq, PartialEq, Ord, PartialOrd)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct Group {
	/// Sequence number identifying this group.
	// NOTE(review): presumably unique and increasing within a track — confirm against callers.
	pub sequence: u64,
}
24
25impl Group {
26	pub fn produce(self) -> Produce<GroupProducer, GroupConsumer> {
27		let producer = GroupProducer::new(self);
28		let consumer = producer.consume();
29		Produce { producer, consumer }
30	}
31}
32
33impl From<usize> for Group {
34	fn from(sequence: usize) -> Self {
35		Self {
36			sequence: sequence as u64,
37		}
38	}
39}
40
41impl From<u64> for Group {
42	fn from(sequence: u64) -> Self {
43		Self { sequence }
44	}
45}
46
47impl From<u32> for Group {
48	fn from(sequence: u32) -> Self {
49		Self {
50			sequence: sequence as u64,
51		}
52	}
53}
54
55impl From<u16> for Group {
56	fn from(sequence: u16) -> Self {
57		Self {
58			sequence: sequence as u64,
59		}
60	}
61}
62
// Mutable state shared between producers and consumers through the watch channel.
// The default value is an open group with no frames.
#[derive(Default)]
struct GroupState {
	// The frames that have been written thus far, in the order they were appended.
	frames: Vec<FrameConsumer>,

	// Whether the group is closed.
	// `None` while open, `Some(Ok(()))` after `close`, `Some(Err(_))` after `abort`.
	closed: Option<Result<()>>,
}
71
/// Create a group, frame-by-frame.
///
/// Cloning yields another handle onto the same underlying channel.
#[derive(Clone)]
pub struct GroupProducer {
	// Mutable stream state, published to consumers through a watch channel.
	state: watch::Sender<GroupState>,

	// Immutable stream state.
	pub info: Group,
}
81
82impl GroupProducer {
83	fn new(info: Group) -> Self {
84		Self {
85			info,
86			state: Default::default(),
87		}
88	}
89
90	/// A helper method to write a frame from a single byte buffer.
91	///
92	/// If you want to write multiple chunks, use [Self::create] or [Self::append].
93	/// But an upfront size is required.
94	pub fn write_frame<B: Into<Bytes>>(&mut self, frame: B) {
95		let data = frame.into();
96		let frame = Frame {
97			size: data.len() as u64,
98		};
99		let mut frame = self.create_frame(frame);
100		frame.write_chunk(data);
101		frame.close();
102	}
103
104	/// Create a frame with an upfront size
105	pub fn create_frame(&mut self, info: Frame) -> FrameProducer {
106		let frame = Frame::produce(info);
107		self.append_frame(frame.consumer);
108		frame.producer
109	}
110
111	/// Append a frame to the group.
112	pub fn append_frame(&mut self, consumer: FrameConsumer) {
113		self.state.send_modify(|state| {
114			assert!(state.closed.is_none());
115			state.frames.push(consumer)
116		});
117	}
118
119	// Clean termination of the group.
120	pub fn close(self) {
121		self.state.send_modify(|state| state.closed = Some(Ok(())));
122	}
123
124	pub fn abort(self, err: Error) {
125		self.state.send_modify(|state| state.closed = Some(Err(err)));
126	}
127
128	/// Create a new consumer for the group.
129	pub fn consume(&self) -> GroupConsumer {
130		GroupConsumer {
131			info: self.info.clone(),
132			state: self.state.subscribe(),
133			index: 0,
134			active: None,
135		}
136	}
137
138	pub fn unused(&self) -> impl Future<Output = ()> {
139		let state = self.state.clone();
140		async move {
141			state.closed().await;
142		}
143	}
144}
145
146impl From<Group> for GroupProducer {
147	fn from(info: Group) -> Self {
148		GroupProducer::new(info)
149	}
150}
151
/// Consume a group, frame-by-frame.
///
/// Cloning yields an independent reader that starts at this reader's current position.
#[derive(Clone)]
pub struct GroupConsumer {
	// Read-only view of the shared stream state; updated by the producer.
	state: watch::Receiver<GroupState>,

	// Immutable stream state.
	pub info: Group,

	// The number of frames we've read, i.e. the index of the next frame to return.
	// NOTE: Cloned readers inherit this offset, but then run in parallel.
	index: usize,

	// The frame read_frame is currently reading, kept here until it has been read in full.
	// Used to make read_frame cancel safe.
	active: Option<FrameConsumer>,
}
168
169impl GroupConsumer {
170	/// Read the next frame.
171	pub async fn read_frame(&mut self) -> Result<Option<Bytes>> {
172		// In order to be cancel safe, we need to save the active frame.
173		// That way if this method gets cancelled, we can resume where we left off.
174		if self.active.is_none() {
175			self.active = self.next_frame().await?;
176		};
177
178		// Read the frame in one go, which is cancel safe.
179		let frame = match self.active.as_mut() {
180			Some(frame) => frame.read_all().await?,
181			None => return Ok(None),
182		};
183
184		self.active = None;
185
186		Ok(Some(frame))
187	}
188
189	/// Return a reader for the next frame.
190	pub async fn next_frame(&mut self) -> Result<Option<FrameConsumer>> {
191		// Just in case someone called read_frame, cancelled it, then called next_frame.
192		if let Some(frame) = self.active.take() {
193			return Ok(Some(frame));
194		}
195
196		loop {
197			{
198				let state = self.state.borrow_and_update();
199
200				if let Some(frame) = state.frames.get(self.index).cloned() {
201					self.index += 1;
202					return Ok(Some(frame));
203				}
204
205				match &state.closed {
206					Some(Ok(_)) => return Ok(None),
207					Some(Err(err)) => return Err(err.clone()),
208					_ => {}
209				}
210			}
211
212			if self.state.changed().await.is_err() {
213				return Err(Error::Cancel);
214			}
215		}
216	}
217}