1use std::{
2 collections::HashMap,
3 sync::atomic::{AtomicU64, Ordering},
4};
5use tokio::sync::mpsc;
6use web_async::Lock;
7
8use super::BroadcastConsumer;
9use crate::{AsPath, Broadcast, BroadcastProducer, Path, PathOwned};
10
/// Process-wide counter supplying the value for the next [`ConsumerId`].
static NEXT_CONSUMER_ID: AtomicU64 = AtomicU64::new(0);

/// Opaque key identifying one consumer registration.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct ConsumerId(u64);

impl ConsumerId {
    /// Allocates a fresh identifier by incrementing the global counter.
    fn new() -> Self {
        // `fetch_add` returns the value *before* the increment, so the first id is 0.
        let raw = NEXT_CONSUMER_ID.fetch_add(1, Ordering::Relaxed);
        Self(raw)
    }
}
21
/// All broadcasts currently published at one path.
///
/// Publishing a second broadcast at the same path makes it `active` and pushes the
/// previous one onto `backup`; removing the active one promotes the most recently
/// pushed backup again.
struct OriginBroadcast {
    /// Full path the entry was created with (the `full` argument of `OriginNode::publish`).
    path: PathOwned,
    /// The broadcast handed out to consumers.
    active: BroadcastConsumer,
    /// Displaced broadcasts for the same path, in the order they were displaced.
    backup: Vec<BroadcastConsumer>,
}
28
/// Sending half of one consumer's announcement stream.
#[derive(Clone)]
struct OriginConsumerNotify {
    /// Prefix stripped from every path before it is queued for the consumer.
    root: PathOwned,
    /// Channel carrying `(path, Some(broadcast))` and `(path, None)` updates.
    tx: mpsc::UnboundedSender<OriginAnnounce>,
}
34
35impl OriginConsumerNotify {
36 fn announce(&self, path: impl AsPath, broadcast: BroadcastConsumer) {
37 let path = path.as_path().strip_prefix(&self.root).unwrap().to_owned();
38 self.tx.send((path, Some(broadcast))).expect("consumer closed");
39 }
40
41 fn reannounce(&self, path: impl AsPath, broadcast: BroadcastConsumer) {
42 let path = path.as_path().strip_prefix(&self.root).unwrap().to_owned();
43 self.tx.send((path.clone(), None)).expect("consumer closed");
44 self.tx.send((path, Some(broadcast))).expect("consumer closed");
45 }
46
47 fn unannounce(&self, path: impl AsPath) {
48 let path = path.as_path().strip_prefix(&self.root).unwrap().to_owned();
49 self.tx.send((path, None)).expect("consumer closed");
50 }
51}
52
/// Fan-out point for the announcements of one tree node.
struct NotifyNode {
    /// Notifier that every event is forwarded to after the local consumers; `None` ends the chain.
    parent: Option<Lock<NotifyNode>>,

    /// Consumers registered directly at this node, keyed by their id.
    consumers: HashMap<ConsumerId, OriginConsumerNotify>,
}
60
61impl NotifyNode {
62 fn new(parent: Option<Lock<NotifyNode>>) -> Self {
63 Self {
64 parent,
65 consumers: HashMap::new(),
66 }
67 }
68
69 fn announce(&mut self, path: impl AsPath, broadcast: &BroadcastConsumer) {
70 for consumer in self.consumers.values() {
71 consumer.announce(path.as_path(), broadcast.clone());
72 }
73
74 if let Some(parent) = &self.parent {
75 parent.lock().announce(path, broadcast);
76 }
77 }
78
79 fn reannounce(&mut self, path: impl AsPath, broadcast: &BroadcastConsumer) {
80 for consumer in self.consumers.values() {
81 consumer.reannounce(path.as_path(), broadcast.clone());
82 }
83
84 if let Some(parent) = &self.parent {
85 parent.lock().reannounce(path, broadcast);
86 }
87 }
88
89 fn unannounce(&mut self, path: impl AsPath) {
90 for consumer in self.consumers.values() {
91 consumer.unannounce(path.as_path());
92 }
93
94 if let Some(parent) = &self.parent {
95 parent.lock().unannounce(path);
96 }
97 }
98}
99
/// One node of the path tree; each path component adds one level.
struct OriginNode {
    /// Broadcast(s) published at exactly this node's path, if any.
    broadcast: Option<OriginBroadcast>,

    /// Child nodes keyed by the next path component.
    nested: HashMap<String, Lock<OriginNode>>,

