moq_lite/model/
group.rs

1//! A group is a stream of frames, split into a [Producer] and [Consumer] handle.
2//!
3//! A [Producer] writes an ordered stream of frames.
4//! Frames can be written all at once, or in chunks.
5//!
6//! A [Consumer] reads an ordered stream of frames.
7//! The reader can be cloned, in which case each reader receives a copy of each frame. (fanout)
8//!
9//! The stream is closed with [ServeError::MoqError] when all writers or readers are dropped.
10use std::future::Future;
11
12use bytes::Bytes;
13use tokio::sync::watch;
14
15use crate::{Error, Result};
16
17use super::{Frame, FrameConsumer, FrameProducer};
18
19#[derive(Clone, Debug, Hash, Eq, PartialEq, Ord, PartialOrd)]
20#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
21pub struct Group {
22	pub sequence: u64,
23}
24
25impl Group {
26	pub fn produce(self) -> GroupProducer {
27		GroupProducer::new(self)
28	}
29}
30
31impl From<usize> for Group {
32	fn from(sequence: usize) -> Self {
33		Self {
34			sequence: sequence as u64,
35		}
36	}
37}
38
39impl From<u64> for Group {
40	fn from(sequence: u64) -> Self {
41		Self { sequence }
42	}
43}
44
45impl From<u32> for Group {
46	fn from(sequence: u32) -> Self {
47		Self {
48			sequence: sequence as u64,
49		}
50	}
51}
52
53impl From<u16> for Group {
54	fn from(sequence: u16) -> Self {
55		Self {
56			sequence: sequence as u64,
57		}
58	}
59}
60
61#[derive(Default)]
62struct GroupState {
63	// The frames that has been written thus far
64	frames: Vec<FrameConsumer>,
65
66	// Whether the group is finished
67	closed: Option<Result<()>>,
68}
69
70/// Create a group, frame-by-frame.
71#[derive(Clone)]
72pub struct GroupProducer {
73	// Mutable stream state.
74	state: watch::Sender<GroupState>,
75
76	// Immutable stream state.
77	pub info: Group,
78}
79
80impl GroupProducer {
81	pub fn new(info: Group) -> Self {
82		Self {
83			info,
84			state: Default::default(),
85		}
86	}
87
88	/// A helper method to write a frame from a single byte buffer.
89	///
90	/// If you want to write multiple chunks, use [Self::create_frame] or [Self::append_frame].
91	/// But an upfront size is required.
92	pub fn write_frame<B: Into<Bytes>>(&mut self, frame: B) {
93		let data = frame.into();
94		let frame = Frame {
95			size: data.len() as u64,
96		};
97		let mut frame = self.create_frame(frame);
98		frame.write(data);
99		frame.finish();
100	}
101
102	/// Create a frame with an upfront size
103	pub fn create_frame(&mut self, info: Frame) -> FrameProducer {
104		let producer = FrameProducer::new(info);
105		self.append_frame(producer.consume());
106		producer
107	}
108
109	/// Append a frame to the group.
110	pub fn append_frame(&mut self, consumer: FrameConsumer) {
111		self.state.send_modify(|state| {
112			assert!(state.closed.is_none());
113			state.frames.push(consumer)
114		});
115	}
116
117	// Clean termination of the group.
118	pub fn finish(self) {
119		self.state.send_modify(|state| state.closed = Some(Ok(())));
120	}
121
122	pub fn abort(self, err: Error) {
123		self.state.send_modify(|state| state.closed = Some(Err(err)));
124	}
125
126	/// Create a new consumer for the group.
127	pub fn consume(&self) -> GroupConsumer {
128		GroupConsumer {
129			info: self.info.clone(),
130			state: self.state.subscribe(),
131			index: 0,
132			active: None,
133		}
134	}
135
136	pub fn unused(&self) -> impl Future<Output = ()> {
137		let state = self.state.clone();
138		async move {
139			state.closed().await;
140		}
141	}
142}
143
144impl From<Group> for GroupProducer {
145	fn from(info: Group) -> Self {
146		GroupProducer::new(info)
147	}
148}
149
150/// Consume a group, frame-by-frame.
151#[derive(Clone)]
152pub struct GroupConsumer {
153	// Modify the stream state.
154	state: watch::Receiver<GroupState>,
155
156	// Immutable stream state.
157	pub info: Group,
158
159	// The number of frames we've read.
160	// NOTE: Cloned readers inherit this offset, but then run in parallel.
161	index: usize,
162
163	// Used to make read_frame cancel safe.
164	active: Option<FrameConsumer>,
165}
166
167impl GroupConsumer {
168	/// Read the next frame.
169	pub async fn read_frame(&mut self) -> Result<Option<Bytes>> {
170		// In order to be cancel safe, we need to save the active frame.
171		// That way if this method gets cancelled, we can resume where we left off.
172		if self.active.is_none() {
173			self.active = self.next_frame().await?;
174		};
175
176		// Read the frame in one go, which is cancel safe.
177		let frame = match self.active.as_mut() {
178			Some(frame) => frame.read_all().await?,
179			None => return Ok(None),
180		};
181
182		self.active = None;
183
184		Ok(Some(frame))
185	}
186
187	/// Return a reader for the next frame.
188	pub async fn next_frame(&mut self) -> Result<Option<FrameConsumer>> {
189		// Just in case someone called read_frame, cancelled it, then called next_frame.
190		if let Some(frame) = self.active.take() {
191			return Ok(Some(frame));
192		}
193
194		loop {
195			{
196				let state = self.state.borrow_and_update();
197
198				if let Some(frame) = state.frames.get(self.index).cloned() {
199					self.index += 1;
200					return Ok(Some(frame));
201				}
202
203				match &state.closed {
204					Some(Ok(_)) => return Ok(None),
205					Some(Err(err)) => return Err(err.clone()),
206					_ => {}
207				}
208			}
209
210			if self.state.changed().await.is_err() {
211				return Err(Error::Cancel);
212			}
213		}
214	}
215}