moq_lite/model/
origin.rs

1use std::{
2	collections::HashMap,
3	sync::atomic::{AtomicU64, Ordering},
4};
5use tokio::sync::mpsc;
6use web_async::Lock;
7
8use super::BroadcastConsumer;
9use crate::{AsPath, Broadcast, BroadcastProducer, Path, PathOwned, Produce};
10
/// Process-wide counter that hands out the value for the next [`ConsumerId`]; starts at zero.
static NEXT_CONSUMER_ID: AtomicU64 = AtomicU64::new(0);

/// Opaque identifier for a registered consumer, usable as a hash map key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct ConsumerId(u64);

impl ConsumerId {
	/// Allocates the next identifier by incrementing the shared counter.
	fn new() -> Self {
		// `fetch_add` returns the value *before* the increment, so the first ID is 0.
		let raw = NEXT_CONSUMER_ID.fetch_add(1, Ordering::Relaxed);
		Self(raw)
	}
}
21
22// If there are multiple broadcasts with the same path, we use the most recent one but keep the others around.
23struct OriginBroadcast {
24	path: PathOwned,
25	active: BroadcastConsumer,
26	backup: Vec<BroadcastConsumer>,
27}
28
29#[derive(Clone)]
30struct OriginConsumerNotify {
31	root: PathOwned,
32	tx: mpsc::UnboundedSender<OriginAnnounce>,
33}
34
35impl OriginConsumerNotify {
36	fn announce(&self, path: impl AsPath, broadcast: BroadcastConsumer) {
37		let path = path.as_path().strip_prefix(&self.root).unwrap().to_owned();
38		self.tx.send((path, Some(broadcast))).expect("consumer closed");
39	}
40
41	fn reannounce(&self, path: impl AsPath, broadcast: BroadcastConsumer) {
42		let path = path.as_path().strip_prefix(&self.root).unwrap().to_owned();
43		self.tx.send((path.clone(), None)).expect("consumer closed");
44		self.tx.send((path, Some(broadcast))).expect("consumer closed");
45	}
46
47	fn unannounce(&self, path: impl AsPath) {
48		let path = path.as_path().strip_prefix(&self.root).unwrap().to_owned();
49		self.tx.send((path, None)).expect("consumer closed");
50	}
51}
52
53struct NotifyNode {
54	parent: Option<Lock<NotifyNode>>,
55
56	// Consumers that are subscribed to this node.
57	// We store a consumer ID so we can remove it easily when it closes.
58	consumers: HashMap<ConsumerId, OriginConsumerNotify>,
59}
60
61impl NotifyNode {
62	fn new(parent: Option<Lock<NotifyNode>>) -> Self {
63		Self {
64			parent,
65			consumers: HashMap::new(),
66		}
67	}
68
69	fn announce(&mut self, path: impl AsPath, broadcast: &BroadcastConsumer) {
70		for consumer in self.consumers.values() {
71			consumer.announce(path.as_path(), broadcast.clone());
72		}
73
74		if let Some(parent) = &self.parent {
75			parent.lock().announce(path, broadcast);
76		}
77	}
78
79	fn reannounce(&mut self, path: impl AsPath, broadcast: &BroadcastConsumer) {
80		for consumer in self.consumers.values() {
81			consumer.reannounce(path.as_path(), broadcast.clone());
82		}
83
84		if let Some(parent) = &self.parent {
85			parent.lock().reannounce(path, broadcast);
86		}
87	}
88
89	fn unannounce(&mut self, path: impl AsPath) {
90		for consumer in self.consumers.values() {
91			consumer.unannounce(path.as_path());
92		}
93
94		if let Some(parent) = &self.parent {
95			parent.lock().unannounce(path);
96		}
97	}
98}
99
100struct OriginNode {
101	// The broadcast that is published to this node.
102	broadcast: Option<OriginBroadcast>,
103
104	// Nested nodes, one level down the tree.
105	nested: HashMap<String, Lock<OriginNode>>,
106
107	// Unfortunately, to notify consumers we need to traverse back up the tree.
108	notify: Lock<NotifyNode>,
109}
110
111impl OriginNode {
112	fn new(parent: Option<Lock<NotifyNode>>) -> Self {
113		Self {
114			broadcast: None,
115			nested: HashMap::new(),
116			notify: Lock::new(NotifyNode::new(parent)),
117		}
118	}
119
120	fn leaf(&mut self, path: &Path) -> Lock<OriginNode> {
121		let (dir, rest) = path.next_part().expect("leaf called with empty path");
122
123		let next = self.entry(dir);
124		if rest.is_empty() { next } else { next.lock().leaf(&rest) }
125	}
126
127	fn entry(&mut self, dir: &str) -> Lock<OriginNode> {
128		match self.nested.get(dir) {
129			Some(next) => next.clone(),
130			None => {
131				let next = Lock::new(OriginNode::new(Some(self.notify.clone())));
132				self.nested.insert(dir.to_string(), next.clone());
133				next
134			}
135		}
136	}
137
138	fn publish(&mut self, full: impl AsPath, broadcast: &BroadcastConsumer, relative: impl AsPath) {
139		let full = full.as_path();
140		let rest = relative.as_path();
141
142		// If the path has a directory component, then publish it to the nested node.
143		if let Some((dir, relative)) = rest.next_part() {
144			// Not using entry to avoid allocating a string most of the time.
145			self.entry(dir).lock().publish(&full, broadcast, &relative);
146		} else if let Some(existing) = &mut self.broadcast {
147			// This node is a leaf with an existing broadcast.
148			let old = existing.active.clone();
149			existing.active = broadcast.clone();
150			existing.backup.push(old);
151
152			self.notify.lock().reannounce(full, broadcast);
153		} else {
154			// This node is a leaf with no existing broadcast.
155			self.broadcast = Some(OriginBroadcast {
156				path: full.to_owned(),
157				active: broadcast.clone(),
158				backup: Vec::new(),
159			});
160			self.notify.lock().announce(full, broadcast);
161		}
162	}
163
164	fn consume(&mut self, id: ConsumerId, mut notify: OriginConsumerNotify) {
165		self.consume_initial(&mut notify);
166		self.notify.lock().consumers.insert(id, notify);
167	}
168
169	fn consume_initial(&mut self, notify: &mut OriginConsumerNotify) {
170		if let Some(broadcast) = &self.broadcast {
171			notify.announce(&broadcast.path, broadcast.active.clone());
172		}
173
174		// Recursively subscribe to all nested nodes.
175		for nested in self.nested.values() {
176			nested.lock().consume_initial(notify);
177		}
178	}
179
180	fn consume_broadcast(&self, rest: impl AsPath) -> Option<BroadcastConsumer> {
181		let rest = rest.as_path();
182
183		if let Some((dir, rest)) = rest.next_part() {
184			let node = self.nested.get(dir)?.lock();
185			node.consume_broadcast(&rest)
186		} else {
187			self.broadcast.as_ref().map(|b| b.active.clone())
188		}
189	}
190
191	fn unconsume(&mut self, id: ConsumerId) {
192		self.notify.lock().consumers.remove(&id).expect("consumer not found");
193		if self.is_empty() {
194			//tracing::warn!("TODO: empty node; memory leak");
195			// This happens when consuming a path that is not being broadcasted.
196		}
197	}
198
199	// Returns true if the broadcast should be unannounced.
200	fn remove(&mut self, full: impl AsPath, broadcast: BroadcastConsumer, relative: impl AsPath) {
201		let full = full.as_path();
202		let relative = relative.as_path();
203
204		if let Some((dir, relative)) = relative.next_part() {
205			let nested = self.entry(dir);
206			let mut locked = nested.lock();
207			locked.remove(&full, broadcast, &relative);
208
209			if locked.is_empty() {
210				drop(locked);
211				self.nested.remove(dir);
212			}
213		} else {
214			let entry = match &mut self.broadcast {
215				Some(existing) => existing,
216				None => return,
217			};
218
219			// See if we can remove the broadcast from the backup list.
220			let pos = entry.backup.iter().position(|b| b.is_clone(&broadcast));
221			if let Some(pos) = pos {
222				entry.backup.remove(pos);
223				// Nothing else to do
224				return;
225			}
226
227			// Okay so it must be the active broadcast or else we fucked up.
228			assert!(entry.active.is_clone(&broadcast));
229
230			// If there's a backup broadcast, then announce it.
231			if let Some(active) = entry.backup.pop() {
232				entry.active = active;
233				self.notify.lock().reannounce(full, &entry.active);
234			} else {
235				// No more backups, so remove the entry.
236				self.broadcast = None;
237				self.notify.lock().unannounce(full);
238			}
239		}
240	}
241
242	fn is_empty(&self) -> bool {
243		self.broadcast.is_none() && self.nested.is_empty() && self.notify.lock().consumers.is_empty()
244	}
245}
246
247#[derive(Clone)]
248struct OriginNodes {
249	nodes: Vec<(PathOwned, Lock<OriginNode>)>,
250}
251
252impl OriginNodes {
253	// Returns nested roots that match the prefixes.
254	// TODO enforce that prefixes can't overlap.
255	pub fn select(&self, prefixes: &[Path]) -> Option<Self> {
256		let mut roots = Vec::new();
257
258		for (root, state) in &self.nodes {
259			for prefix in prefixes {
260				if root.has_prefix(prefix) {
261					// Keep the existing node if we're allowed to access it.
262					roots.push((root.to_owned(), state.clone()));
263					continue;
264				}
265
266				if let Some(suffix) = prefix.strip_prefix(root) {
267					// If the requested prefix is larger than the allowed prefix, then we further scope it.
268					let nested = state.lock().leaf(&suffix);
269					roots.push((prefix.to_owned(), nested));
270				}
271			}
272		}
273
274		if roots.is_empty() {
275			None
276		} else {
277			Some(Self { nodes: roots })
278		}
279	}
280
281	pub fn root(&self, new_root: impl AsPath) -> Option<Self> {
282		let new_root = new_root.as_path();
283		let mut roots = Vec::new();
284
285		if new_root.is_empty() {
286			return Some(self.clone());
287		}
288
289		for (root, state) in &self.nodes {
290			if let Some(suffix) = root.strip_prefix(&new_root) {
291				// If the old root is longer than the new root, shorten the keys.
292				roots.push((suffix.to_owned(), state.clone()));
293			} else if let Some(suffix) = new_root.strip_prefix(root) {
294				// If the new root is longer than the old root, add a new root.
295				// NOTE: suffix can't be empty
296				let nested = state.lock().leaf(&suffix);
297				roots.push(("".into(), nested));
298			}
299		}
300
301		if roots.is_empty() {
302			None
303		} else {
304			Some(Self { nodes: roots })
305		}
306	}
307
308	// Returns the root that has this prefix.
309	pub fn get(&self, path: impl AsPath) -> Option<(Lock<OriginNode>, PathOwned)> {
310		let path = path.as_path();
311
312		for (root, state) in &self.nodes {
313			if let Some(suffix) = path.strip_prefix(root) {
314				return Some((state.clone(), suffix.to_owned()));
315			}
316		}
317
318		None
319	}
320}
321
322impl Default for OriginNodes {
323	fn default() -> Self {
324		Self {
325			nodes: vec![("".into(), Lock::new(OriginNode::new(None)))],
326		}
327	}
328}
329
330/// A broadcast path and its associated consumer, or None if closed.
331pub type OriginAnnounce = (PathOwned, Option<BroadcastConsumer>);
332
333/// A collection of broadcasts that can be published and subscribed to.
334pub struct Origin {}
335
336impl Origin {
337	pub fn produce() -> Produce<OriginProducer, OriginConsumer> {
338		let producer = OriginProducer::default();
339		let consumer = producer.consume();
340		Produce { producer, consumer }
341	}
342}
343
344/// Announces broadcasts to consumers over the network.
345#[derive(Clone, Default)]
346pub struct OriginProducer {
347	// The roots of the tree that we are allowed to publish.
348	// A path of "" means we can publish anything.
349	nodes: OriginNodes,
350
351	/// The prefix that is automatically stripped from all paths.
352	root: PathOwned,
353}
354
355impl OriginProducer {
356	/// Create and publish a new broadcast, returning the producer.
357	///
358	/// This is a helper method when you only want to publish a broadcast to a single origin.
359	/// Returns [None] if the broadcast is not allowed to be published.
360	pub fn create_broadcast(&self, path: impl AsPath) -> Option<BroadcastProducer> {
361		let broadcast = Broadcast::produce();
362		self.publish_broadcast(path, broadcast.consumer)
363			.then_some(broadcast.producer)
364	}
365
366	/// Publish a broadcast, announcing it to all consumers.
367	///
368	/// The broadcast will be unannounced when it is closed.
369	/// If there is already a broadcast with the same path, then it will be replaced and reannounced.
370	/// If the old broadcast is closed before the new one, then nothing will happen.
371	/// If the new broadcast is closed before the old one, then the old broadcast will be reannounced.
372	///
373	/// Returns false if the broadcast is not allowed to be published.
374	pub fn publish_broadcast(&self, path: impl AsPath, broadcast: BroadcastConsumer) -> bool {
375		let path = path.as_path();
376
377		let (root, rest) = match self.nodes.get(&path) {
378			Some(root) => root,
379			None => return false,
380		};
381
382		let full = self.root.join(&path);
383
384		root.lock().publish(&full, &broadcast, &rest);
385		let root = root.clone();
386
387		web_async::spawn(async move {
388			broadcast.closed().await;
389			root.lock().remove(&full, broadcast, &rest);
390		});
391
392		true
393	}
394
395	/// Returns a new OriginProducer where all published broadcasts MUST match one of the prefixes.
396	///
397	/// Returns None if there are no legal prefixes.
398	pub fn publish_only(&self, prefixes: &[Path]) -> Option<OriginProducer> {
399		Some(OriginProducer {
400			nodes: self.nodes.select(prefixes)?,
401			root: self.root.clone(),
402		})
403	}
404
405	/// Subscribe to all announced broadcasts.
406	pub fn consume(&self) -> OriginConsumer {
407		OriginConsumer::new(self.root.clone(), self.nodes.clone())
408	}
409
410	/// Subscribe to all announced broadcasts matching the prefix.
411	///
412	/// TODO: Don't use overlapping prefixes or duplicates will be published.
413	///
414	/// Returns None if there are no legal prefixes.
415	pub fn consume_only(&self, prefixes: &[Path]) -> Option<OriginConsumer> {
416		Some(OriginConsumer::new(self.root.clone(), self.nodes.select(prefixes)?))
417	}
418
419	/// Returns a new OriginProducer that automatically strips out the provided prefix.
420	///
421	/// Returns None if the provided root is not authorized; when publish_only was already used without a wildcard.
422	pub fn with_root(&self, prefix: impl AsPath) -> Option<Self> {
423		let prefix = prefix.as_path();
424
425		Some(Self {
426			root: self.root.join(&prefix).to_owned(),
427			nodes: self.nodes.root(&prefix)?,
428		})
429	}
430
431	/// Returns the root that is automatically stripped from all paths.
432	pub fn root(&self) -> &Path<'_> {
433		&self.root
434	}
435
436	pub fn allowed(&self) -> impl Iterator<Item = &Path<'_>> {
437		self.nodes.nodes.iter().map(|(root, _)| root)
438	}
439
440	/// Converts a relative path to an absolute path.
441	pub fn absolute(&self, path: impl AsPath) -> Path<'_> {
442		self.root.join(path)
443	}
444}
445
446/// Consumes announced broadcasts matching against an optional prefix.
447///
448/// NOTE: Clone is expensive, try to avoid it.
449pub struct OriginConsumer {
450	id: ConsumerId,
451	nodes: OriginNodes,
452	updates: mpsc::UnboundedReceiver<OriginAnnounce>,
453
454	/// A prefix that is automatically stripped from all paths.
455	root: PathOwned,
456}
457
458impl OriginConsumer {
459	fn new(root: PathOwned, nodes: OriginNodes) -> Self {
460		let (tx, rx) = mpsc::unbounded_channel();
461
462		let id = ConsumerId::new();
463
464		for (_, state) in &nodes.nodes {
465			let notify = OriginConsumerNotify {
466				root: root.clone(),
467				tx: tx.clone(),
468			};
469			state.lock().consume(id, notify);
470		}
471
472		Self {
473			id,
474			nodes,
475			updates: rx,
476			root,
477		}
478	}
479
480	/// Returns the next (un)announced broadcast and the absolute path.
481	///
482	/// The broadcast will only be announced if it was previously unannounced.
483	/// The same path won't be announced/unannounced twice, instead it will toggle.
484	/// Returns None if the consumer is closed.
485	///
486	/// Note: The returned path is absolute and will always match this consumer's prefix.
487	pub async fn announced(&mut self) -> Option<OriginAnnounce> {
488		self.updates.recv().await
489	}
490
491	/// Returns the next (un)announced broadcast and the absolute path without blocking.
492	///
493	/// Returns None if there is no update available; NOT because the consumer is closed.
494	/// You have to use `is_closed` to check if the consumer is closed.
495	pub fn try_announced(&mut self) -> Option<OriginAnnounce> {
496		self.updates.try_recv().ok()
497	}
498
499	pub fn consume(&self) -> Self {
500		self.clone()
501	}
502
503	/// Get a specific broadcast by path.
504	///
505	/// TODO This should include announcement support.
506	///
507	/// Returns None if the path hasn't been announced yet.
508	pub fn consume_broadcast(&self, path: impl AsPath) -> Option<BroadcastConsumer> {
509		let path = path.as_path();
510		let (root, rest) = self.nodes.get(&path)?;
511		let state = root.lock();
512		state.consume_broadcast(&rest)
513	}
514
515	/// Returns a new OriginConsumer that only consumes broadcasts matching one of the prefixes.
516	///
517	/// Returns None if there are no legal prefixes (would always return None).
518	pub fn consume_only(&self, prefixes: &[Path]) -> Option<OriginConsumer> {
519		Some(OriginConsumer::new(self.root.clone(), self.nodes.select(prefixes)?))
520	}
521
522	/// Returns a new OriginConsumer that automatically strips out the provided prefix.
523	///
524	/// Returns None if the provided root is not authorized; when consume_only was already used without a wildcard.
525	pub fn with_root(&self, prefix: impl AsPath) -> Option<Self> {
526		let prefix = prefix.as_path();
527
528		Some(Self::new(self.root.join(&prefix).to_owned(), self.nodes.root(&prefix)?))
529	}
530
531	/// Returns the prefix that is automatically stripped from all paths.
532	pub fn root(&self) -> &Path<'_> {
533		&self.root
534	}
535
536	pub fn allowed(&self) -> impl Iterator<Item = &Path<'_>> {
537		self.nodes.nodes.iter().map(|(root, _)| root)
538	}
539
540	/// Converts a relative path to an absolute path.
541	pub fn absolute(&self, path: impl AsPath) -> Path<'_> {
542		self.root.join(path)
543	}
544}
545
546impl Drop for OriginConsumer {
547	fn drop(&mut self) {
548		for (_, root) in &self.nodes.nodes {
549			root.lock().unconsume(self.id);
550		}
551	}
552}
553
554impl Clone for OriginConsumer {
555	fn clone(&self) -> Self {
556		OriginConsumer::new(self.root.clone(), self.nodes.clone())
557	}
558}
559
560#[cfg(test)]
561use futures::FutureExt;
562
#[cfg(test)]
impl OriginConsumer {
	/// Asserts that an announcement of `broadcast` at `expected` is immediately available.
	pub fn assert_next(&mut self, expected: impl AsPath, broadcast: &BroadcastConsumer) {
		let expected = expected.as_path();
		let polled = self.announced().now_or_never().expect("next blocked");
		let (path, active) = polled.expect("no next");
		assert_eq!(path, expected, "wrong path");
		let active = active.unwrap();
		assert!(active.is_clone(broadcast), "should be the same broadcast");
	}