    /// Notifier for this node; children are created with it as their notifier's parent.
    notify: Lock<NotifyNode>,
}
110
111impl OriginNode {
112 fn new(parent: Option<Lock<NotifyNode>>) -> Self {
113 Self {
114 broadcast: None,
115 nested: HashMap::new(),
116 notify: Lock::new(NotifyNode::new(parent)),
117 }
118 }
119
120 fn leaf(&mut self, path: &Path) -> Lock<OriginNode> {
121 let (dir, rest) = path.next_part().expect("leaf called with empty path");
122
123 let next = self.entry(dir);
124 if rest.is_empty() { next } else { next.lock().leaf(&rest) }
125 }
126
127 fn entry(&mut self, dir: &str) -> Lock<OriginNode> {
128 match self.nested.get(dir) {
129 Some(next) => next.clone(),
130 None => {
131 let next = Lock::new(OriginNode::new(Some(self.notify.clone())));
132 self.nested.insert(dir.to_string(), next.clone());
133 next
134 }
135 }
136 }
137
138 fn publish(&mut self, full: impl AsPath, broadcast: &BroadcastConsumer, relative: impl AsPath) {
139 let full = full.as_path();
140 let rest = relative.as_path();
141
142 if let Some((dir, relative)) = rest.next_part() {
144 self.entry(dir).lock().publish(&full, broadcast, &relative);
146 } else if let Some(existing) = &mut self.broadcast {
147 let old = existing.active.clone();
149 existing.active = broadcast.clone();
150 existing.backup.push(old);
151
152 self.notify.lock().reannounce(full, broadcast);
153 } else {
154 self.broadcast = Some(OriginBroadcast {
156 path: full.to_owned(),
157 active: broadcast.clone(),
158 backup: Vec::new(),
159 });
160 self.notify.lock().announce(full, broadcast);
161 }
162 }
163
164 fn consume(&mut self, id: ConsumerId, mut notify: OriginConsumerNotify) {
165 self.consume_initial(&mut notify);
166 self.notify.lock().consumers.insert(id, notify);
167 }
168
169 fn consume_initial(&mut self, notify: &mut OriginConsumerNotify) {
170 if let Some(broadcast) = &self.broadcast {
171 notify.announce(&broadcast.path, broadcast.active.clone());
172 }
173
174 for nested in self.nested.values() {
176 nested.lock().consume_initial(notify);
177 }
178 }
179
180 fn consume_broadcast(&self, rest: impl AsPath) -> Option<BroadcastConsumer> {
181 let rest = rest.as_path();
182
183 if let Some((dir, rest)) = rest.next_part() {
184 let node = self.nested.get(dir)?.lock();
185 node.consume_broadcast(&rest)
186 } else {
187 self.broadcast.as_ref().map(|b| b.active.clone())
188 }
189 }
190
191 fn unconsume(&mut self, id: ConsumerId) {
192 self.notify.lock().consumers.remove(&id).expect("consumer not found");
193 if self.is_empty() {
194 }
197 }
198
199 fn remove(&mut self, full: impl AsPath, broadcast: BroadcastConsumer, relative: impl AsPath) {
201 let full = full.as_path();
202 let relative = relative.as_path();
203
204 if let Some((dir, relative)) = relative.next_part() {
205 let nested = self.entry(dir);
206 let mut locked = nested.lock();
207 locked.remove(&full, broadcast, &relative);
208
209 if locked.is_empty() {
210 drop(locked);
211 self.nested.remove(dir);
212 }
213 } else {
214 let entry = match &mut self.broadcast {
215 Some(existing) => existing,
216 None => return,
217 };
218
219 let pos = entry.backup.iter().position(|b| b.is_clone(&broadcast));
221 if let Some(pos) = pos {
222 entry.backup.remove(pos);
223 return;
225 }
226
227 assert!(entry.active.is_clone(&broadcast));
229
230 if let Some(active) = entry.backup.pop() {
232 entry.active = active;
233 self.notify.lock().reannounce(full, &entry.active);
234 } else {
235 self.broadcast = None;
237 self.notify.lock().unannounce(full);
238 }
239 }
240 }
241
242 fn is_empty(&self) -> bool {
243 self.broadcast.is_none() && self.nested.is_empty() && self.notify.lock().consumers.is_empty()
244 }
245}
246
/// The set of subtrees a producer or consumer may access.
#[derive(Clone)]
struct OriginNodes {
    /// Pairs of (path the subtree is reachable under, tree node at that path).
    nodes: Vec<(PathOwned, Lock<OriginNode>)>,
}
251
252impl OriginNodes {
253 pub fn select(&self, prefixes: &[Path]) -> Option<Self> {
256 let mut roots = Vec::new();
257
258 for (root, state) in &self.nodes {
259 for prefix in prefixes {
260 if root.has_prefix(prefix) {
261 roots.push((root.to_owned(), state.clone()));
263 continue;
264 }
265
266 if let Some(suffix) = prefix.strip_prefix(root) {
267 let nested = state.lock().leaf(&suffix);
269 roots.push((prefix.to_owned(), nested));
270 }
271 }
272 }
273
274 if roots.is_empty() {
275 None
276 } else {
277 Some(Self { nodes: roots })
278 }
279 }
280
281 pub fn root(&self, new_root: impl AsPath) -> Option<Self> {
282 let new_root = new_root.as_path();
283 let mut roots = Vec::new();
284
285 if new_root.is_empty() {
286 return Some(self.clone());
287 }
288
289 for (root, state) in &self.nodes {
290 if let Some(suffix) = root.strip_prefix(&new_root) {
291 roots.push((suffix.to_owned(), state.clone()));
293 } else if let Some(suffix) = new_root.strip_prefix(root) {
294 let nested = state.lock().leaf(&suffix);
297 roots.push(("".into(), nested));
298 }
299 }
300
301 if roots.is_empty() {
302 None
303 } else {
304 Some(Self { nodes: roots })
305 }
306 }
307
308 pub fn get(&self, path: impl AsPath) -> Option<(Lock<OriginNode>, PathOwned)> {
310 let path = path.as_path();
311
312 for (root, state) in &self.nodes {
313 if let Some(suffix) = path.strip_prefix(root) {
314 return Some((state.clone(), suffix.to_owned()));
315 }
316 }
317
318 None
319 }
320}
321
322impl Default for OriginNodes {
323 fn default() -> Self {
324 Self {
325 nodes: vec![("".into(), Lock::new(OriginNode::new(None)))],
326 }
327 }
328}
329
/// One update delivered to a consumer: the path, plus `Some(broadcast)` for an
/// announcement or `None` for an unannouncement.
pub type OriginAnnounce = (PathOwned, Option<BroadcastConsumer>);
332
333pub struct Origin {}
335
336impl Origin {
337 pub fn produce() -> OriginProducer {
338 OriginProducer::new()
339 }
340}
341
/// Handle for publishing broadcasts into the tree, optionally rooted and restricted.
#[derive(Clone, Default)]
pub struct OriginProducer {
    /// Subtrees this producer may touch, with paths relative to `root`.
    nodes: OriginNodes,

