moq_transfork/model/
track.rs1use tokio::sync::watch;
16
17use super::{Group, GroupConsumer, GroupProducer};
18pub use crate::message::GroupOrder;
19use crate::Error;
20
21use std::{cmp::Ordering, ops, sync::Arc};
22
23#[derive(Clone, PartialEq, Eq, Debug)]
25pub struct Track {
26 pub path: String,
28
29 pub priority: i8,
31
32 pub order: GroupOrder,
34}
35
36impl Track {
37 pub fn new<S: ToString>(path: S) -> Self {
38 Self {
39 path: path.to_string(),
40 ..Default::default()
41 }
42 }
43
44 pub fn build() -> TrackBuilder {
45 TrackBuilder::new()
46 }
47
48 pub fn produce(self) -> (TrackProducer, TrackConsumer) {
49 let (send, recv) = watch::channel(TrackState::default());
50 let info = Arc::new(self);
51
52 let writer = TrackProducer::new(send, info.clone());
53 let reader = TrackConsumer::new(recv, info);
54
55 (writer, reader)
56 }
57}
58
59impl Default for Track {
60 fn default() -> Self {
61 Self {
62 path: Default::default(),
63 priority: 0,
64 order: GroupOrder::Desc,
65 }
66 }
67}
68
69pub struct TrackBuilder {
71 track: Track,
72}
73
74impl Default for TrackBuilder {
75 fn default() -> Self {
76 Self::new()
77 }
78}
79
80impl TrackBuilder {
81 pub fn new() -> Self {
82 Self {
83 track: Default::default(),
84 }
85 }
86
87 pub fn path<T: ToString>(mut self, path: T) -> Self {
88 self.track.path = path.to_string();
89 self
90 }
91
92 pub fn priority(mut self, priority: i8) -> Self {
93 self.track.priority = priority;
94 self
95 }
96
97 pub fn group_order(mut self, order: GroupOrder) -> Self {
98 self.track.order = order;
99 self
100 }
101
102 pub fn produce(self) -> (TrackProducer, TrackConsumer) {
103 self.track.produce()
104 }
105
106 pub fn into(self) -> Track {
108 self.track
109 }
110}
111
112impl From<TrackBuilder> for Track {
113 fn from(builder: TrackBuilder) -> Self {
114 builder.track
115 }
116}
117
118#[derive(Debug)]
119struct TrackState {
120 latest: Option<GroupConsumer>,
121 closed: Result<(), Error>,
122}
123
124impl Default for TrackState {
125 fn default() -> Self {
126 Self {
127 latest: None,
128 closed: Ok(()),
129 }
130 }
131}
132
133#[derive(Clone, Debug)]
135pub struct TrackProducer {
136 pub info: Arc<Track>,
137 state: watch::Sender<TrackState>,
138}
139
140impl TrackProducer {
141 fn new(state: watch::Sender<TrackState>, info: Arc<Track>) -> Self {
142 Self { info, state }
143 }
144
145 pub fn create_group(&mut self, sequence: u64) -> GroupProducer {
147 let group = Group::new(sequence);
148 let (writer, reader) = group.produce();
149
150 self.state.send_if_modified(|state| {
151 if let Some(latest) = &state.latest {
152 match reader.sequence.cmp(&latest.sequence) {
153 Ordering::Less => return false, Ordering::Equal => return false, Ordering::Greater => (),
156 }
157 }
158
159 state.latest = Some(reader);
160 true
161 });
162
163 writer
164 }
165
166 pub fn append_group(&mut self) -> GroupProducer {
168 let sequence = self
170 .state
171 .borrow()
172 .latest
173 .as_ref()
174 .map_or(0, |group| group.sequence + 1);
175
176 self.create_group(sequence)
177 }
178
179 pub fn close(self, err: Error) {
181 self.state.send_modify(|state| {
182 state.closed = Err(err);
183 });
184 }
185
186 pub fn subscribe(&self) -> TrackConsumer {
188 TrackConsumer::new(self.state.subscribe(), self.info.clone())
189 }
190
191 pub async fn unused(&self) {
193 self.state.closed().await
194 }
195}
196
197impl ops::Deref for TrackProducer {
198 type Target = Track;
199
200 fn deref(&self) -> &Self::Target {
201 &self.info
202 }
203}
204
205#[derive(Clone, Debug)]
207pub struct TrackConsumer {
208 pub info: Arc<Track>,
209 state: watch::Receiver<TrackState>,
210 prev: Option<u64>, }
212
213impl TrackConsumer {
214 fn new(state: watch::Receiver<TrackState>, info: Arc<Track>) -> Self {
215 Self {
216 state,
217 info,
218 prev: None,
219 }
220 }
221
222 pub fn get_group(&self, sequence: u64) -> Result<GroupConsumer, Error> {
223 let state = self.state.borrow();
224
225 if let Some(latest) = &state.latest {
227 if latest.sequence == sequence {
228 return Ok(latest.clone());
229 }
230 }
231
232 state.closed.clone()?;
233 Err(Error::NotFound)
234 }
235
236 pub async fn next_group(&mut self) -> Result<Option<GroupConsumer>, Error> {
239 let state = match self
241 .state
242 .wait_for(|state| state.latest.as_ref().map(|group| group.sequence) != self.prev || state.closed.is_err())
243 .await
244 {
245 Ok(state) => state,
246 Err(_) => return Ok(None),
247 };
248
249 if let Some(group) = state.latest.as_ref() {
251 if Some(group.sequence) != self.prev {
252 self.prev = Some(group.sequence);
253 return Ok(Some(group.clone()));
254 }
255 }
256
257 Err(state.closed.clone().unwrap_err())
259 }
260
261 pub fn latest_group(&self) -> u64 {
263 let state = self.state.borrow();
264 state.latest.as_ref().map(|group| group.sequence).unwrap_or_default()
265 }
266
267 pub async fn closed(&self) -> Result<(), Error> {
268 match self.state.clone().wait_for(|state| state.closed.is_err()).await {
269 Ok(state) => state.closed.clone(),
270 Err(_) => Ok(()),
271 }
272 }
273}
274
275impl ops::Deref for TrackConsumer {
276 type Target = Track;
277
278 fn deref(&self) -> &Self::Target {
279 &self.info
280 }
281}