moq_transfork/model/
announced.rs

1use moq_async::{Lock, LockWeak};
2use std::{
3	collections::{BTreeSet, VecDeque},
4	fmt,
5};
6use tokio::sync::mpsc;
7
8pub use crate::message::Filter;
9use crate::message::FilterMatch;
10
11/// The suffix of each announced track.
12#[derive(Clone, Debug, PartialEq, Eq)]
13pub enum Announced {
14	// Indicates the track, returning the captured wildcard.
15	Active(AnnouncedMatch),
16
17	// Indicates the track is no longer active, returning the captured wildcard.
18	Ended(AnnouncedMatch),
19
20	// Indicates we're caught up to the current state of the world.
21	Live,
22}
23
24#[cfg(test)]
25impl Announced {
26	pub fn assert_active(&self, expected: &str) {
27		match self {
28			Announced::Active(m) => assert_eq!(m.capture(), expected),
29			_ => panic!("expected active announce"),
30		}
31	}
32
33	pub fn assert_ended(&self, expected: &str) {
34		match self {
35			Announced::Ended(m) => assert_eq!(m.capture(), expected),
36			_ => panic!("expected ended announce"),
37		}
38	}
39
40	pub fn assert_live(&self) {
41		match self {
42			Announced::Live => (),
43			_ => panic!("expected live announce"),
44		}
45	}
46}
47
48// An owned version of FilterMatch
49#[derive(Clone, PartialEq, Eq)]
50pub struct AnnouncedMatch {
51	full: String,
52	capture: (usize, usize),
53}
54
55impl AnnouncedMatch {
56	pub fn full(&self) -> &str {
57		&self.full
58	}
59
60	pub fn capture(&self) -> &str {
61		&self.full[self.capture.0..self.capture.1]
62	}
63
64	pub fn to_full(self) -> String {
65		self.full
66	}
67
68	pub fn to_capture(mut self) -> String {
69		self.full.truncate(self.capture.1);
70		self.full.split_off(self.capture.0)
71	}
72}
73
74impl From<FilterMatch<'_>> for AnnouncedMatch {
75	fn from(value: FilterMatch) -> Self {
76		AnnouncedMatch {
77			full: value.full().to_string(),
78			capture: value.capture_index(),
79		}
80	}
81}
82
83impl fmt::Debug for AnnouncedMatch {
84	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
85		f.debug_struct("AnnouncedMatch")
86			.field("full", &self.full())
87			.field("capture", &self.capture())
88			.finish()
89	}
90}
91
92#[derive(Default)]
93struct ProducerState {
94	active: BTreeSet<String>,
95	consumers: Vec<(Lock<ConsumerState>, mpsc::Sender<()>)>,
96	live: bool,
97}
98
99impl ProducerState {
100	fn insert(&mut self, path: String) -> bool {
101		if !self.active.insert(path.clone()) {
102			return false;
103		}
104
105		let mut i = 0;
106
107		while let Some((consumer, notify)) = self.consumers.get(i) {
108			if !notify.is_closed() {
109				consumer.lock().insert(&path);
110				notify.try_send(()).ok();
111				i += 1;
112			} else {
113				self.consumers.swap_remove(i);
114			}
115		}
116
117		true
118	}
119
120	fn remove(&mut self, path: &str) -> bool {
121		if !self.active.remove(path) {
122			return false;
123		}
124
125		let mut i = 0;
126
127		while let Some((consumer, notify)) = self.consumers.get(i) {
128			if !notify.is_closed() {
129				consumer.lock().remove(path);
130				notify.try_send(()).ok();
131				i += 1;
132			} else {
133				self.consumers.swap_remove(i);
134			}
135		}
136
137		true
138	}
139
140	fn live(&mut self) -> bool {
141		if self.live {
142			return false;
143		}
144
145		self.live = true;
146
147		let mut i = 0;
148		while let Some((consumer, notify)) = self.consumers.get(i) {
149			if !notify.is_closed() {
150				consumer.lock().live();
151				notify.try_send(()).ok();
152				i += 1;
153			} else {
154				self.consumers.swap_remove(i);
155			}
156		}
157
158		true
159	}
160
161	fn consumer(&mut self, filter: Filter) -> ConsumerState {
162		let mut added = VecDeque::new();
163
164		for active in &self.active {
165			if let Some(m) = filter.matches(active) {
166				added.push_back(m.into());
167			}
168		}
169
170		ConsumerState {
171			added,
172			removed: VecDeque::new(),
173			filter,
174			live: self.live,
175		}
176	}
177
178	fn subscribe(&mut self, consumer: Lock<ConsumerState>) -> mpsc::Receiver<()> {
179		let (tx, rx) = mpsc::channel(1);
180		self.consumers.push((consumer.clone(), tx));
181		rx
182	}
183}
184
185impl Drop for ProducerState {
186	fn drop(&mut self) {
187		for (consumer, notify) in &self.consumers {
188			let mut consumer = consumer.lock();
189			for path in &self.active {
190				consumer.remove(path);
191			}
192
193			notify.try_send(()).ok();
194		}
195	}
196}
197
198#[derive(Clone)]
199struct ConsumerState {
200	filter: Filter,
201	added: VecDeque<AnnouncedMatch>,
202	removed: VecDeque<AnnouncedMatch>,
203	live: bool,
204}
205
206impl ConsumerState {
207	pub fn insert(&mut self, path: &str) {
208		let added: AnnouncedMatch = match self.filter.matches(path) {
209			Some(m) => m.into(),
210			None => return,
211		};
212
213		// Remove any matches that haven't been consumed yet.
214		// TODO make this faster while maintaining order
215		if let Some(index) = self
216			.removed
217			.iter()
218			.position(|removed| removed.capture() == added.capture())
219		{
220			self.removed.remove(index);
221		} else {
222			self.added.push_back(added);
223		}
224	}
225
226	pub fn remove(&mut self, path: &str) {
227		let removed: AnnouncedMatch = match self.filter.matches(path) {
228			Some(m) => m.into(),
229			None => return,
230		};
231
232		// Remove any matches that haven't been consumed yet.
233		// TODO make this faster while maintaining insertion order.
234		if let Some(index) = self.added.iter().position(|added| added.capture() == removed.capture()) {
235			self.added.remove(index);
236		} else {
237			self.removed.push_back(removed);
238		}
239	}
240
241	pub fn live(&mut self) {
242		self.live = true;
243	}
244
245	pub fn reset(&mut self) {
246		self.added.clear();
247		self.removed.clear();
248		self.live = false;
249	}
250}
251
252/// Announces tracks to consumers over the network.
253// TODO Cloning Producers is questionable. It might be better to chain them (consumer -> producer).
254#[derive(Default, Clone)]
255pub struct AnnouncedProducer {
256	state: Lock<ProducerState>,
257}
258
259impl AnnouncedProducer {
260	pub fn new() -> Self {
261		Self::default()
262	}
263
264	/// Announce a track, returning true if it's new.
265	pub fn announce<T: ToString>(&mut self, path: T) -> bool {
266		let path = path.to_string();
267		let mut state = self.state.lock();
268		state.insert(path)
269	}
270
271	/// Check if a track is active.
272	pub fn is_active(&self, path: &str) -> bool {
273		self.state.lock().active.contains(path)
274	}
275
276	/// Check if any tracks are active.
277	pub fn is_empty(&self) -> bool {
278		self.state.lock().active.is_empty()
279	}
280
281	/// Stop announcing a track, returning true if it was active.
282	pub fn unannounce(&mut self, path: &str) -> bool {
283		let mut state = self.state.lock();
284		state.remove(path)
285	}
286
287	/// Indicate that we're caught up to the current state of the world.
288	pub fn live(&mut self) -> bool {
289		let mut state = self.state.lock();
290		state.live()
291	}
292
293	/// Subscribe to all announced tracks matching the (wildcard) filter.
294	pub fn subscribe(&self, filter: Filter) -> AnnouncedConsumer {
295		let mut state = self.state.lock();
296		let consumer = Lock::new(state.consumer(filter));
297		let notify = state.subscribe(consumer.clone());
298		AnnouncedConsumer::new(self.state.downgrade(), consumer, notify)
299	}
300
301	/// Clear all announced tracks.
302	pub fn reset(&mut self) {
303		let mut state = self.state.lock();
304
305		let mut i = 0;
306		while let Some((consumer, notify)) = state.consumers.get(i) {
307			if !notify.is_closed() {
308				consumer.lock().reset();
309				i += 1;
310			} else {
311				state.consumers.swap_remove(i);
312			}
313		}
314	}
315
316	/// Wait until all consumers have been dropped.
317	///
318	/// NOTE: subscribe can be called to unclose the producer.
319	pub async fn closed(&self) {
320		// Keep looping until all consumers are closed.
321		while let Some(notify) = self.closed_inner() {
322			notify.closed().await;
323		}
324	}
325
326	// Returns the closed notify of any consumer.
327	fn closed_inner(&self) -> Option<mpsc::Sender<()>> {
328		let mut state = self.state.lock();
329
330		while let Some((_, notify)) = state.consumers.last() {
331			if !notify.is_closed() {
332				return Some(notify.clone());
333			}
334
335			state.consumers.pop();
336		}
337
338		None
339	}
340}
341
342/// Consumes announced tracks over the network matching an optional prefix.
343pub struct AnnouncedConsumer {
344	producer: LockWeak<ProducerState>,
345	state: Lock<ConsumerState>,
346	notify: mpsc::Receiver<()>,
347
348	// True if we've returned that the track is live.
349	live: bool,
350}
351
352impl AnnouncedConsumer {
353	fn new(producer: LockWeak<ProducerState>, state: Lock<ConsumerState>, notify: mpsc::Receiver<()>) -> Self {
354		Self {
355			producer,
356			state,
357			notify,
358			live: false,
359		}
360	}
361
362	/// Returns the next announced track.
363	pub async fn next(&mut self) -> Option<Announced> {
364		loop {
365			{
366				let mut state = self.state.lock();
367
368				if let Some(removed) = state.removed.pop_front() {
369					return Some(Announced::Ended(removed));
370				}
371
372				if let Some(added) = state.added.pop_front() {
373					return Some(Announced::Active(added));
374				}
375
376				if !self.live && state.live {
377					self.live = true;
378					return Some(Announced::Live);
379				}
380			}
381
382			self.notify.recv().await?;
383		}
384	}
385}
386
387// ugh
388// Cloning consumers is problematic because it encourages idle consumers.
389// It's also just a pain in the butt to implement.
390// TODO figure out a way to remove this.
391impl Clone for AnnouncedConsumer {
392	fn clone(&self) -> Self {
393		let consumer = Lock::new(self.state.lock().clone());
394
395		match self.producer.upgrade() {
396			Some(producer) => {
397				let mut producer = producer.lock();
398				let notify = producer.subscribe(consumer.clone());
399				AnnouncedConsumer::new(self.producer.clone(), consumer, notify)
400			}
401			None => {
402				let (_, notify) = mpsc::channel(1);
403				AnnouncedConsumer::new(self.producer.clone(), consumer, notify)
404			}
405		}
406	}
407}
408
409#[cfg(test)]
410use futures::FutureExt;
411
412#[cfg(test)]
413impl AnnouncedConsumer {
414	fn assert_active(&mut self, capture: &str) {
415		self.next()
416			.now_or_never()
417			.expect("would have blocked")
418			.expect("no next announcement")
419			.assert_active(capture);
420	}
421
422	fn assert_ended(&mut self, capture: &str) {
423		self.next()
424			.now_or_never()
425			.expect("would have blocked")
426			.expect("no next announcement")
427			.assert_ended(capture);
428	}
429
430	fn assert_wait(&mut self) {
431		assert_eq!(self.next().now_or_never(), None);
432	}
433
434	fn assert_done(&mut self) {
435		assert_eq!(self.next().now_or_never(), Some(None));
436	}
437
438	fn assert_live(&mut self) {
439		self.next()
440			.now_or_never()
441			.expect("would have blocked")
442			.expect("no next announcement")
443			.assert_live();
444	}
445}
446
447#[cfg(test)]
448mod test {
449	use super::*;
450
451	#[test]
452	fn simple() {
453		let mut producer = AnnouncedProducer::new();
454		let mut consumer = producer.subscribe(Filter::Any);
455
456		assert!(!producer.is_active("a/b"));
457		assert!(producer.announce("a/b"));
458		assert!(producer.is_active("a/b"));
459
460		consumer.assert_active("a/b");
461
462		assert!(producer.unannounce("a/b"));
463		assert!(!producer.is_active("a/b"));
464
465		consumer.assert_ended("a/b");
466		consumer.assert_wait();
467	}
468
469	#[test]
470	fn multi() {
471		let mut producer = AnnouncedProducer::new();
472		let mut consumer = producer.subscribe(Filter::Any);
473
474		assert!(producer.announce("a/b"));
475		assert!(producer.announce("a/c"));
476		assert!(producer.announce("d/e"));
477
478		// Make sure we get all of the paths in order.
479		consumer.assert_active("a/b");
480		consumer.assert_active("a/c");
481		consumer.assert_active("d/e");
482		consumer.assert_wait();
483	}
484
485	#[test]
486	fn late() {
487		let mut producer = AnnouncedProducer::new();
488
489		assert!(producer.announce("a/b"));
490		assert!(producer.announce("a/c"));
491
492		// Subscribe after announcing.
493		let mut consumer = producer.subscribe(Filter::Any);
494
495		assert!(producer.announce("d/e"));
496		assert!(producer.announce("d/d"));
497
498		// Make sure we get all of the paths in order.
499		consumer.assert_active("a/b");
500		consumer.assert_active("a/c");
501		consumer.assert_active("d/e");
502		consumer.assert_active("d/d");
503		consumer.assert_wait();
504	}
505
506	#[test]
507	fn prefix() {
508		let mut producer = AnnouncedProducer::new();
509		let mut consumer = producer.subscribe(Filter::Prefix("a/".into()));
510
511		assert!(producer.announce("a/b"));
512		assert!(producer.announce("a/c"));
513		assert!(producer.announce("d/e"));
514
515		consumer.assert_active("b");
516		consumer.assert_active("c");
517		consumer.assert_wait();
518	}
519
520	#[test]
521	fn prefix_unannounce() {
522		let mut producer = AnnouncedProducer::new();
523		let mut consumer = producer.subscribe(Filter::Prefix("a/".into()));
524
525		assert!(producer.announce("a/b"));
526		assert!(producer.announce("a/c"));
527		assert!(producer.announce("d/e"));
528
529		consumer.assert_active("b");
530		consumer.assert_active("c");
531		consumer.assert_wait();
532
533		assert!(producer.unannounce("d/e"));
534		assert!(producer.unannounce("a/c"));
535		assert!(producer.unannounce("a/b"));
536
537		consumer.assert_ended("c");
538		consumer.assert_ended("b");
539		consumer.assert_wait();
540	}
541
542	#[test]
543	fn flicker() {
544		let mut producer = AnnouncedProducer::new();
545		let mut consumer = producer.subscribe(Filter::Any);
546
547		assert!(!producer.is_active("a/b"));
548		assert!(producer.announce("a/b"));
549		assert!(producer.is_active("a/b"));
550		assert!(producer.unannounce("a/b"));
551		assert!(!producer.is_active("a/b"));
552
553		// We missed it.
554		consumer.assert_wait();
555	}
556
557	#[test]
558	fn dropped() {
559		let mut producer = AnnouncedProducer::new();
560		let mut consumer = producer.subscribe(Filter::Any);
561
562		producer.announce("a/b");
563		consumer.assert_active("a/b");
564		producer.announce("a/c");
565		consumer.assert_active("a/c");
566
567		// Don't consume "d/e" before dropping.
568		producer.announce("d/e");
569		drop(producer);
570
571		consumer.assert_ended("a/b");
572		consumer.assert_ended("a/c");
573		consumer.assert_done();
574	}
575
576	#[test]
577	fn live() {
578		let mut producer = AnnouncedProducer::new();
579		let mut consumer = producer.subscribe(Filter::Any);
580
581		producer.announce("a/b");
582		producer.live();
583		producer.announce("a/c");
584
585		consumer.assert_active("a/b");
586		consumer.assert_active("a/c");
587		// We actually get live after "a/c" because we were slow to consume.
588		consumer.assert_live();
589
590		producer.live(); // no-op
591		producer.announce("d/e");
592
593		consumer.assert_active("d/e");
594		consumer.assert_wait();
595	}
596
597	#[tokio::test]
598	async fn wakeup() {
599		tokio::time::pause();
600
601		let mut producer = AnnouncedProducer::new();
602		let mut consumer = producer.subscribe(Filter::Any);
603
604		tokio::spawn(async move {
605			tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
606			producer.announce("a/b");
607			tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
608			producer.announce("a/c");
609			tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
610			producer.unannounce("a/b");
611			tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
612			// Don't actually unannounce p2, just drop.
613			drop(producer);
614		});
615
616		consumer.next().await.unwrap().assert_active("a/b");
617		consumer.next().await.unwrap().assert_active("a/c");
618		consumer.next().await.unwrap().assert_ended("a/b");
619		consumer.next().await.unwrap().assert_ended("a/c");
620		assert_eq!(consumer.next().await, None);
621	}
622}