moq_lite/model/frame.rs

use bytes::{Bytes, BytesMut};
use tokio::sync::watch;

use crate::{Error, Result};

/// A frame: a fixed-size payload that is written and read in chunks.
#[derive(Clone, Debug)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct Frame {
	pub size: u64,
}

impl Frame {
	pub fn produce(self) -> FrameProducer {
		FrameProducer::new(self)
	}
}

impl From<usize> for Frame {
	fn from(size: usize) -> Self {
		Self { size: size as u64 }
	}
}

impl From<u64> for Frame {
	fn from(size: u64) -> Self {
		Self { size }
	}
}

impl From<u32> for Frame {
	fn from(size: u32) -> Self {
		Self { size: size as u64 }
	}
}

impl From<u16> for Frame {
	fn from(size: u16) -> Self {
		Self { size: size as u64 }
	}
}

#[derive(Default)]
struct FrameState {
	// The chunks that have been written thus far.
	chunks: Vec<Bytes>,

	// Set when the frame is finished or aborted.
	closed: Option<Result<()>>,
}

/// Used to write a frame's worth of data in chunks.
#[derive(Clone)]
pub struct FrameProducer {
	// Immutable stream state.
	pub info: Frame,

	// Mutable stream state.
	state: watch::Sender<FrameState>,

	// Sanity check to ensure we don't write more than the frame size.
	written: usize,
}

impl FrameProducer {
	pub fn new(info: Frame) -> Self {
		Self {
			info,
			state: Default::default(),
			written: 0,
		}
	}

	/// Write the next chunk of the frame.
	pub fn write<B: Into<Bytes>>(&mut self, chunk: B) {
		let chunk = chunk.into();
		self.written += chunk.len();
		assert!(self.written <= self.info.size as usize);

		self.state.send_modify(|state| {
			assert!(state.closed.is_none());
			state.chunks.push(chunk);
		});
	}

	/// Mark the frame as complete; exactly `size` bytes must have been written.
	pub fn finish(&mut self) {
		assert!(self.written == self.info.size as usize);
		self.state.send_modify(|state| state.closed = Some(Ok(())));
	}

	/// Close the frame with an error, which is surfaced to all consumers.
	pub fn abort(&mut self, err: Error) {
		self.state.send_modify(|state| state.closed = Some(Err(err)));
	}

	/// Create a new consumer for the frame.
	pub fn consume(&self) -> FrameConsumer {
		FrameConsumer {
			info: self.info.clone(),
			state: self.state.subscribe(),
			index: 0,
		}
	}

	/// Resolves once there are no consumers remaining.
	pub async fn unused(&self) {
		self.state.closed().await;
	}
}

impl From<Frame> for FrameProducer {
	fn from(info: Frame) -> Self {
		FrameProducer::new(info)
	}
}

/// Used to consume a frame's worth of data in chunks.
#[derive(Clone)]
pub struct FrameConsumer {
	// Immutable stream state.
	pub info: Frame,

	// Watch the mutable stream state.
	state: watch::Receiver<FrameState>,

	// The number of chunks we've read so far.
	// NOTE: Cloned readers inherit this offset, but then run in parallel.
	index: usize,
}

impl FrameConsumer {
	// Return the next chunk.
	pub async fn read(&mut self) -> Result<Option<Bytes>> {
		loop {
			{
				let state = self.state.borrow_and_update();

				if let Some(chunk) = state.chunks.get(self.index).cloned() {
					self.index += 1;
					return Ok(Some(chunk));
				}

				match &state.closed {
					Some(Ok(_)) => return Ok(None),
					Some(Err(err)) => return Err(err.clone()),
					_ => {}
				}
			}

			if self.state.changed().await.is_err() {
				return Err(Error::Cancel);
			}
		}
	}

	// Return all of the remaining chunks concatenated together.
	pub async fn read_all(&mut self) -> Result<Bytes> {
		// Wait until the writer is done before even attempting to read.
		// That way this function can be cancelled without consuming half of the frame.
		let state = match self.state.wait_for(|state| state.closed.is_some()).await {
			Ok(state) => {
				if let Some(Err(err)) = &state.closed {
					return Err(err.clone());
				}
				state
			}
			Err(_) => return Err(Error::Cancel),
		};

		// Get all of the remaining chunks.
		let chunks = &state.chunks[self.index..];
		self.index = state.chunks.len();

		// We know the final size so we can allocate the buffer upfront.
		let size = chunks.iter().map(Bytes::len).sum();

		let mut buf = BytesMut::with_capacity(size);

		// Copy the chunks into the buffer.
		for chunk in chunks {
			buf.extend_from_slice(chunk);
		}

		Ok(buf.freeze())
	}
}
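
#[cfg(test)]
mod tests {
	use super::*;

	// A minimal usage sketch rather than an exhaustive test suite. It assumes
	// tokio's `rt` and `macros` features are available to the test build for
	// the `#[tokio::test]` attribute; everything else comes from this module.

	// Produce a 10-byte frame in two chunks, then consume it chunk-by-chunk
	// and again all at once with a second consumer.
	#[tokio::test]
	async fn produce_then_consume() {
		let mut producer = Frame::from(10u64).produce();
		let mut consumer = producer.consume();

		producer.write(Bytes::from_static(b"hello"));
		producer.write(Bytes::from_static(b"world"));
		producer.finish();

		// `read` yields chunks in order, then `None` once the frame is finished.
		assert_eq!(consumer.read().await.unwrap(), Some(Bytes::from_static(b"hello")));
		assert_eq!(consumer.read().await.unwrap(), Some(Bytes::from_static(b"world")));
		assert_eq!(consumer.read().await.unwrap(), None);

		// A new consumer starts at the beginning and can read everything at once.
		let mut consumer = producer.consume();
		assert_eq!(consumer.read_all().await.unwrap(), Bytes::from_static(b"helloworld"));
	}

	// Aborting surfaces the error to consumers after any buffered chunks.
	#[tokio::test]
	async fn abort_is_fatal() {
		let mut producer = Frame::from(5u64).produce();
		let mut consumer = producer.consume();

		producer.write(Bytes::from_static(b"hi"));
		producer.abort(Error::Cancel);

		assert_eq!(consumer.read().await.unwrap(), Some(Bytes::from_static(b"hi")));
		assert!(consumer.read().await.is_err());
	}
}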