Skip to main content

moq_lite/model/
group.rs

1//! A group is a stream of frames, split into a [GroupProducer] and [GroupConsumer] handle.
2//!
3//! A [GroupProducer] writes an ordered stream of frames.
4//! Frames can be written all at once, or in chunks.
5//!
6//! A [GroupConsumer] reads an ordered stream of frames.
7//! The reader can be cloned, in which case each reader receives a copy of each frame. (fanout)
8//!
9//! The stream is closed with [Error] when all writers or readers are dropped.
10use std::task::Poll;
11
12use bytes::Bytes;
13
14use crate::{Error, Result};
15
16use super::state::{Consumer, Producer};
17use super::waiter::waiter_fn;
18use super::{Frame, FrameConsumer, FrameProducer};
19
20/// A group contains a sequence number because they can arrive out of order.
21///
22/// You can use [crate::TrackProducer::append_group] if you just want to +1 the sequence number.
23#[derive(Clone, Debug, Hash, Eq, PartialEq, Ord, PartialOrd)]
24#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
25pub struct Group {
26	pub sequence: u64,
27}
28
29impl Group {
30	pub fn produce(self) -> GroupProducer {
31		GroupProducer::new(self)
32	}
33}
34
35impl From<usize> for Group {
36	fn from(sequence: usize) -> Self {
37		Self {
38			sequence: sequence as u64,
39		}
40	}
41}
42
43impl From<u64> for Group {
44	fn from(sequence: u64) -> Self {
45		Self { sequence }
46	}
47}
48
49impl From<u32> for Group {
50	fn from(sequence: u32) -> Self {
51		Self {
52			sequence: sequence as u64,
53		}
54	}
55}
56
57impl From<u16> for Group {
58	fn from(sequence: u16) -> Self {
59		Self {
60			sequence: sequence as u64,
61		}
62	}
63}
64
65#[derive(Default)]
66struct GroupState {
67	// The frames that have been written thus far.
68	// We store producers so consumers can be created on-demand.
69	frames: Vec<FrameProducer>,
70
71	// Whether the group has been finalized (no more frames).
72	fin: bool,
73}
74
75impl GroupState {
76	fn poll_next_frame(&self, index: usize) -> Poll<Option<FrameProducer>> {
77		if let Some(frame) = self.frames.get(index) {
78			Poll::Ready(Some(frame.clone()))
79		} else if self.fin {
80			Poll::Ready(None)
81		} else {
82			Poll::Pending
83		}
84	}
85}
86
87/// Writes frames to a group in order.
88///
89/// Each group is delivered independently over a QUIC stream.
90/// Use [Self::write_frame] for simple single-buffer frames,
91/// or [Self::create_frame] for multi-chunk streaming writes.
92pub struct GroupProducer {
93	// Mutable stream state.
94	state: Producer<GroupState>,
95
96	/// The group header containing the sequence number.
97	pub info: Group,
98}
99
100impl GroupProducer {
101	/// Create a new group producer.
102	pub fn new(info: Group) -> Self {
103		Self {
104			info,
105			state: Producer::default(),
106		}
107	}
108
109	/// A helper method to write a frame from a single byte buffer.
110	///
111	/// If you want to write multiple chunks, use [Self::create_frame] to get a frame producer.
112	/// But an upfront size is required.
113	pub fn write_frame<B: Into<Bytes>>(&mut self, frame: B) -> Result<()> {
114		let data = frame.into();
115		let frame = Frame {
116			size: data.len() as u64,
117		};
118		let mut frame = self.create_frame(frame)?;
119		frame.write(data)?;
120		frame.finish()?;
121		Ok(())
122	}
123
124	/// Create a frame with an upfront size
125	pub fn create_frame(&mut self, info: Frame) -> Result<FrameProducer> {
126		let frame = info.produce();
127		self.append_frame(frame.clone())?;
128		Ok(frame)
129	}
130
131	/// Append a frame producer to the group.
132	pub fn append_frame(&mut self, frame: FrameProducer) -> Result<()> {
133		let mut state = self.state.modify()?;
134		if state.fin {
135			return Err(Error::Closed);
136		}
137		state.frames.push(frame);
138		Ok(())
139	}
140
141	/// Mark the group as complete; no more frames will be written.
142	pub fn finish(&mut self) -> Result<()> {
143		let mut state = self.state.modify()?;
144		state.fin = true;
145		Ok(())
146	}
147
148	/// Abort the group with the given error.
149	///
150	/// No updates can be made after this point.
151	pub fn abort(&mut self, err: Error) -> Result<()> {
152		let mut state = self.state.modify()?;
153
154		// Abort all frames still in progress.
155		for frame in state.frames.iter_mut() {
156			// Ignore errors, we don't care if the frame was already closed.
157			frame.abort(err.clone()).ok();
158		}
159
160		state.abort(err);
161		Ok(())
162	}
163
164	/// Create a new consumer for the group.
165	pub fn consume(&self) -> GroupConsumer {
166		GroupConsumer {
167			info: self.info.clone(),
168			state: self.state.consume(),
169			index: 0,
170		}
171	}
172
173	/// Block until the group is closed or aborted.
174	pub async fn closed(&self) -> Error {
175		self.state.closed().await
176	}
177
178	/// Block until there are no active consumers.
179	pub async fn unused(&self) -> Result<()> {
180		self.state.unused().await
181	}
182}
183
184impl Clone for GroupProducer {
185	fn clone(&self) -> Self {
186		Self {
187			info: self.info.clone(),
188			state: self.state.clone(),
189		}
190	}
191}
192
193impl From<Group> for GroupProducer {
194	fn from(info: Group) -> Self {
195		GroupProducer::new(info)
196	}
197}
198
199/// Consume a group, frame-by-frame.
200#[derive(Clone)]
201pub struct GroupConsumer {
202	// Shared state with the producer.
203	state: Consumer<GroupState>,
204
205	// Immutable stream state.
206	pub info: Group,
207
208	// The number of frames we've read.
209	// NOTE: Cloned readers inherit this offset, but then run in parallel.
210	index: usize,
211}
212
213impl GroupConsumer {
214	/// Read the next frame's data all at once.
215	///
216	/// Cancel-safe: if cancelled after obtaining the frame but before reading,
217	/// we retry from the same index and create a fresh consumer.
218	pub async fn read_frame(&mut self) -> Result<Option<Bytes>> {
219		// Step 1: Get the next frame producer from the group state.
220		let index = self.index;
221		let frame = waiter_fn(|waiter| self.state.poll(waiter, |state| state.poll_next_frame(index))).await?;
222
223		let Some(frame) = frame else {
224			return Ok(None);
225		};
226
227		// Step 2: Read all data from the frame via a temporary consumer.
228		// Cancel-safe because read_all returns all or nothing.
229		let mut consumer = frame.consume();
230		let data = consumer.read_all().await?;
231
232		self.index += 1;
233		Ok(Some(data))
234	}
235
236	/// Block until the frame at the given index is available.
237	///
238	/// Returns None if the group is finished and the index is out of range.
239	pub async fn get_frame(&self, index: usize) -> Result<Option<FrameConsumer>> {
240		let res = waiter_fn(|waiter| self.state.poll(waiter, |state| state.poll_next_frame(index))).await?;
241		Ok(res.map(|producer| producer.consume()))
242	}
243
244	/// Return a consumer for the next frame for chunked reading.
245	pub async fn next_frame(&mut self) -> Result<Option<FrameConsumer>> {
246		let index = self.index;
247		let res = waiter_fn(|waiter| self.state.poll(waiter, |state| state.poll_next_frame(index))).await?;
248		let consumer = res.map(|producer| {
249			self.index += 1;
250			producer.consume()
251		});
252		Ok(consumer)
253	}
254}