    /// Prefix joined in front of caller-supplied paths to form the full path.
    root: PathOwned,
}
352
353impl OriginProducer {
354 pub fn new() -> Self {
355 Self::default()
356 }
357
358 pub fn create_broadcast(&self, path: impl AsPath) -> Option<BroadcastProducer> {
363 let broadcast = Broadcast::produce();
364 self.publish_broadcast(path, broadcast.consume()).then_some(broadcast)
365 }
366
367 pub fn publish_broadcast(&self, path: impl AsPath, broadcast: BroadcastConsumer) -> bool {
376 let path = path.as_path();
377
378 let (root, rest) = match self.nodes.get(&path) {
379 Some(root) => root,
380 None => return false,
381 };
382
383 let full = self.root.join(&path);
384
385 root.lock().publish(&full, &broadcast, &rest);
386 let root = root.clone();
387
388 web_async::spawn(async move {
389 broadcast.closed().await;
390 root.lock().remove(&full, broadcast, &rest);
391 });
392
393 true
394 }
395
396 pub fn publish_only(&self, prefixes: &[Path]) -> Option<OriginProducer> {
400 Some(OriginProducer {
401 nodes: self.nodes.select(prefixes)?,
402 root: self.root.clone(),
403 })
404 }
405
406 pub fn consume(&self) -> OriginConsumer {
408 OriginConsumer::new(self.root.clone(), self.nodes.clone())
409 }
410
411 pub fn consume_only(&self, prefixes: &[Path]) -> Option<OriginConsumer> {
417 Some(OriginConsumer::new(self.root.clone(), self.nodes.select(prefixes)?))
418 }
419
420 pub fn consume_broadcast(&self, path: impl AsPath) -> Option<BroadcastConsumer> {
424 let path = path.as_path();
425 let (root, rest) = self.nodes.get(&path)?;
426 let state = root.lock();
427 state.consume_broadcast(&rest)
428 }
429
430 pub fn with_root(&self, prefix: impl AsPath) -> Option<Self> {
434 let prefix = prefix.as_path();
435
436 Some(Self {
437 root: self.root.join(&prefix).to_owned(),
438 nodes: self.nodes.root(&prefix)?,
439 })
440 }
441
442 pub fn root(&self) -> &Path<'_> {
444 &self.root
445 }
446
447 pub fn allowed(&self) -> impl Iterator<Item = &Path<'_>> {
448 self.nodes.nodes.iter().map(|(root, _)| root)
449 }
450
451 pub fn absolute(&self, path: impl AsPath) -> Path<'_> {
453 self.root.join(path)
454 }
455}
456
/// Receives announcements for the subtrees it was created over.
pub struct OriginConsumer {
    /// Key under which this consumer is registered on each of its nodes.
    id: ConsumerId,
    /// Subtrees this consumer is registered on (and may look broadcasts up in).
    nodes: OriginNodes,
    /// Receiving half of the announcement channel.
    updates: mpsc::UnboundedReceiver<OriginAnnounce>,

