1use std::{
2 collections::HashMap,
3 sync::atomic::{AtomicU64, Ordering},
4};
5use tokio::sync::mpsc;
6use web_async::Lock;
7
8use super::BroadcastConsumer;
9use crate::{AsPath, Broadcast, BroadcastProducer, Path, PathOwned};
10
11static NEXT_CONSUMER_ID: AtomicU64 = AtomicU64::new(0);
12
13#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
14struct ConsumerId(u64);
15
16impl ConsumerId {
17 fn new() -> Self {
18 Self(NEXT_CONSUMER_ID.fetch_add(1, Ordering::Relaxed))
19 }
20}
21
22struct OriginBroadcast {
24 path: PathOwned,
25 active: BroadcastConsumer,
26 backup: Vec<BroadcastConsumer>,
27}
28
29#[derive(Clone)]
30struct OriginConsumerNotify {
31 root: PathOwned,
32 tx: mpsc::UnboundedSender<OriginAnnounce>,
33}
34
35impl OriginConsumerNotify {
36 fn announce(&self, path: impl AsPath, broadcast: BroadcastConsumer) {
37 let path = path.as_path().strip_prefix(&self.root).unwrap().to_owned();
38 self.tx.send((path, Some(broadcast))).expect("consumer closed");
39 }
40
41 fn reannounce(&self, path: impl AsPath, broadcast: BroadcastConsumer) {
42 let path = path.as_path().strip_prefix(&self.root).unwrap().to_owned();
43 self.tx.send((path.clone(), None)).expect("consumer closed");
44 self.tx.send((path, Some(broadcast))).expect("consumer closed");
45 }
46
47 fn unannounce(&self, path: impl AsPath) {
48 let path = path.as_path().strip_prefix(&self.root).unwrap().to_owned();
49 self.tx.send((path, None)).expect("consumer closed");
50 }
51}
52
53struct NotifyNode {
54 parent: Option<Lock<NotifyNode>>,
55
56 consumers: HashMap<ConsumerId, OriginConsumerNotify>,
59}
60
61impl NotifyNode {
62 fn new(parent: Option<Lock<NotifyNode>>) -> Self {
63 Self {
64 parent,
65 consumers: HashMap::new(),
66 }
67 }
68
69 fn announce(&mut self, path: impl AsPath, broadcast: &BroadcastConsumer) {
70 for consumer in self.consumers.values() {
71 consumer.announce(path.as_path(), broadcast.clone());
72 }
73
74 if let Some(parent) = &self.parent {
75 parent.lock().announce(path, broadcast);
76 }
77 }
78
79 fn reannounce(&mut self, path: impl AsPath, broadcast: &BroadcastConsumer) {
80 for consumer in self.consumers.values() {
81 consumer.reannounce(path.as_path(), broadcast.clone());
82 }
83
84 if let Some(parent) = &self.parent {
85 parent.lock().reannounce(path, broadcast);
86 }
87 }
88
89 fn unannounce(&mut self, path: impl AsPath) {
90 for consumer in self.consumers.values() {
91 consumer.unannounce(path.as_path());
92 }
93
94 if let Some(parent) = &self.parent {
95 parent.lock().unannounce(path);
96 }
97 }
98}
99
100struct OriginNode {
101 broadcast: Option<OriginBroadcast>,
103
104 nested: HashMap<String, Lock<OriginNode>>,
106
107 notify: Lock<NotifyNode>,
109}
110
111impl OriginNode {
112 fn new(parent: Option<Lock<NotifyNode>>) -> Self {
113 Self {
114 broadcast: None,
115 nested: HashMap::new(),
116 notify: Lock::new(NotifyNode::new(parent)),
117 }
118 }
119
120 fn leaf(&mut self, path: &Path) -> Lock<OriginNode> {
121 let (dir, rest) = path.next_part().expect("leaf called with empty path");
122
123 let next = self.entry(dir);
124 if rest.is_empty() { next } else { next.lock().leaf(&rest) }
125 }
126
127 fn entry(&mut self, dir: &str) -> Lock<OriginNode> {
128 match self.nested.get(dir) {
129 Some(next) => next.clone(),
130 None => {
131 let next = Lock::new(OriginNode::new(Some(self.notify.clone())));
132 self.nested.insert(dir.to_string(), next.clone());
133 next
134 }
135 }
136 }
137
138 fn publish(&mut self, full: impl AsPath, broadcast: &BroadcastConsumer, relative: impl AsPath) {
139 let full = full.as_path();
140 let rest = relative.as_path();
141
142 if let Some((dir, relative)) = rest.next_part() {
144 self.entry(dir).lock().publish(&full, broadcast, &relative);
146 } else if let Some(existing) = &mut self.broadcast {
147 let old = existing.active.clone();
149 existing.active = broadcast.clone();
150 existing.backup.push(old);
151
152 self.notify.lock().reannounce(full, broadcast);
153 } else {
154 self.broadcast = Some(OriginBroadcast {
156 path: full.to_owned(),
157 active: broadcast.clone(),
158 backup: Vec::new(),
159 });
160 self.notify.lock().announce(full, broadcast);
161 }
162 }
163
164 fn consume(&mut self, id: ConsumerId, mut notify: OriginConsumerNotify) {
165 self.consume_initial(&mut notify);
166 self.notify.lock().consumers.insert(id, notify);
167 }
168
169 fn consume_initial(&mut self, notify: &mut OriginConsumerNotify) {
170 if let Some(broadcast) = &self.broadcast {
171 notify.announce(&broadcast.path, broadcast.active.clone());
172 }
173
174 for nested in self.nested.values() {
176 nested.lock().consume_initial(notify);
177 }
178 }
179
180 fn consume_broadcast(&self, rest: impl AsPath) -> Option<BroadcastConsumer> {
181 let rest = rest.as_path();
182
183 if let Some((dir, rest)) = rest.next_part() {
184 let node = self.nested.get(dir)?.lock();
185 node.consume_broadcast(&rest)
186 } else {
187 self.broadcast.as_ref().map(|b| b.active.clone())
188 }
189 }
190
191 fn unconsume(&mut self, id: ConsumerId) {
192 self.notify.lock().consumers.remove(&id).expect("consumer not found");
193 if self.is_empty() {
194 }
197 }
198
199 fn remove(&mut self, full: impl AsPath, broadcast: BroadcastConsumer, relative: impl AsPath) {
201 let full = full.as_path();
202 let relative = relative.as_path();
203
204 if let Some((dir, relative)) = relative.next_part() {
205 let nested = self.entry(dir);
206 let mut locked = nested.lock();
207 locked.remove(&full, broadcast, &relative);
208
209 if locked.is_empty() {
210 drop(locked);
211 self.nested.remove(dir);
212 }
213 } else {
214 let entry = match &mut self.broadcast {
215 Some(existing) => existing,
216 None => return,
217 };
218
219 let pos = entry.backup.iter().position(|b| b.is_clone(&broadcast));
221 if let Some(pos) = pos {
222 entry.backup.remove(pos);
223 return;
225 }
226
227 assert!(entry.active.is_clone(&broadcast));
229
230 if let Some(active) = entry.backup.pop() {
232 entry.active = active;
233 self.notify.lock().reannounce(full, &entry.active);
234 } else {
235 self.broadcast = None;
237 self.notify.lock().unannounce(full);
238 }
239 }
240 }
241
242 fn is_empty(&self) -> bool {
243 self.broadcast.is_none() && self.nested.is_empty() && self.notify.lock().consumers.is_empty()
244 }
245}
246
247#[derive(Clone)]
248struct OriginNodes {
249 nodes: Vec<(PathOwned, Lock<OriginNode>)>,
250}
251
252impl OriginNodes {
253 pub fn select(&self, prefixes: &[Path]) -> Option<Self> {
256 let mut roots = Vec::new();
257
258 for (root, state) in &self.nodes {
259 for prefix in prefixes {
260 if root.has_prefix(prefix) {
261 roots.push((root.to_owned(), state.clone()));
263 continue;
264 }
265
266 if let Some(suffix) = prefix.strip_prefix(root) {
267 let nested = state.lock().leaf(&suffix);
269 roots.push((prefix.to_owned(), nested));
270 }
271 }
272 }
273
274 if roots.is_empty() {
275 None
276 } else {
277 Some(Self { nodes: roots })
278 }
279 }
280
281 pub fn root(&self, new_root: impl AsPath) -> Option<Self> {
282 let new_root = new_root.as_path();
283 let mut roots = Vec::new();
284
285 if new_root.is_empty() {
286 return Some(self.clone());
287 }
288
289 for (root, state) in &self.nodes {
290 if let Some(suffix) = root.strip_prefix(&new_root) {
291 roots.push((suffix.to_owned(), state.clone()));
293 } else if let Some(suffix) = new_root.strip_prefix(root) {
294 let nested = state.lock().leaf(&suffix);
297 roots.push(("".into(), nested));
298 }
299 }
300
301 if roots.is_empty() {
302 None
303 } else {
304 Some(Self { nodes: roots })
305 }
306 }
307
308 pub fn get(&self, path: impl AsPath) -> Option<(Lock<OriginNode>, PathOwned)> {
310 let path = path.as_path();
311
312 for (root, state) in &self.nodes {
313 if let Some(suffix) = path.strip_prefix(root) {
314 return Some((state.clone(), suffix.to_owned()));
315 }
316 }
317
318 None
319 }
320}
321
322impl Default for OriginNodes {
323 fn default() -> Self {
324 Self {
325 nodes: vec![("".into(), Lock::new(OriginNode::new(None)))],
326 }
327 }
328}
329
330pub type OriginAnnounce = (PathOwned, Option<BroadcastConsumer>);
332
333pub struct Origin {}
335
336impl Origin {
337 pub fn produce() -> OriginProducer {
338 OriginProducer::new()
339 }
340}
341
342#[derive(Clone, Default)]
344pub struct OriginProducer {
345 nodes: OriginNodes,
348
349 root: PathOwned,
351}
352
353impl OriginProducer {
354 pub fn new() -> Self {
355 Self::default()
356 }
357
358 pub fn create_broadcast(&self, path: impl AsPath) -> Option<BroadcastProducer> {
363 let broadcast = Broadcast::produce();
364 self.publish_broadcast(path, broadcast.consume()).then_some(broadcast)
365 }
366
367 pub fn publish_broadcast(&self, path: impl AsPath, broadcast: BroadcastConsumer) -> bool {
376 let path = path.as_path();
377
378 let (root, rest) = match self.nodes.get(&path) {
379 Some(root) => root,
380 None => return false,
381 };
382
383 let full = self.root.join(&path);
384
385 root.lock().publish(&full, &broadcast, &rest);
386 let root = root.clone();
387
388 web_async::spawn(async move {
389 broadcast.closed().await;
390 root.lock().remove(&full, broadcast, &rest);
391 });
392
393 true
394 }
395
396 pub fn publish_only(&self, prefixes: &[Path]) -> Option<OriginProducer> {
400 Some(OriginProducer {
401 nodes: self.nodes.select(prefixes)?,
402 root: self.root.clone(),
403 })
404 }
405
406 pub fn consume(&self) -> OriginConsumer {
408 OriginConsumer::new(self.root.clone(), self.nodes.clone())
409 }
410
411 pub fn consume_only(&self, prefixes: &[Path]) -> Option<OriginConsumer> {
417 Some(OriginConsumer::new(self.root.clone(), self.nodes.select(prefixes)?))
418 }
419
420 pub fn consume_broadcast(&self, path: impl AsPath) -> Option<BroadcastConsumer> {
424 let path = path.as_path();
425 let (root, rest) = self.nodes.get(&path)?;
426 let state = root.lock();
427 state.consume_broadcast(&rest)
428 }
429
430 pub fn with_root(&self, prefix: impl AsPath) -> Option<Self> {
434 let prefix = prefix.as_path();
435
436 Some(Self {
437 root: self.root.join(&prefix).to_owned(),
438 nodes: self.nodes.root(&prefix)?,
439 })
440 }
441
442 pub fn root(&self) -> &Path<'_> {
444 &self.root
445 }
446
447 pub fn allowed(&self) -> impl Iterator<Item = &Path<'_>> {
448 self.nodes.nodes.iter().map(|(root, _)| root)
449 }
450
451 pub fn absolute(&self, path: impl AsPath) -> Path<'_> {
453 self.root.join(path)
454 }
455}
456
457pub struct OriginConsumer {
461 id: ConsumerId,
462 nodes: OriginNodes,
463 updates: mpsc::UnboundedReceiver<OriginAnnounce>,
464
465 root: PathOwned,
467}
468
469impl OriginConsumer {
470 fn new(root: PathOwned, nodes: OriginNodes) -> Self {
471 let (tx, rx) = mpsc::unbounded_channel();
472
473 let id = ConsumerId::new();
474
475 for (_, state) in &nodes.nodes {
476 let notify = OriginConsumerNotify {
477 root: root.clone(),
478 tx: tx.clone(),
479 };
480 state.lock().consume(id, notify);
481 }
482
483 Self {
484 id,
485 nodes,
486 updates: rx,
487 root,
488 }
489 }
490
491 pub async fn announced(&mut self) -> Option<OriginAnnounce> {
499 self.updates.recv().await
500 }
501
502 pub fn try_announced(&mut self) -> Option<OriginAnnounce> {
507 self.updates.try_recv().ok()
508 }
509
510 pub fn consume(&self) -> Self {
511 self.clone()
512 }
513
514 pub fn consume_broadcast(&self, path: impl AsPath) -> Option<BroadcastConsumer> {
520 let path = path.as_path();
521 let (root, rest) = self.nodes.get(&path)?;
522 let state = root.lock();
523 state.consume_broadcast(&rest)
524 }
525
526 pub fn consume_only(&self, prefixes: &[Path]) -> Option<OriginConsumer> {
530 Some(OriginConsumer::new(self.root.clone(), self.nodes.select(prefixes)?))
531 }
532
533 pub fn with_root(&self, prefix: impl AsPath) -> Option<Self> {
537 let prefix = prefix.as_path();
538
539 Some(Self::new(self.root.join(&prefix).to_owned(), self.nodes.root(&prefix)?))
540 }
541
542 pub fn root(&self) -> &Path<'_> {
544 &self.root
545 }
546
547 pub fn allowed(&self) -> impl Iterator<Item = &Path<'_>> {
548 self.nodes.nodes.iter().map(|(root, _)| root)
549 }
550
551 pub fn absolute(&self, path: impl AsPath) -> Path<'_> {
553 self.root.join(path)
554 }
555}
556
557impl Drop for OriginConsumer {
558 fn drop(&mut self) {
559 for (_, root) in &self.nodes.nodes {
560 root.lock().unconsume(self.id);
561 }
562 }
563}
564
565impl Clone for OriginConsumer {
566 fn clone(&self) -> Self {
567 OriginConsumer::new(self.root.clone(), self.nodes.clone())
568 }
569}
570
571#[cfg(test)]
572use futures::FutureExt;
573
574#[cfg(test)]
575impl OriginConsumer {
576 pub fn assert_next(&mut self, expected: impl AsPath, broadcast: &BroadcastConsumer) {
577 let expected = expected.as_path();
578 let (path, active) = self.announced().now_or_never().expect("next blocked").expect("no next");
579 assert_eq!(path, expected, "wrong path");
580 assert!(active.unwrap().is_clone(broadcast), "should be the same broadcast");
581 }
582
583 pub fn assert_try_next(&mut self, expected: impl AsPath, broadcast: &BroadcastConsumer) {
584 let expected = expected.as_path();
585 let (path, active) = self.try_announced().expect("no next");
586 assert_eq!(path, expected, "wrong path");
587 assert!(active.unwrap().is_clone(broadcast), "should be the same broadcast");
588 }
589
590 pub fn assert_next_none(&mut self, expected: impl AsPath) {
591 let expected = expected.as_path();
592 let (path, active) = self.announced().now_or_never().expect("next blocked").expect("no next");
593 assert_eq!(path, expected, "wrong path");
594 assert!(active.is_none(), "should be unannounced");
595 }
596
597 pub fn assert_next_wait(&mut self) {
598 if let Some(res) = self.announced().now_or_never() {
599 panic!("next should block: got {:?}", res.map(|(path, _)| path));
600 }
601 }
602
603 }
612
613#[cfg(test)]
614mod tests {
615 use crate::Broadcast;
616
617 use super::*;
618
619 #[tokio::test]
620 async fn test_announce() {
621 let origin = Origin::produce();
622 let broadcast1 = Broadcast::produce();
623 let broadcast2 = Broadcast::produce();
624
625 let mut consumer1 = origin.consume();
626 consumer1.assert_next_wait();
628
629 origin.publish_broadcast("test1", broadcast1.consume());
631
632 consumer1.assert_next("test1", &broadcast1.consume());
633 consumer1.assert_next_wait();
634
635 let mut consumer2 = origin.consume();
638
639 origin.publish_broadcast("test2", broadcast2.consume());
641
642 consumer1.assert_next("test2", &broadcast2.consume());
643 consumer1.assert_next_wait();
644
645 consumer2.assert_next("test1", &broadcast1.consume());
646 consumer2.assert_next("test2", &broadcast2.consume());
647 consumer2.assert_next_wait();
648
649 drop(broadcast1);
651
652 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
654
655 consumer1.assert_next_none("test1");
657 consumer2.assert_next_none("test1");
658 consumer1.assert_next_wait();
659 consumer2.assert_next_wait();
660
661 let mut consumer3 = origin.consume();
663 consumer3.assert_next("test2", &broadcast2.consume());
664 consumer3.assert_next_wait();
665
666 drop(broadcast2);
668
669 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
671
672 consumer1.assert_next_none("test2");
673 consumer2.assert_next_none("test2");
674 consumer3.assert_next_none("test2");
675
676 }
682
683 #[tokio::test]
684 async fn test_duplicate() {
685 let origin = Origin::produce();
686
687 let broadcast1 = Broadcast::produce();
688 let broadcast2 = Broadcast::produce();
689 let broadcast3 = Broadcast::produce();
690
691 let consumer1 = broadcast1.consume();
692 let consumer2 = broadcast2.consume();
693 let consumer3 = broadcast3.consume();
694
695 origin.publish_broadcast("test", consumer1.clone());
696 origin.publish_broadcast("test", consumer2.clone());
697 origin.publish_broadcast("test", consumer3.clone());
698
699 let mut consumer = origin.consume();
700 assert!(consumer.consume_broadcast("test").is_some());
701
702 consumer.assert_next("test", &consumer1);
703 consumer.assert_next_none("test");
704 consumer.assert_next("test", &consumer2);
705 consumer.assert_next_none("test");
706 consumer.assert_next("test", &consumer3);
707
708 drop(broadcast2);
710
711 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
713
714 assert!(consumer.consume_broadcast("test").is_some());
715 consumer.assert_next_wait();
716
717 drop(broadcast3);
719
720 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
722
723 assert!(consumer.consume_broadcast("test").is_some());
724 consumer.assert_next_none("test");
725 consumer.assert_next("test", &consumer1);
726
727 drop(broadcast1);
729
730 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
732 assert!(consumer.consume_broadcast("test").is_none());
733
734 consumer.assert_next_none("test");
735 consumer.assert_next_wait();
736 }
737
738 #[tokio::test]
739 async fn test_duplicate_reverse() {
740 let origin = Origin::produce();
741 let broadcast1 = Broadcast::produce();
742 let broadcast2 = Broadcast::produce();
743
744 origin.publish_broadcast("test", broadcast1.consume());
745 origin.publish_broadcast("test", broadcast2.consume());
746 assert!(origin.consume_broadcast("test").is_some());
747
748 drop(broadcast2);
750
751 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
753 assert!(origin.consume_broadcast("test").is_some());
754
755 drop(broadcast1);
756
757 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
759 assert!(origin.consume_broadcast("test").is_none());
760 }
761
762 #[tokio::test]
763 async fn test_double_publish() {
764 let origin = Origin::produce();
765 let broadcast = Broadcast::produce();
766
767 origin.publish_broadcast("test", broadcast.consume());
769 origin.publish_broadcast("test", broadcast.consume());
770
771 assert!(origin.consume_broadcast("test").is_some());
772
773 drop(broadcast);
774
775 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
777 assert!(origin.consume_broadcast("test").is_none());
778 }
779 #[tokio::test]
781 #[should_panic]
782 async fn test_128() {
783 let origin = Origin::produce();
784 let broadcast = Broadcast::produce();
785
786 for i in 0..256 {
787 origin.publish_broadcast(format!("test{i}"), broadcast.consume());
788 }
789
790 let mut consumer = origin.consume();
791 for i in 0..256 {
792 consumer.assert_next(format!("test{i}"), &broadcast.consume());
793 }
794 }
795
796 #[tokio::test]
797 async fn test_128_fix() {
798 let origin = Origin::produce();
799 let broadcast = Broadcast::produce();
800
801 for i in 0..256 {
802 origin.publish_broadcast(format!("test{i}"), broadcast.consume());
803 }
804
805 let mut consumer = origin.consume();
806 for i in 0..256 {
807 consumer.assert_try_next(format!("test{i}"), &broadcast.consume());
809 }
810 }
811
812 #[tokio::test]
813 async fn test_with_root_basic() {
814 let origin = Origin::produce();
815 let broadcast = Broadcast::produce();
816
817 let foo_producer = origin.with_root("foo").expect("should create root");
819 assert_eq!(foo_producer.root().as_str(), "foo");
820
821 assert!(foo_producer.publish_broadcast("bar/baz", broadcast.consume()));
823
824 let mut consumer = origin.consume();
825 consumer.assert_next("foo/bar/baz", &broadcast.consume());
827
828 let mut foo_consumer = foo_producer.consume();
830 foo_consumer.assert_next("bar/baz", &broadcast.consume());
831 }
832
833 #[tokio::test]
834 async fn test_with_root_nested() {
835 let origin = Origin::produce();
836 let broadcast = Broadcast::produce();
837
838 let foo_producer = origin.with_root("foo").expect("should create foo root");
840 let foo_bar_producer = foo_producer.with_root("bar").expect("should create bar root");
841 assert_eq!(foo_bar_producer.root().as_str(), "foo/bar");
842
843 assert!(foo_bar_producer.publish_broadcast("baz", broadcast.consume()));
845
846 let mut consumer = origin.consume();
847 consumer.assert_next("foo/bar/baz", &broadcast.consume());
849
850 let mut foo_bar_consumer = foo_bar_producer.consume();
852 foo_bar_consumer.assert_next("baz", &broadcast.consume());
853 }
854
855 #[tokio::test]
856 async fn test_publish_only_allows() {
857 let origin = Origin::produce();
858 let broadcast = Broadcast::produce();
859
860 let limited_producer = origin
862 .publish_only(&["allowed/path1".into(), "allowed/path2".into()])
863 .expect("should create limited producer");
864
865 assert!(limited_producer.publish_broadcast("allowed/path1", broadcast.consume()));
867 assert!(limited_producer.publish_broadcast("allowed/path1/nested", broadcast.consume()));
868 assert!(limited_producer.publish_broadcast("allowed/path2", broadcast.consume()));
869
870 assert!(!limited_producer.publish_broadcast("notallowed", broadcast.consume()));
872 assert!(!limited_producer.publish_broadcast("allowed", broadcast.consume())); assert!(!limited_producer.publish_broadcast("other/path", broadcast.consume()));
874 }
875
876 #[tokio::test]
877 async fn test_publish_only_empty() {
878 let origin = Origin::produce();
879
880 assert!(origin.publish_only(&[]).is_none());
882 }
883
884 #[tokio::test]
885 async fn test_consume_only_filters() {
886 let origin = Origin::produce();
887 let broadcast1 = Broadcast::produce();
888 let broadcast2 = Broadcast::produce();
889 let broadcast3 = Broadcast::produce();
890
891 origin.publish_broadcast("allowed", broadcast1.consume());
893 origin.publish_broadcast("allowed/nested", broadcast2.consume());
894 origin.publish_broadcast("notallowed", broadcast3.consume());
895
896 let mut limited_consumer = origin
898 .consume_only(&["allowed".into()])
899 .expect("should create limited consumer");
900
901 limited_consumer.assert_next("allowed", &broadcast1.consume());
903 limited_consumer.assert_next("allowed/nested", &broadcast2.consume());
904 limited_consumer.assert_next_wait(); let mut consumer = origin.consume();
908 consumer.assert_next("allowed", &broadcast1.consume());
909 consumer.assert_next("allowed/nested", &broadcast2.consume());
910 consumer.assert_next("notallowed", &broadcast3.consume());
911 }
912
913 #[tokio::test]
914 async fn test_consume_only_multiple_prefixes() {
915 let origin = Origin::produce();
916 let broadcast1 = Broadcast::produce();
917 let broadcast2 = Broadcast::produce();
918 let broadcast3 = Broadcast::produce();
919
920 origin.publish_broadcast("foo/test", broadcast1.consume());
921 origin.publish_broadcast("bar/test", broadcast2.consume());
922 origin.publish_broadcast("baz/test", broadcast3.consume());
923
924 let mut limited_consumer = origin
926 .consume_only(&["foo".into(), "bar".into()])
927 .expect("should create limited consumer");
928
929 limited_consumer.assert_next("foo/test", &broadcast1.consume());
930 limited_consumer.assert_next("bar/test", &broadcast2.consume());
931 limited_consumer.assert_next_wait(); }
933
934 #[tokio::test]
935 async fn test_with_root_and_publish_only() {
936 let origin = Origin::produce();
937 let broadcast = Broadcast::produce();
938
939 let foo_producer = origin.with_root("foo").expect("should create foo root");
941
942 let limited_producer = foo_producer
944 .publish_only(&["bar".into(), "goop/pee".into()])
945 .expect("should create limited producer");
946
947 assert!(limited_producer.publish_broadcast("bar", broadcast.consume()));
949 assert!(limited_producer.publish_broadcast("bar/nested", broadcast.consume()));
950 assert!(limited_producer.publish_broadcast("goop/pee", broadcast.consume()));
951 assert!(limited_producer.publish_broadcast("goop/pee/nested", broadcast.consume()));
952
953 assert!(!limited_producer.publish_broadcast("baz", broadcast.consume()));
955 assert!(!limited_producer.publish_broadcast("goop", broadcast.consume())); assert!(!limited_producer.publish_broadcast("goop/other", broadcast.consume()));
957
958 let mut consumer = origin.consume();
960 consumer.assert_next("foo/bar", &broadcast.consume());
961 consumer.assert_next("foo/bar/nested", &broadcast.consume());
962 consumer.assert_next("foo/goop/pee", &broadcast.consume());
963 consumer.assert_next("foo/goop/pee/nested", &broadcast.consume());
964 }
965
966 #[tokio::test]
967 async fn test_with_root_and_consume_only() {
968 let origin = Origin::produce();
969 let broadcast1 = Broadcast::produce();
970 let broadcast2 = Broadcast::produce();
971 let broadcast3 = Broadcast::produce();
972
973 origin.publish_broadcast("foo/bar/test", broadcast1.consume());
975 origin.publish_broadcast("foo/goop/pee/test", broadcast2.consume());
976 origin.publish_broadcast("foo/other/test", broadcast3.consume());
977
978 let foo_producer = origin.with_root("foo").expect("should create foo root");
980
981 let mut limited_consumer = foo_producer
983 .consume_only(&["bar".into(), "goop/pee".into()])
984 .expect("should create limited consumer");
985
986 limited_consumer.assert_next("bar/test", &broadcast1.consume());
988 limited_consumer.assert_next("goop/pee/test", &broadcast2.consume());
989 limited_consumer.assert_next_wait(); }
991
992 #[tokio::test]
993 async fn test_with_root_unauthorized() {
994 let origin = Origin::produce();
995
996 let limited_producer = origin
998 .publish_only(&["allowed".into()])
999 .expect("should create limited producer");
1000
1001 assert!(limited_producer.with_root("notallowed").is_none());
1003
1004 let allowed_root = limited_producer
1006 .with_root("allowed")
1007 .expect("should create allowed root");
1008 assert_eq!(allowed_root.root().as_str(), "allowed");
1009 }
1010
1011 #[tokio::test]
1012 async fn test_wildcard_permission() {
1013 let origin = Origin::produce();
1014 let broadcast = Broadcast::produce();
1015
1016 let root_producer = origin.clone();
1018
1019 assert!(root_producer.publish_broadcast("any/path", broadcast.consume()));
1021 assert!(root_producer.publish_broadcast("other/path", broadcast.consume()));
1022
1023 let foo_producer = root_producer.with_root("foo").expect("should create any root");
1025 assert_eq!(foo_producer.root().as_str(), "foo");
1026 }
1027
1028 #[tokio::test]
1029 async fn test_consume_broadcast_with_permissions() {
1030 let origin = Origin::produce();
1031 let broadcast1 = Broadcast::produce();
1032 let broadcast2 = Broadcast::produce();
1033
1034 origin.publish_broadcast("allowed/test", broadcast1.consume());
1035 origin.publish_broadcast("notallowed/test", broadcast2.consume());
1036
1037 let limited_consumer = origin
1039 .consume_only(&["allowed".into()])
1040 .expect("should create limited consumer");
1041
1042 let result = limited_consumer.consume_broadcast("allowed/test");
1044 assert!(result.is_some());
1045 assert!(result.unwrap().is_clone(&broadcast1.consume()));
1046
1047 assert!(limited_consumer.consume_broadcast("notallowed/test").is_none());
1049
1050 let consumer = origin.consume();
1052 assert!(consumer.consume_broadcast("allowed/test").is_some());
1053 assert!(consumer.consume_broadcast("notallowed/test").is_some());
1054 }
1055
1056 #[tokio::test]
1057 async fn test_nested_paths_with_permissions() {
1058 let origin = Origin::produce();
1059 let broadcast = Broadcast::produce();
1060
1061 let limited_producer = origin
1063 .publish_only(&["a/b/c".into()])
1064 .expect("should create limited producer");
1065
1066 assert!(limited_producer.publish_broadcast("a/b/c", broadcast.consume()));
1068 assert!(limited_producer.publish_broadcast("a/b/c/d", broadcast.consume()));
1069 assert!(limited_producer.publish_broadcast("a/b/c/d/e", broadcast.consume()));
1070
1071 assert!(!limited_producer.publish_broadcast("a", broadcast.consume()));
1073 assert!(!limited_producer.publish_broadcast("a/b", broadcast.consume()));
1074 assert!(!limited_producer.publish_broadcast("a/b/other", broadcast.consume()));
1075 }
1076
1077 #[tokio::test]
1078 async fn test_multiple_consumers_with_different_permissions() {
1079 let origin = Origin::produce();
1080 let broadcast1 = Broadcast::produce();
1081 let broadcast2 = Broadcast::produce();
1082 let broadcast3 = Broadcast::produce();
1083
1084 origin.publish_broadcast("foo/test", broadcast1.consume());
1086 origin.publish_broadcast("bar/test", broadcast2.consume());
1087 origin.publish_broadcast("baz/test", broadcast3.consume());
1088
1089 let mut foo_consumer = origin
1091 .consume_only(&["foo".into()])
1092 .expect("should create foo consumer");
1093
1094 let mut bar_consumer = origin
1095 .consume_only(&["bar".into()])
1096 .expect("should create bar consumer");
1097
1098 let mut foobar_consumer = origin
1099 .consume_only(&["foo".into(), "bar".into()])
1100 .expect("should create foobar consumer");
1101
1102 foo_consumer.assert_next("foo/test", &broadcast1.consume());
1104 foo_consumer.assert_next_wait();
1105
1106 bar_consumer.assert_next("bar/test", &broadcast2.consume());
1107 bar_consumer.assert_next_wait();
1108
1109 foobar_consumer.assert_next("foo/test", &broadcast1.consume());
1110 foobar_consumer.assert_next("bar/test", &broadcast2.consume());
1111 foobar_consumer.assert_next_wait();
1112 }
1113
1114 #[tokio::test]
1115 async fn test_select_with_empty_prefix() {
1116 let origin = Origin::produce();
1117 let broadcast1 = Broadcast::produce();
1118 let broadcast2 = Broadcast::produce();
1119
1120 let demo_producer = origin.with_root("demo").expect("should create demo root");
1122 let limited_producer = demo_producer
1123 .publish_only(&["worm-node".into(), "foobar".into()])
1124 .expect("should create limited producer");
1125
1126 assert!(limited_producer.publish_broadcast("worm-node/test", broadcast1.consume()));
1128 assert!(limited_producer.publish_broadcast("foobar/test", broadcast2.consume()));
1129
1130 let mut consumer = limited_producer
1132 .consume_only(&["".into()])
1133 .expect("should create consumer with empty prefix");
1134
1135 consumer.assert_next("worm-node/test", &broadcast1.consume());
1137 consumer.assert_next("foobar/test", &broadcast2.consume());
1138 consumer.assert_next_wait();
1139 }
1140
1141 #[tokio::test]
1142 async fn test_select_narrowing_scope() {
1143 let origin = Origin::produce();
1144 let broadcast1 = Broadcast::produce();
1145 let broadcast2 = Broadcast::produce();
1146 let broadcast3 = Broadcast::produce();
1147
1148 let demo_producer = origin.with_root("demo").expect("should create demo root");
1150 let limited_producer = demo_producer
1151 .publish_only(&["worm-node".into(), "foobar".into()])
1152 .expect("should create limited producer");
1153
1154 assert!(limited_producer.publish_broadcast("worm-node", broadcast1.consume()));
1156 assert!(limited_producer.publish_broadcast("worm-node/foo", broadcast2.consume()));
1157 assert!(limited_producer.publish_broadcast("foobar/bar", broadcast3.consume()));
1158
1159 let mut worm_consumer = limited_producer
1161 .consume_only(&["worm-node".into()])
1162 .expect("should create worm-node consumer");
1163
1164 worm_consumer.assert_next("worm-node", &broadcast1.consume());
1166 worm_consumer.assert_next("worm-node/foo", &broadcast2.consume());
1167 worm_consumer.assert_next_wait(); let mut foo_consumer = limited_producer
1171 .consume_only(&["worm-node/foo".into()])
1172 .expect("should create worm-node/foo consumer");
1173
1174 foo_consumer.assert_next("worm-node/foo", &broadcast2.consume());
1175 foo_consumer.assert_next_wait(); }
1177
1178 #[tokio::test]
1179 async fn test_select_multiple_roots_with_empty_prefix() {
1180 let origin = Origin::produce();
1181 let broadcast1 = Broadcast::produce();
1182 let broadcast2 = Broadcast::produce();
1183 let broadcast3 = Broadcast::produce();
1184
1185 let limited_producer = origin
1187 .publish_only(&["app1".into(), "app2".into(), "shared".into()])
1188 .expect("should create limited producer");
1189
1190 assert!(limited_producer.publish_broadcast("app1/data", broadcast1.consume()));
1192 assert!(limited_producer.publish_broadcast("app2/config", broadcast2.consume()));
1193 assert!(limited_producer.publish_broadcast("shared/resource", broadcast3.consume()));
1194
1195 let mut consumer = limited_producer
1197 .consume_only(&["".into()])
1198 .expect("should create consumer with empty prefix");
1199
1200 consumer.assert_next("app1/data", &broadcast1.consume());
1202 consumer.assert_next("app2/config", &broadcast2.consume());
1203 consumer.assert_next("shared/resource", &broadcast3.consume());
1204 consumer.assert_next_wait();
1205 }
1206
1207 #[tokio::test]
1208 async fn test_publish_only_with_empty_prefix() {
1209 let origin = Origin::produce();
1210 let broadcast = Broadcast::produce();
1211
1212 let limited_producer = origin
1214 .publish_only(&["services/api".into(), "services/web".into()])
1215 .expect("should create limited producer");
1216
1217 let same_producer = limited_producer
1219 .publish_only(&["".into()])
1220 .expect("should create producer with empty prefix");
1221
1222 assert!(same_producer.publish_broadcast("services/api", broadcast.consume()));
1224 assert!(same_producer.publish_broadcast("services/web", broadcast.consume()));
1225 assert!(!same_producer.publish_broadcast("services/db", broadcast.consume()));
1226 assert!(!same_producer.publish_broadcast("other", broadcast.consume()));
1227 }
1228
1229 #[tokio::test]
1230 async fn test_select_narrowing_to_deeper_path() {
1231 let origin = Origin::produce();
1232 let broadcast1 = Broadcast::produce();
1233 let broadcast2 = Broadcast::produce();
1234 let broadcast3 = Broadcast::produce();
1235
1236 let limited_producer = origin
1238 .publish_only(&["org".into()])
1239 .expect("should create limited producer");
1240
1241 assert!(limited_producer.publish_broadcast("org/team1/project1", broadcast1.consume()));
1243 assert!(limited_producer.publish_broadcast("org/team1/project2", broadcast2.consume()));
1244 assert!(limited_producer.publish_broadcast("org/team2/project1", broadcast3.consume()));
1245
1246 let mut team1_consumer = limited_producer
1248 .consume_only(&["org/team2".into()])
1249 .expect("should create team1 consumer");
1250
1251 team1_consumer.assert_next("org/team2/project1", &broadcast3.consume());
1252 team1_consumer.assert_next_wait(); let mut project1_consumer = limited_producer
1256 .consume_only(&["org/team1/project1".into()])
1257 .expect("should create project1 consumer");
1258
1259 project1_consumer.assert_next("org/team1/project1", &broadcast1.consume());
1261 project1_consumer.assert_next_wait();
1262 }
1263
1264 #[tokio::test]
1265 async fn test_select_with_non_matching_prefix() {
1266 let origin = Origin::produce();
1267
1268 let limited_producer = origin
1270 .publish_only(&["allowed/path".into()])
1271 .expect("should create limited producer");
1272
1273 assert!(limited_producer.consume_only(&["different/path".into()]).is_none());
1275
1276 assert!(limited_producer.publish_only(&["other/path".into()]).is_none());
1278 }
1279
1280 #[tokio::test]
1281 async fn test_select_maintains_access_with_wider_prefix() {
1282 let origin = Origin::produce();
1283 let broadcast1 = Broadcast::produce();
1284 let broadcast2 = Broadcast::produce();
1285
1286 let demo_producer = origin.with_root("demo").expect("should create demo root");
1288 let user_producer = demo_producer
1289 .publish_only(&["worm-node".into(), "foobar".into()])
1290 .expect("should create user producer");
1291
1292 assert!(user_producer.publish_broadcast("worm-node/data", broadcast1.consume()));
1294 assert!(user_producer.publish_broadcast("foobar", broadcast2.consume()));
1295
1296 let mut consumer = user_producer
1298 .consume_only(&["".into()])
1299 .expect("consume_only with empty prefix should not fail when user has specific permissions");
1300
1301 consumer.assert_next("worm-node/data", &broadcast1.consume());
1303 consumer.assert_next("foobar", &broadcast2.consume());
1304 consumer.assert_next_wait();
1305
1306 let mut narrow_consumer = user_producer
1308 .consume_only(&["worm-node".into()])
1309 .expect("should be able to narrow scope to worm-node");
1310
1311 narrow_consumer.assert_next("worm-node/data", &broadcast1.consume());
1312 narrow_consumer.assert_next_wait(); }
1314}