1use bytes::{Bytes, BytesMut};
2use tokio::sync::watch;
3
4use crate::{Error, Result};
5
/// Describes a single frame: a payload whose total length is declared up front.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct Frame {
    /// Total payload length in bytes; the producer asserts its writes against this value.
    pub size: u64,
}
11
12impl Frame {
13 pub fn produce(self) -> FrameProducer {
14 FrameProducer::new(self)
15 }
16}
17
18impl From<usize> for Frame {
19 fn from(size: usize) -> Self {
20 Self { size: size as u64 }
21 }
22}
23
24impl From<u64> for Frame {
25 fn from(size: u64) -> Self {
26 Self { size }
27 }
28}
29
30impl From<u32> for Frame {
31 fn from(size: u32) -> Self {
32 Self { size: size as u64 }
33 }
34}
35
36impl From<u16> for Frame {
37 fn from(size: u16) -> Self {
38 Self { size: size as u64 }
39 }
40}
41
42#[derive(Default)]
43struct FrameState {
44 chunks: Vec<Bytes>,
46
47 closed: Option<Result<()>>,
49}
50
51#[derive(Clone)]
53pub struct FrameProducer {
54 pub info: Frame,
56
57 state: watch::Sender<FrameState>,
59
60 written: usize,
62}
63
64impl FrameProducer {
65 pub fn new(info: Frame) -> Self {
66 Self {
67 info,
68 state: Default::default(),
69 written: 0,
70 }
71 }
72
73 pub fn write<B: Into<Bytes>>(&mut self, chunk: B) {
74 let chunk = chunk.into();
75 self.written += chunk.len();
76 assert!(self.written <= self.info.size as usize);
77
78 self.state.send_modify(|state| {
79 assert!(state.closed.is_none());
80 state.chunks.push(chunk);
81 });
82 }
83
84 pub fn finish(&mut self) {
85 assert!(self.written == self.info.size as usize);
86 self.state.send_modify(|state| state.closed = Some(Ok(())));
87 }
88
89 pub fn abort(&mut self, err: Error) {
90 self.state.send_modify(|state| state.closed = Some(Err(err)));
91 }
92
93 pub fn consume(&self) -> FrameConsumer {
95 FrameConsumer {
96 info: self.info.clone(),
97 state: self.state.subscribe(),
98 index: 0,
99 }
100 }
101
102 pub async fn unused(&self) {
103 self.state.closed().await;
104 }
105}
106
107impl From<Frame> for FrameProducer {
108 fn from(info: Frame) -> Self {
109 FrameProducer::new(info)
110 }
111}
112
113#[derive(Clone)]
115pub struct FrameConsumer {
116 pub info: Frame,
118
119 state: watch::Receiver<FrameState>,
121
122 index: usize,
125}
126
127impl FrameConsumer {
128 pub async fn read(&mut self) -> Result<Option<Bytes>> {
130 loop {
131 {
132 let state = self.state.borrow_and_update();
133
134 if let Some(chunk) = state.chunks.get(self.index).cloned() {
135 self.index += 1;
136 return Ok(Some(chunk));
137 }
138
139 match &state.closed {
140 Some(Ok(_)) => return Ok(None),
141 Some(Err(err)) => return Err(err.clone()),
142 _ => {}
143 }
144 }
145
146 if self.state.changed().await.is_err() {
147 return Err(Error::Cancel);
148 }
149 }
150 }
151
152 pub async fn read_all(&mut self) -> Result<Bytes> {
154 let state = match self.state.wait_for(|state| state.closed.is_some()).await {
157 Ok(state) => {
158 if let Some(Err(err)) = &state.closed {
159 return Err(err.clone());
160 }
161 state
162 }
163 Err(_) => return Err(Error::Cancel),
164 };
165
166 let chunks = &state.chunks[self.index..];
168 self.index = state.chunks.len();
169
170 let size = chunks.iter().map(Bytes::len).sum();
172
173 let mut buf = BytesMut::with_capacity(size);
175
176 for chunk in chunks {
178 buf.extend_from_slice(chunk);
179 }
180
181 Ok(buf.freeze())
182 }
183}