moq_transfork/model/
group.rs1use bytes::Bytes;
11use std::ops;
12use tokio::sync::watch;
13
14use crate::Error;
15
16use super::{Frame, FrameConsumer, FrameProducer};
17
18#[derive(Clone, PartialEq, Debug)]
20pub struct Group {
21 pub sequence: u64,
24}
25
26impl Group {
27 pub fn new(sequence: u64) -> Group {
28 Self { sequence }
29 }
30
31 pub fn produce(self) -> (GroupProducer, GroupConsumer) {
32 let (send, recv) = watch::channel(GroupState::default());
33
34 let writer = GroupProducer::new(send, self.clone());
35 let reader = GroupConsumer::new(recv, self);
36
37 (writer, reader)
38 }
39}
40
41#[derive(Debug)]
42struct GroupState {
43 frames: Vec<FrameConsumer>,
45
46 closed: Result<(), Error>,
48}
49
50impl Default for GroupState {
51 fn default() -> Self {
52 Self {
53 frames: Vec::new(),
54 closed: Ok(()),
55 }
56 }
57}
58
59#[derive(Clone, Debug)]
61pub struct GroupProducer {
62 state: watch::Sender<GroupState>,
64
65 pub info: Group,
67}
68
69impl GroupProducer {
70 fn new(state: watch::Sender<GroupState>, info: Group) -> Self {
71 Self { state, info }
72 }
73
74 pub fn write_frame<B: Into<Bytes>>(&mut self, frame: B) {
76 let frame = frame.into();
77 self.create_frame(frame.len()).write(frame);
78 }
79
80 pub fn create_frame(&mut self, size: usize) -> FrameProducer {
82 let (writer, reader) = Frame::new(size).produce();
83 self.state.send_modify(|state| state.frames.push(reader));
84 writer
85 }
86
87 pub fn frame_count(&self) -> usize {
88 self.state.borrow().frames.len()
89 }
90
91 pub fn subscribe(&self) -> GroupConsumer {
93 GroupConsumer::new(self.state.subscribe(), self.info.clone())
94 }
95
96 pub fn close(self, err: Error) {
98 self.state.send_modify(|state| {
99 state.closed = Err(err);
100 });
101 }
102}
103
104impl ops::Deref for GroupProducer {
105 type Target = Group;
106
107 fn deref(&self) -> &Self::Target {
108 &self.info
109 }
110}
111
112#[derive(Clone, Debug)]
114pub struct GroupConsumer {
115 state: watch::Receiver<GroupState>,
117
118 pub info: Group,
120
121 index: usize,
124
125 active: Option<FrameConsumer>,
127}
128
129impl GroupConsumer {
130 fn new(state: watch::Receiver<GroupState>, group: Group) -> Self {
131 Self {
132 state,
133 info: group,
134 index: 0,
135 active: None,
136 }
137 }
138
139 pub async fn read_frame(&mut self) -> Result<Option<Bytes>, Error> {
141 if self.active.is_none() {
144 self.active = match self.next_frame().await? {
145 Some(frame) => Some(frame),
146 None => return Ok(None),
147 };
148 };
149
150 let frame = self.active.as_mut().unwrap().read_all().await?;
152 self.active = None;
153
154 Ok(Some(frame))
155 }
156
157 pub async fn next_frame(&mut self) -> Result<Option<FrameConsumer>, Error> {
159 if let Some(frame) = self.active.take() {
161 return Ok(Some(frame));
162 }
163
164 loop {
165 {
166 let state = self.state.borrow_and_update();
167
168 if let Some(frame) = state.frames.get(self.index).cloned() {
169 self.index += 1;
170 return Ok(Some(frame));
171 }
172
173 state.closed.clone()?;
174 }
175
176 if self.state.changed().await.is_err() {
177 return Ok(None);
178 }
179 }
180 }
181
182 pub async fn closed(&self) -> Result<(), Error> {
183 match self.state.clone().wait_for(|state| state.closed.is_err()).await {
184 Ok(state) => state.closed.clone(),
185 Err(_) => Ok(()),
186 }
187 }
188}
189
190impl ops::Deref for GroupConsumer {
191 type Target = Group;
192
193 fn deref(&self) -> &Self::Target {
194 &self.info
195 }
196}