1use std::{
2 collections::HashMap,
3 sync::atomic::{AtomicU64, Ordering},
4};
5use tokio::sync::mpsc;
6use web_async::Lock;
7
8use super::BroadcastConsumer;
9use crate::{AsPath, Path, PathOwned, Produce};
10
/// Process-wide counter from which every [`ConsumerId`] is drawn.
static NEXT_CONSUMER_ID: AtomicU64 = AtomicU64::new(0);

/// Identifier used as the key under which a consumer is registered on a node.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct ConsumerId(u64);

impl ConsumerId {
    /// Hands out the current counter value and advances the counter by one.
    fn new() -> Self {
        let raw = NEXT_CONSUMER_ID.fetch_add(1, Ordering::Relaxed);
        ConsumerId(raw)
    }
}
21
/// The broadcast state stored at one node of the tree.
struct OriginBroadcast {
    /// Full path recorded when the first broadcast was published at this node.
    path: PathOwned,
    /// The broadcast currently announced and handed out to consumers.
    active: BroadcastConsumer,
    /// Broadcasts displaced by a later publish to the same path; the most recently
    /// displaced one is promoted again when the active broadcast is removed.
    backup: Vec<BroadcastConsumer>,
}
28
/// The sending half of one consumer's update channel, plus the root used to
/// relativize paths before they are sent.
#[derive(Clone)]
struct OriginConsumerNotify {
    /// Prefix stripped from every path before it is delivered to the consumer.
    root: PathOwned,
    /// Channel on which announce/unannounce updates are queued.
    tx: mpsc::UnboundedSender<OriginAnnounce>,
}
34
35impl OriginConsumerNotify {
36 fn announce(&self, path: impl AsPath, broadcast: BroadcastConsumer) {
37 let path = path.as_path().strip_prefix(&self.root).unwrap().to_owned();
38 self.tx.send((path, Some(broadcast))).expect("consumer closed");
39 }
40
41 fn reannounce(&self, path: impl AsPath, broadcast: BroadcastConsumer) {
42 let path = path.as_path().strip_prefix(&self.root).unwrap().to_owned();
43 self.tx.send((path.clone(), None)).expect("consumer closed");
44 self.tx.send((path, Some(broadcast))).expect("consumer closed");
45 }
46
47 fn unannounce(&self, path: impl AsPath) {
48 let path = path.as_path().strip_prefix(&self.root).unwrap().to_owned();
49 self.tx.send((path, None)).expect("consumer closed");
50 }
51}
52
/// The set of consumers registered at one node, linked to the parent's set so
/// that notifications can be forwarded upward.
struct NotifyNode {
    /// Notify node of the parent tree node; `None` at the top of the tree.
    parent: Option<Lock<NotifyNode>>,

    /// Consumers registered directly at this node, keyed by their id.
    consumers: HashMap<ConsumerId, OriginConsumerNotify>,
}
60
61impl NotifyNode {
62 fn new(parent: Option<Lock<NotifyNode>>) -> Self {
63 Self {
64 parent,
65 consumers: HashMap::new(),
66 }
67 }
68
69 fn announce(&mut self, path: impl AsPath, broadcast: &BroadcastConsumer) {
70 for consumer in self.consumers.values() {
71 consumer.announce(path.as_path(), broadcast.clone());
72 }
73
74 if let Some(parent) = &self.parent {
75 parent.lock().announce(path, broadcast);
76 }
77 }
78
79 fn reannounce(&mut self, path: impl AsPath, broadcast: &BroadcastConsumer) {
80 for consumer in self.consumers.values() {
81 consumer.reannounce(path.as_path(), broadcast.clone());
82 }
83
84 if let Some(parent) = &self.parent {
85 parent.lock().reannounce(path, broadcast);
86 }
87 }
88
89 fn unannounce(&mut self, path: impl AsPath) {
90 for consumer in self.consumers.values() {
91 consumer.unannounce(path.as_path());
92 }
93
94 if let Some(parent) = &self.parent {
95 parent.lock().unannounce(path);
96 }
97 }
98}
99
/// One node of the path tree: an optional broadcast, child nodes, and the
/// consumers listening at this level.
struct OriginNode {
    /// The broadcast published exactly at this node's path, if any.
    broadcast: Option<OriginBroadcast>,

    /// Child nodes keyed by the next path component.
    nested: HashMap<String, Lock<OriginNode>>,

