moq_lite/model/
announce.rs1use std::collections::{BTreeSet, VecDeque};
2use tokio::sync::mpsc;
3use web_async::{Lock, LockWeak};
4
5pub use crate::message::Announce;
7
8#[derive(Default)]
9struct ProducerState {
10 active: BTreeSet<String>,
12 consumers: Vec<(Lock<ConsumerState>, mpsc::Sender<()>)>,
13}
14
15impl ProducerState {
16 fn insert(&mut self, path: String) -> bool {
17 if self.active.contains(&path) {
18 self.update(Announce::Ended { suffix: path.clone() });
19 self.update(Announce::Active { suffix: path });
20 return false;
21 }
22
23 self.active.insert(path.clone());
24 self.update(Announce::Active { suffix: path });
25 true
26 }
27
28 fn remove(&mut self, path: &str) -> bool {
29 let existing = self.active.remove(path);
30 if existing {
31 self.update(Announce::Ended {
32 suffix: path.to_string(),
33 });
34 }
35
36 existing
37 }
38
39 fn update(&mut self, update: Announce) {
40 let mut i = 0;
41
42 while let Some((consumer, notify)) = self.consumers.get(i) {
43 if !notify.is_closed() {
44 consumer.lock().push(update.clone());
45 notify.try_send(()).ok();
46 i += 1;
47 } else {
48 self.consumers.swap_remove(i);
49 }
50 }
51 }
52
53 fn consume<T: ToString>(&mut self, prefix: T) -> ConsumerState {
54 let prefix = prefix.to_string();
55 let mut init = VecDeque::new();
56
57 for active in self.active.iter() {
58 if let Some(suffix) = active.strip_prefix(&prefix) {
59 init.push_back(Announce::Active {
60 suffix: suffix.to_string(),
61 });
62 }
63 }
64
65 ConsumerState { prefix, updates: init }
66 }
67
68 fn subscribe(&mut self, consumer: Lock<ConsumerState>) -> mpsc::Receiver<()> {
69 let (tx, rx) = mpsc::channel(1);
70 self.consumers.push((consumer.clone(), tx));
71 rx
72 }
73}
74
75impl Drop for ProducerState {
76 fn drop(&mut self) {
77 while let Some(broadcast) = self.active.pop_first() {
79 self.update(Announce::Ended {
80 suffix: broadcast.clone(),
81 });
82 }
83 }
84}
85
86#[derive(Clone)]
87struct ConsumerState {
88 prefix: String,
89 updates: VecDeque<Announce>,
90}
91
92impl ConsumerState {
93 pub fn push(&mut self, update: Announce) {
94 match update {
95 Announce::Active { suffix } => {
96 if let Some(suffix) = suffix.strip_prefix(&self.prefix) {
97 self.updates.push_back(Announce::Active {
98 suffix: suffix.to_string(),
99 });
100 }
101 }
102 Announce::Ended { suffix } => {
103 if let Some(suffix) = suffix.strip_prefix(&self.prefix) {
104 self.updates.push_back(Announce::Ended {
105 suffix: suffix.to_string(),
106 });
107 }
108 }
109 }
110 }
111}
112
113#[derive(Default, Clone)]
115pub struct AnnounceProducer {
116 state: Lock<ProducerState>,
117}
118
119impl AnnounceProducer {
120 pub fn new() -> Self {
121 Self::default()
122 }
123
124 pub fn insert<T: ToString>(&mut self, path: T) -> bool {
126 self.state.lock().insert(path.to_string())
127 }
128
129 pub fn remove(&mut self, path: &str) -> bool {
130 self.state.lock().remove(path)
131 }
132
133 pub fn contains(&self, path: &str) -> bool {
135 self.state.lock().active.contains(path)
136 }
137
138 pub fn is_empty(&self) -> bool {
140 self.state.lock().active.is_empty()
141 }
142
143 pub fn consume<S: ToString>(&self, prefix: S) -> AnnounceConsumer {
147 let mut state = self.state.lock();
148 let consumer = Lock::new(state.consume(prefix));
149 let notify = state.subscribe(consumer.clone());
150 AnnounceConsumer::new(self.state.downgrade(), consumer, notify)
151 }
152
153 pub async fn unused(&self) {
157 while let Some(notify) = self.unused_inner() {
159 notify.closed().await;
160 }
161 }
162
163 fn unused_inner(&self) -> Option<mpsc::Sender<()>> {
165 let mut state = self.state.lock();
166
167 while let Some((_, notify)) = state.consumers.last() {
168 if !notify.is_closed() {
169 return Some(notify.clone());
170 }
171
172 state.consumers.pop();
173 }
174
175 None
176 }
177}
178
179pub struct AnnounceConsumer {
181 producer: LockWeak<ProducerState>,
182 state: Lock<ConsumerState>,
183 notify: mpsc::Receiver<()>,
184}
185
186impl AnnounceConsumer {
187 fn new(producer: LockWeak<ProducerState>, state: Lock<ConsumerState>, notify: mpsc::Receiver<()>) -> Self {
188 Self {
189 producer,
190 state,
191 notify,
192 }
193 }
194
195 pub async fn next(&mut self) -> Option<Announce> {
197 loop {
198 {
199 let mut state = self.state.lock();
200
201 if let Some(update) = state.updates.pop_front() {
202 return Some(update);
203 }
204 }
205
206 self.notify.recv().await?;
207 }
208 }
209
210 pub async fn active(&mut self) -> Option<String> {
214 loop {
215 if let Some(Announce::Active { suffix }) = self.next().await {
216 return Some(suffix);
217 }
218 }
219 }
220}
221
222impl Clone for AnnounceConsumer {
227 fn clone(&self) -> Self {
228 let consumer = Lock::new(self.state.lock().clone());
229
230 match self.producer.upgrade() {
231 Some(producer) => {
232 let mut producer = producer.lock();
233 let notify = producer.subscribe(consumer.clone());
234 AnnounceConsumer::new(self.producer.clone(), consumer, notify)
235 }
236 None => {
237 let (_, notify) = mpsc::channel(1);
238 AnnounceConsumer::new(self.producer.clone(), consumer, notify)
239 }
240 }
241 }
242}
243
244#[cfg(test)]
245use futures::FutureExt;
246
/// Test-only helpers that poll the consumer exactly once, without blocking.
#[cfg(test)]
impl AnnounceConsumer {
    /// Expects an `Active` for `suffix` to be available right now.
    fn assert_active(&mut self, suffix: &str) {
        let polled = self.next().now_or_never().expect("would have blocked");
        let announce = polled.expect("no next announcement");
        announce.assert_active(suffix);
    }

