1use std::future::Future;
11
12use bytes::Bytes;
13use tokio::sync::watch;
14
15use crate::{Error, Produce, Result};
16
17use super::{Frame, FrameConsumer, FrameProducer};
18
19#[derive(Clone, Debug, Hash, Eq, PartialEq, Ord, PartialOrd)]
23#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
24pub struct Group {
25 pub sequence: u64,
26}
27
28impl Group {
29 pub fn produce(self) -> Produce<GroupProducer, GroupConsumer> {
30 let producer = GroupProducer::new(self);
31 let consumer = producer.consume();
32 Produce { producer, consumer }
33 }
34}
35
36impl From<usize> for Group {
37 fn from(sequence: usize) -> Self {
38 Self {
39 sequence: sequence as u64,
40 }
41 }
42}
43
44impl From<u64> for Group {
45 fn from(sequence: u64) -> Self {
46 Self { sequence }
47 }
48}
49
50impl From<u32> for Group {
51 fn from(sequence: u32) -> Self {
52 Self {
53 sequence: sequence as u64,
54 }
55 }
56}
57
58impl From<u16> for Group {
59 fn from(sequence: u16) -> Self {
60 Self {
61 sequence: sequence as u64,
62 }
63 }
64}
65
66#[derive(Default)]
67struct GroupState {
68 frames: Vec<FrameConsumer>,
70
71 closed: Option<Result<()>>,
73}
74
75#[derive(Clone)]
77pub struct GroupProducer {
78 state: watch::Sender<GroupState>,
80
81 pub info: Group,
83}
84
85impl GroupProducer {
86 fn new(info: Group) -> Self {
87 Self {
88 info,
89 state: Default::default(),
90 }
91 }
92
93 pub fn write_frame<B: Into<Bytes>>(&mut self, frame: B) {
98 let data = frame.into();
99 let frame = Frame {
100 size: data.len() as u64,
101 };
102 let mut frame = self.create_frame(frame);
103 frame.write_chunk(data);
104 frame.close();
105 }
106
107 pub fn create_frame(&mut self, info: Frame) -> FrameProducer {
109 let frame = Frame::produce(info);
110 self.append_frame(frame.consumer);
111 frame.producer
112 }
113
114 pub fn append_frame(&mut self, consumer: FrameConsumer) {
116 self.state.send_modify(|state| {
117 assert!(state.closed.is_none());
118 state.frames.push(consumer)
119 });
120 }
121
122 pub fn close(self) {
124 self.state.send_modify(|state| state.closed = Some(Ok(())));
125 }
126
127 pub fn abort(self, err: Error) {
128 self.state.send_modify(|state| state.closed = Some(Err(err)));
129 }
130
131 pub fn consume(&self) -> GroupConsumer {
133 GroupConsumer {
134 info: self.info.clone(),
135 state: self.state.subscribe(),
136 index: 0,
137 active: None,
138 }
139 }
140
141 pub fn unused(&self) -> impl Future<Output = ()> + use<> {
142 let state = self.state.clone();
143 async move {
144 state.closed().await;
145 }
146 }
147}
148
149impl From<Group> for GroupProducer {
150 fn from(info: Group) -> Self {
151 GroupProducer::new(info)
152 }
153}
154
155#[derive(Clone)]
157pub struct GroupConsumer {
158 state: watch::Receiver<GroupState>,
160
161 pub info: Group,
163
164 index: usize,
167
168 active: Option<FrameConsumer>,
170}
171
172impl GroupConsumer {
173 pub async fn read_frame(&mut self) -> Result<Option<Bytes>> {
175 if self.active.is_none() {
178 self.active = self.next_frame().await?;
179 };
180
181 let Some(frame) = self.active.as_mut() else {
183 return Ok(None);
184 };
185 let frame = frame.read_all().await?;
186
187 self.active = None;
188
189 Ok(Some(frame))
190 }
191
192 pub async fn next_frame(&mut self) -> Result<Option<FrameConsumer>> {
194 if let Some(frame) = self.active.take() {
196 return Ok(Some(frame));
197 }
198
199 loop {
200 {
201 let state = self.state.borrow_and_update();
202
203 if let Some(frame) = state.frames.get(self.index).cloned() {
204 self.index += 1;
205 return Ok(Some(frame));
206 }
207
208 match &state.closed {
209 Some(Ok(_)) => return Ok(None),
210 Some(Err(err)) => return Err(err.clone()),
211 _ => {}
212 }
213 }
214
215 if self.state.changed().await.is_err() {
216 return Err(Error::Cancel);
217 }
218 }
219 }
220}