moq_lite/model/
frame.rs

1use std::future::Future;
2
3use bytes::{Bytes, BytesMut};
4use tokio::sync::watch;
5
6use crate::{Error, Produce, Result};
7
8/// A chunk of data with an upfront size.
9#[derive(Clone, Debug)]
10#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
11pub struct Frame {
12	pub size: u64,
13}
14
15impl Frame {
16	/// Create a new producer and consumer for the frame.
17	pub fn produce(self) -> Produce<FrameProducer, FrameConsumer> {
18		let producer = FrameProducer::new(self);
19		let consumer = producer.consume();
20		Produce { producer, consumer }
21	}
22}
23
24impl From<usize> for Frame {
25	fn from(size: usize) -> Self {
26		Self { size: size as u64 }
27	}
28}
29
30impl From<u64> for Frame {
31	fn from(size: u64) -> Self {
32		Self { size }
33	}
34}
35
36impl From<u32> for Frame {
37	fn from(size: u32) -> Self {
38		Self { size: size as u64 }
39	}
40}
41
42impl From<u16> for Frame {
43	fn from(size: u16) -> Self {
44		Self { size: size as u64 }
45	}
46}
47
48#[derive(Default)]
49struct FrameState {
50	// The chunks that has been written thus far
51	chunks: Vec<Bytes>,
52
53	// Set when the writer or all readers are dropped.
54	closed: Option<Result<()>>,
55}
56
57/// Used to write a frame's worth of data in chunks.
58#[derive(Clone)]
59pub struct FrameProducer {
60	// Immutable stream state.
61	pub info: Frame,
62
63	// Mutable stream state.
64	state: watch::Sender<FrameState>,
65
66	// Sanity check to ensure we don't write more than the frame size.
67	written: usize,
68}
69
70impl FrameProducer {
71	fn new(info: Frame) -> Self {
72		Self {
73			info,
74			state: Default::default(),
75			written: 0,
76		}
77	}
78
79	pub fn write_chunk<B: Into<Bytes>>(&mut self, chunk: B) {
80		let chunk = chunk.into();
81		self.written += chunk.len();
82		assert!(self.written <= self.info.size as usize);
83
84		self.state.send_modify(|state| {
85			assert!(state.closed.is_none());
86			state.chunks.push(chunk);
87		});
88	}
89
90	pub fn close(self) {
91		assert!(self.written == self.info.size as usize);
92		self.state.send_modify(|state| state.closed = Some(Ok(())));
93	}
94
95	pub fn abort(self, err: Error) {
96		self.state.send_modify(|state| state.closed = Some(Err(err)));
97	}
98
99	/// Create a new consumer for the frame.
100	pub fn consume(&self) -> FrameConsumer {
101		FrameConsumer {
102			info: self.info.clone(),
103			state: self.state.subscribe(),
104			index: 0,
105		}
106	}
107
108	// Returns a Future so &self is not borrowed during the future.
109	pub fn unused(&self) -> impl Future<Output = ()> + use<> {
110		let state = self.state.clone();
111		async move {
112			state.closed().await;
113		}
114	}
115}
116
117impl From<Frame> for FrameProducer {
118	fn from(info: Frame) -> Self {
119		FrameProducer::new(info)
120	}
121}
122
123/// Used to consume a frame's worth of data in chunks.
124#[derive(Clone)]
125pub struct FrameConsumer {
126	// Immutable stream state.
127	pub info: Frame,
128
129	// Modify the stream state.
130	state: watch::Receiver<FrameState>,
131
132	// The number of frames we've read.
133	// NOTE: Cloned readers inherit this offset, but then run in parallel.
134	index: usize,
135}
136
137impl FrameConsumer {
138	/// Return the next chunk.
139	pub async fn read_chunk(&mut self) -> Result<Option<Bytes>> {
140		loop {
141			{
142				let state = self.state.borrow_and_update();
143
144				if let Some(chunk) = state.chunks.get(self.index).cloned() {
145					self.index += 1;
146					return Ok(Some(chunk));
147				}
148
149				match &state.closed {
150					Some(Ok(_)) => return Ok(None),
151					Some(Err(err)) => return Err(err.clone()),
152					_ => {}
153				}
154			}
155
156			if self.state.changed().await.is_err() {
157				return Err(Error::Cancel);
158			}
159		}
160	}
161
162	/// Read all of the remaining chunks into a vector.
163	pub async fn read_chunks(&mut self) -> Result<Vec<Bytes>> {
164		// Wait until the writer is done before even attempting to read.
165		// That way this function can be cancelled without consuming half of the frame.
166		let Ok(state) = self.state.wait_for(|state| state.closed.is_some()).await else {
167			return Err(Error::Cancel);
168		};
169		if let Some(Err(err)) = &state.closed {
170			return Err(err.clone());
171		}
172
173		// Get all of the remaining chunks.
174		let chunks = state.chunks[self.index..].to_vec();
175		self.index = state.chunks.len();
176
177		Ok(chunks)
178	}
179
180	/// Return all of the remaining chunks concatenated together.
181	pub async fn read_all(&mut self) -> Result<Bytes> {
182		// Wait until the writer is done before even attempting to read.
183		// That way this function can be cancelled without consuming half of the frame.
184		let Ok(state) = self.state.wait_for(|state| state.closed.is_some()).await else {
185			return Err(Error::Cancel);
186		};
187		if let Some(Err(err)) = &state.closed {
188			return Err(err.clone());
189		}
190
191		// Get all of the remaining chunks.
192		let chunks = &state.chunks[self.index..];
193		self.index = state.chunks.len();
194
195		// We know the final size so we can allocate the buffer upfront.
196		let size = chunks.iter().map(Bytes::len).sum();
197
198		// We know the final size so we can allocate the buffer upfront.
199		let mut buf = BytesMut::with_capacity(size);
200
201		// Copy the chunks into the buffer.
202		for chunk in chunks {
203			buf.extend_from_slice(chunk);
204		}
205
206		Ok(buf.freeze())
207	}
208}