    /// Prefix stripped from announced paths before they reach this consumer.
    root: PathOwned,
}
468
469impl OriginConsumer {
470 fn new(root: PathOwned, nodes: OriginNodes) -> Self {
471 let (tx, rx) = mpsc::unbounded_channel();
472
473 let id = ConsumerId::new();
474
475 for (_, state) in &nodes.nodes {
476 let notify = OriginConsumerNotify {
477 root: root.clone(),
478 tx: tx.clone(),
479 };
480 state.lock().consume(id, notify);
481 }
482
483 Self {
484 id,
485 nodes,
486 updates: rx,
487 root,
488 }
489 }
490
491 pub async fn announced(&mut self) -> Option<OriginAnnounce> {
499 self.updates.recv().await
500 }
501
502 pub fn try_announced(&mut self) -> Option<OriginAnnounce> {
507 self.updates.try_recv().ok()
508 }
509
510 pub fn consume(&self) -> Self {
511 self.clone()
512 }
513
514 pub fn consume_broadcast(&self, path: impl AsPath) -> Option<BroadcastConsumer> {
520 let path = path.as_path();
521 let (root, rest) = self.nodes.get(&path)?;
522 let state = root.lock();
523 state.consume_broadcast(&rest)
524 }
525
526 pub fn consume_only(&self, prefixes: &[Path]) -> Option<OriginConsumer> {
530 Some(OriginConsumer::new(self.root.clone(), self.nodes.select(prefixes)?))
531 }
532
533 pub fn with_root(&self, prefix: impl AsPath) -> Option<Self> {
537 let prefix = prefix.as_path();
538
539 Some(Self::new(self.root.join(&prefix).to_owned(), self.nodes.root(&prefix)?))
540 }
541
542 pub fn root(&self) -> &Path<'_> {
544 &self.root
545 }
546
547 pub fn allowed(&self) -> impl Iterator<Item = &Path<'_>> {
548 self.nodes.nodes.iter().map(|(root, _)| root)
549 }
550
551 pub fn absolute(&self, path: impl AsPath) -> Path<'_> {
553 self.root.join(path)
554 }
555}
556
557impl Drop for OriginConsumer {
558 fn drop(&mut self) {
559 for (_, root) in &self.nodes.nodes {
560 root.lock().unconsume(self.id);
561 }
562 }
563}
564
565impl Clone for OriginConsumer {
566 fn clone(&self) -> Self {
567 OriginConsumer::new(self.root.clone(), self.nodes.clone())
568 }
569}
570
571#[cfg(test)]
572use futures::FutureExt;
573
/// Assertion helpers used by the unit tests; compiled only for test builds.
#[cfg(test)]
impl OriginConsumer {
    /// Asserts that an announcement of `broadcast` at `expected` is available right now.
    ///
    /// Polls `announced()` exactly once via `now_or_never`; panics with "next blocked" if
    /// that single poll is not ready.
    pub fn assert_next(&mut self, expected: impl AsPath, broadcast: &BroadcastConsumer) {
        let expected = expected.as_path();
        let (path, active) = self.announced().now_or_never().expect("next blocked").expect("no next");
        assert_eq!(path, expected, "wrong path");
        assert!(active.unwrap().is_clone(broadcast), "should be the same broadcast");
    }

    /// Same check as `assert_next`, but reads the queue through `try_announced`.
    pub fn assert_try_next(&mut self, expected: impl AsPath, broadcast: &BroadcastConsumer) {
        let expected = expected.as_path();
        let (path, active) = self.try_announced().expect("no next");
        assert_eq!(path, expected, "wrong path");
        assert!(active.unwrap().is_clone(broadcast), "should be the same broadcast");
    }

    /// Asserts that an unannouncement (`None`) for `expected` is available right now.
    pub fn assert_next_none(&mut self, expected: impl AsPath) {
        let expected = expected.as_path();
        let (path, active) = self.announced().now_or_never().expect("next blocked").expect("no next");
        assert_eq!(path, expected, "wrong path");
        assert!(active.is_none(), "should be unannounced");
    }

    /// Asserts that no update is immediately available.
    pub fn assert_next_wait(&mut self) {
        if let Some(res) = self.announced().now_or_never() {
            panic!("next should block: got {:?}", res.map(|(path, _)| path));
        }
    }

}
612
/// Unit tests for announcement delivery, duplicate handling, rooting and prefix restrictions.
#[cfg(test)]
mod tests {
    use crate::Broadcast;

    use super::*;

    /// Announce/unannounce delivery to consumers created before and after publishing.
    #[tokio::test]
    async fn test_announce() {
        let origin = Origin::produce();
        let broadcast1 = Broadcast::produce();
        let broadcast2 = Broadcast::produce();

        // Nothing published yet.
        let mut consumer1 = origin.consume();
        consumer1.assert_next_wait();

        origin.publish_broadcast("test1", broadcast1.consume());

        consumer1.assert_next("test1", &broadcast1.consume());
        consumer1.assert_next_wait();

        // A late consumer gets the existing broadcast replayed first.
        let mut consumer2 = origin.consume();

        origin.publish_broadcast("test2", broadcast2.consume());

        consumer1.assert_next("test2", &broadcast2.consume());
        consumer1.assert_next_wait();

        consumer2.assert_next("test1", &broadcast1.consume());
        consumer2.assert_next("test2", &broadcast2.consume());
        consumer2.assert_next_wait();

        drop(broadcast1);

        // Give the task spawned by `publish_broadcast` a chance to run its removal.
        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;

        consumer1.assert_next_none("test1");
        consumer2.assert_next_none("test1");
        consumer1.assert_next_wait();
        consumer2.assert_next_wait();

        // Only the broadcast that is still published is replayed to a new consumer.
        let mut consumer3 = origin.consume();
        consumer3.assert_next("test2", &broadcast2.consume());
        consumer3.assert_next_wait();

        drop(broadcast2);

        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;

        consumer1.assert_next_none("test2");
        consumer2.assert_next_none("test2");
        consumer3.assert_next_none("test2");

    }

    /// Several broadcasts at one path: newest is active, older ones act as backups.
    #[tokio::test]
    async fn test_duplicate() {
        let origin = Origin::produce();

        let broadcast1 = Broadcast::produce();
        let broadcast2 = Broadcast::produce();
        let broadcast3 = Broadcast::produce();

        let consumer1 = broadcast1.consume();
        let consumer2 = broadcast2.consume();
        let consumer3 = broadcast3.consume();

        let mut consumer = origin.consume();

        origin.publish_broadcast("test", consumer1.clone());
        origin.publish_broadcast("test", consumer2.clone());
        origin.publish_broadcast("test", consumer3.clone());
        assert!(consumer.consume_broadcast("test").is_some());

        // Each replacement shows up as an unannounce followed by an announce.
        consumer.assert_next("test", &consumer1);
        consumer.assert_next_none("test");
        consumer.assert_next("test", &consumer2);
        consumer.assert_next_none("test");
        consumer.assert_next("test", &consumer3);

        // Dropping a backup is silent.
        drop(broadcast2);

        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;

        assert!(consumer.consume_broadcast("test").is_some());
        consumer.assert_next_wait();

        // Dropping the active one falls back to the remaining backup.
        drop(broadcast3);

        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;

        assert!(consumer.consume_broadcast("test").is_some());
        consumer.assert_next_none("test");
        consumer.assert_next("test", &consumer1);

        // Dropping the last one unannounces the path.
        drop(broadcast1);

        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
        assert!(consumer.consume_broadcast("test").is_none());

        consumer.assert_next_none("test");
        consumer.assert_next_wait();
    }

