1use std::{
2 collections::HashMap,
3 sync::atomic::{AtomicU64, Ordering},
4};
5use tokio::sync::mpsc;
6use web_async::Lock;
7
8use super::BroadcastConsumer;
9use crate::{AsPath, Broadcast, BroadcastProducer, Path, PathOwned, PathPrefixes};
10
/// Process-wide counter from which the next [`ConsumerId`] is taken.
static NEXT_CONSUMER_ID: AtomicU64 = AtomicU64::new(0);

/// Opaque key identifying one registered consumer.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct ConsumerId(u64);

impl ConsumerId {
    /// Allocates a fresh id by post-incrementing the global counter.
    fn new() -> Self {
        let raw = NEXT_CONSUMER_ID.fetch_add(1, Ordering::Relaxed);
        ConsumerId(raw)
    }
}
21
/// The broadcasts published at one path of the tree.
struct OriginBroadcast {
    /// Full path recorded when the first broadcast was published here.
    path: PathOwned,
    /// The broadcast currently handed out for this path.
    active: BroadcastConsumer,
    /// Broadcasts displaced by a later publish to the same path; the most
    /// recently displaced one is restored when the active one is removed.
    backup: Vec<BroadcastConsumer>,
}
28
29#[derive(Clone)]
30struct OriginConsumerNotify {
31 root: PathOwned,
32 tx: mpsc::UnboundedSender<OriginAnnounce>,
33}
34
35impl OriginConsumerNotify {
36 fn announce(&self, path: impl AsPath, broadcast: BroadcastConsumer) {
37 let path = path.as_path().strip_prefix(&self.root).unwrap().to_owned();
38 self.tx.send((path, Some(broadcast))).expect("consumer closed");
39 }
40
41 fn reannounce(&self, path: impl AsPath, broadcast: BroadcastConsumer) {
42 let path = path.as_path().strip_prefix(&self.root).unwrap().to_owned();
43 self.tx.send((path.clone(), None)).expect("consumer closed");
44 self.tx.send((path, Some(broadcast))).expect("consumer closed");
45 }
46
47 fn unannounce(&self, path: impl AsPath) {
48 let path = path.as_path().strip_prefix(&self.root).unwrap().to_owned();
49 self.tx.send((path, None)).expect("consumer closed");
50 }
51}
52
53struct NotifyNode {
54 parent: Option<Lock<NotifyNode>>,
55
56 consumers: HashMap<ConsumerId, OriginConsumerNotify>,
59}
60
61impl NotifyNode {
62 fn new(parent: Option<Lock<NotifyNode>>) -> Self {
63 Self {
64 parent,
65 consumers: HashMap::new(),
66 }
67 }
68
69 fn announce(&mut self, path: impl AsPath, broadcast: &BroadcastConsumer) {
70 for consumer in self.consumers.values() {
71 consumer.announce(path.as_path(), broadcast.clone());
72 }
73
74 if let Some(parent) = &self.parent {
75 parent.lock().announce(path, broadcast);
76 }
77 }
78
79 fn reannounce(&mut self, path: impl AsPath, broadcast: &BroadcastConsumer) {
80 for consumer in self.consumers.values() {
81 consumer.reannounce(path.as_path(), broadcast.clone());
82 }
83
84 if let Some(parent) = &self.parent {
85 parent.lock().reannounce(path, broadcast);
86 }
87 }
88
89 fn unannounce(&mut self, path: impl AsPath) {
90 for consumer in self.consumers.values() {
91 consumer.unannounce(path.as_path());
92 }
93
94 if let Some(parent) = &self.parent {
95 parent.lock().unannounce(path);
96 }
97 }
98}
99
/// One node of the path tree; each path segment selects a child node.
struct OriginNode {
    /// Broadcast published exactly at this node's path, if any.
    broadcast: Option<OriginBroadcast>,

    /// Child nodes keyed by the next path segment.
    nested: HashMap<String, Lock<OriginNode>>,

