1use tokio::sync::watch;
16
17use crate::{Error, Result};
18
19use super::{Group, GroupConsumer, GroupProducer};
20
21use std::{collections::VecDeque, future::Future};
22
/// How long a group may sit in a track's cache before it becomes eligible for
/// eviction (30 seconds), measured from the instant it was inserted.
const MAX_CACHE: std::time::Duration = std::time::Duration::from_secs(30);
24
25#[derive(Clone, Debug, PartialEq, Eq)]
26#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
27pub struct Track {
28 pub name: String,
29 pub priority: u8,
30}
31
32impl Track {
33 pub fn new<T: Into<String>>(name: T) -> Self {
34 Self {
35 name: name.into(),
36 priority: 0,
37 }
38 }
39
40 pub fn produce(self) -> TrackProducer {
41 TrackProducer::new(self)
42 }
43}
44
45#[derive(Default)]
46struct TrackState {
47 groups: VecDeque<(tokio::time::Instant, GroupConsumer)>,
48 closed: Option<Result<()>>,
49 offset: usize,
50
51 max_sequence: Option<u64>,
52
53 drop_sequence: Option<u64>,
55}
56
57impl TrackState {
58 fn trim(&mut self, now: tokio::time::Instant) {
59 while let Some((timestamp, _)) = self.groups.front() {
60 if now.saturating_duration_since(*timestamp) > MAX_CACHE {
61 let (_, group) = self.groups.pop_front().unwrap();
62 self.drop_sequence = Some(self.drop_sequence.unwrap_or(0).max(group.info.sequence));
63 self.offset += 1;
64 } else {
65 break;
66 }
67 }
68 }
69}
70
71#[derive(Clone)]
73pub struct TrackProducer {
74 pub info: Track,
75 state: watch::Sender<TrackState>,
76}
77
78impl TrackProducer {
79 pub fn new(info: Track) -> Self {
80 Self {
81 info,
82 state: Default::default(),
83 }
84 }
85
86 pub fn insert_group(&mut self, group: GroupConsumer) -> bool {
88 self.state.send_if_modified(|state| {
89 assert!(state.closed.is_none());
90 let now = tokio::time::Instant::now();
91 state.trim(now);
92 state.groups.push_back((now, group.clone()));
93 state.max_sequence = Some(state.max_sequence.unwrap_or(0).max(group.info.sequence));
94 true
95 })
96 }
97
98 pub fn create_group(&mut self, info: Group) -> Option<GroupProducer> {
102 let group = info.produce();
103 self.insert_group(group.consume()).then_some(group)
104 }
105
106 pub fn append_group(&mut self) -> GroupProducer {
108 let mut producer = None;
109
110 self.state.send_if_modified(|state| {
111 assert!(state.closed.is_none());
112
113 let now = tokio::time::Instant::now();
114 state.trim(now);
115
116 let sequence = state.max_sequence.map_or(0, |sequence| sequence + 1);
117 let group = Group { sequence }.produce();
118 state.groups.push_back((now, group.consume()));
119 state.max_sequence = Some(sequence);
120
121 producer = Some(group);
122 true
123 });
124
125 producer.unwrap()
126 }
127
128 pub fn write_frame<B: Into<bytes::Bytes>>(&mut self, frame: B) {
130 let mut group = self.append_group();
131 group.write_frame(frame.into());
132 group.close();
133 }
134
135 pub fn close(self) {
136 self.state.send_modify(|state| state.closed = Some(Ok(())));
137 }
138
139 pub fn abort(self, err: Error) {
140 self.state.send_modify(|state| state.closed = Some(Err(err)));
141 }
142
143 pub fn consume(&self) -> TrackConsumer {
145 let state = self.state.borrow();
146 TrackConsumer {
147 info: self.info.clone(),
148 state: self.state.subscribe(),
149 index: state.offset + state.groups.len().saturating_sub(1),
151 }
152 }
153
154 pub fn unused(&self) -> impl Future<Output = ()> + use<> {
156 let state = self.state.clone();
157 async move {
158 state.closed().await;
159 }
160 }
161
162 pub fn is_clone(&self, other: &Self) -> bool {
164 self.state.same_channel(&other.state)
165 }
166}
167
168impl From<Track> for TrackProducer {
169 fn from(info: Track) -> Self {
170 TrackProducer::new(info)
171 }
172}
173
174#[derive(Clone)]
176pub struct TrackConsumer {
177 pub info: Track,
178 state: watch::Receiver<TrackState>,
179 index: usize,
180}
181
182impl TrackConsumer {
183 pub async fn next_group(&mut self) -> Result<Option<GroupConsumer>> {
187 let Ok(state) = self
189 .state
190 .wait_for(|state| {
191 let index = self.index.saturating_sub(state.offset);
192 state.groups.get(index).is_some() || state.closed.is_some()
193 })
194 .await
195 else {
196 return Err(Error::Cancel);
197 };
198
199 let index = self.index.saturating_sub(state.offset);
200 if let Some(group) = state.groups.get(index) {
201 self.index = state.offset + index + 1;
202 return Ok(Some(group.1.clone()));
203 }
204
205 match &state.closed {
206 Some(Ok(_)) => Ok(None),
207 Some(Err(err)) => Err(err.clone()),
208 _ => unreachable!(),
209 }
210 }
211
212 pub async fn get_group(&self, sequence: u64) -> Result<Option<GroupConsumer>> {
216 let mut state = self.state.clone();
217
218 let Ok(state) = state
219 .wait_for(|state| {
220 if state.closed.is_some() {
221 return true;
222 }
223
224 if let Some(drop_sequence) = state.drop_sequence
225 && drop_sequence >= sequence
226 {
227 return true;
228 }
229
230 state.groups.iter().any(|(_, group)| group.info.sequence == sequence)
231 })
232 .await
233 else {
234 return Err(Error::Cancel);
235 };
236
237 if let Some((_, group)) = state.groups.iter().find(|(_, group)| group.info.sequence == sequence) {
238 return Ok(Some(group.clone()));
239 }
240
241 match &state.closed {
242 Some(Ok(_)) => Ok(None), Some(Err(err)) => Err(err.clone()),
244 None => Ok(None), }
246 }
247
248 pub async fn closed(&self) -> Result<()> {
250 match self.state.clone().wait_for(|state| state.closed.is_some()).await {
251 Ok(state) => state.closed.clone().unwrap(),
252 Err(_) => Err(Error::Cancel),
253 }
254 }
255
256 pub fn is_clone(&self, other: &Self) -> bool {
257 self.state.same_channel(&other.state)
258 }
259}
260
261#[cfg(test)]
262use futures::FutureExt;
263
/// Test-only assertion helpers that poll the consumer exactly once, without
/// blocking.
#[cfg(test)]
impl TrackConsumer {
    /// Returns the next group, panicking if fetching it would block, fail, or
    /// report that the track is closed.
    pub fn assert_group(&mut self) -> GroupConsumer {
        let polled = self.next_group().now_or_never();
        let result = polled.expect("group would have blocked");
        let group = result.expect("would have errored");
        group.expect("track was closed")
    }

    /// Panics unless fetching the next group would block.
    pub fn assert_no_group(&mut self) {
        let polled = self.next_group().now_or_never();
        assert!(polled.is_none(), "next group would not have blocked");
    }

    /// Panics if the track has already been closed or aborted.
    pub fn assert_not_closed(&self) {
        let polled = self.closed().now_or_never();
        assert!(polled.is_none(), "should not be closed");
    }

    /// Panics unless waiting for closure completes immediately.
    pub fn assert_closed(&self) {
        let polled = self.closed().now_or_never();
        assert!(polled.is_some(), "should be closed");
    }

    /// Panics unless waiting for closure completes immediately with an error.
    pub fn assert_error(&self) {
        let outcome = self.closed().now_or_never().expect("should not block");
        assert!(outcome.is_err(), "should be error");
    }

    /// Panics unless `other` observes the same underlying state as `self`.
    pub fn assert_is_clone(&self, other: &Self) {
        let shared = self.is_clone(other);
        assert!(shared, "should be clone");
    }

    /// Panics if `other` observes the same underlying state as `self`.
    pub fn assert_not_clone(&self, other: &Self) {
        let shared = self.is_clone(other);
        assert!(!shared, "should not be clone");
    }
}