moq_lite/model/
broadcast.rs

1use std::{
2	collections::HashMap,
3	sync::{Arc, Mutex},
4};
5
6use crate::{TrackConsumer, TrackProducer};
7use tokio::sync::watch;
8
9use super::Track;
10
11struct State {
12	published: HashMap<String, TrackConsumer>,
13	requested: HashMap<String, TrackProducer>,
14	queue: async_channel::Sender<TrackProducer>,
15}
16
17impl State {
18	pub fn new(queue: async_channel::Sender<TrackProducer>) -> Self {
19		Self {
20			published: HashMap::new(),
21			requested: HashMap::new(),
22			queue,
23		}
24	}
25}
26
27/// Receive broadcast/track requests and return if we can fulfill them.
28///
29/// This is a pull-based producer.
30/// If you want an easier push-based producer, use [BroadcastProducer::map].
31#[derive(Clone)]
32pub struct BroadcastProducer {
33	state: Arc<Mutex<State>>,
34	queue: async_channel::Receiver<TrackProducer>,
35
36	// Dropped when all senders or all receivers are dropped.
37	// TODO Make a better way of doing this.
38	closed: watch::Sender<()>,
39}
40
41impl Default for BroadcastProducer {
42	fn default() -> Self {
43		Self::new()
44	}
45}
46
47impl BroadcastProducer {
48	pub fn new() -> Self {
49		let (send, recv) = async_channel::bounded(32);
50
51		Self {
52			state: Arc::new(Mutex::new(State::new(send))),
53			queue: recv,
54			closed: watch::Sender::default(),
55		}
56	}
57
58	pub async fn requested(&self) -> TrackProducer {
59		self.queue.recv().await.unwrap()
60	}
61
62	pub fn create(&self, track: Track) -> TrackProducer {
63		let producer = track.produce();
64		self.insert(producer.consume());
65		producer
66	}
67
68	/// Insert a new track into the lookup, returning the old track if it already exists.
69	pub fn insert(&self, track: TrackConsumer) -> Option<TrackConsumer> {
70		let mut state = self.state.lock().unwrap();
71		state.published.insert(track.info.name.clone(), track)
72	}
73
74	/// Remove a track from the lookup.
75	pub fn remove(&self, name: &str) -> Option<TrackConsumer> {
76		let mut state = self.state.lock().unwrap();
77		state.published.remove(name)
78	}
79
80	// Try to create a new consumer.
81	pub fn consume(&self) -> BroadcastConsumer {
82		BroadcastConsumer {
83			state: self.state.clone(),
84			closed: self.closed.subscribe(),
85		}
86	}
87
88	/// Block until there are no more consumers.
89	///
90	/// A new consumer can be created by calling [Self::consume] and this will block again.
91	pub async fn unused(&self) {
92		self.closed.closed().await;
93	}
94}
95
96/// Subscribe to abitrary broadcast/tracks.
97#[derive(Clone)]
98pub struct BroadcastConsumer {
99	state: Arc<Mutex<State>>,
100
101	// Annoying, but we need to know when the above channel is closed without sending.
102	closed: watch::Receiver<()>,
103}
104
105impl BroadcastConsumer {
106	pub fn subscribe(&self, track: &Track) -> TrackConsumer {
107		let mut state = self.state.lock().unwrap();
108
109		// Return any explictly published track.
110		if let Some(consumer) = state.published.get(&track.name).cloned() {
111			return consumer;
112		}
113
114		// Return any requested track, deduplicating it.
115		if let Some(requested) = state.requested.get(&track.name) {
116			return requested.consume();
117		}
118
119		// Otherwise we have never seen this track before and need to create a new producer.
120		let producer = track.clone().produce();
121		let consumer = producer.consume();
122
123		// Insert the producer into the lookup so we will deduplicate requests.
124		// This is not a subscriber so it doesn't count towards "used" subscribers.
125		state.requested.insert(track.name.clone(), producer.clone());
126
127		let queue = state.queue.clone();
128		let state = self.state.clone();
129		let track = track.clone();
130
131		web_async::spawn(async move {
132			// Send the request to the producer.
133			let _ = queue.send(producer.clone()).await;
134
135			// Wait until we no longer want this track.
136			producer.unused().await;
137
138			// Remove the track from the lookup.
139			state.lock().unwrap().requested.remove(&track.name);
140		});
141
142		consumer
143	}
144
145	pub async fn closed(&self) {
146		self.closed.clone().changed().await.ok();
147	}
148
149	/// Check if this is the exact same instance of a broadcast.
150	///
151	/// Duplicate names are allowed in the case of resumption.
152	pub fn ptr_eq(&self, other: &Self) -> bool {
153		Arc::ptr_eq(&self.state, &other.state)
154	}
155}