moq_lite/model/
broadcast.rs1use std::{
2 collections::HashMap,
3 sync::{Arc, Mutex},
4};
5
6use crate::{TrackConsumer, TrackProducer};
7use tokio::sync::watch;
8
9use super::Track;
10
11struct State {
12 published: HashMap<String, TrackConsumer>,
13 requested: HashMap<String, TrackProducer>,
14 queue: async_channel::Sender<TrackProducer>,
15}
16
17impl State {
18 pub fn new(queue: async_channel::Sender<TrackProducer>) -> Self {
19 Self {
20 published: HashMap::new(),
21 requested: HashMap::new(),
22 queue,
23 }
24 }
25}
26
27#[derive(Clone)]
32pub struct BroadcastProducer {
33 state: Arc<Mutex<State>>,
34 queue: async_channel::Receiver<TrackProducer>,
35
36 closed: watch::Sender<()>,
39}
40
41impl Default for BroadcastProducer {
42 fn default() -> Self {
43 Self::new()
44 }
45}
46
47impl BroadcastProducer {
48 pub fn new() -> Self {
49 let (send, recv) = async_channel::bounded(32);
50
51 Self {
52 state: Arc::new(Mutex::new(State::new(send))),
53 queue: recv,
54 closed: watch::Sender::default(),
55 }
56 }
57
58 pub async fn requested(&self) -> TrackProducer {
59 self.queue.recv().await.unwrap()
60 }
61
62 pub fn create(&self, track: Track) -> TrackProducer {
63 let producer = track.produce();
64 self.insert(producer.consume());
65 producer
66 }
67
68 pub fn insert(&self, track: TrackConsumer) -> Option<TrackConsumer> {
70 let mut state = self.state.lock().unwrap();
71 state.published.insert(track.info.name.clone(), track)
72 }
73
74 pub fn remove(&self, name: &str) -> Option<TrackConsumer> {
76 let mut state = self.state.lock().unwrap();
77 state.published.remove(name)
78 }
79
80 pub fn consume(&self) -> BroadcastConsumer {
82 BroadcastConsumer {
83 state: self.state.clone(),
84 closed: self.closed.subscribe(),
85 }
86 }
87
88 pub async fn unused(&self) {
92 self.closed.closed().await;
93 }
94}
95
96#[derive(Clone)]
98pub struct BroadcastConsumer {
99 state: Arc<Mutex<State>>,
100
101 closed: watch::Receiver<()>,
103}
104
105impl BroadcastConsumer {
106 pub fn subscribe(&self, track: &Track) -> TrackConsumer {
107 let mut state = self.state.lock().unwrap();
108
109 if let Some(consumer) = state.published.get(&track.name).cloned() {
111 return consumer;
112 }
113
114 if let Some(requested) = state.requested.get(&track.name) {
116 return requested.consume();
117 }
118
119 let producer = track.clone().produce();
121 let consumer = producer.consume();
122
123 state.requested.insert(track.name.clone(), producer.clone());
126
127 let queue = state.queue.clone();
128 let state = self.state.clone();
129 let track = track.clone();
130
131 web_async::spawn(async move {
132 let _ = queue.send(producer.clone()).await;
134
135 producer.unused().await;
137
138 state.lock().unwrap().requested.remove(&track.name);
140 });
141
142 consumer
143 }
144
145 pub async fn closed(&self) {
146 self.closed.clone().changed().await.ok();
147 }
148
149 pub fn ptr_eq(&self, other: &Self) -> bool {
153 Arc::ptr_eq(&self.state, &other.state)
154 }
155}