    /// Same path published twice, then dropped newest-first.
    #[tokio::test]
    async fn test_duplicate_reverse() {
        let origin = Origin::produce();
        let broadcast1 = Broadcast::produce();
        let broadcast2 = Broadcast::produce();

        origin.publish_broadcast("test", broadcast1.consume());
        origin.publish_broadcast("test", broadcast2.consume());
        assert!(origin.consume_broadcast("test").is_some());

        drop(broadcast2);

        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
        assert!(origin.consume_broadcast("test").is_some());

        drop(broadcast1);

        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
        assert!(origin.consume_broadcast("test").is_none());
    }

    /// Publishing the very same broadcast twice must still clean up fully on close.
    #[tokio::test]
    async fn test_double_publish() {
        let origin = Origin::produce();
        let broadcast = Broadcast::produce();

        origin.publish_broadcast("test", broadcast.consume());
        origin.publish_broadcast("test", broadcast.consume());

        assert!(origin.consume_broadcast("test").is_some());

        drop(broadcast);

        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
        assert!(origin.consume_broadcast("test").is_none());
    }

    /// Expected to panic: reading 256 queued announcements through `assert_next`.
    ///
    /// NOTE(review): presumably `now_or_never` on the channel's `recv` stops returning
    /// ready items after some number of polls within one task (tokio cooperative
    /// budgeting), which trips "next blocked" — confirm.
    #[tokio::test]
    #[should_panic]
    async fn test_128() {
        let origin = Origin::produce();
        let broadcast = Broadcast::produce();

        let mut consumer = origin.consume();
        for i in 0..256 {
            origin.publish_broadcast(format!("test{i}"), broadcast.consume());
        }

        for i in 0..256 {
            consumer.assert_next(format!("test{i}"), &broadcast.consume());
        }
    }

    /// Counterpart of `test_128`: the same 256 announcements read through `try_announced`.
    #[tokio::test]
    async fn test_128_fix() {
        let origin = Origin::produce();
        let broadcast = Broadcast::produce();

        let mut consumer = origin.consume();
        for i in 0..256 {
            origin.publish_broadcast(format!("test{i}"), broadcast.consume());
        }

        for i in 0..256 {
            consumer.assert_try_next(format!("test{i}"), &broadcast.consume());
        }
    }

    /// A rooted producer publishes under its root; rooted consumers see relative paths.
    #[tokio::test]
    async fn test_with_root_basic() {
        let origin = Origin::produce();
        let broadcast = Broadcast::produce();

        let foo_producer = origin.with_root("foo").expect("should create root");
        assert_eq!(foo_producer.root().as_str(), "foo");

        let mut consumer = origin.consume();

        assert!(foo_producer.publish_broadcast("bar/baz", broadcast.consume()));
        // The unrooted consumer sees the full path.
        consumer.assert_next("foo/bar/baz", &broadcast.consume());

        let mut foo_consumer = foo_producer.consume();
        // The rooted consumer sees the path with the root stripped.
        foo_consumer.assert_next("bar/baz", &broadcast.consume());
    }

    /// Roots compose: `with_root("foo")` then `with_root("bar")` yields root "foo/bar".
    #[tokio::test]
    async fn test_with_root_nested() {
        let origin = Origin::produce();
        let broadcast = Broadcast::produce();

        let foo_producer = origin.with_root("foo").expect("should create foo root");
        let foo_bar_producer = foo_producer.with_root("bar").expect("should create bar root");
        assert_eq!(foo_bar_producer.root().as_str(), "foo/bar");

        let mut consumer = origin.consume();

        assert!(foo_bar_producer.publish_broadcast("baz", broadcast.consume()));
        consumer.assert_next("foo/bar/baz", &broadcast.consume());

        let mut foo_bar_consumer = foo_bar_producer.consume();
        foo_bar_consumer.assert_next("baz", &broadcast.consume());
    }

    /// `publish_only` permits the listed prefixes and everything below them, nothing else.
    #[tokio::test]
    async fn test_publish_only_allows() {
        let origin = Origin::produce();
        let broadcast = Broadcast::produce();

        let limited_producer = origin
            .publish_only(&["allowed/path1".into(), "allowed/path2".into()])
            .expect("should create limited producer");

        assert!(limited_producer.publish_broadcast("allowed/path1", broadcast.consume()));
        assert!(limited_producer.publish_broadcast("allowed/path1/nested", broadcast.consume()));
        assert!(limited_producer.publish_broadcast("allowed/path2", broadcast.consume()));

        assert!(!limited_producer.publish_broadcast("notallowed", broadcast.consume()));
        // A parent of an allowed prefix is not itself allowed.
        assert!(!limited_producer.publish_broadcast("allowed", broadcast.consume()));
        assert!(!limited_producer.publish_broadcast("other/path", broadcast.consume()));
    }

