1use std::future::Future;
2
3use bytes::{Bytes, BytesMut};
4use tokio::sync::watch;
5
6use crate::{Error, Result};
7
/// Header describing a single frame.
///
/// This carries only metadata; the payload itself is written through a
/// [`FrameProducer`] and read through a [`FrameConsumer`].
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct Frame {
    /// Declared payload size in bytes.
    pub size: u64,
}
17
18impl Frame {
19 pub fn produce(self) -> FrameProducer {
21 FrameProducer::new(self)
22 }
23}
24
25impl From<usize> for Frame {
26 fn from(size: usize) -> Self {
27 Self { size: size as u64 }
28 }
29}
30
31impl From<u64> for Frame {
32 fn from(size: u64) -> Self {
33 Self { size }
34 }
35}
36
37impl From<u32> for Frame {
38 fn from(size: u32) -> Self {
39 Self { size: size as u64 }
40 }
41}
42
43impl From<u16> for Frame {
44 fn from(size: u16) -> Self {
45 Self { size: size as u64 }
46 }
47}
48
49#[derive(Default)]
50struct FrameState {
51 chunks: Vec<Bytes>,
53
54 closed: Option<Result<()>>,
56}
57
58#[derive(Clone)]
60pub struct FrameProducer {
61 pub info: Frame,
63
64 state: watch::Sender<FrameState>,
66
67 written: usize,
69}
70
71impl FrameProducer {
72 pub fn new(info: Frame) -> Self {
73 Self {
74 info,
75 state: Default::default(),
76 written: 0,
77 }
78 }
79
80 pub fn write_chunk<B: Into<Bytes>>(&mut self, chunk: B) {
81 let chunk = chunk.into();
82 self.written += chunk.len();
83 assert!(self.written <= self.info.size as usize);
84
85 self.state.send_modify(|state| {
86 assert!(state.closed.is_none());
87 state.chunks.push(chunk);
88 });
89 }
90
91 pub fn close(self) {
92 assert!(self.written == self.info.size as usize);
93 self.state.send_modify(|state| state.closed = Some(Ok(())));
94 }
95
96 pub fn abort(self, err: Error) {
97 self.state.send_modify(|state| state.closed = Some(Err(err)));
98 }
99
100 pub fn consume(&self) -> FrameConsumer {
102 FrameConsumer {
103 info: self.info.clone(),
104 state: self.state.subscribe(),
105 index: 0,
106 }
107 }
108
109 pub fn unused(&self) -> impl Future<Output = ()> + use<> {
111 let state = self.state.clone();
112 async move {
113 state.closed().await;
114 }
115 }
116}
117
118impl From<Frame> for FrameProducer {
119 fn from(info: Frame) -> Self {
120 FrameProducer::new(info)
121 }
122}
123
124#[derive(Clone)]
126pub struct FrameConsumer {
127 pub info: Frame,
129
130 state: watch::Receiver<FrameState>,
132
133 index: usize,
136}
137
138impl FrameConsumer {
139 pub async fn read_chunk(&mut self) -> Result<Option<Bytes>> {
141 loop {
142 {
143 let state = self.state.borrow_and_update();
144
145 if let Some(chunk) = state.chunks.get(self.index).cloned() {
146 self.index += 1;
147 return Ok(Some(chunk));
148 }
149
150 match &state.closed {
151 Some(Ok(_)) => return Ok(None),
152 Some(Err(err)) => return Err(err.clone()),
153 _ => {}
154 }
155 }
156
157 if self.state.changed().await.is_err() {
158 return Err(Error::Cancel);
159 }
160 }
161 }
162
163 pub async fn read_chunks(&mut self) -> Result<Vec<Bytes>> {
165 let Ok(state) = self.state.wait_for(|state| state.closed.is_some()).await else {
168 return Err(Error::Cancel);
169 };
170 if let Some(Err(err)) = &state.closed {
171 return Err(err.clone());
172 }
173
174 let chunks = state.chunks[self.index..].to_vec();
176 self.index = state.chunks.len();
177
178 Ok(chunks)
179 }
180
181 pub async fn read_all(&mut self) -> Result<Bytes> {
183 let Ok(state) = self.state.wait_for(|state| state.closed.is_some()).await else {
186 return Err(Error::Cancel);
187 };
188 if let Some(Err(err)) = &state.closed {
189 return Err(err.clone());
190 }
191
192 let chunks = &state.chunks[self.index..];
194 self.index = state.chunks.len();
195
196 let size = chunks.iter().map(Bytes::len).sum();
198
199 let mut buf = BytesMut::with_capacity(size);
201
202 for chunk in chunks {
204 buf.extend_from_slice(chunk);
205 }
206
207 Ok(buf.freeze())
208 }
209}