1use tokio::sync::watch;
16
17use crate::{Error, Produce, Result};
18
19use super::{Group, GroupConsumer, GroupProducer};
20
21use std::{cmp::Ordering, future::Future};
22
/// Static description of a track: its name plus a priority value.
///
/// A `Track` carries no runtime state; call [`Track::produce`] to obtain the
/// producer/consumer handles that actually carry groups.
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct Track {
    /// Name identifying the track.
    pub name: String,
    /// Priority of the track; [`Track::new`] initializes it to `0`.
    // NOTE(review): how the priority value is interpreted (e.g. whether larger
    // means more urgent) is not visible in this file — confirm with the code
    // that reads it.
    pub priority: u8,
}
29
30impl Track {
31 pub fn new<T: Into<String>>(name: T) -> Self {
32 Self {
33 name: name.into(),
34 priority: 0,
35 }
36 }
37
38 pub fn produce(self) -> Produce<TrackProducer, TrackConsumer> {
39 let producer = TrackProducer::new(self);
40 let consumer = producer.consume();
41 Produce { producer, consumer }
42 }
43}
44
/// Shared state published through the track's watch channel.
///
/// The default value is an open track with no group yet.
#[derive(Default)]
struct TrackState {
    /// The most recently accepted group, or `None` before the first one.
    latest: Option<GroupConsumer>,
    /// `None` while the track is open; `Some(Ok(()))` after a clean close and
    /// `Some(Err(_))` after an abort.
    closed: Option<Result<()>>,
}
50
/// Writing half of a track: publishes groups and the final close/abort status.
///
/// Cloning produces another handle to the same watch channel, so all clones
/// publish into the same track (see `is_clone`).
#[derive(Clone)]
pub struct TrackProducer {
    /// Description of the track this producer publishes.
    pub info: Track,
    /// Sending side of the watch channel holding the shared track state.
    state: watch::Sender<TrackState>,
}
57
58impl TrackProducer {
59 fn new(info: Track) -> Self {
60 Self {
61 info,
62 state: Default::default(),
63 }
64 }
65
66 pub fn insert_group(&mut self, group: GroupConsumer) -> bool {
68 self.state.send_if_modified(|state| {
69 assert!(state.closed.is_none());
70
71 if let Some(latest) = &state.latest {
72 match group.info.cmp(&latest.info) {
73 Ordering::Less => return false,
74 Ordering::Equal => return false,
75 Ordering::Greater => (),
76 }
77 }
78
79 state.latest = Some(group.clone());
80 true
81 })
82 }
83
84 pub fn create_group(&mut self, info: Group) -> Option<GroupProducer> {
88 let group = Group::produce(info);
89 self.insert_group(group.consumer).then_some(group.producer)
90 }
91
92 pub fn append_group(&mut self) -> GroupProducer {
94 let sequence = self
96 .state
97 .borrow()
98 .latest
99 .as_ref()
100 .map_or(0, |group| group.info.sequence + 1);
101
102 let group = Group { sequence };
103 self.create_group(group).unwrap()
104 }
105
106 pub fn close(self) {
107 self.state.send_modify(|state| state.closed = Some(Ok(())));
108 }
109
110 pub fn abort(self, err: Error) {
111 self.state.send_modify(|state| state.closed = Some(Err(err)));
112 }
113
114 pub fn consume(&self) -> TrackConsumer {
116 TrackConsumer {
117 info: self.info.clone(),
118 state: self.state.subscribe(),
119 prev: None,
120 }
121 }
122
123 pub fn unused(&self) -> impl Future<Output = ()> {
125 let state = self.state.clone();
126 async move {
127 state.closed().await;
128 }
129 }
130
131 pub fn is_clone(&self, other: &Self) -> bool {
133 self.state.same_channel(&other.state)
134 }
135}
136
137impl From<Track> for TrackProducer {
138 fn from(info: Track) -> Self {
139 TrackProducer::new(info)
140 }
141}
142
/// Reading half of a track: observes the latest group and the close status.
///
/// Cloning copies the read position (`prev`), so a clone continues from the
/// same point as the consumer it was cloned from.
#[derive(Clone)]
pub struct TrackConsumer {
    /// Description of the track being consumed.
    pub info: Track,
    /// Receiving side of the watch channel holding the shared track state.
    state: watch::Receiver<TrackState>,
    /// Sequence number of the group last returned by `next_group`, or `None`
    /// if no group has been returned yet.
    prev: Option<u64>,
}
150
151impl TrackConsumer {
152 pub async fn next_group(&mut self) -> Result<Option<GroupConsumer>> {
156 let state = match self
158 .state
159 .wait_for(|state| {
160 state.latest.as_ref().map(|group| group.info.sequence) > self.prev || state.closed.is_some()
161 })
162 .await
163 {
164 Ok(state) => state,
165 Err(_) => return Err(Error::Cancel),
166 };
167
168 match &state.closed {
169 Some(Ok(_)) => return Ok(None),
170 Some(Err(err)) => return Err(err.clone()),
171 _ => {}
172 }
173
174 let group = state.latest.clone().unwrap();
176 self.prev = Some(group.info.sequence);
177
178 Ok(Some(group))
179 }
180
181 pub async fn closed(&self) -> Result<()> {
183 match self.state.clone().wait_for(|state| state.closed.is_some()).await {
184 Ok(state) => state.closed.clone().unwrap(),
185 Err(_) => Err(Error::Cancel),
186 }
187 }
188
189 pub fn is_clone(&self, other: &Self) -> bool {
190 self.state.same_channel(&other.state)
191 }
192}
193
194#[cfg(test)]
195use futures::FutureExt;
196
/// Test-only helpers that poll the consumer once without blocking.
#[cfg(test)]
impl TrackConsumer {
    /// Returns the next group, panicking if it is not immediately available,
    /// if the track errored, or if the track was closed.
    pub fn assert_group(&mut self) -> GroupConsumer {
        let polled = self.next_group().now_or_never();
        let outcome = polled.expect("group would have blocked");
        let group = outcome.expect("would have errored");
        group.expect("track was closed")
    }

    /// Asserts that asking for the next group would currently block.
    pub fn assert_no_group(&mut self) {
        let polled = self.next_group().now_or_never();
        assert!(polled.is_none(), "next group would not have blocked");
    }

    /// Asserts that no close status (clean or error) is available yet.
    pub fn assert_not_closed(&self) {
        let polled = self.closed().now_or_never();
        assert!(polled.is_none(), "should not be closed");
    }

    /// Asserts that waiting for closure resolves immediately.
    pub fn assert_closed(&self) {
        let polled = self.closed().now_or_never();
        assert!(polled.is_some(), "should be closed");
    }

    /// Asserts that waiting for closure resolves immediately with an error.
    pub fn assert_error(&self) {
        let outcome = self.closed().now_or_never().expect("should not block");
        assert!(outcome.is_err(), "should be error");
    }

    /// Asserts that `other` shares this consumer's underlying channel.
    pub fn assert_is_clone(&self, other: &Self) {
        let same = self.is_clone(other);
        assert!(same, "should be clone");
    }

    /// Asserts that `other` does not share this consumer's underlying channel.
    pub fn assert_not_clone(&self, other: &Self) {
        let same = self.is_clone(other);
        assert!(!same, "should not be clone");
    }
}