    /// An empty prefix list selects nothing.
    #[tokio::test]
    async fn test_publish_only_empty() {
        let origin = Origin::produce();

        assert!(origin.publish_only(&[]).is_none());
    }

    /// `consume_only` hides broadcasts outside the listed prefix.
    #[tokio::test]
    async fn test_consume_only_filters() {
        let origin = Origin::produce();
        let broadcast1 = Broadcast::produce();
        let broadcast2 = Broadcast::produce();
        let broadcast3 = Broadcast::produce();

        let mut consumer = origin.consume();

        origin.publish_broadcast("allowed", broadcast1.consume());
        origin.publish_broadcast("allowed/nested", broadcast2.consume());
        origin.publish_broadcast("notallowed", broadcast3.consume());

        let mut limited_consumer = origin
            .consume_only(&["allowed".into()])
            .expect("should create limited consumer");

        limited_consumer.assert_next("allowed", &broadcast1.consume());
        limited_consumer.assert_next("allowed/nested", &broadcast2.consume());
        limited_consumer.assert_next_wait();

        // The unrestricted consumer sees all three.
        consumer.assert_next("allowed", &broadcast1.consume());
        consumer.assert_next("allowed/nested", &broadcast2.consume());
        consumer.assert_next("notallowed", &broadcast3.consume());
    }

    /// `consume_only` with two prefixes replays both subtrees, in prefix order.
    #[tokio::test]
    async fn test_consume_only_multiple_prefixes() {
        let origin = Origin::produce();
        let broadcast1 = Broadcast::produce();
        let broadcast2 = Broadcast::produce();
        let broadcast3 = Broadcast::produce();

        origin.publish_broadcast("foo/test", broadcast1.consume());
        origin.publish_broadcast("bar/test", broadcast2.consume());
        origin.publish_broadcast("baz/test", broadcast3.consume());

        let mut limited_consumer = origin
            .consume_only(&["foo".into(), "bar".into()])
            .expect("should create limited consumer");

        limited_consumer.assert_next("foo/test", &broadcast1.consume());
        limited_consumer.assert_next("bar/test", &broadcast2.consume());
        limited_consumer.assert_next_wait();
    }

    /// Restrictions given to a rooted producer are interpreted relative to its root.
    #[tokio::test]
    async fn test_with_root_and_publish_only() {
        let origin = Origin::produce();
        let broadcast = Broadcast::produce();

        let foo_producer = origin.with_root("foo").expect("should create foo root");

        let limited_producer = foo_producer
            .publish_only(&["bar".into(), "goop/pee".into()])
            .expect("should create limited producer");

        let mut consumer = origin.consume();

        assert!(limited_producer.publish_broadcast("bar", broadcast.consume()));
        assert!(limited_producer.publish_broadcast("bar/nested", broadcast.consume()));
        assert!(limited_producer.publish_broadcast("goop/pee", broadcast.consume()));
        assert!(limited_producer.publish_broadcast("goop/pee/nested", broadcast.consume()));

        assert!(!limited_producer.publish_broadcast("baz", broadcast.consume()));
        // A parent of an allowed prefix is not itself allowed.
        assert!(!limited_producer.publish_broadcast("goop", broadcast.consume()));
        assert!(!limited_producer.publish_broadcast("goop/other", broadcast.consume()));

        // The unrooted consumer sees everything under the "foo" root.
        consumer.assert_next("foo/bar", &broadcast.consume());
        consumer.assert_next("foo/bar/nested", &broadcast.consume());
        consumer.assert_next("foo/goop/pee", &broadcast.consume());
        consumer.assert_next("foo/goop/pee/nested", &broadcast.consume());
    }

    /// A restricted consumer created from a rooted producer sees root-relative paths only.
    #[tokio::test]
    async fn test_with_root_and_consume_only() {
        let origin = Origin::produce();
        let broadcast1 = Broadcast::produce();
        let broadcast2 = Broadcast::produce();
        let broadcast3 = Broadcast::produce();

        origin.publish_broadcast("foo/bar/test", broadcast1.consume());
        origin.publish_broadcast("foo/goop/pee/test", broadcast2.consume());
        origin.publish_broadcast("foo/other/test", broadcast3.consume());

        let foo_producer = origin.with_root("foo").expect("should create foo root");

        let mut limited_consumer = foo_producer
            .consume_only(&["bar".into(), "goop/pee".into()])
            .expect("should create limited consumer");

        limited_consumer.assert_next("bar/test", &broadcast1.consume());
        limited_consumer.assert_next("goop/pee/test", &broadcast2.consume());
        limited_consumer.assert_next_wait();
    }

    /// `with_root` fails for a prefix outside the producer's allowed set.
    #[tokio::test]
    async fn test_with_root_unauthorized() {
        let origin = Origin::produce();

        let limited_producer = origin
            .publish_only(&["allowed".into()])
            .expect("should create limited producer");

        assert!(limited_producer.with_root("notallowed").is_none());

        let allowed_root = limited_producer
            .with_root("allowed")
            .expect("should create allowed root");
        assert_eq!(allowed_root.root().as_str(), "allowed");
    }

