1use bytes::Bytes;
11use tokio::sync::watch;
12
13use crate::{Error, Result};
14
15use super::{Frame, FrameConsumer, FrameProducer};
16
/// Static description of a group: just its sequence number.
///
/// Equality, ordering and hashing are all derived from `sequence`, the only field.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct Group {
    /// Sequence number identifying this group.
    pub sequence: u64,
}
22
23impl Group {
24 pub fn produce(self) -> GroupProducer {
25 GroupProducer::new(self)
26 }
27}
28
29impl From<usize> for Group {
30 fn from(sequence: usize) -> Self {
31 Self {
32 sequence: sequence as u64,
33 }
34 }
35}
36
37impl From<u64> for Group {
38 fn from(sequence: u64) -> Self {
39 Self { sequence }
40 }
41}
42
43impl From<u32> for Group {
44 fn from(sequence: u32) -> Self {
45 Self {
46 sequence: sequence as u64,
47 }
48 }
49}
50
51impl From<u16> for Group {
52 fn from(sequence: u16) -> Self {
53 Self {
54 sequence: sequence as u64,
55 }
56 }
57}
58
/// State shared between a `GroupProducer` and its `GroupConsumer`s through a
/// `watch` channel.
#[derive(Default)]
struct GroupState {
    /// Consumer handles for every frame appended so far, in append order.
    frames: Vec<FrameConsumer>,

    /// `None` while the group is still open; once closed, the result it was
    /// closed with (`Ok(())` from `finish`, `Err(_)` from `abort`).
    closed: Option<Result<()>>,
}
67
68#[derive(Clone)]
70pub struct GroupProducer {
71 state: watch::Sender<GroupState>,
73
74 pub info: Group,
76}
77
78impl GroupProducer {
79 pub fn new(info: Group) -> Self {
80 Self {
81 info,
82 state: Default::default(),
83 }
84 }
85
86 pub fn write_frame<B: Into<Bytes>>(&mut self, frame: B) {
91 let data = frame.into();
92 let frame = Frame {
93 size: data.len() as u64,
94 };
95 let mut frame = self.create_frame(frame);
96 frame.write(data);
97 frame.finish();
98 }
99
100 pub fn create_frame(&mut self, info: Frame) -> FrameProducer {
102 let producer = FrameProducer::new(info);
103 self.append_frame(producer.consume());
104 producer
105 }
106
107 pub fn append_frame(&mut self, consumer: FrameConsumer) {
109 self.state.send_modify(|state| {
110 assert!(state.closed.is_none());
111 state.frames.push(consumer)
112 });
113 }
114
115 pub fn finish(&mut self) {
117 self.state.send_modify(|state| state.closed = Some(Ok(())));
118 }
119
120 pub fn abort(&mut self, err: Error) {
121 self.state.send_modify(|state| state.closed = Some(Err(err)));
122 }
123
124 pub fn consume(&self) -> GroupConsumer {
126 GroupConsumer {
127 info: self.info.clone(),
128 state: self.state.subscribe(),
129 index: 0,
130 active: None,
131 }
132 }
133
134 pub async fn unused(&self) {
135 self.state.closed().await;
136 }
137}
138
139impl From<Group> for GroupProducer {
140 fn from(info: Group) -> Self {
141 GroupProducer::new(info)
142 }
143}
144
145#[derive(Clone)]
147pub struct GroupConsumer {
148 state: watch::Receiver<GroupState>,
150
151 pub info: Group,
153
154 index: usize,
157
158 active: Option<FrameConsumer>,
160}
161
162impl GroupConsumer {
163 pub async fn read_frame(&mut self) -> Result<Option<Bytes>> {
165 if self.active.is_none() {
168 self.active = self.next_frame().await?;
169 };
170
171 let frame = match self.active.as_mut() {
173 Some(frame) => frame.read_all().await?,
174 None => return Ok(None),
175 };
176
177 self.active = None;
178
179 Ok(Some(frame))
180 }
181
182 pub async fn next_frame(&mut self) -> Result<Option<FrameConsumer>> {
184 if let Some(frame) = self.active.take() {
186 return Ok(Some(frame));
187 }
188
189 loop {
190 {
191 let state = self.state.borrow_and_update();
192
193 if let Some(frame) = state.frames.get(self.index).cloned() {
194 self.index += 1;
195 return Ok(Some(frame));
196 }
197
198 match &state.closed {
199 Some(Ok(_)) => return Ok(None),
200 Some(Err(err)) => return Err(err.clone()),
201 _ => {}
202 }
203 }
204
205 if self.state.changed().await.is_err() {
206 return Err(Error::Cancel);
207 }
208 }
209 }
210}