Skip to main content

moq_lite/model/
broadcast.rs

1use std::{
2	collections::HashMap,
3	future::Future,
4	sync::{
5		Arc,
6		atomic::{AtomicUsize, Ordering},
7	},
8};
9
10use crate::{Error, TrackConsumer, TrackProducer};
11use tokio::sync::watch;
12use web_async::Lock;
13
14use super::Track;
15
/// Track lookup shared between a [`BroadcastProducer`], its clones, and every
/// [`BroadcastConsumer`] created from it.
struct State {
	// When explicitly publishing, we hold a reference to the consumer.
	// This prevents the track from being marked as "unused".
	// Keyed by track name.
	consumers: HashMap<String, TrackConsumer>,

	// When requesting, we hold a reference to the producer for dynamic tracks.
	// The track will be marked as "unused" when the last consumer is dropped.
	// Explicitly published tracks are stored here too, so this is the map
	// subscriptions are resolved against. Keyed by track name.
	producers: HashMap<String, TrackProducer>,
}
25
/// A collection of media tracks that can be published and subscribed to.
///
/// Call [`Broadcast::produce`] to obtain a [`BroadcastProducer`], then
/// [`BroadcastProducer::consume`] to create each [`BroadcastConsumer`].
#[derive(Clone, Default)]
pub struct Broadcast {
	// NOTE: Broadcasts have no names because they're often relative.
}
33
34impl Broadcast {
35	pub fn produce() -> BroadcastProducer {
36		BroadcastProducer::new()
37	}
38}
39
/// Receive broadcast/track requests and return if we can fulfill them.
///
/// Cloning yields another handle to the same broadcast; the shared state is torn
/// down when the last handle is dropped.
pub struct BroadcastProducer {
	// Track lookup shared with every clone and every consumer.
	state: Lock<State>,
	// Carries the "closed" flag to consumers. The receiver count is also what
	// `unused` waits on.
	closed: watch::Sender<bool>,
	// Both halves of the channel that consumers use to request tracks which are
	// not in the lookup yet. Consumers get a clone of the sender.
	requested: (
		async_channel::Sender<TrackProducer>,
		async_channel::Receiver<TrackProducer>,
	),
	// Number of *additional* live handles: starts at zero, incremented on clone,
	// decremented on drop. The handle that observes zero on drop is the last one.
	cloned: Arc<AtomicUsize>,
}
50
51impl Default for BroadcastProducer {
52	fn default() -> Self {
53		Self::new()
54	}
55}
56
57impl BroadcastProducer {
58	pub fn new() -> Self {
59		Self {
60			state: Lock::new(State {
61				consumers: HashMap::new(),
62				producers: HashMap::new(),
63			}),
64			closed: Default::default(),
65			requested: async_channel::unbounded(),
66			cloned: Default::default(),
67		}
68	}
69
70	/// Return the next requested track.
71	pub async fn requested_track(&mut self) -> Option<TrackProducer> {
72		self.requested.1.recv().await.ok()
73	}
74
75	/// Produce a new track and insert it into the broadcast.
76	pub fn create_track(&mut self, track: Track) -> TrackProducer {
77		let track = TrackProducer::new(track);
78		self.insert_track(track.clone());
79		track
80	}
81
82	/// Insert a track into the lookup, returning true if it was unique.
83	///
84	/// NOTE: You probably want to [TrackProducer::clone] to keep publishing to the track.
85	pub fn insert_track(&mut self, track: TrackProducer) -> bool {
86		let mut state = self.state.lock();
87		state.consumers.insert(track.info.name.clone(), track.consume());
88		state.producers.insert(track.info.name.clone(), track).is_none()
89	}
90
91	/// Remove a track from the lookup.
92	pub fn remove_track(&mut self, name: &str) -> bool {
93		let mut state = self.state.lock();
94		state.consumers.remove(name).is_some() || state.producers.remove(name).is_some()
95	}
96
97	pub fn consume(&self) -> BroadcastConsumer {
98		BroadcastConsumer {
99			state: self.state.clone(),
100			closed: self.closed.subscribe(),
101			requested: self.requested.0.clone(),
102		}
103	}
104
105	pub fn close(&mut self) {
106		self.closed.send_modify(|closed| *closed = true);
107	}
108
109	/// Block until there are no more consumers.
110	///
111	/// A new consumer can be created by calling [Self::consume] and this will block again.
112	pub fn unused(&self) -> impl Future<Output = ()> + use<> {
113		let closed = self.closed.clone();
114		async move { closed.closed().await }
115	}
116
117	pub fn is_clone(&self, other: &Self) -> bool {
118		self.closed.same_channel(&other.closed)
119	}
120}
121
122impl Clone for BroadcastProducer {
123	fn clone(&self) -> Self {
124		self.cloned.fetch_add(1, Ordering::Relaxed);
125		Self {
126			state: self.state.clone(),
127			closed: self.closed.clone(),
128			requested: self.requested.clone(),
129			cloned: self.cloned.clone(),
130		}
131	}
132}
133
134impl Drop for BroadcastProducer {
135	fn drop(&mut self) {
136		if self.cloned.fetch_sub(1, Ordering::Relaxed) > 0 {
137			return;
138		}
139
140		// Cleanup any lingering state when the last producer is dropped.
141
142		// Close the sender so consumers can't send any more requests.
143		self.requested.0.close();
144
145		// Drain any remaining requests.
146		while let Ok(producer) = self.requested.1.try_recv() {
147			producer.abort(Error::Cancel);
148		}
149
150		let mut state = self.state.lock();
151
152		// Cleanup any published tracks.
153		state.consumers.clear();
154		state.producers.clear();
155	}
156}
157
158#[cfg(test)]
159use futures::FutureExt;
160
#[cfg(test)]
impl BroadcastProducer {
	/// Panic unless [`Self::unused`] is still pending, i.e. a consumer exists.
	pub fn assert_used(&self) {
		let still_pending = self.unused().now_or_never().is_none();
		assert!(still_pending, "should be used");
	}