    /// An unrestricted producer can publish anywhere and be rooted anywhere.
    #[tokio::test]
    async fn test_wildcard_permission() {
        let origin = Origin::produce();
        let broadcast = Broadcast::produce();

        let root_producer = origin.clone();

        assert!(root_producer.publish_broadcast("any/path", broadcast.consume()));
        assert!(root_producer.publish_broadcast("other/path", broadcast.consume()));

        let foo_producer = root_producer.with_root("foo").expect("should create any root");
        assert_eq!(foo_producer.root().as_str(), "foo");
    }

    /// `consume_broadcast` honours the consumer's prefix restriction.
    #[tokio::test]
    async fn test_consume_broadcast_with_permissions() {
        let origin = Origin::produce();
        let broadcast1 = Broadcast::produce();
        let broadcast2 = Broadcast::produce();

        origin.publish_broadcast("allowed/test", broadcast1.consume());
        origin.publish_broadcast("notallowed/test", broadcast2.consume());

        let limited_consumer = origin
            .consume_only(&["allowed".into()])
            .expect("should create limited consumer");

        let result = limited_consumer.consume_broadcast("allowed/test");
        assert!(result.is_some());
        assert!(result.unwrap().is_clone(&broadcast1.consume()));

        assert!(limited_consumer.consume_broadcast("notallowed/test").is_none());

        // The unrestricted consumer can look up both.
        let consumer = origin.consume();
        assert!(consumer.consume_broadcast("allowed/test").is_some());
        assert!(consumer.consume_broadcast("notallowed/test").is_some());
    }

    /// A deep allowed prefix permits itself and descendants, but no ancestor or sibling.
    #[tokio::test]
    async fn test_nested_paths_with_permissions() {
        let origin = Origin::produce();
        let broadcast = Broadcast::produce();

        let limited_producer = origin
            .publish_only(&["a/b/c".into()])
            .expect("should create limited producer");

        assert!(limited_producer.publish_broadcast("a/b/c", broadcast.consume()));
        assert!(limited_producer.publish_broadcast("a/b/c/d", broadcast.consume()));
        assert!(limited_producer.publish_broadcast("a/b/c/d/e", broadcast.consume()));

        assert!(!limited_producer.publish_broadcast("a", broadcast.consume()));
        assert!(!limited_producer.publish_broadcast("a/b", broadcast.consume()));
        assert!(!limited_producer.publish_broadcast("a/b/other", broadcast.consume()));
    }

    /// Differently restricted consumers each see only their own slice.
    #[tokio::test]
    async fn test_multiple_consumers_with_different_permissions() {
        let origin = Origin::produce();
        let broadcast1 = Broadcast::produce();
        let broadcast2 = Broadcast::produce();
        let broadcast3 = Broadcast::produce();

        origin.publish_broadcast("foo/test", broadcast1.consume());
        origin.publish_broadcast("bar/test", broadcast2.consume());
        origin.publish_broadcast("baz/test", broadcast3.consume());

        let mut foo_consumer = origin
            .consume_only(&["foo".into()])
            .expect("should create foo consumer");

        let mut bar_consumer = origin
            .consume_only(&["bar".into()])
            .expect("should create bar consumer");

        let mut foobar_consumer = origin
            .consume_only(&["foo".into(), "bar".into()])
            .expect("should create foobar consumer");

        foo_consumer.assert_next("foo/test", &broadcast1.consume());
        foo_consumer.assert_next_wait();

        bar_consumer.assert_next("bar/test", &broadcast2.consume());
        bar_consumer.assert_next_wait();

        foobar_consumer.assert_next("foo/test", &broadcast1.consume());
        foobar_consumer.assert_next("bar/test", &broadcast2.consume());
        foobar_consumer.assert_next_wait();
    }

    /// Selecting with the empty prefix keeps every subtree already allowed.
    #[tokio::test]
    async fn test_select_with_empty_prefix() {
        let origin = Origin::produce();
        let broadcast1 = Broadcast::produce();
        let broadcast2 = Broadcast::produce();

        let demo_producer = origin.with_root("demo").expect("should create demo root");
        let limited_producer = demo_producer
            .publish_only(&["worm-node".into(), "foobar".into()])
            .expect("should create limited producer");

        assert!(limited_producer.publish_broadcast("worm-node/test", broadcast1.consume()));
        assert!(limited_producer.publish_broadcast("foobar/test", broadcast2.consume()));

        let mut consumer = limited_producer
            .consume_only(&["".into()])
            .expect("should create consumer with empty prefix");

        consumer.assert_next("worm-node/test", &broadcast1.consume());
        consumer.assert_next("foobar/test", &broadcast2.consume());
        consumer.assert_next_wait();
    }

    /// Selecting a prefix at or below an allowed subtree narrows what the consumer sees.
    #[tokio::test]
    async fn test_select_narrowing_scope() {
        let origin = Origin::produce();
        let broadcast1 = Broadcast::produce();
        let broadcast2 = Broadcast::produce();
        let broadcast3 = Broadcast::produce();

        let demo_producer = origin.with_root("demo").expect("should create demo root");
        let limited_producer = demo_producer
            .publish_only(&["worm-node".into(), "foobar".into()])
            .expect("should create limited producer");

        assert!(limited_producer.publish_broadcast("worm-node", broadcast1.consume()));
        assert!(limited_producer.publish_broadcast("worm-node/foo", broadcast2.consume()));
        assert!(limited_producer.publish_broadcast("foobar/bar", broadcast3.consume()));

        let mut worm_consumer = limited_producer
            .consume_only(&["worm-node".into()])
            .expect("should create worm-node consumer");

        worm_consumer.assert_next("worm-node", &broadcast1.consume());
        worm_consumer.assert_next("worm-node/foo", &broadcast2.consume());
        worm_consumer.assert_next_wait();

        // Narrow further, to a path below an allowed prefix.
        let mut foo_consumer = limited_producer
            .consume_only(&["worm-node/foo".into()])
            .expect("should create worm-node/foo consumer");

        foo_consumer.assert_next("worm-node/foo", &broadcast2.consume());
        foo_consumer.assert_next_wait();
    }