    /// Consumers registered at this node; its parent link points at the
    /// parent node's registry so events also reach consumers higher up.
    notify: Lock<NotifyNode>,
}
110
111impl OriginNode {
112 fn new(parent: Option<Lock<NotifyNode>>) -> Self {
113 Self {
114 broadcast: None,
115 nested: HashMap::new(),
116 notify: Lock::new(NotifyNode::new(parent)),
117 }
118 }
119
120 fn leaf(&mut self, path: &Path) -> Lock<OriginNode> {
121 let (dir, rest) = path.next_part().expect("leaf called with empty path");
122
123 let next = self.entry(dir);
124 if rest.is_empty() { next } else { next.lock().leaf(&rest) }
125 }
126
127 fn entry(&mut self, dir: &str) -> Lock<OriginNode> {
128 match self.nested.get(dir) {
129 Some(next) => next.clone(),
130 None => {
131 let next = Lock::new(OriginNode::new(Some(self.notify.clone())));
132 self.nested.insert(dir.to_string(), next.clone());
133 next
134 }
135 }
136 }
137
138 fn publish(&mut self, full: impl AsPath, broadcast: &BroadcastConsumer, relative: impl AsPath) {
139 let full = full.as_path();
140 let rest = relative.as_path();
141
142 if let Some((dir, relative)) = rest.next_part() {
144 self.entry(dir).lock().publish(&full, broadcast, &relative);
146 } else if let Some(existing) = &mut self.broadcast {
147 let old = existing.active.clone();
149 existing.active = broadcast.clone();
150 existing.backup.push(old);
151
152 self.notify.lock().reannounce(full, broadcast);
153 } else {
154 self.broadcast = Some(OriginBroadcast {
156 path: full.to_owned(),
157 active: broadcast.clone(),
158 backup: Vec::new(),
159 });
160 self.notify.lock().announce(full, broadcast);
161 }
162 }
163
164 fn consume(&mut self, id: ConsumerId, mut notify: OriginConsumerNotify) {
165 self.consume_initial(&mut notify);
166 self.notify.lock().consumers.insert(id, notify);
167 }
168
169 fn consume_initial(&mut self, notify: &mut OriginConsumerNotify) {
170 if let Some(broadcast) = &self.broadcast {
171 notify.announce(&broadcast.path, broadcast.active.clone());
172 }
173
174 for nested in self.nested.values() {
176 nested.lock().consume_initial(notify);
177 }
178 }
179
180 fn consume_broadcast(&self, rest: impl AsPath) -> Option<BroadcastConsumer> {
181 let rest = rest.as_path();
182
183 if let Some((dir, rest)) = rest.next_part() {
184 let node = self.nested.get(dir)?.lock();
185 node.consume_broadcast(&rest)
186 } else {
187 self.broadcast.as_ref().map(|b| b.active.clone())
188 }
189 }
190
191 fn unconsume(&mut self, id: ConsumerId) {
192 self.notify.lock().consumers.remove(&id).expect("consumer not found");
193 if self.is_empty() {
194 }
197 }
198
199 fn remove(&mut self, full: impl AsPath, broadcast: BroadcastConsumer, relative: impl AsPath) {
201 let full = full.as_path();
202 let relative = relative.as_path();
203
204 if let Some((dir, relative)) = relative.next_part() {
205 let nested = self.entry(dir);
206 let mut locked = nested.lock();
207 locked.remove(&full, broadcast, &relative);
208
209 if locked.is_empty() {
210 drop(locked);
211 self.nested.remove(dir);
212 }
213 } else {
214 let entry = match &mut self.broadcast {
215 Some(existing) => existing,
216 None => return,
217 };
218
219 let pos = entry.backup.iter().position(|b| b.is_clone(&broadcast));
221 if let Some(pos) = pos {
222 entry.backup.remove(pos);
223 return;
225 }
226
227 assert!(entry.active.is_clone(&broadcast));
229
230 if let Some(active) = entry.backup.pop() {
232 entry.active = active;
233 self.notify.lock().reannounce(full, &entry.active);
234 } else {
235 self.broadcast = None;
237 self.notify.lock().unannounce(full);
238 }
239 }
240 }
241
242 fn is_empty(&self) -> bool {
243 self.broadcast.is_none() && self.nested.is_empty() && self.notify.lock().consumers.is_empty()
244 }
245}
246
247#[derive(Clone)]
248struct OriginNodes {
249 nodes: Vec<(PathOwned, Lock<OriginNode>)>,
250}
251
252impl OriginNodes {
253 pub fn select(&self, prefixes: &PathPrefixes) -> Option<Self> {
256 let mut roots = Vec::new();
257
258 for (root, state) in &self.nodes {
259 for prefix in prefixes {
260 if root.has_prefix(prefix) {
261 roots.push((root.to_owned(), state.clone()));
263 continue;
264 }
265
266 if let Some(suffix) = prefix.strip_prefix(root) {
267 let nested = state.lock().leaf(&suffix);
269 roots.push((prefix.to_owned(), nested));
270 }
271 }
272 }
273
274 if roots.is_empty() {
275 None
276 } else {
277 Some(Self { nodes: roots })
278 }
279 }
280
281 pub fn root(&self, new_root: impl AsPath) -> Option<Self> {
282 let new_root = new_root.as_path();
283 let mut roots = Vec::new();
284
285 if new_root.is_empty() {
286 return Some(self.clone());
287 }
288
289 for (root, state) in &self.nodes {
290 if let Some(suffix) = root.strip_prefix(&new_root) {
291 roots.push((suffix.to_owned(), state.clone()));
293 } else if let Some(suffix) = new_root.strip_prefix(root) {
294 let nested = state.lock().leaf(&suffix);
297 roots.push(("".into(), nested));
298 }
299 }
300
301 if roots.is_empty() {
302 None
303 } else {
304 Some(Self { nodes: roots })
305 }
306 }
307
308 pub fn get(&self, path: impl AsPath) -> Option<(Lock<OriginNode>, PathOwned)> {
310 let path = path.as_path();
311
312 for (root, state) in &self.nodes {
313 if let Some(suffix) = path.strip_prefix(root) {
314 return Some((state.clone(), suffix.to_owned()));
315 }
316 }
317
318 None
319 }
320}
321
322impl Default for OriginNodes {
323 fn default() -> Self {
324 Self {
325 nodes: vec![("".into(), Lock::new(OriginNode::new(None)))],
326 }
327 }
328}
329
/// One update delivered to a consumer: the path, plus `Some(broadcast)` when
/// the path is announced or `None` when it is unannounced.
pub type OriginAnnounce = (PathOwned, Option<BroadcastConsumer>);
332
333pub struct Origin {}
335
336impl Origin {
337 pub fn produce() -> OriginProducer {
338 OriginProducer::new()
339 }
340}
341
342#[derive(Clone, Default)]
344pub struct OriginProducer {
345 nodes: OriginNodes,
348
349 root: PathOwned,
351}
352
353impl OriginProducer {
354 pub fn new() -> Self {
355 Self::default()
356 }
357
358 pub fn create_broadcast(&self, path: impl AsPath) -> Option<BroadcastProducer> {
363 let broadcast = Broadcast::produce();
364 self.publish_broadcast(path, broadcast.consume()).then_some(broadcast)
365 }
366
367 pub fn publish_broadcast(&self, path: impl AsPath, broadcast: BroadcastConsumer) -> bool {
376 let path = path.as_path();
377
378 let (root, rest) = match self.nodes.get(&path) {
379 Some(root) => root,
380 None => return false,
381 };
382
383 let full = self.root.join(&path);
384
385 root.lock().publish(&full, &broadcast, &rest);
386 let root = root.clone();
387
388 web_async::spawn(async move {
389 broadcast.closed().await;
390 root.lock().remove(&full, broadcast, &rest);
391 });
392
393 true
394 }
395
396 pub fn publish_only(&self, prefixes: &[Path]) -> Option<OriginProducer> {
401 let prefixes = PathPrefixes::new(prefixes);
402 Some(OriginProducer {
403 nodes: self.nodes.select(&prefixes)?,
404 root: self.root.clone(),
405 })
406 }
407
408 pub fn consume(&self) -> OriginConsumer {
410 OriginConsumer::new(self.root.clone(), self.nodes.clone())
411 }
412
413 pub fn consume_only(&self, prefixes: &[Path]) -> Option<OriginConsumer> {
418 let prefixes = PathPrefixes::new(prefixes);
419 Some(OriginConsumer::new(self.root.clone(), self.nodes.select(&prefixes)?))
420 }
421
422 pub fn consume_broadcast(&self, path: impl AsPath) -> Option<BroadcastConsumer> {
426 let path = path.as_path();
427 let (root, rest) = self.nodes.get(&path)?;
428 let state = root.lock();
429 state.consume_broadcast(&rest)
430 }
431
432 pub fn with_root(&self, prefix: impl AsPath) -> Option<Self> {
436 let prefix = prefix.as_path();
437
438 Some(Self {
439 root: self.root.join(&prefix).to_owned(),
440 nodes: self.nodes.root(&prefix)?,
441 })
442 }
443
444 pub fn root(&self) -> &Path<'_> {
446 &self.root
447 }
448
449 pub fn allowed(&self) -> impl Iterator<Item = &Path<'_>> {
451 self.nodes.nodes.iter().map(|(root, _)| root)
452 }
453
454 pub fn absolute(&self, path: impl AsPath) -> Path<'_> {
456 self.root.join(path)
457 }
458}
459
460pub struct OriginConsumer {
464 id: ConsumerId,
465 nodes: OriginNodes,
466 updates: mpsc::UnboundedReceiver<OriginAnnounce>,
467
468 root: PathOwned,
470}
471
472impl OriginConsumer {
473 fn new(root: PathOwned, nodes: OriginNodes) -> Self {
474 let (tx, rx) = mpsc::unbounded_channel();
475
476 let id = ConsumerId::new();
477
478 for (_, state) in &nodes.nodes {
479 let notify = OriginConsumerNotify {
480 root: root.clone(),
481 tx: tx.clone(),
482 };
483 state.lock().consume(id, notify);
484 }
485
486 Self {
487 id,
488 nodes,
489 updates: rx,
490 root,
491 }
492 }
493
494 pub async fn announced(&mut self) -> Option<OriginAnnounce> {
502 self.updates.recv().await
503 }
504
505 pub fn try_announced(&mut self) -> Option<OriginAnnounce> {
510 self.updates.try_recv().ok()
511 }
512
513 pub fn consume(&self) -> Self {
514 self.clone()
515 }
516
517 pub fn consume_broadcast(&self, path: impl AsPath) -> Option<BroadcastConsumer> {
523 let path = path.as_path();
524 let (root, rest) = self.nodes.get(&path)?;
525 let state = root.lock();
526 state.consume_broadcast(&rest)
527 }
528
529 pub fn consume_only(&self, prefixes: &[Path]) -> Option<OriginConsumer> {
534 let prefixes = PathPrefixes::new(prefixes);
535 Some(OriginConsumer::new(self.root.clone(), self.nodes.select(&prefixes)?))
536 }
537
538 pub fn with_root(&self, prefix: impl AsPath) -> Option<Self> {
542 let prefix = prefix.as_path();
543
544 Some(Self::new(self.root.join(&prefix).to_owned(), self.nodes.root(&prefix)?))
545 }
546
547 pub fn root(&self) -> &Path<'_> {
549 &self.root
550 }
551
552 pub fn allowed(&self) -> impl Iterator<Item = &Path<'_>> {
554 self.nodes.nodes.iter().map(|(root, _)| root)
555 }
556
557 pub fn absolute(&self, path: impl AsPath) -> Path<'_> {
559 self.root.join(path)
560 }
561}
562
563impl Drop for OriginConsumer {
564 fn drop(&mut self) {
565 for (_, root) in &self.nodes.nodes {
566 root.lock().unconsume(self.id);
567 }
568 }
569}
570
571impl Clone for OriginConsumer {
572 fn clone(&self) -> Self {
573 OriginConsumer::new(self.root.clone(), self.nodes.clone())
574 }
575}
576
577#[cfg(test)]
578use futures::FutureExt;
579
#[cfg(test)]
impl OriginConsumer {
    /// Asserts that an announcement of `broadcast` at `expected` is ready
    /// right now (polled once via `now_or_never`).
    pub fn assert_next(&mut self, expected: impl AsPath, broadcast: &BroadcastConsumer) {
        let expected = expected.as_path();
        let polled = self.announced().now_or_never();
        let (path, active) = polled.expect("next blocked").expect("no next");
        assert_eq!(path, expected, "wrong path");
        let active = active.unwrap();
        assert!(active.is_clone(broadcast), "should be the same broadcast");
    }

