1use std::future::Future;
2
3use bytes::{Bytes, BytesMut};
4use tokio::sync::watch;
5
6use crate::{Error, Produce, Result};
7
8#[derive(Clone, Debug)]
10#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
11pub struct Frame {
12 pub size: u64,
13}
14
15impl Frame {
16 pub fn produce(self) -> Produce<FrameProducer, FrameConsumer> {
18 let producer = FrameProducer::new(self);
19 let consumer = producer.consume();
20 Produce { producer, consumer }
21 }
22}
23
24impl From<usize> for Frame {
25 fn from(size: usize) -> Self {
26 Self { size: size as u64 }
27 }
28}
29
30impl From<u64> for Frame {
31 fn from(size: u64) -> Self {
32 Self { size }
33 }
34}
35
36impl From<u32> for Frame {
37 fn from(size: u32) -> Self {
38 Self { size: size as u64 }
39 }
40}
41
42impl From<u16> for Frame {
43 fn from(size: u16) -> Self {
44 Self { size: size as u64 }
45 }
46}
47
48#[derive(Default)]
49struct FrameState {
50 chunks: Vec<Bytes>,
52
53 closed: Option<Result<()>>,
55}
56
57#[derive(Clone)]
59pub struct FrameProducer {
60 pub info: Frame,
62
63 state: watch::Sender<FrameState>,
65
66 written: usize,
68}
69
70impl FrameProducer {
71 fn new(info: Frame) -> Self {
72 Self {
73 info,
74 state: Default::default(),
75 written: 0,
76 }
77 }
78
79 pub fn write_chunk<B: Into<Bytes>>(&mut self, chunk: B) {
80 let chunk = chunk.into();
81 self.written += chunk.len();
82 assert!(self.written <= self.info.size as usize);
83
84 self.state.send_modify(|state| {
85 assert!(state.closed.is_none());
86 state.chunks.push(chunk);
87 });
88 }
89
90 pub fn close(self) {
91 assert!(self.written == self.info.size as usize);
92 self.state.send_modify(|state| state.closed = Some(Ok(())));
93 }
94
95 pub fn abort(self, err: Error) {
96 self.state.send_modify(|state| state.closed = Some(Err(err)));
97 }
98
99 pub fn consume(&self) -> FrameConsumer {
101 FrameConsumer {
102 info: self.info.clone(),
103 state: self.state.subscribe(),
104 index: 0,
105 }
106 }
107
108 pub fn unused(&self) -> impl Future<Output = ()> + use<> {
110 let state = self.state.clone();
111 async move {
112 state.closed().await;
113 }
114 }
115}
116
117impl From<Frame> for FrameProducer {
118 fn from(info: Frame) -> Self {
119 FrameProducer::new(info)
120 }
121}
122
123#[derive(Clone)]
125pub struct FrameConsumer {
126 pub info: Frame,
128
129 state: watch::Receiver<FrameState>,
131
132 index: usize,
135}
136
137impl FrameConsumer {
138 pub async fn read_chunk(&mut self) -> Result<Option<Bytes>> {
140 loop {
141 {
142 let state = self.state.borrow_and_update();
143
144 if let Some(chunk) = state.chunks.get(self.index).cloned() {
145 self.index += 1;
146 return Ok(Some(chunk));
147 }
148
149 match &state.closed {
150 Some(Ok(_)) => return Ok(None),
151 Some(Err(err)) => return Err(err.clone()),
152 _ => {}
153 }
154 }
155
156 if self.state.changed().await.is_err() {
157 return Err(Error::Cancel);
158 }
159 }
160 }
161
162 pub async fn read_chunks(&mut self) -> Result<Vec<Bytes>> {
164 let Ok(state) = self.state.wait_for(|state| state.closed.is_some()).await else {
167 return Err(Error::Cancel);
168 };
169 if let Some(Err(err)) = &state.closed {
170 return Err(err.clone());
171 }
172
173 let chunks = state.chunks[self.index..].to_vec();
175 self.index = state.chunks.len();
176
177 Ok(chunks)
178 }
179
180 pub async fn read_all(&mut self) -> Result<Bytes> {
182 let Ok(state) = self.state.wait_for(|state| state.closed.is_some()).await else {
185 return Err(Error::Cancel);
186 };
187 if let Some(Err(err)) = &state.closed {
188 return Err(err.clone());
189 }
190
191 let chunks = &state.chunks[self.index..];
193 self.index = state.chunks.len();
194
195 let size = chunks.iter().map(Bytes::len).sum();
197
198 let mut buf = BytesMut::with_capacity(size);
200
201 for chunk in chunks {
203 buf.extend_from_slice(chunk);
204 }
205
206 Ok(buf.freeze())
207 }
208}