	/// Panic unless [`Self::unused`] resolves immediately, i.e. no consumer exists.
	pub fn assert_unused(&self) {
		let resolved = self.unused().now_or_never().is_some();
		assert!(resolved, "should be unused");
	}

	/// Pop the next track request without waiting, panicking if none is queued.
	pub fn assert_request(&mut self) -> TrackProducer {
		let polled = self.requested_track().now_or_never();
		polled
			.expect("should not have blocked")
			.expect("should be a request")
	}

	/// Panic if a track request is ready right now.
	pub fn assert_no_request(&mut self) {
		let polled = self.requested_track().now_or_never();
		assert!(polled.is_none(), "should have blocked");
	}
}
182
/// Subscribe to arbitrary broadcast/tracks.
///
/// Created via [`BroadcastProducer::consume`]; clones refer to the same broadcast.
#[derive(Clone)]
pub struct BroadcastConsumer {
	// Track lookup shared with the producer and all other consumers.
	state: Lock<State>,
	// Observes the producer's "closed" flag.
	closed: watch::Receiver<bool>,
	// Sending half of the request channel, used for tracks missing from the lookup.
	requested: async_channel::Sender<TrackProducer>,
}
190
191impl BroadcastConsumer {
192	pub fn subscribe_track(&self, track: &Track) -> TrackConsumer {
193		let mut state = self.state.lock();
194
195		if let Some(producer) = state.producers.get(&track.name) {
196			return producer.consume();
197		}
198
199		// Otherwise we have never seen this track before and need to create a new producer.
200		let producer = track.clone().produce();
201		let consumer = producer.consume();
202
203		// Insert the producer into the lookup so we will deduplicate requests.
204		// This is not a subscriber so it doesn't count towards "used" subscribers.
205		match self.requested.try_send(producer.clone()) {
206			Ok(()) => {}
207			Err(_) => {
208				// If the BroadcastProducer is closed, immediately close the track.
209				// This is a bit more ergonomic than returning None.
210				producer.abort(Error::Cancel);
211				return consumer;
212			}
213		}
214
215		// Insert the producer into the lookup so we will deduplicate requests.
216		state.producers.insert(producer.info.name.clone(), producer.clone());
217
218		// Remove the track from the lookup when it's unused.
219		let state = self.state.clone();
220		web_async::spawn(async move {
221			producer.unused().await;
222			state.lock().producers.remove(&producer.info.name);
223		});
224
225		consumer
226	}
227
228	pub fn closed(&self) -> impl Future<Output = ()> {
229		// A hacky way to check if the broadcast is closed.
230		let mut closed = self.closed.clone();
231		async move {
232			closed.wait_for(|closed| *closed).await.ok();
233		}
234	}
235
236	/// Check if this is the exact same instance of a broadcast.
237	///
238	/// Duplicate names are allowed in the case of resumption.
239	pub fn is_clone(&self, other: &Self) -> bool {
240		self.closed.same_channel(&other.closed)
241	}
242}
243
#[cfg(test)]
impl BroadcastConsumer {
	/// Panic if [`Self::closed`] resolves immediately.
	pub fn assert_not_closed(&self) {
		let still_open = self.closed().now_or_never().is_none();
		assert!(still_open, "should not be closed");
	}

