1use moq_async::{Lock, LockWeak};
2use std::{
3 collections::{BTreeSet, VecDeque},
4 fmt,
5};
6use tokio::sync::mpsc;
7
8pub use crate::message::Filter;
9use crate::message::FilterMatch;
10
/// A single update yielded by [`AnnouncedConsumer::next`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Announced {
    /// A path matching the consumer's filter has been announced.
    Active(AnnouncedMatch),

    /// A path matching the consumer's filter is no longer announced.
    ///
    /// Also queued for every still-active path when the producer state is dropped.
    Ended(AnnouncedMatch),

    /// The producer has been marked live.
    ///
    /// Yielded at most once per consumer, and only after every queued
    /// `Ended`/`Active` update has been returned.
    Live,
}
23
#[cfg(test)]
impl Announced {
    /// Test helper: panics unless this is `Active` with the given capture.
    pub fn assert_active(&self, expected: &str) {
        if let Announced::Active(found) = self {
            assert_eq!(found.capture(), expected);
        } else {
            panic!("expected active announce");
        }
    }

    /// Test helper: panics unless this is `Ended` with the given capture.
    pub fn assert_ended(&self, expected: &str) {
        if let Announced::Ended(found) = self {
            assert_eq!(found.capture(), expected);
        } else {
            panic!("expected ended announce");
        }
    }

    /// Test helper: panics unless this is `Live`.
    pub fn assert_live(&self) {
        if !matches!(self, Announced::Live) {
            panic!("expected live announce");
        }
    }
}
47
/// An announced path together with the part of it captured by a filter.
#[derive(Clone, PartialEq, Eq)]
pub struct AnnouncedMatch {
    /// The complete announced path.
    full: String,
    /// Byte range `(start, end)` of the captured portion within `full`.
    capture: (usize, usize),
}

impl AnnouncedMatch {
    /// The complete announced path.
    pub fn full(&self) -> &str {
        self.full.as_str()
    }

    /// The captured portion of the path, borrowed from the full path.
    pub fn capture(&self) -> &str {
        let (start, end) = self.capture;
        &self.full[start..end]
    }

    /// Consumes the match, returning the complete path.
    pub fn to_full(self) -> String {
        let Self { full, .. } = self;
        full
    }

