1use std::future::Future;
11
12use bytes::Bytes;
13use tokio::sync::watch;
14
15use crate::{Error, Result};
16
17use super::{Frame, FrameConsumer, FrameProducer};
18
/// Description of a group, identified by its sequence number.
///
/// Ordering, equality and hashing are all derived from [`Group::sequence`],
/// the only field.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct Group {
    /// Sequence number of this group.
    pub sequence: u64,
}
24
25impl Group {
26 pub fn produce(self) -> GroupProducer {
27 GroupProducer::new(self)
28 }
29}
30
31impl From<usize> for Group {
32 fn from(sequence: usize) -> Self {
33 Self {
34 sequence: sequence as u64,
35 }
36 }
37}
38
39impl From<u64> for Group {
40 fn from(sequence: u64) -> Self {
41 Self { sequence }
42 }
43}
44
45impl From<u32> for Group {
46 fn from(sequence: u32) -> Self {
47 Self {
48 sequence: sequence as u64,
49 }
50 }
51}
52
53impl From<u16> for Group {
54 fn from(sequence: u16) -> Self {
55 Self {
56 sequence: sequence as u64,
57 }
58 }
59}
60
61#[derive(Default)]
62struct GroupState {
63 frames: Vec<FrameConsumer>,
65
66 closed: Option<Result<()>>,
68}
69
70#[derive(Clone)]
72pub struct GroupProducer {
73 state: watch::Sender<GroupState>,
75
76 pub info: Group,
78}
79
80impl GroupProducer {
81 pub fn new(info: Group) -> Self {
82 Self {
83 info,
84 state: Default::default(),
85 }
86 }
87
88 pub fn write_frame<B: Into<Bytes>>(&mut self, frame: B) {
93 let data = frame.into();
94 let frame = Frame {
95 size: data.len() as u64,
96 };
97 let mut frame = self.create_frame(frame);
98 frame.write(data);
99 frame.finish();
100 }
101
102 pub fn create_frame(&mut self, info: Frame) -> FrameProducer {
104 let producer = FrameProducer::new(info);
105 self.append_frame(producer.consume());
106 producer
107 }
108
109 pub fn append_frame(&mut self, consumer: FrameConsumer) {
111 self.state.send_modify(|state| {
112 assert!(state.closed.is_none());
113 state.frames.push(consumer)
114 });
115 }
116
117 pub fn finish(self) {
119 self.state.send_modify(|state| state.closed = Some(Ok(())));
120 }
121
122 pub fn abort(self, err: Error) {
123 self.state.send_modify(|state| state.closed = Some(Err(err)));
124 }
125
126 pub fn consume(&self) -> GroupConsumer {
128 GroupConsumer {
129 info: self.info.clone(),
130 state: self.state.subscribe(),
131 index: 0,
132 active: None,
133 }
134 }
135
136 pub fn unused(&self) -> impl Future<Output = ()> {
137 let state = self.state.clone();
138 async move {
139 state.closed().await;
140 }
141 }
142}
143
144impl From<Group> for GroupProducer {
145 fn from(info: Group) -> Self {
146 GroupProducer::new(info)
147 }
148}
149
150#[derive(Clone)]
152pub struct GroupConsumer {
153 state: watch::Receiver<GroupState>,
155
156 pub info: Group,
158
159 index: usize,
162
163 active: Option<FrameConsumer>,
165}
166
167impl GroupConsumer {
168 pub async fn read_frame(&mut self) -> Result<Option<Bytes>> {
170 if self.active.is_none() {
173 self.active = self.next_frame().await?;
174 };
175
176 let frame = match self.active.as_mut() {
178 Some(frame) => frame.read_all().await?,
179 None => return Ok(None),
180 };
181
182 self.active = None;
183
184 Ok(Some(frame))
185 }
186
187 pub async fn next_frame(&mut self) -> Result<Option<FrameConsumer>> {
189 if let Some(frame) = self.active.take() {
191 return Ok(Some(frame));
192 }
193
194 loop {
195 {
196 let state = self.state.borrow_and_update();
197
198 if let Some(frame) = state.frames.get(self.index).cloned() {
199 self.index += 1;
200 return Ok(Some(frame));
201 }
202
203 match &state.closed {
204 Some(Ok(_)) => return Ok(None),
205 Some(Err(err)) => return Err(err.clone()),
206 _ => {}
207 }
208 }
209
210 if self.state.changed().await.is_err() {
211 return Err(Error::Cancel);
212 }
213 }
214 }
215}