1use std::{
2 collections::{HashMap, VecDeque},
3 sync::atomic::{AtomicU64, Ordering},
4};
5use tokio::sync::mpsc;
6use web_async::Lock;
7
8use super::BroadcastConsumer;
9use crate::{AsPath, Broadcast, BroadcastProducer, Path, PathOwned, PathPrefixes};
10
11static NEXT_CONSUMER_ID: AtomicU64 = AtomicU64::new(0);
12
13#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
14struct ConsumerId(u64);
15
16impl ConsumerId {
17 fn new() -> Self {
18 Self(NEXT_CONSUMER_ID.fetch_add(1, Ordering::Relaxed))
19 }
20}
21
22struct OriginBroadcast {
24 path: PathOwned,
25 active: BroadcastConsumer,
26 backup: VecDeque<BroadcastConsumer>,
27}
28
29#[derive(Clone)]
30struct OriginConsumerNotify {
31 root: PathOwned,
32 tx: mpsc::UnboundedSender<OriginAnnounce>,
33}
34
35impl OriginConsumerNotify {
36 fn announce(&self, path: impl AsPath, broadcast: BroadcastConsumer) {
37 let path = path.as_path().strip_prefix(&self.root).unwrap().to_owned();
38 self.tx.send((path, Some(broadcast))).expect("consumer closed");
39 }
40
41 fn reannounce(&self, path: impl AsPath, broadcast: BroadcastConsumer) {
42 let path = path.as_path().strip_prefix(&self.root).unwrap().to_owned();
43 self.tx.send((path.clone(), None)).expect("consumer closed");
44 self.tx.send((path, Some(broadcast))).expect("consumer closed");
45 }
46
47 fn unannounce(&self, path: impl AsPath) {
48 let path = path.as_path().strip_prefix(&self.root).unwrap().to_owned();
49 self.tx.send((path, None)).expect("consumer closed");
50 }
51}
52
53struct NotifyNode {
54 parent: Option<Lock<NotifyNode>>,
55
56 consumers: HashMap<ConsumerId, OriginConsumerNotify>,
59}
60
61impl NotifyNode {
62 fn new(parent: Option<Lock<NotifyNode>>) -> Self {
63 Self {
64 parent,
65 consumers: HashMap::new(),
66 }
67 }
68
69 fn announce(&mut self, path: impl AsPath, broadcast: &BroadcastConsumer) {
70 for consumer in self.consumers.values() {
71 consumer.announce(path.as_path(), broadcast.clone());
72 }
73
74 if let Some(parent) = &self.parent {
75 parent.lock().announce(path, broadcast);
76 }
77 }
78
79 fn reannounce(&mut self, path: impl AsPath, broadcast: &BroadcastConsumer) {
80 for consumer in self.consumers.values() {
81 consumer.reannounce(path.as_path(), broadcast.clone());
82 }
83
84 if let Some(parent) = &self.parent {
85 parent.lock().reannounce(path, broadcast);
86 }
87 }
88
89 fn unannounce(&mut self, path: impl AsPath) {
90 for consumer in self.consumers.values() {
91 consumer.unannounce(path.as_path());
92 }
93
94 if let Some(parent) = &self.parent {
95 parent.lock().unannounce(path);
96 }
97 }
98}
99
100struct OriginNode {
101 broadcast: Option<OriginBroadcast>,
103
104 nested: HashMap<String, Lock<OriginNode>>,
106
107 notify: Lock<NotifyNode>,
109}
110
111impl OriginNode {
112 fn new(parent: Option<Lock<NotifyNode>>) -> Self {
113 Self {
114 broadcast: None,
115 nested: HashMap::new(),
116 notify: Lock::new(NotifyNode::new(parent)),
117 }
118 }
119
120 fn leaf(&mut self, path: &Path) -> Lock<OriginNode> {
121 let (dir, rest) = path.next_part().expect("leaf called with empty path");
122
123 let next = self.entry(dir);
124 if rest.is_empty() { next } else { next.lock().leaf(&rest) }
125 }
126
127 fn entry(&mut self, dir: &str) -> Lock<OriginNode> {
128 match self.nested.get(dir) {
129 Some(next) => next.clone(),
130 None => {
131 let next = Lock::new(OriginNode::new(Some(self.notify.clone())));
132 self.nested.insert(dir.to_string(), next.clone());
133 next
134 }
135 }
136 }
137
138 fn publish(&mut self, full: impl AsPath, broadcast: &BroadcastConsumer, relative: impl AsPath) {
139 let full = full.as_path();
140 let rest = relative.as_path();
141
142 if let Some((dir, relative)) = rest.next_part() {
144 self.entry(dir).lock().publish(&full, broadcast, &relative);
146 } else if let Some(existing) = &mut self.broadcast {
147 existing.backup.push_back(broadcast.clone());
151 } else {
152 self.broadcast = Some(OriginBroadcast {
154 path: full.to_owned(),
155 active: broadcast.clone(),
156 backup: VecDeque::new(),
157 });
158 self.notify.lock().announce(full, broadcast);
159 }
160 }
161
162 fn consume(&mut self, id: ConsumerId, mut notify: OriginConsumerNotify) {
163 self.consume_initial(&mut notify);
164 self.notify.lock().consumers.insert(id, notify);
165 }
166
167 fn consume_initial(&mut self, notify: &mut OriginConsumerNotify) {
168 if let Some(broadcast) = &self.broadcast {
169 notify.announce(&broadcast.path, broadcast.active.clone());
170 }
171
172 for nested in self.nested.values() {
174 nested.lock().consume_initial(notify);
175 }
176 }
177
178 fn consume_broadcast(&self, rest: impl AsPath) -> Option<BroadcastConsumer> {
179 let rest = rest.as_path();
180
181 if let Some((dir, rest)) = rest.next_part() {
182 let node = self.nested.get(dir)?.lock();
183 node.consume_broadcast(&rest)
184 } else {
185 self.broadcast.as_ref().map(|b| b.active.clone())
186 }
187 }
188
189 fn unconsume(&mut self, id: ConsumerId) {
190 self.notify.lock().consumers.remove(&id).expect("consumer not found");
191 if self.is_empty() {
192 }
195 }
196
197 fn remove(&mut self, full: impl AsPath, broadcast: BroadcastConsumer, relative: impl AsPath) {
199 let full = full.as_path();
200 let relative = relative.as_path();
201
202 if let Some((dir, relative)) = relative.next_part() {
203 let nested = self.entry(dir);
204 let mut locked = nested.lock();
205 locked.remove(&full, broadcast, &relative);
206
207 if locked.is_empty() {
208 drop(locked);
209 self.nested.remove(dir);
210 }
211 } else {
212 let entry = match &mut self.broadcast {
213 Some(existing) => existing,
214 None => return,
215 };
216
217 let pos = entry.backup.iter().position(|b| b.is_clone(&broadcast));
219 if let Some(pos) = pos {
220 entry.backup.remove(pos);
221 return;
223 }
224
225 assert!(entry.active.is_clone(&broadcast));
227
228 if let Some(next) = entry.backup.pop_front() {
230 entry.active = next;
231 self.notify.lock().reannounce(full, &entry.active);
232 } else {
233 self.broadcast = None;
235 self.notify.lock().unannounce(full);
236 }
237 }
238 }
239
240 fn is_empty(&self) -> bool {
241 self.broadcast.is_none() && self.nested.is_empty() && self.notify.lock().consumers.is_empty()
242 }
243}
244
245#[derive(Clone)]
246struct OriginNodes {
247 nodes: Vec<(PathOwned, Lock<OriginNode>)>,
248}
249
250impl OriginNodes {
251 pub fn select(&self, prefixes: &PathPrefixes) -> Option<Self> {
254 let mut roots = Vec::new();
255
256 for (root, state) in &self.nodes {
257 for prefix in prefixes {
258 if root.has_prefix(prefix) {
259 roots.push((root.to_owned(), state.clone()));
261 continue;
262 }
263
264 if let Some(suffix) = prefix.strip_prefix(root) {
265 let nested = state.lock().leaf(&suffix);
267 roots.push((prefix.to_owned(), nested));
268 }
269 }
270 }
271
272 if roots.is_empty() {
273 None
274 } else {
275 Some(Self { nodes: roots })
276 }
277 }
278
279 pub fn root(&self, new_root: impl AsPath) -> Option<Self> {
280 let new_root = new_root.as_path();
281 let mut roots = Vec::new();
282
283 if new_root.is_empty() {
284 return Some(self.clone());
285 }
286
287 for (root, state) in &self.nodes {
288 if let Some(suffix) = root.strip_prefix(&new_root) {
289 roots.push((suffix.to_owned(), state.clone()));
291 } else if let Some(suffix) = new_root.strip_prefix(root) {
292 let nested = state.lock().leaf(&suffix);
295 roots.push(("".into(), nested));
296 }
297 }
298
299 if roots.is_empty() {
300 None
301 } else {
302 Some(Self { nodes: roots })
303 }
304 }
305
306 pub fn get(&self, path: impl AsPath) -> Option<(Lock<OriginNode>, PathOwned)> {
308 let path = path.as_path();
309
310 for (root, state) in &self.nodes {
311 if let Some(suffix) = path.strip_prefix(root) {
312 return Some((state.clone(), suffix.to_owned()));
313 }
314 }
315
316 None
317 }
318}
319
320impl Default for OriginNodes {
321 fn default() -> Self {
322 Self {
323 nodes: vec![("".into(), Lock::new(OriginNode::new(None)))],
324 }
325 }
326}
327
328pub type OriginAnnounce = (PathOwned, Option<BroadcastConsumer>);
330
331pub struct Origin {}
333
334impl Origin {
335 pub fn produce() -> OriginProducer {
336 OriginProducer::new()
337 }
338}
339
340#[derive(Clone, Default)]
342pub struct OriginProducer {
343 nodes: OriginNodes,
346
347 root: PathOwned,
349}
350
351impl OriginProducer {
352 pub fn new() -> Self {
353 Self::default()
354 }
355
356 pub fn create_broadcast(&self, path: impl AsPath) -> Option<BroadcastProducer> {
361 let broadcast = Broadcast::produce();
362 self.publish_broadcast(path, broadcast.consume()).then_some(broadcast)
363 }
364
365 pub fn publish_broadcast(&self, path: impl AsPath, broadcast: BroadcastConsumer) -> bool {
375 let path = path.as_path();
376
377 let (root, rest) = match self.nodes.get(&path) {
378 Some(root) => root,
379 None => return false,
380 };
381
382 let full = self.root.join(&path);
383
384 root.lock().publish(&full, &broadcast, &rest);
385 let root = root.clone();
386
387 web_async::spawn(async move {
388 broadcast.closed().await;
389 root.lock().remove(&full, broadcast, &rest);
390 });
391
392 true
393 }
394
395 pub fn publish_only(&self, prefixes: &[Path]) -> Option<OriginProducer> {
400 let prefixes = PathPrefixes::new(prefixes);
401 Some(OriginProducer {
402 nodes: self.nodes.select(&prefixes)?,
403 root: self.root.clone(),
404 })
405 }
406
407 pub fn consume(&self) -> OriginConsumer {
409 OriginConsumer::new(self.root.clone(), self.nodes.clone())
410 }
411
412 pub fn consume_only(&self, prefixes: &[Path]) -> Option<OriginConsumer> {
417 let prefixes = PathPrefixes::new(prefixes);
418 Some(OriginConsumer::new(self.root.clone(), self.nodes.select(&prefixes)?))
419 }
420
421 pub fn consume_broadcast(&self, path: impl AsPath) -> Option<BroadcastConsumer> {
425 let path = path.as_path();
426 let (root, rest) = self.nodes.get(&path)?;
427 let state = root.lock();
428 state.consume_broadcast(&rest)
429 }
430
431 pub fn with_root(&self, prefix: impl AsPath) -> Option<Self> {
435 let prefix = prefix.as_path();
436
437 Some(Self {
438 root: self.root.join(&prefix).to_owned(),
439 nodes: self.nodes.root(&prefix)?,
440 })
441 }
442
443 pub fn root(&self) -> &Path<'_> {
445 &self.root
446 }
447
448 pub fn allowed(&self) -> impl Iterator<Item = &Path<'_>> {
450 self.nodes.nodes.iter().map(|(root, _)| root)
451 }
452
453 pub fn absolute(&self, path: impl AsPath) -> Path<'_> {
455 self.root.join(path)
456 }
457}
458
459pub struct OriginConsumer {
463 id: ConsumerId,
464 nodes: OriginNodes,
465 updates: mpsc::UnboundedReceiver<OriginAnnounce>,
466
467 root: PathOwned,
469}
470
471impl OriginConsumer {
472 fn new(root: PathOwned, nodes: OriginNodes) -> Self {
473 let (tx, rx) = mpsc::unbounded_channel();
474
475 let id = ConsumerId::new();
476
477 for (_, state) in &nodes.nodes {
478 let notify = OriginConsumerNotify {
479 root: root.clone(),
480 tx: tx.clone(),
481 };
482 state.lock().consume(id, notify);
483 }
484
485 Self {
486 id,
487 nodes,
488 updates: rx,
489 root,
490 }
491 }
492
493 pub async fn announced(&mut self) -> Option<OriginAnnounce> {
501 self.updates.recv().await
502 }
503
504 pub fn try_announced(&mut self) -> Option<OriginAnnounce> {
509 self.updates.try_recv().ok()
510 }
511
512 pub fn consume(&self) -> Self {
513 self.clone()
514 }
515
516 pub fn consume_broadcast(&self, path: impl AsPath) -> Option<BroadcastConsumer> {
522 let path = path.as_path();
523 let (root, rest) = self.nodes.get(&path)?;
524 let state = root.lock();
525 state.consume_broadcast(&rest)
526 }
527
528 pub fn consume_only(&self, prefixes: &[Path]) -> Option<OriginConsumer> {
533 let prefixes = PathPrefixes::new(prefixes);
534 Some(OriginConsumer::new(self.root.clone(), self.nodes.select(&prefixes)?))
535 }
536
537 pub fn with_root(&self, prefix: impl AsPath) -> Option<Self> {
541 let prefix = prefix.as_path();
542
543 Some(Self::new(self.root.join(&prefix).to_owned(), self.nodes.root(&prefix)?))
544 }
545
546 pub fn root(&self) -> &Path<'_> {
548 &self.root
549 }
550
551 pub fn allowed(&self) -> impl Iterator<Item = &Path<'_>> {
553 self.nodes.nodes.iter().map(|(root, _)| root)
554 }
555
556 pub fn absolute(&self, path: impl AsPath) -> Path<'_> {
558 self.root.join(path)
559 }
560}
561
562impl Drop for OriginConsumer {
563 fn drop(&mut self) {
564 for (_, root) in &self.nodes.nodes {
565 root.lock().unconsume(self.id);
566 }
567 }
568}
569
570impl Clone for OriginConsumer {
571 fn clone(&self) -> Self {
572 OriginConsumer::new(self.root.clone(), self.nodes.clone())
573 }
574}
575
576#[cfg(test)]
577use futures::FutureExt;
578
579#[cfg(test)]
580impl OriginConsumer {
581 pub fn assert_next(&mut self, expected: impl AsPath, broadcast: &BroadcastConsumer) {
582 let expected = expected.as_path();
583 let (path, active) = self.announced().now_or_never().expect("next blocked").expect("no next");
584 assert_eq!(path, expected, "wrong path");
585 assert!(active.unwrap().is_clone(broadcast), "should be the same broadcast");
586 }
587
588 pub fn assert_try_next(&mut self, expected: impl AsPath, broadcast: &BroadcastConsumer) {
589 let expected = expected.as_path();
590 let (path, active) = self.try_announced().expect("no next");
591 assert_eq!(path, expected, "wrong path");
592 assert!(active.unwrap().is_clone(broadcast), "should be the same broadcast");
593 }
594
595 pub fn assert_next_none(&mut self, expected: impl AsPath) {
596 let expected = expected.as_path();
597 let (path, active) = self.announced().now_or_never().expect("next blocked").expect("no next");
598 assert_eq!(path, expected, "wrong path");
599 assert!(active.is_none(), "should be unannounced");
600 }
601
602 pub fn assert_next_wait(&mut self) {
603 if let Some(res) = self.announced().now_or_never() {
604 panic!("next should block: got {:?}", res.map(|(path, _)| path));
605 }
606 }
607
608 }
617
618#[cfg(test)]
619mod tests {
620 use crate::Broadcast;
621
622 use super::*;
623
624 #[tokio::test]
625 async fn test_announce() {
626 tokio::time::pause();
627
628 let origin = Origin::produce();
629 let broadcast1 = Broadcast::produce();
630 let broadcast2 = Broadcast::produce();
631
632 let mut consumer1 = origin.consume();
633 consumer1.assert_next_wait();
635
636 origin.publish_broadcast("test1", broadcast1.consume());
638
639 consumer1.assert_next("test1", &broadcast1.consume());
640 consumer1.assert_next_wait();
641
642 let mut consumer2 = origin.consume();
645
646 origin.publish_broadcast("test2", broadcast2.consume());
648
649 consumer1.assert_next("test2", &broadcast2.consume());
650 consumer1.assert_next_wait();
651
652 consumer2.assert_next("test1", &broadcast1.consume());
653 consumer2.assert_next("test2", &broadcast2.consume());
654 consumer2.assert_next_wait();
655
656 drop(broadcast1);
658
659 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
661
662 consumer1.assert_next_none("test1");
664 consumer2.assert_next_none("test1");
665 consumer1.assert_next_wait();
666 consumer2.assert_next_wait();
667
668 let mut consumer3 = origin.consume();
670 consumer3.assert_next("test2", &broadcast2.consume());
671 consumer3.assert_next_wait();
672
673 drop(broadcast2);
675
676 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
678
679 consumer1.assert_next_none("test2");
680 consumer2.assert_next_none("test2");
681 consumer3.assert_next_none("test2");
682
683 }
689
690 #[tokio::test]
691 async fn test_duplicate() {
692 tokio::time::pause();
693
694 let origin = Origin::produce();
695
696 let broadcast1 = Broadcast::produce();
697 let broadcast2 = Broadcast::produce();
698 let broadcast3 = Broadcast::produce();
699
700 let consumer1 = broadcast1.consume();
701 let consumer2 = broadcast2.consume();
702 let consumer3 = broadcast3.consume();
703
704 let mut consumer = origin.consume();
705
706 origin.publish_broadcast("test", consumer1.clone());
707 origin.publish_broadcast("test", consumer2.clone());
708 origin.publish_broadcast("test", consumer3.clone());
709 assert!(consumer.consume_broadcast("test").is_some());
710
711 consumer.assert_next("test", &consumer1);
713 consumer.assert_next_wait();
714
715 drop(broadcast2);
717
718 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
720
721 assert!(consumer.consume_broadcast("test").is_some());
722 consumer.assert_next_wait();
723
724 drop(broadcast1);
726
727 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
729
730 assert!(consumer.consume_broadcast("test").is_some());
731 consumer.assert_next_none("test");
732 consumer.assert_next("test", &consumer3);
733
734 drop(broadcast3);
736
737 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
739 assert!(consumer.consume_broadcast("test").is_none());
740
741 consumer.assert_next_none("test");
742 consumer.assert_next_wait();
743 }
744
745 #[tokio::test]
746 async fn test_duplicate_fifo_order() {
747 tokio::time::pause();
748
749 let origin = Origin::produce();
750
751 let broadcast1 = Broadcast::produce();
752 let broadcast2 = Broadcast::produce();
753 let broadcast3 = Broadcast::produce();
754
755 let consumer1 = broadcast1.consume();
756 let consumer2 = broadcast2.consume();
757 let consumer3 = broadcast3.consume();
758
759 let mut consumer = origin.consume();
760
761 origin.publish_broadcast("test", consumer1.clone());
762 origin.publish_broadcast("test", consumer2.clone());
763 origin.publish_broadcast("test", consumer3.clone());
764
765 consumer.assert_next("test", &consumer1);
767 consumer.assert_next_wait();
768
769 drop(broadcast1);
771 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
772 consumer.assert_next_none("test");
773 consumer.assert_next("test", &consumer2);
774 consumer.assert_next_wait();
775
776 drop(broadcast2);
778 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
779 consumer.assert_next_none("test");
780 consumer.assert_next("test", &consumer3);
781 consumer.assert_next_wait();
782
783 drop(broadcast3);
785 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
786 consumer.assert_next_none("test");
787 consumer.assert_next_wait();
788 }
789
790 #[tokio::test]
791 async fn test_duplicate_reverse() {
792 tokio::time::pause();
793
794 let origin = Origin::produce();
795 let broadcast1 = Broadcast::produce();
796 let broadcast2 = Broadcast::produce();
797
798 origin.publish_broadcast("test", broadcast1.consume());
799 origin.publish_broadcast("test", broadcast2.consume());
800 assert!(origin.consume_broadcast("test").is_some());
801
802 drop(broadcast2);
804
805 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
807 assert!(origin.consume_broadcast("test").is_some());
808
809 drop(broadcast1);
810
811 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
813 assert!(origin.consume_broadcast("test").is_none());
814 }
815
816 #[tokio::test]
817 async fn test_double_publish() {
818 tokio::time::pause();
819
820 let origin = Origin::produce();
821 let broadcast = Broadcast::produce();
822
823 origin.publish_broadcast("test", broadcast.consume());
825 origin.publish_broadcast("test", broadcast.consume());
826
827 assert!(origin.consume_broadcast("test").is_some());
828
829 drop(broadcast);
830
831 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
833 assert!(origin.consume_broadcast("test").is_none());
834 }
835 #[tokio::test]
837 #[should_panic]
838 async fn test_128() {
839 let origin = Origin::produce();
840 let broadcast = Broadcast::produce();
841
842 let mut consumer = origin.consume();
843 for i in 0..256 {
844 origin.publish_broadcast(format!("test{i}"), broadcast.consume());
845 }
846
847 for i in 0..256 {
848 consumer.assert_next(format!("test{i}"), &broadcast.consume());
849 }
850 }
851
852 #[tokio::test]
853 async fn test_128_fix() {
854 let origin = Origin::produce();
855 let broadcast = Broadcast::produce();
856
857 let mut consumer = origin.consume();
858 for i in 0..256 {
859 origin.publish_broadcast(format!("test{i}"), broadcast.consume());
860 }
861
862 for i in 0..256 {
863 consumer.assert_try_next(format!("test{i}"), &broadcast.consume());
865 }
866 }
867
868 #[tokio::test]
869 async fn test_with_root_basic() {
870 let origin = Origin::produce();
871 let broadcast = Broadcast::produce();
872
873 let foo_producer = origin.with_root("foo").expect("should create root");
875 assert_eq!(foo_producer.root().as_str(), "foo");
876
877 let mut consumer = origin.consume();
878
879 assert!(foo_producer.publish_broadcast("bar/baz", broadcast.consume()));
881 consumer.assert_next("foo/bar/baz", &broadcast.consume());
883
884 let mut foo_consumer = foo_producer.consume();
886 foo_consumer.assert_next("bar/baz", &broadcast.consume());
887 }
888
889 #[tokio::test]
890 async fn test_with_root_nested() {
891 let origin = Origin::produce();
892 let broadcast = Broadcast::produce();
893
894 let foo_producer = origin.with_root("foo").expect("should create foo root");
896 let foo_bar_producer = foo_producer.with_root("bar").expect("should create bar root");
897 assert_eq!(foo_bar_producer.root().as_str(), "foo/bar");
898
899 let mut consumer = origin.consume();
900
901 assert!(foo_bar_producer.publish_broadcast("baz", broadcast.consume()));
903 consumer.assert_next("foo/bar/baz", &broadcast.consume());
905
906 let mut foo_bar_consumer = foo_bar_producer.consume();
908 foo_bar_consumer.assert_next("baz", &broadcast.consume());
909 }
910
911 #[tokio::test]
912 async fn test_publish_only_allows() {
913 let origin = Origin::produce();
914 let broadcast = Broadcast::produce();
915
916 let limited_producer = origin
918 .publish_only(&["allowed/path1".into(), "allowed/path2".into()])
919 .expect("should create limited producer");
920
921 assert!(limited_producer.publish_broadcast("allowed/path1", broadcast.consume()));
923 assert!(limited_producer.publish_broadcast("allowed/path1/nested", broadcast.consume()));
924 assert!(limited_producer.publish_broadcast("allowed/path2", broadcast.consume()));
925
926 assert!(!limited_producer.publish_broadcast("notallowed", broadcast.consume()));
928 assert!(!limited_producer.publish_broadcast("allowed", broadcast.consume())); assert!(!limited_producer.publish_broadcast("other/path", broadcast.consume()));
930 }
931
932 #[tokio::test]
933 async fn test_publish_only_empty() {
934 let origin = Origin::produce();
935
936 assert!(origin.publish_only(&[]).is_none());
938 }
939
940 #[tokio::test]
941 async fn test_consume_only_filters() {
942 let origin = Origin::produce();
943 let broadcast1 = Broadcast::produce();
944 let broadcast2 = Broadcast::produce();
945 let broadcast3 = Broadcast::produce();
946
947 let mut consumer = origin.consume();
948
949 origin.publish_broadcast("allowed", broadcast1.consume());
951 origin.publish_broadcast("allowed/nested", broadcast2.consume());
952 origin.publish_broadcast("notallowed", broadcast3.consume());
953
954 let mut limited_consumer = origin
956 .consume_only(&["allowed".into()])
957 .expect("should create limited consumer");
958
959 limited_consumer.assert_next("allowed", &broadcast1.consume());
961 limited_consumer.assert_next("allowed/nested", &broadcast2.consume());
962 limited_consumer.assert_next_wait(); consumer.assert_next("allowed", &broadcast1.consume());
966 consumer.assert_next("allowed/nested", &broadcast2.consume());
967 consumer.assert_next("notallowed", &broadcast3.consume());
968 }
969
970 #[tokio::test]
971 async fn test_consume_only_multiple_prefixes() {
972 let origin = Origin::produce();
973 let broadcast1 = Broadcast::produce();
974 let broadcast2 = Broadcast::produce();
975 let broadcast3 = Broadcast::produce();
976
977 origin.publish_broadcast("foo/test", broadcast1.consume());
978 origin.publish_broadcast("bar/test", broadcast2.consume());
979 origin.publish_broadcast("baz/test", broadcast3.consume());
980
981 let mut limited_consumer = origin
983 .consume_only(&["foo".into(), "bar".into()])
984 .expect("should create limited consumer");
985
986 limited_consumer.assert_next("bar/test", &broadcast2.consume());
988 limited_consumer.assert_next("foo/test", &broadcast1.consume());
989 limited_consumer.assert_next_wait(); }
991
992 #[tokio::test]
993 async fn test_with_root_and_publish_only() {
994 let origin = Origin::produce();
995 let broadcast = Broadcast::produce();
996
997 let foo_producer = origin.with_root("foo").expect("should create foo root");
999
1000 let limited_producer = foo_producer
1002 .publish_only(&["bar".into(), "goop/pee".into()])
1003 .expect("should create limited producer");
1004
1005 let mut consumer = origin.consume();
1006
1007 assert!(limited_producer.publish_broadcast("bar", broadcast.consume()));
1009 assert!(limited_producer.publish_broadcast("bar/nested", broadcast.consume()));
1010 assert!(limited_producer.publish_broadcast("goop/pee", broadcast.consume()));
1011 assert!(limited_producer.publish_broadcast("goop/pee/nested", broadcast.consume()));
1012
1013 assert!(!limited_producer.publish_broadcast("baz", broadcast.consume()));
1015 assert!(!limited_producer.publish_broadcast("goop", broadcast.consume())); assert!(!limited_producer.publish_broadcast("goop/other", broadcast.consume()));
1017
1018 consumer.assert_next("foo/bar", &broadcast.consume());
1020 consumer.assert_next("foo/bar/nested", &broadcast.consume());
1021 consumer.assert_next("foo/goop/pee", &broadcast.consume());
1022 consumer.assert_next("foo/goop/pee/nested", &broadcast.consume());
1023 }
1024
1025 #[tokio::test]
1026 async fn test_with_root_and_consume_only() {
1027 let origin = Origin::produce();
1028 let broadcast1 = Broadcast::produce();
1029 let broadcast2 = Broadcast::produce();
1030 let broadcast3 = Broadcast::produce();
1031
1032 origin.publish_broadcast("foo/bar/test", broadcast1.consume());
1034 origin.publish_broadcast("foo/goop/pee/test", broadcast2.consume());
1035 origin.publish_broadcast("foo/other/test", broadcast3.consume());
1036
1037 let foo_producer = origin.with_root("foo").expect("should create foo root");
1039
1040 let mut limited_consumer = foo_producer
1042 .consume_only(&["bar".into(), "goop/pee".into()])
1043 .expect("should create limited consumer");
1044
1045 limited_consumer.assert_next("bar/test", &broadcast1.consume());
1047 limited_consumer.assert_next("goop/pee/test", &broadcast2.consume());
1048 limited_consumer.assert_next_wait(); }
1050
1051 #[tokio::test]
1052 async fn test_with_root_unauthorized() {
1053 let origin = Origin::produce();
1054
1055 let limited_producer = origin
1057 .publish_only(&["allowed".into()])
1058 .expect("should create limited producer");
1059
1060 assert!(limited_producer.with_root("notallowed").is_none());
1062
1063 let allowed_root = limited_producer
1065 .with_root("allowed")
1066 .expect("should create allowed root");
1067 assert_eq!(allowed_root.root().as_str(), "allowed");
1068 }
1069
1070 #[tokio::test]
1071 async fn test_wildcard_permission() {
1072 let origin = Origin::produce();
1073 let broadcast = Broadcast::produce();
1074
1075 let root_producer = origin.clone();
1077
1078 assert!(root_producer.publish_broadcast("any/path", broadcast.consume()));
1080 assert!(root_producer.publish_broadcast("other/path", broadcast.consume()));
1081
1082 let foo_producer = root_producer.with_root("foo").expect("should create any root");
1084 assert_eq!(foo_producer.root().as_str(), "foo");
1085 }
1086
1087 #[tokio::test]
1088 async fn test_consume_broadcast_with_permissions() {
1089 let origin = Origin::produce();
1090 let broadcast1 = Broadcast::produce();
1091 let broadcast2 = Broadcast::produce();
1092
1093 origin.publish_broadcast("allowed/test", broadcast1.consume());
1094 origin.publish_broadcast("notallowed/test", broadcast2.consume());
1095
1096 let limited_consumer = origin
1098 .consume_only(&["allowed".into()])
1099 .expect("should create limited consumer");
1100
1101 let result = limited_consumer.consume_broadcast("allowed/test");
1103 assert!(result.is_some());
1104 assert!(result.unwrap().is_clone(&broadcast1.consume()));
1105
1106 assert!(limited_consumer.consume_broadcast("notallowed/test").is_none());
1108
1109 let consumer = origin.consume();
1111 assert!(consumer.consume_broadcast("allowed/test").is_some());
1112 assert!(consumer.consume_broadcast("notallowed/test").is_some());
1113 }
1114
1115 #[tokio::test]
1116 async fn test_nested_paths_with_permissions() {
1117 let origin = Origin::produce();
1118 let broadcast = Broadcast::produce();
1119
1120 let limited_producer = origin
1122 .publish_only(&["a/b/c".into()])
1123 .expect("should create limited producer");
1124
1125 assert!(limited_producer.publish_broadcast("a/b/c", broadcast.consume()));
1127 assert!(limited_producer.publish_broadcast("a/b/c/d", broadcast.consume()));
1128 assert!(limited_producer.publish_broadcast("a/b/c/d/e", broadcast.consume()));
1129
1130 assert!(!limited_producer.publish_broadcast("a", broadcast.consume()));
1132 assert!(!limited_producer.publish_broadcast("a/b", broadcast.consume()));
1133 assert!(!limited_producer.publish_broadcast("a/b/other", broadcast.consume()));
1134 }
1135
1136 #[tokio::test]
1137 async fn test_multiple_consumers_with_different_permissions() {
1138 let origin = Origin::produce();
1139 let broadcast1 = Broadcast::produce();
1140 let broadcast2 = Broadcast::produce();
1141 let broadcast3 = Broadcast::produce();
1142
1143 origin.publish_broadcast("foo/test", broadcast1.consume());
1145 origin.publish_broadcast("bar/test", broadcast2.consume());
1146 origin.publish_broadcast("baz/test", broadcast3.consume());
1147
1148 let mut foo_consumer = origin
1150 .consume_only(&["foo".into()])
1151 .expect("should create foo consumer");
1152
1153 let mut bar_consumer = origin
1154 .consume_only(&["bar".into()])
1155 .expect("should create bar consumer");
1156
1157 let mut foobar_consumer = origin
1158 .consume_only(&["foo".into(), "bar".into()])
1159 .expect("should create foobar consumer");
1160
1161 foo_consumer.assert_next("foo/test", &broadcast1.consume());
1163 foo_consumer.assert_next_wait();
1164
1165 bar_consumer.assert_next("bar/test", &broadcast2.consume());
1166 bar_consumer.assert_next_wait();
1167
1168 foobar_consumer.assert_next("bar/test", &broadcast2.consume());
1169 foobar_consumer.assert_next("foo/test", &broadcast1.consume());
1170 foobar_consumer.assert_next_wait();
1171 }
1172
1173 #[tokio::test]
1174 async fn test_select_with_empty_prefix() {
1175 let origin = Origin::produce();
1176 let broadcast1 = Broadcast::produce();
1177 let broadcast2 = Broadcast::produce();
1178
1179 let demo_producer = origin.with_root("demo").expect("should create demo root");
1181 let limited_producer = demo_producer
1182 .publish_only(&["worm-node".into(), "foobar".into()])
1183 .expect("should create limited producer");
1184
1185 assert!(limited_producer.publish_broadcast("worm-node/test", broadcast1.consume()));
1187 assert!(limited_producer.publish_broadcast("foobar/test", broadcast2.consume()));
1188
1189 let mut consumer = limited_producer
1191 .consume_only(&["".into()])
1192 .expect("should create consumer with empty prefix");
1193
1194 let a1 = consumer.try_announced().expect("expected first announcement");
1196 let a2 = consumer.try_announced().expect("expected second announcement");
1197 consumer.assert_next_wait();
1198
1199 let mut paths: Vec<_> = [&a1, &a2].iter().map(|(p, _)| p.to_string()).collect();
1200 paths.sort();
1201 assert_eq!(paths, ["foobar/test", "worm-node/test"]);
1202 }
1203
1204 #[tokio::test]
1205 async fn test_select_narrowing_scope() {
1206 let origin = Origin::produce();
1207 let broadcast1 = Broadcast::produce();
1208 let broadcast2 = Broadcast::produce();
1209 let broadcast3 = Broadcast::produce();
1210
1211 let demo_producer = origin.with_root("demo").expect("should create demo root");
1213 let limited_producer = demo_producer
1214 .publish_only(&["worm-node".into(), "foobar".into()])
1215 .expect("should create limited producer");
1216
1217 assert!(limited_producer.publish_broadcast("worm-node", broadcast1.consume()));
1219 assert!(limited_producer.publish_broadcast("worm-node/foo", broadcast2.consume()));
1220 assert!(limited_producer.publish_broadcast("foobar/bar", broadcast3.consume()));
1221
1222 let mut worm_consumer = limited_producer
1224 .consume_only(&["worm-node".into()])
1225 .expect("should create worm-node consumer");
1226
1227 worm_consumer.assert_next("worm-node", &broadcast1.consume());
1229 worm_consumer.assert_next("worm-node/foo", &broadcast2.consume());
1230 worm_consumer.assert_next_wait(); let mut foo_consumer = limited_producer
1234 .consume_only(&["worm-node/foo".into()])
1235 .expect("should create worm-node/foo consumer");
1236
1237 foo_consumer.assert_next("worm-node/foo", &broadcast2.consume());
1238 foo_consumer.assert_next_wait(); }
1240
1241 #[tokio::test]
1242 async fn test_select_multiple_roots_with_empty_prefix() {
1243 let origin = Origin::produce();
1244 let broadcast1 = Broadcast::produce();
1245 let broadcast2 = Broadcast::produce();
1246 let broadcast3 = Broadcast::produce();
1247
1248 let limited_producer = origin
1250 .publish_only(&["app1".into(), "app2".into(), "shared".into()])
1251 .expect("should create limited producer");
1252
1253 assert!(limited_producer.publish_broadcast("app1/data", broadcast1.consume()));
1255 assert!(limited_producer.publish_broadcast("app2/config", broadcast2.consume()));
1256 assert!(limited_producer.publish_broadcast("shared/resource", broadcast3.consume()));
1257
1258 let mut consumer = limited_producer
1260 .consume_only(&["".into()])
1261 .expect("should create consumer with empty prefix");
1262
1263 consumer.assert_next("app1/data", &broadcast1.consume());
1265 consumer.assert_next("app2/config", &broadcast2.consume());
1266 consumer.assert_next("shared/resource", &broadcast3.consume());
1267 consumer.assert_next_wait();
1268 }
1269
1270 #[tokio::test]
1271 async fn test_publish_only_with_empty_prefix() {
1272 let origin = Origin::produce();
1273 let broadcast = Broadcast::produce();
1274
1275 let limited_producer = origin
1277 .publish_only(&["services/api".into(), "services/web".into()])
1278 .expect("should create limited producer");
1279
1280 let same_producer = limited_producer
1282 .publish_only(&["".into()])
1283 .expect("should create producer with empty prefix");
1284
1285 assert!(same_producer.publish_broadcast("services/api", broadcast.consume()));
1287 assert!(same_producer.publish_broadcast("services/web", broadcast.consume()));
1288 assert!(!same_producer.publish_broadcast("services/db", broadcast.consume()));
1289 assert!(!same_producer.publish_broadcast("other", broadcast.consume()));
1290 }
1291
1292 #[tokio::test]
1293 async fn test_select_narrowing_to_deeper_path() {
1294 let origin = Origin::produce();
1295 let broadcast1 = Broadcast::produce();
1296 let broadcast2 = Broadcast::produce();
1297 let broadcast3 = Broadcast::produce();
1298
1299 let limited_producer = origin
1301 .publish_only(&["org".into()])
1302 .expect("should create limited producer");
1303
1304 assert!(limited_producer.publish_broadcast("org/team1/project1", broadcast1.consume()));
1306 assert!(limited_producer.publish_broadcast("org/team1/project2", broadcast2.consume()));
1307 assert!(limited_producer.publish_broadcast("org/team2/project1", broadcast3.consume()));
1308
1309 let mut team2_consumer = limited_producer
1311 .consume_only(&["org/team2".into()])
1312 .expect("should create team2 consumer");
1313
1314 team2_consumer.assert_next("org/team2/project1", &broadcast3.consume());
1315 team2_consumer.assert_next_wait(); let mut project1_consumer = limited_producer
1319 .consume_only(&["org/team1/project1".into()])
1320 .expect("should create project1 consumer");
1321
1322 project1_consumer.assert_next("org/team1/project1", &broadcast1.consume());
1324 project1_consumer.assert_next_wait();
1325 }
1326
1327 #[tokio::test]
1328 async fn test_select_with_non_matching_prefix() {
1329 let origin = Origin::produce();
1330
1331 let limited_producer = origin
1333 .publish_only(&["allowed/path".into()])
1334 .expect("should create limited producer");
1335
1336 assert!(limited_producer.consume_only(&["different/path".into()]).is_none());
1338
1339 assert!(limited_producer.publish_only(&["other/path".into()]).is_none());
1341 }
1342
1343 #[tokio::test]
1346 async fn test_with_root_trailing_slash_consumer() {
1347 let origin = Origin::produce();
1348
1349 let prefix = "some_prefix/".to_string();
1351 let mut consumer = origin.consume().with_root(prefix).unwrap();
1352
1353 let b = origin.create_broadcast("some_prefix/test").unwrap();
1354 consumer.assert_next("test", &b.consume());
1355 }
1356
1357 #[tokio::test]
1359 async fn test_with_root_trailing_slash_producer() {
1360 let origin = Origin::produce();
1361
1362 let prefix = "some_prefix/".to_string();
1364 let rooted = origin.with_root(prefix).unwrap();
1365
1366 let b = rooted.create_broadcast("test").unwrap();
1367
1368 let mut consumer = rooted.consume();
1369 consumer.assert_next("test", &b.consume());
1370 }
1371
1372 #[tokio::test]
1374 async fn test_with_root_trailing_slash_unannounce() {
1375 tokio::time::pause();
1376
1377 let origin = Origin::produce();
1378
1379 let prefix = "some_prefix/".to_string();
1380 let mut consumer = origin.consume().with_root(prefix).unwrap();
1381
1382 let b = origin.create_broadcast("some_prefix/test").unwrap();
1383 consumer.assert_next("test", &b.consume());
1384
1385 drop(b);
1387 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1388
1389 consumer.assert_next_none("test");
1391 }
1392
1393 #[tokio::test]
1394 async fn test_select_maintains_access_with_wider_prefix() {
1395 let origin = Origin::produce();
1396 let broadcast1 = Broadcast::produce();
1397 let broadcast2 = Broadcast::produce();
1398
1399 let demo_producer = origin.with_root("demo").expect("should create demo root");
1401 let user_producer = demo_producer
1402 .publish_only(&["worm-node".into(), "foobar".into()])
1403 .expect("should create user producer");
1404
1405 assert!(user_producer.publish_broadcast("worm-node/data", broadcast1.consume()));
1407 assert!(user_producer.publish_broadcast("foobar", broadcast2.consume()));
1408
1409 let mut consumer = user_producer
1411 .consume_only(&["".into()])
1412 .expect("consume_only with empty prefix should not fail when user has specific permissions");
1413
1414 let a1 = consumer.try_announced().expect("expected first announcement");
1416 let a2 = consumer.try_announced().expect("expected second announcement");
1417 consumer.assert_next_wait();
1418
1419 let mut paths: Vec<_> = [&a1, &a2].iter().map(|(p, _)| p.to_string()).collect();
1420 paths.sort();
1421 assert_eq!(paths, ["foobar", "worm-node/data"]);
1422
1423 let mut narrow_consumer = user_producer
1425 .consume_only(&["worm-node".into()])
1426 .expect("should be able to narrow scope to worm-node");
1427
1428 narrow_consumer.assert_next("worm-node/data", &broadcast1.consume());
1429 narrow_consumer.assert_next_wait(); }
1431
1432 #[tokio::test]
1433 async fn test_duplicate_prefixes_deduped() {
1434 let origin = Origin::produce();
1435 let broadcast = Broadcast::produce();
1436
1437 let producer = origin
1439 .publish_only(&["demo".into(), "demo".into()])
1440 .expect("should create producer");
1441
1442 assert!(producer.publish_broadcast("demo/stream", broadcast.consume()));
1443
1444 let mut consumer = producer.consume();
1445 consumer.assert_next("demo/stream", &broadcast.consume());
1446 consumer.assert_next_wait();
1447 }
1448
1449 #[tokio::test]
1450 async fn test_overlapping_prefixes_deduped() {
1451 let origin = Origin::produce();
1452 let broadcast = Broadcast::produce();
1453
1454 let producer = origin
1456 .publish_only(&["demo".into(), "demo/foo".into()])
1457 .expect("should create producer");
1458
1459 assert!(producer.publish_broadcast("demo/bar/stream", broadcast.consume()));
1461
1462 let mut consumer = producer.consume();
1463 consumer.assert_next("demo/bar/stream", &broadcast.consume());
1464 consumer.assert_next_wait();
1465 }
1466
1467 #[tokio::test]
1468 async fn test_overlapping_prefixes_no_duplicate_announcements() {
1469 let origin = Origin::produce();
1470 let broadcast = Broadcast::produce();
1471
1472 let producer = origin
1474 .publish_only(&["demo".into(), "demo/foo".into()])
1475 .expect("should create producer");
1476
1477 assert!(producer.publish_broadcast("demo/foo/stream", broadcast.consume()));
1478
1479 let mut consumer = producer.consume();
1480 consumer.assert_next("demo/foo/stream", &broadcast.consume());
1482 consumer.assert_next_wait();
1483 }
1484
1485 #[tokio::test]
1486 async fn test_allowed_returns_deduped_prefixes() {
1487 let origin = Origin::produce();
1488
1489 let producer = origin
1490 .publish_only(&["demo".into(), "demo/foo".into(), "anon".into()])
1491 .expect("should create producer");
1492
1493 let allowed: Vec<_> = producer.allowed().collect();
1494 assert_eq!(allowed.len(), 2, "demo/foo should be subsumed by demo");
1495 }
1496}