Skip to main content

moq_net/model/
broadcast.rs

1use std::{
2	collections::{HashMap, hash_map},
3	ops::Deref,
4	task::{Poll, ready},
5};
6
7use crate::{Error, TrackConsumer, TrackProducer, model::track::TrackWeak};
8
9use super::{OriginList, Track};
10
11/// A collection of media tracks that can be published and subscribed to.
12///
13/// Create via [`Broadcast::produce`] to obtain both [`BroadcastProducer`] and [`BroadcastConsumer`] pair.
14#[derive(Clone, Debug, Default)]
15pub struct Broadcast {
16	/// The chain of origins the broadcast has traversed. Each relay appends its own
17	/// [`crate::Origin`] when forwarding, so the list is used for loop detection and
18	/// shortest-path preference.
19	pub hops: OriginList,
20}
21
22impl Broadcast {
23	/// Create a new broadcast with an empty hop chain.
24	pub fn new() -> Self {
25		Self::default()
26	}
27
28	/// Consume this [Broadcast] to create a producer that carries its metadata
29	/// (including the hop chain).
30	pub fn produce(self) -> BroadcastProducer {
31		BroadcastProducer::new(self)
32	}
33}
34
35#[derive(Default, Clone)]
36struct State {
37	// Weak references for deduplication. Doesn't prevent track auto-close.
38	tracks: HashMap<String, TrackWeak>,
39
40	// Dynamic tracks that have been requested.
41	requests: Vec<TrackProducer>,
42
43	// The current number of dynamic producers.
44	// If this is 0, requests must be empty.
45	dynamic: usize,
46
47	// The error that caused the broadcast to be aborted, if any.
48	abort: Option<Error>,
49}
50
51fn modify(state: &kio::Producer<State>) -> Result<kio::Mut<'_, State>, Error> {
52	match state.write() {
53		Ok(state) => Ok(state),
54		Err(r) => Err(r.abort.clone().unwrap_or(Error::Dropped)),
55	}
56}
57
58impl State {
59	/// Insert a track weak handle into the lookup, returning an error on duplicate.
60	fn insert_track(&mut self, weak: TrackWeak) -> Result<(), Error> {
61		let hash_map::Entry::Vacant(entry) = self.tracks.entry(weak.info.name.clone()) else {
62			return Err(Error::Duplicate);
63		};
64		entry.insert(weak);
65		Ok(())
66	}
67}
68
69/// Manages tracks within a broadcast.
70///
71/// Insert tracks statically with [Self::insert_track] / [Self::create_track],
72/// or handle on-demand requests via [Self::dynamic].
73#[derive(Clone)]
74pub struct BroadcastProducer {
75	info: Broadcast,
76	state: kio::Producer<State>,
77}
78
79impl Deref for BroadcastProducer {
80	type Target = Broadcast;
81
82	fn deref(&self) -> &Self::Target {
83		&self.info
84	}
85}
86
87impl BroadcastProducer {
88	/// Create a producer for the given broadcast metadata. Prefer [`Broadcast::produce`].
89	pub fn new(info: Broadcast) -> Self {
90		Self {
91			info,
92			state: Default::default(),
93		}
94	}
95
96	/// Insert a track into the lookup, returning an error on duplicate.
97	///
98	/// Stores a weak handle to the track. The caller (or the owner of the
99	/// track's [`TrackProducer`]) is responsible for keeping the track alive;
100	/// when all producers are dropped, the entry becomes closed and is
101	/// eventually evicted.
102	pub fn insert_track(&mut self, track: TrackConsumer) -> Result<(), Error> {
103		let mut state = modify(&self.state)?;
104		state.insert_track(track.weak())
105	}
106
107	/// Remove a track from the lookup.
108	pub fn remove_track(&mut self, name: &str) -> Result<(), Error> {
109		let mut state = modify(&self.state)?;
110		state.tracks.remove(name).ok_or(Error::NotFound)?;
111		Ok(())
112	}
113
114	/// Produce a new track and insert it into the broadcast.
115	pub fn create_track(&mut self, track: Track) -> Result<TrackProducer, Error> {
116		let track = TrackProducer::new(track);
117		let mut state = modify(&self.state)?;
118		state.insert_track(track.weak())?;
119		drop(state);
120		Ok(track)
121	}
122
123	/// Create a track with a unique name using the given suffix.
124	///
125	/// Generates names like `0{suffix}`, `1{suffix}`, etc. and picks the first
126	/// one not already used in this broadcast.
127	pub fn unique_track(&mut self, suffix: &str) -> Result<TrackProducer, Error> {
128		let name = self.unique_name(suffix);
129		self.create_track(Track { name, priority: 0 })
130	}
131
132	/// Generate a unique track name from a suffix without creating the track.
133	pub fn unique_name(&self, suffix: &str) -> String {
134		let state = self.state.read();
135		let mut name = String::new();
136		for i in 0u32.. {
137			name = format!("{i}{suffix}");
138			if !state.tracks.contains_key(&name) {
139				break;
140			}
141		}
142		name
143	}
144
145	/// Create a dynamic producer that handles on-demand track requests from consumers.
146	pub fn dynamic(&self) -> BroadcastDynamic {
147		BroadcastDynamic::new(self.info.clone(), self.state.clone())
148	}
149
150	/// Create a consumer that can subscribe to tracks in this broadcast.
151	pub fn consume(&self) -> BroadcastConsumer {
152		BroadcastConsumer {
153			info: self.info.clone(),
154			state: self.state.consume(),
155		}
156	}
157
158	/// Abort the broadcast with the given error.
159	///
160	/// Externally-owned tracks are independent and must be aborted separately;
161	/// inserted tracks are referenced via weak handles so that consumers can
162	/// finish reading them. Pending dynamic track requests, however, are owned
163	/// by the broadcast and have no other producer to fulfill them, so they are
164	/// aborted here. The track lookup is also cleared so a stale
165	/// [`BroadcastConsumer`] can't pin it in memory forever.
166	pub fn abort(&mut self, err: Error) -> Result<(), Error> {
167		let mut guard = modify(&self.state)?;
168
169		// Abort any pending dynamic track requests; their producers are owned
170		// by the broadcast and would otherwise leave consumers stuck forever.
171		for mut request in guard.requests.drain(..) {
172			request.abort(err.clone()).ok();
173		}
174
175		guard.tracks.clear();
176		guard.abort = Some(err);
177		guard.close();
178		Ok(())
179	}
180
181	/// Return true if this is the same broadcast instance.
182	pub fn is_clone(&self, other: &Self) -> bool {
183		self.state.same_channel(&other.state)
184	}
185}
186
187#[cfg(test)]
188impl BroadcastProducer {
189	pub fn assert_create_track(&mut self, track: &Track) -> TrackProducer {
190		self.create_track(track.clone()).expect("should not have errored")
191	}
192
193	pub fn assert_insert_track(&mut self, track: &TrackProducer) {
194		self.insert_track(track.consume()).expect("should not have errored")
195	}
196}
197
198impl Drop for BroadcastProducer {
199	fn drop(&mut self) {
200		// The last producer dropping releases the track lookup so a stale
201		// consumer can't pin it (and the track state it weakly references)
202		// forever, the same as an explicit abort.
203		if !self.state.is_last() {
204			return;
205		}
206		if let Ok(mut state) = modify(&self.state) {
207			state.tracks.clear();
208			for mut request in state.requests.drain(..) {
209				request.abort(Error::Cancel).ok();
210			}
211		}
212	}
213}
214
215/// Handles on-demand track creation for a broadcast.
216///
217/// When a consumer requests a track that doesn't exist, a [TrackProducer] is created
218/// and queued for the dynamic producer to fulfill via [Self::requested_track].
219/// Dropped when no longer needed; pending requests are automatically aborted.
220pub struct BroadcastDynamic {
221	info: Broadcast,
222	state: kio::Producer<State>,
223}
224
225impl Clone for BroadcastDynamic {
226	fn clone(&self) -> Self {
227		// Mirror `new`: bump `state.dynamic` so each live handle is counted.
228		// Without this, deriving Clone would let `Drop` decrement past `new`'s
229		// single increment and prematurely flip `dynamic` to zero, causing
230		// future `subscribe_track` calls to return `NotFound`.
231		if let Ok(mut state) = self.state.write() {
232			state.dynamic += 1;
233		}
234
235		Self {
236			info: self.info.clone(),
237			state: self.state.clone(),
238		}
239	}
240}
241
242impl Deref for BroadcastDynamic {
243	type Target = Broadcast;
244
245	fn deref(&self) -> &Self::Target {
246		&self.info
247	}
248}
249
250impl BroadcastDynamic {
251	fn new(info: Broadcast, state: kio::Producer<State>) -> Self {
252		if let Ok(mut state) = state.write() {
253			// If the broadcast is already closed, we can't handle any new requests.
254			state.dynamic += 1;
255		}
256
257		Self { info, state }
258	}
259
260	// A helper to automatically apply Dropped if the state is closed without an error.
261	fn poll<F>(&self, waiter: &kio::Waiter, f: F) -> Poll<Result<kio::Mut<'_, State>, Error>>
262	where
263		F: FnMut(&kio::Ref<'_, State>) -> Poll<()>,
264	{
265		Poll::Ready(match ready!(self.state.poll(waiter, f)) {
266			Ok(state) => Ok(state),
267			Err(state) => Err(state.abort.clone().unwrap_or(Error::Dropped)),
268		})
269	}
270
271	/// Poll for the next consumer-requested track, without blocking. The returned producer
272	/// is preconfigured with the requested track's name and priority.
273	pub fn poll_requested_track(&mut self, waiter: &kio::Waiter) -> Poll<Result<TrackProducer, Error>> {
274		let mut state = ready!(self.poll(waiter, |state| {
275			if state.requests.is_empty() {
276				Poll::Pending
277			} else {
278				Poll::Ready(())
279			}
280		}))?;
281		Poll::Ready(Ok(state.requests.pop().expect("predicate guaranteed a request")))
282	}
283
284	/// Block until a consumer requests a track, returning its producer.
285	pub async fn requested_track(&mut self) -> Result<TrackProducer, Error> {
286		kio::wait(|waiter| self.poll_requested_track(waiter)).await
287	}
288
289	/// Create a consumer that can subscribe to tracks in this broadcast.
290	pub fn consume(&self) -> BroadcastConsumer {
291		BroadcastConsumer {
292			info: self.info.clone(),
293			state: self.state.consume(),
294		}
295	}
296
297	/// Block until the broadcast is closed or aborted, returning the cause.
298	pub async fn closed(&self) -> Error {
299		self.state.closed().await;
300		self.state.read().abort.clone().unwrap_or(Error::Dropped)
301	}
302
303	/// Abort the broadcast with the given error.
304	///
305	/// Externally-owned tracks are independent and must be aborted separately;
306	/// inserted tracks are referenced via weak handles. Pending dynamic track
307	/// requests are owned by the broadcast and aborted here so consumers don't
308	/// stay stuck waiting on producers nobody will fulfill. The track lookup is
309	/// also cleared so a stale [`BroadcastConsumer`] can't pin it in memory forever.
310	pub fn abort(&mut self, err: Error) -> Result<(), Error> {
311		let mut guard = modify(&self.state)?;
312
313		// Abort any pending dynamic track requests; their producers are owned
314		// by the broadcast and would otherwise leave consumers stuck forever.
315		for mut request in guard.requests.drain(..) {
316			request.abort(err.clone()).ok();
317		}
318
319		guard.tracks.clear();
320		guard.abort = Some(err);
321		guard.close();
322		Ok(())
323	}
324
325	/// Return true if this is the same broadcast instance.
326	pub fn is_clone(&self, other: &Self) -> bool {
327		self.state.same_channel(&other.state)
328	}
329}
330
331impl Drop for BroadcastDynamic {
332	fn drop(&mut self) {
333		// Release the track lookup if we're the last producer overall, matching
334		// BroadcastProducer::drop and an explicit abort.
335		let last = self.state.is_last();
336		if let Ok(mut state) = self.state.write() {
337			if last {
338				state.tracks.clear();
339			}
340
341			// We do a saturating sub so Producer::dynamic() can avoid returning an error.
342			state.dynamic = state.dynamic.saturating_sub(1);
343			if state.dynamic != 0 {
344				return;
345			}
346
347			// Abort all pending requests since there's no dynamic producer to handle them.
348			for mut request in state.requests.drain(..) {
349				request.abort(Error::Cancel).ok();
350			}
351		}
352	}
353}
354
355#[cfg(test)]
356use futures::FutureExt;
357
358#[cfg(test)]
359impl BroadcastDynamic {
360	pub fn assert_request(&mut self) -> TrackProducer {
361		self.requested_track()
362			.now_or_never()
363			.expect("should not have blocked")
364			.expect("should not have errored")
365	}
366
367	pub fn assert_no_request(&mut self) {
368		assert!(self.requested_track().now_or_never().is_none(), "should have blocked");
369	}
370}
371
372/// Subscribe to arbitrary broadcast/tracks.
373#[derive(Clone)]
374pub struct BroadcastConsumer {
375	info: Broadcast,
376	state: kio::Consumer<State>,
377}
378
379impl Deref for BroadcastConsumer {
380	type Target = Broadcast;
381
382	fn deref(&self) -> &Self::Target {
383		&self.info
384	}
385}
386
387impl BroadcastConsumer {
388	/// Subscribe to a track on this broadcast.
389	///
390	/// Reuses an existing producer if one is already publishing the track; otherwise
391	/// queues a new dynamic request that the broadcast's producer will service via
392	/// [`BroadcastDynamic::requested_track`]. Returns [`Error::NotFound`] if the
393	/// broadcast has no dynamic producer to handle requests.
394	pub fn subscribe_track(&self, track: &Track) -> Result<TrackConsumer, Error> {
395		// Upgrade to a temporary producer so we can modify the state.
396		let producer = self
397			.state
398			.produce()
399			.ok_or_else(|| self.state.read().abort.clone().unwrap_or(Error::Dropped))?;
400		let mut state = modify(&producer)?;
401
402		if let Some(weak) = state.tracks.get(&track.name) {
403			if !weak.is_closed() {
404				return Ok(weak.consume());
405			}
406			// Remove the stale entry
407			state.tracks.remove(&track.name);
408		}
409
410		// Otherwise we have never seen this track before and need to create a new producer.
411		let producer = track.clone().produce();
412		let consumer = producer.consume();
413
414		if state.dynamic == 0 {
415			return Err(Error::NotFound);
416		}
417
418		// Insert a weak reference for deduplication.
419		let weak = producer.weak();
420		state.tracks.insert(producer.name.clone(), weak.clone());
421		state.requests.push(producer);
422
423		// Remove the track from the lookup when it's unused.
424		let consumer_state = self.state.clone();
425		web_async::spawn(async move {
426			let _ = weak.unused().await;
427
428			let Some(producer) = consumer_state.produce() else {
429				return;
430			};
431			let Ok(mut state) = producer.write() else {
432				return;
433			};
434
435			// Remove the entry, but reinsert if it was replaced by a different reference.
436			if let Some(current) = state.tracks.remove(&weak.info.name)
437				&& !current.is_clone(&weak)
438			{
439				state.tracks.insert(current.info.name.clone(), current);
440			}
441		});
442
443		Ok(consumer)
444	}
445
446	/// Block until the broadcast is closed and return the cause.
447	///
448	/// Returns [`Error::Dropped`] if every producer was dropped without an
449	/// explicit abort, or the abort error supplied by [`BroadcastProducer::abort`].
450	pub async fn closed(&self) -> Error {
451		self.state.closed().await;
452		self.state.read().abort.clone().unwrap_or(Error::Dropped)
453	}
454
455	/// Returns true if every [`BroadcastProducer`] has been dropped.
456	pub fn is_closed(&self) -> bool {
457		self.state.read().is_closed()
458	}
459
460	/// Register a [`kio::Waiter`] that fires when the broadcast closes.
461	///
462	/// Returns [`Poll::Ready`] if already closed, otherwise [`Poll::Pending`] after
463	/// arming the waiter. Useful for composing close-detection into a larger poll
464	/// without spawning a task per broadcast.
465	pub fn poll_closed(&self, waiter: &kio::Waiter) -> Poll<()> {
466		self.state.poll_closed(waiter)
467	}
468
469	/// Check if this is the exact same instance of a broadcast.
470	pub fn is_clone(&self, other: &Self) -> bool {
471		self.state.same_channel(&other.state)
472	}
473}
474
475#[cfg(test)]
476impl BroadcastConsumer {
477	pub fn assert_subscribe_track(&self, track: &Track) -> TrackConsumer {
478		self.subscribe_track(track).expect("should not have errored")
479	}
480
481	pub fn assert_not_closed(&self) {
482		assert!(self.closed().now_or_never().is_none(), "should not be closed");
483	}
484
485	pub fn assert_closed(&self) {
486		assert!(self.closed().now_or_never().is_some(), "should be closed");
487	}
488}
489
490#[cfg(test)]
491mod test {
492	use super::*;
493
494	#[tokio::test]
495	async fn insert() {
496		let mut producer = Broadcast::new().produce();
497		let mut track1 = Track::new("track1").produce();
498
499		// Make sure we can insert before a consumer is created.
500		producer.assert_insert_track(&track1);
501		track1.append_group().unwrap();
502
503		let consumer = producer.consume();
504
505		let mut track1_sub = consumer.assert_subscribe_track(&Track::new("track1"));
506		track1_sub.assert_group();
507
508		let mut track2 = Track::new("track2").produce();
509		producer.assert_insert_track(&track2);
510
511		let consumer2 = producer.consume();
512		let mut track2_consumer = consumer2.assert_subscribe_track(&Track::new("track2"));
513		track2_consumer.assert_no_group();
514
515		track2.append_group().unwrap();
516
517		track2_consumer.assert_group();
518	}
519
520	#[tokio::test]
521	async fn closed() {
522		let mut producer = Broadcast::new().produce();
523		let _dynamic = producer.dynamic();
524
525		let consumer = producer.consume();
526		consumer.assert_not_closed();
527
528		// Create a new track and insert it into the broadcast.
529		let track1 = producer.assert_create_track(&Track::new("track1"));
530		let track1c = consumer.assert_subscribe_track(&track1);
531		let track2 = consumer.assert_subscribe_track(&Track::new("track2"));
532
533		// Aborting the broadcast must NOT cascade to externally-owned tracks.
534		producer.abort(Error::Cancel).unwrap();
535
536		// track2's producer was owned by the broadcast (a pending dynamic
537		// request), so the consumer surfaces the abort.
538		track2.assert_error();
539
540		// track1's producer is held outside the broadcast, so it survives.
541		assert!(!track1.is_closed());
542		track1c.assert_not_closed();
543	}
544
545	#[tokio::test]
546	async fn abort_clears_track_lookup() {
547		let mut producer = Broadcast::new().produce();
548		let track = producer.assert_create_track(&Track::new("track1"));
549
550		// A stale consumer that never drops must not pin the lookup entries.
551		let _consumer = producer.consume();
552		assert_eq!(producer.state.read().tracks.len(), 1);
553
554		producer.abort(Error::Cancel).unwrap();
555		assert!(
556			producer.state.read().tracks.is_empty(),
557			"track lookup should be cleared on abort"
558		);
559
560		drop(track);
561	}
562
563	#[tokio::test]
564	async fn drop_clears_track_lookup() {
565		let mut producer = Broadcast::new().produce();
566		let _track = producer.assert_create_track(&Track::new("track1"));
567
568		// A stale consumer keeps the channel (and thus the lookup) alive.
569		let consumer = producer.consume();
570		assert_eq!(consumer.state.read().tracks.len(), 1);
571
572		// Dropping the last producer releases the lookup, same as an abort.
573		drop(producer);
574		assert!(
575			consumer.state.read().tracks.is_empty(),
576			"track lookup should be cleared when the last producer drops"
577		);
578	}
579
580	#[tokio::test]
581	async fn requests() {
582		let mut producer = Broadcast::new().produce().dynamic();
583
584		let consumer = producer.consume();
585		let consumer2 = consumer.clone();
586
587		let mut track1 = consumer.assert_subscribe_track(&Track::new("track1"));
588		track1.assert_not_closed();
589		track1.assert_no_group();
590
591		// Make sure we deduplicate requests while track1 is still active.
592		let mut track2 = consumer2.assert_subscribe_track(&Track::new("track1"));
593		track2.assert_is_clone(&track1);
594
595		// Get the requested track, and there should only be one.
596		let mut track3 = producer.assert_request();
597		producer.assert_no_request();
598
599		// Make sure the consumer is the same.
600		track3.consume().assert_is_clone(&track1);
601
602		// Append a group and make sure they all get it.
603		track3.append_group().unwrap();
604		track1.assert_group();
605		track2.assert_group();
606
607		// Make sure that tracks are cancelled when the producer is dropped.
608		let track4 = consumer.assert_subscribe_track(&Track::new("track2"));
609		drop(producer);
610
611		// Make sure the track is errored, not closed.
612		track4.assert_error();
613
614		let track5 = consumer2.subscribe_track(&Track::new("track3"));
615		assert!(track5.is_err(), "should have errored");
616	}
617
618	#[tokio::test]
619	async fn stale_producer() {
620		let mut broadcast = Broadcast::new().produce().dynamic();
621		let consumer = broadcast.consume();
622
623		// Subscribe to a track, creating a request
624		let track1 = consumer.assert_subscribe_track(&Track::new("track1"));
625
626		// Get the requested producer and close it (simulating publisher disconnect)
627		let mut producer1 = broadcast.assert_request();
628		producer1.append_group().unwrap();
629		producer1.finish().unwrap();
630		drop(producer1);
631
632		// The consumer should see the track as closed
633		track1.assert_closed();
634
635		// Subscribe again to the same track - should get a NEW producer, not the stale one
636		let mut track2 = consumer.assert_subscribe_track(&Track::new("track1"));
637		track2.assert_not_closed();
638		track2.assert_not_clone(&track1);
639
640		// There should be a new request for the track
641		let mut producer2 = broadcast.assert_request();
642		producer2.append_group().unwrap();
643
644		// The new consumer should receive the new group
645		track2.assert_group();
646	}
647
648	#[tokio::test]
649	async fn requested_unused() {
650		let mut broadcast = Broadcast::new().produce().dynamic();
651
652		// Subscribe to a track that doesn't exist - this creates a request
653		let consumer1 = broadcast.consume().assert_subscribe_track(&Track::new("unknown_track"));
654
655		// Get the requested track producer
656		let producer1 = broadcast.assert_request();
657
658		// The track producer should NOT be unused yet because there's a consumer
659		assert!(
660			producer1.unused().now_or_never().is_none(),
661			"track producer should be used"
662		);
663
664		// Making a new consumer will keep the producer alive
665		let consumer2 = broadcast.consume().assert_subscribe_track(&Track::new("unknown_track"));
666		consumer2.assert_is_clone(&consumer1);
667
668		// Drop the consumer subscription
669		drop(consumer1);
670
671		// The track producer should NOT be unused yet because there's a consumer
672		assert!(
673			producer1.unused().now_or_never().is_none(),
674			"track producer should be used"
675		);
676
677		// Drop the second consumer, now the producer should be unused
678		drop(consumer2);
679
680		// BUG: The track producer should become unused after dropping the consumer,
681		// but it won't because the broadcast keeps a reference in the lookup HashMap
682		// This assertion will fail, demonstrating the bug
683		assert!(
684			producer1.unused().now_or_never().is_some(),
685			"track producer should be unused after consumer is dropped"
686		);
687
688		// TODO Unfortunately, we need to sleep for a little bit to detect when unused.
689		tokio::time::sleep(std::time::Duration::from_millis(1)).await;
690
691		// Now the cleanup task should have run and we can subscribe again to the unknown track.
692		let consumer3 = broadcast.consume().subscribe_track(&Track::new("unknown_track"));
693		let producer2 = broadcast.assert_request();
694
695		// Drop the consumer, now the producer should be unused
696		drop(consumer3);
697		assert!(
698			producer2.unused().now_or_never().is_some(),
699			"track producer should be unused after consumer is dropped"
700		);
701	}
702
703	// Cloning a `BroadcastDynamic` and dropping the clone must not flip
704	// `state.dynamic` to zero. The relay's lite subscriber clones the
705	// dynamic per spawned subscribe; if Clone skipped the increment, the
706	// first finished subscribe would tear down the broadcast and any
707	// follow-up `subscribe_track` would return `NotFound`.
708	#[tokio::test]
709	async fn dynamic_clone_keeps_alive() {
710		let broadcast = Broadcast::new().produce().dynamic();
711		let consumer = broadcast.consume();
712
713		let clone = broadcast.clone();
714		drop(clone);
715
716		// Original handle is still live, so requests must still be accepted.
717		consumer.assert_subscribe_track(&Track::new("track1"));
718	}
719}