    /// Consumers registered at this node; children are created with this as the
    /// parent of their own notify node.
    notify: Lock<NotifyNode>,
}
110
111impl OriginNode {
112 fn new(parent: Option<Lock<NotifyNode>>) -> Self {
113 Self {
114 broadcast: None,
115 nested: HashMap::new(),
116 notify: Lock::new(NotifyNode::new(parent)),
117 }
118 }
119
120 fn leaf(&mut self, path: &Path) -> Lock<OriginNode> {
121 let (dir, rest) = path.next_part().expect("leaf called with empty path");
122
123 let next = self.entry(dir);
124 if rest.is_empty() {
125 next
126 } else {
127 next.lock().leaf(&rest)
128 }
129 }
130
131 fn entry(&mut self, dir: &str) -> Lock<OriginNode> {
132 match self.nested.get(dir) {
133 Some(next) => next.clone(),
134 None => {
135 let next = Lock::new(OriginNode::new(Some(self.notify.clone())));
136 self.nested.insert(dir.to_string(), next.clone());
137 next
138 }
139 }
140 }
141
142 fn publish(&mut self, full: impl AsPath, broadcast: &BroadcastConsumer, relative: impl AsPath) {
143 let full = full.as_path();
144 let rest = relative.as_path();
145
146 if let Some((dir, relative)) = rest.next_part() {
148 self.entry(dir).lock().publish(&full, broadcast, &relative);
150 } else if let Some(existing) = &mut self.broadcast {
151 let old = existing.active.clone();
153 existing.active = broadcast.clone();
154 existing.backup.push(old);
155
156 self.notify.lock().reannounce(full, broadcast);
157 } else {
158 self.broadcast = Some(OriginBroadcast {
160 path: full.to_owned(),
161 active: broadcast.clone(),
162 backup: Vec::new(),
163 });
164 self.notify.lock().announce(full, broadcast);
165 }
166 }
167
168 fn consume(&mut self, id: ConsumerId, mut notify: OriginConsumerNotify) {
169 self.consume_initial(&mut notify);
170 self.notify.lock().consumers.insert(id, notify);
171 }
172
173 fn consume_initial(&mut self, notify: &mut OriginConsumerNotify) {
174 if let Some(broadcast) = &self.broadcast {
175 notify.announce(&broadcast.path, broadcast.active.clone());
176 }
177
178 for nested in self.nested.values() {
180 nested.lock().consume_initial(notify);
181 }
182 }
183
184 fn consume_broadcast(&self, rest: impl AsPath) -> Option<BroadcastConsumer> {
185 let rest = rest.as_path();
186
187 if let Some((dir, rest)) = rest.next_part() {
188 let node = self.nested.get(dir)?.lock();
189 node.consume_broadcast(&rest)
190 } else {
191 self.broadcast.as_ref().map(|b| b.active.clone())
192 }
193 }
194
195 fn unconsume(&mut self, id: ConsumerId) {
196 self.notify.lock().consumers.remove(&id).expect("consumer not found");
197 if self.is_empty() {
198 }
201 }
202
203 fn remove(&mut self, full: impl AsPath, broadcast: BroadcastConsumer, relative: impl AsPath) {
205 let full = full.as_path();
206 let relative = relative.as_path();
207
208 if let Some((dir, relative)) = relative.next_part() {
209 let nested = self.entry(dir);
210 let mut locked = nested.lock();
211 locked.remove(&full, broadcast, &relative);
212
213 if locked.is_empty() {
214 drop(locked);
215 self.nested.remove(dir);
216 }
217 } else {
218 let entry = match &mut self.broadcast {
219 Some(existing) => existing,
220 None => return,
221 };
222
223 let pos = entry.backup.iter().position(|b| b.is_clone(&broadcast));
225 if let Some(pos) = pos {
226 entry.backup.remove(pos);
227 return;
229 }
230
231 assert!(entry.active.is_clone(&broadcast));
233
234 if let Some(active) = entry.backup.pop() {
236 entry.active = active;
237 self.notify.lock().reannounce(full, &entry.active);
238 } else {
239 self.broadcast = None;
241 self.notify.lock().unannounce(full);
242 }
243 }
244 }
245
246 fn is_empty(&self) -> bool {
247 self.broadcast.is_none() && self.nested.is_empty() && self.notify.lock().consumers.is_empty()
248 }
249}
250
/// A list of subtree handles, each paired with the path at which it is rooted.
#[derive(Clone)]
struct OriginNodes {
    /// `(root path, node)` pairs; lookups return the first pair whose root is a
    /// prefix of the requested path.
    nodes: Vec<(PathOwned, Lock<OriginNode>)>,
}
255
256impl OriginNodes {
257 pub fn select(&self, prefixes: &[Path]) -> Option<Self> {
260 let mut roots = Vec::new();
261
262 for (root, state) in &self.nodes {
263 for prefix in prefixes {
264 if root.has_prefix(prefix) {
265 roots.push((root.to_owned(), state.clone()));
267 continue;
268 }
269
270 if let Some(suffix) = prefix.strip_prefix(root) {
271 let nested = state.lock().leaf(&suffix);
273 roots.push((prefix.to_owned(), nested));
274 }
275 }
276 }
277
278 if roots.is_empty() {
279 None
280 } else {
281 Some(Self { nodes: roots })
282 }
283 }
284
285 pub fn root(&self, new_root: impl AsPath) -> Option<Self> {
286 let new_root = new_root.as_path();
287 let mut roots = Vec::new();
288
289 if new_root.is_empty() {
290 return Some(self.clone());
291 }
292
293 for (root, state) in &self.nodes {
294 if let Some(suffix) = root.strip_prefix(&new_root) {
295 roots.push((suffix.to_owned(), state.clone()));
297 } else if let Some(suffix) = new_root.strip_prefix(root) {
298 let nested = state.lock().leaf(&suffix);
301 roots.push(("".into(), nested));
302 }
303 }
304
305 if roots.is_empty() {
306 None
307 } else {
308 Some(Self { nodes: roots })
309 }
310 }
311
312 pub fn get(&self, path: impl AsPath) -> Option<(Lock<OriginNode>, PathOwned)> {
314 let path = path.as_path();
315
316 for (root, state) in &self.nodes {
317 if let Some(suffix) = path.strip_prefix(root) {
318 return Some((state.clone(), suffix.to_owned()));
319 }
320 }
321
322 None
323 }
324}
325
326impl Default for OriginNodes {
327 fn default() -> Self {
328 Self {
329 nodes: vec![("".into(), Lock::new(OriginNode::new(None)))],
330 }
331 }
332}
333
/// One update delivered to a consumer: the path, and `Some(broadcast)` for an
/// announce or `None` for an unannounce.
pub type OriginAnnounce = (PathOwned, Option<BroadcastConsumer>);
336
337pub struct Origin {}
338
339impl Origin {
340 pub fn produce() -> Produce<OriginProducer, OriginConsumer> {
341 let producer = OriginProducer::default();
342 let consumer = producer.consume();
343 Produce { producer, consumer }
344 }
345}
346
/// Publishes broadcasts into the tree, limited to the subtrees it holds.
#[derive(Clone, Default)]
pub struct OriginProducer {
    /// Subtrees this producer may publish into; paths outside them are rejected.
    nodes: OriginNodes,