    /// Expects an `Ended` for `suffix` to be available right now.
    fn assert_ended(&mut self, suffix: &str) {
        let polled = self.next().now_or_never().expect("would have blocked");
        let announce = polled.expect("no next announcement");
        announce.assert_ended(suffix);
    }

    /// Expects the consumer to be pending: nothing queued and not finished.
    fn assert_wait(&mut self) {
        let polled = self.next().now_or_never();
        assert_eq!(polled, None);
    }

    /// Expects the consumer to be finished: nothing queued and nothing to come.
    fn assert_done(&mut self) {
        let polled = self.next().now_or_never();
        assert_eq!(polled, Some(None));
    }
}
273
#[cfg(test)]
mod test {
    use super::*;

    // Broadcast paths shared by the tests below.
    const AB: &str = "a/b";
    const AC: &str = "a/c";
    const DE: &str = "d/e";
    const DD: &str = "d/d";

    /// Builds an empty producer together with a consumer subscribed to `prefix`.
    fn pair(prefix: &str) -> (AnnounceProducer, AnnounceConsumer) {
        let producer = AnnounceProducer::new();
        let consumer = producer.consume(prefix);
        (producer, consumer)
    }

    /// A single announce followed by an unannounce is seen in that order.
    #[test]
    fn simple() {
        let (mut producer, mut consumer) = pair("");

        assert!(!producer.contains(AB));
        assert!(producer.insert(AB));
        assert!(producer.contains(AB));

        consumer.assert_active(AB);

        assert!(producer.remove(AB));
        assert!(!producer.contains(AB));

        consumer.assert_ended(AB);
        consumer.assert_wait();
    }