    /// Consumes the match, returning only the captured portion.
    pub fn to_capture(self) -> String {
        let Self {
            mut full,
            capture: (start, end),
        } = self;

        // Cut the tail first so `start` still indexes into the same bytes.
        full.truncate(end);
        full.split_off(start)
    }
}
73
74impl From<FilterMatch<'_>> for AnnouncedMatch {
75 fn from(value: FilterMatch) -> Self {
76 AnnouncedMatch {
77 full: value.full().to_string(),
78 capture: value.capture_index(),
79 }
80 }
81}
82
83impl fmt::Debug for AnnouncedMatch {
84 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
85 f.debug_struct("AnnouncedMatch")
86 .field("full", &self.full())
87 .field("capture", &self.capture())
88 .finish()
89 }
90}
91
/// State shared by all handles to one producer.
#[derive(Default)]
struct ProducerState {
    /// Paths that are currently announced.
    active: BTreeSet<String>,
    /// One entry per subscription: the consumer's queue state and the sender
    /// used to wake it. Entries whose channel is closed are pruned lazily.
    consumers: Vec<(Lock<ConsumerState>, mpsc::Sender<()>)>,
    /// Set once `live()` has been called.
    live: bool,
}
98
99impl ProducerState {
100 fn insert(&mut self, path: String) -> bool {
101 if !self.active.insert(path.clone()) {
102 return false;
103 }
104
105 let mut i = 0;
106
107 while let Some((consumer, notify)) = self.consumers.get(i) {
108 if !notify.is_closed() {
109 consumer.lock().insert(&path);
110 notify.try_send(()).ok();
111 i += 1;
112 } else {
113 self.consumers.swap_remove(i);
114 }
115 }
116
117 true
118 }
119
120 fn remove(&mut self, path: &str) -> bool {
121 if !self.active.remove(path) {
122 return false;
123 }
124
125 let mut i = 0;
126
127 while let Some((consumer, notify)) = self.consumers.get(i) {
128 if !notify.is_closed() {
129 consumer.lock().remove(path);
130 notify.try_send(()).ok();
131 i += 1;
132 } else {
133 self.consumers.swap_remove(i);
134 }
135 }
136
137 true
138 }
139
140 fn live(&mut self) -> bool {
141 if self.live {
142 return false;
143 }
144
145 self.live = true;
146
147 let mut i = 0;
148 while let Some((consumer, notify)) = self.consumers.get(i) {
149 if !notify.is_closed() {
150 consumer.lock().live();
151 notify.try_send(()).ok();
152 i += 1;
153 } else {
154 self.consumers.swap_remove(i);
155 }
156 }
157
158 true
159 }
160
161 fn consumer(&mut self, filter: Filter) -> ConsumerState {
162 let mut added = VecDeque::new();
163
164 for active in &self.active {
165 if let Some(m) = filter.matches(active) {
166 added.push_back(m.into());
167 }
168 }
169
170 ConsumerState {
171 added,
172 removed: VecDeque::new(),
173 filter,
174 live: self.live,
175 }
176 }
177
178 fn subscribe(&mut self, consumer: Lock<ConsumerState>) -> mpsc::Receiver<()> {
179 let (tx, rx) = mpsc::channel(1);
180 self.consumers.push((consumer.clone(), tx));
181 rx
182 }
183}
184
185impl Drop for ProducerState {
186 fn drop(&mut self) {
187 for (consumer, notify) in &self.consumers {
188 let mut consumer = consumer.lock();
189 for path in &self.active {
190 consumer.remove(path);
191 }
192
193 notify.try_send(()).ok();
194 }
195 }
196}
197
/// Per-subscription queue of updates that have not been returned yet.
#[derive(Clone)]
struct ConsumerState {
    /// Only paths matched by this filter are queued.
    filter: Filter,
    /// Matches announced but not yet returned to the consumer.
    added: VecDeque<AnnouncedMatch>,
    /// Matches unannounced but not yet returned to the consumer.
    removed: VecDeque<AnnouncedMatch>,
    /// Mirrors the producer's live flag (cleared again by `reset`).
    live: bool,
}
205
206impl ConsumerState {
207 pub fn insert(&mut self, path: &str) {
208 let added: AnnouncedMatch = match self.filter.matches(path) {
209 Some(m) => m.into(),
210 None => return,
211 };
212
213 if let Some(index) = self
216 .removed
217 .iter()
218 .position(|removed| removed.capture() == added.capture())
219 {
220 self.removed.remove(index);
221 } else {
222 self.added.push_back(added);
223 }
224 }
225
226 pub fn remove(&mut self, path: &str) {
227 let removed: AnnouncedMatch = match self.filter.matches(path) {
228 Some(m) => m.into(),
229 None => return,
230 };
231
232 if let Some(index) = self.added.iter().position(|added| added.capture() == removed.capture()) {
235 self.added.remove(index);
236 } else {
237 self.removed.push_back(removed);
238 }
239 }
240
241 pub fn live(&mut self) {
242 self.live = true;
243 }
244
245 pub fn reset(&mut self) {
246 self.added.clear();
247 self.removed.clear();
248 self.live = false;
249 }
250}
251
/// Publishes the set of announced paths to any number of consumers.
///
/// NOTE(review): clones presumably share one underlying state, since `Lock`
/// exposes `downgrade`/`upgrade` like a reference-counted handle; confirm.
#[derive(Default, Clone)]
pub struct AnnouncedProducer {
    /// The active paths, live flag and registered consumers, behind a lock.
    state: Lock<ProducerState>,
}
258
259impl AnnouncedProducer {
260 pub fn new() -> Self {
261 Self::default()
262 }
263
264 pub fn announce<T: ToString>(&mut self, path: T) -> bool {
266 let path = path.to_string();
267 let mut state = self.state.lock();
268 state.insert(path)
269 }
270
271 pub fn is_active(&self, path: &str) -> bool {
273 self.state.lock().active.contains(path)
274 }
275
276 pub fn is_empty(&self) -> bool {
278 self.state.lock().active.is_empty()
279 }
280
281 pub fn unannounce(&mut self, path: &str) -> bool {
283 let mut state = self.state.lock();
284 state.remove(path)
285 }
286
287 pub fn live(&mut self) -> bool {
289 let mut state = self.state.lock();
290 state.live()
291 }
292
293 pub fn subscribe(&self, filter: Filter) -> AnnouncedConsumer {
295 let mut state = self.state.lock();
296 let consumer = Lock::new(state.consumer(filter));
297 let notify = state.subscribe(consumer.clone());
298 AnnouncedConsumer::new(self.state.downgrade(), consumer, notify)
299 }
300
301 pub fn reset(&mut self) {
303 let mut state = self.state.lock();
304
305 let mut i = 0;
306 while let Some((consumer, notify)) = state.consumers.get(i) {
307 if !notify.is_closed() {
308 consumer.lock().reset();
309 i += 1;
310 } else {
311 state.consumers.swap_remove(i);
312 }
313 }
314 }
315
316 pub async fn closed(&self) {
320 while let Some(notify) = self.closed_inner() {
322 notify.closed().await;
323 }
324 }
325
326 fn closed_inner(&self) -> Option<mpsc::Sender<()>> {
328 let mut state = self.state.lock();
329
330 while let Some((_, notify)) = state.consumers.last() {
331 if !notify.is_closed() {
332 return Some(notify.clone());
333 }
334
335 state.consumers.pop();
336 }
337
338 None
339 }
340}
341
/// Receives announcement updates from an [`AnnouncedProducer`].
pub struct AnnouncedConsumer {
    /// Weak handle to the producer state, used to re-subscribe when cloning.
    producer: LockWeak<ProducerState>,
    /// Queue of pending updates, also held by the producer so it can push to it.
    state: Lock<ConsumerState>,
    /// Signalled by the producer whenever `state` may have changed.
    notify: mpsc::Receiver<()>,

