1use std::future::Future;
11
12use bytes::Bytes;
13use tokio::sync::watch;
14
15use crate::{Error, Result};
16
17use super::{Frame, FrameConsumer, FrameProducer};
18
19#[derive(Clone, Debug, Hash, Eq, PartialEq, Ord, PartialOrd)]
23#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
24pub struct Group {
25 pub sequence: u64,
26}
27
28impl Group {
29 pub fn produce(self) -> GroupProducer {
30 GroupProducer::new(self)
31 }
32}
33
34impl From<usize> for Group {
35 fn from(sequence: usize) -> Self {
36 Self {
37 sequence: sequence as u64,
38 }
39 }
40}
41
42impl From<u64> for Group {
43 fn from(sequence: u64) -> Self {
44 Self { sequence }
45 }
46}
47
48impl From<u32> for Group {
49 fn from(sequence: u32) -> Self {
50 Self {
51 sequence: sequence as u64,
52 }
53 }
54}
55
56impl From<u16> for Group {
57 fn from(sequence: u16) -> Self {
58 Self {
59 sequence: sequence as u64,
60 }
61 }
62}
63
64#[derive(Default)]
65struct GroupState {
66 frames: Vec<FrameConsumer>,
68
69 closed: Option<Result<()>>,
71}
72
73#[derive(Clone)]
75pub struct GroupProducer {
76 state: watch::Sender<GroupState>,
78
79 pub info: Group,
81}
82
83impl GroupProducer {
84 pub fn new(info: Group) -> Self {
85 Self {
86 info,
87 state: Default::default(),
88 }
89 }
90
91 pub fn write_frame<B: Into<Bytes>>(&mut self, frame: B) {
96 let data = frame.into();
97 let frame = Frame {
98 size: data.len() as u64,
99 };
100 let mut frame = self.create_frame(frame);
101 frame.write_chunk(data);
102 frame.close();
103 }
104
105 pub fn create_frame(&mut self, info: Frame) -> FrameProducer {
107 let frame = info.produce();
108 self.append_frame(frame.consume());
109 frame
110 }
111
112 pub fn append_frame(&mut self, consumer: FrameConsumer) {
114 self.state.send_modify(|state| {
115 assert!(state.closed.is_none());
116 state.frames.push(consumer)
117 });
118 }
119
120 pub fn close(self) {
122 self.state.send_modify(|state| state.closed = Some(Ok(())));
123 }
124
125 pub fn abort(self, err: Error) {
126 self.state.send_modify(|state| state.closed = Some(Err(err)));
127 }
128
129 pub fn consume(&self) -> GroupConsumer {
131 GroupConsumer {
132 info: self.info.clone(),
133 state: self.state.subscribe(),
134 index: 0,
135 active: None,
136 }
137 }
138
139 pub fn unused(&self) -> impl Future<Output = ()> + use<> {
140 let state = self.state.clone();
141 async move {
142 state.closed().await;
143 }
144 }
145}
146
147impl From<Group> for GroupProducer {
148 fn from(info: Group) -> Self {
149 GroupProducer::new(info)
150 }
151}
152
153#[derive(Clone)]
155pub struct GroupConsumer {
156 state: watch::Receiver<GroupState>,
158
159 pub info: Group,
161
162 index: usize,
165
166 active: Option<FrameConsumer>,
168}
169
170impl GroupConsumer {
171 pub async fn read_frame(&mut self) -> Result<Option<Bytes>> {
173 if self.active.is_none() {
176 self.active = self.next_frame().await?;
177 };
178
179 let Some(frame) = self.active.as_mut() else {
181 return Ok(None);
182 };
183 let frame = frame.read_all().await?;
184
185 self.active = None;
186
187 Ok(Some(frame))
188 }
189
190 pub async fn next_frame(&mut self) -> Result<Option<FrameConsumer>> {
192 if let Some(frame) = self.active.take() {
194 return Ok(Some(frame));
195 }
196
197 loop {
198 {
199 let state = self.state.borrow_and_update();
200
201 if let Some(frame) = state.frames.get(self.index).cloned() {
202 self.index += 1;
203 return Ok(Some(frame));
204 }
205
206 match &state.closed {
207 Some(Ok(_)) => return Ok(None),
208 Some(Err(err)) => return Err(err.clone()),
209 _ => {}
210 }
211 }
212
213 if self.state.changed().await.is_err() {
214 return Err(Error::Cancel);
215 }
216 }
217 }
218
219 pub async fn get_frame(&self, index: usize) -> Result<Option<FrameConsumer>> {
220 let mut state = self.state.clone();
221 let Ok(state) = state
222 .wait_for(|state| index < state.frames.len() || state.closed.is_some())
223 .await
224 else {
225 return Err(Error::Cancel);
226 };
227
228 if let Some(frame) = state.frames.get(index).cloned() {
229 return Ok(Some(frame));
230 }
231
232 match &state.closed {
233 Some(Ok(_)) => Ok(None),
234 Some(Err(err)) => Err(err.clone()),
235 _ => unreachable!(),
236 }
237 }
238}