    /// Re-announcing an active path yields `Ended` then `Active` and returns false.
    #[test]
    fn duplicate() {
        let (mut producer, mut consumer) = pair("");

        // Same path under a second name, mirroring a second announcer.
        let again = AB;

        assert!(producer.insert(AB));
        assert!(producer.contains(AB));

        assert!(producer.contains(again));
        consumer.assert_active(again);

        assert!(!producer.insert(again));

        consumer.assert_ended(AB);
        consumer.assert_active(again);

        drop(producer);

        consumer.assert_ended(again);
        consumer.assert_done();
    }

    /// Several announcements are delivered in insertion order.
    #[test]
    fn multi() {
        let (mut producer, mut consumer) = pair("");

        assert!(producer.insert(AB));
        assert!(producer.insert(AC));
        assert!(producer.insert(DE));

        consumer.assert_active(AB);
        consumer.assert_active(AC);
        consumer.assert_active(DE);
        consumer.assert_wait();
    }

    /// A consumer created late first sees what was already active.
    #[test]
    fn late() {
        let mut producer = AnnounceProducer::new();

        assert!(producer.insert(AB));
        assert!(producer.insert(AC));

        let mut consumer = producer.consume("");

        assert!(producer.insert(DE));
        assert!(producer.insert(DD));

        consumer.assert_active(AB);
        consumer.assert_active(AC);
        consumer.assert_active(DE);
        consumer.assert_active(DD);
        consumer.assert_wait();
    }

    /// Only paths under the prefix are delivered, with the prefix stripped.
    #[test]
    fn prefix() {
        let (mut producer, mut consumer) = pair("a/");

        assert!(producer.insert(AB));
        assert!(producer.insert(AC));
        assert!(producer.insert(DE));

        consumer.assert_active("b");
        consumer.assert_active("c");
        consumer.assert_wait();
    }

    /// Unannouncements are filtered and stripped by the prefix as well.
    #[test]
    fn prefix_unannounce() {
        let (mut producer, mut consumer) = pair("a/");

        assert!(producer.insert(AB));
        assert!(producer.insert(AC));
        assert!(producer.insert(DE));

        consumer.assert_active("b");
        consumer.assert_active("c");
        consumer.assert_wait();

        assert!(producer.remove(DE));
        assert!(producer.remove(AC));
        assert!(producer.remove(AB));

        consumer.assert_ended("c");
        consumer.assert_ended("b");
        consumer.assert_wait();
    }

    /// An announce that is removed before being read is still reported in full.
    #[test]
    fn flicker() {
        let (mut producer, mut consumer) = pair("");

        assert!(!producer.contains(AB));
        assert!(producer.insert(AB));
        assert!(producer.contains(AB));
        assert!(producer.remove(AB));
        assert!(!producer.contains(AB));

        consumer.assert_active(AB);
        consumer.assert_ended(AB);
        consumer.assert_wait();
    }

    /// Dropping the producer ends every active path and then the stream.
    #[test]
    fn dropped() {
        let (mut producer, mut consumer) = pair("");

        assert!(producer.insert(AB));
        assert!(producer.insert(AC));

        consumer.assert_active(AB);
        consumer.assert_active(AC);

        producer.insert(DE);
        drop(producer);

        consumer.assert_active(DE);
        consumer.assert_ended(AB);
        consumer.assert_ended(AC);
        consumer.assert_ended(DE);
        consumer.assert_done();
    }

    /// A waiting consumer is woken by updates published from another task.
    #[tokio::test]
    async fn wakeup() {
        tokio::time::pause();

        let (mut producer, mut consumer) = pair("");

        tokio::spawn(async move {
            let tick = tokio::time::Duration::from_secs(1);

            tokio::time::sleep(tick).await;
            producer.insert(AB);
            tokio::time::sleep(tick).await;
            producer.insert(AC);
            tokio::time::sleep(tick).await;
            producer.remove(AB);
            tokio::time::sleep(tick).await;
            drop(producer);
        });

        consumer.next().await.unwrap().assert_active(AB);
        consumer.next().await.unwrap().assert_active(AC);
        consumer.next().await.unwrap().assert_ended(AB);
        consumer.next().await.unwrap().assert_ended(AC);
        assert_eq!(consumer.next().await, None);
    }
}