	/// Panic unless [`Self::closed`] resolves immediately.
	pub fn assert_closed(&self) {
		let resolved = self.closed().now_or_never().is_some();
		assert!(resolved, "should be closed");
	}
}
254
#[cfg(test)]
mod test {
	use super::*;

	/// Explicitly inserted tracks are served to subscribers, both for groups
	/// appended before the subscription and for groups appended afterwards.
	#[tokio::test]
	async fn insert() {
		let mut producer = BroadcastProducer::new();
		let mut track1 = Track::new("track1").produce();

		// Make sure we can insert before a consumer is created.
		producer.insert_track(track1.clone());
		track1.append_group();

		let consumer = producer.consume();

		// The group appended before subscribing is still delivered.
		let mut track1_sub = consumer.subscribe_track(&Track::new("track1"));
		track1_sub.assert_group();

		let mut track2 = Track::new("track2").produce();
		producer.insert_track(track2.clone());

		// A second, independent consumer sees tracks inserted after the first consumer existed.
		let consumer2 = producer.consume();
		let mut track2_consumer = consumer2.subscribe_track(&Track::new("track2"));
		track2_consumer.assert_no_group();

		track2.append_group();

		track2_consumer.assert_group();
	}

	/// `unused` follows the number of live broadcast consumers, not track subscriptions.
	#[tokio::test]
	async fn unused() {
		let producer = BroadcastProducer::new();
		producer.assert_unused();

		// Create a new consumer.
		let consumer1 = producer.consume();
		producer.assert_used();

		// It's also valid to clone the consumer.
		let consumer2 = consumer1.clone();
		producer.assert_used();

		// Dropping one consumer doesn't make it unused.
		drop(consumer1);
		producer.assert_used();

		drop(consumer2);
		producer.assert_unused();

		// Even though it's unused, we can still create a new consumer.
		let consumer3 = producer.consume();
		producer.assert_used();

		let track1 = consumer3.subscribe_track(&Track::new("track1"));

		// It doesn't matter if a subscription is alive, we only care about the broadcast handle.
		// TODO is this the right behavior?
		drop(consumer3);
		producer.assert_unused();

		drop(track1);
	}

	/// Dropping the last producer closes the broadcast and cancels pending
	/// requests, but does not close tracks that still have their own producer.
	#[tokio::test]
	async fn closed() {
		let mut producer = BroadcastProducer::new();

		let consumer = producer.consume();
		consumer.assert_not_closed();

		// Create a new track and insert it into the broadcast.
		let mut track1 = producer.create_track(Track::new("track1"));
		track1.append_group();

		// track1 is published; track2 is not, so subscribing to it queues a request.
		let mut track1c = consumer.subscribe_track(&track1.info);
		let track2 = consumer.subscribe_track(&Track::new("track2"));

		drop(producer);
		consumer.assert_closed();

		// The requested TrackProducer should have been dropped, so the track should be closed.
		track2.assert_closed();

		// But track1 is still open because we currently don't cascade the closed state.
		track1c.assert_group();
		track1c.assert_no_group();
		track1c.assert_not_closed();

		// TODO: We should probably cascade the closed state.
		drop(track1);
		track1c.assert_closed();
	}

