1use tokio::sync::watch;
16
17use crate::{Error, Produce, Result};
18
19use super::{Group, GroupConsumer, GroupProducer};
20
21use std::{cmp::Ordering, future::Future};
22
/// Static description of a track: its name and its priority.
///
/// Serde (de)serialization is only derived when the `serde` feature is enabled.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct Track {
    /// Name identifying the track.
    pub name: String,

    /// Priority of the track.
    // NOTE(review): whether a larger value means "more important" is not
    // visible in this file — confirm against the code that reads it.
    pub priority: u8,
}
29
30impl Track {
31 pub fn new<T: Into<String>>(name: T) -> Self {
32 Self {
33 name: name.into(),
34 priority: 0,
35 }
36 }
37
38 pub fn produce(self) -> Produce<TrackProducer, TrackConsumer> {
39 let producer = TrackProducer::new(self);
40 let consumer = producer.consume();
41 Produce { producer, consumer }
42 }
43}
44
45#[derive(Default)]
46struct TrackState {
47 latest: Option<GroupConsumer>,
48 closed: Option<Result<()>>,
49}
50
51#[derive(Clone)]
53pub struct TrackProducer {
54 pub info: Track,
55 state: watch::Sender<TrackState>,
56}
57
58impl TrackProducer {
59 fn new(info: Track) -> Self {
60 Self {
61 info,
62 state: Default::default(),
63 }
64 }
65
66 pub fn insert_group(&mut self, group: GroupConsumer) -> bool {
68 self.state.send_if_modified(|state| {
69 assert!(state.closed.is_none());
70
71 if let Some(latest) = &state.latest {
72 match group.info.cmp(&latest.info) {
73 Ordering::Less => return false,
74 Ordering::Equal => return false,
75 Ordering::Greater => (),
76 }
77 }
78
79 state.latest = Some(group.clone());
80 true
81 })
82 }
83
84 pub fn create_group(&mut self, info: Group) -> Option<GroupProducer> {
88 let group = info.produce();
89 self.insert_group(group.consumer).then_some(group.producer)
90 }
91
92 pub fn append_group(&mut self) -> GroupProducer {
94 let mut producer = None;
95
96 self.state.send_if_modified(|state| {
97 assert!(state.closed.is_none());
98
99 let sequence = state.latest.as_ref().map_or(0, |group| group.info.sequence + 1);
100 let group = Group { sequence }.produce();
101 state.latest = Some(group.consumer);
102 producer = Some(group.producer);
103
104 true
105 });
106
107 producer.unwrap()
108 }
109
110 pub fn write_frame<B: Into<bytes::Bytes>>(&mut self, frame: B) {
112 let mut group = self.append_group();
113 group.write_frame(frame.into());
114 group.close();
115 }
116
117 pub fn close(self) {
118 self.state.send_modify(|state| state.closed = Some(Ok(())));
119 }
120
121 pub fn abort(self, err: Error) {
122 self.state.send_modify(|state| state.closed = Some(Err(err)));
123 }
124
125 pub fn consume(&self) -> TrackConsumer {
127 TrackConsumer {
128 info: self.info.clone(),
129 state: self.state.subscribe(),
130 prev: None,
131 }
132 }
133
134 pub fn unused(&self) -> impl Future<Output = ()> {
136 let state = self.state.clone();
137 async move {
138 state.closed().await;
139 }
140 }
141
142 pub fn is_clone(&self, other: &Self) -> bool {
144 self.state.same_channel(&other.state)
145 }
146}
147
148impl From<Track> for TrackProducer {
149 fn from(info: Track) -> Self {
150 TrackProducer::new(info)
151 }
152}
153
154#[derive(Clone)]
156pub struct TrackConsumer {
157 pub info: Track,
158 state: watch::Receiver<TrackState>,
159 prev: Option<u64>, }
161
162impl TrackConsumer {
163 pub async fn next_group(&mut self) -> Result<Option<GroupConsumer>> {
167 let state = match self
169 .state
170 .wait_for(|state| {
171 state.latest.as_ref().map(|group| group.info.sequence) > self.prev || state.closed.is_some()
172 })
173 .await
174 {
175 Ok(state) => state,
176 Err(_) => return Err(Error::Cancel),
177 };
178
179 match &state.closed {
180 Some(Ok(_)) => return Ok(None),
181 Some(Err(err)) => return Err(err.clone()),
182 _ => {}
183 }
184
185 let group = state.latest.clone().unwrap();
187 self.prev = Some(group.info.sequence);
188
189 Ok(Some(group))
190 }
191
192 pub async fn closed(&self) -> Result<()> {
194 match self.state.clone().wait_for(|state| state.closed.is_some()).await {
195 Ok(state) => state.closed.clone().unwrap(),
196 Err(_) => Err(Error::Cancel),
197 }
198 }
199
200 pub fn is_clone(&self, other: &Self) -> bool {
201 self.state.same_channel(&other.state)
202 }
203}
204
205#[cfg(test)]
206use futures::FutureExt;
207
/// Test-only helpers that poll the consumer once without blocking and assert
/// on the outcome.
#[cfg(test)]
impl TrackConsumer {
    /// Returns the next group, panicking if it is not immediately available,
    /// if the track errored, or if the track was closed.
    pub fn assert_group(&mut self) -> GroupConsumer {
        let polled = self.next_group().now_or_never();
        polled
            .expect("group would have blocked")
            .expect("would have errored")
            .expect("track was closed")
    }

    /// Asserts that fetching the next group would block.
    pub fn assert_no_group(&mut self) {
        let polled = self.next_group().now_or_never();
        assert!(polled.is_none(), "next group would not have blocked");
    }

    /// Asserts that waiting for the track to close would block.
    pub fn assert_not_closed(&self) {
        let polled = self.closed().now_or_never();
        assert!(polled.is_none(), "should not be closed");
    }

    /// Asserts that waiting for the track to close resolves immediately.
    pub fn assert_closed(&self) {
        let polled = self.closed().now_or_never();
        assert!(polled.is_some(), "should be closed");
    }

    /// Asserts that the track's close resolves immediately with an error.
    pub fn assert_error(&self) {
        let outcome = self.closed().now_or_never().expect("should not block");
        assert!(outcome.is_err(), "should be error");
    }

    /// Asserts that `other` shares this consumer's channel.
    pub fn assert_is_clone(&self, other: &Self) {
        let same = self.is_clone(other);
        assert!(same, "should be clone");
    }

    /// Asserts that `other` does not share this consumer's channel.
    pub fn assert_not_clone(&self, other: &Self) {
        let same = self.is_clone(other);
        assert!(!same, "should not be clone");
    }
}