1use std::{
2 collections::HashMap,
3 sync::atomic::{AtomicU64, Ordering},
4};
5use tokio::sync::mpsc;
6use web_async::Lock;
7
8use super::BroadcastConsumer;
9use crate::{AsPath, Broadcast, BroadcastProducer, Path, PathOwned, Produce};
10
11static NEXT_CONSUMER_ID: AtomicU64 = AtomicU64::new(0);
12
13#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
14struct ConsumerId(u64);
15
16impl ConsumerId {
17 fn new() -> Self {
18 Self(NEXT_CONSUMER_ID.fetch_add(1, Ordering::Relaxed))
19 }
20}
21
22struct OriginBroadcast {
24 path: PathOwned,
25 active: BroadcastConsumer,
26 backup: Vec<BroadcastConsumer>,
27}
28
29#[derive(Clone)]
30struct OriginConsumerNotify {
31 root: PathOwned,
32 tx: mpsc::UnboundedSender<OriginAnnounce>,
33}
34
35impl OriginConsumerNotify {
36 fn announce(&self, path: impl AsPath, broadcast: BroadcastConsumer) {
37 let path = path.as_path().strip_prefix(&self.root).unwrap().to_owned();
38 self.tx.send((path, Some(broadcast))).expect("consumer closed");
39 }
40
41 fn reannounce(&self, path: impl AsPath, broadcast: BroadcastConsumer) {
42 let path = path.as_path().strip_prefix(&self.root).unwrap().to_owned();
43 self.tx.send((path.clone(), None)).expect("consumer closed");
44 self.tx.send((path, Some(broadcast))).expect("consumer closed");
45 }
46
47 fn unannounce(&self, path: impl AsPath) {
48 let path = path.as_path().strip_prefix(&self.root).unwrap().to_owned();
49 self.tx.send((path, None)).expect("consumer closed");
50 }
51}
52
53struct NotifyNode {
54 parent: Option<Lock<NotifyNode>>,
55
56 consumers: HashMap<ConsumerId, OriginConsumerNotify>,
59}
60
61impl NotifyNode {
62 fn new(parent: Option<Lock<NotifyNode>>) -> Self {
63 Self {
64 parent,
65 consumers: HashMap::new(),
66 }
67 }
68
69 fn announce(&mut self, path: impl AsPath, broadcast: &BroadcastConsumer) {
70 for consumer in self.consumers.values() {
71 consumer.announce(path.as_path(), broadcast.clone());
72 }
73
74 if let Some(parent) = &self.parent {
75 parent.lock().announce(path, broadcast);
76 }
77 }
78
79 fn reannounce(&mut self, path: impl AsPath, broadcast: &BroadcastConsumer) {
80 for consumer in self.consumers.values() {
81 consumer.reannounce(path.as_path(), broadcast.clone());
82 }
83
84 if let Some(parent) = &self.parent {
85 parent.lock().reannounce(path, broadcast);
86 }
87 }
88
89 fn unannounce(&mut self, path: impl AsPath) {
90 for consumer in self.consumers.values() {
91 consumer.unannounce(path.as_path());
92 }
93
94 if let Some(parent) = &self.parent {
95 parent.lock().unannounce(path);
96 }
97 }
98}
99
100struct OriginNode {
101 broadcast: Option<OriginBroadcast>,
103
104 nested: HashMap<String, Lock<OriginNode>>,
106
107 notify: Lock<NotifyNode>,
109}
110
111impl OriginNode {
112 fn new(parent: Option<Lock<NotifyNode>>) -> Self {
113 Self {
114 broadcast: None,
115 nested: HashMap::new(),
116 notify: Lock::new(NotifyNode::new(parent)),
117 }
118 }
119
120 fn leaf(&mut self, path: &Path) -> Lock<OriginNode> {
121 let (dir, rest) = path.next_part().expect("leaf called with empty path");
122
123 let next = self.entry(dir);
124 if rest.is_empty() { next } else { next.lock().leaf(&rest) }
125 }
126
127 fn entry(&mut self, dir: &str) -> Lock<OriginNode> {
128 match self.nested.get(dir) {
129 Some(next) => next.clone(),
130 None => {
131 let next = Lock::new(OriginNode::new(Some(self.notify.clone())));
132 self.nested.insert(dir.to_string(), next.clone());
133 next
134 }
135 }
136 }
137
138 fn publish(&mut self, full: impl AsPath, broadcast: &BroadcastConsumer, relative: impl AsPath) {
139 let full = full.as_path();
140 let rest = relative.as_path();
141
142 if let Some((dir, relative)) = rest.next_part() {
144 self.entry(dir).lock().publish(&full, broadcast, &relative);
146 } else if let Some(existing) = &mut self.broadcast {
147 let old = existing.active.clone();
149 existing.active = broadcast.clone();
150 existing.backup.push(old);
151
152 self.notify.lock().reannounce(full, broadcast);
153 } else {
154 self.broadcast = Some(OriginBroadcast {
156 path: full.to_owned(),
157 active: broadcast.clone(),
158 backup: Vec::new(),
159 });
160 self.notify.lock().announce(full, broadcast);
161 }
162 }
163
164 fn consume(&mut self, id: ConsumerId, mut notify: OriginConsumerNotify) {
165 self.consume_initial(&mut notify);
166 self.notify.lock().consumers.insert(id, notify);
167 }
168
169 fn consume_initial(&mut self, notify: &mut OriginConsumerNotify) {
170 if let Some(broadcast) = &self.broadcast {
171 notify.announce(&broadcast.path, broadcast.active.clone());
172 }
173
174 for nested in self.nested.values() {
176 nested.lock().consume_initial(notify);
177 }
178 }
179
180 fn consume_broadcast(&self, rest: impl AsPath) -> Option<BroadcastConsumer> {
181 let rest = rest.as_path();
182
183 if let Some((dir, rest)) = rest.next_part() {
184 let node = self.nested.get(dir)?.lock();
185 node.consume_broadcast(&rest)
186 } else {
187 self.broadcast.as_ref().map(|b| b.active.clone())
188 }
189 }
190
191 fn unconsume(&mut self, id: ConsumerId) {
192 self.notify.lock().consumers.remove(&id).expect("consumer not found");
193 if self.is_empty() {
194 }
197 }
198
199 fn remove(&mut self, full: impl AsPath, broadcast: BroadcastConsumer, relative: impl AsPath) {
201 let full = full.as_path();
202 let relative = relative.as_path();
203
204 if let Some((dir, relative)) = relative.next_part() {
205 let nested = self.entry(dir);
206 let mut locked = nested.lock();
207 locked.remove(&full, broadcast, &relative);
208
209 if locked.is_empty() {
210 drop(locked);
211 self.nested.remove(dir);
212 }
213 } else {
214 let entry = match &mut self.broadcast {
215 Some(existing) => existing,
216 None => return,
217 };
218
219 let pos = entry.backup.iter().position(|b| b.is_clone(&broadcast));
221 if let Some(pos) = pos {
222 entry.backup.remove(pos);
223 return;
225 }
226
227 assert!(entry.active.is_clone(&broadcast));
229
230 if let Some(active) = entry.backup.pop() {
232 entry.active = active;
233 self.notify.lock().reannounce(full, &entry.active);
234 } else {
235 self.broadcast = None;
237 self.notify.lock().unannounce(full);
238 }
239 }
240 }
241
242 fn is_empty(&self) -> bool {
243 self.broadcast.is_none() && self.nested.is_empty() && self.notify.lock().consumers.is_empty()
244 }
245}
246
247#[derive(Clone)]
248struct OriginNodes {
249 nodes: Vec<(PathOwned, Lock<OriginNode>)>,
250}
251
252impl OriginNodes {
253 pub fn select(&self, prefixes: &[Path]) -> Option<Self> {
256 let mut roots = Vec::new();
257
258 for (root, state) in &self.nodes {
259 for prefix in prefixes {
260 if root.has_prefix(prefix) {
261 roots.push((root.to_owned(), state.clone()));
263 continue;
264 }
265
266 if let Some(suffix) = prefix.strip_prefix(root) {
267 let nested = state.lock().leaf(&suffix);
269 roots.push((prefix.to_owned(), nested));
270 }
271 }
272 }
273
274 if roots.is_empty() {
275 None
276 } else {
277 Some(Self { nodes: roots })
278 }
279 }
280
281 pub fn root(&self, new_root: impl AsPath) -> Option<Self> {
282 let new_root = new_root.as_path();
283 let mut roots = Vec::new();
284
285 if new_root.is_empty() {
286 return Some(self.clone());
287 }
288
289 for (root, state) in &self.nodes {
290 if let Some(suffix) = root.strip_prefix(&new_root) {
291 roots.push((suffix.to_owned(), state.clone()));
293 } else if let Some(suffix) = new_root.strip_prefix(root) {
294 let nested = state.lock().leaf(&suffix);
297 roots.push(("".into(), nested));
298 }
299 }
300
301 if roots.is_empty() {
302 None
303 } else {
304 Some(Self { nodes: roots })
305 }
306 }
307
308 pub fn get(&self, path: impl AsPath) -> Option<(Lock<OriginNode>, PathOwned)> {
310 let path = path.as_path();
311
312 for (root, state) in &self.nodes {
313 if let Some(suffix) = path.strip_prefix(root) {
314 return Some((state.clone(), suffix.to_owned()));
315 }
316 }
317
318 None
319 }
320}
321
322impl Default for OriginNodes {
323 fn default() -> Self {
324 Self {
325 nodes: vec![("".into(), Lock::new(OriginNode::new(None)))],
326 }
327 }
328}
329
330pub type OriginAnnounce = (PathOwned, Option<BroadcastConsumer>);
332
333pub struct Origin {}
335
336impl Origin {
337 pub fn produce() -> Produce<OriginProducer, OriginConsumer> {
338 let producer = OriginProducer::default();
339 let consumer = producer.consume();
340 Produce { producer, consumer }
341 }
342}
343
344#[derive(Clone, Default)]
346pub struct OriginProducer {
347 nodes: OriginNodes,
350
351 root: PathOwned,
353}
354
355impl OriginProducer {
356 pub fn create_broadcast(&self, path: impl AsPath) -> Option<BroadcastProducer> {
361 let broadcast = Broadcast::produce();
362 self.publish_broadcast(path, broadcast.consumer)
363 .then_some(broadcast.producer)
364 }
365
366 pub fn publish_broadcast(&self, path: impl AsPath, broadcast: BroadcastConsumer) -> bool {
375 let path = path.as_path();
376
377 let (root, rest) = match self.nodes.get(&path) {
378 Some(root) => root,
379 None => return false,
380 };
381
382 let full = self.root.join(&path);
383
384 root.lock().publish(&full, &broadcast, &rest);
385 let root = root.clone();
386
387 web_async::spawn(async move {
388 broadcast.closed().await;
389 root.lock().remove(&full, broadcast, &rest);
390 });
391
392 true
393 }
394
395 pub fn publish_only(&self, prefixes: &[Path]) -> Option<OriginProducer> {
399 Some(OriginProducer {
400 nodes: self.nodes.select(prefixes)?,
401 root: self.root.clone(),
402 })
403 }
404
405 pub fn consume(&self) -> OriginConsumer {
407 OriginConsumer::new(self.root.clone(), self.nodes.clone())
408 }
409
410 pub fn consume_only(&self, prefixes: &[Path]) -> Option<OriginConsumer> {
416 Some(OriginConsumer::new(self.root.clone(), self.nodes.select(prefixes)?))
417 }
418
419 pub fn with_root(&self, prefix: impl AsPath) -> Option<Self> {
423 let prefix = prefix.as_path();
424
425 Some(Self {
426 root: self.root.join(&prefix).to_owned(),
427 nodes: self.nodes.root(&prefix)?,
428 })
429 }
430
431 pub fn root(&self) -> &Path<'_> {
433 &self.root
434 }
435
436 pub fn allowed(&self) -> impl Iterator<Item = &Path<'_>> {
437 self.nodes.nodes.iter().map(|(root, _)| root)
438 }
439
440 pub fn absolute(&self, path: impl AsPath) -> Path<'_> {
442 self.root.join(path)
443 }
444}
445
446pub struct OriginConsumer {
450 id: ConsumerId,
451 nodes: OriginNodes,
452 updates: mpsc::UnboundedReceiver<OriginAnnounce>,
453
454 root: PathOwned,
456}
457
458impl OriginConsumer {
459 fn new(root: PathOwned, nodes: OriginNodes) -> Self {
460 let (tx, rx) = mpsc::unbounded_channel();
461
462 let id = ConsumerId::new();
463
464 for (_, state) in &nodes.nodes {
465 let notify = OriginConsumerNotify {
466 root: root.clone(),
467 tx: tx.clone(),
468 };
469 state.lock().consume(id, notify);
470 }
471
472 Self {
473 id,
474 nodes,
475 updates: rx,
476 root,
477 }
478 }
479
480 pub async fn announced(&mut self) -> Option<OriginAnnounce> {
488 self.updates.recv().await
489 }
490
491 pub fn try_announced(&mut self) -> Option<OriginAnnounce> {
496 self.updates.try_recv().ok()
497 }
498
499 pub fn consume(&self) -> Self {
500 self.clone()
501 }
502
503 pub fn consume_broadcast(&self, path: impl AsPath) -> Option<BroadcastConsumer> {
509 let path = path.as_path();
510 let (root, rest) = self.nodes.get(&path)?;
511 let state = root.lock();
512 state.consume_broadcast(&rest)
513 }
514
515 pub fn consume_only(&self, prefixes: &[Path]) -> Option<OriginConsumer> {
519 Some(OriginConsumer::new(self.root.clone(), self.nodes.select(prefixes)?))
520 }
521
522 pub fn with_root(&self, prefix: impl AsPath) -> Option<Self> {
526 let prefix = prefix.as_path();
527
528 Some(Self::new(self.root.join(&prefix).to_owned(), self.nodes.root(&prefix)?))
529 }
530
531 pub fn root(&self) -> &Path<'_> {
533 &self.root
534 }
535
536 pub fn allowed(&self) -> impl Iterator<Item = &Path<'_>> {
537 self.nodes.nodes.iter().map(|(root, _)| root)
538 }
539
540 pub fn absolute(&self, path: impl AsPath) -> Path<'_> {
542 self.root.join(path)
543 }
544}
545
546impl Drop for OriginConsumer {
547 fn drop(&mut self) {
548 for (_, root) in &self.nodes.nodes {
549 root.lock().unconsume(self.id);
550 }
551 }
552}
553
554impl Clone for OriginConsumer {
555 fn clone(&self) -> Self {
556 OriginConsumer::new(self.root.clone(), self.nodes.clone())
557 }
558}
559
560#[cfg(test)]
561use futures::FutureExt;
562
563#[cfg(test)]
564impl OriginConsumer {
565 pub fn assert_next(&mut self, expected: impl AsPath, broadcast: &BroadcastConsumer) {
566 let expected = expected.as_path();
567 let (path, active) = self.announced().now_or_never().expect("next blocked").expect("no next");
568 assert_eq!(path, expected, "wrong path");
569 assert!(active.unwrap().is_clone(broadcast), "should be the same broadcast");
570 }
571
572 pub fn assert_try_next(&mut self, expected: impl AsPath, broadcast: &BroadcastConsumer) {
573 let expected = expected.as_path();
574 let (path, active) = self.try_announced().expect("no next");
575 assert_eq!(path, expected, "wrong path");
576 assert!(active.unwrap().is_clone(broadcast), "should be the same broadcast");
577 }
578
579 pub fn assert_next_none(&mut self, expected: impl AsPath) {
580 let expected = expected.as_path();
581 let (path, active) = self.announced().now_or_never().expect("next blocked").expect("no next");
582 assert_eq!(path, expected, "wrong path");
583 assert!(active.is_none(), "should be unannounced");
584 }
585
586 pub fn assert_next_wait(&mut self) {
587 if let Some(res) = self.announced().now_or_never() {
588 panic!("next should block: got {:?}", res.map(|(path, _)| path));
589 }
590 }
591
592 }
601
602#[cfg(test)]
603mod tests {
604 use crate::Broadcast;
605
606 use super::*;
607
608 #[tokio::test]
609 async fn test_announce() {
610 let origin = Origin::produce();
611 let broadcast1 = Broadcast::produce();
612 let broadcast2 = Broadcast::produce();
613
614 let mut consumer1 = origin.consumer;
615 consumer1.assert_next_wait();
617
618 origin.producer.publish_broadcast("test1", broadcast1.consumer);
620
621 consumer1.assert_next("test1", &broadcast1.producer.consume());
622 consumer1.assert_next_wait();
623
624 let mut consumer2 = origin.producer.consume();
627
628 origin.producer.publish_broadcast("test2", broadcast2.consumer);
630
631 consumer1.assert_next("test2", &broadcast2.producer.consume());
632 consumer1.assert_next_wait();
633
634 consumer2.assert_next("test1", &broadcast1.producer.consume());
635 consumer2.assert_next("test2", &broadcast2.producer.consume());
636 consumer2.assert_next_wait();
637
638 drop(broadcast1.producer);
640
641 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
643
644 consumer1.assert_next_none("test1");
646 consumer2.assert_next_none("test1");
647 consumer1.assert_next_wait();
648 consumer2.assert_next_wait();
649
650 let mut consumer3 = origin.producer.consume();
652 consumer3.assert_next("test2", &broadcast2.producer.consume());
653 consumer3.assert_next_wait();
654
655 drop(broadcast2.producer);
657
658 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
660
661 consumer1.assert_next_none("test2");
662 consumer2.assert_next_none("test2");
663 consumer3.assert_next_none("test2");
664
665 }
671
672 #[tokio::test]
673 async fn test_duplicate() {
674 let mut origin = Origin::produce();
675
676 let broadcast1 = Broadcast::produce();
677 let broadcast2 = Broadcast::produce();
678 let broadcast3 = Broadcast::produce();
679
680 let consumer1 = broadcast1.consumer;
681 let consumer2 = broadcast2.consumer;
682 let consumer3 = broadcast3.consumer;
683
684 origin.producer.publish_broadcast("test", consumer1.clone());
685 origin.producer.publish_broadcast("test", consumer2.clone());
686 origin.producer.publish_broadcast("test", consumer3.clone());
687
688 assert!(origin.consumer.consume_broadcast("test").is_some());
689
690 origin.consumer.assert_next("test", &consumer1);
691 origin.consumer.assert_next_none("test");
692 origin.consumer.assert_next("test", &consumer2);
693 origin.consumer.assert_next_none("test");
694 origin.consumer.assert_next("test", &consumer3);
695
696 drop(broadcast2.producer);
698
699 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
701
702 assert!(origin.consumer.consume_broadcast("test").is_some());
703 origin.consumer.assert_next_wait();
704
705 drop(broadcast3.producer);
707
708 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
710
711 assert!(origin.consumer.consume_broadcast("test").is_some());
712 origin.consumer.assert_next_none("test");
713 origin.consumer.assert_next("test", &consumer1);
714
715 drop(broadcast1.producer);
717
718 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
720 assert!(origin.consumer.consume_broadcast("test").is_none());
721
722 origin.consumer.assert_next_none("test");
723 origin.consumer.assert_next_wait();
724 }
725
726 #[tokio::test]
727 async fn test_duplicate_reverse() {
728 let origin = Origin::produce();
729 let broadcast1 = Broadcast::produce();
730 let broadcast2 = Broadcast::produce();
731
732 origin.producer.publish_broadcast("test", broadcast1.consumer);
733 origin.producer.publish_broadcast("test", broadcast2.consumer);
734 assert!(origin.consumer.consume_broadcast("test").is_some());
735
736 drop(broadcast2.producer);
738
739 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
741 assert!(origin.consumer.consume_broadcast("test").is_some());
742
743 drop(broadcast1.producer);
744
745 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
747 assert!(origin.consumer.consume_broadcast("test").is_none());
748 }
749
750 #[tokio::test]
751 async fn test_double_publish() {
752 let origin = Origin::produce();
753 let broadcast = Broadcast::produce();
754
755 origin.producer.publish_broadcast("test", broadcast.producer.consume());
757 origin.producer.publish_broadcast("test", broadcast.producer.consume());
758
759 assert!(origin.consumer.consume_broadcast("test").is_some());
760
761 drop(broadcast.producer);
762
763 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
765 assert!(origin.consumer.consume_broadcast("test").is_none());
766 }
767 #[tokio::test]
769 #[should_panic]
770 async fn test_128() {
771 let mut origin = Origin::produce();
772 let broadcast = Broadcast::produce();
773
774 for i in 0..256 {
775 origin
776 .producer
777 .publish_broadcast(format!("test{i}"), broadcast.consumer.clone());
778 }
779
780 for i in 0..256 {
781 origin.consumer.assert_next(format!("test{i}"), &broadcast.consumer);
782 }
783 }
784
785 #[tokio::test]
786 async fn test_128_fix() {
787 let mut origin = Origin::produce();
788 let broadcast = Broadcast::produce();
789
790 for i in 0..256 {
791 origin
792 .producer
793 .publish_broadcast(format!("test{i}"), broadcast.consumer.clone());
794 }
795
796 for i in 0..256 {
797 origin.consumer.assert_try_next(format!("test{i}"), &broadcast.consumer);
799 }
800 }
801
802 #[tokio::test]
803 async fn test_with_root_basic() {
804 let mut origin = Origin::produce();
805 let broadcast = Broadcast::produce();
806
807 let foo_producer = origin.producer.with_root("foo").expect("should create root");
809 assert_eq!(foo_producer.root().as_str(), "foo");
810
811 assert!(foo_producer.publish_broadcast("bar/baz", broadcast.consumer.clone()));
813
814 origin.consumer.assert_next("foo/bar/baz", &broadcast.consumer);
816
817 let mut foo_consumer = foo_producer.consume();
819 foo_consumer.assert_next("bar/baz", &broadcast.consumer);
820 }
821
822 #[tokio::test]
823 async fn test_with_root_nested() {
824 let mut origin = Origin::produce();
825 let broadcast = Broadcast::produce();
826
827 let foo_producer = origin.producer.with_root("foo").expect("should create foo root");
829 let foo_bar_producer = foo_producer.with_root("bar").expect("should create bar root");
830 assert_eq!(foo_bar_producer.root().as_str(), "foo/bar");
831
832 assert!(foo_bar_producer.publish_broadcast("baz", broadcast.consumer.clone()));
834
835 origin.consumer.assert_next("foo/bar/baz", &broadcast.consumer);
837
838 let mut foo_bar_consumer = foo_bar_producer.consume();
840 foo_bar_consumer.assert_next("baz", &broadcast.consumer);
841 }
842
843 #[tokio::test]
844 async fn test_publish_only_allows() {
845 let origin = Origin::produce();
846 let broadcast = Broadcast::produce();
847
848 let limited_producer = origin
850 .producer
851 .publish_only(&["allowed/path1".into(), "allowed/path2".into()])
852 .expect("should create limited producer");
853
854 assert!(limited_producer.publish_broadcast("allowed/path1", broadcast.consumer.clone()));
856 assert!(limited_producer.publish_broadcast("allowed/path1/nested", broadcast.consumer.clone()));
857 assert!(limited_producer.publish_broadcast("allowed/path2", broadcast.consumer.clone()));
858
859 assert!(!limited_producer.publish_broadcast("notallowed", broadcast.consumer.clone()));
861 assert!(!limited_producer.publish_broadcast("allowed", broadcast.consumer.clone())); assert!(!limited_producer.publish_broadcast("other/path", broadcast.consumer.clone()));
863 }
864
865 #[tokio::test]
866 async fn test_publish_only_empty() {
867 let origin = Origin::produce();
868
869 assert!(origin.producer.publish_only(&[]).is_none());
871 }
872
873 #[tokio::test]
874 async fn test_consume_only_filters() {
875 let mut origin = Origin::produce();
876 let broadcast1 = Broadcast::produce();
877 let broadcast2 = Broadcast::produce();
878 let broadcast3 = Broadcast::produce();
879
880 origin
882 .producer
883 .publish_broadcast("allowed", broadcast1.consumer.clone());
884 origin
885 .producer
886 .publish_broadcast("allowed/nested", broadcast2.consumer.clone());
887 origin
888 .producer
889 .publish_broadcast("notallowed", broadcast3.consumer.clone());
890
891 let mut limited_consumer = origin
893 .consumer
894 .consume_only(&["allowed".into()])
895 .expect("should create limited consumer");
896
897 limited_consumer.assert_next("allowed", &broadcast1.consumer);
899 limited_consumer.assert_next("allowed/nested", &broadcast2.consumer);
900 limited_consumer.assert_next_wait(); origin.consumer.assert_next("allowed", &broadcast1.consumer);
904 origin.consumer.assert_next("allowed/nested", &broadcast2.consumer);
905 origin.consumer.assert_next("notallowed", &broadcast3.consumer);
906 }
907
908 #[tokio::test]
909 async fn test_consume_only_multiple_prefixes() {
910 let origin = Origin::produce();
911 let broadcast1 = Broadcast::produce();
912 let broadcast2 = Broadcast::produce();
913 let broadcast3 = Broadcast::produce();
914
915 origin
916 .producer
917 .publish_broadcast("foo/test", broadcast1.consumer.clone());
918 origin
919 .producer
920 .publish_broadcast("bar/test", broadcast2.consumer.clone());
921 origin
922 .producer
923 .publish_broadcast("baz/test", broadcast3.consumer.clone());
924
925 let mut limited_consumer = origin
927 .consumer
928 .consume_only(&["foo".into(), "bar".into()])
929 .expect("should create limited consumer");
930
931 limited_consumer.assert_next("foo/test", &broadcast1.consumer);
932 limited_consumer.assert_next("bar/test", &broadcast2.consumer);
933 limited_consumer.assert_next_wait(); }
935
936 #[tokio::test]
937 async fn test_with_root_and_publish_only() {
938 let mut origin = Origin::produce();
939 let broadcast = Broadcast::produce();
940
941 let foo_producer = origin.producer.with_root("foo").expect("should create foo root");
943
944 let limited_producer = foo_producer
946 .publish_only(&["bar".into(), "goop/pee".into()])
947 .expect("should create limited producer");
948
949 assert!(limited_producer.publish_broadcast("bar", broadcast.consumer.clone()));
951 assert!(limited_producer.publish_broadcast("bar/nested", broadcast.consumer.clone()));
952 assert!(limited_producer.publish_broadcast("goop/pee", broadcast.consumer.clone()));
953 assert!(limited_producer.publish_broadcast("goop/pee/nested", broadcast.consumer.clone()));
954
955 assert!(!limited_producer.publish_broadcast("baz", broadcast.consumer.clone()));
957 assert!(!limited_producer.publish_broadcast("goop", broadcast.consumer.clone())); assert!(!limited_producer.publish_broadcast("goop/other", broadcast.consumer.clone()));
959
960 origin.consumer.assert_next("foo/bar", &broadcast.consumer);
962 origin.consumer.assert_next("foo/bar/nested", &broadcast.consumer);
963 origin.consumer.assert_next("foo/goop/pee", &broadcast.consumer);
964 origin.consumer.assert_next("foo/goop/pee/nested", &broadcast.consumer);
965 }
966
967 #[tokio::test]
968 async fn test_with_root_and_consume_only() {
969 let origin = Origin::produce();
970 let broadcast1 = Broadcast::produce();
971 let broadcast2 = Broadcast::produce();
972 let broadcast3 = Broadcast::produce();
973
974 origin
976 .producer
977 .publish_broadcast("foo/bar/test", broadcast1.consumer.clone());
978 origin
979 .producer
980 .publish_broadcast("foo/goop/pee/test", broadcast2.consumer.clone());
981 origin
982 .producer
983 .publish_broadcast("foo/other/test", broadcast3.consumer.clone());
984
985 let foo_producer = origin.producer.with_root("foo").expect("should create foo root");
987
988 let mut limited_consumer = foo_producer
990 .consume_only(&["bar".into(), "goop/pee".into()])
991 .expect("should create limited consumer");
992
993 limited_consumer.assert_next("bar/test", &broadcast1.consumer);
995 limited_consumer.assert_next("goop/pee/test", &broadcast2.consumer);
996 limited_consumer.assert_next_wait(); }
998
999 #[tokio::test]
1000 async fn test_with_root_unauthorized() {
1001 let origin = Origin::produce();
1002
1003 let limited_producer = origin
1005 .producer
1006 .publish_only(&["allowed".into()])
1007 .expect("should create limited producer");
1008
1009 assert!(limited_producer.with_root("notallowed").is_none());
1011
1012 let allowed_root = limited_producer
1014 .with_root("allowed")
1015 .expect("should create allowed root");
1016 assert_eq!(allowed_root.root().as_str(), "allowed");
1017 }
1018
1019 #[tokio::test]
1020 async fn test_wildcard_permission() {
1021 let origin = Origin::produce();
1022 let broadcast = Broadcast::produce();
1023
1024 let root_producer = origin.producer.clone();
1026
1027 assert!(root_producer.publish_broadcast("any/path", broadcast.consumer.clone()));
1029 assert!(root_producer.publish_broadcast("other/path", broadcast.consumer.clone()));
1030
1031 let foo_producer = root_producer.with_root("foo").expect("should create any root");
1033 assert_eq!(foo_producer.root().as_str(), "foo");
1034 }
1035
1036 #[tokio::test]
1037 async fn test_consume_broadcast_with_permissions() {
1038 let origin = Origin::produce();
1039 let broadcast1 = Broadcast::produce();
1040 let broadcast2 = Broadcast::produce();
1041
1042 origin
1043 .producer
1044 .publish_broadcast("allowed/test", broadcast1.consumer.clone());
1045 origin
1046 .producer
1047 .publish_broadcast("notallowed/test", broadcast2.consumer.clone());
1048
1049 let limited_consumer = origin
1051 .consumer
1052 .consume_only(&["allowed".into()])
1053 .expect("should create limited consumer");
1054
1055 let result = limited_consumer.consume_broadcast("allowed/test");
1057 assert!(result.is_some());
1058 assert!(result.unwrap().is_clone(&broadcast1.consumer));
1059
1060 assert!(limited_consumer.consume_broadcast("notallowed/test").is_none());
1062
1063 assert!(origin.consumer.consume_broadcast("allowed/test").is_some());
1065 assert!(origin.consumer.consume_broadcast("notallowed/test").is_some());
1066 }
1067
1068 #[tokio::test]
1069 async fn test_nested_paths_with_permissions() {
1070 let origin = Origin::produce();
1071 let broadcast = Broadcast::produce();
1072
1073 let limited_producer = origin
1075 .producer
1076 .publish_only(&["a/b/c".into()])
1077 .expect("should create limited producer");
1078
1079 assert!(limited_producer.publish_broadcast("a/b/c", broadcast.consumer.clone()));
1081 assert!(limited_producer.publish_broadcast("a/b/c/d", broadcast.consumer.clone()));
1082 assert!(limited_producer.publish_broadcast("a/b/c/d/e", broadcast.consumer.clone()));
1083
1084 assert!(!limited_producer.publish_broadcast("a", broadcast.consumer.clone()));
1086 assert!(!limited_producer.publish_broadcast("a/b", broadcast.consumer.clone()));
1087 assert!(!limited_producer.publish_broadcast("a/b/other", broadcast.consumer.clone()));
1088 }
1089
1090 #[tokio::test]
1091 async fn test_multiple_consumers_with_different_permissions() {
1092 let origin = Origin::produce();
1093 let broadcast1 = Broadcast::produce();
1094 let broadcast2 = Broadcast::produce();
1095 let broadcast3 = Broadcast::produce();
1096
1097 origin
1099 .producer
1100 .publish_broadcast("foo/test", broadcast1.consumer.clone());
1101 origin
1102 .producer
1103 .publish_broadcast("bar/test", broadcast2.consumer.clone());
1104 origin
1105 .producer
1106 .publish_broadcast("baz/test", broadcast3.consumer.clone());
1107
1108 let mut foo_consumer = origin
1110 .consumer
1111 .consume_only(&["foo".into()])
1112 .expect("should create foo consumer");
1113
1114 let mut bar_consumer = origin
1115 .consumer
1116 .consume_only(&["bar".into()])
1117 .expect("should create bar consumer");
1118
1119 let mut foobar_consumer = origin
1120 .consumer
1121 .consume_only(&["foo".into(), "bar".into()])
1122 .expect("should create foobar consumer");
1123
1124 foo_consumer.assert_next("foo/test", &broadcast1.consumer);
1126 foo_consumer.assert_next_wait();
1127
1128 bar_consumer.assert_next("bar/test", &broadcast2.consumer);
1129 bar_consumer.assert_next_wait();
1130
1131 foobar_consumer.assert_next("foo/test", &broadcast1.consumer);
1132 foobar_consumer.assert_next("bar/test", &broadcast2.consumer);
1133 foobar_consumer.assert_next_wait();
1134 }
1135
1136 #[tokio::test]
1137 async fn test_select_with_empty_prefix() {
1138 let origin = Origin::produce();
1139 let broadcast1 = Broadcast::produce();
1140 let broadcast2 = Broadcast::produce();
1141
1142 let demo_producer = origin.producer.with_root("demo").expect("should create demo root");
1144 let limited_producer = demo_producer
1145 .publish_only(&["worm-node".into(), "foobar".into()])
1146 .expect("should create limited producer");
1147
1148 assert!(limited_producer.publish_broadcast("worm-node/test", broadcast1.consumer.clone()));
1150 assert!(limited_producer.publish_broadcast("foobar/test", broadcast2.consumer.clone()));
1151
1152 let mut consumer = limited_producer
1154 .consume_only(&["".into()])
1155 .expect("should create consumer with empty prefix");
1156
1157 consumer.assert_next("worm-node/test", &broadcast1.consumer);
1159 consumer.assert_next("foobar/test", &broadcast2.consumer);
1160 consumer.assert_next_wait();
1161 }
1162
1163 #[tokio::test]
1164 async fn test_select_narrowing_scope() {
1165 let origin = Origin::produce();
1166 let broadcast1 = Broadcast::produce();
1167 let broadcast2 = Broadcast::produce();
1168 let broadcast3 = Broadcast::produce();
1169
1170 let demo_producer = origin.producer.with_root("demo").expect("should create demo root");
1172 let limited_producer = demo_producer
1173 .publish_only(&["worm-node".into(), "foobar".into()])
1174 .expect("should create limited producer");
1175
1176 assert!(limited_producer.publish_broadcast("worm-node", broadcast1.consumer.clone()));
1178 assert!(limited_producer.publish_broadcast("worm-node/foo", broadcast2.consumer.clone()));
1179 assert!(limited_producer.publish_broadcast("foobar/bar", broadcast3.consumer.clone()));
1180
1181 let mut worm_consumer = limited_producer
1183 .consume_only(&["worm-node".into()])
1184 .expect("should create worm-node consumer");
1185
1186 worm_consumer.assert_next("worm-node", &broadcast1.consumer);
1188 worm_consumer.assert_next("worm-node/foo", &broadcast2.consumer);
1189 worm_consumer.assert_next_wait(); let mut foo_consumer = limited_producer
1193 .consume_only(&["worm-node/foo".into()])
1194 .expect("should create worm-node/foo consumer");
1195
1196 foo_consumer.assert_next("worm-node/foo", &broadcast2.consumer);
1197 foo_consumer.assert_next_wait(); }
1199
1200 #[tokio::test]
1201 async fn test_select_multiple_roots_with_empty_prefix() {
1202 let origin = Origin::produce();
1203 let broadcast1 = Broadcast::produce();
1204 let broadcast2 = Broadcast::produce();
1205 let broadcast3 = Broadcast::produce();
1206
1207 let limited_producer = origin
1209 .producer
1210 .publish_only(&["app1".into(), "app2".into(), "shared".into()])
1211 .expect("should create limited producer");
1212
1213 assert!(limited_producer.publish_broadcast("app1/data", broadcast1.consumer.clone()));
1215 assert!(limited_producer.publish_broadcast("app2/config", broadcast2.consumer.clone()));
1216 assert!(limited_producer.publish_broadcast("shared/resource", broadcast3.consumer.clone()));
1217
1218 let mut consumer = limited_producer
1220 .consume_only(&["".into()])
1221 .expect("should create consumer with empty prefix");
1222
1223 consumer.assert_next("app1/data", &broadcast1.consumer);
1225 consumer.assert_next("app2/config", &broadcast2.consumer);
1226 consumer.assert_next("shared/resource", &broadcast3.consumer);
1227 consumer.assert_next_wait();
1228 }
1229
1230 #[tokio::test]
1231 async fn test_publish_only_with_empty_prefix() {
1232 let origin = Origin::produce();
1233 let broadcast = Broadcast::produce();
1234
1235 let limited_producer = origin
1237 .producer
1238 .publish_only(&["services/api".into(), "services/web".into()])
1239 .expect("should create limited producer");
1240
1241 let same_producer = limited_producer
1243 .publish_only(&["".into()])
1244 .expect("should create producer with empty prefix");
1245
1246 assert!(same_producer.publish_broadcast("services/api", broadcast.consumer.clone()));
1248 assert!(same_producer.publish_broadcast("services/web", broadcast.consumer.clone()));
1249 assert!(!same_producer.publish_broadcast("services/db", broadcast.consumer.clone()));
1250 assert!(!same_producer.publish_broadcast("other", broadcast.consumer.clone()));
1251 }
1252
1253 #[tokio::test]
1254 async fn test_select_narrowing_to_deeper_path() {
1255 let origin = Origin::produce();
1256 let broadcast1 = Broadcast::produce();
1257 let broadcast2 = Broadcast::produce();
1258 let broadcast3 = Broadcast::produce();
1259
1260 let limited_producer = origin
1262 .producer
1263 .publish_only(&["org".into()])
1264 .expect("should create limited producer");
1265
1266 assert!(limited_producer.publish_broadcast("org/team1/project1", broadcast1.consumer.clone()));
1268 assert!(limited_producer.publish_broadcast("org/team1/project2", broadcast2.consumer.clone()));
1269 assert!(limited_producer.publish_broadcast("org/team2/project1", broadcast3.consumer.clone()));
1270
1271 let mut team1_consumer = limited_producer
1273 .consume_only(&["org/team2".into()])
1274 .expect("should create team1 consumer");
1275
1276 team1_consumer.assert_next("org/team2/project1", &broadcast3.consumer);
1277 team1_consumer.assert_next_wait(); let mut project1_consumer = limited_producer
1281 .consume_only(&["org/team1/project1".into()])
1282 .expect("should create project1 consumer");
1283
1284 project1_consumer.assert_next("org/team1/project1", &broadcast1.consumer);
1286 project1_consumer.assert_next_wait();
1287 }
1288
1289 #[tokio::test]
1290 async fn test_select_with_non_matching_prefix() {
1291 let origin = Origin::produce();
1292
1293 let limited_producer = origin
1295 .producer
1296 .publish_only(&["allowed/path".into()])
1297 .expect("should create limited producer");
1298
1299 assert!(limited_producer.consume_only(&["different/path".into()]).is_none());
1301
1302 assert!(limited_producer.publish_only(&["other/path".into()]).is_none());
1304 }
1305
1306 #[tokio::test]
1307 async fn test_select_maintains_access_with_wider_prefix() {
1308 let origin = Origin::produce();
1309 let broadcast1 = Broadcast::produce();
1310 let broadcast2 = Broadcast::produce();
1311
1312 let demo_producer = origin.producer.with_root("demo").expect("should create demo root");
1314 let user_producer = demo_producer
1315 .publish_only(&["worm-node".into(), "foobar".into()])
1316 .expect("should create user producer");
1317
1318 assert!(user_producer.publish_broadcast("worm-node/data", broadcast1.consumer.clone()));
1320 assert!(user_producer.publish_broadcast("foobar", broadcast2.consumer.clone()));
1321
1322 let mut consumer = user_producer
1324 .consume_only(&["".into()])
1325 .expect("consume_only with empty prefix should not fail when user has specific permissions");
1326
1327 consumer.assert_next("worm-node/data", &broadcast1.consumer);
1329 consumer.assert_next("foobar", &broadcast2.consumer);
1330 consumer.assert_next_wait();
1331
1332 let mut narrow_consumer = user_producer
1334 .consume_only(&["worm-node".into()])
1335 .expect("should be able to narrow scope to worm-node");
1336
1337 narrow_consumer.assert_next("worm-node/data", &broadcast1.consumer);
1338 narrow_consumer.assert_next_wait(); }
1340}