moq_lite/model/
broadcast.rs

1use std::{
2	collections::HashMap,
3	future::Future,
4	sync::{
5		Arc,
6		atomic::{AtomicUsize, Ordering},
7	},
8};
9
10use crate::{Error, Produce, TrackConsumer, TrackProducer};
11use tokio::sync::watch;
12use web_async::Lock;
13
14use super::Track;
15
16struct State {
17	// When explicitly publishing, we hold a reference to the consumer.
18	// This prevents the track from being marked as "unused".
19	published: HashMap<String, TrackConsumer>,
20
21	// When requesting, we hold a reference to the producer for dynamic tracks.
22	// The track will be marked as "unused" when the last consumer is dropped.
23	requested: HashMap<String, TrackProducer>,
24}
25
26/// A collection of media tracks that can be published and subscribed to.
27///
28/// Create via [`Broadcast::produce`] to obtain both [`BroadcastProducer`] and [`BroadcastConsumer`] pair.
29#[derive(Clone, Default)]
30pub struct Broadcast {
31	// NOTE: Broadcasts have no names because they're often relative.
32}
33
34impl Broadcast {
35	pub fn produce() -> Produce<BroadcastProducer, BroadcastConsumer> {
36		let producer = BroadcastProducer::new();
37		let consumer = producer.consume();
38		Produce { producer, consumer }
39	}
40}
41
42/// Receive broadcast/track requests and return if we can fulfill them.
43pub struct BroadcastProducer {
44	state: Lock<State>,
45	closed: watch::Sender<bool>,
46	requested: (
47		async_channel::Sender<TrackProducer>,
48		async_channel::Receiver<TrackProducer>,
49	),
50	cloned: Arc<AtomicUsize>,
51}
52
53impl Default for BroadcastProducer {
54	fn default() -> Self {
55		Self::new()
56	}
57}
58
59impl BroadcastProducer {
60	fn new() -> Self {
61		Self {
62			state: Lock::new(State {
63				published: HashMap::new(),
64				requested: HashMap::new(),
65			}),
66			closed: Default::default(),
67			requested: async_channel::unbounded(),
68			cloned: Default::default(),
69		}
70	}
71
72	/// Return the next requested track.
73	pub async fn requested_track(&mut self) -> Option<TrackProducer> {
74		self.requested.1.recv().await.ok()
75	}
76
77	/// Produce a new track and insert it into the broadcast.
78	pub fn create_track(&mut self, track: Track) -> TrackProducer {
79		let track = track.clone().produce();
80		self.insert_track(track.consumer);
81		track.producer
82	}
83
84	/// Insert a track into the lookup, returning true if it was unique.
85	pub fn insert_track(&mut self, track: TrackConsumer) -> bool {
86		let mut state = self.state.lock();
87		let unique = state.published.insert(track.info.name.clone(), track.clone()).is_none();
88		let removed = state.requested.remove(&track.info.name).is_some();
89
90		unique && !removed
91	}
92
93	/// Remove a track from the lookup.
94	pub fn remove_track(&mut self, name: &str) -> bool {
95		let mut state = self.state.lock();
96		state.published.remove(name).is_some() || state.requested.remove(name).is_some()
97	}
98
99	pub fn consume(&self) -> BroadcastConsumer {
100		BroadcastConsumer {
101			state: self.state.clone(),
102			closed: self.closed.subscribe(),
103			requested: self.requested.0.clone(),
104		}
105	}
106
107	pub fn close(&mut self) {
108		self.closed.send_modify(|closed| *closed = true);
109	}
110
111	/// Block until there are no more consumers.
112	///
113	/// A new consumer can be created by calling [Self::consume] and this will block again.
114	pub fn unused(&self) -> impl Future<Output = ()> + use<> {
115		let closed = self.closed.clone();
116		async move { closed.closed().await }
117	}
118
119	pub fn is_clone(&self, other: &Self) -> bool {
120		self.closed.same_channel(&other.closed)
121	}
122}
123
124impl Clone for BroadcastProducer {
125	fn clone(&self) -> Self {
126		self.cloned.fetch_add(1, Ordering::Relaxed);
127		Self {
128			state: self.state.clone(),
129			closed: self.closed.clone(),
130			requested: self.requested.clone(),
131			cloned: self.cloned.clone(),
132		}
133	}
134}
135
136impl Drop for BroadcastProducer {
137	fn drop(&mut self) {
138		if self.cloned.fetch_sub(1, Ordering::Relaxed) > 0 {
139			return;
140		}
141
142		// Cleanup any lingering state when the last producer is dropped.
143
144		// Close the sender so consumers can't send any more requests.
145		self.requested.0.close();
146
147		// Drain any remaining requests.
148		while let Ok(producer) = self.requested.1.try_recv() {
149			producer.abort(Error::Cancel);
150		}
151
152		let mut state = self.state.lock();
153
154		// Cleanup any published tracks.
155		state.published.clear();
156		state.requested.clear();
157	}
158}
159
160#[cfg(test)]
161use futures::FutureExt;
162
163#[cfg(test)]
164impl BroadcastProducer {
165	pub fn assert_used(&self) {
166		assert!(self.unused().now_or_never().is_none(), "should be used");
167	}
168
169	pub fn assert_unused(&self) {
170		assert!(self.unused().now_or_never().is_some(), "should be unused");
171	}
172
173	pub fn assert_request(&mut self) -> TrackProducer {
174		self.requested_track()
175			.now_or_never()
176			.expect("should not have blocked")
177			.expect("should be a request")
178	}
179
180	pub fn assert_no_request(&mut self) {
181		assert!(self.requested_track().now_or_never().is_none(), "should have blocked");
182	}
183}
184
185/// Subscribe to abitrary broadcast/tracks.
186#[derive(Clone)]
187pub struct BroadcastConsumer {
188	state: Lock<State>,
189	closed: watch::Receiver<bool>,
190	requested: async_channel::Sender<TrackProducer>,
191}
192
193impl BroadcastConsumer {
194	pub fn subscribe_track(&self, track: &Track) -> TrackConsumer {
195		let mut state = self.state.lock();
196
197		// Return any explictly published track.
198		if let Some(consumer) = state.published.get(&track.name).cloned() {
199			return consumer;
200		}
201
202		// Return any requested tracks.
203		if let Some(producer) = state.requested.get(&track.name) {
204			return producer.consume();
205		}
206
207		// Otherwise we have never seen this track before and need to create a new producer.
208		let track = track.clone().produce();
209		let producer = track.producer;
210		let consumer = track.consumer;
211
212		// Insert the producer into the lookup so we will deduplicate requests.
213		// This is not a subscriber so it doesn't count towards "used" subscribers.
214		match self.requested.try_send(producer.clone()) {
215			Ok(()) => {}
216			Err(_) => {
217				// If the BroadcastProducer is closed, immediately close the track.
218				// This is a bit more ergonomic than returning None.
219				producer.abort(Error::Cancel);
220				return consumer;
221			}
222		}
223
224		// Insert the producer into the lookup so we will deduplicate requests.
225		state.requested.insert(producer.info.name.clone(), producer.clone());
226
227		// Remove the track from the lookup when it's unused.
228		let state = self.state.clone();
229		web_async::spawn(async move {
230			producer.unused().await;
231			state.lock().requested.remove(&producer.info.name);
232		});
233
234		consumer
235	}
236
237	pub fn closed(&self) -> impl Future<Output = ()> {
238		// A hacky way to check if the broadcast is closed.
239		let mut closed = self.closed.clone();
240		async move {
241			closed.wait_for(|closed| *closed).await.ok();
242		}
243	}
244
245	/// Check if this is the exact same instance of a broadcast.
246	///
247	/// Duplicate names are allowed in the case of resumption.
248	pub fn is_clone(&self, other: &Self) -> bool {
249		self.closed.same_channel(&other.closed)
250	}
251}
252
253#[cfg(test)]
254impl BroadcastConsumer {
255	pub fn assert_not_closed(&self) {
256		assert!(self.closed().now_or_never().is_none(), "should not be closed");
257	}
258
259	pub fn assert_closed(&self) {
260		assert!(self.closed().now_or_never().is_some(), "should be closed");
261	}
262}
263
264#[cfg(test)]
265mod test {
266	use super::*;
267
268	#[tokio::test]
269	async fn insert() {
270		let mut producer = BroadcastProducer::new();
271		let mut track1 = Track::new("track1").produce();
272
273		// Make sure we can insert before a consumer is created.
274		producer.insert_track(track1.consumer);
275		track1.producer.append_group();
276
277		let consumer = producer.consume();
278
279		let mut track1_sub = consumer.subscribe_track(&track1.producer.info);
280		track1_sub.assert_group();
281
282		let mut track2 = Track::new("track2").produce();
283		producer.insert_track(track2.consumer);
284
285		let consumer2 = producer.consume();
286		let mut track2_consumer = consumer2.subscribe_track(&track2.producer.info);
287		track2_consumer.assert_no_group();
288
289		track2.producer.append_group();
290
291		track2_consumer.assert_group();
292	}
293
294	#[tokio::test]
295	async fn unused() {
296		let producer = BroadcastProducer::new();
297		producer.assert_unused();
298
299		// Create a new consumer.
300		let consumer1 = producer.consume();
301		producer.assert_used();
302
303		// It's also valid to clone the consumer.
304		let consumer2 = consumer1.clone();
305		producer.assert_used();
306
307		// Dropping one consumer doesn't make it unused.
308		drop(consumer1);
309		producer.assert_used();
310
311		drop(consumer2);
312		producer.assert_unused();
313
314		// Even though it's unused, we can still create a new consumer.
315		let consumer3 = producer.consume();
316		producer.assert_used();
317
318		let track1 = consumer3.subscribe_track(&Track::new("track1"));
319
320		// It doesn't matter if a subscription is alive, we only care about the broadcast handle.
321		// TODO is this the right behavior?
322		drop(consumer3);
323		producer.assert_unused();
324
325		drop(track1);
326	}
327
328	#[tokio::test]
329	async fn closed() {
330		let mut producer = BroadcastProducer::new();
331
332		let consumer = producer.consume();
333		consumer.assert_not_closed();
334
335		// Create a new track and insert it into the broadcast.
336		let mut track1 = Track::new("track1").produce();
337		track1.producer.append_group();
338		producer.insert_track(track1.consumer);
339
340		let mut track1c = consumer.subscribe_track(&track1.producer.info);
341		let track2 = consumer.subscribe_track(&Track::new("track2"));
342
343		drop(producer);
344		consumer.assert_closed();
345
346		// The requested TrackProducer should have been dropped, so the track should be closed.
347		track2.assert_closed();
348
349		// But track1 is still open because we currently don't cascade the closed state.
350		track1c.assert_group();
351		track1c.assert_no_group();
352		track1c.assert_not_closed();
353
354		// TODO: We should probably cascade the closed state.
355		drop(track1.producer);
356		track1c.assert_closed();
357	}
358
359	#[tokio::test]
360	async fn select() {
361		let mut producer = BroadcastProducer::new();
362
363		// Make sure this compiles; it's actually more involved than it should be.
364		tokio::select! {
365			_ = producer.unused() => {}
366			_ = producer.requested_track() => {}
367		}
368	}
369
370	#[tokio::test]
371	async fn requests() {
372		let mut producer = BroadcastProducer::new();
373
374		let consumer = producer.consume();
375		let consumer2 = consumer.clone();
376
377		let mut track1 = consumer.subscribe_track(&Track::new("track1"));
378		track1.assert_not_closed();
379		track1.assert_no_group();
380
381		// Make sure we deduplicate requests while track1 is still active.
382		let mut track2 = consumer2.subscribe_track(&Track::new("track1"));
383		track2.assert_is_clone(&track1);
384
385		// Get the requested track, and there should only be one.
386		let mut track3 = producer.assert_request();
387		producer.assert_no_request();
388
389		// Make sure the consumer is the same.
390		track3.consume().assert_is_clone(&track1);
391
392		// Append a group and make sure they all get it.
393		track3.append_group();
394		track1.assert_group();
395		track2.assert_group();
396
397		// Make sure that tracks are cancelled when the producer is dropped.
398		let track4 = consumer.subscribe_track(&Track::new("track2"));
399		drop(producer);
400
401		// Make sure the track is errored, not closed.
402		track4.assert_error();
403
404		let track5 = consumer2.subscribe_track(&Track::new("track3"));
405		track5.assert_error();
406	}
407
408	#[tokio::test]
409	async fn requested_unused() {
410		let mut broadcast = Broadcast::produce();
411
412		// Subscribe to a track that doesn't exist - this creates a request
413		let consumer1 = broadcast.consumer.subscribe_track(&Track::new("unknown_track"));
414
415		// Get the requested track producer
416		let producer1 = broadcast.producer.assert_request();
417
418		// The track producer should NOT be unused yet because there's a consumer
419		assert!(
420			producer1.unused().now_or_never().is_none(),
421			"track producer should be used"
422		);
423
424		// Making a new consumer will keep the producer alive
425		let consumer2 = broadcast.consumer.subscribe_track(&Track::new("unknown_track"));
426		consumer2.assert_is_clone(&consumer1);
427
428		// Drop the consumer subscription
429		drop(consumer1);
430
431		// The track producer should NOT be unused yet because there's a consumer
432		assert!(
433			producer1.unused().now_or_never().is_none(),
434			"track producer should be used"
435		);
436
437		// Drop the second consumer, now the producer should be unused
438		drop(consumer2);
439
440		// BUG: The track producer should become unused after dropping the consumer,
441		// but it won't because the broadcast keeps a reference in the lookup HashMap
442		// This assertion will fail, demonstrating the bug
443		assert!(
444			producer1.unused().now_or_never().is_some(),
445			"track producer should be unused after consumer is dropped"
446		);
447
448		// TODO Unfortunately, we need to sleep for a little bit to detect when unused.
449		tokio::time::sleep(std::time::Duration::from_millis(1)).await;
450
451		// Now the cleanup task should have run and we can subscribe again to the unknown track.
452		let consumer3 = broadcast.consumer.subscribe_track(&Track::new("unknown_track"));
453		let producer2 = broadcast.producer.assert_request();
454
455		// Drop the consumer, now the producer should be unused
456		drop(consumer3);
457		assert!(
458			producer2.unused().now_or_never().is_some(),
459			"track producer should be unused after consumer is dropped"
460		);
461	}
462}