moq_transfork/model/
frame.rs

1use bytes::{Bytes, BytesMut};
2use std::{fmt, ops};
3use tokio::sync::watch;
4
5use crate::Error;
6
7/// A frame of data with an upfront size.
8#[derive(Clone, PartialEq, Debug)]
9pub struct Frame {
10	pub size: usize,
11}
12
13impl Frame {
14	pub fn new(size: usize) -> Frame {
15		Self { size }
16	}
17
18	pub fn produce(self) -> (FrameProducer, FrameConsumer) {
19		let (send, recv) = watch::channel(FrameState::default());
20
21		let writer = FrameProducer::new(send, self.clone());
22		let reader = FrameConsumer::new(recv, self);
23
24		(writer, reader)
25	}
26}
27
28struct FrameState {
29	// The chunks that has been written thus far
30	chunks: Vec<Bytes>,
31
32	// Set when the writer or all readers are dropped.
33	closed: Result<(), Error>,
34}
35
36impl Default for FrameState {
37	fn default() -> Self {
38		Self {
39			chunks: Vec::new(),
40			closed: Ok(()),
41		}
42	}
43}
44
45impl fmt::Debug for FrameState {
46	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
47		f.debug_struct("FrameState")
48			.field("chunks", &self.chunks.len())
49			.field("closed", &self.closed)
50			.finish()
51	}
52}
53
54/// Used to write a frame's worth of data in chunks.
55#[derive(Clone, Debug)]
56pub struct FrameProducer {
57	// Mutable stream state.
58	state: watch::Sender<FrameState>,
59
60	// Immutable stream state.
61	pub info: Frame,
62}
63
64impl FrameProducer {
65	fn new(state: watch::Sender<FrameState>, info: Frame) -> Self {
66		Self { state, info }
67	}
68
69	pub fn write<B: Into<Bytes>>(&mut self, chunk: B) {
70		self.state.send_modify(|state| state.chunks.push(chunk.into()));
71	}
72
73	/// Close the stream with an error.
74	pub fn close(self, err: Error) {
75		self.state.send_modify(|state| state.closed = Err(err));
76	}
77
78	/// Create a new consumer for the frame.
79	pub fn subscribe(&self) -> FrameConsumer {
80		FrameConsumer::new(self.state.subscribe(), self.info.clone())
81	}
82}
83
84impl ops::Deref for FrameProducer {
85	type Target = Frame;
86
87	fn deref(&self) -> &Self::Target {
88		&self.info
89	}
90}
91
92/// Used to consume a frame's worth of data in chunks.
93#[derive(Clone, Debug)]
94pub struct FrameConsumer {
95	// Modify the stream state.
96	state: watch::Receiver<FrameState>,
97
98	// Immutable stream state.
99	pub info: Frame,
100
101	// The number of frames we've read.
102	// NOTE: Cloned readers inherit this offset, but then run in parallel.
103	index: usize,
104}
105
106impl FrameConsumer {
107	fn new(state: watch::Receiver<FrameState>, group: Frame) -> Self {
108		Self {
109			state,
110			info: group,
111			index: 0,
112		}
113	}
114
115	// Return the next chunk.
116	pub async fn read(&mut self) -> Result<Option<Bytes>, Error> {
117		loop {
118			{
119				let state = self.state.borrow_and_update();
120
121				if let Some(chunk) = state.chunks.get(self.index).cloned() {
122					self.index += 1;
123					return Ok(Some(chunk));
124				}
125
126				state.closed.clone()?;
127			}
128
129			if self.state.changed().await.is_err() {
130				return Ok(None);
131			}
132		}
133	}
134
135	// Return all of the remaining chunks concatenated together.
136	pub async fn read_all(&mut self) -> Result<Bytes, Error> {
137		// Wait until the writer is done before even attempting to read.
138		// That way this function can be cancelled without consuming half of the frame.
139		if let Ok(err) = self.state.wait_for(|s| s.closed.is_err()).await {
140			return Err(err.closed.clone().unwrap_err());
141		};
142
143		// Get all of the remaining chunks.
144		let state = self.state.borrow_and_update();
145		let chunks = &state.chunks[self.index..];
146		self.index = state.chunks.len();
147
148		// We know the final size so we can allocate the buffer upfront.
149		let size = chunks.iter().map(Bytes::len).sum();
150		let mut buf = BytesMut::with_capacity(size);
151
152		// Copy the chunks into the buffer.
153		for chunk in chunks {
154			buf.extend_from_slice(chunk);
155		}
156
157		Ok(buf.freeze())
158	}
159
160	pub async fn closed(&self) -> Result<(), Error> {
161		match self.state.clone().wait_for(|state| state.closed.is_err()).await {
162			Ok(state) => state.closed.clone(),
163			Err(_) => Ok(()),
164		}
165	}
166}
167
168impl ops::Deref for FrameConsumer {
169	type Target = Frame;
170
171	fn deref(&self) -> &Self::Target {
172		&self.info
173	}
174}