Skip to main content

moq_lite/model/
frame.rs

1use std::future::Future;
2
3use bytes::{Bytes, BytesMut};
4use tokio::sync::watch;
5
6use crate::{Error, Result};
7
8/// A chunk of data with an upfront size.
9///
10/// Note that this is just the header.
11/// You use [FrameProducer] and [FrameConsumer] to deal with the frame payload, potentially chunked.
12#[derive(Clone, Debug)]
13#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
14pub struct Frame {
15	pub size: u64,
16}
17
18impl Frame {
19	/// Create a new producer for the frame.
20	pub fn produce(self) -> FrameProducer {
21		FrameProducer::new(self)
22	}
23}
24
25impl From<usize> for Frame {
26	fn from(size: usize) -> Self {
27		Self { size: size as u64 }
28	}
29}
30
31impl From<u64> for Frame {
32	fn from(size: u64) -> Self {
33		Self { size }
34	}
35}
36
37impl From<u32> for Frame {
38	fn from(size: u32) -> Self {
39		Self { size: size as u64 }
40	}
41}
42
43impl From<u16> for Frame {
44	fn from(size: u16) -> Self {
45		Self { size: size as u64 }
46	}
47}
48
49#[derive(Default)]
50struct FrameState {
51	// The chunks that has been written thus far
52	chunks: Vec<Bytes>,
53
54	// Set when the writer or all readers are dropped.
55	closed: Option<Result<()>>,
56}
57
58/// Used to write a frame's worth of data in chunks.
59#[derive(Clone)]
60pub struct FrameProducer {
61	// Immutable stream state.
62	pub info: Frame,
63
64	// Mutable stream state.
65	state: watch::Sender<FrameState>,
66
67	// Sanity check to ensure we don't write more than the frame size.
68	written: usize,
69}
70
71impl FrameProducer {
72	pub fn new(info: Frame) -> Self {
73		Self {
74			info,
75			state: Default::default(),
76			written: 0,
77		}
78	}
79
80	pub fn write_chunk<B: Into<Bytes>>(&mut self, chunk: B) {
81		let chunk = chunk.into();
82		self.written += chunk.len();
83		assert!(self.written <= self.info.size as usize);
84
85		self.state.send_modify(|state| {
86			assert!(state.closed.is_none());
87			state.chunks.push(chunk);
88		});
89	}
90
91	pub fn close(self) {
92		assert!(self.written == self.info.size as usize);
93		self.state.send_modify(|state| state.closed = Some(Ok(())));
94	}
95
96	pub fn abort(self, err: Error) {
97		self.state.send_modify(|state| state.closed = Some(Err(err)));
98	}
99
100	/// Create a new consumer for the frame.
101	pub fn consume(&self) -> FrameConsumer {
102		FrameConsumer {
103			info: self.info.clone(),
104			state: self.state.subscribe(),
105			index: 0,
106		}
107	}
108
109	// Returns a Future so &self is not borrowed during the future.
110	pub fn unused(&self) -> impl Future<Output = ()> + use<> {
111		let state = self.state.clone();
112		async move {
113			state.closed().await;
114		}
115	}
116}
117
118impl From<Frame> for FrameProducer {
119	fn from(info: Frame) -> Self {
120		FrameProducer::new(info)
121	}
122}
123
124/// Used to consume a frame's worth of data in chunks.
125#[derive(Clone)]
126pub struct FrameConsumer {
127	// Immutable stream state.
128	pub info: Frame,
129
130	// Modify the stream state.
131	state: watch::Receiver<FrameState>,
132
133	// The number of frames we've read.
134	// NOTE: Cloned readers inherit this offset, but then run in parallel.
135	index: usize,
136}
137
138impl FrameConsumer {
139	/// Return the next chunk.
140	pub async fn read_chunk(&mut self) -> Result<Option<Bytes>> {
141		loop {
142			{
143				let state = self.state.borrow_and_update();
144
145				if let Some(chunk) = state.chunks.get(self.index).cloned() {
146					self.index += 1;
147					return Ok(Some(chunk));
148				}
149
150				match &state.closed {
151					Some(Ok(_)) => return Ok(None),
152					Some(Err(err)) => return Err(err.clone()),
153					_ => {}
154				}
155			}
156
157			if self.state.changed().await.is_err() {
158				return Err(Error::Cancel);
159			}
160		}
161	}
162
163	/// Read all of the remaining chunks into a vector.
164	pub async fn read_chunks(&mut self) -> Result<Vec<Bytes>> {
165		// Wait until the writer is done before even attempting to read.
166		// That way this function can be cancelled without consuming half of the frame.
167		let Ok(state) = self.state.wait_for(|state| state.closed.is_some()).await else {
168			return Err(Error::Cancel);
169		};
170		if let Some(Err(err)) = &state.closed {
171			return Err(err.clone());
172		}
173
174		// Get all of the remaining chunks.
175		let chunks = state.chunks[self.index..].to_vec();
176		self.index = state.chunks.len();
177
178		Ok(chunks)
179	}
180
181	/// Return all of the remaining chunks concatenated together.
182	pub async fn read_all(&mut self) -> Result<Bytes> {
183		// Wait until the writer is done before even attempting to read.
184		// That way this function can be cancelled without consuming half of the frame.
185		let Ok(state) = self.state.wait_for(|state| state.closed.is_some()).await else {
186			return Err(Error::Cancel);
187		};
188		if let Some(Err(err)) = &state.closed {
189			return Err(err.clone());
190		}
191
192		// Get all of the remaining chunks.
193		let chunks = &state.chunks[self.index..];
194		self.index = state.chunks.len();
195
196		// We know the final size so we can allocate the buffer upfront.
197		let size = chunks.iter().map(Bytes::len).sum();
198
199		// We know the final size so we can allocate the buffer upfront.
200		let mut buf = BytesMut::with_capacity(size);
201
202		// Copy the chunks into the buffer.
203		for chunk in chunks {
204			buf.extend_from_slice(chunk);
205		}
206
207		Ok(buf.freeze())
208	}
209}