    /// Same check as `assert_next`, but reads through `try_announced`.
    pub fn assert_try_next(&mut self, expected: impl AsPath, broadcast: &BroadcastConsumer) {
        let expected = expected.as_path();
        let next = self.try_announced();
        let (path, active) = next.expect("no next");
        assert_eq!(path, expected, "wrong path");
        let active = active.unwrap();
        assert!(active.is_clone(broadcast), "should be the same broadcast");
    }

    /// Asserts that an unannounce for `expected` is ready right now.
    pub fn assert_next_none(&mut self, expected: impl AsPath) {
        let expected = expected.as_path();
        let polled = self.announced().now_or_never();
        let (path, active) = polled.expect("next blocked").expect("no next");
        assert_eq!(path, expected, "wrong path");
        assert!(active.is_none(), "should be unannounced");
    }

    /// Asserts that no update is ready right now.
    pub fn assert_next_wait(&mut self) {
        match self.announced().now_or_never() {
            Some(res) => panic!("next should block: got {:?}", res.map(|(path, _)| path)),
            None => {}
        }
    }
}
618
619#[cfg(test)]
620mod tests {
621 use crate::Broadcast;
622
623 use super::*;
624
    /// Announce / unannounce delivery to consumers created at different times.
    #[tokio::test]
    async fn test_announce() {
        let origin = Origin::produce();
        let broadcast1 = Broadcast::produce();
        let broadcast2 = Broadcast::produce();

        // Nothing published yet, so a fresh consumer has nothing queued.
        let mut consumer1 = origin.consume();
        consumer1.assert_next_wait();

        origin.publish_broadcast("test1", broadcast1.consume());

        consumer1.assert_next("test1", &broadcast1.consume());
        consumer1.assert_next_wait();

        // Created after "test1" was published: it is replayed on registration.
        let mut consumer2 = origin.consume();

        origin.publish_broadcast("test2", broadcast2.consume());

        consumer1.assert_next("test2", &broadcast2.consume());
        consumer1.assert_next_wait();

        consumer2.assert_next("test1", &broadcast1.consume());
        consumer2.assert_next("test2", &broadcast2.consume());
        consumer2.assert_next_wait();

        // NOTE(review): dropping the producer presumably completes `closed()`,
        // which lets the task spawned by `publish_broadcast` remove the entry.
        drop(broadcast1);

        // Yield so the spawned removal task gets a chance to run.
        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;

        consumer1.assert_next_none("test1");
        consumer2.assert_next_none("test1");
        consumer1.assert_next_wait();
        consumer2.assert_next_wait();

        // A consumer created now only sees what is still published.
        let mut consumer3 = origin.consume();
        consumer3.assert_next("test2", &broadcast2.consume());
        consumer3.assert_next_wait();

        drop(broadcast2);

        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;

        consumer1.assert_next_none("test2");
        consumer2.assert_next_none("test2");
        consumer3.assert_next_none("test2");
    }
688
    /// Several broadcasts published at the same path: the newest is active,
    /// older ones become backups and are restored as newer ones go away.
    #[tokio::test]
    async fn test_duplicate() {
        let origin = Origin::produce();

        let broadcast1 = Broadcast::produce();
        let broadcast2 = Broadcast::produce();
        let broadcast3 = Broadcast::produce();

        let consumer1 = broadcast1.consume();
        let consumer2 = broadcast2.consume();
        let consumer3 = broadcast3.consume();

        let mut consumer = origin.consume();

        origin.publish_broadcast("test", consumer1.clone());
        origin.publish_broadcast("test", consumer2.clone());
        origin.publish_broadcast("test", consumer3.clone());
        assert!(consumer.consume_broadcast("test").is_some());

        // Each replacement arrives as an unannounce followed by an announce.
        consumer.assert_next("test", &consumer1);
        consumer.assert_next_none("test");
        consumer.assert_next("test", &consumer2);
        consumer.assert_next_none("test");
        consumer.assert_next("test", &consumer3);

        // broadcast2 is only a backup at this point: removing it is silent.
        drop(broadcast2);

        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;

        assert!(consumer.consume_broadcast("test").is_some());
        consumer.assert_next_wait();

        // broadcast3 is active: removing it promotes the remaining backup.
        drop(broadcast3);

        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;

        assert!(consumer.consume_broadcast("test").is_some());
        consumer.assert_next_none("test");
        consumer.assert_next("test", &consumer1);

        // Last one gone: the path is unannounced for good.
        drop(broadcast1);

        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
        assert!(consumer.consume_broadcast("test").is_none());

        consumer.assert_next_none("test");
        consumer.assert_next_wait();
    }
743
    /// Two broadcasts at one path, removed newest-first: the path stays
    /// available until both are gone.
    #[tokio::test]
    async fn test_duplicate_reverse() {
        let origin = Origin::produce();
        let broadcast1 = Broadcast::produce();
        let broadcast2 = Broadcast::produce();

        origin.publish_broadcast("test", broadcast1.consume());
        origin.publish_broadcast("test", broadcast2.consume());
        assert!(origin.consume_broadcast("test").is_some());

        // The active (second) broadcast goes away first.
        drop(broadcast2);

        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
        assert!(origin.consume_broadcast("test").is_some());

        drop(broadcast1);

        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
        assert!(origin.consume_broadcast("test").is_none());
    }
767
    /// Publishing the same broadcast twice at one path must still clean up
    /// completely once it is dropped.
    #[tokio::test]
    async fn test_double_publish() {
        let origin = Origin::produce();
        let broadcast = Broadcast::produce();

        // Both entries are clones of the same underlying broadcast.
        origin.publish_broadcast("test", broadcast.consume());
        origin.publish_broadcast("test", broadcast.consume());

        assert!(origin.consume_broadcast("test").is_some());

        drop(broadcast);

        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
        assert!(origin.consume_broadcast("test").is_none());
    }
    /// Reads 256 queued announcements through `assert_next`; the test is
    /// expected to panic.
    // NOTE(review): presumably `now_or_never` reports the receiver as pending
    // part-way through the loop (the name hints at 128 items), triggering
    // "next blocked" — confirm against the async runtime's behavior.
    #[tokio::test]
    #[should_panic]
    async fn test_128() {
        let origin = Origin::produce();
        let broadcast = Broadcast::produce();

        let mut consumer = origin.consume();
        for i in 0..256 {
            origin.publish_broadcast(format!("test{i}"), broadcast.consume());
        }

        for i in 0..256 {
            consumer.assert_next(format!("test{i}"), &broadcast.consume());
        }
    }
801
    /// Same scenario as `test_128`, but read through `try_announced`, which
    /// must deliver all 256 queued announcements.
    #[tokio::test]
    async fn test_128_fix() {
        let origin = Origin::produce();
        let broadcast = Broadcast::produce();

        let mut consumer = origin.consume();
        for i in 0..256 {
            origin.publish_broadcast(format!("test{i}"), broadcast.consume());
        }

        for i in 0..256 {
            consumer.assert_try_next(format!("test{i}"), &broadcast.consume());
        }
    }
817
    /// A producer rooted at "foo" publishes relative paths; an unrooted
    /// consumer sees the full path, a rooted consumer the relative one.
    #[tokio::test]
    async fn test_with_root_basic() {
        let origin = Origin::produce();
        let broadcast = Broadcast::produce();

        let foo_producer = origin.with_root("foo").expect("should create root");
        assert_eq!(foo_producer.root().as_str(), "foo");

        let mut consumer = origin.consume();

        assert!(foo_producer.publish_broadcast("bar/baz", broadcast.consume()));
        // The unrooted consumer gets the path with the "foo" prefix.
        consumer.assert_next("foo/bar/baz", &broadcast.consume());

        let mut foo_consumer = foo_producer.consume();
        // The rooted consumer gets the path relative to "foo".
        foo_consumer.assert_next("bar/baz", &broadcast.consume());
    }
838
    /// Chained `with_root` calls accumulate: "foo" then "bar" gives "foo/bar".
    #[tokio::test]
    async fn test_with_root_nested() {
        let origin = Origin::produce();
        let broadcast = Broadcast::produce();

        let foo_producer = origin.with_root("foo").expect("should create foo root");
        let foo_bar_producer = foo_producer.with_root("bar").expect("should create bar root");
        assert_eq!(foo_bar_producer.root().as_str(), "foo/bar");

        let mut consumer = origin.consume();

        assert!(foo_bar_producer.publish_broadcast("baz", broadcast.consume()));
        // Full path as seen from the unrooted consumer.
        consumer.assert_next("foo/bar/baz", &broadcast.consume());

        let mut foo_bar_consumer = foo_bar_producer.consume();
        // Relative path as seen from the "foo/bar" consumer.
        foo_bar_consumer.assert_next("baz", &broadcast.consume());
    }
860
861 #[tokio::test]
862 async fn test_publish_only_allows() {
863 let origin = Origin::produce();
864 let broadcast = Broadcast::produce();
865
866 let limited_producer = origin
868 .publish_only(&["allowed/path1".into(), "allowed/path2".into()])
869 .expect("should create limited producer");
870
871 assert!(limited_producer.publish_broadcast("allowed/path1", broadcast.consume()));
873 assert!(limited_producer.publish_broadcast("allowed/path1/nested", broadcast.consume()));
874 assert!(limited_producer.publish_broadcast("allowed/path2", broadcast.consume()));
875
876 assert!(!limited_producer.publish_broadcast("notallowed", broadcast.consume()));
878 assert!(!limited_producer.publish_broadcast("allowed", broadcast.consume())); assert!(!limited_producer.publish_broadcast("other/path", broadcast.consume()));
880 }
881
    /// With no prefixes nothing can be selected, so no producer is returned.
    #[tokio::test]
    async fn test_publish_only_empty() {
        let origin = Origin::produce();

        assert!(origin.publish_only(&[]).is_none());
    }
889
    /// A consumer limited to "allowed" sees only broadcasts at or below it,
    /// while an unrestricted consumer sees everything.
    #[tokio::test]
    async fn test_consume_only_filters() {
        let origin = Origin::produce();
        let broadcast1 = Broadcast::produce();
        let broadcast2 = Broadcast::produce();
        let broadcast3 = Broadcast::produce();

        let mut consumer = origin.consume();

        origin.publish_broadcast("allowed", broadcast1.consume());
        origin.publish_broadcast("allowed/nested", broadcast2.consume());
        origin.publish_broadcast("notallowed", broadcast3.consume());

        let mut limited_consumer = origin
            .consume_only(&["allowed".into()])
            .expect("should create limited consumer");

        // Paths keep their full form; only the visible set is narrowed.
        limited_consumer.assert_next("allowed", &broadcast1.consume());
        limited_consumer.assert_next("allowed/nested", &broadcast2.consume());
        limited_consumer.assert_next_wait();

        // The unrestricted consumer received all three, in publish order.
        consumer.assert_next("allowed", &broadcast1.consume());
        consumer.assert_next("allowed/nested", &broadcast2.consume());
        consumer.assert_next("notallowed", &broadcast3.consume());
    }
919
    /// A consumer limited to two prefixes sees broadcasts under both and
    /// nothing under a third.
    #[tokio::test]
    async fn test_consume_only_multiple_prefixes() {
        let origin = Origin::produce();
        let broadcast1 = Broadcast::produce();
        let broadcast2 = Broadcast::produce();
        let broadcast3 = Broadcast::produce();

        origin.publish_broadcast("foo/test", broadcast1.consume());
        origin.publish_broadcast("bar/test", broadcast2.consume());
        origin.publish_broadcast("baz/test", broadcast3.consume());

        let mut limited_consumer = origin
            .consume_only(&["foo".into(), "bar".into()])
            .expect("should create limited consumer");

        // NOTE(review): "bar" is delivered before "foo" although "foo" was
        // requested first; presumably this follows the iteration order of
        // `PathPrefixes` — confirm.
        limited_consumer.assert_next("bar/test", &broadcast2.consume());
        limited_consumer.assert_next("foo/test", &broadcast1.consume());
        limited_consumer.assert_next_wait();
    }
941
    /// `publish_only` prefixes on a rooted producer are interpreted relative
    /// to that root; announcements carry the full path.
    #[tokio::test]
    async fn test_with_root_and_publish_only() {
        let origin = Origin::produce();
        let broadcast = Broadcast::produce();

        let foo_producer = origin.with_root("foo").expect("should create foo root");

        let limited_producer = foo_producer
            .publish_only(&["bar".into(), "goop/pee".into()])
            .expect("should create limited producer");

        let mut consumer = origin.consume();

        // At or below an allowed prefix: accepted.
        assert!(limited_producer.publish_broadcast("bar", broadcast.consume()));
        assert!(limited_producer.publish_broadcast("bar/nested", broadcast.consume()));
        assert!(limited_producer.publish_broadcast("goop/pee", broadcast.consume()));
        assert!(limited_producer.publish_broadcast("goop/pee/nested", broadcast.consume()));

        // Outside the allowed prefixes (or only a parent of one): rejected.
        assert!(!limited_producer.publish_broadcast("baz", broadcast.consume()));
        assert!(!limited_producer.publish_broadcast("goop", broadcast.consume()));
        assert!(!limited_producer.publish_broadcast("goop/other", broadcast.consume()));

        // The unrooted consumer sees every accepted path prefixed with "foo".
        consumer.assert_next("foo/bar", &broadcast.consume());
        consumer.assert_next("foo/bar/nested", &broadcast.consume());
        consumer.assert_next("foo/goop/pee", &broadcast.consume());
        consumer.assert_next("foo/goop/pee/nested", &broadcast.consume());
    }
974
    /// `consume_only` on a rooted producer filters relative to that root and
    /// delivers paths relative to it.
    #[tokio::test]
    async fn test_with_root_and_consume_only() {
        let origin = Origin::produce();
        let broadcast1 = Broadcast::produce();
        let broadcast2 = Broadcast::produce();
        let broadcast3 = Broadcast::produce();

        origin.publish_broadcast("foo/bar/test", broadcast1.consume());
        origin.publish_broadcast("foo/goop/pee/test", broadcast2.consume());
        origin.publish_broadcast("foo/other/test", broadcast3.consume());

        let foo_producer = origin.with_root("foo").expect("should create foo root");

        let mut limited_consumer = foo_producer
            .consume_only(&["bar".into(), "goop/pee".into()])
            .expect("should create limited consumer");

        // "foo/other/test" is outside both prefixes and must not show up.
        limited_consumer.assert_next("bar/test", &broadcast1.consume());
        limited_consumer.assert_next("goop/pee/test", &broadcast2.consume());
        limited_consumer.assert_next_wait();
    }
1000
    /// `with_root` on a limited producer only succeeds for prefixes that
    /// overlap with what it is allowed to access.
    #[tokio::test]
    async fn test_with_root_unauthorized() {
        let origin = Origin::produce();

        let limited_producer = origin
            .publish_only(&["allowed".into()])
            .expect("should create limited producer");

        // No overlap with "allowed": no producer.
        assert!(limited_producer.with_root("notallowed").is_none());

        let allowed_root = limited_producer
            .with_root("allowed")
            .expect("should create allowed root");
        assert_eq!(allowed_root.root().as_str(), "allowed");
    }
1019
    /// An unrestricted producer (empty prefix) can publish anywhere and can
    /// be re-rooted at any path.
    #[tokio::test]
    async fn test_wildcard_permission() {
        let origin = Origin::produce();
        let broadcast = Broadcast::produce();

        let root_producer = origin.clone();

        assert!(root_producer.publish_broadcast("any/path", broadcast.consume()));
        assert!(root_producer.publish_broadcast("other/path", broadcast.consume()));

        let foo_producer = root_producer.with_root("foo").expect("should create any root");
        assert_eq!(foo_producer.root().as_str(), "foo");
    }
1036
    /// `consume_broadcast` honors the consumer's allowed prefixes.
    #[tokio::test]
    async fn test_consume_broadcast_with_permissions() {
        let origin = Origin::produce();
        let broadcast1 = Broadcast::produce();
        let broadcast2 = Broadcast::produce();

        origin.publish_broadcast("allowed/test", broadcast1.consume());
        origin.publish_broadcast("notallowed/test", broadcast2.consume());

        let limited_consumer = origin
            .consume_only(&["allowed".into()])
            .expect("should create limited consumer");

        let result = limited_consumer.consume_broadcast("allowed/test");
        assert!(result.is_some());
        assert!(result.unwrap().is_clone(&broadcast1.consume()));

        // Published, but outside the limited consumer's prefixes.
        assert!(limited_consumer.consume_broadcast("notallowed/test").is_none());

        // An unrestricted consumer can look up both.
        let consumer = origin.consume();
        assert!(consumer.consume_broadcast("allowed/test").is_some());
        assert!(consumer.consume_broadcast("notallowed/test").is_some());
    }
1064
1065 #[tokio::test]
1066 async fn test_nested_paths_with_permissions() {
1067 let origin = Origin::produce();
1068 let broadcast = Broadcast::produce();
1069
1070 let limited_producer = origin
1072 .publish_only(&["a/b/c".into()])
1073 .expect("should create limited producer");
1074
1075 assert!(limited_producer.publish_broadcast("a/b/c", broadcast.consume()));
1077 assert!(limited_producer.publish_broadcast("a/b/c/d", broadcast.consume()));
1078 assert!(limited_producer.publish_broadcast("a/b/c/d/e", broadcast.consume()));
1079
1080 assert!(!limited_producer.publish_broadcast("a", broadcast.consume()));
1082 assert!(!limited_producer.publish_broadcast("a/b", broadcast.consume()));
1083 assert!(!limited_producer.publish_broadcast("a/b/other", broadcast.consume()));
1084 }
1085
    /// Consumers with different prefix sets each see only their own slice of
    /// the already-published broadcasts.
    #[tokio::test]
    async fn test_multiple_consumers_with_different_permissions() {
        let origin = Origin::produce();
        let broadcast1 = Broadcast::produce();
        let broadcast2 = Broadcast::produce();
        let broadcast3 = Broadcast::produce();

        origin.publish_broadcast("foo/test", broadcast1.consume());
        origin.publish_broadcast("bar/test", broadcast2.consume());
        origin.publish_broadcast("baz/test", broadcast3.consume());

        let mut foo_consumer = origin
            .consume_only(&["foo".into()])
            .expect("should create foo consumer");

        let mut bar_consumer = origin
            .consume_only(&["bar".into()])
            .expect("should create bar consumer");

        let mut foobar_consumer = origin
            .consume_only(&["foo".into(), "bar".into()])
            .expect("should create foobar consumer");

        foo_consumer.assert_next("foo/test", &broadcast1.consume());
        foo_consumer.assert_next_wait();

        bar_consumer.assert_next("bar/test", &broadcast2.consume());
        bar_consumer.assert_next_wait();

        // NOTE(review): "bar" arrives before "foo"; presumably this follows
        // the iteration order of `PathPrefixes` — confirm.
        foobar_consumer.assert_next("bar/test", &broadcast2.consume());
        foobar_consumer.assert_next("foo/test", &broadcast1.consume());
        foobar_consumer.assert_next_wait();
    }
1122
    /// Selecting with the empty prefix on an already-limited producer keeps
    /// all of its existing roots.
    #[tokio::test]
    async fn test_select_with_empty_prefix() {
        let origin = Origin::produce();
        let broadcast1 = Broadcast::produce();
        let broadcast2 = Broadcast::produce();

        let demo_producer = origin.with_root("demo").expect("should create demo root");
        let limited_producer = demo_producer
            .publish_only(&["worm-node".into(), "foobar".into()])
            .expect("should create limited producer");

        assert!(limited_producer.publish_broadcast("worm-node/test", broadcast1.consume()));
        assert!(limited_producer.publish_broadcast("foobar/test", broadcast2.consume()));

        let mut consumer = limited_producer
            .consume_only(&["".into()])
            .expect("should create consumer with empty prefix");

        let a1 = consumer.try_announced().expect("expected first announcement");
        let a2 = consumer.try_announced().expect("expected second announcement");
        consumer.assert_next_wait();

        // Compare as a sorted set so the check does not depend on delivery order.
        let mut paths: Vec<_> = [&a1, &a2].iter().map(|(p, _)| p.to_string()).collect();
        paths.sort();
        assert_eq!(paths, ["foobar/test", "worm-node/test"]);
    }
1153
    /// Consumers can narrow a limited producer's scope to one of its
    /// prefixes, or to a path below one of them.
    #[tokio::test]
    async fn test_select_narrowing_scope() {
        let origin = Origin::produce();
        let broadcast1 = Broadcast::produce();
        let broadcast2 = Broadcast::produce();
        let broadcast3 = Broadcast::produce();

        let demo_producer = origin.with_root("demo").expect("should create demo root");
        let limited_producer = demo_producer
            .publish_only(&["worm-node".into(), "foobar".into()])
            .expect("should create limited producer");

        assert!(limited_producer.publish_broadcast("worm-node", broadcast1.consume()));
        assert!(limited_producer.publish_broadcast("worm-node/foo", broadcast2.consume()));
        assert!(limited_producer.publish_broadcast("foobar/bar", broadcast3.consume()));

        let mut worm_consumer = limited_producer
            .consume_only(&["worm-node".into()])
            .expect("should create worm-node consumer");

        // Everything at or below "worm-node", nothing from "foobar".
        worm_consumer.assert_next("worm-node", &broadcast1.consume());
        worm_consumer.assert_next("worm-node/foo", &broadcast2.consume());
        worm_consumer.assert_next_wait();

        let mut foo_consumer = limited_producer
            .consume_only(&["worm-node/foo".into()])
            .expect("should create worm-node/foo consumer");

        // Narrowed below an allowed prefix: only that subtree is visible.
        foo_consumer.assert_next("worm-node/foo", &broadcast2.consume());
        foo_consumer.assert_next_wait();
    }
1190
    /// The empty prefix on a producer with several roots keeps all of them.
    #[tokio::test]
    async fn test_select_multiple_roots_with_empty_prefix() {
        let origin = Origin::produce();
        let broadcast1 = Broadcast::produce();
        let broadcast2 = Broadcast::produce();
        let broadcast3 = Broadcast::produce();

        let limited_producer = origin
            .publish_only(&["app1".into(), "app2".into(), "shared".into()])
            .expect("should create limited producer");

        assert!(limited_producer.publish_broadcast("app1/data", broadcast1.consume()));
        assert!(limited_producer.publish_broadcast("app2/config", broadcast2.consume()));
        assert!(limited_producer.publish_broadcast("shared/resource", broadcast3.consume()));

        let mut consumer = limited_producer
            .consume_only(&["".into()])
            .expect("should create consumer with empty prefix");

        // One announcement per root, in the order the roots are stored.
        consumer.assert_next("app1/data", &broadcast1.consume());
        consumer.assert_next("app2/config", &broadcast2.consume());
        consumer.assert_next("shared/resource", &broadcast3.consume());
        consumer.assert_next_wait();
    }
1219
1220 #[tokio::test]
1221 async fn test_publish_only_with_empty_prefix() {
1222 let origin = Origin::produce();
1223 let broadcast = Broadcast::produce();
1224
1225 let limited_producer = origin
1227 .publish_only(&["services/api".into(), "services/web".into()])
1228 .expect("should create limited producer");
1229
1230 let same_producer = limited_producer
1232 .publish_only(&["".into()])
1233 .expect("should create producer with empty prefix");
1234
1235 assert!(same_producer.publish_broadcast("services/api", broadcast.consume()));
1237 assert!(same_producer.publish_broadcast("services/web", broadcast.consume()));
1238 assert!(!same_producer.publish_broadcast("services/db", broadcast.consume()));
1239 assert!(!same_producer.publish_broadcast("other", broadcast.consume()));
1240 }
1241
1242 #[tokio::test]
1243 async fn test_select_narrowing_to_deeper_path() {
1244 let origin = Origin::produce();
1245 let broadcast1 = Broadcast::produce();
1246 let broadcast2 = Broadcast::produce();
1247 let broadcast3 = Broadcast::produce();
1248
1249 let limited_producer = origin
1251 .publish_only(&["org".into()])
1252 .expect("should create limited producer");
1253
1254 assert!(limited_producer.publish_broadcast("org/team1/project1", broadcast1.consume()));
1256 assert!(limited_producer.publish_broadcast("org/team1/project2", broadcast2.consume()));
1257 assert!(limited_producer.publish_broadcast("org/team2/project1", broadcast3.consume()));
1258
1259 let mut team2_consumer = limited_producer
1261 .consume_only(&["org/team2".into()])
1262 .expect("should create team2 consumer");
1263
1264 team2_consumer.assert_next("org/team2/project1", &broadcast3.consume());
1265 team2_consumer.assert_next_wait(); let mut project1_consumer = limited_producer
1269 .consume_only(&["org/team1/project1".into()])
1270 .expect("should create project1 consumer");
1271
1272 project1_consumer.assert_next("org/team1/project1", &broadcast1.consume());
1274 project1_consumer.assert_next_wait();
1275 }
1276
1277 #[tokio::test]
1278 async fn test_select_with_non_matching_prefix() {
1279 let origin = Origin::produce();
1280
1281 let limited_producer = origin
1283 .publish_only(&["allowed/path".into()])
1284 .expect("should create limited producer");
1285
1286 assert!(limited_producer.consume_only(&["different/path".into()]).is_none());
1288
1289 assert!(limited_producer.publish_only(&["other/path".into()]).is_none());
1291 }
1292
1293 #[tokio::test]
1296 async fn test_with_root_trailing_slash_consumer() {
1297 let origin = Origin::produce();
1298
1299 let prefix = "some_prefix/".to_string();
1301 let mut consumer = origin.consume().with_root(prefix).unwrap();
1302
1303 let b = origin.create_broadcast("some_prefix/test").unwrap();
1304 consumer.assert_next("test", &b.consume());
1305 }
1306
1307 #[tokio::test]
1309 async fn test_with_root_trailing_slash_producer() {
1310 let origin = Origin::produce();
1311
1312 let prefix = "some_prefix/".to_string();
1314 let rooted = origin.with_root(prefix).unwrap();
1315
1316 let b = rooted.create_broadcast("test").unwrap();
1317
1318 let mut consumer = rooted.consume();
1319 consumer.assert_next("test", &b.consume());
1320 }
1321
1322 #[tokio::test]
1324 async fn test_with_root_trailing_slash_unannounce() {
1325 let origin = Origin::produce();
1326
1327 let prefix = "some_prefix/".to_string();
1328 let mut consumer = origin.consume().with_root(prefix).unwrap();
1329
1330 let b = origin.create_broadcast("some_prefix/test").unwrap();
1331 consumer.assert_next("test", &b.consume());
1332
1333 drop(b);
1335 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1336
1337 consumer.assert_next_none("test");
1339 }
1340
1341 #[tokio::test]
1342 async fn test_select_maintains_access_with_wider_prefix() {
1343 let origin = Origin::produce();
1344 let broadcast1 = Broadcast::produce();
1345 let broadcast2 = Broadcast::produce();
1346
1347 let demo_producer = origin.with_root("demo").expect("should create demo root");
1349 let user_producer = demo_producer
1350 .publish_only(&["worm-node".into(), "foobar".into()])
1351 .expect("should create user producer");
1352
1353 assert!(user_producer.publish_broadcast("worm-node/data", broadcast1.consume()));
1355 assert!(user_producer.publish_broadcast("foobar", broadcast2.consume()));
1356
1357 let mut consumer = user_producer
1359 .consume_only(&["".into()])
1360 .expect("consume_only with empty prefix should not fail when user has specific permissions");
1361
1362 let a1 = consumer.try_announced().expect("expected first announcement");
1364 let a2 = consumer.try_announced().expect("expected second announcement");
1365 consumer.assert_next_wait();
1366
1367 let mut paths: Vec<_> = [&a1, &a2].iter().map(|(p, _)| p.to_string()).collect();
1368 paths.sort();
1369 assert_eq!(paths, ["foobar", "worm-node/data"]);
1370
1371 let mut narrow_consumer = user_producer
1373 .consume_only(&["worm-node".into()])
1374 .expect("should be able to narrow scope to worm-node");
1375
1376 narrow_consumer.assert_next("worm-node/data", &broadcast1.consume());
1377 narrow_consumer.assert_next_wait(); }
1379
1380 #[tokio::test]
1381 async fn test_duplicate_prefixes_deduped() {
1382 let origin = Origin::produce();
1383 let broadcast = Broadcast::produce();
1384
1385 let producer = origin
1387 .publish_only(&["demo".into(), "demo".into()])
1388 .expect("should create producer");
1389
1390 assert!(producer.publish_broadcast("demo/stream", broadcast.consume()));
1391
1392 let mut consumer = producer.consume();
1393 consumer.assert_next("demo/stream", &broadcast.consume());
1394 consumer.assert_next_wait();
1395 }
1396
1397 #[tokio::test]
1398 async fn test_overlapping_prefixes_deduped() {
1399 let origin = Origin::produce();
1400 let broadcast = Broadcast::produce();
1401
1402 let producer = origin
1404 .publish_only(&["demo".into(), "demo/foo".into()])
1405 .expect("should create producer");
1406
1407 assert!(producer.publish_broadcast("demo/bar/stream", broadcast.consume()));
1409
1410 let mut consumer = producer.consume();
1411 consumer.assert_next("demo/bar/stream", &broadcast.consume());
1412 consumer.assert_next_wait();
1413 }
1414
1415 #[tokio::test]
1416 async fn test_overlapping_prefixes_no_duplicate_announcements() {
1417 let origin = Origin::produce();
1418 let broadcast = Broadcast::produce();
1419
1420 let producer = origin
1422 .publish_only(&["demo".into(), "demo/foo".into()])
1423 .expect("should create producer");
1424
1425 assert!(producer.publish_broadcast("demo/foo/stream", broadcast.consume()));
1426
1427 let mut consumer = producer.consume();
1428 consumer.assert_next("demo/foo/stream", &broadcast.consume());
1430 consumer.assert_next_wait();
1431 }
1432
1433 #[tokio::test]
1434 async fn test_allowed_returns_deduped_prefixes() {
1435 let origin = Origin::produce();
1436
1437 let producer = origin
1438 .publish_only(&["demo".into(), "demo/foo".into(), "anon".into()])
1439 .expect("should create producer");
1440
1441 let allowed: Vec<_> = producer.allowed().collect();
1442 assert_eq!(allowed.len(), 2, "demo/foo should be subsumed by demo");
1443 }
1444}