moq_lite/model/
origin.rs

1use std::{
2	collections::HashMap,
3	sync::atomic::{AtomicU64, Ordering},
4};
5use tokio::sync::mpsc;
6use web_async::Lock;
7
8use super::BroadcastConsumer;
9use crate::{AsPath, Path, PathOwned, Produce};
10
11static NEXT_CONSUMER_ID: AtomicU64 = AtomicU64::new(0);
12
13#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
14struct ConsumerId(u64);
15
16impl ConsumerId {
17	fn new() -> Self {
18		Self(NEXT_CONSUMER_ID.fetch_add(1, Ordering::Relaxed))
19	}
20}
21
22// If there are multiple broadcasts with the same path, we use the most recent one but keep the others around.
23struct OriginBroadcast {
24	path: PathOwned,
25	active: BroadcastConsumer,
26	backup: Vec<BroadcastConsumer>,
27}
28
29#[derive(Clone)]
30struct OriginConsumerNotify {
31	root: PathOwned,
32	tx: mpsc::UnboundedSender<OriginAnnounce>,
33}
34
35impl OriginConsumerNotify {
36	fn announce(&self, path: impl AsPath, broadcast: BroadcastConsumer) {
37		let path = path.as_path().strip_prefix(&self.root).unwrap().to_owned();
38		self.tx.send((path, Some(broadcast))).expect("consumer closed");
39	}
40
41	fn reannounce(&self, path: impl AsPath, broadcast: BroadcastConsumer) {
42		let path = path.as_path().strip_prefix(&self.root).unwrap().to_owned();
43		self.tx.send((path.clone(), None)).expect("consumer closed");
44		self.tx.send((path, Some(broadcast))).expect("consumer closed");
45	}
46
47	fn unannounce(&self, path: impl AsPath) {
48		let path = path.as_path().strip_prefix(&self.root).unwrap().to_owned();
49		self.tx.send((path, None)).expect("consumer closed");
50	}
51}
52
53struct NotifyNode {
54	parent: Option<Lock<NotifyNode>>,
55
56	// Consumers that are subscribed to this node.
57	// We store a consumer ID so we can remove it easily when it closes.
58	consumers: HashMap<ConsumerId, OriginConsumerNotify>,
59}
60
61impl NotifyNode {
62	fn new(parent: Option<Lock<NotifyNode>>) -> Self {
63		Self {
64			parent,
65			consumers: HashMap::new(),
66		}
67	}
68
69	fn announce(&mut self, path: impl AsPath, broadcast: &BroadcastConsumer) {
70		for consumer in self.consumers.values() {
71			consumer.announce(path.as_path(), broadcast.clone());
72		}
73
74		if let Some(parent) = &self.parent {
75			parent.lock().announce(path, broadcast);
76		}
77	}
78
79	fn reannounce(&mut self, path: impl AsPath, broadcast: &BroadcastConsumer) {
80		for consumer in self.consumers.values() {
81			consumer.reannounce(path.as_path(), broadcast.clone());
82		}
83
84		if let Some(parent) = &self.parent {
85			parent.lock().reannounce(path, broadcast);
86		}
87	}
88
89	fn unannounce(&mut self, path: impl AsPath) {
90		for consumer in self.consumers.values() {
91			consumer.unannounce(path.as_path());
92		}
93
94		if let Some(parent) = &self.parent {
95			parent.lock().unannounce(path);
96		}
97	}
98}
99
100struct OriginNode {
101	// The broadcast that is published to this node.
102	broadcast: Option<OriginBroadcast>,
103
104	// Nested nodes, one level down the tree.
105	nested: HashMap<String, Lock<OriginNode>>,
106
107	// Unfortunately, to notify consumers we need to traverse back up the tree.
108	notify: Lock<NotifyNode>,
109}
110
111impl OriginNode {
112	fn new(parent: Option<Lock<NotifyNode>>) -> Self {
113		Self {
114			broadcast: None,
115			nested: HashMap::new(),
116			notify: Lock::new(NotifyNode::new(parent)),
117		}
118	}
119
120	fn leaf(&mut self, path: &Path) -> Lock<OriginNode> {
121		let (dir, rest) = path.next_part().expect("leaf called with empty path");
122
123		let next = self.entry(dir);
124		if rest.is_empty() {
125			next
126		} else {
127			next.lock().leaf(&rest)
128		}
129	}
130
131	fn entry(&mut self, dir: &str) -> Lock<OriginNode> {
132		match self.nested.get(dir) {
133			Some(next) => next.clone(),
134			None => {
135				let next = Lock::new(OriginNode::new(Some(self.notify.clone())));
136				self.nested.insert(dir.to_string(), next.clone());
137				next
138			}
139		}
140	}
141
142	fn publish(&mut self, full: impl AsPath, broadcast: &BroadcastConsumer, relative: impl AsPath) {
143		let full = full.as_path();
144		let rest = relative.as_path();
145
146		// If the path has a directory component, then publish it to the nested node.
147		if let Some((dir, relative)) = rest.next_part() {
148			// Not using entry to avoid allocating a string most of the time.
149			self.entry(dir).lock().publish(&full, broadcast, &relative);
150		} else if let Some(existing) = &mut self.broadcast {
151			// This node is a leaf with an existing broadcast.
152			let old = existing.active.clone();
153			existing.active = broadcast.clone();
154			existing.backup.push(old);
155
156			self.notify.lock().reannounce(full, broadcast);
157		} else {
158			// This node is a leaf with no existing broadcast.
159			self.broadcast = Some(OriginBroadcast {
160				path: full.to_owned(),
161				active: broadcast.clone(),
162				backup: Vec::new(),
163			});
164			self.notify.lock().announce(full, broadcast);
165		}
166	}
167
168	fn consume(&mut self, id: ConsumerId, mut notify: OriginConsumerNotify) {
169		self.consume_initial(&mut notify);
170		self.notify.lock().consumers.insert(id, notify);
171	}
172
173	fn consume_initial(&mut self, notify: &mut OriginConsumerNotify) {
174		if let Some(broadcast) = &self.broadcast {
175			notify.announce(&broadcast.path, broadcast.active.clone());
176		}
177
178		// Recursively subscribe to all nested nodes.
179		for nested in self.nested.values() {
180			nested.lock().consume_initial(notify);
181		}
182	}
183
184	fn consume_broadcast(&self, rest: impl AsPath) -> Option<BroadcastConsumer> {
185		let rest = rest.as_path();
186
187		if let Some((dir, rest)) = rest.next_part() {
188			let node = self.nested.get(dir)?.lock();
189			node.consume_broadcast(&rest)
190		} else {
191			self.broadcast.as_ref().map(|b| b.active.clone())
192		}
193	}
194
195	fn unconsume(&mut self, id: ConsumerId) {
196		self.notify.lock().consumers.remove(&id).expect("consumer not found");
197		if self.is_empty() {
198			//tracing::warn!("TODO: empty node; memory leak");
199			// This happens when consuming a path that is not being broadcasted.
200		}
201	}
202
203	// Returns true if the broadcast should be unannounced.
204	fn remove(&mut self, full: impl AsPath, broadcast: BroadcastConsumer, relative: impl AsPath) {
205		let full = full.as_path();
206		let relative = relative.as_path();
207
208		if let Some((dir, relative)) = relative.next_part() {
209			let nested = self.entry(dir);
210			let mut locked = nested.lock();
211			locked.remove(&full, broadcast, &relative);
212
213			if locked.is_empty() {
214				drop(locked);
215				self.nested.remove(dir);
216			}
217		} else {
218			let entry = match &mut self.broadcast {
219				Some(existing) => existing,
220				None => return,
221			};
222
223			// See if we can remove the broadcast from the backup list.
224			let pos = entry.backup.iter().position(|b| b.is_clone(&broadcast));
225			if let Some(pos) = pos {
226				entry.backup.remove(pos);
227				// Nothing else to do
228				return;
229			}
230
231			// Okay so it must be the active broadcast or else we fucked up.
232			assert!(entry.active.is_clone(&broadcast));
233
234			// If there's a backup broadcast, then announce it.
235			if let Some(active) = entry.backup.pop() {
236				entry.active = active;
237				self.notify.lock().reannounce(full, &entry.active);
238			} else {
239				// No more backups, so remove the entry.
240				self.broadcast = None;
241				self.notify.lock().unannounce(full);
242			}
243		}
244	}
245
246	fn is_empty(&self) -> bool {
247		self.broadcast.is_none() && self.nested.is_empty() && self.notify.lock().consumers.is_empty()
248	}
249}
250
251#[derive(Clone)]
252struct OriginNodes {
253	nodes: Vec<(PathOwned, Lock<OriginNode>)>,
254}
255
256impl OriginNodes {
257	// Returns nested roots that match the prefixes.
258	// TODO enforce that prefixes can't overlap.
259	pub fn select(&self, prefixes: &[Path]) -> Option<Self> {
260		let mut roots = Vec::new();
261
262		for (root, state) in &self.nodes {
263			for prefix in prefixes {
264				if root.has_prefix(prefix) {
265					// Keep the existing node if we're allowed to access it.
266					roots.push((root.to_owned(), state.clone()));
267					continue;
268				}
269
270				if let Some(suffix) = prefix.strip_prefix(root) {
271					// If the requested prefix is larger than the allowed prefix, then we further scope it.
272					let nested = state.lock().leaf(&suffix);
273					roots.push((prefix.to_owned(), nested));
274				}
275			}
276		}
277
278		if roots.is_empty() {
279			None
280		} else {
281			Some(Self { nodes: roots })
282		}
283	}
284
285	pub fn root(&self, new_root: impl AsPath) -> Option<Self> {
286		let new_root = new_root.as_path();
287		let mut roots = Vec::new();
288
289		if new_root.is_empty() {
290			return Some(self.clone());
291		}
292
293		for (root, state) in &self.nodes {
294			if let Some(suffix) = root.strip_prefix(&new_root) {
295				// If the old root is longer than the new root, shorten the keys.
296				roots.push((suffix.to_owned(), state.clone()));
297			} else if let Some(suffix) = new_root.strip_prefix(root) {
298				// If the new root is longer than the old root, add a new root.
299				// NOTE: suffix can't be empty
300				let nested = state.lock().leaf(&suffix);
301				roots.push(("".into(), nested));
302			}
303		}
304
305		if roots.is_empty() {
306			None
307		} else {
308			Some(Self { nodes: roots })
309		}
310	}
311
312	// Returns the root that has this prefix.
313	pub fn get(&self, path: impl AsPath) -> Option<(Lock<OriginNode>, PathOwned)> {
314		let path = path.as_path();
315
316		for (root, state) in &self.nodes {
317			if let Some(suffix) = path.strip_prefix(root) {
318				return Some((state.clone(), suffix.to_owned()));
319			}
320		}
321
322		None
323	}
324}
325
326impl Default for OriginNodes {
327	fn default() -> Self {
328		Self {
329			nodes: vec![("".into(), Lock::new(OriginNode::new(None)))],
330		}
331	}
332}
333
334/// A broadcast path and its associated consumer, or None if closed.
335pub type OriginAnnounce = (PathOwned, Option<BroadcastConsumer>);
336
337pub struct Origin {}
338
339impl Origin {
340	pub fn produce() -> Produce<OriginProducer, OriginConsumer> {
341		let producer = OriginProducer::default();
342		let consumer = producer.consume();
343		Produce { producer, consumer }
344	}
345}
346
347/// Announces broadcasts to consumers over the network.
348#[derive(Clone, Default)]
349pub struct OriginProducer {
350	// The roots of the tree that we are allowed to publish.
351	// A path of "" means we can publish anything.
352	nodes: OriginNodes,
353
354	/// The prefix that is automatically stripped from all paths.
355	root: PathOwned,
356}
357
358impl OriginProducer {
359	/// Publish a broadcast, announcing it to all consumers.
360	///
361	/// The broadcast will be unannounced when it is closed.
362	/// If there is already a broadcast with the same path, then it will be replaced and reannounced.
363	/// If the old broadcast is closed before the new one, then nothing will happen.
364	/// If the new broadcast is closed before the old one, then the old broadcast will be reannounced.
365	///
366	/// Returns false if the broadcast is not allowed to be published.
367	pub fn publish_broadcast(&self, path: impl AsPath, broadcast: BroadcastConsumer) -> bool {
368		let path = path.as_path();
369
370		let (root, rest) = match self.nodes.get(&path) {
371			Some(root) => root,
372			None => return false,
373		};
374
375		let full = self.root.join(&path);
376
377		root.lock().publish(&full, &broadcast, &rest);
378		let root = root.clone();
379
380		web_async::spawn(async move {
381			broadcast.closed().await;
382			root.lock().remove(&full, broadcast, &rest);
383		});
384
385		true
386	}
387
388	/// Returns a new OriginProducer where all published broadcasts MUST match one of the prefixes.
389	///
390	/// Returns None if there are no legal prefixes.
391	pub fn publish_only(&self, prefixes: &[Path]) -> Option<OriginProducer> {
392		Some(OriginProducer {
393			nodes: self.nodes.select(prefixes)?,
394			root: self.root.clone(),
395		})
396	}
397
398	/// Subscribe to all announced broadcasts.
399	pub fn consume(&self) -> OriginConsumer {
400		OriginConsumer::new(self.root.clone(), self.nodes.clone())
401	}
402
403	/// Subscribe to all announced broadcasts matching the prefix.
404	///
405	/// TODO: Don't use overlapping prefixes or duplicates will be published.
406	///
407	/// Returns None if there are no legal prefixes.
408	pub fn consume_only(&self, prefixes: &[Path]) -> Option<OriginConsumer> {
409		Some(OriginConsumer::new(self.root.clone(), self.nodes.select(prefixes)?))
410	}
411
412	/// Returns a new OriginProducer that automatically strips out the provided prefix.
413	///
414	/// Returns None if the provided root is not authorized; when publish_only was already used without a wildcard.
415	pub fn with_root(&self, prefix: impl AsPath) -> Option<Self> {
416		let prefix = prefix.as_path();
417
418		Some(Self {
419			root: self.root.join(&prefix).to_owned(),
420			nodes: self.nodes.root(&prefix)?,
421		})
422	}
423
424	/// Returns the root that is automatically stripped from all paths.
425	pub fn root(&self) -> &Path<'_> {
426		&self.root
427	}
428
429	pub fn allowed(&self) -> impl Iterator<Item = &Path<'_>> {
430		self.nodes.nodes.iter().map(|(root, _)| root)
431	}
432
433	/// Converts a relative path to an absolute path.
434	pub fn absolute(&self, path: impl AsPath) -> Path<'_> {
435		self.root.join(path)
436	}
437}
438
439/// Consumes announced broadcasts matching against an optional prefix.
440///
441/// NOTE: Clone is expensive, try to avoid it.
442pub struct OriginConsumer {
443	id: ConsumerId,
444	nodes: OriginNodes,
445	updates: mpsc::UnboundedReceiver<OriginAnnounce>,
446
447	/// A prefix that is automatically stripped from all paths.
448	root: PathOwned,
449}
450
451impl OriginConsumer {
452	fn new(root: PathOwned, nodes: OriginNodes) -> Self {
453		let (tx, rx) = mpsc::unbounded_channel();
454
455		let id = ConsumerId::new();
456
457		for (_, state) in &nodes.nodes {
458			let notify = OriginConsumerNotify {
459				root: root.clone(),
460				tx: tx.clone(),
461			};
462			state.lock().consume(id, notify);
463		}
464
465		Self {
466			id,
467			nodes,
468			updates: rx,
469			root,
470		}
471	}
472
473	/// Returns the next (un)announced broadcast and the absolute path.
474	///
475	/// The broadcast will only be announced if it was previously unannounced.
476	/// The same path won't be announced/unannounced twice, instead it will toggle.
477	/// Returns None if the consumer is closed.
478	///
479	/// Note: The returned path is absolute and will always match this consumer's prefix.
480	pub async fn announced(&mut self) -> Option<OriginAnnounce> {
481		self.updates.recv().await
482	}
483
484	/// Returns the next (un)announced broadcast and the absolute path without blocking.
485	///
486	/// Returns None if there is no update available; NOT because the consumer is closed.
487	/// You have to use `is_closed` to check if the consumer is closed.
488	pub fn try_announced(&mut self) -> Option<OriginAnnounce> {
489		self.updates.try_recv().ok()
490	}
491
492	pub fn consume(&self) -> Self {
493		self.clone()
494	}
495
496	/// Get a specific broadcast by path.
497	///
498	/// TODO This should include announcement support.
499	///
500	/// Returns None if the path hasn't been announced yet.
501	pub fn consume_broadcast(&self, path: impl AsPath) -> Option<BroadcastConsumer> {
502		let path = path.as_path();
503		let (root, rest) = self.nodes.get(&path)?;
504		let state = root.lock();
505		state.consume_broadcast(&rest)
506	}
507
508	/// Returns a new OriginConsumer that only consumes broadcasts matching one of the prefixes.
509	///
510	/// Returns None if there are no legal prefixes (would always return None).
511	pub fn consume_only(&self, prefixes: &[Path]) -> Option<OriginConsumer> {
512		Some(OriginConsumer::new(self.root.clone(), self.nodes.select(prefixes)?))
513	}
514
515	/// Returns a new OriginConsumer that automatically strips out the provided prefix.
516	///
517	/// Returns None if the provided root is not authorized; when consume_only was already used without a wildcard.
518	pub fn with_root(&self, prefix: impl AsPath) -> Option<Self> {
519		let prefix = prefix.as_path();
520
521		Some(Self::new(self.root.join(&prefix).to_owned(), self.nodes.root(&prefix)?))
522	}
523
524	/// Returns the prefix that is automatically stripped from all paths.
525	pub fn root(&self) -> &Path<'_> {
526		&self.root
527	}
528
529	pub fn allowed(&self) -> impl Iterator<Item = &Path<'_>> {
530		self.nodes.nodes.iter().map(|(root, _)| root)
531	}
532
533	/// Converts a relative path to an absolute path.
534	pub fn absolute(&self, path: impl AsPath) -> Path<'_> {
535		self.root.join(path)
536	}
537}
538
539impl Drop for OriginConsumer {
540	fn drop(&mut self) {
541		for (_, root) in &self.nodes.nodes {
542			root.lock().unconsume(self.id);
543		}
544	}
545}
546
547impl Clone for OriginConsumer {
548	fn clone(&self) -> Self {
549		OriginConsumer::new(self.root.clone(), self.nodes.clone())
550	}
551}
552
553#[cfg(test)]
554use futures::FutureExt;
555
556#[cfg(test)]
557impl OriginConsumer {
558	pub fn assert_next(&mut self, expected: impl AsPath, broadcast: &BroadcastConsumer) {
559		let expected = expected.as_path();
560		let (path, active) = self.announced().now_or_never().expect("next blocked").expect("no next");
561		assert_eq!(path, expected, "wrong path");
562		assert!(active.unwrap().is_clone(broadcast), "should be the same broadcast");
563	}
564
565	pub fn assert_try_next(&mut self, expected: impl AsPath, broadcast: &BroadcastConsumer) {
566		let expected = expected.as_path();
567		let (path, active) = self.try_announced().expect("no next");
568		assert_eq!(path, expected, "wrong path");
569		assert!(active.unwrap().is_clone(broadcast), "should be the same broadcast");
570	}
571
572	pub fn assert_next_none(&mut self, expected: impl AsPath) {
573		let expected = expected.as_path();
574		let (path, active) = self.announced().now_or_never().expect("next blocked").expect("no next");
575		assert_eq!(path, expected, "wrong path");
576		assert!(active.is_none(), "should be unannounced");
577	}
578
579	pub fn assert_next_wait(&mut self) {
580		if let Some(res) = self.announced().now_or_never() {
581			panic!("next should block: got {:?}", res.map(|(path, _)| path));
582		}
583	}
584
585	/*
586	pub fn assert_next_closed(&mut self) {
587		assert!(
588			self.announced().now_or_never().expect("next blocked").is_none(),
589			"next should be closed"
590		);
591	}
592	*/
593}
594
595#[cfg(test)]
596mod tests {
597	use crate::Broadcast;
598
599	use super::*;
600
601	#[tokio::test]
602	async fn test_announce() {
603		let origin = Origin::produce();
604		let broadcast1 = Broadcast::produce();
605		let broadcast2 = Broadcast::produce();
606
607		let mut consumer1 = origin.consumer;
608		// Make a new consumer that should get it.
609		consumer1.assert_next_wait();
610
611		// Publish the first broadcast.
612		origin.producer.publish_broadcast("test1", broadcast1.consumer);
613
614		consumer1.assert_next("test1", &broadcast1.producer.consume());
615		consumer1.assert_next_wait();
616
617		// Make a new consumer that should get the existing broadcast.
618		// But we don't consume it yet.
619		let mut consumer2 = origin.producer.consume();
620
621		// Publish the second broadcast.
622		origin.producer.publish_broadcast("test2", broadcast2.consumer);
623
624		consumer1.assert_next("test2", &broadcast2.producer.consume());
625		consumer1.assert_next_wait();
626
627		consumer2.assert_next("test1", &broadcast1.producer.consume());
628		consumer2.assert_next("test2", &broadcast2.producer.consume());
629		consumer2.assert_next_wait();
630
631		// Close the first broadcast.
632		drop(broadcast1.producer);
633
634		// Wait for the async task to run.
635		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
636
637		// All consumers should get a None now.
638		consumer1.assert_next_none("test1");
639		consumer2.assert_next_none("test1");
640		consumer1.assert_next_wait();
641		consumer2.assert_next_wait();
642
643		// And a new consumer only gets the last broadcast.
644		let mut consumer3 = origin.producer.consume();
645		consumer3.assert_next("test2", &broadcast2.producer.consume());
646		consumer3.assert_next_wait();
647
648		// Close the other producer and make sure it cleans up
649		drop(broadcast2.producer);
650
651		// Wait for the async task to run.
652		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
653
654		consumer1.assert_next_none("test2");
655		consumer2.assert_next_none("test2");
656		consumer3.assert_next_none("test2");
657
658		/* TODO close the origin consumer when the producer is dropped
659		consumer1.assert_next_closed();
660		consumer2.assert_next_closed();
661		consumer3.assert_next_closed();
662		*/
663	}
664
665	#[tokio::test]
666	async fn test_duplicate() {
667		let mut origin = Origin::produce();
668
669		let broadcast1 = Broadcast::produce();
670		let broadcast2 = Broadcast::produce();
671		let broadcast3 = Broadcast::produce();
672
673		let consumer1 = broadcast1.consumer;
674		let consumer2 = broadcast2.consumer;
675		let consumer3 = broadcast3.consumer;
676
677		origin.producer.publish_broadcast("test", consumer1.clone());
678		origin.producer.publish_broadcast("test", consumer2.clone());
679		origin.producer.publish_broadcast("test", consumer3.clone());
680
681		assert!(origin.consumer.consume_broadcast("test").is_some());
682
683		origin.consumer.assert_next("test", &consumer1);
684		origin.consumer.assert_next_none("test");
685		origin.consumer.assert_next("test", &consumer2);
686		origin.consumer.assert_next_none("test");
687		origin.consumer.assert_next("test", &consumer3);
688
689		// Drop the backup, nothing should change.
690		drop(broadcast2.producer);
691
692		// Wait for the async task to run.
693		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
694
695		assert!(origin.consumer.consume_broadcast("test").is_some());
696		origin.consumer.assert_next_wait();
697
698		// Drop the active, we should reannounce.
699		drop(broadcast3.producer);
700
701		// Wait for the async task to run.
702		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
703
704		assert!(origin.consumer.consume_broadcast("test").is_some());
705		origin.consumer.assert_next_none("test");
706		origin.consumer.assert_next("test", &consumer1);
707
708		// Drop the final broadcast, we should unannounce.
709		drop(broadcast1.producer);
710
711		// Wait for the async task to run.
712		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
713		assert!(origin.consumer.consume_broadcast("test").is_none());
714
715		origin.consumer.assert_next_none("test");
716		origin.consumer.assert_next_wait();
717	}
718
719	#[tokio::test]
720	async fn test_duplicate_reverse() {
721		let origin = Origin::produce();
722		let broadcast1 = Broadcast::produce();
723		let broadcast2 = Broadcast::produce();
724
725		origin.producer.publish_broadcast("test", broadcast1.consumer);
726		origin.producer.publish_broadcast("test", broadcast2.consumer);
727		assert!(origin.consumer.consume_broadcast("test").is_some());
728
729		// This is harder, dropping the new broadcast first.
730		drop(broadcast2.producer);
731
732		// Wait for the cleanup async task to run.
733		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
734		assert!(origin.consumer.consume_broadcast("test").is_some());
735
736		drop(broadcast1.producer);
737
738		// Wait for the cleanup async task to run.
739		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
740		assert!(origin.consumer.consume_broadcast("test").is_none());
741	}
742
743	#[tokio::test]
744	async fn test_double_publish() {
745		let origin = Origin::produce();
746		let broadcast = Broadcast::produce();
747
748		// Ensure it doesn't crash.
749		origin.producer.publish_broadcast("test", broadcast.producer.consume());
750		origin.producer.publish_broadcast("test", broadcast.producer.consume());
751
752		assert!(origin.consumer.consume_broadcast("test").is_some());
753
754		drop(broadcast.producer);
755
756		// Wait for the async task to run.
757		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
758		assert!(origin.consumer.consume_broadcast("test").is_none());
759	}
760	// There was a tokio bug where only the first 127 broadcasts would be received instantly.
761	#[tokio::test]
762	#[should_panic]
763	async fn test_128() {
764		let mut origin = Origin::produce();
765		let broadcast = Broadcast::produce();
766
767		for i in 0..256 {
768			origin
769				.producer
770				.publish_broadcast(format!("test{i}"), broadcast.consumer.clone());
771		}
772
773		for i in 0..256 {
774			origin.consumer.assert_next(format!("test{i}"), &broadcast.consumer);
775		}
776	}
777
778	#[tokio::test]
779	async fn test_128_fix() {
780		let mut origin = Origin::produce();
781		let broadcast = Broadcast::produce();
782
783		for i in 0..256 {
784			origin
785				.producer
786				.publish_broadcast(format!("test{i}"), broadcast.consumer.clone());
787		}
788
789		for i in 0..256 {
790			// try_next does not have the same issue because it's synchronous.
791			origin.consumer.assert_try_next(format!("test{i}"), &broadcast.consumer);
792		}
793	}
794
795	#[tokio::test]
796	async fn test_with_root_basic() {
797		let mut origin = Origin::produce();
798		let broadcast = Broadcast::produce();
799
800		// Create a producer with root "/foo"
801		let foo_producer = origin.producer.with_root("foo").expect("should create root");
802		assert_eq!(foo_producer.root().as_str(), "foo");
803
804		// When publishing to "bar/baz", it should actually publish to "foo/bar/baz"
805		assert!(foo_producer.publish_broadcast("bar/baz", broadcast.consumer.clone()));
806
807		// The original consumer should see the full path
808		origin.consumer.assert_next("foo/bar/baz", &broadcast.consumer);
809
810		// A consumer created from the rooted producer should see the stripped path
811		let mut foo_consumer = foo_producer.consume();
812		foo_consumer.assert_next("bar/baz", &broadcast.consumer);
813	}
814
815	#[tokio::test]
816	async fn test_with_root_nested() {
817		let mut origin = Origin::produce();
818		let broadcast = Broadcast::produce();
819
820		// Create nested roots
821		let foo_producer = origin.producer.with_root("foo").expect("should create foo root");
822		let foo_bar_producer = foo_producer.with_root("bar").expect("should create bar root");
823		assert_eq!(foo_bar_producer.root().as_str(), "foo/bar");
824
825		// Publishing to "baz" should actually publish to "foo/bar/baz"
826		assert!(foo_bar_producer.publish_broadcast("baz", broadcast.consumer.clone()));
827
828		// The original consumer sees the full path
829		origin.consumer.assert_next("foo/bar/baz", &broadcast.consumer);
830
831		// Consumer from foo_bar_producer sees just "baz"
832		let mut foo_bar_consumer = foo_bar_producer.consume();
833		foo_bar_consumer.assert_next("baz", &broadcast.consumer);
834	}
835
836	#[tokio::test]
837	async fn test_publish_only_allows() {
838		let origin = Origin::produce();
839		let broadcast = Broadcast::produce();
840
841		// Create a producer that can only publish to "allowed" paths
842		let limited_producer = origin
843			.producer
844			.publish_only(&["allowed/path1".into(), "allowed/path2".into()])
845			.expect("should create limited producer");
846
847		// Should be able to publish to allowed paths
848		assert!(limited_producer.publish_broadcast("allowed/path1", broadcast.consumer.clone()));
849		assert!(limited_producer.publish_broadcast("allowed/path1/nested", broadcast.consumer.clone()));
850		assert!(limited_producer.publish_broadcast("allowed/path2", broadcast.consumer.clone()));
851
852		// Should not be able to publish to disallowed paths
853		assert!(!limited_producer.publish_broadcast("notallowed", broadcast.consumer.clone()));
854		assert!(!limited_producer.publish_broadcast("allowed", broadcast.consumer.clone())); // Parent of allowed path
855		assert!(!limited_producer.publish_broadcast("other/path", broadcast.consumer.clone()));
856	}
857
858	#[tokio::test]
859	async fn test_publish_only_empty() {
860		let origin = Origin::produce();
861
862		// Creating a producer with no allowed paths should return None
863		assert!(origin.producer.publish_only(&[]).is_none());
864	}
865
866	#[tokio::test]
867	async fn test_consume_only_filters() {
868		let mut origin = Origin::produce();
869		let broadcast1 = Broadcast::produce();
870		let broadcast2 = Broadcast::produce();
871		let broadcast3 = Broadcast::produce();
872
873		// Publish to different paths
874		origin
875			.producer
876			.publish_broadcast("allowed", broadcast1.consumer.clone());
877		origin
878			.producer
879			.publish_broadcast("allowed/nested", broadcast2.consumer.clone());
880		origin
881			.producer
882			.publish_broadcast("notallowed", broadcast3.consumer.clone());
883
884		// Create a consumer that only sees "allowed" paths
885		let mut limited_consumer = origin
886			.consumer
887			.consume_only(&["allowed".into()])
888			.expect("should create limited consumer");
889
890		// Should only receive broadcasts under "allowed"
891		limited_consumer.assert_next("allowed", &broadcast1.consumer);
892		limited_consumer.assert_next("allowed/nested", &broadcast2.consumer);
893		limited_consumer.assert_next_wait(); // Should not see "notallowed"
894
895		// Original consumer should see all
896		origin.consumer.assert_next("allowed", &broadcast1.consumer);
897		origin.consumer.assert_next("allowed/nested", &broadcast2.consumer);
898		origin.consumer.assert_next("notallowed", &broadcast3.consumer);
899	}
900
901	#[tokio::test]
902	async fn test_consume_only_multiple_prefixes() {
903		let origin = Origin::produce();
904		let broadcast1 = Broadcast::produce();
905		let broadcast2 = Broadcast::produce();
906		let broadcast3 = Broadcast::produce();
907
908		origin
909			.producer
910			.publish_broadcast("foo/test", broadcast1.consumer.clone());
911		origin
912			.producer
913			.publish_broadcast("bar/test", broadcast2.consumer.clone());
914		origin
915			.producer
916			.publish_broadcast("baz/test", broadcast3.consumer.clone());
917
918		// Consumer that only sees "foo" and "bar" paths
919		let mut limited_consumer = origin
920			.consumer
921			.consume_only(&["foo".into(), "bar".into()])
922			.expect("should create limited consumer");
923
924		limited_consumer.assert_next("foo/test", &broadcast1.consumer);
925		limited_consumer.assert_next("bar/test", &broadcast2.consumer);
926		limited_consumer.assert_next_wait(); // Should not see "baz/test"
927	}
928
929	#[tokio::test]
930	async fn test_with_root_and_publish_only() {
931		let mut origin = Origin::produce();
932		let broadcast = Broadcast::produce();
933
934		// User connects to /foo root
935		let foo_producer = origin.producer.with_root("foo").expect("should create foo root");
936
937		// Limit them to publish only to "bar" and "goop/pee" within /foo
938		let limited_producer = foo_producer
939			.publish_only(&["bar".into(), "goop/pee".into()])
940			.expect("should create limited producer");
941
942		// Should be able to publish to foo/bar and foo/goop/pee (but user sees as bar and goop/pee)
943		assert!(limited_producer.publish_broadcast("bar", broadcast.consumer.clone()));
944		assert!(limited_producer.publish_broadcast("bar/nested", broadcast.consumer.clone()));
945		assert!(limited_producer.publish_broadcast("goop/pee", broadcast.consumer.clone()));
946		assert!(limited_producer.publish_broadcast("goop/pee/nested", broadcast.consumer.clone()));
947
948		// Should not be able to publish outside allowed paths
949		assert!(!limited_producer.publish_broadcast("baz", broadcast.consumer.clone()));
950		assert!(!limited_producer.publish_broadcast("goop", broadcast.consumer.clone())); // Parent of allowed
951		assert!(!limited_producer.publish_broadcast("goop/other", broadcast.consumer.clone()));
952
953		// Original consumer sees full paths
954		origin.consumer.assert_next("foo/bar", &broadcast.consumer);
955		origin.consumer.assert_next("foo/bar/nested", &broadcast.consumer);
956		origin.consumer.assert_next("foo/goop/pee", &broadcast.consumer);
957		origin.consumer.assert_next("foo/goop/pee/nested", &broadcast.consumer);
958	}
959
960	#[tokio::test]
961	async fn test_with_root_and_consume_only() {
962		let origin = Origin::produce();
963		let broadcast1 = Broadcast::produce();
964		let broadcast2 = Broadcast::produce();
965		let broadcast3 = Broadcast::produce();
966
967		// Publish broadcasts
968		origin
969			.producer
970			.publish_broadcast("foo/bar/test", broadcast1.consumer.clone());
971		origin
972			.producer
973			.publish_broadcast("foo/goop/pee/test", broadcast2.consumer.clone());
974		origin
975			.producer
976			.publish_broadcast("foo/other/test", broadcast3.consumer.clone());
977
978		// User connects to /foo root
979		let foo_producer = origin.producer.with_root("foo").expect("should create foo root");
980
981		// Create consumer limited to "bar" and "goop/pee" within /foo
982		let mut limited_consumer = foo_producer
983			.consume_only(&["bar".into(), "goop/pee".into()])
984			.expect("should create limited consumer");
985
986		// Should only see allowed paths (without foo prefix)
987		limited_consumer.assert_next("bar/test", &broadcast1.consumer);
988		limited_consumer.assert_next("goop/pee/test", &broadcast2.consumer);
989		limited_consumer.assert_next_wait(); // Should not see "other/test"
990	}
991
992	#[tokio::test]
993	async fn test_with_root_unauthorized() {
994		let origin = Origin::produce();
995
996		// First limit the producer to specific paths
997		let limited_producer = origin
998			.producer
999			.publish_only(&["allowed".into()])
1000			.expect("should create limited producer");
1001
1002		// Trying to create a root outside allowed paths should fail
1003		assert!(limited_producer.with_root("notallowed").is_none());
1004
1005		// But creating a root within allowed paths should work
1006		let allowed_root = limited_producer
1007			.with_root("allowed")
1008			.expect("should create allowed root");
1009		assert_eq!(allowed_root.root().as_str(), "allowed");
1010	}
1011
1012	#[tokio::test]
1013	async fn test_wildcard_permission() {
1014		let origin = Origin::produce();
1015		let broadcast = Broadcast::produce();
1016
1017		// Producer with root access (empty string means wildcard)
1018		let root_producer = origin.producer.clone();
1019
1020		// Should be able to publish anywhere
1021		assert!(root_producer.publish_broadcast("any/path", broadcast.consumer.clone()));
1022		assert!(root_producer.publish_broadcast("other/path", broadcast.consumer.clone()));
1023
1024		// Can create any root
1025		let foo_producer = root_producer.with_root("foo").expect("should create any root");
1026		assert_eq!(foo_producer.root().as_str(), "foo");
1027	}
1028
1029	#[tokio::test]
1030	async fn test_consume_broadcast_with_permissions() {
1031		let origin = Origin::produce();
1032		let broadcast1 = Broadcast::produce();
1033		let broadcast2 = Broadcast::produce();
1034
1035		origin
1036			.producer
1037			.publish_broadcast("allowed/test", broadcast1.consumer.clone());
1038		origin
1039			.producer
1040			.publish_broadcast("notallowed/test", broadcast2.consumer.clone());
1041
1042		// Create limited consumer
1043		let limited_consumer = origin
1044			.consumer
1045			.consume_only(&["allowed".into()])
1046			.expect("should create limited consumer");
1047
1048		// Should be able to get allowed broadcast
1049		let result = limited_consumer.consume_broadcast("allowed/test");
1050		assert!(result.is_some());
1051		assert!(result.unwrap().is_clone(&broadcast1.consumer));
1052
1053		// Should not be able to get disallowed broadcast
1054		assert!(limited_consumer.consume_broadcast("notallowed/test").is_none());
1055
1056		// Original consumer can get both
1057		assert!(origin.consumer.consume_broadcast("allowed/test").is_some());
1058		assert!(origin.consumer.consume_broadcast("notallowed/test").is_some());
1059	}
1060
1061	#[tokio::test]
1062	async fn test_nested_paths_with_permissions() {
1063		let origin = Origin::produce();
1064		let broadcast = Broadcast::produce();
1065
1066		// Create producer limited to "a/b/c"
1067		let limited_producer = origin
1068			.producer
1069			.publish_only(&["a/b/c".into()])
1070			.expect("should create limited producer");
1071
1072		// Should be able to publish to exact path and nested paths
1073		assert!(limited_producer.publish_broadcast("a/b/c", broadcast.consumer.clone()));
1074		assert!(limited_producer.publish_broadcast("a/b/c/d", broadcast.consumer.clone()));
1075		assert!(limited_producer.publish_broadcast("a/b/c/d/e", broadcast.consumer.clone()));
1076
1077		// Should not be able to publish to parent or sibling paths
1078		assert!(!limited_producer.publish_broadcast("a", broadcast.consumer.clone()));
1079		assert!(!limited_producer.publish_broadcast("a/b", broadcast.consumer.clone()));
1080		assert!(!limited_producer.publish_broadcast("a/b/other", broadcast.consumer.clone()));
1081	}
1082
1083	#[tokio::test]
1084	async fn test_multiple_consumers_with_different_permissions() {
1085		let origin = Origin::produce();
1086		let broadcast1 = Broadcast::produce();
1087		let broadcast2 = Broadcast::produce();
1088		let broadcast3 = Broadcast::produce();
1089
1090		// Publish to different paths
1091		origin
1092			.producer
1093			.publish_broadcast("foo/test", broadcast1.consumer.clone());
1094		origin
1095			.producer
1096			.publish_broadcast("bar/test", broadcast2.consumer.clone());
1097		origin
1098			.producer
1099			.publish_broadcast("baz/test", broadcast3.consumer.clone());
1100
1101		// Create consumers with different permissions
1102		let mut foo_consumer = origin
1103			.consumer
1104			.consume_only(&["foo".into()])
1105			.expect("should create foo consumer");
1106
1107		let mut bar_consumer = origin
1108			.consumer
1109			.consume_only(&["bar".into()])
1110			.expect("should create bar consumer");
1111
1112		let mut foobar_consumer = origin
1113			.consumer
1114			.consume_only(&["foo".into(), "bar".into()])
1115			.expect("should create foobar consumer");
1116
1117		// Each consumer should only see their allowed paths
1118		foo_consumer.assert_next("foo/test", &broadcast1.consumer);
1119		foo_consumer.assert_next_wait();
1120
1121		bar_consumer.assert_next("bar/test", &broadcast2.consumer);
1122		bar_consumer.assert_next_wait();
1123
1124		foobar_consumer.assert_next("foo/test", &broadcast1.consumer);
1125		foobar_consumer.assert_next("bar/test", &broadcast2.consumer);
1126		foobar_consumer.assert_next_wait();
1127	}
1128
1129	#[tokio::test]
1130	async fn test_select_with_empty_prefix() {
1131		let origin = Origin::produce();
1132		let broadcast1 = Broadcast::produce();
1133		let broadcast2 = Broadcast::produce();
1134
1135		// User with root "demo" allowed to subscribe to "worm-node" and "foobar"
1136		let demo_producer = origin.producer.with_root("demo").expect("should create demo root");
1137		let limited_producer = demo_producer
1138			.publish_only(&["worm-node".into(), "foobar".into()])
1139			.expect("should create limited producer");
1140
1141		// Publish some broadcasts
1142		assert!(limited_producer.publish_broadcast("worm-node/test", broadcast1.consumer.clone()));
1143		assert!(limited_producer.publish_broadcast("foobar/test", broadcast2.consumer.clone()));
1144
1145		// consume_only with empty prefix should keep the exact same "worm-node" and "foobar" nodes
1146		let mut consumer = limited_producer
1147			.consume_only(&["".into()])
1148			.expect("should create consumer with empty prefix");
1149
1150		// Should still see both broadcasts
1151		consumer.assert_next("worm-node/test", &broadcast1.consumer);
1152		consumer.assert_next("foobar/test", &broadcast2.consumer);
1153		consumer.assert_next_wait();
1154	}
1155
1156	#[tokio::test]
1157	async fn test_select_narrowing_scope() {
1158		let origin = Origin::produce();
1159		let broadcast1 = Broadcast::produce();
1160		let broadcast2 = Broadcast::produce();
1161		let broadcast3 = Broadcast::produce();
1162
1163		// User with root "demo" allowed to subscribe to "worm-node" and "foobar"
1164		let demo_producer = origin.producer.with_root("demo").expect("should create demo root");
1165		let limited_producer = demo_producer
1166			.publish_only(&["worm-node".into(), "foobar".into()])
1167			.expect("should create limited producer");
1168
1169		// Publish broadcasts at different levels
1170		assert!(limited_producer.publish_broadcast("worm-node", broadcast1.consumer.clone()));
1171		assert!(limited_producer.publish_broadcast("worm-node/foo", broadcast2.consumer.clone()));
1172		assert!(limited_producer.publish_broadcast("foobar/bar", broadcast3.consumer.clone()));
1173
1174		// Test 1: consume_only("worm-node") should result in a single "" node with contents of "worm-node" ONLY
1175		let mut worm_consumer = limited_producer
1176			.consume_only(&["worm-node".into()])
1177			.expect("should create worm-node consumer");
1178
1179		// Should see worm-node content with paths stripped to ""
1180		worm_consumer.assert_next("worm-node", &broadcast1.consumer);
1181		worm_consumer.assert_next("worm-node/foo", &broadcast2.consumer);
1182		worm_consumer.assert_next_wait(); // Should NOT see foobar content
1183
1184		// Test 2: consume_only("worm-node/foo") should result in a "" node with contents of "worm-node/foo"
1185		let mut foo_consumer = limited_producer
1186			.consume_only(&["worm-node/foo".into()])
1187			.expect("should create worm-node/foo consumer");
1188
1189		foo_consumer.assert_next("worm-node/foo", &broadcast2.consumer);
1190		foo_consumer.assert_next_wait(); // Should NOT see other content
1191	}
1192
1193	#[tokio::test]
1194	async fn test_select_multiple_roots_with_empty_prefix() {
1195		let origin = Origin::produce();
1196		let broadcast1 = Broadcast::produce();
1197		let broadcast2 = Broadcast::produce();
1198		let broadcast3 = Broadcast::produce();
1199
1200		// Producer with multiple allowed roots
1201		let limited_producer = origin
1202			.producer
1203			.publish_only(&["app1".into(), "app2".into(), "shared".into()])
1204			.expect("should create limited producer");
1205
1206		// Publish to each root
1207		assert!(limited_producer.publish_broadcast("app1/data", broadcast1.consumer.clone()));
1208		assert!(limited_producer.publish_broadcast("app2/config", broadcast2.consumer.clone()));
1209		assert!(limited_producer.publish_broadcast("shared/resource", broadcast3.consumer.clone()));
1210
1211		// consume_only with empty prefix should maintain all roots
1212		let mut consumer = limited_producer
1213			.consume_only(&["".into()])
1214			.expect("should create consumer with empty prefix");
1215
1216		// Should see all broadcasts from all roots
1217		consumer.assert_next("app1/data", &broadcast1.consumer);
1218		consumer.assert_next("app2/config", &broadcast2.consumer);
1219		consumer.assert_next("shared/resource", &broadcast3.consumer);
1220		consumer.assert_next_wait();
1221	}
1222
1223	#[tokio::test]
1224	async fn test_publish_only_with_empty_prefix() {
1225		let origin = Origin::produce();
1226		let broadcast = Broadcast::produce();
1227
1228		// Producer with specific allowed paths
1229		let limited_producer = origin
1230			.producer
1231			.publish_only(&["services/api".into(), "services/web".into()])
1232			.expect("should create limited producer");
1233
1234		// publish_only with empty prefix should keep the same restrictions
1235		let same_producer = limited_producer
1236			.publish_only(&["".into()])
1237			.expect("should create producer with empty prefix");
1238
1239		// Should still have the same publishing restrictions
1240		assert!(same_producer.publish_broadcast("services/api", broadcast.consumer.clone()));
1241		assert!(same_producer.publish_broadcast("services/web", broadcast.consumer.clone()));
1242		assert!(!same_producer.publish_broadcast("services/db", broadcast.consumer.clone()));
1243		assert!(!same_producer.publish_broadcast("other", broadcast.consumer.clone()));
1244	}
1245
1246	#[tokio::test]
1247	async fn test_select_narrowing_to_deeper_path() {
1248		let origin = Origin::produce();
1249		let broadcast1 = Broadcast::produce();
1250		let broadcast2 = Broadcast::produce();
1251		let broadcast3 = Broadcast::produce();
1252
1253		// Producer with broad permission
1254		let limited_producer = origin
1255			.producer
1256			.publish_only(&["org".into()])
1257			.expect("should create limited producer");
1258
1259		// Publish at various depths
1260		assert!(limited_producer.publish_broadcast("org/team1/project1", broadcast1.consumer.clone()));
1261		assert!(limited_producer.publish_broadcast("org/team1/project2", broadcast2.consumer.clone()));
1262		assert!(limited_producer.publish_broadcast("org/team2/project1", broadcast3.consumer.clone()));
1263
1264		// Narrow down to team1 only
1265		let mut team1_consumer = limited_producer
1266			.consume_only(&["org/team2".into()])
1267			.expect("should create team1 consumer");
1268
1269		team1_consumer.assert_next("org/team2/project1", &broadcast3.consumer);
1270		team1_consumer.assert_next_wait(); // Should NOT see team1 content
1271
1272		// Further narrow down to team1/project1
1273		let mut project1_consumer = limited_producer
1274			.consume_only(&["org/team1/project1".into()])
1275			.expect("should create project1 consumer");
1276
1277		// Should only see project1 content at root
1278		project1_consumer.assert_next("org/team1/project1", &broadcast1.consumer);
1279		project1_consumer.assert_next_wait();
1280	}
1281
1282	#[tokio::test]
1283	async fn test_select_with_non_matching_prefix() {
1284		let origin = Origin::produce();
1285
1286		// Producer with specific allowed paths
1287		let limited_producer = origin
1288			.producer
1289			.publish_only(&["allowed/path".into()])
1290			.expect("should create limited producer");
1291
1292		// Trying to consume_only with a completely different prefix should return None
1293		assert!(limited_producer.consume_only(&["different/path".into()]).is_none());
1294
1295		// Similarly for publish_only
1296		assert!(limited_producer.publish_only(&["other/path".into()]).is_none());
1297	}
1298
1299	#[tokio::test]
1300	async fn test_select_maintains_access_with_wider_prefix() {
1301		let origin = Origin::produce();
1302		let broadcast1 = Broadcast::produce();
1303		let broadcast2 = Broadcast::produce();
1304
1305		// Setup: user with root "demo" allowed to subscribe to specific paths
1306		let demo_producer = origin.producer.with_root("demo").expect("should create demo root");
1307		let user_producer = demo_producer
1308			.publish_only(&["worm-node".into(), "foobar".into()])
1309			.expect("should create user producer");
1310
1311		// Publish some data
1312		assert!(user_producer.publish_broadcast("worm-node/data", broadcast1.consumer.clone()));
1313		assert!(user_producer.publish_broadcast("foobar", broadcast2.consumer.clone()));
1314
1315		// Key test: consume_only with "" should maintain access to allowed roots
1316		let mut consumer = user_producer
1317			.consume_only(&["".into()])
1318			.expect("consume_only with empty prefix should not fail when user has specific permissions");
1319
1320		// Should still receive broadcasts from allowed paths
1321		consumer.assert_next("worm-node/data", &broadcast1.consumer);
1322		consumer.assert_next("foobar", &broadcast2.consumer);
1323		consumer.assert_next_wait();
1324
1325		// Also test that we can still narrow the scope
1326		let mut narrow_consumer = user_producer
1327			.consume_only(&["worm-node".into()])
1328			.expect("should be able to narrow scope to worm-node");
1329
1330		narrow_consumer.assert_next("worm-node/data", &broadcast1.consumer);
1331		narrow_consumer.assert_next_wait(); // Should not see foobar
1332	}
1333}