1use tokio::sync::watch;
16
17use crate::{Error, Result};
18
19use super::{Group, GroupConsumer, GroupProducer};
20
21use std::{cmp::Ordering, future::Future};
22
/// Static description of a track: its name plus a priority value.
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct Track {
    /// Name of the track.
    pub name: String,
    /// Priority value; `Track::new` initializes it to 0.
    /// NOTE(review): whether a higher or lower value wins is not visible in this file — confirm.
    pub priority: u8,
}
29
30impl Track {
31 pub fn new<T: Into<String>>(name: T) -> Self {
32 Self {
33 name: name.into(),
34 priority: 0,
35 }
36 }
37
38 pub fn produce(self) -> TrackProducer {
39 TrackProducer::new(self)
40 }
41}
42
/// State shared between a track's producer(s) and consumer(s) through a watch channel.
#[derive(Default)]
struct TrackState {
    /// Most recently accepted group, if any has been inserted yet.
    latest: Option<GroupConsumer>,
    /// `None` while the track is open; set to `Some(Ok(()))` by `finish` and
    /// to `Some(Err(_))` by `abort`.
    closed: Option<Result<()>>,
}
48
/// Writing half of a track. Cloning yields another handle to the same watch channel.
#[derive(Clone)]
pub struct TrackProducer {
    /// Description of the track being produced.
    pub info: Track,
    /// Sender side of the watch channel that carries the shared track state.
    state: watch::Sender<TrackState>,
}
55
56impl TrackProducer {
57 pub fn new(info: Track) -> Self {
58 Self {
59 info,
60 state: Default::default(),
61 }
62 }
63
64 pub fn insert_group(&mut self, group: GroupConsumer) -> bool {
66 self.state.send_if_modified(|state| {
67 assert!(state.closed.is_none());
68
69 if let Some(latest) = &state.latest {
70 match group.info.cmp(&latest.info) {
71 Ordering::Less => return false,
72 Ordering::Equal => return false,
73 Ordering::Greater => (),
74 }
75 }
76
77 state.latest = Some(group.clone());
78 true
79 })
80 }
81
82 pub fn create_group(&mut self, info: Group) -> Option<GroupProducer> {
86 let group = GroupProducer::new(info);
87 self.insert_group(group.consume()).then_some(group)
88 }
89
90 pub fn append_group(&mut self) -> GroupProducer {
92 let sequence = self
94 .state
95 .borrow()
96 .latest
97 .as_ref()
98 .map_or(0, |group| group.info.sequence + 1);
99
100 let group = Group { sequence };
101 self.create_group(group).unwrap()
102 }
103
104 pub fn finish(self) {
105 self.state.send_modify(|state| state.closed = Some(Ok(())));
106 }
107
108 pub fn abort(self, err: Error) {
109 self.state.send_modify(|state| state.closed = Some(Err(err)));
110 }
111
112 pub fn consume(&self) -> TrackConsumer {
114 TrackConsumer {
115 info: self.info.clone(),
116 state: self.state.subscribe(),
117 prev: None,
118 }
119 }
120
121 pub fn unused(&self) -> impl Future<Output = ()> {
123 let state = self.state.clone();
124 async move {
125 state.closed().await;
126 }
127 }
128
129 pub fn is_clone(&self, other: &Self) -> bool {
131 self.state.same_channel(&other.state)
132 }
133}
134
135impl From<Track> for TrackProducer {
136 fn from(info: Track) -> Self {
137 TrackProducer::new(info)
138 }
139}
140
/// Reading half of a track, created by `TrackProducer::consume`.
#[derive(Clone)]
pub struct TrackConsumer {
    /// Description of the track being consumed.
    pub info: Track,
    /// Receiver side of the watch channel that carries the shared track state.
    state: watch::Receiver<TrackState>,
    /// Sequence of the last group returned by `next_group`; `None` before the first one.
    prev: Option<u64>,
}
148
149impl TrackConsumer {
150 pub async fn next_group(&mut self) -> Result<Option<GroupConsumer>> {
154 let state = match self
156 .state
157 .wait_for(|state| {
158 state.latest.as_ref().map(|group| group.info.sequence) > self.prev || state.closed.is_some()
159 })
160 .await
161 {
162 Ok(state) => state,
163 Err(_) => return Err(Error::Cancel),
164 };
165
166 match &state.closed {
167 Some(Ok(_)) => return Ok(None),
168 Some(Err(err)) => return Err(err.clone()),
169 _ => {}
170 }
171
172 let group = state.latest.clone().unwrap();
174 self.prev = Some(group.info.sequence);
175
176 Ok(Some(group))
177 }
178
179 pub async fn closed(&self) -> Result<()> {
181 match self.state.clone().wait_for(|state| state.closed.is_some()).await {
182 Ok(state) => state.closed.clone().unwrap(),
183 Err(_) => Err(Error::Cancel),
184 }
185 }
186
187 pub fn is_clone(&self, other: &Self) -> bool {
188 self.state.same_channel(&other.state)
189 }
190}
191
192#[cfg(test)]
193use futures::FutureExt;
194
/// Test-only helpers that poll the consumer exactly once without awaiting.
#[cfg(test)]
impl TrackConsumer {
    /// Returns the next group, panicking if polling would block, errors,
    /// or reports the track as closed.
    pub fn assert_group(&mut self) -> GroupConsumer {
        let polled = self.next_group().now_or_never();
        let outcome = polled.expect("group would have blocked");
        let maybe_group = outcome.expect("would have errored");
        maybe_group.expect("track was closed")
    }

    /// Panics unless `next_group` would block right now.
    pub fn assert_no_group(&mut self) {
        let polled = self.next_group().now_or_never();
        assert!(polled.is_none(), "next group would not have blocked");
    }

    /// Panics if `closed` resolves immediately.
    pub fn assert_not_closed(&self) {
        let polled = self.closed().now_or_never();
        assert!(polled.is_none(), "should not be closed");
    }

    /// Panics unless `closed` resolves immediately.
    pub fn assert_closed(&self) {
        let polled = self.closed().now_or_never();
        assert!(polled.is_some(), "should be closed");
    }

    /// Panics unless `closed` resolves immediately with an error.
    pub fn assert_error(&self) {
        let outcome = self.closed().now_or_never().expect("should not block");
        assert!(outcome.is_err(), "should be error");
    }

    /// Panics unless `other` shares this consumer's channel.
    pub fn assert_is_clone(&self, other: &Self) {
        let shared = self.is_clone(other);
        assert!(shared, "should be clone");
    }

    /// Panics if `other` shares this consumer's channel.
    pub fn assert_not_clone(&self, other: &Self) {
        let shared = self.is_clone(other);
        assert!(!shared, "should not be clone");
    }
}