1use std::task::Poll;
11
12use bytes::Bytes;
13
14use crate::{Error, Result};
15
16use super::state::{Consumer, Producer};
17use super::waiter::waiter_fn;
18use super::{Frame, FrameConsumer, FrameProducer};
19
/// Metadata identifying a group by its sequence number.
///
/// Equality, ordering and hashing are all derived, so they follow `sequence`.
/// With the `serde` feature enabled the type additionally derives
/// `serde::Serialize` and `serde::Deserialize`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct Group {
    /// Sequence number of this group.
    pub sequence: u64,
}
28
29impl Group {
30 pub fn produce(self) -> GroupProducer {
31 GroupProducer::new(self)
32 }
33}
34
35impl From<usize> for Group {
36 fn from(sequence: usize) -> Self {
37 Self {
38 sequence: sequence as u64,
39 }
40 }
41}
42
43impl From<u64> for Group {
44 fn from(sequence: u64) -> Self {
45 Self { sequence }
46 }
47}
48
49impl From<u32> for Group {
50 fn from(sequence: u32) -> Self {
51 Self {
52 sequence: sequence as u64,
53 }
54 }
55}
56
57impl From<u16> for Group {
58 fn from(sequence: u16) -> Self {
59 Self {
60 sequence: sequence as u64,
61 }
62 }
63}
64
65#[derive(Default)]
66struct GroupState {
67 frames: Vec<FrameProducer>,
70
71 fin: bool,
73}
74
75impl GroupState {
76 fn poll_next_frame(&self, index: usize) -> Poll<Option<FrameProducer>> {
77 if let Some(frame) = self.frames.get(index) {
78 Poll::Ready(Some(frame.clone()))
79 } else if self.fin {
80 Poll::Ready(None)
81 } else {
82 Poll::Pending
83 }
84 }
85}
86
87pub struct GroupProducer {
93 state: Producer<GroupState>,
95
96 pub info: Group,
98}
99
100impl GroupProducer {
101 pub fn new(info: Group) -> Self {
103 Self {
104 info,
105 state: Producer::default(),
106 }
107 }
108
109 pub fn write_frame<B: Into<Bytes>>(&mut self, frame: B) -> Result<()> {
114 let data = frame.into();
115 let frame = Frame {
116 size: data.len() as u64,
117 };
118 let mut frame = self.create_frame(frame)?;
119 frame.write(data)?;
120 frame.finish()?;
121 Ok(())
122 }
123
124 pub fn create_frame(&mut self, info: Frame) -> Result<FrameProducer> {
126 let frame = info.produce();
127 self.append_frame(frame.clone())?;
128 Ok(frame)
129 }
130
131 pub fn append_frame(&mut self, frame: FrameProducer) -> Result<()> {
133 let mut state = self.state.modify()?;
134 if state.fin {
135 return Err(Error::Closed);
136 }
137 state.frames.push(frame);
138 Ok(())
139 }
140
141 pub fn finish(&mut self) -> Result<()> {
143 let mut state = self.state.modify()?;
144 state.fin = true;
145 Ok(())
146 }
147
148 pub fn abort(&mut self, err: Error) -> Result<()> {
152 let mut state = self.state.modify()?;
153
154 for frame in state.frames.iter_mut() {
156 frame.abort(err.clone()).ok();
158 }
159
160 state.abort(err);
161 Ok(())
162 }
163
164 pub fn consume(&self) -> GroupConsumer {
166 GroupConsumer {
167 info: self.info.clone(),
168 state: self.state.consume(),
169 index: 0,
170 }
171 }
172
173 pub async fn closed(&self) -> Error {
175 self.state.closed().await
176 }
177
178 pub async fn unused(&self) -> Result<()> {
180 self.state.unused().await
181 }
182}
183
184impl Clone for GroupProducer {
185 fn clone(&self) -> Self {
186 Self {
187 info: self.info.clone(),
188 state: self.state.clone(),
189 }
190 }
191}
192
193impl From<Group> for GroupProducer {
194 fn from(info: Group) -> Self {
195 GroupProducer::new(info)
196 }
197}
198
199#[derive(Clone)]
201pub struct GroupConsumer {
202 state: Consumer<GroupState>,
204
205 pub info: Group,
207
208 index: usize,
211}
212
213impl GroupConsumer {
214 pub async fn read_frame(&mut self) -> Result<Option<Bytes>> {
219 let index = self.index;
221 let frame = waiter_fn(|waiter| self.state.poll(waiter, |state| state.poll_next_frame(index))).await?;
222
223 let Some(frame) = frame else {
224 return Ok(None);
225 };
226
227 let mut consumer = frame.consume();
230 let data = consumer.read_all().await?;
231
232 self.index += 1;
233 Ok(Some(data))
234 }
235
236 pub async fn get_frame(&self, index: usize) -> Result<Option<FrameConsumer>> {
240 let res = waiter_fn(|waiter| self.state.poll(waiter, |state| state.poll_next_frame(index))).await?;
241 Ok(res.map(|producer| producer.consume()))
242 }
243
244 pub async fn next_frame(&mut self) -> Result<Option<FrameConsumer>> {
246 let index = self.index;
247 let res = waiter_fn(|waiter| self.state.poll(waiter, |state| state.poll_next_frame(index))).await?;
248 let consumer = res.map(|producer| {
249 self.index += 1;
250 producer.consume()
251 });
252 Ok(consumer)
253 }
254}