1use std::future::Future;
11
12use bytes::Bytes;
13use tokio::sync::watch;
14
15use crate::{Error, Produce, Result};
16
17use super::{Frame, FrameConsumer, FrameProducer};
18
/// Static description of a group; the only data it carries is `sequence`.
///
/// Equality, ordering and hashing are all derived, so they are defined
/// purely by the `sequence` value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct Group {
    /// Sequence number carried by this group.
    pub sequence: u64,
}
24
25impl Group {
26 pub fn produce(self) -> Produce<GroupProducer, GroupConsumer> {
27 let producer = GroupProducer::new(self);
28 let consumer = producer.consume();
29 Produce { producer, consumer }
30 }
31}
32
33impl From<usize> for Group {
34 fn from(sequence: usize) -> Self {
35 Self {
36 sequence: sequence as u64,
37 }
38 }
39}
40
41impl From<u64> for Group {
42 fn from(sequence: u64) -> Self {
43 Self { sequence }
44 }
45}
46
47impl From<u32> for Group {
48 fn from(sequence: u32) -> Self {
49 Self {
50 sequence: sequence as u64,
51 }
52 }
53}
54
55impl From<u16> for Group {
56 fn from(sequence: u16) -> Self {
57 Self {
58 sequence: sequence as u64,
59 }
60 }
61}
62
63#[derive(Default)]
64struct GroupState {
65 frames: Vec<FrameConsumer>,
67
68 closed: Option<Result<()>>,
70}
71
72#[derive(Clone)]
74pub struct GroupProducer {
75 state: watch::Sender<GroupState>,
77
78 pub info: Group,
80}
81
82impl GroupProducer {
83 fn new(info: Group) -> Self {
84 Self {
85 info,
86 state: Default::default(),
87 }
88 }
89
90 pub fn write_frame<B: Into<Bytes>>(&mut self, frame: B) {
95 let data = frame.into();
96 let frame = Frame {
97 size: data.len() as u64,
98 };
99 let mut frame = self.create_frame(frame);
100 frame.write_chunk(data);
101 frame.close();
102 }
103
104 pub fn create_frame(&mut self, info: Frame) -> FrameProducer {
106 let frame = Frame::produce(info);
107 self.append_frame(frame.consumer);
108 frame.producer
109 }
110
111 pub fn append_frame(&mut self, consumer: FrameConsumer) {
113 self.state.send_modify(|state| {
114 assert!(state.closed.is_none());
115 state.frames.push(consumer)
116 });
117 }
118
119 pub fn close(self) {
121 self.state.send_modify(|state| state.closed = Some(Ok(())));
122 }
123
124 pub fn abort(self, err: Error) {
125 self.state.send_modify(|state| state.closed = Some(Err(err)));
126 }
127
128 pub fn consume(&self) -> GroupConsumer {
130 GroupConsumer {
131 info: self.info.clone(),
132 state: self.state.subscribe(),
133 index: 0,
134 active: None,
135 }
136 }
137
138 pub fn unused(&self) -> impl Future<Output = ()> {
139 let state = self.state.clone();
140 async move {
141 state.closed().await;
142 }
143 }
144}
145
146impl From<Group> for GroupProducer {
147 fn from(info: Group) -> Self {
148 GroupProducer::new(info)
149 }
150}
151
152#[derive(Clone)]
154pub struct GroupConsumer {
155 state: watch::Receiver<GroupState>,
157
158 pub info: Group,
160
161 index: usize,
164
165 active: Option<FrameConsumer>,
167}
168
169impl GroupConsumer {
170 pub async fn read_frame(&mut self) -> Result<Option<Bytes>> {
172 if self.active.is_none() {
175 self.active = self.next_frame().await?;
176 };
177
178 let frame = match self.active.as_mut() {
180 Some(frame) => frame.read_all().await?,
181 None => return Ok(None),
182 };
183
184 self.active = None;
185
186 Ok(Some(frame))
187 }
188
189 pub async fn next_frame(&mut self) -> Result<Option<FrameConsumer>> {
191 if let Some(frame) = self.active.take() {
193 return Ok(Some(frame));
194 }
195
196 loop {
197 {
198 let state = self.state.borrow_and_update();
199
200 if let Some(frame) = state.frames.get(self.index).cloned() {
201 self.index += 1;
202 return Ok(Some(frame));
203 }
204
205 match &state.closed {
206 Some(Ok(_)) => return Ok(None),
207 Some(Err(err)) => return Err(err.clone()),
208 _ => {}
209 }
210 }
211
212 if self.state.changed().await.is_err() {
213 return Err(Error::Cancel);
214 }
215 }
216 }
217}