	/// Same as `assert_next`, but goes through the synchronous `try_announced`.
	pub fn assert_try_next(&mut self, expected: impl AsPath, broadcast: &BroadcastConsumer) {
		let expected = expected.as_path();
		let (path, active) = self.try_announced().expect("no next");
		assert_eq!(path, expected, "wrong path");
		let active = active.unwrap();
		assert!(active.is_clone(broadcast), "should be the same broadcast");
	}

	/// Asserts that an unannounce for `expected` is immediately available.
	pub fn assert_next_none(&mut self, expected: impl AsPath) {
		let expected = expected.as_path();
		let polled = self.announced().now_or_never().expect("next blocked");
		let (path, active) = polled.expect("no next");
		assert_eq!(path, expected, "wrong path");
		assert!(active.is_none(), "should be unannounced");
	}

	/// Asserts that no update is available right now.
	pub fn assert_next_wait(&mut self) {
		match self.announced().now_or_never() {
			None => {}
			Some(res) => panic!("next should block: got {:?}", res.map(|(path, _)| path)),
		}
	}

	/*
	pub fn assert_next_closed(&mut self) {
		assert!(
			self.announced().now_or_never().expect("next blocked").is_none(),
			"next should be closed"
		);
	}
	*/
}
601
602#[cfg(test)]
603mod tests {
604	use crate::Broadcast;
605
606	use super::*;
607
608	#[tokio::test]
609	async fn test_announce() {
610		let origin = Origin::produce();
611		let broadcast1 = Broadcast::produce();
612		let broadcast2 = Broadcast::produce();
613
614		let mut consumer1 = origin.consumer;
615		// Make a new consumer that should get it.
616		consumer1.assert_next_wait();
617
618		// Publish the first broadcast.
619		origin.producer.publish_broadcast("test1", broadcast1.consumer);
620
621		consumer1.assert_next("test1", &broadcast1.producer.consume());
622		consumer1.assert_next_wait();
623
624		// Make a new consumer that should get the existing broadcast.
625		// But we don't consume it yet.
626		let mut consumer2 = origin.producer.consume();
627
628		// Publish the second broadcast.
629		origin.producer.publish_broadcast("test2", broadcast2.consumer);
630
631		consumer1.assert_next("test2", &broadcast2.producer.consume());
632		consumer1.assert_next_wait();
633
634		consumer2.assert_next("test1", &broadcast1.producer.consume());
635		consumer2.assert_next("test2", &broadcast2.producer.consume());
636		consumer2.assert_next_wait();
637
638		// Close the first broadcast.
639		drop(broadcast1.producer);
640
641		// Wait for the async task to run.
642		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
643
644		// All consumers should get a None now.
645		consumer1.assert_next_none("test1");
646		consumer2.assert_next_none("test1");
647		consumer1.assert_next_wait();
648		consumer2.assert_next_wait();
649
650		// And a new consumer only gets the last broadcast.
651		let mut consumer3 = origin.producer.consume();
652		consumer3.assert_next("test2", &broadcast2.producer.consume());
653		consumer3.assert_next_wait();
654
655		// Close the other producer and make sure it cleans up
656		drop(broadcast2.producer);
657
658		// Wait for the async task to run.
659		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
660
661		consumer1.assert_next_none("test2");
662		consumer2.assert_next_none("test2");
663		consumer3.assert_next_none("test2");
664
665		/* TODO close the origin consumer when the producer is dropped
666		consumer1.assert_next_closed();
667		consumer2.assert_next_closed();
668		consumer3.assert_next_closed();
669		*/
670	}
671
672	#[tokio::test]
673	async fn test_duplicate() {
674		let mut origin = Origin::produce();
675
676		let broadcast1 = Broadcast::produce();
677		let broadcast2 = Broadcast::produce();
678		let broadcast3 = Broadcast::produce();
679
680		let consumer1 = broadcast1.consumer;
681		let consumer2 = broadcast2.consumer;
682		let consumer3 = broadcast3.consumer;
683
684		origin.producer.publish_broadcast("test", consumer1.clone());
685		origin.producer.publish_broadcast("test", consumer2.clone());
686		origin.producer.publish_broadcast("test", consumer3.clone());
687
688		assert!(origin.consumer.consume_broadcast("test").is_some());
689
690		origin.consumer.assert_next("test", &consumer1);
691		origin.consumer.assert_next_none("test");
692		origin.consumer.assert_next("test", &consumer2);
693		origin.consumer.assert_next_none("test");
694		origin.consumer.assert_next("test", &consumer3);
695
696		// Drop the backup, nothing should change.
697		drop(broadcast2.producer);
698
699		// Wait for the async task to run.
700		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
701
702		assert!(origin.consumer.consume_broadcast("test").is_some());
703		origin.consumer.assert_next_wait();
704
705		// Drop the active, we should reannounce.
706		drop(broadcast3.producer);
707
708		// Wait for the async task to run.
709		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
710
711		assert!(origin.consumer.consume_broadcast("test").is_some());
712		origin.consumer.assert_next_none("test");
713		origin.consumer.assert_next("test", &consumer1);
714
715		// Drop the final broadcast, we should unannounce.
716		drop(broadcast1.producer);
717
718		// Wait for the async task to run.
719		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
720		assert!(origin.consumer.consume_broadcast("test").is_none());
721
722		origin.consumer.assert_next_none("test");
723		origin.consumer.assert_next_wait();
724	}
725
726	#[tokio::test]
727	async fn test_duplicate_reverse() {
728		let origin = Origin::produce();
729		let broadcast1 = Broadcast::produce();
730		let broadcast2 = Broadcast::produce();
731
732		origin.producer.publish_broadcast("test", broadcast1.consumer);
733		origin.producer.publish_broadcast("test", broadcast2.consumer);
734		assert!(origin.consumer.consume_broadcast("test").is_some());
735
736		// This is harder, dropping the new broadcast first.
737		drop(broadcast2.producer);
738
739		// Wait for the cleanup async task to run.
740		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
741		assert!(origin.consumer.consume_broadcast("test").is_some());
742
743		drop(broadcast1.producer);
744
745		// Wait for the cleanup async task to run.
746		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
747		assert!(origin.consumer.consume_broadcast("test").is_none());
748	}
749
750	#[tokio::test]
751	async fn test_double_publish() {
752		let origin = Origin::produce();
753		let broadcast = Broadcast::produce();
754
755		// Ensure it doesn't crash.
756		origin.producer.publish_broadcast("test", broadcast.producer.consume());
757		origin.producer.publish_broadcast("test", broadcast.producer.consume());
758
759		assert!(origin.consumer.consume_broadcast("test").is_some());
760
761		drop(broadcast.producer);
762
763		// Wait for the async task to run.
764		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
765		assert!(origin.consumer.consume_broadcast("test").is_none());
766	}
767	// There was a tokio bug where only the first 127 broadcasts would be received instantly.
768	#[tokio::test]
769	#[should_panic]
770	async fn test_128() {
771		let mut origin = Origin::produce();
772		let broadcast = Broadcast::produce();
773
774		for i in 0..256 {
775			origin
776				.producer
777				.publish_broadcast(format!("test{i}"), broadcast.consumer.clone());
778		}
779
780		for i in 0..256 {
781			origin.consumer.assert_next(format!("test{i}"), &broadcast.consumer);
782		}
783	}
784
785	#[tokio::test]
786	async fn test_128_fix() {
787		let mut origin = Origin::produce();
788		let broadcast = Broadcast::produce();
789
790		for i in 0..256 {
791			origin
792				.producer
793				.publish_broadcast(format!("test{i}"), broadcast.consumer.clone());
794		}
795
796		for i in 0..256 {
797			// try_next does not have the same issue because it's synchronous.
798			origin.consumer.assert_try_next(format!("test{i}"), &broadcast.consumer);
799		}
800	}
801
802	#[tokio::test]
803	async fn test_with_root_basic() {
804		let mut origin = Origin::produce();
805		let broadcast = Broadcast::produce();
806
807		// Create a producer with root "/foo"
808		let foo_producer = origin.producer.with_root("foo").expect("should create root");
809		assert_eq!(foo_producer.root().as_str(), "foo");
810
811		// When publishing to "bar/baz", it should actually publish to "foo/bar/baz"
812		assert!(foo_producer.publish_broadcast("bar/baz", broadcast.consumer.clone()));
813
814		// The original consumer should see the full path
815		origin.consumer.assert_next("foo/bar/baz", &broadcast.consumer);
816
817		// A consumer created from the rooted producer should see the stripped path
818		let mut foo_consumer = foo_producer.consume();
819		foo_consumer.assert_next("bar/baz", &broadcast.consumer);
820	}
821
822	#[tokio::test]
823	async fn test_with_root_nested() {
824		let mut origin = Origin::produce();
825		let broadcast = Broadcast::produce();
826
827		// Create nested roots
828		let foo_producer = origin.producer.with_root("foo").expect("should create foo root");
829		let foo_bar_producer = foo_producer.with_root("bar").expect("should create bar root");
830		assert_eq!(foo_bar_producer.root().as_str(), "foo/bar");
831
832		// Publishing to "baz" should actually publish to "foo/bar/baz"
833		assert!(foo_bar_producer.publish_broadcast("baz", broadcast.consumer.clone()));
834
835		// The original consumer sees the full path
836		origin.consumer.assert_next("foo/bar/baz", &broadcast.consumer);
837
838		// Consumer from foo_bar_producer sees just "baz"
839		let mut foo_bar_consumer = foo_bar_producer.consume();
840		foo_bar_consumer.assert_next("baz", &broadcast.consumer);
841	}
842
843	#[tokio::test]
844	async fn test_publish_only_allows() {
845		let origin = Origin::produce();
846		let broadcast = Broadcast::produce();
847
848		// Create a producer that can only publish to "allowed" paths
849		let limited_producer = origin
850			.producer
851			.publish_only(&["allowed/path1".into(), "allowed/path2".into()])
852			.expect("should create limited producer");
853
854		// Should be able to publish to allowed paths
855		assert!(limited_producer.publish_broadcast("allowed/path1", broadcast.consumer.clone()));
856		assert!(limited_producer.publish_broadcast("allowed/path1/nested", broadcast.consumer.clone()));
857		assert!(limited_producer.publish_broadcast("allowed/path2", broadcast.consumer.clone()));
858
859		// Should not be able to publish to disallowed paths
860		assert!(!limited_producer.publish_broadcast("notallowed", broadcast.consumer.clone()));
861		assert!(!limited_producer.publish_broadcast("allowed", broadcast.consumer.clone())); // Parent of allowed path
862		assert!(!limited_producer.publish_broadcast("other/path", broadcast.consumer.clone()));
863	}
864
865	#[tokio::test]
866	async fn test_publish_only_empty() {
867		let origin = Origin::produce();
868
869		// Creating a producer with no allowed paths should return None
870		assert!(origin.producer.publish_only(&[]).is_none());
871	}
872
873	#[tokio::test]
874	async fn test_consume_only_filters() {
875		let mut origin = Origin::produce();
876		let broadcast1 = Broadcast::produce();
877		let broadcast2 = Broadcast::produce();
878		let broadcast3 = Broadcast::produce();
879
880		// Publish to different paths
881		origin
882			.producer
883			.publish_broadcast("allowed", broadcast1.consumer.clone());
884		origin
885			.producer
886			.publish_broadcast("allowed/nested", broadcast2.consumer.clone());
887		origin
888			.producer
889			.publish_broadcast("notallowed", broadcast3.consumer.clone());
890
891		// Create a consumer that only sees "allowed" paths
892		let mut limited_consumer = origin
893			.consumer
894			.consume_only(&["allowed".into()])
895			.expect("should create limited consumer");
896
897		// Should only receive broadcasts under "allowed"
898		limited_consumer.assert_next("allowed", &broadcast1.consumer);
899		limited_consumer.assert_next("allowed/nested", &broadcast2.consumer);
900		limited_consumer.assert_next_wait(); // Should not see "notallowed"
901
902		// Original consumer should see all
903		origin.consumer.assert_next("allowed", &broadcast1.consumer);
904		origin.consumer.assert_next("allowed/nested", &broadcast2.consumer);
905		origin.consumer.assert_next("notallowed", &broadcast3.consumer);
906	}
907
908	#[tokio::test]
909	async fn test_consume_only_multiple_prefixes() {
910		let origin = Origin::produce();
911		let broadcast1 = Broadcast::produce();
912		let broadcast2 = Broadcast::produce();
913		let broadcast3 = Broadcast::produce();
914
915		origin
916			.producer
917			.publish_broadcast("foo/test", broadcast1.consumer.clone());
918		origin
919			.producer
920			.publish_broadcast("bar/test", broadcast2.consumer.clone());
921		origin
922			.producer
923			.publish_broadcast("baz/test", broadcast3.consumer.clone());
924
925		// Consumer that only sees "foo" and "bar" paths
926		let mut limited_consumer = origin
927			.consumer
928			.consume_only(&["foo".into(), "bar".into()])
929			.expect("should create limited consumer");
930
931		limited_consumer.assert_next("foo/test", &broadcast1.consumer);
932		limited_consumer.assert_next("bar/test", &broadcast2.consumer);
933		limited_consumer.assert_next_wait(); // Should not see "baz/test"
934	}
935
936	#[tokio::test]
937	async fn test_with_root_and_publish_only() {
938		let mut origin = Origin::produce();
939		let broadcast = Broadcast::produce();
940
941		// User connects to /foo root
942		let foo_producer = origin.producer.with_root("foo").expect("should create foo root");
943
944		// Limit them to publish only to "bar" and "goop/pee" within /foo
945		let limited_producer = foo_producer
946			.publish_only(&["bar".into(), "goop/pee".into()])
947			.expect("should create limited producer");
948
949		// Should be able to publish to foo/bar and foo/goop/pee (but user sees as bar and goop/pee)
950		assert!(limited_producer.publish_broadcast("bar", broadcast.consumer.clone()));
951		assert!(limited_producer.publish_broadcast("bar/nested", broadcast.consumer.clone()));
952		assert!(limited_producer.publish_broadcast("goop/pee", broadcast.consumer.clone()));
953		assert!(limited_producer.publish_broadcast("goop/pee/nested", broadcast.consumer.clone()));
954
955		// Should not be able to publish outside allowed paths
956		assert!(!limited_producer.publish_broadcast("baz", broadcast.consumer.clone()));
957		assert!(!limited_producer.publish_broadcast("goop", broadcast.consumer.clone())); // Parent of allowed
958		assert!(!limited_producer.publish_broadcast("goop/other", broadcast.consumer.clone()));
959
960		// Original consumer sees full paths
961		origin.consumer.assert_next("foo/bar", &broadcast.consumer);
962		origin.consumer.assert_next("foo/bar/nested", &broadcast.consumer);
963		origin.consumer.assert_next("foo/goop/pee", &broadcast.consumer);
964		origin.consumer.assert_next("foo/goop/pee/nested", &broadcast.consumer);
965	}
966
967	#[tokio::test]
968	async fn test_with_root_and_consume_only() {
969		let origin = Origin::produce();
970		let broadcast1 = Broadcast::produce();
971		let broadcast2 = Broadcast::produce();
972		let broadcast3 = Broadcast::produce();
973
974		// Publish broadcasts
975		origin
976			.producer
977			.publish_broadcast("foo/bar/test", broadcast1.consumer.clone());
978		origin
979			.producer
980			.publish_broadcast("foo/goop/pee/test", broadcast2.consumer.clone());
981		origin
982			.producer
983			.publish_broadcast("foo/other/test", broadcast3.consumer.clone());
984
985		// User connects to /foo root
986		let foo_producer = origin.producer.with_root("foo").expect("should create foo root");
987
988		// Create consumer limited to "bar" and "goop/pee" within /foo
989		let mut limited_consumer = foo_producer
990			.consume_only(&["bar".into(), "goop/pee".into()])
991			.expect("should create limited consumer");
992
993		// Should only see allowed paths (without foo prefix)
994		limited_consumer.assert_next("bar/test", &broadcast1.consumer);
995		limited_consumer.assert_next("goop/pee/test", &broadcast2.consumer);
996		limited_consumer.assert_next_wait(); // Should not see "other/test"
997	}
998
999	#[tokio::test]
1000	async fn test_with_root_unauthorized() {
1001		let origin = Origin::produce();
1002
1003		// First limit the producer to specific paths
1004		let limited_producer = origin
1005			.producer
1006			.publish_only(&["allowed".into()])
1007			.expect("should create limited producer");
1008
1009		// Trying to create a root outside allowed paths should fail
1010		assert!(limited_producer.with_root("notallowed").is_none());
1011
1012		// But creating a root within allowed paths should work
1013		let allowed_root = limited_producer
1014			.with_root("allowed")
1015			.expect("should create allowed root");
1016		assert_eq!(allowed_root.root().as_str(), "allowed");
1017	}
1018
1019	#[tokio::test]
1020	async fn test_wildcard_permission() {
1021		let origin = Origin::produce();
1022		let broadcast = Broadcast::produce();
1023
1024		// Producer with root access (empty string means wildcard)
1025		let root_producer = origin.producer.clone();
1026
1027		// Should be able to publish anywhere
1028		assert!(root_producer.publish_broadcast("any/path", broadcast.consumer.clone()));
1029		assert!(root_producer.publish_broadcast("other/path", broadcast.consumer.clone()));
1030
1031		// Can create any root
1032		let foo_producer = root_producer.with_root("foo").expect("should create any root");
1033		assert_eq!(foo_producer.root().as_str(), "foo");
1034	}
1035
1036	#[tokio::test]
1037	async fn test_consume_broadcast_with_permissions() {
1038		let origin = Origin::produce();
1039		let broadcast1 = Broadcast::produce();
1040		let broadcast2 = Broadcast::produce();
1041
1042		origin
1043			.producer
1044			.publish_broadcast("allowed/test", broadcast1.consumer.clone());
1045		origin
1046			.producer
1047			.publish_broadcast("notallowed/test", broadcast2.consumer.clone());
1048
1049		// Create limited consumer
1050		let limited_consumer = origin
1051			.consumer
1052			.consume_only(&["allowed".into()])
1053			.expect("should create limited consumer");
1054
1055		// Should be able to get allowed broadcast
1056		let result = limited_consumer.consume_broadcast("allowed/test");
1057		assert!(result.is_some());
1058		assert!(result.unwrap().is_clone(&broadcast1.consumer));
1059
1060		// Should not be able to get disallowed broadcast
1061		assert!(limited_consumer.consume_broadcast("notallowed/test").is_none());
1062
1063		// Original consumer can get both
1064		assert!(origin.consumer.consume_broadcast("allowed/test").is_some());
1065		assert!(origin.consumer.consume_broadcast("notallowed/test").is_some());
1066	}
1067
1068	#[tokio::test]
1069	async fn test_nested_paths_with_permissions() {
1070		let origin = Origin::produce();
1071		let broadcast = Broadcast::produce();
1072
1073		// Create producer limited to "a/b/c"
1074		let limited_producer = origin
1075			.producer
1076			.publish_only(&["a/b/c".into()])
1077			.expect("should create limited producer");
1078
1079		// Should be able to publish to exact path and nested paths
1080		assert!(limited_producer.publish_broadcast("a/b/c", broadcast.consumer.clone()));
1081		assert!(limited_producer.publish_broadcast("a/b/c/d", broadcast.consumer.clone()));
1082		assert!(limited_producer.publish_broadcast("a/b/c/d/e", broadcast.consumer.clone()));
1083
1084		// Should not be able to publish to parent or sibling paths
1085		assert!(!limited_producer.publish_broadcast("a", broadcast.consumer.clone()));
1086		assert!(!limited_producer.publish_broadcast("a/b", broadcast.consumer.clone()));
1087		assert!(!limited_producer.publish_broadcast("a/b/other", broadcast.consumer.clone()));
1088	}
1089
1090	#[tokio::test]
1091	async fn test_multiple_consumers_with_different_permissions() {
1092		let origin = Origin::produce();
1093		let broadcast1 = Broadcast::produce();
1094		let broadcast2 = Broadcast::produce();
1095		let broadcast3 = Broadcast::produce();
1096
1097		// Publish to different paths
1098		origin
1099			.producer
1100			.publish_broadcast("foo/test", broadcast1.consumer.clone());
1101		origin
1102			.producer
1103			.publish_broadcast("bar/test", broadcast2.consumer.clone());
1104		origin
1105			.producer
1106			.publish_broadcast("baz/test", broadcast3.consumer.clone());
1107
1108		// Create consumers with different permissions
1109		let mut foo_consumer = origin
1110			.consumer
1111			.consume_only(&["foo".into()])
1112			.expect("should create foo consumer");
1113
1114		let mut bar_consumer = origin
1115			.consumer
1116			.consume_only(&["bar".into()])
1117			.expect("should create bar consumer");
1118
1119		let mut foobar_consumer = origin
1120			.consumer
1121			.consume_only(&["foo".into(), "bar".into()])
1122			.expect("should create foobar consumer");
1123
1124		// Each consumer should only see their allowed paths
1125		foo_consumer.assert_next("foo/test", &broadcast1.consumer);
1126		foo_consumer.assert_next_wait();
1127
1128		bar_consumer.assert_next("bar/test", &broadcast2.consumer);
1129		bar_consumer.assert_next_wait();
1130
1131		foobar_consumer.assert_next("foo/test", &broadcast1.consumer);
1132		foobar_consumer.assert_next("bar/test", &broadcast2.consumer);
1133		foobar_consumer.assert_next_wait();
1134	}
1135
1136	#[tokio::test]
1137	async fn test_select_with_empty_prefix() {
1138		let origin = Origin::produce();
1139		let broadcast1 = Broadcast::produce();
1140		let broadcast2 = Broadcast::produce();
1141
1142		// User with root "demo" allowed to subscribe to "worm-node" and "foobar"
1143		let demo_producer = origin.producer.with_root("demo").expect("should create demo root");
1144		let limited_producer = demo_producer
1145			.publish_only(&["worm-node".into(), "foobar".into()])
1146			.expect("should create limited producer");
1147
1148		// Publish some broadcasts
1149		assert!(limited_producer.publish_broadcast("worm-node/test", broadcast1.consumer.clone()));
1150		assert!(limited_producer.publish_broadcast("foobar/test", broadcast2.consumer.clone()));
1151
1152		// consume_only with empty prefix should keep the exact same "worm-node" and "foobar" nodes
1153		let mut consumer = limited_producer
1154			.consume_only(&["".into()])
1155			.expect("should create consumer with empty prefix");
1156
1157		// Should still see both broadcasts
1158		consumer.assert_next("worm-node/test", &broadcast1.consumer);
1159		consumer.assert_next("foobar/test", &broadcast2.consumer);
1160		consumer.assert_next_wait();
1161	}
1162
1163	#[tokio::test]
1164	async fn test_select_narrowing_scope() {
1165		let origin = Origin::produce();
1166		let broadcast1 = Broadcast::produce();
1167		let broadcast2 = Broadcast::produce();
1168		let broadcast3 = Broadcast::produce();
1169
1170		// User with root "demo" allowed to subscribe to "worm-node" and "foobar"
1171		let demo_producer = origin.producer.with_root("demo").expect("should create demo root");
1172		let limited_producer = demo_producer
1173			.publish_only(&["worm-node".into(), "foobar".into()])
1174			.expect("should create limited producer");
1175
1176		// Publish broadcasts at different levels
1177		assert!(limited_producer.publish_broadcast("worm-node", broadcast1.consumer.clone()));
1178		assert!(limited_producer.publish_broadcast("worm-node/foo", broadcast2.consumer.clone()));
1179		assert!(limited_producer.publish_broadcast("foobar/bar", broadcast3.consumer.clone()));
1180
1181		// Test 1: consume_only("worm-node") should result in a single "" node with contents of "worm-node" ONLY
1182		let mut worm_consumer = limited_producer
1183			.consume_only(&["worm-node".into()])
1184			.expect("should create worm-node consumer");
1185
1186		// Should see worm-node content with paths stripped to ""
1187		worm_consumer.assert_next("worm-node", &broadcast1.consumer);
1188		worm_consumer.assert_next("worm-node/foo", &broadcast2.consumer);
1189		worm_consumer.assert_next_wait(); // Should NOT see foobar content
1190
1191		// Test 2: consume_only("worm-node/foo") should result in a "" node with contents of "worm-node/foo"
1192		let mut foo_consumer = limited_producer
1193			.consume_only(&["worm-node/foo".into()])
1194			.expect("should create worm-node/foo consumer");
1195
1196		foo_consumer.assert_next("worm-node/foo", &broadcast2.consumer);
1197		foo_consumer.assert_next_wait(); // Should NOT see other content
1198	}
1199
1200	#[tokio::test]
1201	async fn test_select_multiple_roots_with_empty_prefix() {
1202		let origin = Origin::produce();
1203		let broadcast1 = Broadcast::produce();
1204		let broadcast2 = Broadcast::produce();
1205		let broadcast3 = Broadcast::produce();
1206
1207		// Producer with multiple allowed roots
1208		let limited_producer = origin
1209			.producer
1210			.publish_only(&["app1".into(), "app2".into(), "shared".into()])
1211			.expect("should create limited producer");
1212
1213		// Publish to each root
1214		assert!(limited_producer.publish_broadcast("app1/data", broadcast1.consumer.clone()));
1215		assert!(limited_producer.publish_broadcast("app2/config", broadcast2.consumer.clone()));
1216		assert!(limited_producer.publish_broadcast("shared/resource", broadcast3.consumer.clone()));
1217
1218		// consume_only with empty prefix should maintain all roots
1219		let mut consumer = limited_producer
1220			.consume_only(&["".into()])
1221			.expect("should create consumer with empty prefix");
1222
1223		// Should see all broadcasts from all roots
1224		consumer.assert_next("app1/data", &broadcast1.consumer);
1225		consumer.assert_next("app2/config", &broadcast2.consumer);
1226		consumer.assert_next("shared/resource", &broadcast3.consumer);
1227		consumer.assert_next_wait();
1228	}
1229
1230	#[tokio::test]
1231	async fn test_publish_only_with_empty_prefix() {
1232		let origin = Origin::produce();
1233		let broadcast = Broadcast::produce();
1234
1235		// Producer with specific allowed paths
1236		let limited_producer = origin
1237			.producer
1238			.publish_only(&["services/api".into(), "services/web".into()])
1239			.expect("should create limited producer");
1240
1241		// publish_only with empty prefix should keep the same restrictions
1242		let same_producer = limited_producer
1243			.publish_only(&["".into()])
1244			.expect("should create producer with empty prefix");
1245
1246		// Should still have the same publishing restrictions
1247		assert!(same_producer.publish_broadcast("services/api", broadcast.consumer.clone()));
1248		assert!(same_producer.publish_broadcast("services/web", broadcast.consumer.clone()));
1249		assert!(!same_producer.publish_broadcast("services/db", broadcast.consumer.clone()));
1250		assert!(!same_producer.publish_broadcast("other", broadcast.consumer.clone()));
1251	}
1252
1253	#[tokio::test]
1254	async fn test_select_narrowing_to_deeper_path() {
1255		let origin = Origin::produce();
1256		let broadcast1 = Broadcast::produce();
1257		let broadcast2 = Broadcast::produce();
1258		let broadcast3 = Broadcast::produce();
1259
1260		// Producer with broad permission
1261		let limited_producer = origin
1262			.producer
1263			.publish_only(&["org".into()])
1264			.expect("should create limited producer");
1265
1266		// Publish at various depths
1267		assert!(limited_producer.publish_broadcast("org/team1/project1", broadcast1.consumer.clone()));
1268		assert!(limited_producer.publish_broadcast("org/team1/project2", broadcast2.consumer.clone()));
1269		assert!(limited_producer.publish_broadcast("org/team2/project1", broadcast3.consumer.clone()));
1270
1271		// Narrow down to team1 only
1272		let mut team1_consumer = limited_producer
1273			.consume_only(&["org/team2".into()])
1274			.expect("should create team1 consumer");
1275
1276		team1_consumer.assert_next("org/team2/project1", &broadcast3.consumer);
1277		team1_consumer.assert_next_wait(); // Should NOT see team1 content
1278
1279		// Further narrow down to team1/project1
1280		let mut project1_consumer = limited_producer
1281			.consume_only(&["org/team1/project1".into()])
1282			.expect("should create project1 consumer");
1283
1284		// Should only see project1 content at root
1285		project1_consumer.assert_next("org/team1/project1", &broadcast1.consumer);
1286		project1_consumer.assert_next_wait();
1287	}
1288
1289	#[tokio::test]
1290	async fn test_select_with_non_matching_prefix() {
1291		let origin = Origin::produce();
1292
1293		// Producer with specific allowed paths
1294		let limited_producer = origin
1295			.producer
1296			.publish_only(&["allowed/path".into()])
1297			.expect("should create limited producer");
1298
1299		// Trying to consume_only with a completely different prefix should return None
1300		assert!(limited_producer.consume_only(&["different/path".into()]).is_none());
1301
1302		// Similarly for publish_only
1303		assert!(limited_producer.publish_only(&["other/path".into()]).is_none());
1304	}
1305
1306	#[tokio::test]
1307	async fn test_select_maintains_access_with_wider_prefix() {
1308		let origin = Origin::produce();
1309		let broadcast1 = Broadcast::produce();
1310		let broadcast2 = Broadcast::produce();
1311
1312		// Setup: user with root "demo" allowed to subscribe to specific paths
1313		let demo_producer = origin.producer.with_root("demo").expect("should create demo root");
1314		let user_producer = demo_producer
1315			.publish_only(&["worm-node".into(), "foobar".into()])
1316			.expect("should create user producer");
1317
1318		// Publish some data
1319		assert!(user_producer.publish_broadcast("worm-node/data", broadcast1.consumer.clone()));
1320		assert!(user_producer.publish_broadcast("foobar", broadcast2.consumer.clone()));
1321
1322		// Key test: consume_only with "" should maintain access to allowed roots
1323		let mut consumer = user_producer
1324			.consume_only(&["".into()])
1325			.expect("consume_only with empty prefix should not fail when user has specific permissions");
1326
1327		// Should still receive broadcasts from allowed paths
1328		consumer.assert_next("worm-node/data", &broadcast1.consumer);
1329		consumer.assert_next("foobar", &broadcast2.consumer);
1330		consumer.assert_next_wait();
1331
1332		// Also test that we can still narrow the scope
1333		let mut narrow_consumer = user_producer
1334			.consume_only(&["worm-node".into()])
1335			.expect("should be able to narrow scope to worm-node");
1336
1337		narrow_consumer.assert_next("worm-node/data", &broadcast1.consumer);
1338		narrow_consumer.assert_next_wait(); // Should not see foobar
1339	}
1340}