    /// Prefix joined in front of every published path to form the full path.
    root: PathOwned,
}
357
358impl OriginProducer {
359 pub fn publish_broadcast(&self, path: impl AsPath, broadcast: BroadcastConsumer) -> bool {
368 let path = path.as_path();
369
370 let (root, rest) = match self.nodes.get(&path) {
371 Some(root) => root,
372 None => return false,
373 };
374
375 let full = self.root.join(&path);
376
377 root.lock().publish(&full, &broadcast, &rest);
378 let root = root.clone();
379
380 web_async::spawn(async move {
381 broadcast.closed().await;
382 root.lock().remove(&full, broadcast, &rest);
383 });
384
385 true
386 }
387
388 pub fn publish_only(&self, prefixes: &[Path]) -> Option<OriginProducer> {
392 Some(OriginProducer {
393 nodes: self.nodes.select(prefixes)?,
394 root: self.root.clone(),
395 })
396 }
397
398 pub fn consume(&self) -> OriginConsumer {
400 OriginConsumer::new(self.root.clone(), self.nodes.clone())
401 }
402
403 pub fn consume_only(&self, prefixes: &[Path]) -> Option<OriginConsumer> {
409 Some(OriginConsumer::new(self.root.clone(), self.nodes.select(prefixes)?))
410 }
411
412 pub fn with_root(&self, prefix: impl AsPath) -> Option<Self> {
416 let prefix = prefix.as_path();
417
418 Some(Self {
419 root: self.root.join(&prefix).to_owned(),
420 nodes: self.nodes.root(&prefix)?,
421 })
422 }
423
424 pub fn root(&self) -> &Path<'_> {
426 &self.root
427 }
428
429 pub fn allowed(&self) -> impl Iterator<Item = &Path<'_>> {
430 self.nodes.nodes.iter().map(|(root, _)| root)
431 }
432
433 pub fn absolute(&self, path: impl AsPath) -> Path<'_> {
435 self.root.join(path)
436 }
437}
438
/// Receives announce/unannounce updates for the subtrees it is registered on.
pub struct OriginConsumer {
    /// Key under which this consumer is registered on each of its nodes.
    id: ConsumerId,
    /// Subtrees this consumer is registered on and may look broadcasts up in.
    nodes: OriginNodes,
    /// Receiving half of the channel the nodes push updates into.
    updates: mpsc::UnboundedReceiver<OriginAnnounce>,

    /// Prefix stripped from paths before they are delivered to this consumer.
    root: PathOwned,
}
450
451impl OriginConsumer {
452 fn new(root: PathOwned, nodes: OriginNodes) -> Self {
453 let (tx, rx) = mpsc::unbounded_channel();
454
455 let id = ConsumerId::new();
456
457 for (_, state) in &nodes.nodes {
458 let notify = OriginConsumerNotify {
459 root: root.clone(),
460 tx: tx.clone(),
461 };
462 state.lock().consume(id, notify);
463 }
464
465 Self {
466 id,
467 nodes,
468 updates: rx,
469 root,
470 }
471 }
472
473 pub async fn announced(&mut self) -> Option<OriginAnnounce> {
481 self.updates.recv().await
482 }
483
484 pub fn try_announced(&mut self) -> Option<OriginAnnounce> {
489 self.updates.try_recv().ok()
490 }
491
492 pub fn consume(&self) -> Self {
493 self.clone()
494 }
495
496 pub fn consume_broadcast(&self, path: impl AsPath) -> Option<BroadcastConsumer> {
502 let path = path.as_path();
503 let (root, rest) = self.nodes.get(&path)?;
504 let state = root.lock();
505 state.consume_broadcast(&rest)
506 }
507
508 pub fn consume_only(&self, prefixes: &[Path]) -> Option<OriginConsumer> {
512 Some(OriginConsumer::new(self.root.clone(), self.nodes.select(prefixes)?))
513 }
514
515 pub fn with_root(&self, prefix: impl AsPath) -> Option<Self> {
519 let prefix = prefix.as_path();
520
521 Some(Self::new(self.root.join(&prefix).to_owned(), self.nodes.root(&prefix)?))
522 }
523
524 pub fn root(&self) -> &Path<'_> {
526 &self.root
527 }
528
529 pub fn allowed(&self) -> impl Iterator<Item = &Path<'_>> {
530 self.nodes.nodes.iter().map(|(root, _)| root)
531 }
532
533 pub fn absolute(&self, path: impl AsPath) -> Path<'_> {
535 self.root.join(path)
536 }
537}
538
539impl Drop for OriginConsumer {
540 fn drop(&mut self) {
541 for (_, root) in &self.nodes.nodes {
542 root.lock().unconsume(self.id);
543 }
544 }
545}
546
547impl Clone for OriginConsumer {
548 fn clone(&self) -> Self {
549 OriginConsumer::new(self.root.clone(), self.nodes.clone())
550 }
551}
552
553#[cfg(test)]
554use futures::FutureExt;
555
#[cfg(test)]
impl OriginConsumer {
    /// Asserts that an announce of `broadcast` at `expected` is ready right now.
    pub fn assert_next(&mut self, expected: impl AsPath, broadcast: &BroadcastConsumer) {
        let expected = expected.as_path();
        let ready = self.announced().now_or_never().expect("next blocked");
        let (path, active) = ready.expect("no next");
        assert_eq!(path, expected, "wrong path");
        assert!(active.unwrap().is_clone(broadcast), "should be the same broadcast");
    }