	/// Compile-time check that `unused` and `requested_track` can be raced in one `select!`.
	#[tokio::test]
	async fn select() {
		let mut producer = BroadcastProducer::new();

		// Make sure this compiles; it's actually more involved than it should be.
		tokio::select! {
			_ = producer.unused() => {}
			_ = producer.requested_track() => {}
		}
	}

	/// Requests for unpublished tracks are deduplicated by name and are
	/// errored once the producer is gone.
	#[tokio::test]
	async fn requests() {
		let mut producer = BroadcastProducer::new();

		let consumer = producer.consume();
		let consumer2 = consumer.clone();

		let mut track1 = consumer.subscribe_track(&Track::new("track1"));
		track1.assert_not_closed();
		track1.assert_no_group();

		// Make sure we deduplicate requests while track1 is still active.
		let mut track2 = consumer2.subscribe_track(&Track::new("track1"));
		track2.assert_is_clone(&track1);

		// Get the requested track, and there should only be one.
		let mut track3 = producer.assert_request();
		producer.assert_no_request();

		// Make sure the consumer is the same.
		track3.consume().assert_is_clone(&track1);

		// Append a group and make sure they all get it.
		track3.append_group();
		track1.assert_group();
		track2.assert_group();

		// Make sure that tracks are cancelled when the producer is dropped.
		let track4 = consumer.subscribe_track(&Track::new("track2"));
		drop(producer);

		// Make sure the track is errored, not closed.
		track4.assert_error();

		// Subscribing after the producer is gone yields an already-errored track.
		let track5 = consumer2.subscribe_track(&Track::new("track3"));
		track5.assert_error();
	}

	/// A requested track becomes unused once its last subscriber is dropped,
	/// and can be requested again after the cleanup task has run.
	#[tokio::test]
	async fn requested_unused() {
		let mut broadcast = Broadcast::produce();

		// Subscribe to a track that doesn't exist - this creates a request
		let consumer1 = broadcast.consume().subscribe_track(&Track::new("unknown_track"));

		// Get the requested track producer
		let producer1 = broadcast.assert_request();

		// The track producer should NOT be unused yet because there's a consumer
		assert!(
			producer1.unused().now_or_never().is_none(),
			"track producer should be used"
		);

		// Making a new consumer will keep the producer alive
		let consumer2 = broadcast.consume().subscribe_track(&Track::new("unknown_track"));
		consumer2.assert_is_clone(&consumer1);

		// Drop the consumer subscription
		drop(consumer1);

		// The track producer should NOT be unused yet because there's a consumer
		assert!(
			producer1.unused().now_or_never().is_none(),
			"track producer should be used"
		);

		// Drop the second consumer, now the producer should be unused
		drop(consumer2);

		// The track producer should become unused as soon as the last consumer is dropped.
		// For requested tracks the broadcast lookup stores only a producer handle, which
		// is not expected to count as a consumer.
		// NOTE(review): an earlier comment here described this assertion as a known
		// failure; that looks stale given the current lookup -- confirm.
		assert!(
			producer1.unused().now_or_never().is_some(),
			"track producer should be unused after consumer is dropped"
		);

		// TODO Unfortunately, we need to sleep for a little bit to detect when unused.
		tokio::time::sleep(std::time::Duration::from_millis(1)).await;

		// Now the cleanup task should have run and we can subscribe again to the unknown track.
		let consumer3 = broadcast.consume().subscribe_track(&Track::new("unknown_track"));
		let producer2 = broadcast.assert_request();

		// Drop the consumer, now the producer should be unused
		drop(consumer3);
		assert!(
			producer2.unused().now_or_never().is_some(),
			"track producer should be unused after consumer is dropped"
		);
	}
}