moq_lite/model/
origin.rs

1use std::collections::HashMap;
2
3use crate::{AnnounceConsumer, AnnounceProducer, BroadcastConsumer};
4use web_async::Lock;
5
6/// A collection of broadcasts, published by potentially multiple clients.
7#[derive(Clone, Default)]
8pub struct Origin {
9	// Tracks announced by clients.
10	unique: AnnounceProducer,
11
12	// Active broadcasts.
13	routes: Lock<HashMap<String, BroadcastConsumer>>,
14}
15
16impl Origin {
17	pub fn new() -> Self {
18		Self::default()
19	}
20
21	/// Announce a broadcast, replacing the previous announcement if it exists.
22	pub fn publish<T: ToString>(&mut self, path: T, broadcast: BroadcastConsumer) {
23		let path = path.to_string();
24		self.routes.lock().insert(path.clone(), broadcast.clone());
25		self.unique.insert(&path);
26
27		let mut this = self.clone();
28		web_async::spawn(async move {
29			// Wait until the broadcast is closed, then remove it from the lookup.
30			broadcast.closed().await;
31
32			// Remove the broadcast from the lookup only if it's not a duplicate.
33			let mut routes = this.routes.lock();
34
35			if let Some(existing) = routes.remove(&path) {
36				if !existing.ptr_eq(&broadcast) {
37					// Oops we were the duplicate, re-insert the original.
38					routes.insert(path.to_string(), broadcast.clone());
39				} else {
40					// We were the original, remove from the unique set.
41					this.unique.remove(&path);
42				}
43			}
44		});
45	}
46
47	/// Consume a broadcast by path.
48	pub fn consume(&self, path: &str) -> Option<BroadcastConsumer> {
49		// Return the most recently announced broadcast
50		self.routes.lock().get(path).cloned()
51	}
52
53	/// Discover any broadcasts published by the remote matching a prefix.
54	///
55	/// NOTE: The results contain the suffix only.
56	pub fn announced(&self, prefix: &str) -> AnnounceConsumer {
57		self.unique.consume(prefix)
58	}
59}