moq_lite/model/
origin.rs

1use std::collections::{HashMap, VecDeque};
2use tokio::sync::mpsc;
3use web_async::{Lock, LockWeak};
4
5use super::BroadcastConsumer;
6
7#[derive(Default)]
8struct ProducerState {
9	active: HashMap<String, BroadcastConsumer>,
10	consumers: Vec<(Lock<ConsumerState>, mpsc::Sender<()>)>,
11}
12
13impl ProducerState {
14	fn publish(&mut self, path: String, broadcast: BroadcastConsumer) -> Option<BroadcastConsumer> {
15		let mut i = 0;
16
17		while let Some((consumer, notify)) = self.consumers.get(i) {
18			if !notify.is_closed() {
19				if consumer.lock().insert(&path, &broadcast) {
20					notify.try_send(()).ok();
21				}
22				i += 1;
23			} else {
24				self.consumers.swap_remove(i);
25			}
26		}
27
28		self.active.insert(path, broadcast)
29	}
30
31	fn consume<T: ToString>(&mut self, prefix: T) -> ConsumerState {
32		let prefix = prefix.to_string();
33		let mut updates = VecDeque::new();
34
35		for (path, broadcast) in self.active.iter() {
36			if let Some(suffix) = path.strip_prefix(&prefix) {
37				updates.push_back((suffix.to_string(), broadcast.clone()));
38			}
39		}
40
41		ConsumerState { prefix, updates }
42	}
43
44	fn subscribe(&mut self, consumer: Lock<ConsumerState>) -> mpsc::Receiver<()> {
45		let (tx, rx) = mpsc::channel(1);
46		self.consumers.push((consumer.clone(), tx));
47		rx
48	}
49}
50
51#[derive(Clone)]
52struct ConsumerState {
53	prefix: String,
54	updates: VecDeque<(String, BroadcastConsumer)>,
55}
56
57impl ConsumerState {
58	pub fn insert(&mut self, path: &str, consumer: &BroadcastConsumer) -> bool {
59		if let Some(suffix) = path.strip_prefix(&self.prefix) {
60			self.updates.push_back((suffix.to_string(), consumer.clone()));
61			true
62		} else {
63			false
64		}
65	}
66}
67
68/// Announces broadcasts to consumers over the network.
69#[derive(Default, Clone)]
70pub struct OriginProducer {
71	state: Lock<ProducerState>,
72}
73
74impl OriginProducer {
75	pub fn new() -> Self {
76		Self::default()
77	}
78
79	/// Announce a broadcast, returning true if it was unique.
80	pub fn publish<S: ToString>(&mut self, path: S, broadcast: BroadcastConsumer) -> bool {
81		let path = path.to_string();
82		let unique = self.state.lock().publish(path.clone(), broadcast.clone()).is_none();
83
84		let state = self.state.clone();
85		web_async::spawn(async move {
86			broadcast.closed().await;
87			state.lock().active.remove(&path);
88		});
89
90		unique
91	}
92
93	/// Publish all broadcasts from the given origin.
94	pub fn publish_all(&mut self, broadcasts: OriginConsumer) {
95		self.publish_prefix("", broadcasts);
96	}
97
98	/// Publish all broadcasts from the given origin with an optional prefix.
99	pub fn publish_prefix(&mut self, prefix: &str, mut broadcasts: OriginConsumer) {
100		// Really gross that this just spawns a background task, but I want publishing to be sync.
101		let mut this = self.clone();
102
103		// Overkill to avoid allocating a string if the prefix is empty.
104		let prefix = match prefix {
105			"" => None,
106			prefix => Some(prefix.to_string()),
107		};
108
109		web_async::spawn(async move {
110			while let Some((suffix, broadcast)) = broadcasts.next().await {
111				let path = match &prefix {
112					Some(prefix) => format!("{}{}", prefix, suffix),
113					None => suffix,
114				};
115
116				this.publish(path, broadcast);
117			}
118		});
119	}
120
121	/// Get a specific broadcast by name.
122	pub fn consume(&self, path: &str) -> Option<BroadcastConsumer> {
123		self.state.lock().active.get(path).cloned()
124	}
125
126	/// Subscribe to all announced broadcasts.
127	pub fn consume_all(&self) -> OriginConsumer {
128		self.consume_prefix("")
129	}
130
131	/// Subscribe to all announced broadcasts matching the prefix.
132	pub fn consume_prefix<S: ToString>(&self, prefix: S) -> OriginConsumer {
133		let mut state = self.state.lock();
134		let consumer = Lock::new(state.consume(prefix));
135		let notify = state.subscribe(consumer.clone());
136		OriginConsumer::new(self.state.downgrade(), consumer, notify)
137	}
138
139	/// Wait until all consumers have been dropped.
140	///
141	/// NOTE: subscribe can be called to unclose the producer.
142	pub async fn unused(&self) {
143		// Keep looping until all consumers are closed.
144		while let Some(notify) = self.unused_inner() {
145			notify.closed().await;
146		}
147	}
148
149	// Returns the closed notify of any consumer.
150	fn unused_inner(&self) -> Option<mpsc::Sender<()>> {
151		let mut state = self.state.lock();
152
153		while let Some((_, notify)) = state.consumers.last() {
154			if !notify.is_closed() {
155				return Some(notify.clone());
156			}
157
158			state.consumers.pop();
159		}
160
161		None
162	}
163}
164
165/// Consumes announced broadcasts matching against an optional prefix.
166pub struct OriginConsumer {
167	producer: LockWeak<ProducerState>,
168	state: Lock<ConsumerState>,
169	notify: mpsc::Receiver<()>,
170}
171
172impl OriginConsumer {
173	fn new(producer: LockWeak<ProducerState>, state: Lock<ConsumerState>, notify: mpsc::Receiver<()>) -> Self {
174		Self {
175			producer,
176			state,
177			notify,
178		}
179	}
180
181	/// Returns the next announced broadcast.
182	pub async fn next(&mut self) -> Option<(String, BroadcastConsumer)> {
183		loop {
184			{
185				let mut state = self.state.lock();
186
187				if let Some(update) = state.updates.pop_front() {
188					return Some(update);
189				}
190			}
191
192			self.notify.recv().await?;
193		}
194	}
195}
196
197// ugh
198// Cloning consumers is problematic because it encourages idle consumers.
199// It's also just a pain in the butt to implement.
200// TODO figure out a way to remove this.
201impl Clone for OriginConsumer {
202	fn clone(&self) -> Self {
203		let consumer = Lock::new(self.state.lock().clone());
204
205		match self.producer.upgrade() {
206			Some(producer) => {
207				let mut producer = producer.lock();
208				let notify = producer.subscribe(consumer.clone());
209				OriginConsumer::new(self.producer.clone(), consumer, notify)
210			}
211			None => {
212				let (_, notify) = mpsc::channel(1);
213				OriginConsumer::new(self.producer.clone(), consumer, notify)
214			}
215		}
216	}
217}