1use std::{
2 collections::HashMap,
3 sync::atomic::{AtomicU64, Ordering},
4};
5use tokio::sync::mpsc;
6use web_async::Lock;
7
8use super::BroadcastConsumer;
9use crate::{AsPath, Path, PathOwned, Produce};
10
11static NEXT_CONSUMER_ID: AtomicU64 = AtomicU64::new(0);
12
13#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
14struct ConsumerId(u64);
15
16impl ConsumerId {
17 fn new() -> Self {
18 Self(NEXT_CONSUMER_ID.fetch_add(1, Ordering::Relaxed))
19 }
20}
21
22struct OriginBroadcast {
24 path: PathOwned,
25 active: BroadcastConsumer,
26 backup: Vec<BroadcastConsumer>,
27}
28
29#[derive(Clone)]
30struct OriginConsumerNotify {
31 root: PathOwned,
32 tx: mpsc::UnboundedSender<OriginAnnounce>,
33}
34
35impl OriginConsumerNotify {
36 fn announce(&self, path: impl AsPath, broadcast: BroadcastConsumer) {
37 let path = path.as_path().strip_prefix(&self.root).unwrap().to_owned();
38 self.tx.send((path, Some(broadcast))).expect("consumer closed");
39 }
40
41 fn reannounce(&self, path: impl AsPath, broadcast: BroadcastConsumer) {
42 let path = path.as_path().strip_prefix(&self.root).unwrap().to_owned();
43 self.tx.send((path.clone(), None)).expect("consumer closed");
44 self.tx.send((path, Some(broadcast))).expect("consumer closed");
45 }
46
47 fn unannounce(&self, path: impl AsPath) {
48 let path = path.as_path().strip_prefix(&self.root).unwrap().to_owned();
49 self.tx.send((path, None)).expect("consumer closed");
50 }
51}
52
53struct NotifyNode {
54 parent: Option<Lock<NotifyNode>>,
55
56 consumers: HashMap<ConsumerId, OriginConsumerNotify>,
59}
60
61impl NotifyNode {
62 fn new(parent: Option<Lock<NotifyNode>>) -> Self {
63 Self {
64 parent,
65 consumers: HashMap::new(),
66 }
67 }
68
69 fn announce(&mut self, path: impl AsPath, broadcast: &BroadcastConsumer) {
70 for consumer in self.consumers.values() {
71 consumer.announce(path.as_path(), broadcast.clone());
72 }
73
74 if let Some(parent) = &self.parent {
75 parent.lock().announce(path, broadcast);
76 }
77 }
78
79 fn reannounce(&mut self, path: impl AsPath, broadcast: &BroadcastConsumer) {
80 for consumer in self.consumers.values() {
81 consumer.reannounce(path.as_path(), broadcast.clone());
82 }
83
84 if let Some(parent) = &self.parent {
85 parent.lock().reannounce(path, broadcast);
86 }
87 }
88
89 fn unannounce(&mut self, path: impl AsPath) {
90 for consumer in self.consumers.values() {
91 consumer.unannounce(path.as_path());
92 }
93
94 if let Some(parent) = &self.parent {
95 parent.lock().unannounce(path);
96 }
97 }
98}
99
100struct OriginNode {
101 broadcast: Option<OriginBroadcast>,
103
104 nested: HashMap<String, Lock<OriginNode>>,
106
107 notify: Lock<NotifyNode>,
109}
110
111impl OriginNode {
112 fn new(parent: Option<Lock<NotifyNode>>) -> Self {
113 Self {
114 broadcast: None,
115 nested: HashMap::new(),
116 notify: Lock::new(NotifyNode::new(parent)),
117 }
118 }
119
120 fn leaf(&mut self, path: &Path) -> Lock<OriginNode> {
121 let (dir, rest) = path.next_part().expect("leaf called with empty path");
122
123 let next = self.entry(dir);
124 if rest.is_empty() {
125 next
126 } else {
127 next.lock().leaf(&rest)
128 }
129 }
130
131 fn entry(&mut self, dir: &str) -> Lock<OriginNode> {
132 match self.nested.get(dir) {
133 Some(next) => next.clone(),
134 None => {
135 let next = Lock::new(OriginNode::new(Some(self.notify.clone())));
136 self.nested.insert(dir.to_string(), next.clone());
137 next
138 }
139 }
140 }
141
142 fn publish(&mut self, full: impl AsPath, broadcast: &BroadcastConsumer, relative: impl AsPath) {
143 let full = full.as_path();
144 let rest = relative.as_path();
145
146 if let Some((dir, relative)) = rest.next_part() {
148 self.entry(dir).lock().publish(&full, broadcast, &relative);
150 } else if let Some(existing) = &mut self.broadcast {
151 let old = existing.active.clone();
153 existing.active = broadcast.clone();
154 existing.backup.push(old);
155
156 self.notify.lock().reannounce(full, broadcast);
157 } else {
158 self.broadcast = Some(OriginBroadcast {
160 path: full.to_owned(),
161 active: broadcast.clone(),
162 backup: Vec::new(),
163 });
164 self.notify.lock().announce(full, broadcast);
165 }
166 }
167
168 fn consume(&mut self, id: ConsumerId, mut notify: OriginConsumerNotify) {
169 self.consume_initial(&mut notify);
170 self.notify.lock().consumers.insert(id, notify);
171 }
172
173 fn consume_initial(&mut self, notify: &mut OriginConsumerNotify) {
174 if let Some(broadcast) = &self.broadcast {
175 notify.announce(&broadcast.path, broadcast.active.clone());
176 }
177
178 for nested in self.nested.values() {
180 nested.lock().consume_initial(notify);
181 }
182 }
183
184 fn consume_broadcast(&self, rest: impl AsPath) -> Option<BroadcastConsumer> {
185 let rest = rest.as_path();
186
187 if let Some((dir, rest)) = rest.next_part() {
188 let node = self.nested.get(dir)?.lock();
189 node.consume_broadcast(&rest)
190 } else {
191 self.broadcast.as_ref().map(|b| b.active.clone())
192 }
193 }
194
195 fn unconsume(&mut self, id: ConsumerId) {
196 self.notify.lock().consumers.remove(&id).expect("consumer not found");
197 if self.is_empty() {
198 }
201 }
202
203 fn remove(&mut self, full: impl AsPath, broadcast: BroadcastConsumer, relative: impl AsPath) {
205 let full = full.as_path();
206 let relative = relative.as_path();
207
208 if let Some((dir, relative)) = relative.next_part() {
209 let nested = self.entry(dir);
210 let mut locked = nested.lock();
211 locked.remove(&full, broadcast, &relative);
212
213 if locked.is_empty() {
214 drop(locked);
215 self.nested.remove(dir);
216 }
217 } else {
218 let entry = match &mut self.broadcast {
219 Some(existing) => existing,
220 None => return,
221 };
222
223 let pos = entry.backup.iter().position(|b| b.is_clone(&broadcast));
225 if let Some(pos) = pos {
226 entry.backup.remove(pos);
227 return;
229 }
230
231 assert!(entry.active.is_clone(&broadcast));
233
234 if let Some(active) = entry.backup.pop() {
236 entry.active = active;
237 self.notify.lock().reannounce(full, &entry.active);
238 } else {
239 self.broadcast = None;
241 self.notify.lock().unannounce(full);
242 }
243 }
244 }
245
246 fn is_empty(&self) -> bool {
247 self.broadcast.is_none() && self.nested.is_empty() && self.notify.lock().consumers.is_empty()
248 }
249}
250
251#[derive(Clone)]
252struct OriginNodes {
253 nodes: Vec<(PathOwned, Lock<OriginNode>)>,
254}
255
256impl OriginNodes {
257 pub fn select(&self, prefixes: &[Path]) -> Option<Self> {
260 let mut roots = Vec::new();
261
262 for (root, state) in &self.nodes {
263 for prefix in prefixes {
264 if root.has_prefix(prefix) {
265 roots.push((root.to_owned(), state.clone()));
267 continue;
268 }
269
270 if let Some(suffix) = prefix.strip_prefix(root) {
271 let nested = state.lock().leaf(&suffix);
273 roots.push((prefix.to_owned(), nested));
274 }
275 }
276 }
277
278 if roots.is_empty() {
279 None
280 } else {
281 Some(Self { nodes: roots })
282 }
283 }
284
285 pub fn root(&self, new_root: impl AsPath) -> Option<Self> {
286 let new_root = new_root.as_path();
287 let mut roots = Vec::new();
288
289 if new_root.is_empty() {
290 return Some(self.clone());
291 }
292
293 for (root, state) in &self.nodes {
294 if let Some(suffix) = root.strip_prefix(&new_root) {
295 roots.push((suffix.to_owned(), state.clone()));
297 } else if let Some(suffix) = new_root.strip_prefix(root) {
298 let nested = state.lock().leaf(&suffix);
301 roots.push(("".into(), nested));
302 }
303 }
304
305 if roots.is_empty() {
306 None
307 } else {
308 Some(Self { nodes: roots })
309 }
310 }
311
312 pub fn get(&self, path: impl AsPath) -> Option<(Lock<OriginNode>, PathOwned)> {
314 let path = path.as_path();
315
316 for (root, state) in &self.nodes {
317 if let Some(suffix) = path.strip_prefix(root) {
318 return Some((state.clone(), suffix.to_owned()));
319 }
320 }
321
322 None
323 }
324}
325
326impl Default for OriginNodes {
327 fn default() -> Self {
328 Self {
329 nodes: vec![("".into(), Lock::new(OriginNode::new(None)))],
330 }
331 }
332}
333
334pub type OriginAnnounce = (PathOwned, Option<BroadcastConsumer>);
336
337pub struct Origin {}
338
339impl Origin {
340 pub fn produce() -> Produce<OriginProducer, OriginConsumer> {
341 let producer = OriginProducer::default();
342 let consumer = producer.consume();
343 Produce { producer, consumer }
344 }
345}
346
347#[derive(Clone, Default)]
349pub struct OriginProducer {
350 nodes: OriginNodes,
353
354 root: PathOwned,
356}
357
358impl OriginProducer {
359 pub fn publish_broadcast(&self, path: impl AsPath, broadcast: BroadcastConsumer) -> bool {
368 let path = path.as_path();
369
370 let (root, rest) = match self.nodes.get(&path) {
371 Some(root) => root,
372 None => return false,
373 };
374
375 let full = self.root.join(&path);
376
377 root.lock().publish(&full, &broadcast, &rest);
378 let root = root.clone();
379
380 web_async::spawn(async move {
381 broadcast.closed().await;
382 root.lock().remove(&full, broadcast, &rest);
383 });
384
385 true
386 }
387
388 pub fn publish_only(&self, prefixes: &[Path]) -> Option<OriginProducer> {
392 Some(OriginProducer {
393 nodes: self.nodes.select(prefixes)?,
394 root: self.root.clone(),
395 })
396 }
397
398 pub fn consume(&self) -> OriginConsumer {
400 OriginConsumer::new(self.root.clone(), self.nodes.clone())
401 }
402
403 pub fn consume_only(&self, prefixes: &[Path]) -> Option<OriginConsumer> {
409 Some(OriginConsumer::new(self.root.clone(), self.nodes.select(prefixes)?))
410 }
411
412 pub fn with_root(&self, prefix: impl AsPath) -> Option<Self> {
416 let prefix = prefix.as_path();
417
418 Some(Self {
419 root: self.root.join(&prefix).to_owned(),
420 nodes: self.nodes.root(&prefix)?,
421 })
422 }
423
424 pub fn root(&self) -> &Path {
426 &self.root
427 }
428
429 pub fn allowed(&self) -> impl Iterator<Item = &Path> {
430 self.nodes.nodes.iter().map(|(root, _)| root)
431 }
432
433 pub fn absolute(&self, path: impl AsPath) -> Path {
435 self.root.join(path)
436 }
437}
438
439pub struct OriginConsumer {
442 id: ConsumerId,
443 nodes: OriginNodes,
444 updates: mpsc::UnboundedReceiver<OriginAnnounce>,
445
446 root: PathOwned,
448}
449
450impl OriginConsumer {
451 fn new(root: PathOwned, nodes: OriginNodes) -> Self {
452 let (tx, rx) = mpsc::unbounded_channel();
453
454 let id = ConsumerId::new();
455
456 for (_, state) in &nodes.nodes {
457 let notify = OriginConsumerNotify {
458 root: root.clone(),
459 tx: tx.clone(),
460 };
461 state.lock().consume(id, notify);
462 }
463
464 Self {
465 id,
466 nodes,
467 updates: rx,
468 root,
469 }
470 }
471
472 pub async fn announced(&mut self) -> Option<OriginAnnounce> {
480 self.updates.recv().await
481 }
482
483 pub fn try_announced(&mut self) -> Option<OriginAnnounce> {
488 self.updates.try_recv().ok()
489 }
490
491 pub fn consume(&self) -> OriginConsumer {
492 OriginConsumer::new(self.root.clone(), self.nodes.clone())
493 }
494
495 pub fn consume_broadcast(&self, path: impl AsPath) -> Option<BroadcastConsumer> {
501 let path = path.as_path();
502 let (root, rest) = self.nodes.get(&path)?;
503 let state = root.lock();
504 state.consume_broadcast(&rest)
505 }
506
507 pub fn consume_only(&self, prefixes: &[Path]) -> Option<OriginConsumer> {
511 Some(OriginConsumer::new(self.root.clone(), self.nodes.select(prefixes)?))
512 }
513
514 pub fn root(&self) -> &Path {
516 &self.root
517 }
518
519 pub fn allowed(&self) -> impl Iterator<Item = &Path> {
520 self.nodes.nodes.iter().map(|(root, _)| root)
521 }
522
523 pub fn absolute(&self, path: impl AsPath) -> Path {
525 self.root.join(path)
526 }
527}
528
529impl Drop for OriginConsumer {
530 fn drop(&mut self) {
531 for (_, root) in &self.nodes.nodes {
532 root.lock().unconsume(self.id);
533 }
534 }
535}
536
537#[cfg(test)]
538use futures::FutureExt;
539
540#[cfg(test)]
541impl OriginConsumer {
542 pub fn assert_next(&mut self, expected: impl AsPath, broadcast: &BroadcastConsumer) {
543 let expected = expected.as_path();
544 let (path, active) = self.announced().now_or_never().expect("next blocked").expect("no next");
545 assert_eq!(path, expected, "wrong path");
546 assert!(active.unwrap().is_clone(broadcast), "should be the same broadcast");
547 }
548
549 pub fn assert_try_next(&mut self, expected: impl AsPath, broadcast: &BroadcastConsumer) {
550 let expected = expected.as_path();
551 let (path, active) = self.try_announced().expect("no next");
552 assert_eq!(path, expected, "wrong path");
553 assert!(active.unwrap().is_clone(broadcast), "should be the same broadcast");
554 }
555
556 pub fn assert_next_none(&mut self, expected: impl AsPath) {
557 let expected = expected.as_path();
558 let (path, active) = self.announced().now_or_never().expect("next blocked").expect("no next");
559 assert_eq!(path, expected, "wrong path");
560 assert!(active.is_none(), "should be unannounced");
561 }
562
563 pub fn assert_next_wait(&mut self) {
564 if let Some(res) = self.announced().now_or_never() {
565 panic!("next should block: got {:?}", res.map(|(path, _)| path));
566 }
567 }
568
569 }
578
579#[cfg(test)]
580mod tests {
581 use crate::Broadcast;
582
583 use super::*;
584
585 #[tokio::test]
586 async fn test_announce() {
587 let origin = Origin::produce();
588 let broadcast1 = Broadcast::produce();
589 let broadcast2 = Broadcast::produce();
590
591 let mut consumer1 = origin.consumer;
592 consumer1.assert_next_wait();
594
595 origin.producer.publish_broadcast("test1", broadcast1.consumer);
597
598 consumer1.assert_next("test1", &broadcast1.producer.consume());
599 consumer1.assert_next_wait();
600
601 let mut consumer2 = origin.producer.consume();
604
605 origin.producer.publish_broadcast("test2", broadcast2.consumer);
607
608 consumer1.assert_next("test2", &broadcast2.producer.consume());
609 consumer1.assert_next_wait();
610
611 consumer2.assert_next("test1", &broadcast1.producer.consume());
612 consumer2.assert_next("test2", &broadcast2.producer.consume());
613 consumer2.assert_next_wait();
614
615 drop(broadcast1.producer);
617
618 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
620
621 consumer1.assert_next_none("test1");
623 consumer2.assert_next_none("test1");
624 consumer1.assert_next_wait();
625 consumer2.assert_next_wait();
626
627 let mut consumer3 = origin.producer.consume();
629 consumer3.assert_next("test2", &broadcast2.producer.consume());
630 consumer3.assert_next_wait();
631
632 drop(broadcast2.producer);
634
635 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
637
638 consumer1.assert_next_none("test2");
639 consumer2.assert_next_none("test2");
640 consumer3.assert_next_none("test2");
641
642 }
648
649 #[tokio::test]
650 async fn test_duplicate() {
651 let mut origin = Origin::produce();
652
653 let broadcast1 = Broadcast::produce();
654 let broadcast2 = Broadcast::produce();
655 let broadcast3 = Broadcast::produce();
656
657 let consumer1 = broadcast1.consumer;
658 let consumer2 = broadcast2.consumer;
659 let consumer3 = broadcast3.consumer;
660
661 origin.producer.publish_broadcast("test", consumer1.clone());
662 origin.producer.publish_broadcast("test", consumer2.clone());
663 origin.producer.publish_broadcast("test", consumer3.clone());
664
665 assert!(origin.consumer.consume_broadcast("test").is_some());
666
667 origin.consumer.assert_next("test", &consumer1);
668 origin.consumer.assert_next_none("test");
669 origin.consumer.assert_next("test", &consumer2);
670 origin.consumer.assert_next_none("test");
671 origin.consumer.assert_next("test", &consumer3);
672
673 drop(broadcast2.producer);
675
676 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
678
679 assert!(origin.consumer.consume_broadcast("test").is_some());
680 origin.consumer.assert_next_wait();
681
682 drop(broadcast3.producer);
684
685 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
687
688 assert!(origin.consumer.consume_broadcast("test").is_some());
689 origin.consumer.assert_next_none("test");
690 origin.consumer.assert_next("test", &consumer1);
691
692 drop(broadcast1.producer);
694
695 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
697 assert!(origin.consumer.consume_broadcast("test").is_none());
698
699 origin.consumer.assert_next_none("test");
700 origin.consumer.assert_next_wait();
701 }
702
703 #[tokio::test]
704 async fn test_duplicate_reverse() {
705 let origin = Origin::produce();
706 let broadcast1 = Broadcast::produce();
707 let broadcast2 = Broadcast::produce();
708
709 origin.producer.publish_broadcast("test", broadcast1.consumer);
710 origin.producer.publish_broadcast("test", broadcast2.consumer);
711 assert!(origin.consumer.consume_broadcast("test").is_some());
712
713 drop(broadcast2.producer);
715
716 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
718 assert!(origin.consumer.consume_broadcast("test").is_some());
719
720 drop(broadcast1.producer);
721
722 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
724 assert!(origin.consumer.consume_broadcast("test").is_none());
725 }
726
727 #[tokio::test]
728 async fn test_double_publish() {
729 let origin = Origin::produce();
730 let broadcast = Broadcast::produce();
731
732 origin.producer.publish_broadcast("test", broadcast.producer.consume());
734 origin.producer.publish_broadcast("test", broadcast.producer.consume());
735
736 assert!(origin.consumer.consume_broadcast("test").is_some());
737
738 drop(broadcast.producer);
739
740 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
742 assert!(origin.consumer.consume_broadcast("test").is_none());
743 }
744 #[tokio::test]
746 #[should_panic]
747 async fn test_128() {
748 let mut origin = Origin::produce();
749 let broadcast = Broadcast::produce();
750
751 for i in 0..256 {
752 origin
753 .producer
754 .publish_broadcast(format!("test{i}"), broadcast.consumer.clone());
755 }
756
757 for i in 0..256 {
758 origin.consumer.assert_next(format!("test{i}"), &broadcast.consumer);
759 }
760 }
761
762 #[tokio::test]
763 async fn test_128_fix() {
764 let mut origin = Origin::produce();
765 let broadcast = Broadcast::produce();
766
767 for i in 0..256 {
768 origin
769 .producer
770 .publish_broadcast(format!("test{i}"), broadcast.consumer.clone());
771 }
772
773 for i in 0..256 {
774 origin.consumer.assert_try_next(format!("test{i}"), &broadcast.consumer);
776 }
777 }
778
779 #[tokio::test]
780 async fn test_with_root_basic() {
781 let mut origin = Origin::produce();
782 let broadcast = Broadcast::produce();
783
784 let foo_producer = origin.producer.with_root("foo").expect("should create root");
786 assert_eq!(foo_producer.root().as_str(), "foo");
787
788 assert!(foo_producer.publish_broadcast("bar/baz", broadcast.consumer.clone()));
790
791 origin.consumer.assert_next("foo/bar/baz", &broadcast.consumer);
793
794 let mut foo_consumer = foo_producer.consume();
796 foo_consumer.assert_next("bar/baz", &broadcast.consumer);
797 }
798
799 #[tokio::test]
800 async fn test_with_root_nested() {
801 let mut origin = Origin::produce();
802 let broadcast = Broadcast::produce();
803
804 let foo_producer = origin.producer.with_root("foo").expect("should create foo root");
806 let foo_bar_producer = foo_producer.with_root("bar").expect("should create bar root");
807 assert_eq!(foo_bar_producer.root().as_str(), "foo/bar");
808
809 assert!(foo_bar_producer.publish_broadcast("baz", broadcast.consumer.clone()));
811
812 origin.consumer.assert_next("foo/bar/baz", &broadcast.consumer);
814
815 let mut foo_bar_consumer = foo_bar_producer.consume();
817 foo_bar_consumer.assert_next("baz", &broadcast.consumer);
818 }
819
820 #[tokio::test]
821 async fn test_publish_only_allows() {
822 let origin = Origin::produce();
823 let broadcast = Broadcast::produce();
824
825 let limited_producer = origin
827 .producer
828 .publish_only(&["allowed/path1".into(), "allowed/path2".into()])
829 .expect("should create limited producer");
830
831 assert!(limited_producer.publish_broadcast("allowed/path1", broadcast.consumer.clone()));
833 assert!(limited_producer.publish_broadcast("allowed/path1/nested", broadcast.consumer.clone()));
834 assert!(limited_producer.publish_broadcast("allowed/path2", broadcast.consumer.clone()));
835
836 assert!(!limited_producer.publish_broadcast("notallowed", broadcast.consumer.clone()));
838 assert!(!limited_producer.publish_broadcast("allowed", broadcast.consumer.clone())); assert!(!limited_producer.publish_broadcast("other/path", broadcast.consumer.clone()));
840 }
841
842 #[tokio::test]
843 async fn test_publish_only_empty() {
844 let origin = Origin::produce();
845
846 assert!(origin.producer.publish_only(&[]).is_none());
848 }
849
850 #[tokio::test]
851 async fn test_consume_only_filters() {
852 let mut origin = Origin::produce();
853 let broadcast1 = Broadcast::produce();
854 let broadcast2 = Broadcast::produce();
855 let broadcast3 = Broadcast::produce();
856
857 origin
859 .producer
860 .publish_broadcast("allowed", broadcast1.consumer.clone());
861 origin
862 .producer
863 .publish_broadcast("allowed/nested", broadcast2.consumer.clone());
864 origin
865 .producer
866 .publish_broadcast("notallowed", broadcast3.consumer.clone());
867
868 let mut limited_consumer = origin
870 .consumer
871 .consume_only(&["allowed".into()])
872 .expect("should create limited consumer");
873
874 limited_consumer.assert_next("allowed", &broadcast1.consumer);
876 limited_consumer.assert_next("allowed/nested", &broadcast2.consumer);
877 limited_consumer.assert_next_wait(); origin.consumer.assert_next("allowed", &broadcast1.consumer);
881 origin.consumer.assert_next("allowed/nested", &broadcast2.consumer);
882 origin.consumer.assert_next("notallowed", &broadcast3.consumer);
883 }
884
885 #[tokio::test]
886 async fn test_consume_only_multiple_prefixes() {
887 let origin = Origin::produce();
888 let broadcast1 = Broadcast::produce();
889 let broadcast2 = Broadcast::produce();
890 let broadcast3 = Broadcast::produce();
891
892 origin
893 .producer
894 .publish_broadcast("foo/test", broadcast1.consumer.clone());
895 origin
896 .producer
897 .publish_broadcast("bar/test", broadcast2.consumer.clone());
898 origin
899 .producer
900 .publish_broadcast("baz/test", broadcast3.consumer.clone());
901
902 let mut limited_consumer = origin
904 .consumer
905 .consume_only(&["foo".into(), "bar".into()])
906 .expect("should create limited consumer");
907
908 limited_consumer.assert_next("foo/test", &broadcast1.consumer);
909 limited_consumer.assert_next("bar/test", &broadcast2.consumer);
910 limited_consumer.assert_next_wait(); }
912
913 #[tokio::test]
914 async fn test_with_root_and_publish_only() {
915 let mut origin = Origin::produce();
916 let broadcast = Broadcast::produce();
917
918 let foo_producer = origin.producer.with_root("foo").expect("should create foo root");
920
921 let limited_producer = foo_producer
923 .publish_only(&["bar".into(), "goop/pee".into()])
924 .expect("should create limited producer");
925
926 assert!(limited_producer.publish_broadcast("bar", broadcast.consumer.clone()));
928 assert!(limited_producer.publish_broadcast("bar/nested", broadcast.consumer.clone()));
929 assert!(limited_producer.publish_broadcast("goop/pee", broadcast.consumer.clone()));
930 assert!(limited_producer.publish_broadcast("goop/pee/nested", broadcast.consumer.clone()));
931
932 assert!(!limited_producer.publish_broadcast("baz", broadcast.consumer.clone()));
934 assert!(!limited_producer.publish_broadcast("goop", broadcast.consumer.clone())); assert!(!limited_producer.publish_broadcast("goop/other", broadcast.consumer.clone()));
936
937 origin.consumer.assert_next("foo/bar", &broadcast.consumer);
939 origin.consumer.assert_next("foo/bar/nested", &broadcast.consumer);
940 origin.consumer.assert_next("foo/goop/pee", &broadcast.consumer);
941 origin.consumer.assert_next("foo/goop/pee/nested", &broadcast.consumer);
942 }
943
944 #[tokio::test]
945 async fn test_with_root_and_consume_only() {
946 let origin = Origin::produce();
947 let broadcast1 = Broadcast::produce();
948 let broadcast2 = Broadcast::produce();
949 let broadcast3 = Broadcast::produce();
950
951 origin
953 .producer
954 .publish_broadcast("foo/bar/test", broadcast1.consumer.clone());
955 origin
956 .producer
957 .publish_broadcast("foo/goop/pee/test", broadcast2.consumer.clone());
958 origin
959 .producer
960 .publish_broadcast("foo/other/test", broadcast3.consumer.clone());
961
962 let foo_producer = origin.producer.with_root("foo").expect("should create foo root");
964
965 let mut limited_consumer = foo_producer
967 .consume_only(&["bar".into(), "goop/pee".into()])
968 .expect("should create limited consumer");
969
970 limited_consumer.assert_next("bar/test", &broadcast1.consumer);
972 limited_consumer.assert_next("goop/pee/test", &broadcast2.consumer);
973 limited_consumer.assert_next_wait(); }
975
976 #[tokio::test]
977 async fn test_with_root_unauthorized() {
978 let origin = Origin::produce();
979
980 let limited_producer = origin
982 .producer
983 .publish_only(&["allowed".into()])
984 .expect("should create limited producer");
985
986 assert!(limited_producer.with_root("notallowed").is_none());
988
989 let allowed_root = limited_producer
991 .with_root("allowed")
992 .expect("should create allowed root");
993 assert_eq!(allowed_root.root().as_str(), "allowed");
994 }
995
996 #[tokio::test]
997 async fn test_wildcard_permission() {
998 let origin = Origin::produce();
999 let broadcast = Broadcast::produce();
1000
1001 let root_producer = origin.producer.clone();
1003
1004 assert!(root_producer.publish_broadcast("any/path", broadcast.consumer.clone()));
1006 assert!(root_producer.publish_broadcast("other/path", broadcast.consumer.clone()));
1007
1008 let foo_producer = root_producer.with_root("foo").expect("should create any root");
1010 assert_eq!(foo_producer.root().as_str(), "foo");
1011 }
1012
1013 #[tokio::test]
1014 async fn test_consume_broadcast_with_permissions() {
1015 let origin = Origin::produce();
1016 let broadcast1 = Broadcast::produce();
1017 let broadcast2 = Broadcast::produce();
1018
1019 origin
1020 .producer
1021 .publish_broadcast("allowed/test", broadcast1.consumer.clone());
1022 origin
1023 .producer
1024 .publish_broadcast("notallowed/test", broadcast2.consumer.clone());
1025
1026 let limited_consumer = origin
1028 .consumer
1029 .consume_only(&["allowed".into()])
1030 .expect("should create limited consumer");
1031
1032 let result = limited_consumer.consume_broadcast("allowed/test");
1034 assert!(result.is_some());
1035 assert!(result.unwrap().is_clone(&broadcast1.consumer));
1036
1037 assert!(limited_consumer.consume_broadcast("notallowed/test").is_none());
1039
1040 assert!(origin.consumer.consume_broadcast("allowed/test").is_some());
1042 assert!(origin.consumer.consume_broadcast("notallowed/test").is_some());
1043 }
1044
1045 #[tokio::test]
1046 async fn test_nested_paths_with_permissions() {
1047 let origin = Origin::produce();
1048 let broadcast = Broadcast::produce();
1049
1050 let limited_producer = origin
1052 .producer
1053 .publish_only(&["a/b/c".into()])
1054 .expect("should create limited producer");
1055
1056 assert!(limited_producer.publish_broadcast("a/b/c", broadcast.consumer.clone()));
1058 assert!(limited_producer.publish_broadcast("a/b/c/d", broadcast.consumer.clone()));
1059 assert!(limited_producer.publish_broadcast("a/b/c/d/e", broadcast.consumer.clone()));
1060
1061 assert!(!limited_producer.publish_broadcast("a", broadcast.consumer.clone()));
1063 assert!(!limited_producer.publish_broadcast("a/b", broadcast.consumer.clone()));
1064 assert!(!limited_producer.publish_broadcast("a/b/other", broadcast.consumer.clone()));
1065 }
1066
1067 #[tokio::test]
1068 async fn test_multiple_consumers_with_different_permissions() {
1069 let origin = Origin::produce();
1070 let broadcast1 = Broadcast::produce();
1071 let broadcast2 = Broadcast::produce();
1072 let broadcast3 = Broadcast::produce();
1073
1074 origin
1076 .producer
1077 .publish_broadcast("foo/test", broadcast1.consumer.clone());
1078 origin
1079 .producer
1080 .publish_broadcast("bar/test", broadcast2.consumer.clone());
1081 origin
1082 .producer
1083 .publish_broadcast("baz/test", broadcast3.consumer.clone());
1084
1085 let mut foo_consumer = origin
1087 .consumer
1088 .consume_only(&["foo".into()])
1089 .expect("should create foo consumer");
1090
1091 let mut bar_consumer = origin
1092 .consumer
1093 .consume_only(&["bar".into()])
1094 .expect("should create bar consumer");
1095
1096 let mut foobar_consumer = origin
1097 .consumer
1098 .consume_only(&["foo".into(), "bar".into()])
1099 .expect("should create foobar consumer");
1100
1101 foo_consumer.assert_next("foo/test", &broadcast1.consumer);
1103 foo_consumer.assert_next_wait();
1104
1105 bar_consumer.assert_next("bar/test", &broadcast2.consumer);
1106 bar_consumer.assert_next_wait();
1107
1108 foobar_consumer.assert_next("foo/test", &broadcast1.consumer);
1109 foobar_consumer.assert_next("bar/test", &broadcast2.consumer);
1110 foobar_consumer.assert_next_wait();
1111 }
1112
1113 #[tokio::test]
1114 async fn test_select_with_empty_prefix() {
1115 let origin = Origin::produce();
1116 let broadcast1 = Broadcast::produce();
1117 let broadcast2 = Broadcast::produce();
1118
1119 let demo_producer = origin.producer.with_root("demo").expect("should create demo root");
1121 let limited_producer = demo_producer
1122 .publish_only(&["worm-node".into(), "foobar".into()])
1123 .expect("should create limited producer");
1124
1125 assert!(limited_producer.publish_broadcast("worm-node/test", broadcast1.consumer.clone()));
1127 assert!(limited_producer.publish_broadcast("foobar/test", broadcast2.consumer.clone()));
1128
1129 let mut consumer = limited_producer
1131 .consume_only(&["".into()])
1132 .expect("should create consumer with empty prefix");
1133
1134 consumer.assert_next("worm-node/test", &broadcast1.consumer);
1136 consumer.assert_next("foobar/test", &broadcast2.consumer);
1137 consumer.assert_next_wait();
1138 }
1139
1140 #[tokio::test]
1141 async fn test_select_narrowing_scope() {
1142 let origin = Origin::produce();
1143 let broadcast1 = Broadcast::produce();
1144 let broadcast2 = Broadcast::produce();
1145 let broadcast3 = Broadcast::produce();
1146
1147 let demo_producer = origin.producer.with_root("demo").expect("should create demo root");
1149 let limited_producer = demo_producer
1150 .publish_only(&["worm-node".into(), "foobar".into()])
1151 .expect("should create limited producer");
1152
1153 assert!(limited_producer.publish_broadcast("worm-node", broadcast1.consumer.clone()));
1155 assert!(limited_producer.publish_broadcast("worm-node/foo", broadcast2.consumer.clone()));
1156 assert!(limited_producer.publish_broadcast("foobar/bar", broadcast3.consumer.clone()));
1157
1158 let mut worm_consumer = limited_producer
1160 .consume_only(&["worm-node".into()])
1161 .expect("should create worm-node consumer");
1162
1163 worm_consumer.assert_next("worm-node", &broadcast1.consumer);
1165 worm_consumer.assert_next("worm-node/foo", &broadcast2.consumer);
1166 worm_consumer.assert_next_wait(); let mut foo_consumer = limited_producer
1170 .consume_only(&["worm-node/foo".into()])
1171 .expect("should create worm-node/foo consumer");
1172
1173 foo_consumer.assert_next("worm-node/foo", &broadcast2.consumer);
1174 foo_consumer.assert_next_wait(); }
1176
1177 #[tokio::test]
1178 async fn test_select_multiple_roots_with_empty_prefix() {
1179 let origin = Origin::produce();
1180 let broadcast1 = Broadcast::produce();
1181 let broadcast2 = Broadcast::produce();
1182 let broadcast3 = Broadcast::produce();
1183
1184 let limited_producer = origin
1186 .producer
1187 .publish_only(&["app1".into(), "app2".into(), "shared".into()])
1188 .expect("should create limited producer");
1189
1190 assert!(limited_producer.publish_broadcast("app1/data", broadcast1.consumer.clone()));
1192 assert!(limited_producer.publish_broadcast("app2/config", broadcast2.consumer.clone()));
1193 assert!(limited_producer.publish_broadcast("shared/resource", broadcast3.consumer.clone()));
1194
1195 let mut consumer = limited_producer
1197 .consume_only(&["".into()])
1198 .expect("should create consumer with empty prefix");
1199
1200 consumer.assert_next("app1/data", &broadcast1.consumer);
1202 consumer.assert_next("app2/config", &broadcast2.consumer);
1203 consumer.assert_next("shared/resource", &broadcast3.consumer);
1204 consumer.assert_next_wait();
1205 }
1206
1207 #[tokio::test]
1208 async fn test_publish_only_with_empty_prefix() {
1209 let origin = Origin::produce();
1210 let broadcast = Broadcast::produce();
1211
1212 let limited_producer = origin
1214 .producer
1215 .publish_only(&["services/api".into(), "services/web".into()])
1216 .expect("should create limited producer");
1217
1218 let same_producer = limited_producer
1220 .publish_only(&["".into()])
1221 .expect("should create producer with empty prefix");
1222
1223 assert!(same_producer.publish_broadcast("services/api", broadcast.consumer.clone()));
1225 assert!(same_producer.publish_broadcast("services/web", broadcast.consumer.clone()));
1226 assert!(!same_producer.publish_broadcast("services/db", broadcast.consumer.clone()));
1227 assert!(!same_producer.publish_broadcast("other", broadcast.consumer.clone()));
1228 }
1229
1230 #[tokio::test]
1231 async fn test_select_narrowing_to_deeper_path() {
1232 let origin = Origin::produce();
1233 let broadcast1 = Broadcast::produce();
1234 let broadcast2 = Broadcast::produce();
1235 let broadcast3 = Broadcast::produce();
1236
1237 let limited_producer = origin
1239 .producer
1240 .publish_only(&["org".into()])
1241 .expect("should create limited producer");
1242
1243 assert!(limited_producer.publish_broadcast("org/team1/project1", broadcast1.consumer.clone()));
1245 assert!(limited_producer.publish_broadcast("org/team1/project2", broadcast2.consumer.clone()));
1246 assert!(limited_producer.publish_broadcast("org/team2/project1", broadcast3.consumer.clone()));
1247
1248 let mut team1_consumer = limited_producer
1250 .consume_only(&["org/team2".into()])
1251 .expect("should create team1 consumer");
1252
1253 team1_consumer.assert_next("org/team2/project1", &broadcast3.consumer);
1254 team1_consumer.assert_next_wait(); let mut project1_consumer = limited_producer
1258 .consume_only(&["org/team1/project1".into()])
1259 .expect("should create project1 consumer");
1260
1261 project1_consumer.assert_next("org/team1/project1", &broadcast1.consumer);
1263 project1_consumer.assert_next_wait();
1264 }
1265
1266 #[tokio::test]
1267 async fn test_select_with_non_matching_prefix() {
1268 let origin = Origin::produce();
1269
1270 let limited_producer = origin
1272 .producer
1273 .publish_only(&["allowed/path".into()])
1274 .expect("should create limited producer");
1275
1276 assert!(limited_producer.consume_only(&["different/path".into()]).is_none());
1278
1279 assert!(limited_producer.publish_only(&["other/path".into()]).is_none());
1281 }
1282
1283 #[tokio::test]
1284 async fn test_select_maintains_access_with_wider_prefix() {
1285 let origin = Origin::produce();
1286 let broadcast1 = Broadcast::produce();
1287 let broadcast2 = Broadcast::produce();
1288
1289 let demo_producer = origin.producer.with_root("demo").expect("should create demo root");
1291 let user_producer = demo_producer
1292 .publish_only(&["worm-node".into(), "foobar".into()])
1293 .expect("should create user producer");
1294
1295 assert!(user_producer.publish_broadcast("worm-node/data", broadcast1.consumer.clone()));
1297 assert!(user_producer.publish_broadcast("foobar", broadcast2.consumer.clone()));
1298
1299 let mut consumer = user_producer
1301 .consume_only(&["".into()])
1302 .expect("consume_only with empty prefix should not fail when user has specific permissions");
1303
1304 consumer.assert_next("worm-node/data", &broadcast1.consumer);
1306 consumer.assert_next("foobar", &broadcast2.consumer);
1307 consumer.assert_next_wait();
1308
1309 let mut narrow_consumer = user_producer
1311 .consume_only(&["worm-node".into()])
1312 .expect("should be able to narrow scope to worm-node");
1313
1314 narrow_consumer.assert_next("worm-node/data", &broadcast1.consumer);
1315 narrow_consumer.assert_next_wait(); }
1317}