    /// The empty prefix keeps all of several allowed subtrees, in their original order.
    #[tokio::test]
    async fn test_select_multiple_roots_with_empty_prefix() {
        let origin = Origin::produce();
        let broadcast1 = Broadcast::produce();
        let broadcast2 = Broadcast::produce();
        let broadcast3 = Broadcast::produce();

        let limited_producer = origin
            .publish_only(&["app1".into(), "app2".into(), "shared".into()])
            .expect("should create limited producer");

        assert!(limited_producer.publish_broadcast("app1/data", broadcast1.consume()));
        assert!(limited_producer.publish_broadcast("app2/config", broadcast2.consume()));
        assert!(limited_producer.publish_broadcast("shared/resource", broadcast3.consume()));

        let mut consumer = limited_producer
            .consume_only(&["".into()])
            .expect("should create consumer with empty prefix");

        consumer.assert_next("app1/data", &broadcast1.consume());
        consumer.assert_next("app2/config", &broadcast2.consume());
        consumer.assert_next("shared/resource", &broadcast3.consume());
        consumer.assert_next_wait();
    }

    /// `publish_only` with the empty prefix neither widens nor narrows the permissions.
    #[tokio::test]
    async fn test_publish_only_with_empty_prefix() {
        let origin = Origin::produce();
        let broadcast = Broadcast::produce();

        let limited_producer = origin
            .publish_only(&["services/api".into(), "services/web".into()])
            .expect("should create limited producer");

        let same_producer = limited_producer
            .publish_only(&["".into()])
            .expect("should create producer with empty prefix");

        assert!(same_producer.publish_broadcast("services/api", broadcast.consume()));
        assert!(same_producer.publish_broadcast("services/web", broadcast.consume()));
        assert!(!same_producer.publish_broadcast("services/db", broadcast.consume()));
        assert!(!same_producer.publish_broadcast("other", broadcast.consume()));
    }

    /// Narrowing from "org" to deeper paths only replays broadcasts below the new prefix.
    #[tokio::test]
    async fn test_select_narrowing_to_deeper_path() {
        let origin = Origin::produce();
        let broadcast1 = Broadcast::produce();
        let broadcast2 = Broadcast::produce();
        let broadcast3 = Broadcast::produce();

        let limited_producer = origin
            .publish_only(&["org".into()])
            .expect("should create limited producer");

        assert!(limited_producer.publish_broadcast("org/team1/project1", broadcast1.consume()));
        assert!(limited_producer.publish_broadcast("org/team1/project2", broadcast2.consume()));
        assert!(limited_producer.publish_broadcast("org/team2/project1", broadcast3.consume()));

        let mut team2_consumer = limited_producer
            .consume_only(&["org/team2".into()])
            .expect("should create team2 consumer");

        team2_consumer.assert_next("org/team2/project1", &broadcast3.consume());
        team2_consumer.assert_next_wait();

        // Narrow all the way down to a single published path.
        let mut project1_consumer = limited_producer
            .consume_only(&["org/team1/project1".into()])
            .expect("should create project1 consumer");

        project1_consumer.assert_next("org/team1/project1", &broadcast1.consume());
        project1_consumer.assert_next_wait();
    }

    /// Prefixes unrelated to the allowed set select nothing.
    #[tokio::test]
    async fn test_select_with_non_matching_prefix() {
        let origin = Origin::produce();

        let limited_producer = origin
            .publish_only(&["allowed/path".into()])
            .expect("should create limited producer");

        assert!(limited_producer.consume_only(&["different/path".into()]).is_none());

        assert!(limited_producer.publish_only(&["other/path".into()]).is_none());
    }

    /// A prefix wider than the permissions keeps them; a narrower prefix restricts them.
    #[tokio::test]
    async fn test_select_maintains_access_with_wider_prefix() {
        let origin = Origin::produce();
        let broadcast1 = Broadcast::produce();
        let broadcast2 = Broadcast::produce();

        let demo_producer = origin.with_root("demo").expect("should create demo root");
        let user_producer = demo_producer
            .publish_only(&["worm-node".into(), "foobar".into()])
            .expect("should create user producer");

        assert!(user_producer.publish_broadcast("worm-node/data", broadcast1.consume()));
        assert!(user_producer.publish_broadcast("foobar", broadcast2.consume()));

        let mut consumer = user_producer
            .consume_only(&["".into()])
            .expect("consume_only with empty prefix should not fail when user has specific permissions");

        consumer.assert_next("worm-node/data", &broadcast1.consume());
        consumer.assert_next("foobar", &broadcast2.consume());
        consumer.assert_next_wait();

        let mut narrow_consumer = user_producer
            .consume_only(&["worm-node".into()])
            .expect("should be able to narrow scope to worm-node");

        narrow_consumer.assert_next("worm-node/data", &broadcast1.consume());
        narrow_consumer.assert_next_wait();
    }
}