    /// Same check as `assert_next`, but reads the queue through `try_announced`.
    pub fn assert_try_next(&mut self, expected: impl AsPath, broadcast: &BroadcastConsumer) {
        let expected = expected.as_path();
        let queued = self.try_announced();
        let (path, active) = queued.expect("no next");
        assert_eq!(path, expected, "wrong path");
        assert!(active.unwrap().is_clone(broadcast), "should be the same broadcast");
    }

    /// Asserts that an unannounce of `expected` is ready right now.
    pub fn assert_next_none(&mut self, expected: impl AsPath) {
        let expected = expected.as_path();
        let ready = self.announced().now_or_never().expect("next blocked");
        let (path, active) = ready.expect("no next");
        assert_eq!(path, expected, "wrong path");
        assert!(active.is_none(), "should be unannounced");
    }

    /// Asserts that no update is ready, i.e. `announced()` would have to wait.
    pub fn assert_next_wait(&mut self) {
        let polled = self.announced().now_or_never();
        if let Some(res) = polled {
            panic!("next should block: got {:?}", res.map(|(path, _)| path));
        }
    }
}
594
595#[cfg(test)]
596mod tests {
597 use crate::Broadcast;
598
599 use super::*;
600
    /// Announcements reach existing consumers live, new consumers get the current
    /// state on creation, and closing a broadcast unannounces it everywhere.
    #[tokio::test]
    async fn test_announce() {
        let origin = Origin::produce();
        let broadcast1 = Broadcast::produce();
        let broadcast2 = Broadcast::produce();

        // Nothing has been published yet.
        let mut consumer1 = origin.consumer;
        consumer1.assert_next_wait();

        origin.producer.publish_broadcast("test1", broadcast1.consumer);

        consumer1.assert_next("test1", &broadcast1.producer.consume());
        consumer1.assert_next_wait();

        // A consumer created now starts with "test1" already queued.
        let mut consumer2 = origin.producer.consume();

        origin.producer.publish_broadcast("test2", broadcast2.consumer);

        consumer1.assert_next("test2", &broadcast2.producer.consume());
        consumer1.assert_next_wait();

        consumer2.assert_next("test1", &broadcast1.producer.consume());
        consumer2.assert_next("test2", &broadcast2.producer.consume());
        consumer2.assert_next_wait();

        drop(broadcast1.producer);

        // Yield so the cleanup task spawned by `publish_broadcast` can run.
        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;

        consumer1.assert_next_none("test1");
        consumer2.assert_next_none("test1");
        consumer1.assert_next_wait();
        consumer2.assert_next_wait();

        // Only "test2" is still published for a late consumer.
        let mut consumer3 = origin.producer.consume();
        consumer3.assert_next("test2", &broadcast2.producer.consume());
        consumer3.assert_next_wait();

        drop(broadcast2.producer);

        // Yield so the cleanup task can run.
        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;

        consumer1.assert_next_none("test2");
        consumer2.assert_next_none("test2");
        consumer3.assert_next_none("test2");

    }
664
    /// Publishing several broadcasts to one path: the latest is active, earlier
    /// ones are backups, and removals fall back to the remaining backups.
    #[tokio::test]
    async fn test_duplicate() {
        let mut origin = Origin::produce();

        let broadcast1 = Broadcast::produce();
        let broadcast2 = Broadcast::produce();
        let broadcast3 = Broadcast::produce();

        let consumer1 = broadcast1.consumer;
        let consumer2 = broadcast2.consumer;
        let consumer3 = broadcast3.consumer;

        origin.producer.publish_broadcast("test", consumer1.clone());
        origin.producer.publish_broadcast("test", consumer2.clone());
        origin.producer.publish_broadcast("test", consumer3.clone());

        assert!(origin.consumer.consume_broadcast("test").is_some());

        // Each replacement is delivered as an unannounce followed by an announce.
        origin.consumer.assert_next("test", &consumer1);
        origin.consumer.assert_next_none("test");
        origin.consumer.assert_next("test", &consumer2);
        origin.consumer.assert_next_none("test");
        origin.consumer.assert_next("test", &consumer3);

        // broadcast2 is only a backup, so closing it produces no update.
        drop(broadcast2.producer);

        // Yield so the cleanup task can run.
        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;

        assert!(origin.consumer.consume_broadcast("test").is_some());
        origin.consumer.assert_next_wait();

        // broadcast3 is active: closing it falls back to broadcast1.
        drop(broadcast3.producer);

        // Yield so the cleanup task can run.
        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;

        assert!(origin.consumer.consume_broadcast("test").is_some());
        origin.consumer.assert_next_none("test");
        origin.consumer.assert_next("test", &consumer1);

        // Closing the last broadcast leaves the path empty.
        drop(broadcast1.producer);

        // Yield so the cleanup task can run.
        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
        assert!(origin.consumer.consume_broadcast("test").is_none());

        origin.consumer.assert_next_none("test");
        origin.consumer.assert_next_wait();
    }
718
    /// Closing the active broadcast first, then the backup, ends with an empty path.
    #[tokio::test]
    async fn test_duplicate_reverse() {
        let origin = Origin::produce();
        let broadcast1 = Broadcast::produce();
        let broadcast2 = Broadcast::produce();

        origin.producer.publish_broadcast("test", broadcast1.consumer);
        origin.producer.publish_broadcast("test", broadcast2.consumer);
        assert!(origin.consumer.consume_broadcast("test").is_some());

        // broadcast2 is active; broadcast1 takes over once it closes.
        drop(broadcast2.producer);

        // Yield so the cleanup task can run.
        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
        assert!(origin.consumer.consume_broadcast("test").is_some());

        drop(broadcast1.producer);

        // Yield so the cleanup task can run.
        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
        assert!(origin.consumer.consume_broadcast("test").is_none());
    }
742
    /// Publishing the same broadcast twice at one path is fully cleaned up when it closes.
    #[tokio::test]
    async fn test_double_publish() {
        let origin = Origin::produce();
        let broadcast = Broadcast::produce();

        // Both publishes use consumers of the same underlying broadcast.
        origin.producer.publish_broadcast("test", broadcast.producer.consume());
        origin.producer.publish_broadcast("test", broadcast.producer.consume());

        assert!(origin.consumer.consume_broadcast("test").is_some());

        drop(broadcast.producer);

        // Yield so both cleanup tasks can run.
        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
        assert!(origin.consumer.consume_broadcast("test").is_none());
    }
    /// Expected to panic: reading 256 queued announcements through `assert_next`
    /// (which polls `announced()` with `now_or_never`) does not succeed for all of them.
    // NOTE(review): presumably tokio's cooperative budget makes the receiver report
    // "pending" after 128 polls in one task tick; confirm against tokio's docs.
    #[tokio::test]
    #[should_panic]
    async fn test_128() {
        let mut origin = Origin::produce();
        let broadcast = Broadcast::produce();

        for i in 0..256 {
            origin
                .producer
                .publish_broadcast(format!("test{i}"), broadcast.consumer.clone());
        }

        for i in 0..256 {
            origin.consumer.assert_next(format!("test{i}"), &broadcast.consumer);
        }
    }
777
    /// Counterpart of `test_128`: all 256 queued announcements can be read when
    /// going through `try_announced` instead of polling the async `announced()`.
    #[tokio::test]
    async fn test_128_fix() {
        let mut origin = Origin::produce();
        let broadcast = Broadcast::produce();

        for i in 0..256 {
            origin
                .producer
                .publish_broadcast(format!("test{i}"), broadcast.consumer.clone());
        }

        for i in 0..256 {
            origin.consumer.assert_try_next(format!("test{i}"), &broadcast.consumer);
        }
    }
794
    /// A producer rooted at "foo" publishes relative paths; the unrooted consumer
    /// sees the full path and a consumer from the rooted producer sees the relative one.
    #[tokio::test]
    async fn test_with_root_basic() {
        let mut origin = Origin::produce();
        let broadcast = Broadcast::produce();

        let foo_producer = origin.producer.with_root("foo").expect("should create root");
        assert_eq!(foo_producer.root().as_str(), "foo");

        assert!(foo_producer.publish_broadcast("bar/baz", broadcast.consumer.clone()));

        // The original consumer sees the path with the root prepended.
        origin.consumer.assert_next("foo/bar/baz", &broadcast.consumer);

        // A consumer created from the rooted producer sees the path without the root.
        let mut foo_consumer = foo_producer.consume();
        foo_consumer.assert_next("bar/baz", &broadcast.consumer);
    }
814
    /// Chained `with_root` calls accumulate: "foo" then "bar" yields the root "foo/bar".
    #[tokio::test]
    async fn test_with_root_nested() {
        let mut origin = Origin::produce();
        let broadcast = Broadcast::produce();

        let foo_producer = origin.producer.with_root("foo").expect("should create foo root");
        let foo_bar_producer = foo_producer.with_root("bar").expect("should create bar root");
        assert_eq!(foo_bar_producer.root().as_str(), "foo/bar");

        assert!(foo_bar_producer.publish_broadcast("baz", broadcast.consumer.clone()));

        // The original consumer sees the full path.
        origin.consumer.assert_next("foo/bar/baz", &broadcast.consumer);

        // A consumer rooted at "foo/bar" sees only the remainder.
        let mut foo_bar_consumer = foo_bar_producer.consume();
        foo_bar_consumer.assert_next("baz", &broadcast.consumer);
    }
835
    /// `publish_only` permits publishing at or below the listed prefixes and nowhere else.
    #[tokio::test]
    async fn test_publish_only_allows() {
        let origin = Origin::produce();
        let broadcast = Broadcast::produce();

        let limited_producer = origin
            .producer
            .publish_only(&["allowed/path1".into(), "allowed/path2".into()])
            .expect("should create limited producer");

        // At or below an allowed prefix: accepted.
        assert!(limited_producer.publish_broadcast("allowed/path1", broadcast.consumer.clone()));
        assert!(limited_producer.publish_broadcast("allowed/path1/nested", broadcast.consumer.clone()));
        assert!(limited_producer.publish_broadcast("allowed/path2", broadcast.consumer.clone()));

        // Outside the prefixes, including a parent of an allowed prefix: rejected.
        assert!(!limited_producer.publish_broadcast("notallowed", broadcast.consumer.clone()));
        assert!(!limited_producer.publish_broadcast("allowed", broadcast.consumer.clone()));
        assert!(!limited_producer.publish_broadcast("other/path", broadcast.consumer.clone()));
    }
857
    /// An empty prefix list selects nothing, so no producer is returned.
    #[tokio::test]
    async fn test_publish_only_empty() {
        let origin = Origin::produce();

        assert!(origin.producer.publish_only(&[]).is_none());
    }
865
    /// A consumer limited to "allowed" only receives announcements under that
    /// prefix, while the unrestricted consumer still receives everything.
    #[tokio::test]
    async fn test_consume_only_filters() {
        let mut origin = Origin::produce();
        let broadcast1 = Broadcast::produce();
        let broadcast2 = Broadcast::produce();
        let broadcast3 = Broadcast::produce();

        origin
            .producer
            .publish_broadcast("allowed", broadcast1.consumer.clone());
        origin
            .producer
            .publish_broadcast("allowed/nested", broadcast2.consumer.clone());
        origin
            .producer
            .publish_broadcast("notallowed", broadcast3.consumer.clone());

        let mut limited_consumer = origin
            .consumer
            .consume_only(&["allowed".into()])
            .expect("should create limited consumer");

        // Only the paths under "allowed" are delivered.
        limited_consumer.assert_next("allowed", &broadcast1.consumer);
        limited_consumer.assert_next("allowed/nested", &broadcast2.consumer);
        limited_consumer.assert_next_wait();

        // The unrestricted consumer sees all three.
        origin.consumer.assert_next("allowed", &broadcast1.consumer);
        origin.consumer.assert_next("allowed/nested", &broadcast2.consumer);
        origin.consumer.assert_next("notallowed", &broadcast3.consumer);
    }
900
    /// A consumer limited to two prefixes receives announcements from both and
    /// nothing from a third, unlisted prefix.
    #[tokio::test]
    async fn test_consume_only_multiple_prefixes() {
        let origin = Origin::produce();
        let broadcast1 = Broadcast::produce();
        let broadcast2 = Broadcast::produce();
        let broadcast3 = Broadcast::produce();

        origin
            .producer
            .publish_broadcast("foo/test", broadcast1.consumer.clone());
        origin
            .producer
            .publish_broadcast("bar/test", broadcast2.consumer.clone());
        origin
            .producer
            .publish_broadcast("baz/test", broadcast3.consumer.clone());

        let mut limited_consumer = origin
            .consumer
            .consume_only(&["foo".into(), "bar".into()])
            .expect("should create limited consumer");

        // Initial announcements arrive in the order the prefixes were listed.
        limited_consumer.assert_next("foo/test", &broadcast1.consumer);
        limited_consumer.assert_next("bar/test", &broadcast2.consumer);
        // "baz/test" is never delivered.
        limited_consumer.assert_next_wait();
    }
928
    /// `publish_only` on a rooted producer takes prefixes relative to that root,
    /// and published paths are reported with the root prepended.
    #[tokio::test]
    async fn test_with_root_and_publish_only() {
        let mut origin = Origin::produce();
        let broadcast = Broadcast::produce();

        let foo_producer = origin.producer.with_root("foo").expect("should create foo root");

        let limited_producer = foo_producer
            .publish_only(&["bar".into(), "goop/pee".into()])
            .expect("should create limited producer");

        // At or below an allowed prefix: accepted.
        assert!(limited_producer.publish_broadcast("bar", broadcast.consumer.clone()));
        assert!(limited_producer.publish_broadcast("bar/nested", broadcast.consumer.clone()));
        assert!(limited_producer.publish_broadcast("goop/pee", broadcast.consumer.clone()));
        assert!(limited_producer.publish_broadcast("goop/pee/nested", broadcast.consumer.clone()));

        // Outside the prefixes, including a parent of an allowed prefix: rejected.
        assert!(!limited_producer.publish_broadcast("baz", broadcast.consumer.clone()));
        assert!(!limited_producer.publish_broadcast("goop", broadcast.consumer.clone()));
        assert!(!limited_producer.publish_broadcast("goop/other", broadcast.consumer.clone()));

        // The unrooted consumer sees every accepted publish under "foo/".
        origin.consumer.assert_next("foo/bar", &broadcast.consumer);
        origin.consumer.assert_next("foo/bar/nested", &broadcast.consumer);
        origin.consumer.assert_next("foo/goop/pee", &broadcast.consumer);
        origin.consumer.assert_next("foo/goop/pee/nested", &broadcast.consumer);
    }
959
    /// `consume_only` on a rooted producer filters by prefixes relative to the
    /// root and delivers paths relative to the root.
    #[tokio::test]
    async fn test_with_root_and_consume_only() {
        let origin = Origin::produce();
        let broadcast1 = Broadcast::produce();
        let broadcast2 = Broadcast::produce();
        let broadcast3 = Broadcast::produce();

        origin
            .producer
            .publish_broadcast("foo/bar/test", broadcast1.consumer.clone());
        origin
            .producer
            .publish_broadcast("foo/goop/pee/test", broadcast2.consumer.clone());
        origin
            .producer
            .publish_broadcast("foo/other/test", broadcast3.consumer.clone());

        let foo_producer = origin.producer.with_root("foo").expect("should create foo root");

        let mut limited_consumer = foo_producer
            .consume_only(&["bar".into(), "goop/pee".into()])
            .expect("should create limited consumer");

        // Paths are reported without the "foo" root.
        limited_consumer.assert_next("bar/test", &broadcast1.consumer);
        limited_consumer.assert_next("goop/pee/test", &broadcast2.consumer);
        // "other/test" is outside the selected prefixes.
        limited_consumer.assert_next_wait();
    }
991
    /// A limited producer cannot be re-rooted outside its allowed prefixes.
    #[tokio::test]
    async fn test_with_root_unauthorized() {
        let origin = Origin::produce();

        let limited_producer = origin
            .producer
            .publish_only(&["allowed".into()])
            .expect("should create limited producer");

        // Unrelated to any allowed prefix: refused.
        assert!(limited_producer.with_root("notallowed").is_none());

        // Exactly an allowed prefix: accepted.
        let allowed_root = limited_producer
            .with_root("allowed")
            .expect("should create allowed root");
        assert_eq!(allowed_root.root().as_str(), "allowed");
    }
1011
    /// The default producer is unrestricted: it can publish anywhere and be
    /// re-rooted at any path.
    #[tokio::test]
    async fn test_wildcard_permission() {
        let origin = Origin::produce();
        let broadcast = Broadcast::produce();

        let root_producer = origin.producer.clone();

        assert!(root_producer.publish_broadcast("any/path", broadcast.consumer.clone()));
        assert!(root_producer.publish_broadcast("other/path", broadcast.consumer.clone()));

        let foo_producer = root_producer.with_root("foo").expect("should create any root");
        assert_eq!(foo_producer.root().as_str(), "foo");
    }
1028
    /// `consume_broadcast` on a limited consumer only resolves paths under its prefixes.
    #[tokio::test]
    async fn test_consume_broadcast_with_permissions() {
        let origin = Origin::produce();
        let broadcast1 = Broadcast::produce();
        let broadcast2 = Broadcast::produce();

        origin
            .producer
            .publish_broadcast("allowed/test", broadcast1.consumer.clone());
        origin
            .producer
            .publish_broadcast("notallowed/test", broadcast2.consumer.clone());

        let limited_consumer = origin
            .consumer
            .consume_only(&["allowed".into()])
            .expect("should create limited consumer");

        // Inside the allowed prefix: found, and it is the published broadcast.
        let result = limited_consumer.consume_broadcast("allowed/test");
        assert!(result.is_some());
        assert!(result.unwrap().is_clone(&broadcast1.consumer));

        // Outside the allowed prefix: not visible even though it is published.
        assert!(limited_consumer.consume_broadcast("notallowed/test").is_none());

        // The unrestricted consumer can look up both.
        assert!(origin.consumer.consume_broadcast("allowed/test").is_some());
        assert!(origin.consumer.consume_broadcast("notallowed/test").is_some());
    }
1060
    /// A deep prefix ("a/b/c") allows itself and anything below it, but none of
    /// its ancestors or siblings.
    #[tokio::test]
    async fn test_nested_paths_with_permissions() {
        let origin = Origin::produce();
        let broadcast = Broadcast::produce();

        let limited_producer = origin
            .producer
            .publish_only(&["a/b/c".into()])
            .expect("should create limited producer");

        // The prefix itself and deeper paths: accepted.
        assert!(limited_producer.publish_broadcast("a/b/c", broadcast.consumer.clone()));
        assert!(limited_producer.publish_broadcast("a/b/c/d", broadcast.consumer.clone()));
        assert!(limited_producer.publish_broadcast("a/b/c/d/e", broadcast.consumer.clone()));

        // Ancestors and siblings of the prefix: rejected.
        assert!(!limited_producer.publish_broadcast("a", broadcast.consumer.clone()));
        assert!(!limited_producer.publish_broadcast("a/b", broadcast.consumer.clone()));
        assert!(!limited_producer.publish_broadcast("a/b/other", broadcast.consumer.clone()));
    }
1082
    /// Consumers restricted to different prefixes each receive only their own
    /// share of the same published set.
    #[tokio::test]
    async fn test_multiple_consumers_with_different_permissions() {
        let origin = Origin::produce();
        let broadcast1 = Broadcast::produce();
        let broadcast2 = Broadcast::produce();
        let broadcast3 = Broadcast::produce();

        origin
            .producer
            .publish_broadcast("foo/test", broadcast1.consumer.clone());
        origin
            .producer
            .publish_broadcast("bar/test", broadcast2.consumer.clone());
        origin
            .producer
            .publish_broadcast("baz/test", broadcast3.consumer.clone());

        let mut foo_consumer = origin
            .consumer
            .consume_only(&["foo".into()])
            .expect("should create foo consumer");

        let mut bar_consumer = origin
            .consumer
            .consume_only(&["bar".into()])
            .expect("should create bar consumer");

        let mut foobar_consumer = origin
            .consumer
            .consume_only(&["foo".into(), "bar".into()])
            .expect("should create foobar consumer");

        // Each consumer sees exactly the prefixes it selected.
        foo_consumer.assert_next("foo/test", &broadcast1.consumer);
        foo_consumer.assert_next_wait();

        bar_consumer.assert_next("bar/test", &broadcast2.consumer);
        bar_consumer.assert_next_wait();

        foobar_consumer.assert_next("foo/test", &broadcast1.consumer);
        foobar_consumer.assert_next("bar/test", &broadcast2.consumer);
        foobar_consumer.assert_next_wait();
    }
1128
    /// Selecting the empty prefix keeps every subtree the producer already holds.
    #[tokio::test]
    async fn test_select_with_empty_prefix() {
        let origin = Origin::produce();
        let broadcast1 = Broadcast::produce();
        let broadcast2 = Broadcast::produce();

        let demo_producer = origin.producer.with_root("demo").expect("should create demo root");
        let limited_producer = demo_producer
            .publish_only(&["worm-node".into(), "foobar".into()])
            .expect("should create limited producer");

        assert!(limited_producer.publish_broadcast("worm-node/test", broadcast1.consumer.clone()));
        assert!(limited_producer.publish_broadcast("foobar/test", broadcast2.consumer.clone()));

        // The empty prefix matches both held subtrees.
        let mut consumer = limited_producer
            .consume_only(&["".into()])
            .expect("should create consumer with empty prefix");

        consumer.assert_next("worm-node/test", &broadcast1.consumer);
        consumer.assert_next("foobar/test", &broadcast2.consumer);
        consumer.assert_next_wait();
    }
1155
    /// `consume_only` can narrow a limited producer to one of its prefixes, or to
    /// a path deeper than a held prefix.
    #[tokio::test]
    async fn test_select_narrowing_scope() {
        let origin = Origin::produce();
        let broadcast1 = Broadcast::produce();
        let broadcast2 = Broadcast::produce();
        let broadcast3 = Broadcast::produce();

        let demo_producer = origin.producer.with_root("demo").expect("should create demo root");
        let limited_producer = demo_producer
            .publish_only(&["worm-node".into(), "foobar".into()])
            .expect("should create limited producer");

        assert!(limited_producer.publish_broadcast("worm-node", broadcast1.consumer.clone()));
        assert!(limited_producer.publish_broadcast("worm-node/foo", broadcast2.consumer.clone()));
        assert!(limited_producer.publish_broadcast("foobar/bar", broadcast3.consumer.clone()));

        // Narrow to one of the two held prefixes.
        let mut worm_consumer = limited_producer
            .consume_only(&["worm-node".into()])
            .expect("should create worm-node consumer");

        worm_consumer.assert_next("worm-node", &broadcast1.consumer);
        worm_consumer.assert_next("worm-node/foo", &broadcast2.consumer);
        // Nothing from "foobar".
        worm_consumer.assert_next_wait();

        // Narrow further, below a held prefix.
        let mut foo_consumer = limited_producer
            .consume_only(&["worm-node/foo".into()])
            .expect("should create worm-node/foo consumer");

        foo_consumer.assert_next("worm-node/foo", &broadcast2.consumer);
        // Neither "worm-node" itself nor "foobar/bar".
        foo_consumer.assert_next_wait();
    }
1192
    /// With three held prefixes, selecting the empty prefix keeps all three.
    #[tokio::test]
    async fn test_select_multiple_roots_with_empty_prefix() {
        let origin = Origin::produce();
        let broadcast1 = Broadcast::produce();
        let broadcast2 = Broadcast::produce();
        let broadcast3 = Broadcast::produce();

        let limited_producer = origin
            .producer
            .publish_only(&["app1".into(), "app2".into(), "shared".into()])
            .expect("should create limited producer");

        assert!(limited_producer.publish_broadcast("app1/data", broadcast1.consumer.clone()));
        assert!(limited_producer.publish_broadcast("app2/config", broadcast2.consumer.clone()));
        assert!(limited_producer.publish_broadcast("shared/resource", broadcast3.consumer.clone()));

        // The empty prefix matches every held subtree.
        let mut consumer = limited_producer
            .consume_only(&["".into()])
            .expect("should create consumer with empty prefix");

        consumer.assert_next("app1/data", &broadcast1.consumer);
        consumer.assert_next("app2/config", &broadcast2.consumer);
        consumer.assert_next("shared/resource", &broadcast3.consumer);
        consumer.assert_next_wait();
    }
1222
1223 #[tokio::test]
1224 async fn test_publish_only_with_empty_prefix() {
1225 let origin = Origin::produce();
1226 let broadcast = Broadcast::produce();
1227
1228 let limited_producer = origin
1230 .producer
1231 .publish_only(&["services/api".into(), "services/web".into()])
1232 .expect("should create limited producer");
1233
1234 let same_producer = limited_producer
1236 .publish_only(&["".into()])
1237 .expect("should create producer with empty prefix");
1238
1239 assert!(same_producer.publish_broadcast("services/api", broadcast.consumer.clone()));
1241 assert!(same_producer.publish_broadcast("services/web", broadcast.consumer.clone()));
1242 assert!(!same_producer.publish_broadcast("services/db", broadcast.consumer.clone()));
1243 assert!(!same_producer.publish_broadcast("other", broadcast.consumer.clone()));
1244 }
1245
1246 #[tokio::test]
1247 async fn test_select_narrowing_to_deeper_path() {
1248 let origin = Origin::produce();
1249 let broadcast1 = Broadcast::produce();
1250 let broadcast2 = Broadcast::produce();
1251 let broadcast3 = Broadcast::produce();
1252
1253 let limited_producer = origin
1255 .producer
1256 .publish_only(&["org".into()])
1257 .expect("should create limited producer");
1258
1259 assert!(limited_producer.publish_broadcast("org/team1/project1", broadcast1.consumer.clone()));
1261 assert!(limited_producer.publish_broadcast("org/team1/project2", broadcast2.consumer.clone()));
1262 assert!(limited_producer.publish_broadcast("org/team2/project1", broadcast3.consumer.clone()));
1263
1264 let mut team1_consumer = limited_producer
1266 .consume_only(&["org/team2".into()])
1267 .expect("should create team1 consumer");
1268
1269 team1_consumer.assert_next("org/team2/project1", &broadcast3.consumer);
1270 team1_consumer.assert_next_wait(); let mut project1_consumer = limited_producer
1274 .consume_only(&["org/team1/project1".into()])
1275 .expect("should create project1 consumer");
1276
1277 project1_consumer.assert_next("org/team1/project1", &broadcast1.consumer);
1279 project1_consumer.assert_next_wait();
1280 }
1281
1282 #[tokio::test]
1283 async fn test_select_with_non_matching_prefix() {
1284 let origin = Origin::produce();
1285
1286 let limited_producer = origin
1288 .producer
1289 .publish_only(&["allowed/path".into()])
1290 .expect("should create limited producer");
1291
1292 assert!(limited_producer.consume_only(&["different/path".into()]).is_none());
1294
1295 assert!(limited_producer.publish_only(&["other/path".into()]).is_none());
1297 }
1298
1299 #[tokio::test]
1300 async fn test_select_maintains_access_with_wider_prefix() {
1301 let origin = Origin::produce();
1302 let broadcast1 = Broadcast::produce();
1303 let broadcast2 = Broadcast::produce();
1304
1305 let demo_producer = origin.producer.with_root("demo").expect("should create demo root");
1307 let user_producer = demo_producer
1308 .publish_only(&["worm-node".into(), "foobar".into()])
1309 .expect("should create user producer");
1310
1311 assert!(user_producer.publish_broadcast("worm-node/data", broadcast1.consumer.clone()));
1313 assert!(user_producer.publish_broadcast("foobar", broadcast2.consumer.clone()));
1314
1315 let mut consumer = user_producer
1317 .consume_only(&["".into()])
1318 .expect("consume_only with empty prefix should not fail when user has specific permissions");
1319
1320 consumer.assert_next("worm-node/data", &broadcast1.consumer);
1322 consumer.assert_next("foobar", &broadcast2.consumer);
1323 consumer.assert_next_wait();
1324
1325 let mut narrow_consumer = user_producer
1327 .consume_only(&["worm-node".into()])
1328 .expect("should be able to narrow scope to worm-node");
1329
1330 narrow_consumer.assert_next("worm-node/data", &broadcast1.consumer);
1331 narrow_consumer.assert_next_wait(); }
1333}