1use std::collections::{HashMap, VecDeque};
2use tokio::sync::mpsc;
3use web_async::{Lock, LockWeak};
4
5use super::BroadcastConsumer;
6
7#[derive(Default)]
8struct ProducerState {
9 active: HashMap<String, BroadcastConsumer>,
10 consumers: Vec<(Lock<ConsumerState>, mpsc::Sender<()>)>,
11}
12
13impl ProducerState {
14 fn publish(&mut self, path: String, broadcast: BroadcastConsumer) -> Option<BroadcastConsumer> {
15 let mut i = 0;
16
17 while let Some((consumer, notify)) = self.consumers.get(i) {
18 if !notify.is_closed() {
19 if consumer.lock().insert(&path, &broadcast) {
20 notify.try_send(()).ok();
21 }
22 i += 1;
23 } else {
24 self.consumers.swap_remove(i);
25 }
26 }
27
28 self.active.insert(path, broadcast)
29 }
30
31 fn consume<T: ToString>(&mut self, prefix: T) -> ConsumerState {
32 let prefix = prefix.to_string();
33 let mut updates = VecDeque::new();
34
35 for (path, broadcast) in self.active.iter() {
36 if let Some(suffix) = path.strip_prefix(&prefix) {
37 updates.push_back((suffix.to_string(), broadcast.clone()));
38 }
39 }
40
41 ConsumerState { prefix, updates }
42 }
43
44 fn subscribe(&mut self, consumer: Lock<ConsumerState>) -> mpsc::Receiver<()> {
45 let (tx, rx) = mpsc::channel(1);
46 self.consumers.push((consumer.clone(), tx));
47 rx
48 }
49}
50
51#[derive(Clone)]
52struct ConsumerState {
53 prefix: String,
54 updates: VecDeque<(String, BroadcastConsumer)>,
55}
56
57impl ConsumerState {
58 pub fn insert(&mut self, path: &str, consumer: &BroadcastConsumer) -> bool {
59 if let Some(suffix) = path.strip_prefix(&self.prefix) {
60 self.updates.push_back((suffix.to_string(), consumer.clone()));
61 true
62 } else {
63 false
64 }
65 }
66}
67
68#[derive(Default, Clone)]
70pub struct OriginProducer {
71 state: Lock<ProducerState>,
72}
73
74impl OriginProducer {
75 pub fn new() -> Self {
76 Self::default()
77 }
78
79 pub fn publish<S: ToString>(&mut self, path: S, broadcast: BroadcastConsumer) -> bool {
81 let path = path.to_string();
82 let unique = self.state.lock().publish(path.clone(), broadcast.clone()).is_none();
83
84 let state = self.state.clone();
85 web_async::spawn(async move {
86 broadcast.closed().await;
87 state.lock().active.remove(&path);
88 });
89
90 unique
91 }
92
93 pub fn publish_all(&mut self, broadcasts: OriginConsumer) {
95 self.publish_prefix("", broadcasts);
96 }
97
98 pub fn publish_prefix(&mut self, prefix: &str, mut broadcasts: OriginConsumer) {
100 let mut this = self.clone();
102
103 let prefix = match prefix {
105 "" => None,
106 prefix => Some(prefix.to_string()),
107 };
108
109 web_async::spawn(async move {
110 while let Some((suffix, broadcast)) = broadcasts.next().await {
111 let path = match &prefix {
112 Some(prefix) => format!("{}{}", prefix, suffix),
113 None => suffix,
114 };
115
116 this.publish(path, broadcast);
117 }
118 });
119 }
120
121 pub fn consume(&self, path: &str) -> Option<BroadcastConsumer> {
123 self.state.lock().active.get(path).cloned()
124 }
125
126 pub fn consume_all(&self) -> OriginConsumer {
128 self.consume_prefix("")
129 }
130
131 pub fn consume_prefix<S: ToString>(&self, prefix: S) -> OriginConsumer {
133 let mut state = self.state.lock();
134 let consumer = Lock::new(state.consume(prefix));
135 let notify = state.subscribe(consumer.clone());
136 OriginConsumer::new(self.state.downgrade(), consumer, notify)
137 }
138
139 pub async fn unused(&self) {
143 while let Some(notify) = self.unused_inner() {
145 notify.closed().await;
146 }
147 }
148
149 fn unused_inner(&self) -> Option<mpsc::Sender<()>> {
151 let mut state = self.state.lock();
152
153 while let Some((_, notify)) = state.consumers.last() {
154 if !notify.is_closed() {
155 return Some(notify.clone());
156 }
157
158 state.consumers.pop();
159 }
160
161 None
162 }
163}
164
165pub struct OriginConsumer {
167 producer: LockWeak<ProducerState>,
168 state: Lock<ConsumerState>,
169 notify: mpsc::Receiver<()>,
170}
171
172impl OriginConsumer {
173 fn new(producer: LockWeak<ProducerState>, state: Lock<ConsumerState>, notify: mpsc::Receiver<()>) -> Self {
174 Self {
175 producer,
176 state,
177 notify,
178 }
179 }
180
181 pub async fn next(&mut self) -> Option<(String, BroadcastConsumer)> {
183 loop {
184 {
185 let mut state = self.state.lock();
186
187 if let Some(update) = state.updates.pop_front() {
188 return Some(update);
189 }
190 }
191
192 self.notify.recv().await?;
193 }
194 }
195}
196
197impl Clone for OriginConsumer {
202 fn clone(&self) -> Self {
203 let consumer = Lock::new(self.state.lock().clone());
204
205 match self.producer.upgrade() {
206 Some(producer) => {
207 let mut producer = producer.lock();
208 let notify = producer.subscribe(consumer.clone());
209 OriginConsumer::new(self.producer.clone(), consumer, notify)
210 }
211 None => {
212 let (_, notify) = mpsc::channel(1);
213 OriginConsumer::new(self.producer.clone(), consumer, notify)
214 }
215 }
216 }
217}