moq_lite/model/
announce.rs

1use std::collections::{BTreeSet, VecDeque};
2use tokio::sync::mpsc;
3use web_async::{Lock, LockWeak};
4
5// Re-export the announce message type.
6pub use crate::message::Announce;
7
8#[derive(Default)]
9struct ProducerState {
10	// A BTreeSet just for ordering so the unit tests are deterministic.
11	active: BTreeSet<String>,
12	consumers: Vec<(Lock<ConsumerState>, mpsc::Sender<()>)>,
13}
14
15impl ProducerState {
16	fn insert(&mut self, path: String) -> bool {
17		if self.active.contains(&path) {
18			self.update(Announce::Ended { suffix: path.clone() });
19			self.update(Announce::Active { suffix: path });
20			return false;
21		}
22
23		self.active.insert(path.clone());
24		self.update(Announce::Active { suffix: path });
25		true
26	}
27
28	fn remove(&mut self, path: &str) -> bool {
29		let existing = self.active.remove(path);
30		if existing {
31			self.update(Announce::Ended {
32				suffix: path.to_string(),
33			});
34		}
35
36		existing
37	}
38
39	fn update(&mut self, update: Announce) {
40		let mut i = 0;
41
42		while let Some((consumer, notify)) = self.consumers.get(i) {
43			if !notify.is_closed() {
44				consumer.lock().push(update.clone());
45				notify.try_send(()).ok();
46				i += 1;
47			} else {
48				self.consumers.swap_remove(i);
49			}
50		}
51	}
52
53	fn consume<T: ToString>(&mut self, prefix: T) -> ConsumerState {
54		let prefix = prefix.to_string();
55		let mut init = VecDeque::new();
56
57		for active in self.active.iter() {
58			if let Some(suffix) = active.strip_prefix(&prefix) {
59				init.push_back(Announce::Active {
60					suffix: suffix.to_string(),
61				});
62			}
63		}
64
65		ConsumerState { prefix, updates: init }
66	}
67
68	fn subscribe(&mut self, consumer: Lock<ConsumerState>) -> mpsc::Receiver<()> {
69		let (tx, rx) = mpsc::channel(1);
70		self.consumers.push((consumer.clone(), tx));
71		rx
72	}
73}
74
75impl Drop for ProducerState {
76	fn drop(&mut self) {
77		// Collect because I'm lazy and don't want to deal with the borrow checker.
78		while let Some(broadcast) = self.active.pop_first() {
79			self.update(Announce::Ended {
80				suffix: broadcast.clone(),
81			});
82		}
83	}
84}
85
86#[derive(Clone)]
87struct ConsumerState {
88	prefix: String,
89	updates: VecDeque<Announce>,
90}
91
92impl ConsumerState {
93	pub fn push(&mut self, update: Announce) {
94		match update {
95			Announce::Active { suffix } => {
96				if let Some(suffix) = suffix.strip_prefix(&self.prefix) {
97					self.updates.push_back(Announce::Active {
98						suffix: suffix.to_string(),
99					});
100				}
101			}
102			Announce::Ended { suffix } => {
103				if let Some(suffix) = suffix.strip_prefix(&self.prefix) {
104					self.updates.push_back(Announce::Ended {
105						suffix: suffix.to_string(),
106					});
107				}
108			}
109		}
110	}
111}
112
113/// Announces broadcasts to consumers over the network.
114#[derive(Default, Clone)]
115pub struct AnnounceProducer {
116	state: Lock<ProducerState>,
117}
118
119impl AnnounceProducer {
120	pub fn new() -> Self {
121		Self::default()
122	}
123
124	/// Announce a broadcast.
125	pub fn insert<T: ToString>(&mut self, path: T) -> bool {
126		self.state.lock().insert(path.to_string())
127	}
128
129	pub fn remove(&mut self, path: &str) -> bool {
130		self.state.lock().remove(path)
131	}
132
133	/// Check if a broadcast is active.
134	pub fn contains(&self, path: &str) -> bool {
135		self.state.lock().active.contains(path)
136	}
137
138	/// Check if any broadcasts are active.
139	pub fn is_empty(&self) -> bool {
140		self.state.lock().active.is_empty()
141	}
142
143	/// Subscribe to all announced tracks matching the prefix.
144	///
145	/// There will be an event each time a suffix starts and later ends.
146	pub fn consume<S: ToString>(&self, prefix: S) -> AnnounceConsumer {
147		let mut state = self.state.lock();
148		let consumer = Lock::new(state.consume(prefix));
149		let notify = state.subscribe(consumer.clone());
150		AnnounceConsumer::new(self.state.downgrade(), consumer, notify)
151	}
152
153	/// Wait until all consumers have been dropped.
154	///
155	/// NOTE: subscribe can be called to unclose the producer.
156	pub async fn unused(&self) {
157		// Keep looping until all consumers are closed.
158		while let Some(notify) = self.unused_inner() {
159			notify.closed().await;
160		}
161	}
162
163	// Returns the closed notify of any consumer.
164	fn unused_inner(&self) -> Option<mpsc::Sender<()>> {
165		let mut state = self.state.lock();
166
167		while let Some((_, notify)) = state.consumers.last() {
168			if !notify.is_closed() {
169				return Some(notify.clone());
170			}
171
172			state.consumers.pop();
173		}
174
175		None
176	}
177}
178
179/// Consumes announced tracks over the network matching an optional prefix.
180pub struct AnnounceConsumer {
181	producer: LockWeak<ProducerState>,
182	state: Lock<ConsumerState>,
183	notify: mpsc::Receiver<()>,
184}
185
186impl AnnounceConsumer {
187	fn new(producer: LockWeak<ProducerState>, state: Lock<ConsumerState>, notify: mpsc::Receiver<()>) -> Self {
188		Self {
189			producer,
190			state,
191			notify,
192		}
193	}
194
195	/// Returns the next announced track.
196	pub async fn next(&mut self) -> Option<Announce> {
197		loop {
198			{
199				let mut state = self.state.lock();
200
201				if let Some(update) = state.updates.pop_front() {
202					return Some(update);
203				}
204			}
205
206			self.notify.recv().await?;
207		}
208	}
209
210	/// A helper to only get active broadcasts.
211	///
212	/// You can learn if a track has ended via its `closed` method.
213	pub async fn active(&mut self) -> Option<String> {
214		loop {
215			if let Some(Announce::Active { suffix }) = self.next().await {
216				return Some(suffix);
217			}
218		}
219	}
220}
221
222// ugh
223// Cloning consumers is problematic because it encourages idle consumers.
224// It's also just a pain in the butt to implement.
225// TODO figure out a way to remove this.
226impl Clone for AnnounceConsumer {
227	fn clone(&self) -> Self {
228		let consumer = Lock::new(self.state.lock().clone());
229
230		match self.producer.upgrade() {
231			Some(producer) => {
232				let mut producer = producer.lock();
233				let notify = producer.subscribe(consumer.clone());
234				AnnounceConsumer::new(self.producer.clone(), consumer, notify)
235			}
236			None => {
237				let (_, notify) = mpsc::channel(1);
238				AnnounceConsumer::new(self.producer.clone(), consumer, notify)
239			}
240		}
241	}
242}
243
244#[cfg(test)]
245use futures::FutureExt;
246
247#[cfg(test)]
248impl AnnounceConsumer {
249	fn assert_active(&mut self, suffix: &str) {
250		self.next()
251			.now_or_never()
252			.expect("would have blocked")
253			.expect("no next announcement")
254			.assert_active(suffix);
255	}
256
257	fn assert_ended(&mut self, suffix: &str) {
258		self.next()
259			.now_or_never()
260			.expect("would have blocked")
261			.expect("no next announcement")
262			.assert_ended(suffix);
263	}
264
265	fn assert_wait(&mut self) {
266		assert_eq!(self.next().now_or_never(), None);
267	}
268
269	fn assert_done(&mut self) {
270		assert_eq!(self.next().now_or_never(), Some(None));
271	}
272}
273
274#[cfg(test)]
275mod test {
276	use super::*;
277
278	#[test]
279	fn simple() {
280		let mut producer = AnnounceProducer::new();
281		let mut consumer = producer.consume("");
282		let ab = "a/b";
283
284		assert!(!producer.contains(ab));
285		assert!(producer.insert(ab));
286		assert!(producer.contains(ab));
287
288		consumer.assert_active(ab);
289
290		assert!(producer.remove(ab));
291		assert!(!producer.contains(ab));
292
293		consumer.assert_ended(ab);
294		consumer.assert_wait();
295	}
296
297	#[test]
298	fn duplicate() {
299		let mut producer = AnnounceProducer::new();
300		let mut consumer = producer.consume("");
301
302		let ab = "a/b";
303		let ab2 = "a/b";
304
305		assert!(producer.insert(ab));
306		assert!(producer.contains(ab));
307
308		// Doesn't matter if you use broadcast1 or broadcast2; they have the same path.
309		assert!(producer.contains(ab2));
310		consumer.assert_active(ab2);
311
312		// Duplicate announcement.
313		assert!(!producer.insert(ab2));
314
315		// Automatically insert an end/start pair.
316		consumer.assert_ended(ab);
317		consumer.assert_active(ab2);
318
319		drop(producer);
320
321		consumer.assert_ended(ab2);
322		consumer.assert_done();
323	}
324
325	#[test]
326	fn multi() {
327		let mut producer = AnnounceProducer::new();
328		let mut consumer = producer.consume("");
329
330		let ab = "a/b";
331		let ac = "a/c";
332		let de = "d/e";
333
334		assert!(producer.insert(ab));
335		assert!(producer.insert(ac));
336		assert!(producer.insert(de));
337
338		// Make sure we get all of the paths in order.
339		consumer.assert_active(ab);
340		consumer.assert_active(ac);
341		consumer.assert_active(de);
342		consumer.assert_wait();
343	}
344
345	#[test]
346	fn late() {
347		let mut producer = AnnounceProducer::new();
348		let ab = "a/b";
349		let ac = "a/c";
350		let de = "d/e";
351		let dd = "d/d";
352
353		assert!(producer.insert(ab));
354		assert!(producer.insert(ac));
355
356		// Subscribe after announcing.
357		let mut consumer = producer.consume("");
358
359		assert!(producer.insert(de));
360		assert!(producer.insert(dd));
361
362		// Make sure we get all of the paths in order.
363		consumer.assert_active(ab);
364		consumer.assert_active(ac);
365		consumer.assert_active(de);
366		consumer.assert_active(dd);
367		consumer.assert_wait();
368	}
369
370	#[test]
371	fn prefix() {
372		let mut producer = AnnounceProducer::new();
373		let mut consumer = producer.consume("a/");
374
375		let ab = "a/b";
376		let ac = "a/c";
377		let de = "d/e";
378
379		assert!(producer.insert(ab));
380		assert!(producer.insert(ac));
381		assert!(producer.insert(de));
382
383		consumer.assert_active("b");
384		consumer.assert_active("c");
385		consumer.assert_wait();
386	}
387
388	#[test]
389	fn prefix_unannounce() {
390		let mut producer = AnnounceProducer::new();
391		let mut consumer = producer.consume("a/");
392
393		let ab = "a/b";
394		let ac = "a/c";
395		let de = "d/e";
396
397		assert!(producer.insert(ab));
398		assert!(producer.insert(ac));
399		assert!(producer.insert(de));
400
401		consumer.assert_active("b");
402		consumer.assert_active("c");
403		consumer.assert_wait();
404
405		assert!(producer.remove(de));
406		assert!(producer.remove(ac));
407		assert!(producer.remove(ab));
408
409		consumer.assert_ended("c");
410		consumer.assert_ended("b");
411		consumer.assert_wait();
412	}
413
414	#[test]
415	fn flicker() {
416		let mut producer = AnnounceProducer::new();
417		let mut consumer = producer.consume("");
418		let ab = "a/b";
419
420		assert!(!producer.contains(ab));
421		assert!(producer.insert(ab));
422		assert!(producer.contains(ab));
423		assert!(producer.remove(ab));
424		assert!(!producer.contains(ab));
425
426		// We missed it, but we still get a start/stop pair.
427		consumer.assert_active(ab);
428		consumer.assert_ended(ab);
429		consumer.assert_wait();
430	}
431
432	#[test]
433	fn dropped() {
434		let mut producer = AnnounceProducer::new();
435		let mut consumer = producer.consume("");
436
437		let ab = "a/b";
438		let ac = "a/c";
439		let de = "d/e";
440
441		assert!(producer.insert(ab));
442		assert!(producer.insert(ac));
443
444		consumer.assert_active(ab);
445		consumer.assert_active(ac);
446
447		// Don't consume "d/e" before dropping.
448		producer.insert(de);
449		drop(producer);
450
451		consumer.assert_active(de);
452		consumer.assert_ended(ab);
453		consumer.assert_ended(ac);
454		consumer.assert_ended(de);
455		consumer.assert_done();
456	}
457
458	#[tokio::test]
459	async fn wakeup() {
460		tokio::time::pause();
461
462		let mut producer = AnnounceProducer::new();
463		let mut consumer = producer.consume("");
464
465		tokio::spawn(async move {
466			let ab = "a/b";
467			let ac = "a/c";
468
469			tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
470			producer.insert(ab);
471			tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
472			producer.insert(ac);
473			tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
474			producer.remove(ab);
475			tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
476			// Don't actually unannounce a/c, just drop.
477			drop(producer);
478		});
479
480		let ab = "a/b";
481		let ac = "a/c";
482
483		consumer.next().await.unwrap().assert_active(ab);
484		consumer.next().await.unwrap().assert_active(ac);
485		consumer.next().await.unwrap().assert_ended(ab);
486		consumer.next().await.unwrap().assert_ended(ac);
487		assert_eq!(consumer.next().await, None);
488	}
489}