    /// Whether this consumer has already returned `Announced::Live`.
    live: bool,
}
351
352impl AnnouncedConsumer {
353 fn new(producer: LockWeak<ProducerState>, state: Lock<ConsumerState>, notify: mpsc::Receiver<()>) -> Self {
354 Self {
355 producer,
356 state,
357 notify,
358 live: false,
359 }
360 }
361
362 pub async fn next(&mut self) -> Option<Announced> {
364 loop {
365 {
366 let mut state = self.state.lock();
367
368 if let Some(removed) = state.removed.pop_front() {
369 return Some(Announced::Ended(removed));
370 }
371
372 if let Some(added) = state.added.pop_front() {
373 return Some(Announced::Active(added));
374 }
375
376 if !self.live && state.live {
377 self.live = true;
378 return Some(Announced::Live);
379 }
380 }
381
382 self.notify.recv().await?;
383 }
384 }
385}
386
387impl Clone for AnnouncedConsumer {
392 fn clone(&self) -> Self {
393 let consumer = Lock::new(self.state.lock().clone());
394
395 match self.producer.upgrade() {
396 Some(producer) => {
397 let mut producer = producer.lock();
398 let notify = producer.subscribe(consumer.clone());
399 AnnouncedConsumer::new(self.producer.clone(), consumer, notify)
400 }
401 None => {
402 let (_, notify) = mpsc::channel(1);
403 AnnouncedConsumer::new(self.producer.clone(), consumer, notify)
404 }
405 }
406 }
407}
408
409#[cfg(test)]
410use futures::FutureExt;
411
#[cfg(test)]
impl AnnouncedConsumer {
    /// Polls `next` once and returns the update, panicking if it would have
    /// waited or if the stream has ended.
    fn next_ready(&mut self) -> Announced {
        self.next()
            .now_or_never()
            .expect("would have blocked")
            .expect("no next announcement")
    }

    /// Asserts the next update is immediately available and is `Active(capture)`.
    fn assert_active(&mut self, capture: &str) {
        self.next_ready().assert_active(capture);
    }

    /// Asserts the next update is immediately available and is `Ended(capture)`.
    fn assert_ended(&mut self, capture: &str) {
        self.next_ready().assert_ended(capture);
    }

    /// Asserts that `next` has nothing to return yet and would wait.
    fn assert_wait(&mut self) {
        let polled = self.next().now_or_never();
        assert_eq!(polled, None);
    }

    /// Asserts that `next` immediately reports the end of the stream.
    fn assert_done(&mut self) {
        let polled = self.next().now_or_never();
        assert_eq!(polled, Some(None));
    }

    /// Asserts the next update is immediately available and is `Live`.
    fn assert_live(&mut self) {
        self.next_ready().assert_live();
    }
}
446
#[cfg(test)]
mod test {
    use super::*;

    // Announce then unannounce a single path; the consumer sees both, in order.
    #[test]
    fn simple() {
        let mut producer = AnnouncedProducer::new();
        let mut consumer = producer.subscribe(Filter::Any);

        assert!(!producer.is_active("a/b"));
        assert!(producer.announce("a/b"));
        assert!(producer.is_active("a/b"));

        consumer.assert_active("a/b");

        assert!(producer.unannounce("a/b"));
        assert!(!producer.is_active("a/b"));

        consumer.assert_ended("a/b");
        consumer.assert_wait();
    }

    // Several announcements are delivered in the order they were made.
    #[test]
    fn multi() {
        let mut producer = AnnouncedProducer::new();
        let mut consumer = producer.subscribe(Filter::Any);

        assert!(producer.announce("a/b"));
        assert!(producer.announce("a/c"));
        assert!(producer.announce("d/e"));

        consumer.assert_active("a/b");
        consumer.assert_active("a/c");
        consumer.assert_active("d/e");
        consumer.assert_wait();
    }

