1use std::{
2 collections::{HashMap, VecDeque},
3 fmt,
4 sync::atomic::{AtomicU64, Ordering},
5};
6
7use rand::Rng;
8use tokio::sync::mpsc;
9use web_async::Lock;
10
11use super::BroadcastConsumer;
12use crate::{
13 AsPath, Broadcast, BroadcastProducer, Path, PathOwned, PathPrefixes,
14 coding::{Decode, DecodeError, Encode, EncodeError},
15};
16
17#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
24#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
25pub struct Origin {
26 pub id: u64,
28}
29
30impl Origin {
31 pub(crate) const UNKNOWN: Self = Self { id: 0 };
34
35 pub fn random() -> Self {
38 let mut rng = rand::rng();
39 let id = rng.random_range(1..(1u64 << 62));
40 Self { id }
41 }
42
43 pub fn produce(self) -> OriginProducer {
45 OriginProducer::new(self)
46 }
47}
48
49impl From<u64> for Origin {
50 fn from(id: u64) -> Self {
51 Self { id }
52 }
53}
54
55impl fmt::Display for Origin {
56 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
57 self.id.fmt(f)
58 }
59}
60
61impl<V: Copy> Encode<V> for Origin
62where
63 u64: Encode<V>,
64{
65 fn encode<W: bytes::BufMut>(&self, w: &mut W, version: V) -> Result<(), EncodeError> {
66 self.id.encode(w, version)
67 }
68}
69
70impl<V: Copy> Decode<V> for Origin
71where
72 u64: Decode<V>,
73{
74 fn decode<R: bytes::Buf>(r: &mut R, version: V) -> Result<Self, DecodeError> {
75 let id = u64::decode(r, version)?;
76 if id >= 1u64 << 62 {
77 return Err(DecodeError::InvalidValue);
78 }
79 Ok(Self { id })
80 }
81}
82
83pub(crate) const MAX_HOPS: usize = 32;
89
90#[derive(Debug, Clone, Default, PartialEq, Eq)]
95#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
96pub struct OriginList(Vec<Origin>);
97
98#[derive(Debug, Clone, Copy, PartialEq, Eq)]
100#[non_exhaustive]
101pub struct TooManyOrigins;
102
103impl fmt::Display for TooManyOrigins {
104 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
105 write!(f, "too many origins (max {MAX_HOPS})")
106 }
107}
108
109impl std::error::Error for TooManyOrigins {}
110
111impl From<TooManyOrigins> for DecodeError {
112 fn from(_: TooManyOrigins) -> Self {
113 DecodeError::BoundsExceeded
114 }
115}
116
117impl OriginList {
118 pub fn new() -> Self {
120 Self(Vec::new())
121 }
122
123 pub fn push(&mut self, origin: Origin) -> Result<(), TooManyOrigins> {
125 if self.0.len() >= MAX_HOPS {
126 return Err(TooManyOrigins);
127 }
128 self.0.push(origin);
129 Ok(())
130 }
131
132 pub fn contains(&self, origin: &Origin) -> bool {
134 self.0.contains(origin)
135 }
136
137 pub fn len(&self) -> usize {
139 self.0.len()
140 }
141
142 pub fn is_empty(&self) -> bool {
144 self.0.is_empty()
145 }
146
147 pub fn iter(&self) -> std::slice::Iter<'_, Origin> {
149 self.0.iter()
150 }
151
152 pub fn as_slice(&self) -> &[Origin] {
154 &self.0
155 }
156}
157
158impl TryFrom<Vec<Origin>> for OriginList {
159 type Error = TooManyOrigins;
160
161 fn try_from(v: Vec<Origin>) -> Result<Self, Self::Error> {
162 if v.len() > MAX_HOPS {
163 return Err(TooManyOrigins);
164 }
165 Ok(Self(v))
166 }
167}
168
169impl<'a> IntoIterator for &'a OriginList {
170 type Item = &'a Origin;
171 type IntoIter = std::slice::Iter<'a, Origin>;
172
173 fn into_iter(self) -> Self::IntoIter {
174 self.iter()
175 }
176}
177
178impl<V: Copy> Encode<V> for OriginList
179where
180 u64: Encode<V>,
181 Origin: Encode<V>,
182{
183 fn encode<W: bytes::BufMut>(&self, w: &mut W, version: V) -> Result<(), EncodeError> {
184 (self.0.len() as u64).encode(w, version)?;
185 for origin in &self.0 {
186 origin.encode(w, version)?;
187 }
188 Ok(())
189 }
190}
191
192impl<V: Copy> Decode<V> for OriginList
193where
194 u64: Decode<V>,
195 Origin: Decode<V>,
196{
197 fn decode<R: bytes::Buf>(r: &mut R, version: V) -> Result<Self, DecodeError> {
198 let count = u64::decode(r, version)? as usize;
199 if count > MAX_HOPS {
200 return Err(DecodeError::BoundsExceeded);
201 }
202 let mut list = Vec::with_capacity(count);
203 for _ in 0..count {
204 list.push(Origin::decode(r, version)?);
205 }
206 Ok(Self(list))
207 }
208}
209
210static NEXT_CONSUMER_ID: AtomicU64 = AtomicU64::new(0);
211
212#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
213struct ConsumerId(u64);
214
215impl ConsumerId {
216 fn new() -> Self {
217 Self(NEXT_CONSUMER_ID.fetch_add(1, Ordering::Relaxed))
218 }
219}
220
221struct OriginBroadcast {
223 path: PathOwned,
224 active: BroadcastConsumer,
225 backup: VecDeque<BroadcastConsumer>,
226}
227
228#[derive(Clone)]
229struct OriginConsumerNotify {
230 root: PathOwned,
231 tx: mpsc::UnboundedSender<OriginAnnounce>,
232}
233
234impl OriginConsumerNotify {
235 fn announce(&self, path: impl AsPath, broadcast: BroadcastConsumer) {
236 let path = path.as_path().strip_prefix(&self.root).unwrap().to_owned();
237 self.tx.send((path, Some(broadcast))).expect("consumer closed");
238 }
239
240 fn reannounce(&self, path: impl AsPath, broadcast: BroadcastConsumer) {
241 let path = path.as_path().strip_prefix(&self.root).unwrap().to_owned();
242 self.tx.send((path.clone(), None)).expect("consumer closed");
243 self.tx.send((path, Some(broadcast))).expect("consumer closed");
244 }
245
246 fn unannounce(&self, path: impl AsPath) {
247 let path = path.as_path().strip_prefix(&self.root).unwrap().to_owned();
248 self.tx.send((path, None)).expect("consumer closed");
249 }
250}
251
252struct NotifyNode {
253 parent: Option<Lock<NotifyNode>>,
254
255 consumers: HashMap<ConsumerId, OriginConsumerNotify>,
258}
259
260impl NotifyNode {
261 fn new(parent: Option<Lock<NotifyNode>>) -> Self {
262 Self {
263 parent,
264 consumers: HashMap::new(),
265 }
266 }
267
268 fn announce(&mut self, path: impl AsPath, broadcast: &BroadcastConsumer) {
269 for consumer in self.consumers.values() {
270 consumer.announce(path.as_path(), broadcast.clone());
271 }
272
273 if let Some(parent) = &self.parent {
274 parent.lock().announce(path, broadcast);
275 }
276 }
277
278 fn reannounce(&mut self, path: impl AsPath, broadcast: &BroadcastConsumer) {
279 for consumer in self.consumers.values() {
280 consumer.reannounce(path.as_path(), broadcast.clone());
281 }
282
283 if let Some(parent) = &self.parent {
284 parent.lock().reannounce(path, broadcast);
285 }
286 }
287
288 fn unannounce(&mut self, path: impl AsPath) {
289 for consumer in self.consumers.values() {
290 consumer.unannounce(path.as_path());
291 }
292
293 if let Some(parent) = &self.parent {
294 parent.lock().unannounce(path);
295 }
296 }
297}
298
299struct OriginNode {
300 broadcast: Option<OriginBroadcast>,
302
303 nested: HashMap<String, Lock<OriginNode>>,
305
306 notify: Lock<NotifyNode>,
308}
309
310impl OriginNode {
311 fn new(parent: Option<Lock<NotifyNode>>) -> Self {
312 Self {
313 broadcast: None,
314 nested: HashMap::new(),
315 notify: Lock::new(NotifyNode::new(parent)),
316 }
317 }
318
319 fn leaf(&mut self, path: &Path) -> Lock<OriginNode> {
320 let (dir, rest) = path.next_part().expect("leaf called with empty path");
321
322 let next = self.entry(dir);
323 if rest.is_empty() { next } else { next.lock().leaf(&rest) }
324 }
325
326 fn entry(&mut self, dir: &str) -> Lock<OriginNode> {
327 match self.nested.get(dir) {
328 Some(next) => next.clone(),
329 None => {
330 let next = Lock::new(OriginNode::new(Some(self.notify.clone())));
331 self.nested.insert(dir.to_string(), next.clone());
332 next
333 }
334 }
335 }
336
337 fn publish(&mut self, full: impl AsPath, broadcast: &BroadcastConsumer, relative: impl AsPath) {
338 let full = full.as_path();
339 let rest = relative.as_path();
340
341 if let Some((dir, relative)) = rest.next_part() {
343 self.entry(dir).lock().publish(&full, broadcast, &relative);
345 } else if let Some(existing) = &mut self.broadcast {
346 if existing.active.is_clone(broadcast) || existing.backup.iter().any(|b| b.is_clone(broadcast)) {
353 return;
354 }
355
356 if broadcast.hops.len() <= existing.active.hops.len() {
357 let old = existing.active.clone();
358 existing.active = broadcast.clone();
359 existing.backup.push_back(old);
360
361 self.notify.lock().reannounce(full, broadcast);
362 } else {
363 existing.backup.push_back(broadcast.clone());
365 }
366 } else {
367 self.broadcast = Some(OriginBroadcast {
369 path: full.to_owned(),
370 active: broadcast.clone(),
371 backup: VecDeque::new(),
372 });
373 self.notify.lock().announce(full, broadcast);
374 }
375 }
376
377 fn consume(&mut self, id: ConsumerId, mut notify: OriginConsumerNotify) {
378 self.consume_initial(&mut notify);
379 self.notify.lock().consumers.insert(id, notify);
380 }
381
382 fn consume_initial(&mut self, notify: &mut OriginConsumerNotify) {
383 if let Some(broadcast) = &self.broadcast {
384 notify.announce(&broadcast.path, broadcast.active.clone());
385 }
386
387 for nested in self.nested.values() {
389 nested.lock().consume_initial(notify);
390 }
391 }
392
393 fn consume_broadcast(&self, rest: impl AsPath) -> Option<BroadcastConsumer> {
394 let rest = rest.as_path();
395
396 if let Some((dir, rest)) = rest.next_part() {
397 let node = self.nested.get(dir)?.lock();
398 node.consume_broadcast(&rest)
399 } else {
400 self.broadcast.as_ref().map(|b| b.active.clone())
401 }
402 }
403
404 fn unconsume(&mut self, id: ConsumerId) {
405 self.notify.lock().consumers.remove(&id).expect("consumer not found");
406 if self.is_empty() {
407 }
410 }
411
412 fn remove(&mut self, full: impl AsPath, broadcast: BroadcastConsumer, relative: impl AsPath) {
414 let full = full.as_path();
415 let relative = relative.as_path();
416
417 if let Some((dir, relative)) = relative.next_part() {
418 let nested = self.entry(dir);
419 let mut locked = nested.lock();
420 locked.remove(&full, broadcast, &relative);
421
422 if locked.is_empty() {
423 drop(locked);
424 self.nested.remove(dir);
425 }
426 } else {
427 let entry = match &mut self.broadcast {
428 Some(existing) => existing,
429 None => return,
430 };
431
432 let pos = entry.backup.iter().position(|b| b.is_clone(&broadcast));
434 if let Some(pos) = pos {
435 entry.backup.remove(pos);
436 return;
438 }
439
440 assert!(entry.active.is_clone(&broadcast));
442
443 let best = entry
446 .backup
447 .iter()
448 .enumerate()
449 .min_by_key(|(_, b)| b.hops.len())
450 .map(|(i, _)| i);
451 if let Some(idx) = best {
452 let active = entry.backup.remove(idx).expect("index in range");
453 entry.active = active;
454 self.notify.lock().reannounce(full, &entry.active);
455 } else {
456 self.broadcast = None;
458 self.notify.lock().unannounce(full);
459 }
460 }
461 }
462
463 fn is_empty(&self) -> bool {
464 self.broadcast.is_none() && self.nested.is_empty() && self.notify.lock().consumers.is_empty()
465 }
466}
467
468#[derive(Clone)]
469struct OriginNodes {
470 nodes: Vec<(PathOwned, Lock<OriginNode>)>,
471}
472
473impl OriginNodes {
474 pub fn select(&self, prefixes: &PathPrefixes) -> Option<Self> {
477 let mut roots = Vec::new();
478
479 for (root, state) in &self.nodes {
480 for prefix in prefixes {
481 if root.has_prefix(prefix) {
482 roots.push((root.to_owned(), state.clone()));
484 continue;
485 }
486
487 if let Some(suffix) = prefix.strip_prefix(root) {
488 let nested = state.lock().leaf(&suffix);
490 roots.push((prefix.to_owned(), nested));
491 }
492 }
493 }
494
495 if roots.is_empty() {
496 None
497 } else {
498 Some(Self { nodes: roots })
499 }
500 }
501
502 pub fn root(&self, new_root: impl AsPath) -> Option<Self> {
503 let new_root = new_root.as_path();
504 let mut roots = Vec::new();
505
506 if new_root.is_empty() {
507 return Some(self.clone());
508 }
509
510 for (root, state) in &self.nodes {
511 if let Some(suffix) = root.strip_prefix(&new_root) {
512 roots.push((suffix.to_owned(), state.clone()));
514 } else if let Some(suffix) = new_root.strip_prefix(root) {
515 let nested = state.lock().leaf(&suffix);
518 roots.push(("".into(), nested));
519 }
520 }
521
522 if roots.is_empty() {
523 None
524 } else {
525 Some(Self { nodes: roots })
526 }
527 }
528
529 pub fn get(&self, path: impl AsPath) -> Option<(Lock<OriginNode>, PathOwned)> {
531 let path = path.as_path();
532
533 for (root, state) in &self.nodes {
534 if let Some(suffix) = path.strip_prefix(root) {
535 return Some((state.clone(), suffix.to_owned()));
536 }
537 }
538
539 None
540 }
541}
542
543impl Default for OriginNodes {
544 fn default() -> Self {
545 Self {
546 nodes: vec![("".into(), Lock::new(OriginNode::new(None)))],
547 }
548 }
549}
550
551pub type OriginAnnounce = (PathOwned, Option<BroadcastConsumer>);
553
554#[derive(Clone)]
556pub struct OriginProducer {
557 info: Origin,
561
562 nodes: OriginNodes,
565
566 root: PathOwned,
568}
569
570impl std::ops::Deref for OriginProducer {
571 type Target = Origin;
572
573 fn deref(&self) -> &Self::Target {
574 &self.info
575 }
576}
577
578impl OriginProducer {
579 pub fn new(info: Origin) -> Self {
582 Self {
583 info,
584 nodes: OriginNodes::default(),
585 root: PathOwned::default(),
586 }
587 }
588
589 pub fn create_broadcast(&self, path: impl AsPath) -> Option<BroadcastProducer> {
594 let broadcast = Broadcast::new().produce();
595 self.publish_broadcast(path, broadcast.consume()).then_some(broadcast)
596 }
597
598 pub fn publish_broadcast(&self, path: impl AsPath, broadcast: BroadcastConsumer) -> bool {
608 let path = path.as_path();
609
610 if broadcast.hops.contains(&self.info) {
612 return false;
613 }
614
615 let (root, rest) = match self.nodes.get(&path) {
616 Some(root) => root,
617 None => return false,
618 };
619
620 let full = self.root.join(&path);
621
622 root.lock().publish(&full, &broadcast, &rest);
623 let root = root.clone();
624
625 web_async::spawn(async move {
626 broadcast.closed().await;
627 root.lock().remove(&full, broadcast, &rest);
628 });
629
630 true
631 }
632
633 pub fn scope(&self, prefixes: &[Path]) -> Option<OriginProducer> {
639 let prefixes = PathPrefixes::new(prefixes);
640 Some(OriginProducer {
641 info: self.info,
642 nodes: self.nodes.select(&prefixes)?,
643 root: self.root.clone(),
644 })
645 }
646
647 pub fn consume(&self) -> OriginConsumer {
649 OriginConsumer::new(self.info, self.root.clone(), self.nodes.clone())
650 }
651
652 #[deprecated(note = "use `consume().get_broadcast(path)` once `consume()` is cheap")]
657 pub fn get_broadcast(&self, path: impl AsPath) -> Option<BroadcastConsumer> {
658 let path = path.as_path();
659 let (root, rest) = self.nodes.get(&path)?;
660 let state = root.lock();
661 state.consume_broadcast(&rest)
662 }
663
664 pub fn with_root(&self, prefix: impl AsPath) -> Option<Self> {
669 let prefix = prefix.as_path();
670
671 Some(Self {
672 info: self.info,
673 root: self.root.join(&prefix).to_owned(),
674 nodes: self.nodes.root(&prefix)?,
675 })
676 }
677
678 pub fn root(&self) -> &Path<'_> {
680 &self.root
681 }
682
683 pub fn allowed(&self) -> impl Iterator<Item = &Path<'_>> {
686 self.nodes.nodes.iter().map(|(root, _)| root)
687 }
688
689 pub fn absolute(&self, path: impl AsPath) -> Path<'_> {
691 self.root.join(path)
692 }
693}
694
695pub struct OriginConsumer {
699 id: ConsumerId,
700 info: Origin,
702 nodes: OriginNodes,
703 updates: mpsc::UnboundedReceiver<OriginAnnounce>,
704
705 root: PathOwned,
707}
708
709impl std::ops::Deref for OriginConsumer {
710 type Target = Origin;
711
712 fn deref(&self) -> &Self::Target {
713 &self.info
714 }
715}
716
717impl OriginConsumer {
718 fn new(info: Origin, root: PathOwned, nodes: OriginNodes) -> Self {
719 let (tx, rx) = mpsc::unbounded_channel();
720
721 let id = ConsumerId::new();
722
723 for (_, state) in &nodes.nodes {
724 let notify = OriginConsumerNotify {
725 root: root.clone(),
726 tx: tx.clone(),
727 };
728 state.lock().consume(id, notify);
729 }
730
731 Self {
732 id,
733 info,
734 nodes,
735 updates: rx,
736 root,
737 }
738 }
739
740 pub async fn announced(&mut self) -> Option<OriginAnnounce> {
748 self.updates.recv().await
749 }
750
751 pub fn try_announced(&mut self) -> Option<OriginAnnounce> {
756 self.updates.try_recv().ok()
757 }
758
759 pub fn consume(&self) -> Self {
761 self.clone()
762 }
763
764 pub fn get_broadcast(&self, path: impl AsPath) -> Option<BroadcastConsumer> {
772 let path = path.as_path();
773 let (root, rest) = self.nodes.get(&path)?;
774 let state = root.lock();
775 state.consume_broadcast(&rest)
776 }
777
778 pub async fn announced_broadcast(&self, path: impl AsPath) -> Option<BroadcastConsumer> {
787 let path = path.as_path();
788
789 let mut consumer = self.scope(std::slice::from_ref(&path))?;
791
792 if !consumer.allowed().any(|allowed| path.has_prefix(allowed)) {
796 return None;
797 }
798
799 loop {
800 let (announced, broadcast) = consumer.announced().await?;
801 if announced.as_path() == path {
803 if let Some(broadcast) = broadcast {
804 return Some(broadcast);
805 }
806 }
807 }
808 }
809
810 pub fn scope(&self, prefixes: &[Path]) -> Option<OriginConsumer> {
816 let prefixes = PathPrefixes::new(prefixes);
817 Some(OriginConsumer::new(
818 self.info,
819 self.root.clone(),
820 self.nodes.select(&prefixes)?,
821 ))
822 }
823
824 pub fn with_root(&self, prefix: impl AsPath) -> Option<Self> {
829 let prefix = prefix.as_path();
830
831 Some(Self::new(
832 self.info,
833 self.root.join(&prefix).to_owned(),
834 self.nodes.root(&prefix)?,
835 ))
836 }
837
838 pub fn root(&self) -> &Path<'_> {
840 &self.root
841 }
842
843 pub fn allowed(&self) -> impl Iterator<Item = &Path<'_>> {
846 self.nodes.nodes.iter().map(|(root, _)| root)
847 }
848
849 pub fn absolute(&self, path: impl AsPath) -> Path<'_> {
851 self.root.join(path)
852 }
853}
854
855impl Drop for OriginConsumer {
856 fn drop(&mut self) {
857 for (_, root) in &self.nodes.nodes {
858 root.lock().unconsume(self.id);
859 }
860 }
861}
862
863impl Clone for OriginConsumer {
864 fn clone(&self) -> Self {
865 OriginConsumer::new(self.info, self.root.clone(), self.nodes.clone())
866 }
867}
868
869#[cfg(test)]
870use futures::FutureExt;
871
872#[cfg(test)]
873impl OriginConsumer {
874 pub fn assert_next(&mut self, expected: impl AsPath, broadcast: &BroadcastConsumer) {
875 let expected = expected.as_path();
876 let (path, active) = self.announced().now_or_never().expect("next blocked").expect("no next");
877 assert_eq!(path, expected, "wrong path");
878 assert!(active.unwrap().is_clone(broadcast), "should be the same broadcast");
879 }
880
881 pub fn assert_try_next(&mut self, expected: impl AsPath, broadcast: &BroadcastConsumer) {
882 let expected = expected.as_path();
883 let (path, active) = self.try_announced().expect("no next");
884 assert_eq!(path, expected, "wrong path");
885 assert!(active.unwrap().is_clone(broadcast), "should be the same broadcast");
886 }
887
888 pub fn assert_next_none(&mut self, expected: impl AsPath) {
889 let expected = expected.as_path();
890 let (path, active) = self.announced().now_or_never().expect("next blocked").expect("no next");
891 assert_eq!(path, expected, "wrong path");
892 assert!(active.is_none(), "should be unannounced");
893 }
894
895 pub fn assert_next_wait(&mut self) {
896 if let Some(res) = self.announced().now_or_never() {
897 panic!("next should block: got {:?}", res.map(|(path, _)| path));
898 }
899 }
900
901 }
910
911#[cfg(test)]
912mod tests {
913 use crate::Broadcast;
914
915 use super::*;
916
917 #[test]
918 fn origin_list_push_fails_at_limit() {
919 let mut list = OriginList::new();
920 for _ in 0..MAX_HOPS {
921 list.push(Origin::random()).unwrap();
922 }
923 assert_eq!(list.len(), MAX_HOPS);
924 assert_eq!(list.push(Origin::random()), Err(TooManyOrigins));
925 }
926
927 #[test]
928 fn origin_list_try_from_vec_enforces_limit() {
929 let under: Vec<Origin> = (0..MAX_HOPS).map(|_| Origin::random()).collect();
930 assert!(OriginList::try_from(under).is_ok());
931
932 let over: Vec<Origin> = (0..MAX_HOPS + 1).map(|_| Origin::random()).collect();
933 assert_eq!(OriginList::try_from(over), Err(TooManyOrigins));
934 }
935
936 #[tokio::test]
937 async fn test_announce() {
938 tokio::time::pause();
939
940 let origin = Origin::random().produce();
941 let broadcast1 = Broadcast::new().produce();
942 let broadcast2 = Broadcast::new().produce();
943
944 let mut consumer1 = origin.consume();
945 consumer1.assert_next_wait();
947
948 origin.publish_broadcast("test1", broadcast1.consume());
950
951 consumer1.assert_next("test1", &broadcast1.consume());
952 consumer1.assert_next_wait();
953
954 let mut consumer2 = origin.consume();
957
958 origin.publish_broadcast("test2", broadcast2.consume());
960
961 consumer1.assert_next("test2", &broadcast2.consume());
962 consumer1.assert_next_wait();
963
964 consumer2.assert_next("test1", &broadcast1.consume());
965 consumer2.assert_next("test2", &broadcast2.consume());
966 consumer2.assert_next_wait();
967
968 drop(broadcast1);
970
971 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
973
974 consumer1.assert_next_none("test1");
976 consumer2.assert_next_none("test1");
977 consumer1.assert_next_wait();
978 consumer2.assert_next_wait();
979
980 let mut consumer3 = origin.consume();
982 consumer3.assert_next("test2", &broadcast2.consume());
983 consumer3.assert_next_wait();
984
985 drop(broadcast2);
987
988 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
990
991 consumer1.assert_next_none("test2");
992 consumer2.assert_next_none("test2");
993 consumer3.assert_next_none("test2");
994
995 }
1001
1002 #[tokio::test]
1003 async fn test_duplicate() {
1004 tokio::time::pause();
1005
1006 let origin = Origin::random().produce();
1007
1008 let broadcast1 = Broadcast::new().produce();
1009 let broadcast2 = Broadcast::new().produce();
1010 let broadcast3 = Broadcast::new().produce();
1011
1012 let consumer1 = broadcast1.consume();
1013 let consumer2 = broadcast2.consume();
1014 let consumer3 = broadcast3.consume();
1015
1016 let mut consumer = origin.consume();
1017
1018 origin.publish_broadcast("test", consumer1.clone());
1019 origin.publish_broadcast("test", consumer2.clone());
1020 origin.publish_broadcast("test", consumer3.clone());
1021 assert!(consumer.get_broadcast("test").is_some());
1022
1023 consumer.assert_next("test", &consumer1);
1025 consumer.assert_next_none("test");
1026 consumer.assert_next("test", &consumer2);
1027 consumer.assert_next_none("test");
1028 consumer.assert_next("test", &consumer3);
1029
1030 drop(broadcast2);
1032
1033 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1035
1036 assert!(consumer.get_broadcast("test").is_some());
1037 consumer.assert_next_wait();
1038
1039 drop(broadcast3);
1041
1042 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1044
1045 assert!(consumer.get_broadcast("test").is_some());
1046 consumer.assert_next_none("test");
1047 consumer.assert_next("test", &consumer1);
1048
1049 drop(broadcast1);
1051
1052 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1054 assert!(consumer.get_broadcast("test").is_none());
1055
1056 consumer.assert_next_none("test");
1057 consumer.assert_next_wait();
1058 }
1059
1060 #[tokio::test]
1061 async fn test_duplicate_reverse() {
1062 tokio::time::pause();
1063
1064 let origin = Origin::random().produce();
1065 let broadcast1 = Broadcast::new().produce();
1066 let broadcast2 = Broadcast::new().produce();
1067
1068 origin.publish_broadcast("test", broadcast1.consume());
1069 origin.publish_broadcast("test", broadcast2.consume());
1070 assert!(origin.consume().get_broadcast("test").is_some());
1071
1072 drop(broadcast2);
1074
1075 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1077 assert!(origin.consume().get_broadcast("test").is_some());
1078
1079 drop(broadcast1);
1080
1081 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1083 assert!(origin.consume().get_broadcast("test").is_none());
1084 }
1085
1086 #[tokio::test]
1087 async fn test_double_publish() {
1088 tokio::time::pause();
1089
1090 let origin = Origin::random().produce();
1091 let broadcast = Broadcast::new().produce();
1092
1093 origin.publish_broadcast("test", broadcast.consume());
1095 origin.publish_broadcast("test", broadcast.consume());
1096
1097 assert!(origin.consume().get_broadcast("test").is_some());
1098
1099 drop(broadcast);
1100
1101 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1103 assert!(origin.consume().get_broadcast("test").is_none());
1104 }
1105 #[tokio::test]
1107 #[should_panic]
1108 async fn test_128() {
1109 let origin = Origin::random().produce();
1110 let broadcast = Broadcast::new().produce();
1111
1112 let mut consumer = origin.consume();
1113 for i in 0..256 {
1114 origin.publish_broadcast(format!("test{i}"), broadcast.consume());
1115 }
1116
1117 for i in 0..256 {
1118 consumer.assert_next(format!("test{i}"), &broadcast.consume());
1119 }
1120 }
1121
1122 #[tokio::test]
1123 async fn test_128_fix() {
1124 let origin = Origin::random().produce();
1125 let broadcast = Broadcast::new().produce();
1126
1127 let mut consumer = origin.consume();
1128 for i in 0..256 {
1129 origin.publish_broadcast(format!("test{i}"), broadcast.consume());
1130 }
1131
1132 for i in 0..256 {
1133 consumer.assert_try_next(format!("test{i}"), &broadcast.consume());
1135 }
1136 }
1137
1138 #[tokio::test]
1139 async fn test_with_root_basic() {
1140 let origin = Origin::random().produce();
1141 let broadcast = Broadcast::new().produce();
1142
1143 let foo_producer = origin.with_root("foo").expect("should create root");
1145 assert_eq!(foo_producer.root().as_str(), "foo");
1146
1147 let mut consumer = origin.consume();
1148
1149 assert!(foo_producer.publish_broadcast("bar/baz", broadcast.consume()));
1151 consumer.assert_next("foo/bar/baz", &broadcast.consume());
1153
1154 let mut foo_consumer = foo_producer.consume();
1156 foo_consumer.assert_next("bar/baz", &broadcast.consume());
1157 }
1158
1159 #[tokio::test]
1160 async fn test_with_root_nested() {
1161 let origin = Origin::random().produce();
1162 let broadcast = Broadcast::new().produce();
1163
1164 let foo_producer = origin.with_root("foo").expect("should create foo root");
1166 let foo_bar_producer = foo_producer.with_root("bar").expect("should create bar root");
1167 assert_eq!(foo_bar_producer.root().as_str(), "foo/bar");
1168
1169 let mut consumer = origin.consume();
1170
1171 assert!(foo_bar_producer.publish_broadcast("baz", broadcast.consume()));
1173 consumer.assert_next("foo/bar/baz", &broadcast.consume());
1175
1176 let mut foo_bar_consumer = foo_bar_producer.consume();
1178 foo_bar_consumer.assert_next("baz", &broadcast.consume());
1179 }
1180
1181 #[tokio::test]
1182 async fn test_publish_scope_allows() {
1183 let origin = Origin::random().produce();
1184 let broadcast = Broadcast::new().produce();
1185
1186 let limited_producer = origin
1188 .scope(&["allowed/path1".into(), "allowed/path2".into()])
1189 .expect("should create limited producer");
1190
1191 assert!(limited_producer.publish_broadcast("allowed/path1", broadcast.consume()));
1193 assert!(limited_producer.publish_broadcast("allowed/path1/nested", broadcast.consume()));
1194 assert!(limited_producer.publish_broadcast("allowed/path2", broadcast.consume()));
1195
1196 assert!(!limited_producer.publish_broadcast("notallowed", broadcast.consume()));
1198 assert!(!limited_producer.publish_broadcast("allowed", broadcast.consume())); assert!(!limited_producer.publish_broadcast("other/path", broadcast.consume()));
1200 }
1201
1202 #[tokio::test]
1203 async fn test_publish_scope_empty() {
1204 let origin = Origin::random().produce();
1205
1206 assert!(origin.scope(&[]).is_none());
1208 }
1209
1210 #[tokio::test]
1211 async fn test_consume_scope_filters() {
1212 let origin = Origin::random().produce();
1213 let broadcast1 = Broadcast::new().produce();
1214 let broadcast2 = Broadcast::new().produce();
1215 let broadcast3 = Broadcast::new().produce();
1216
1217 let mut consumer = origin.consume();
1218
1219 origin.publish_broadcast("allowed", broadcast1.consume());
1221 origin.publish_broadcast("allowed/nested", broadcast2.consume());
1222 origin.publish_broadcast("notallowed", broadcast3.consume());
1223
1224 let mut limited_consumer = origin
1226 .consume()
1227 .scope(&["allowed".into()])
1228 .expect("should create limited consumer");
1229
1230 limited_consumer.assert_next("allowed", &broadcast1.consume());
1232 limited_consumer.assert_next("allowed/nested", &broadcast2.consume());
1233 limited_consumer.assert_next_wait(); consumer.assert_next("allowed", &broadcast1.consume());
1237 consumer.assert_next("allowed/nested", &broadcast2.consume());
1238 consumer.assert_next("notallowed", &broadcast3.consume());
1239 }
1240
1241 #[tokio::test]
1242 async fn test_consume_scope_multiple_prefixes() {
1243 let origin = Origin::random().produce();
1244 let broadcast1 = Broadcast::new().produce();
1245 let broadcast2 = Broadcast::new().produce();
1246 let broadcast3 = Broadcast::new().produce();
1247
1248 origin.publish_broadcast("foo/test", broadcast1.consume());
1249 origin.publish_broadcast("bar/test", broadcast2.consume());
1250 origin.publish_broadcast("baz/test", broadcast3.consume());
1251
1252 let mut limited_consumer = origin
1254 .consume()
1255 .scope(&["foo".into(), "bar".into()])
1256 .expect("should create limited consumer");
1257
1258 limited_consumer.assert_next("bar/test", &broadcast2.consume());
1260 limited_consumer.assert_next("foo/test", &broadcast1.consume());
1261 limited_consumer.assert_next_wait(); }
1263
1264 #[tokio::test]
1265 async fn test_with_root_and_publish_scope() {
1266 let origin = Origin::random().produce();
1267 let broadcast = Broadcast::new().produce();
1268
1269 let foo_producer = origin.with_root("foo").expect("should create foo root");
1271
1272 let limited_producer = foo_producer
1274 .scope(&["bar".into(), "goop/pee".into()])
1275 .expect("should create limited producer");
1276
1277 let mut consumer = origin.consume();
1278
1279 assert!(limited_producer.publish_broadcast("bar", broadcast.consume()));
1281 assert!(limited_producer.publish_broadcast("bar/nested", broadcast.consume()));
1282 assert!(limited_producer.publish_broadcast("goop/pee", broadcast.consume()));
1283 assert!(limited_producer.publish_broadcast("goop/pee/nested", broadcast.consume()));
1284
1285 assert!(!limited_producer.publish_broadcast("baz", broadcast.consume()));
1287 assert!(!limited_producer.publish_broadcast("goop", broadcast.consume())); assert!(!limited_producer.publish_broadcast("goop/other", broadcast.consume()));
1289
1290 consumer.assert_next("foo/bar", &broadcast.consume());
1292 consumer.assert_next("foo/bar/nested", &broadcast.consume());
1293 consumer.assert_next("foo/goop/pee", &broadcast.consume());
1294 consumer.assert_next("foo/goop/pee/nested", &broadcast.consume());
1295 }
1296
1297 #[tokio::test]
1298 async fn test_with_root_and_consume_scope() {
1299 let origin = Origin::random().produce();
1300 let broadcast1 = Broadcast::new().produce();
1301 let broadcast2 = Broadcast::new().produce();
1302 let broadcast3 = Broadcast::new().produce();
1303
1304 origin.publish_broadcast("foo/bar/test", broadcast1.consume());
1306 origin.publish_broadcast("foo/goop/pee/test", broadcast2.consume());
1307 origin.publish_broadcast("foo/other/test", broadcast3.consume());
1308
1309 let foo_producer = origin.with_root("foo").expect("should create foo root");
1311
1312 let mut limited_consumer = foo_producer
1314 .consume()
1315 .scope(&["bar".into(), "goop/pee".into()])
1316 .expect("should create limited consumer");
1317
1318 limited_consumer.assert_next("bar/test", &broadcast1.consume());
1320 limited_consumer.assert_next("goop/pee/test", &broadcast2.consume());
1321 limited_consumer.assert_next_wait(); }
1323
1324 #[tokio::test]
1325 async fn test_with_root_unauthorized() {
1326 let origin = Origin::random().produce();
1327
1328 let limited_producer = origin
1330 .scope(&["allowed".into()])
1331 .expect("should create limited producer");
1332
1333 assert!(limited_producer.with_root("notallowed").is_none());
1335
1336 let allowed_root = limited_producer
1338 .with_root("allowed")
1339 .expect("should create allowed root");
1340 assert_eq!(allowed_root.root().as_str(), "allowed");
1341 }
1342
1343 #[tokio::test]
1344 async fn test_wildcard_permission() {
1345 let origin = Origin::random().produce();
1346 let broadcast = Broadcast::new().produce();
1347
1348 let root_producer = origin.clone();
1350
1351 assert!(root_producer.publish_broadcast("any/path", broadcast.consume()));
1353 assert!(root_producer.publish_broadcast("other/path", broadcast.consume()));
1354
1355 let foo_producer = root_producer.with_root("foo").expect("should create any root");
1357 assert_eq!(foo_producer.root().as_str(), "foo");
1358 }
1359
1360 #[tokio::test]
1361 async fn test_consume_broadcast_with_permissions() {
1362 let origin = Origin::random().produce();
1363 let broadcast1 = Broadcast::new().produce();
1364 let broadcast2 = Broadcast::new().produce();
1365
1366 origin.publish_broadcast("allowed/test", broadcast1.consume());
1367 origin.publish_broadcast("notallowed/test", broadcast2.consume());
1368
1369 let limited_consumer = origin
1371 .consume()
1372 .scope(&["allowed".into()])
1373 .expect("should create limited consumer");
1374
1375 let result = limited_consumer.get_broadcast("allowed/test");
1377 assert!(result.is_some());
1378 assert!(result.unwrap().is_clone(&broadcast1.consume()));
1379
1380 assert!(limited_consumer.get_broadcast("notallowed/test").is_none());
1382
1383 let consumer = origin.consume();
1385 assert!(consumer.get_broadcast("allowed/test").is_some());
1386 assert!(consumer.get_broadcast("notallowed/test").is_some());
1387 }
1388
1389 #[tokio::test]
1390 async fn test_nested_paths_with_permissions() {
1391 let origin = Origin::random().produce();
1392 let broadcast = Broadcast::new().produce();
1393
1394 let limited_producer = origin.scope(&["a/b/c".into()]).expect("should create limited producer");
1396
1397 assert!(limited_producer.publish_broadcast("a/b/c", broadcast.consume()));
1399 assert!(limited_producer.publish_broadcast("a/b/c/d", broadcast.consume()));
1400 assert!(limited_producer.publish_broadcast("a/b/c/d/e", broadcast.consume()));
1401
1402 assert!(!limited_producer.publish_broadcast("a", broadcast.consume()));
1404 assert!(!limited_producer.publish_broadcast("a/b", broadcast.consume()));
1405 assert!(!limited_producer.publish_broadcast("a/b/other", broadcast.consume()));
1406 }
1407
1408 #[tokio::test]
1409 async fn test_multiple_consumers_with_different_permissions() {
1410 let origin = Origin::random().produce();
1411 let broadcast1 = Broadcast::new().produce();
1412 let broadcast2 = Broadcast::new().produce();
1413 let broadcast3 = Broadcast::new().produce();
1414
1415 origin.publish_broadcast("foo/test", broadcast1.consume());
1417 origin.publish_broadcast("bar/test", broadcast2.consume());
1418 origin.publish_broadcast("baz/test", broadcast3.consume());
1419
1420 let mut foo_consumer = origin
1422 .consume()
1423 .scope(&["foo".into()])
1424 .expect("should create foo consumer");
1425
1426 let mut bar_consumer = origin
1427 .consume()
1428 .scope(&["bar".into()])
1429 .expect("should create bar consumer");
1430
1431 let mut foobar_consumer = origin
1432 .consume()
1433 .scope(&["foo".into(), "bar".into()])
1434 .expect("should create foobar consumer");
1435
1436 foo_consumer.assert_next("foo/test", &broadcast1.consume());
1438 foo_consumer.assert_next_wait();
1439
1440 bar_consumer.assert_next("bar/test", &broadcast2.consume());
1441 bar_consumer.assert_next_wait();
1442
1443 foobar_consumer.assert_next("bar/test", &broadcast2.consume());
1444 foobar_consumer.assert_next("foo/test", &broadcast1.consume());
1445 foobar_consumer.assert_next_wait();
1446 }
1447
1448 #[tokio::test]
1449 async fn test_select_with_empty_prefix() {
1450 let origin = Origin::random().produce();
1451 let broadcast1 = Broadcast::new().produce();
1452 let broadcast2 = Broadcast::new().produce();
1453
1454 let demo_producer = origin.with_root("demo").expect("should create demo root");
1456 let limited_producer = demo_producer
1457 .scope(&["worm-node".into(), "foobar".into()])
1458 .expect("should create limited producer");
1459
1460 assert!(limited_producer.publish_broadcast("worm-node/test", broadcast1.consume()));
1462 assert!(limited_producer.publish_broadcast("foobar/test", broadcast2.consume()));
1463
1464 let mut consumer = limited_producer
1466 .consume()
1467 .scope(&["".into()])
1468 .expect("should create consumer with empty prefix");
1469
1470 let a1 = consumer.try_announced().expect("expected first announcement");
1472 let a2 = consumer.try_announced().expect("expected second announcement");
1473 consumer.assert_next_wait();
1474
1475 let mut paths: Vec<_> = [&a1, &a2].iter().map(|(p, _)| p.to_string()).collect();
1476 paths.sort();
1477 assert_eq!(paths, ["foobar/test", "worm-node/test"]);
1478 }
1479
1480 #[tokio::test]
1481 async fn test_select_narrowing_scope() {
1482 let origin = Origin::random().produce();
1483 let broadcast1 = Broadcast::new().produce();
1484 let broadcast2 = Broadcast::new().produce();
1485 let broadcast3 = Broadcast::new().produce();
1486
1487 let demo_producer = origin.with_root("demo").expect("should create demo root");
1489 let limited_producer = demo_producer
1490 .scope(&["worm-node".into(), "foobar".into()])
1491 .expect("should create limited producer");
1492
1493 assert!(limited_producer.publish_broadcast("worm-node", broadcast1.consume()));
1495 assert!(limited_producer.publish_broadcast("worm-node/foo", broadcast2.consume()));
1496 assert!(limited_producer.publish_broadcast("foobar/bar", broadcast3.consume()));
1497
1498 let mut worm_consumer = limited_producer
1500 .consume()
1501 .scope(&["worm-node".into()])
1502 .expect("should create worm-node consumer");
1503
1504 worm_consumer.assert_next("worm-node", &broadcast1.consume());
1506 worm_consumer.assert_next("worm-node/foo", &broadcast2.consume());
1507 worm_consumer.assert_next_wait(); let mut foo_consumer = limited_producer
1511 .consume()
1512 .scope(&["worm-node/foo".into()])
1513 .expect("should create worm-node/foo consumer");
1514
1515 foo_consumer.assert_next("worm-node/foo", &broadcast2.consume());
1516 foo_consumer.assert_next_wait(); }
1518
1519 #[tokio::test]
1520 async fn test_select_multiple_roots_with_empty_prefix() {
1521 let origin = Origin::random().produce();
1522 let broadcast1 = Broadcast::new().produce();
1523 let broadcast2 = Broadcast::new().produce();
1524 let broadcast3 = Broadcast::new().produce();
1525
1526 let limited_producer = origin
1528 .scope(&["app1".into(), "app2".into(), "shared".into()])
1529 .expect("should create limited producer");
1530
1531 assert!(limited_producer.publish_broadcast("app1/data", broadcast1.consume()));
1533 assert!(limited_producer.publish_broadcast("app2/config", broadcast2.consume()));
1534 assert!(limited_producer.publish_broadcast("shared/resource", broadcast3.consume()));
1535
1536 let mut consumer = limited_producer
1538 .consume()
1539 .scope(&["".into()])
1540 .expect("should create consumer with empty prefix");
1541
1542 consumer.assert_next("app1/data", &broadcast1.consume());
1544 consumer.assert_next("app2/config", &broadcast2.consume());
1545 consumer.assert_next("shared/resource", &broadcast3.consume());
1546 consumer.assert_next_wait();
1547 }
1548
1549 #[tokio::test]
1550 async fn test_publish_scope_with_empty_prefix() {
1551 let origin = Origin::random().produce();
1552 let broadcast = Broadcast::new().produce();
1553
1554 let limited_producer = origin
1556 .scope(&["services/api".into(), "services/web".into()])
1557 .expect("should create limited producer");
1558
1559 let same_producer = limited_producer
1561 .scope(&["".into()])
1562 .expect("should create producer with empty prefix");
1563
1564 assert!(same_producer.publish_broadcast("services/api", broadcast.consume()));
1566 assert!(same_producer.publish_broadcast("services/web", broadcast.consume()));
1567 assert!(!same_producer.publish_broadcast("services/db", broadcast.consume()));
1568 assert!(!same_producer.publish_broadcast("other", broadcast.consume()));
1569 }
1570
1571 #[tokio::test]
1572 async fn test_select_narrowing_to_deeper_path() {
1573 let origin = Origin::random().produce();
1574 let broadcast1 = Broadcast::new().produce();
1575 let broadcast2 = Broadcast::new().produce();
1576 let broadcast3 = Broadcast::new().produce();
1577
1578 let limited_producer = origin.scope(&["org".into()]).expect("should create limited producer");
1580
1581 assert!(limited_producer.publish_broadcast("org/team1/project1", broadcast1.consume()));
1583 assert!(limited_producer.publish_broadcast("org/team1/project2", broadcast2.consume()));
1584 assert!(limited_producer.publish_broadcast("org/team2/project1", broadcast3.consume()));
1585
1586 let mut team2_consumer = limited_producer
1588 .consume()
1589 .scope(&["org/team2".into()])
1590 .expect("should create team2 consumer");
1591
1592 team2_consumer.assert_next("org/team2/project1", &broadcast3.consume());
1593 team2_consumer.assert_next_wait(); let mut project1_consumer = limited_producer
1597 .consume()
1598 .scope(&["org/team1/project1".into()])
1599 .expect("should create project1 consumer");
1600
1601 project1_consumer.assert_next("org/team1/project1", &broadcast1.consume());
1603 project1_consumer.assert_next_wait();
1604 }
1605
1606 #[tokio::test]
1607 async fn test_select_with_non_matching_prefix() {
1608 let origin = Origin::random().produce();
1609
1610 let limited_producer = origin
1612 .scope(&["allowed/path".into()])
1613 .expect("should create limited producer");
1614
1615 assert!(limited_producer.consume().scope(&["different/path".into()]).is_none());
1617
1618 assert!(limited_producer.scope(&["other/path".into()]).is_none());
1620 }
1621
1622 #[tokio::test]
1625 async fn test_with_root_trailing_slash_consumer() {
1626 let origin = Origin::random().produce();
1627
1628 let prefix = "some_prefix/".to_string();
1630 let mut consumer = origin.consume().with_root(prefix).unwrap();
1631
1632 let b = origin.create_broadcast("some_prefix/test").unwrap();
1633 consumer.assert_next("test", &b.consume());
1634 }
1635
1636 #[tokio::test]
1638 async fn test_with_root_trailing_slash_producer() {
1639 let origin = Origin::random().produce();
1640
1641 let prefix = "some_prefix/".to_string();
1643 let rooted = origin.with_root(prefix).unwrap();
1644
1645 let b = rooted.create_broadcast("test").unwrap();
1646
1647 let mut consumer = rooted.consume();
1648 consumer.assert_next("test", &b.consume());
1649 }
1650
1651 #[tokio::test]
1653 async fn test_with_root_trailing_slash_unannounce() {
1654 tokio::time::pause();
1655
1656 let origin = Origin::random().produce();
1657
1658 let prefix = "some_prefix/".to_string();
1659 let mut consumer = origin.consume().with_root(prefix).unwrap();
1660
1661 let b = origin.create_broadcast("some_prefix/test").unwrap();
1662 consumer.assert_next("test", &b.consume());
1663
1664 drop(b);
1666 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1667
1668 consumer.assert_next_none("test");
1670 }
1671
1672 #[tokio::test]
1673 async fn test_select_maintains_access_with_wider_prefix() {
1674 let origin = Origin::random().produce();
1675 let broadcast1 = Broadcast::new().produce();
1676 let broadcast2 = Broadcast::new().produce();
1677
1678 let demo_producer = origin.with_root("demo").expect("should create demo root");
1680 let user_producer = demo_producer
1681 .scope(&["worm-node".into(), "foobar".into()])
1682 .expect("should create user producer");
1683
1684 assert!(user_producer.publish_broadcast("worm-node/data", broadcast1.consume()));
1686 assert!(user_producer.publish_broadcast("foobar", broadcast2.consume()));
1687
1688 let mut consumer = user_producer
1690 .consume()
1691 .scope(&["".into()])
1692 .expect("scope with empty prefix should not fail when user has specific permissions");
1693
1694 let a1 = consumer.try_announced().expect("expected first announcement");
1696 let a2 = consumer.try_announced().expect("expected second announcement");
1697 consumer.assert_next_wait();
1698
1699 let mut paths: Vec<_> = [&a1, &a2].iter().map(|(p, _)| p.to_string()).collect();
1700 paths.sort();
1701 assert_eq!(paths, ["foobar", "worm-node/data"]);
1702
1703 let mut narrow_consumer = user_producer
1705 .consume()
1706 .scope(&["worm-node".into()])
1707 .expect("should be able to narrow scope to worm-node");
1708
1709 narrow_consumer.assert_next("worm-node/data", &broadcast1.consume());
1710 narrow_consumer.assert_next_wait(); }
1712
1713 #[tokio::test]
1714 async fn test_duplicate_prefixes_deduped() {
1715 let origin = Origin::random().produce();
1716 let broadcast = Broadcast::new().produce();
1717
1718 let producer = origin
1720 .scope(&["demo".into(), "demo".into()])
1721 .expect("should create producer");
1722
1723 assert!(producer.publish_broadcast("demo/stream", broadcast.consume()));
1724
1725 let mut consumer = producer.consume();
1726 consumer.assert_next("demo/stream", &broadcast.consume());
1727 consumer.assert_next_wait();
1728 }
1729
1730 #[tokio::test]
1731 async fn test_overlapping_prefixes_deduped() {
1732 let origin = Origin::random().produce();
1733 let broadcast = Broadcast::new().produce();
1734
1735 let producer = origin
1737 .scope(&["demo".into(), "demo/foo".into()])
1738 .expect("should create producer");
1739
1740 assert!(producer.publish_broadcast("demo/bar/stream", broadcast.consume()));
1742
1743 let mut consumer = producer.consume();
1744 consumer.assert_next("demo/bar/stream", &broadcast.consume());
1745 consumer.assert_next_wait();
1746 }
1747
1748 #[tokio::test]
1749 async fn test_overlapping_prefixes_no_duplicate_announcements() {
1750 let origin = Origin::random().produce();
1751 let broadcast = Broadcast::new().produce();
1752
1753 let producer = origin
1755 .scope(&["demo".into(), "demo/foo".into()])
1756 .expect("should create producer");
1757
1758 assert!(producer.publish_broadcast("demo/foo/stream", broadcast.consume()));
1759
1760 let mut consumer = producer.consume();
1761 consumer.assert_next("demo/foo/stream", &broadcast.consume());
1763 consumer.assert_next_wait();
1764 }
1765
1766 #[tokio::test]
1767 async fn test_allowed_returns_deduped_prefixes() {
1768 let origin = Origin::random().produce();
1769
1770 let producer = origin
1771 .scope(&["demo".into(), "demo/foo".into(), "anon".into()])
1772 .expect("should create producer");
1773
1774 let allowed: Vec<_> = producer.allowed().collect();
1775 assert_eq!(allowed.len(), 2, "demo/foo should be subsumed by demo");
1776 }
1777
1778 #[tokio::test]
1779 async fn test_announced_broadcast_already_announced() {
1780 let origin = Origin::random().produce();
1781 let broadcast = Broadcast::new().produce();
1782
1783 origin.publish_broadcast("test", broadcast.consume());
1784
1785 let consumer = origin.consume();
1786 let result = consumer.announced_broadcast("test").await.expect("should find it");
1787 assert!(result.is_clone(&broadcast.consume()));
1788 }
1789
1790 #[tokio::test]
1791 async fn test_announced_broadcast_delayed() {
1792 tokio::time::pause();
1793
1794 let origin = Origin::random().produce();
1795 let broadcast = Broadcast::new().produce();
1796
1797 let consumer = origin.consume();
1798
1799 let wait = tokio::spawn({
1801 let consumer = consumer.clone();
1802 async move { consumer.announced_broadcast("test").await }
1803 });
1804
1805 tokio::task::yield_now().await;
1807
1808 origin.publish_broadcast("test", broadcast.consume());
1809
1810 let result = wait.await.unwrap().expect("should find it");
1811 assert!(result.is_clone(&broadcast.consume()));
1812 }
1813
1814 #[tokio::test]
1815 async fn test_announced_broadcast_ignores_unrelated_paths() {
1816 tokio::time::pause();
1817
1818 let origin = Origin::random().produce();
1819 let other = Broadcast::new().produce();
1820 let target = Broadcast::new().produce();
1821
1822 let consumer = origin.consume();
1823
1824 let wait = tokio::spawn({
1825 let consumer = consumer.clone();
1826 async move { consumer.announced_broadcast("target").await }
1827 });
1828
1829 tokio::task::yield_now().await;
1830
1831 origin.publish_broadcast("other", other.consume());
1833 tokio::task::yield_now().await;
1834 assert!(!wait.is_finished(), "must not resolve on unrelated path");
1835
1836 origin.publish_broadcast("target", target.consume());
1837 let result = wait.await.unwrap().expect("should find target");
1838 assert!(result.is_clone(&target.consume()));
1839 }
1840
1841 #[tokio::test]
1842 async fn test_announced_broadcast_skips_nested_paths() {
1843 tokio::time::pause();
1844
1845 let origin = Origin::random().produce();
1846 let nested = Broadcast::new().produce();
1847 let exact = Broadcast::new().produce();
1848
1849 let consumer = origin.consume();
1850
1851 let wait = tokio::spawn({
1852 let consumer = consumer.clone();
1853 async move { consumer.announced_broadcast("foo").await }
1854 });
1855
1856 tokio::task::yield_now().await;
1857
1858 origin.publish_broadcast("foo/bar", nested.consume());
1860 tokio::task::yield_now().await;
1861 assert!(!wait.is_finished(), "must not resolve on a nested path");
1862
1863 origin.publish_broadcast("foo", exact.consume());
1864 let result = wait.await.unwrap().expect("should find foo exactly");
1865 assert!(result.is_clone(&exact.consume()));
1866 }
1867
1868 #[tokio::test]
1869 async fn test_announced_broadcast_disallowed() {
1870 let origin = Origin::random().produce();
1871 let limited = origin
1872 .consume()
1873 .scope(&["allowed".into()])
1874 .expect("should create limited");
1875
1876 assert!(limited.announced_broadcast("notallowed").await.is_none());
1878 }
1879
1880 #[tokio::test]
1881 async fn test_announced_broadcast_scope_too_narrow() {
1882 let origin = Origin::random().produce();
1885 let limited = origin
1886 .consume()
1887 .scope(&["foo/specific".into()])
1888 .expect("should create limited");
1889
1890 let result = limited
1892 .announced_broadcast("foo")
1893 .now_or_never()
1894 .expect("must not block");
1895 assert!(result.is_none());
1896 }
1897}