1use tokio::sync::watch;
16
17use crate::{Error, Result};
18
19use super::{Group, GroupConsumer, GroupProducer};
20
21use std::cmp::Ordering;
22
/// Static description of a track: its name and its priority.
///
/// A `Track` is turned into a live [`TrackProducer`] via [`Track::produce`],
/// and a copy of it is exposed as the `info` field of both the producer and
/// the consumer handles.
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct Track {
    /// Name of the track.
    pub name: String,
    /// Priority of the track.
    // NOTE(review): nothing in this file reads `priority`; whether a larger
    // value means "more important" must be confirmed against the callers.
    pub priority: i8,
}
29
30impl Track {
31 pub fn produce(self) -> TrackProducer {
32 TrackProducer::new(self)
33 }
34}
35
/// State shared between a [`TrackProducer`] and its [`TrackConsumer`]s through
/// a `watch` channel.
#[derive(Default)]
struct TrackState {
    /// The most recently accepted group, or `None` until a group is inserted.
    latest: Option<GroupConsumer>,
    /// `None` while the track is open; `Some(Ok(()))` after `finish`,
    /// `Some(Err(_))` after `abort`.
    closed: Option<Result<()>>,
}
41
/// Writing half of a track: publishes groups and the final close status.
///
/// Cloning yields another handle onto the same underlying `watch` channel, so
/// every clone observes and mutates the same track state.
#[derive(Clone)]
pub struct TrackProducer {
    /// Description of the track this producer writes to.
    pub info: Track,
    /// Sending side of the channel that carries the shared track state.
    state: watch::Sender<TrackState>,
}
48
49impl TrackProducer {
50 pub fn new(info: Track) -> Self {
51 Self {
52 info,
53 state: Default::default(),
54 }
55 }
56
57 pub fn insert_group(&mut self, group: GroupConsumer) -> bool {
59 self.state.send_if_modified(|state| {
60 assert!(state.closed.is_none());
61
62 if let Some(latest) = &state.latest {
63 match group.info.cmp(&latest.info) {
64 Ordering::Less => return false,
65 Ordering::Equal => return false,
66 Ordering::Greater => (),
67 }
68 }
69
70 state.latest = Some(group.clone());
71 true
72 })
73 }
74
75 pub fn create_group(&mut self, info: Group) -> Option<GroupProducer> {
79 let group = GroupProducer::new(info);
80 self.insert_group(group.consume()).then_some(group)
81 }
82
83 pub fn append_group(&mut self) -> GroupProducer {
85 let sequence = self
87 .state
88 .borrow()
89 .latest
90 .as_ref()
91 .map_or(0, |group| group.info.sequence + 1);
92
93 let group = Group { sequence };
94 self.create_group(group).unwrap()
95 }
96
97 pub fn finish(&mut self) {
98 self.state.send_modify(|state| state.closed = Some(Ok(())));
99 }
100
101 pub fn abort(&mut self, err: Error) {
102 self.state.send_modify(|state| state.closed = Some(Err(err)));
103 }
104
105 pub fn consume(&self) -> TrackConsumer {
107 TrackConsumer {
108 info: self.info.clone(),
109 state: self.state.subscribe(),
110 prev: None,
111 }
112 }
113
114 pub async fn unused(&self) {
116 self.state.closed().await
117 }
118}
119
120impl From<Track> for TrackProducer {
121 fn from(info: Track) -> Self {
122 TrackProducer::new(info)
123 }
124}
125
/// Reading half of a track: yields groups as they are published and reports
/// how the track was closed.
///
/// A clone copies `prev`, so it continues from the same position as the
/// consumer it was cloned from.
#[derive(Clone)]
pub struct TrackConsumer {
    /// Description of the track this consumer reads from.
    pub info: Track,
    /// Receiving side of the channel that carries the shared track state.
    state: watch::Receiver<TrackState>,
    /// Sequence number of the last group returned by `next_group`, or `None`
    /// if no group has been returned yet.
    prev: Option<u64>,
}
133
134impl TrackConsumer {
135 pub async fn next_group(&mut self) -> Result<Option<GroupConsumer>> {
139 let state = match self
141 .state
142 .wait_for(|state| {
143 state.latest.as_ref().map(|group| group.info.sequence) > self.prev || state.closed.is_some()
144 })
145 .await
146 {
147 Ok(state) => state,
148 Err(_) => return Err(Error::Cancel),
149 };
150
151 match &state.closed {
152 Some(Ok(_)) => return Ok(None),
153 Some(Err(err)) => return Err(err.clone()),
154 _ => {}
155 }
156
157 let group = state.latest.clone().unwrap();
159 self.prev = Some(group.info.sequence);
160
161 Ok(Some(group))
162 }
163
164 pub async fn closed(&self) -> Result<()> {
166 match self.state.clone().wait_for(|state| state.closed.is_some()).await {
167 Ok(state) => match &state.closed {
168 Some(Ok(_)) => Ok(()),
169 Some(Err(err)) => Err(err.clone()),
170 _ => unreachable!(),
171 },
172 Err(_) => Err(Error::Cancel),
173 }
174 }
175}