    // A consumer that subscribes late still receives the paths that were
    // already active, followed by the ones announced afterwards.
    #[test]
    fn late() {
        let mut producer = AnnouncedProducer::new();

        assert!(producer.announce("a/b"));
        assert!(producer.announce("a/c"));

        let mut consumer = producer.subscribe(Filter::Any);

        assert!(producer.announce("d/e"));
        assert!(producer.announce("d/d"));

        // The pre-existing paths come first, then the new ones in announce order.
        consumer.assert_active("a/b");
        consumer.assert_active("a/c");
        consumer.assert_active("d/e");
        consumer.assert_active("d/d");
        consumer.assert_wait();
    }

    // With a prefix filter only matching paths are delivered, and the capture
    // is the part after the prefix.
    #[test]
    fn prefix() {
        let mut producer = AnnouncedProducer::new();
        let mut consumer = producer.subscribe(Filter::Prefix("a/".into()));

        assert!(producer.announce("a/b"));
        assert!(producer.announce("a/c"));
        assert!(producer.announce("d/e"));

        consumer.assert_active("b");
        consumer.assert_active("c");
        consumer.assert_wait();
    }

    // Unannouncing under a prefix filter: non-matching paths are ignored and
    // matching ones end in the order they were unannounced.
    #[test]
    fn prefix_unannounce() {
        let mut producer = AnnouncedProducer::new();
        let mut consumer = producer.subscribe(Filter::Prefix("a/".into()));

        assert!(producer.announce("a/b"));
        assert!(producer.announce("a/c"));
        assert!(producer.announce("d/e"));

        consumer.assert_active("b");
        consumer.assert_active("c");
        consumer.assert_wait();

        assert!(producer.unannounce("d/e"));
        assert!(producer.unannounce("a/c"));
        assert!(producer.unannounce("a/b"));

        consumer.assert_ended("c");
        consumer.assert_ended("b");
        consumer.assert_wait();
    }

    // A path announced and unannounced before the consumer polls cancels out.
    #[test]
    fn flicker() {
        let mut producer = AnnouncedProducer::new();
        let mut consumer = producer.subscribe(Filter::Any);

        assert!(!producer.is_active("a/b"));
        assert!(producer.announce("a/b"));
        assert!(producer.is_active("a/b"));
        assert!(producer.unannounce("a/b"));
        assert!(!producer.is_active("a/b"));

        // The pending announce and the unannounce cancelled each other.
        consumer.assert_wait();
    }

    // Dropping the producer ends every path the consumer has already seen.
    #[test]
    fn dropped() {
        let mut producer = AnnouncedProducer::new();
        let mut consumer = producer.subscribe(Filter::Any);

        producer.announce("a/b");
        consumer.assert_active("a/b");
        producer.announce("a/c");
        consumer.assert_active("a/c");

        // "d/e" is never consumed, so its announce cancels out on drop.
        producer.announce("d/e");
        drop(producer);

        consumer.assert_ended("a/b");
        consumer.assert_ended("a/c");
        consumer.assert_done();
    }

    // `Live` is delivered once, after all queued announcements.
    #[test]
    fn live() {
        let mut producer = AnnouncedProducer::new();
        let mut consumer = producer.subscribe(Filter::Any);

        producer.announce("a/b");
        producer.live();
        producer.announce("a/c");

        consumer.assert_active("a/b");
        consumer.assert_active("a/c");
        consumer.assert_live();

        // Calling live() again is a no-op; only the new announce is delivered.
        producer.live(); producer.announce("d/e");

        consumer.assert_active("d/e");
        consumer.assert_wait();
    }

    // A consumer waiting in `next().await` is woken by updates made from another task.
    #[tokio::test]
    async fn wakeup() {
        // Pause tokio's clock; the sleeps below then run on virtual time
        // rather than wall-clock time.
        tokio::time::pause();

        let mut producer = AnnouncedProducer::new();
        let mut consumer = producer.subscribe(Filter::Any);

        tokio::spawn(async move {
            tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
            producer.announce("a/b");
            tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
            producer.announce("a/c");
            tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
            producer.unannounce("a/b");
            tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
            // Dropping the producer ends the remaining path ("a/c").
            drop(producer);
        });

        consumer.next().await.unwrap().assert_active("a/b");
        consumer.next().await.unwrap().assert_active("a/c");
        consumer.next().await.unwrap().assert_ended("a/b");
        consumer.next().await.unwrap().assert_ended("a/c");
        assert_eq!(consumer.next().await, None);
    }
}