1use tokio::sync::watch;
16
17use crate::{Error, Result};
18
19use super::{Group, GroupConsumer, GroupProducer};
20
21use std::{collections::VecDeque, future::Future};
22
/// Maximum age of a cached group.
///
/// `TrackState::trim` evicts groups from the front of the cache once they
/// were inserted longer ago than this.
const MAX_CACHE: std::time::Duration = {
    const SECONDS: u64 = 30;
    std::time::Duration::from_secs(SECONDS)
};
24
/// Describes a track: its name plus a priority value.
///
/// Serialization support is derived only when the `serde` feature is enabled.
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct Track {
    /// Name of the track.
    pub name: String,

    /// Priority value of the track; `Track::new` initialises it to 0.
    pub priority: u8,
}
32
33impl Track {
34 pub fn new<T: Into<String>>(name: T) -> Self {
35 Self {
36 name: name.into(),
37 priority: 0,
38 }
39 }
40
41 pub fn produce(self) -> TrackProducer {
42 TrackProducer::new(self)
43 }
44}
45
46#[derive(Default)]
47struct TrackState {
48 groups: VecDeque<(tokio::time::Instant, GroupConsumer)>,
49 closed: Option<Result<()>>,
50 offset: usize,
51
52 max_sequence: Option<u64>,
53
54 drop_sequence: Option<u64>,
56}
57
58impl TrackState {
59 fn trim(&mut self, now: tokio::time::Instant) {
60 while let Some((timestamp, _)) = self.groups.front() {
61 if now.saturating_duration_since(*timestamp) > MAX_CACHE {
62 let (_, group) = self.groups.pop_front().unwrap();
63 self.drop_sequence = Some(self.drop_sequence.unwrap_or(0).max(group.info.sequence));
64 self.offset += 1;
65 } else {
66 break;
67 }
68 }
69 }
70}
71
72#[derive(Clone)]
74pub struct TrackProducer {
75 pub info: Track,
76 state: watch::Sender<TrackState>,
77}
78
79impl TrackProducer {
80 pub fn new(info: Track) -> Self {
81 Self {
82 info,
83 state: Default::default(),
84 }
85 }
86
87 pub fn insert_group(&mut self, group: GroupConsumer) -> bool {
89 self.state.send_if_modified(|state| {
90 assert!(state.closed.is_none());
91 let now = tokio::time::Instant::now();
92 state.trim(now);
93 state.groups.push_back((now, group.clone()));
94 state.max_sequence = Some(state.max_sequence.unwrap_or(0).max(group.info.sequence));
95 true
96 })
97 }
98
99 pub fn create_group(&mut self, info: Group) -> Option<GroupProducer> {
103 let group = info.produce();
104 self.insert_group(group.consume()).then_some(group)
105 }
106
107 pub fn append_group(&mut self) -> GroupProducer {
109 let mut producer = None;
110
111 self.state.send_if_modified(|state| {
112 assert!(state.closed.is_none());
113
114 let now = tokio::time::Instant::now();
115 state.trim(now);
116
117 let sequence = state.max_sequence.map_or(0, |sequence| sequence + 1);
118 let group = Group { sequence }.produce();
119 state.groups.push_back((now, group.consume()));
120 state.max_sequence = Some(sequence);
121
122 producer = Some(group);
123 true
124 });
125
126 producer.unwrap()
127 }
128
129 pub fn write_frame<B: Into<bytes::Bytes>>(&mut self, frame: B) {
131 let mut group = self.append_group();
132 group.write_frame(frame.into());
133 group.close();
134 }
135
136 pub fn close(self) {
137 self.state.send_modify(|state| state.closed = Some(Ok(())));
138 }
139
140 pub fn abort(self, err: Error) {
141 self.state.send_modify(|state| state.closed = Some(Err(err)));
142 }
143
144 pub fn consume(&self) -> TrackConsumer {
146 let state = self.state.borrow();
147 TrackConsumer {
148 info: self.info.clone(),
149 state: self.state.subscribe(),
150 index: state.offset + state.groups.len().saturating_sub(1),
152 }
153 }
154
155 pub fn unused(&self) -> impl Future<Output = ()> + use<> {
157 let state = self.state.clone();
158 async move {
159 state.closed().await;
160 }
161 }
162
163 pub fn is_closed(&self) -> bool {
165 self.state.borrow().closed.is_some()
166 }
167
168 pub fn is_clone(&self, other: &Self) -> bool {
170 self.state.same_channel(&other.state)
171 }
172}
173
174impl From<Track> for TrackProducer {
175 fn from(info: Track) -> Self {
176 TrackProducer::new(info)
177 }
178}
179
180#[derive(Clone)]
182pub struct TrackConsumer {
183 pub info: Track,
184 state: watch::Receiver<TrackState>,
185 index: usize,
186}
187
188impl TrackConsumer {
189 pub async fn next_group(&mut self) -> Result<Option<GroupConsumer>> {
193 let Ok(state) = self
195 .state
196 .wait_for(|state| {
197 let index = self.index.saturating_sub(state.offset);
198 state.groups.get(index).is_some() || state.closed.is_some()
199 })
200 .await
201 else {
202 return Err(Error::Cancel);
203 };
204
205 let index = self.index.saturating_sub(state.offset);
206 if let Some(group) = state.groups.get(index) {
207 self.index = state.offset + index + 1;
208 return Ok(Some(group.1.clone()));
209 }
210
211 match &state.closed {
212 Some(Ok(_)) => Ok(None),
213 Some(Err(err)) => Err(err.clone()),
214 _ => unreachable!(),
215 }
216 }
217
218 pub async fn get_group(&self, sequence: u64) -> Result<Option<GroupConsumer>> {
222 let mut state = self.state.clone();
223
224 let Ok(state) = state
225 .wait_for(|state| {
226 if state.closed.is_some() {
227 return true;
228 }
229
230 if let Some(drop_sequence) = state.drop_sequence
231 && drop_sequence >= sequence
232 {
233 return true;
234 }
235
236 state.groups.iter().any(|(_, group)| group.info.sequence == sequence)
237 })
238 .await
239 else {
240 return Err(Error::Cancel);
241 };
242
243 if let Some((_, group)) = state.groups.iter().find(|(_, group)| group.info.sequence == sequence) {
244 return Ok(Some(group.clone()));
245 }
246
247 match &state.closed {
248 Some(Ok(_)) => Ok(None), Some(Err(err)) => Err(err.clone()),
250 None => Ok(None), }
252 }
253
254 pub async fn closed(&self) -> Result<()> {
256 match self.state.clone().wait_for(|state| state.closed.is_some()).await {
257 Ok(state) => state.closed.clone().unwrap(),
258 Err(_) => Err(Error::Cancel),
259 }
260 }
261
262 pub fn is_clone(&self, other: &Self) -> bool {
263 self.state.same_channel(&other.state)
264 }
265}
266
267#[cfg(test)]
268use futures::FutureExt;
269
/// Test-only helpers that poll the consumer's futures exactly once and
/// assert on the immediate outcome.
#[cfg(test)]
impl TrackConsumer {
    /// Returns the next group, panicking if it is not immediately available,
    /// if fetching it failed, or if the track has ended.
    pub fn assert_group(&mut self) -> GroupConsumer {
        let polled = self.next_group().now_or_never();
        let outcome = polled.expect("group would have blocked");
        let group = outcome.expect("would have errored");
        group.expect("track was closed")
    }

    /// Asserts that `next_group` would block right now.
    pub fn assert_no_group(&mut self) {
        let polled = self.next_group().now_or_never();
        assert!(polled.is_none(), "next group would not have blocked");
    }

    /// Asserts that `closed` would block right now.
    pub fn assert_not_closed(&self) {
        let polled = self.closed().now_or_never();
        assert!(polled.is_none(), "should not be closed");
    }

    /// Asserts that `closed` resolves immediately, whatever its result.
    pub fn assert_closed(&self) {
        let polled = self.closed().now_or_never();
        assert!(polled.is_some(), "should be closed");
    }

    /// Asserts that `closed` resolves immediately with an error.
    pub fn assert_error(&self) {
        let outcome = self.closed().now_or_never().expect("should not block");
        assert!(outcome.is_err(), "should be error");
    }

    /// Asserts that `other` watches the same channel as `self`.
    pub fn assert_is_clone(&self, other: &Self) {
        let shared = self.is_clone(other);
        assert!(shared, "should be clone");
    }

    /// Asserts that `other` watches a different channel than `self`.
    pub fn assert_not_clone(&self, other: &Self) {
        let shared = self.is_clone(other);
        assert!(!shared, "should not be clone");
    }
}