Skip to main content

moq_lite/model/
origin.rs

1use std::{
2	collections::HashMap,
3	sync::atomic::{AtomicU64, Ordering},
4};
5use tokio::sync::mpsc;
6use web_async::Lock;
7
8use super::BroadcastConsumer;
9use crate::{AsPath, Broadcast, BroadcastProducer, Path, PathOwned, PathPrefixes};
10
/// Process-wide counter that backs [`ConsumerId::new`].
static NEXT_CONSUMER_ID: AtomicU64 = AtomicU64::new(0);

/// Opaque key under which a consumer is registered on a notify node.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct ConsumerId(u64);

impl ConsumerId {
	/// Hands out the next identifier from the global counter.
	fn new() -> Self {
		// Only the fetched value matters; no other memory is synchronized through the counter.
		let raw = NEXT_CONSUMER_ID.fetch_add(1, Ordering::Relaxed);
		ConsumerId(raw)
	}
}
21
// If there are multiple broadcasts with the same path, we use the most recent one but keep the others around.
struct OriginBroadcast {
	// The full path the broadcast was published under; replayed to consumers that subscribe later.
	path: PathOwned,
	// The most recently published broadcast for this path.
	active: BroadcastConsumer,
	// Previously active broadcasts for the same path, in the order they were replaced.
	backup: Vec<BroadcastConsumer>,
}
28
29#[derive(Clone)]
30struct OriginConsumerNotify {
31	root: PathOwned,
32	tx: mpsc::UnboundedSender<OriginAnnounce>,
33}
34
35impl OriginConsumerNotify {
36	fn announce(&self, path: impl AsPath, broadcast: BroadcastConsumer) {
37		let path = path.as_path().strip_prefix(&self.root).unwrap().to_owned();
38		self.tx.send((path, Some(broadcast))).expect("consumer closed");
39	}
40
41	fn reannounce(&self, path: impl AsPath, broadcast: BroadcastConsumer) {
42		let path = path.as_path().strip_prefix(&self.root).unwrap().to_owned();
43		self.tx.send((path.clone(), None)).expect("consumer closed");
44		self.tx.send((path, Some(broadcast))).expect("consumer closed");
45	}
46
47	fn unannounce(&self, path: impl AsPath) {
48		let path = path.as_path().strip_prefix(&self.root).unwrap().to_owned();
49		self.tx.send((path, None)).expect("consumer closed");
50	}
51}
52
53struct NotifyNode {
54	parent: Option<Lock<NotifyNode>>,
55
56	// Consumers that are subscribed to this node.
57	// We store a consumer ID so we can remove it easily when it closes.
58	consumers: HashMap<ConsumerId, OriginConsumerNotify>,
59}
60
61impl NotifyNode {
62	fn new(parent: Option<Lock<NotifyNode>>) -> Self {
63		Self {
64			parent,
65			consumers: HashMap::new(),
66		}
67	}
68
69	fn announce(&mut self, path: impl AsPath, broadcast: &BroadcastConsumer) {
70		for consumer in self.consumers.values() {
71			consumer.announce(path.as_path(), broadcast.clone());
72		}
73
74		if let Some(parent) = &self.parent {
75			parent.lock().announce(path, broadcast);
76		}
77	}
78
79	fn reannounce(&mut self, path: impl AsPath, broadcast: &BroadcastConsumer) {
80		for consumer in self.consumers.values() {
81			consumer.reannounce(path.as_path(), broadcast.clone());
82		}
83
84		if let Some(parent) = &self.parent {
85			parent.lock().reannounce(path, broadcast);
86		}
87	}
88
89	fn unannounce(&mut self, path: impl AsPath) {
90		for consumer in self.consumers.values() {
91			consumer.unannounce(path.as_path());
92		}
93
94		if let Some(parent) = &self.parent {
95			parent.lock().unannounce(path);
96		}
97	}
98}
99
100struct OriginNode {
101	// The broadcast that is published to this node.
102	broadcast: Option<OriginBroadcast>,
103
104	// Nested nodes, one level down the tree.
105	nested: HashMap<String, Lock<OriginNode>>,
106
107	// Unfortunately, to notify consumers we need to traverse back up the tree.
108	notify: Lock<NotifyNode>,
109}
110
111impl OriginNode {
112	fn new(parent: Option<Lock<NotifyNode>>) -> Self {
113		Self {
114			broadcast: None,
115			nested: HashMap::new(),
116			notify: Lock::new(NotifyNode::new(parent)),
117		}
118	}
119
120	fn leaf(&mut self, path: &Path) -> Lock<OriginNode> {
121		let (dir, rest) = path.next_part().expect("leaf called with empty path");
122
123		let next = self.entry(dir);
124		if rest.is_empty() { next } else { next.lock().leaf(&rest) }
125	}
126
127	fn entry(&mut self, dir: &str) -> Lock<OriginNode> {
128		match self.nested.get(dir) {
129			Some(next) => next.clone(),
130			None => {
131				let next = Lock::new(OriginNode::new(Some(self.notify.clone())));
132				self.nested.insert(dir.to_string(), next.clone());
133				next
134			}
135		}
136	}
137
138	fn publish(&mut self, full: impl AsPath, broadcast: &BroadcastConsumer, relative: impl AsPath) {
139		let full = full.as_path();
140		let rest = relative.as_path();
141
142		// If the path has a directory component, then publish it to the nested node.
143		if let Some((dir, relative)) = rest.next_part() {
144			// Not using entry to avoid allocating a string most of the time.
145			self.entry(dir).lock().publish(&full, broadcast, &relative);
146		} else if let Some(existing) = &mut self.broadcast {
147			// This node is a leaf with an existing broadcast.
148			let old = existing.active.clone();
149			existing.active = broadcast.clone();
150			existing.backup.push(old);
151
152			self.notify.lock().reannounce(full, broadcast);
153		} else {
154			// This node is a leaf with no existing broadcast.
155			self.broadcast = Some(OriginBroadcast {
156				path: full.to_owned(),
157				active: broadcast.clone(),
158				backup: Vec::new(),
159			});
160			self.notify.lock().announce(full, broadcast);
161		}
162	}
163
164	fn consume(&mut self, id: ConsumerId, mut notify: OriginConsumerNotify) {
165		self.consume_initial(&mut notify);
166		self.notify.lock().consumers.insert(id, notify);
167	}
168
169	fn consume_initial(&mut self, notify: &mut OriginConsumerNotify) {
170		if let Some(broadcast) = &self.broadcast {
171			notify.announce(&broadcast.path, broadcast.active.clone());
172		}
173
174		// Recursively subscribe to all nested nodes.
175		for nested in self.nested.values() {
176			nested.lock().consume_initial(notify);
177		}
178	}
179
180	fn consume_broadcast(&self, rest: impl AsPath) -> Option<BroadcastConsumer> {
181		let rest = rest.as_path();
182
183		if let Some((dir, rest)) = rest.next_part() {
184			let node = self.nested.get(dir)?.lock();
185			node.consume_broadcast(&rest)
186		} else {
187			self.broadcast.as_ref().map(|b| b.active.clone())
188		}
189	}
190
191	fn unconsume(&mut self, id: ConsumerId) {
192		self.notify.lock().consumers.remove(&id).expect("consumer not found");
193		if self.is_empty() {
194			//tracing::warn!("TODO: empty node; memory leak");
195			// This happens when consuming a path that is not being broadcasted.
196		}
197	}
198
199	// Returns true if the broadcast should be unannounced.
200	fn remove(&mut self, full: impl AsPath, broadcast: BroadcastConsumer, relative: impl AsPath) {
201		let full = full.as_path();
202		let relative = relative.as_path();
203
204		if let Some((dir, relative)) = relative.next_part() {
205			let nested = self.entry(dir);
206			let mut locked = nested.lock();
207			locked.remove(&full, broadcast, &relative);
208
209			if locked.is_empty() {
210				drop(locked);
211				self.nested.remove(dir);
212			}
213		} else {
214			let entry = match &mut self.broadcast {
215				Some(existing) => existing,
216				None => return,
217			};
218
219			// See if we can remove the broadcast from the backup list.
220			let pos = entry.backup.iter().position(|b| b.is_clone(&broadcast));
221			if let Some(pos) = pos {
222				entry.backup.remove(pos);
223				// Nothing else to do
224				return;
225			}
226
227			// Okay so it must be the active broadcast or else we fucked up.
228			assert!(entry.active.is_clone(&broadcast));
229
230			// If there's a backup broadcast, then announce it.
231			if let Some(active) = entry.backup.pop() {
232				entry.active = active;
233				self.notify.lock().reannounce(full, &entry.active);
234			} else {
235				// No more backups, so remove the entry.
236				self.broadcast = None;
237				self.notify.lock().unannounce(full);
238			}
239		}
240	}
241
242	fn is_empty(&self) -> bool {
243		self.broadcast.is_none() && self.nested.is_empty() && self.notify.lock().consumers.is_empty()
244	}
245}
246
247#[derive(Clone)]
248struct OriginNodes {
249	nodes: Vec<(PathOwned, Lock<OriginNode>)>,
250}
251
252impl OriginNodes {
253	// Returns nested roots that match the prefixes.
254	// PathPrefixes guarantees no duplicates or overlapping prefixes.
255	pub fn select(&self, prefixes: &PathPrefixes) -> Option<Self> {
256		let mut roots = Vec::new();
257
258		for (root, state) in &self.nodes {
259			for prefix in prefixes {
260				if root.has_prefix(prefix) {
261					// Keep the existing node if we're allowed to access it.
262					roots.push((root.to_owned(), state.clone()));
263					continue;
264				}
265
266				if let Some(suffix) = prefix.strip_prefix(root) {
267					// If the requested prefix is larger than the allowed prefix, then we further scope it.
268					let nested = state.lock().leaf(&suffix);
269					roots.push((prefix.to_owned(), nested));
270				}
271			}
272		}
273
274		if roots.is_empty() {
275			None
276		} else {
277			Some(Self { nodes: roots })
278		}
279	}
280
281	pub fn root(&self, new_root: impl AsPath) -> Option<Self> {
282		let new_root = new_root.as_path();
283		let mut roots = Vec::new();
284
285		if new_root.is_empty() {
286			return Some(self.clone());
287		}
288
289		for (root, state) in &self.nodes {
290			if let Some(suffix) = root.strip_prefix(&new_root) {
291				// If the old root is longer than the new root, shorten the keys.
292				roots.push((suffix.to_owned(), state.clone()));
293			} else if let Some(suffix) = new_root.strip_prefix(root) {
294				// If the new root is longer than the old root, add a new root.
295				// NOTE: suffix can't be empty
296				let nested = state.lock().leaf(&suffix);
297				roots.push(("".into(), nested));
298			}
299		}
300
301		if roots.is_empty() {
302			None
303		} else {
304			Some(Self { nodes: roots })
305		}
306	}
307
308	// Returns the root that has this prefix.
309	pub fn get(&self, path: impl AsPath) -> Option<(Lock<OriginNode>, PathOwned)> {
310		let path = path.as_path();
311
312		for (root, state) in &self.nodes {
313			if let Some(suffix) = path.strip_prefix(root) {
314				return Some((state.clone(), suffix.to_owned()));
315			}
316		}
317
318		None
319	}
320}
321
322impl Default for OriginNodes {
323	fn default() -> Self {
324		Self {
325			nodes: vec![("".into(), Lock::new(OriginNode::new(None)))],
326		}
327	}
328}
329
330/// A broadcast path and its associated consumer, or None if closed.
331pub type OriginAnnounce = (PathOwned, Option<BroadcastConsumer>);
332
333/// A collection of broadcasts that can be published and subscribed to.
334pub struct Origin {}
335
336impl Origin {
337	pub fn produce() -> OriginProducer {
338		OriginProducer::new()
339	}
340}
341
342/// Announces broadcasts to consumers over the network.
343#[derive(Clone, Default)]
344pub struct OriginProducer {
345	// The roots of the tree that we are allowed to publish.
346	// A path of "" means we can publish anything.
347	nodes: OriginNodes,
348
349	/// The prefix that is automatically stripped from all paths.
350	root: PathOwned,
351}
352
353impl OriginProducer {
354	pub fn new() -> Self {
355		Self::default()
356	}
357
358	/// Create and publish a new broadcast, returning the producer.
359	///
360	/// This is a helper method when you only want to publish a broadcast to a single origin.
361	/// Returns [None] if the broadcast is not allowed to be published.
362	pub fn create_broadcast(&self, path: impl AsPath) -> Option<BroadcastProducer> {
363		let broadcast = Broadcast::produce();
364		self.publish_broadcast(path, broadcast.consume()).then_some(broadcast)
365	}
366
367	/// Publish a broadcast, announcing it to all consumers.
368	///
369	/// The broadcast will be unannounced when it is closed.
370	/// If there is already a broadcast with the same path, then it will be replaced and reannounced.
371	/// If the old broadcast is closed before the new one, then nothing will happen.
372	/// If the new broadcast is closed before the old one, then the old broadcast will be reannounced.
373	///
374	/// Returns false if the broadcast is not allowed to be published.
375	pub fn publish_broadcast(&self, path: impl AsPath, broadcast: BroadcastConsumer) -> bool {
376		let path = path.as_path();
377
378		let (root, rest) = match self.nodes.get(&path) {
379			Some(root) => root,
380			None => return false,
381		};
382
383		let full = self.root.join(&path);
384
385		root.lock().publish(&full, &broadcast, &rest);
386		let root = root.clone();
387
388		web_async::spawn(async move {
389			broadcast.closed().await;
390			root.lock().remove(&full, broadcast, &rest);
391		});
392
393		true
394	}
395
396	/// Returns a new OriginProducer where all published broadcasts MUST match one of the prefixes.
397	///
398	/// Returns None if there are no legal prefixes.
399	// TODO accept PathPrefixes instead of &[Path]
400	pub fn publish_only(&self, prefixes: &[Path]) -> Option<OriginProducer> {
401		let prefixes = PathPrefixes::new(prefixes);
402		Some(OriginProducer {
403			nodes: self.nodes.select(&prefixes)?,
404			root: self.root.clone(),
405		})
406	}
407
408	/// Subscribe to all announced broadcasts.
409	pub fn consume(&self) -> OriginConsumer {
410		OriginConsumer::new(self.root.clone(), self.nodes.clone())
411	}
412
413	/// Subscribe to all announced broadcasts matching the prefix.
414	///
415	/// Returns None if there are no legal prefixes.
416	// TODO accept PathPrefixes instead of &[Path]
417	pub fn consume_only(&self, prefixes: &[Path]) -> Option<OriginConsumer> {
418		let prefixes = PathPrefixes::new(prefixes);
419		Some(OriginConsumer::new(self.root.clone(), self.nodes.select(&prefixes)?))
420	}
421
422	/// Subscribe to a specific broadcast.
423	///
424	/// Returns None if the broadcast is not found.
425	pub fn consume_broadcast(&self, path: impl AsPath) -> Option<BroadcastConsumer> {
426		let path = path.as_path();
427		let (root, rest) = self.nodes.get(&path)?;
428		let state = root.lock();
429		state.consume_broadcast(&rest)
430	}
431
432	/// Returns a new OriginProducer that automatically strips out the provided prefix.
433	///
434	/// Returns None if the provided root is not authorized; when publish_only was already used without a wildcard.
435	pub fn with_root(&self, prefix: impl AsPath) -> Option<Self> {
436		let prefix = prefix.as_path();
437
438		Some(Self {
439			root: self.root.join(&prefix).to_owned(),
440			nodes: self.nodes.root(&prefix)?,
441		})
442	}
443
444	/// Returns the root that is automatically stripped from all paths.
445	pub fn root(&self) -> &Path<'_> {
446		&self.root
447	}
448
449	// TODO return PathPrefixes
450	pub fn allowed(&self) -> impl Iterator<Item = &Path<'_>> {
451		self.nodes.nodes.iter().map(|(root, _)| root)
452	}
453
454	/// Converts a relative path to an absolute path.
455	pub fn absolute(&self, path: impl AsPath) -> Path<'_> {
456		self.root.join(path)
457	}
458}
459
460/// Consumes announced broadcasts matching against an optional prefix.
461///
462/// NOTE: Clone is expensive, try to avoid it.
463pub struct OriginConsumer {
464	id: ConsumerId,
465	nodes: OriginNodes,
466	updates: mpsc::UnboundedReceiver<OriginAnnounce>,
467
468	/// A prefix that is automatically stripped from all paths.
469	root: PathOwned,
470}
471
472impl OriginConsumer {
473	fn new(root: PathOwned, nodes: OriginNodes) -> Self {
474		let (tx, rx) = mpsc::unbounded_channel();
475
476		let id = ConsumerId::new();
477
478		for (_, state) in &nodes.nodes {
479			let notify = OriginConsumerNotify {
480				root: root.clone(),
481				tx: tx.clone(),
482			};
483			state.lock().consume(id, notify);
484		}
485
486		Self {
487			id,
488			nodes,
489			updates: rx,
490			root,
491		}
492	}
493
494	/// Returns the next (un)announced broadcast and the absolute path.
495	///
496	/// The broadcast will only be announced if it was previously unannounced.
497	/// The same path won't be announced/unannounced twice, instead it will toggle.
498	/// Returns None if the consumer is closed.
499	///
500	/// Note: The returned path is absolute and will always match this consumer's prefix.
501	pub async fn announced(&mut self) -> Option<OriginAnnounce> {
502		self.updates.recv().await
503	}
504
505	/// Returns the next (un)announced broadcast and the absolute path without blocking.
506	///
507	/// Returns None if there is no update available; NOT because the consumer is closed.
508	/// You have to use `is_closed` to check if the consumer is closed.
509	pub fn try_announced(&mut self) -> Option<OriginAnnounce> {
510		self.updates.try_recv().ok()
511	}
512
513	pub fn consume(&self) -> Self {
514		self.clone()
515	}
516
517	/// Get a specific broadcast by path.
518	///
519	/// TODO This should include announcement support.
520	///
521	/// Returns None if the path hasn't been announced yet.
522	pub fn consume_broadcast(&self, path: impl AsPath) -> Option<BroadcastConsumer> {
523		let path = path.as_path();
524		let (root, rest) = self.nodes.get(&path)?;
525		let state = root.lock();
526		state.consume_broadcast(&rest)
527	}
528
529	/// Returns a new OriginConsumer that only consumes broadcasts matching one of the prefixes.
530	///
531	/// Returns None if there are no legal prefixes (would always return None).
532	// TODO accept PathPrefixes instead of &[Path]
533	pub fn consume_only(&self, prefixes: &[Path]) -> Option<OriginConsumer> {
534		let prefixes = PathPrefixes::new(prefixes);
535		Some(OriginConsumer::new(self.root.clone(), self.nodes.select(&prefixes)?))
536	}
537
538	/// Returns a new OriginConsumer that automatically strips out the provided prefix.
539	///
540	/// Returns None if the provided root is not authorized; when consume_only was already used without a wildcard.
541	pub fn with_root(&self, prefix: impl AsPath) -> Option<Self> {
542		let prefix = prefix.as_path();
543
544		Some(Self::new(self.root.join(&prefix).to_owned(), self.nodes.root(&prefix)?))
545	}
546
547	/// Returns the prefix that is automatically stripped from all paths.
548	pub fn root(&self) -> &Path<'_> {
549		&self.root
550	}
551
552	// TODO return PathPrefixes
553	pub fn allowed(&self) -> impl Iterator<Item = &Path<'_>> {
554		self.nodes.nodes.iter().map(|(root, _)| root)
555	}
556
557	/// Converts a relative path to an absolute path.
558	pub fn absolute(&self, path: impl AsPath) -> Path<'_> {
559		self.root.join(path)
560	}
561}
562
563impl Drop for OriginConsumer {
564	fn drop(&mut self) {
565		for (_, root) in &self.nodes.nodes {
566			root.lock().unconsume(self.id);
567		}
568	}
569}
570
571impl Clone for OriginConsumer {
572	fn clone(&self) -> Self {
573		OriginConsumer::new(self.root.clone(), self.nodes.clone())
574	}
575}
576
577#[cfg(test)]
578use futures::FutureExt;
579
#[cfg(test)]
impl OriginConsumer {
	/// Asserts that an announce for `expected` carrying `broadcast` is ready right now.
	pub fn assert_next(&mut self, expected: impl AsPath, broadcast: &BroadcastConsumer) {
		let expected = expected.as_path();
		let update = self.announced().now_or_never().expect("next blocked").expect("no next");
		let (path, active) = update;
		assert_eq!(path, expected, "wrong path");
		let active = active.unwrap();
		assert!(active.is_clone(broadcast), "should be the same broadcast");
	}

	/// Same check as `assert_next`, but through the synchronous `try_announced`.
	pub fn assert_try_next(&mut self, expected: impl AsPath, broadcast: &BroadcastConsumer) {
		let expected = expected.as_path();
		let update = self.try_announced().expect("no next");
		let (path, active) = update;
		assert_eq!(path, expected, "wrong path");
		let active = active.unwrap();
		assert!(active.is_clone(broadcast), "should be the same broadcast");
	}

	/// Asserts that an unannounce for `expected` is ready right now.
	pub fn assert_next_none(&mut self, expected: impl AsPath) {
		let expected = expected.as_path();
		let update = self.announced().now_or_never().expect("next blocked").expect("no next");
		let (path, active) = update;
		assert_eq!(path, expected, "wrong path");
		assert!(active.is_none(), "should be unannounced");
	}

	/// Asserts that no update is ready right now.
	pub fn assert_next_wait(&mut self) {
		match self.announced().now_or_never() {
			None => {}
			Some(res) => panic!("next should block: got {:?}", res.map(|(path, _)| path)),
		}
	}

	/*
	pub fn assert_next_closed(&mut self) {
		assert!(
			self.announced().now_or_never().expect("next blocked").is_none(),
			"next should be closed"
		);
	}
	*/
}
618
619#[cfg(test)]
620mod tests {
621	use crate::Broadcast;
622
623	use super::*;
624
625	#[tokio::test]
626	async fn test_announce() {
627		let origin = Origin::produce();
628		let broadcast1 = Broadcast::produce();
629		let broadcast2 = Broadcast::produce();
630
631		let mut consumer1 = origin.consume();
632		// Make a new consumer that should get it.
633		consumer1.assert_next_wait();
634
635		// Publish the first broadcast.
636		origin.publish_broadcast("test1", broadcast1.consume());
637
638		consumer1.assert_next("test1", &broadcast1.consume());
639		consumer1.assert_next_wait();
640
641		// Make a new consumer that should get the existing broadcast.
642		// But we don't consume it yet.
643		let mut consumer2 = origin.consume();
644
645		// Publish the second broadcast.
646		origin.publish_broadcast("test2", broadcast2.consume());
647
648		consumer1.assert_next("test2", &broadcast2.consume());
649		consumer1.assert_next_wait();
650
651		consumer2.assert_next("test1", &broadcast1.consume());
652		consumer2.assert_next("test2", &broadcast2.consume());
653		consumer2.assert_next_wait();
654
655		// Close the first broadcast.
656		drop(broadcast1);
657
658		// Wait for the async task to run.
659		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
660
661		// All consumers should get a None now.
662		consumer1.assert_next_none("test1");
663		consumer2.assert_next_none("test1");
664		consumer1.assert_next_wait();
665		consumer2.assert_next_wait();
666
667		// And a new consumer only gets the last broadcast.
668		let mut consumer3 = origin.consume();
669		consumer3.assert_next("test2", &broadcast2.consume());
670		consumer3.assert_next_wait();
671
672		// Close the other producer and make sure it cleans up
673		drop(broadcast2);
674
675		// Wait for the async task to run.
676		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
677
678		consumer1.assert_next_none("test2");
679		consumer2.assert_next_none("test2");
680		consumer3.assert_next_none("test2");
681
682		/* TODO close the origin consumer when the producer is dropped
683		consumer1.assert_next_closed();
684		consumer2.assert_next_closed();
685		consumer3.assert_next_closed();
686		*/
687	}
688
689	#[tokio::test]
690	async fn test_duplicate() {
691		let origin = Origin::produce();
692
693		let broadcast1 = Broadcast::produce();
694		let broadcast2 = Broadcast::produce();
695		let broadcast3 = Broadcast::produce();
696
697		let consumer1 = broadcast1.consume();
698		let consumer2 = broadcast2.consume();
699		let consumer3 = broadcast3.consume();
700
701		let mut consumer = origin.consume();
702
703		origin.publish_broadcast("test", consumer1.clone());
704		origin.publish_broadcast("test", consumer2.clone());
705		origin.publish_broadcast("test", consumer3.clone());
706		assert!(consumer.consume_broadcast("test").is_some());
707
708		consumer.assert_next("test", &consumer1);
709		consumer.assert_next_none("test");
710		consumer.assert_next("test", &consumer2);
711		consumer.assert_next_none("test");
712		consumer.assert_next("test", &consumer3);
713
714		// Drop the backup, nothing should change.
715		drop(broadcast2);
716
717		// Wait for the async task to run.
718		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
719
720		assert!(consumer.consume_broadcast("test").is_some());
721		consumer.assert_next_wait();
722
723		// Drop the active, we should reannounce.
724		drop(broadcast3);
725
726		// Wait for the async task to run.
727		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
728
729		assert!(consumer.consume_broadcast("test").is_some());
730		consumer.assert_next_none("test");
731		consumer.assert_next("test", &consumer1);
732
733		// Drop the final broadcast, we should unannounce.
734		drop(broadcast1);
735
736		// Wait for the async task to run.
737		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
738		assert!(consumer.consume_broadcast("test").is_none());
739
740		consumer.assert_next_none("test");
741		consumer.assert_next_wait();
742	}
743
744	#[tokio::test]
745	async fn test_duplicate_reverse() {
746		let origin = Origin::produce();
747		let broadcast1 = Broadcast::produce();
748		let broadcast2 = Broadcast::produce();
749
750		origin.publish_broadcast("test", broadcast1.consume());
751		origin.publish_broadcast("test", broadcast2.consume());
752		assert!(origin.consume_broadcast("test").is_some());
753
754		// This is harder, dropping the new broadcast first.
755		drop(broadcast2);
756
757		// Wait for the cleanup async task to run.
758		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
759		assert!(origin.consume_broadcast("test").is_some());
760
761		drop(broadcast1);
762
763		// Wait for the cleanup async task to run.
764		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
765		assert!(origin.consume_broadcast("test").is_none());
766	}
767
768	#[tokio::test]
769	async fn test_double_publish() {
770		let origin = Origin::produce();
771		let broadcast = Broadcast::produce();
772
773		// Ensure it doesn't crash.
774		origin.publish_broadcast("test", broadcast.consume());
775		origin.publish_broadcast("test", broadcast.consume());
776
777		assert!(origin.consume_broadcast("test").is_some());
778
779		drop(broadcast);
780
781		// Wait for the async task to run.
782		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
783		assert!(origin.consume_broadcast("test").is_none());
784	}
785	// There was a tokio bug where only the first 127 broadcasts would be received instantly.
786	#[tokio::test]
787	#[should_panic]
788	async fn test_128() {
789		let origin = Origin::produce();
790		let broadcast = Broadcast::produce();
791
792		let mut consumer = origin.consume();
793		for i in 0..256 {
794			origin.publish_broadcast(format!("test{i}"), broadcast.consume());
795		}
796
797		for i in 0..256 {
798			consumer.assert_next(format!("test{i}"), &broadcast.consume());
799		}
800	}
801
802	#[tokio::test]
803	async fn test_128_fix() {
804		let origin = Origin::produce();
805		let broadcast = Broadcast::produce();
806
807		let mut consumer = origin.consume();
808		for i in 0..256 {
809			origin.publish_broadcast(format!("test{i}"), broadcast.consume());
810		}
811
812		for i in 0..256 {
813			// try_next does not have the same issue because it's synchronous.
814			consumer.assert_try_next(format!("test{i}"), &broadcast.consume());
815		}
816	}
817
818	#[tokio::test]
819	async fn test_with_root_basic() {
820		let origin = Origin::produce();
821		let broadcast = Broadcast::produce();
822
823		// Create a producer with root "/foo"
824		let foo_producer = origin.with_root("foo").expect("should create root");
825		assert_eq!(foo_producer.root().as_str(), "foo");
826
827		let mut consumer = origin.consume();
828
829		// When publishing to "bar/baz", it should actually publish to "foo/bar/baz"
830		assert!(foo_producer.publish_broadcast("bar/baz", broadcast.consume()));
831		// The original consumer should see the full path
832		consumer.assert_next("foo/bar/baz", &broadcast.consume());
833
834		// A consumer created from the rooted producer should see the stripped path
835		let mut foo_consumer = foo_producer.consume();
836		foo_consumer.assert_next("bar/baz", &broadcast.consume());
837	}
838
839	#[tokio::test]
840	async fn test_with_root_nested() {
841		let origin = Origin::produce();
842		let broadcast = Broadcast::produce();
843
844		// Create nested roots
845		let foo_producer = origin.with_root("foo").expect("should create foo root");
846		let foo_bar_producer = foo_producer.with_root("bar").expect("should create bar root");
847		assert_eq!(foo_bar_producer.root().as_str(), "foo/bar");
848
849		let mut consumer = origin.consume();
850
851		// Publishing to "baz" should actually publish to "foo/bar/baz"
852		assert!(foo_bar_producer.publish_broadcast("baz", broadcast.consume()));
853		// The original consumer sees the full path
854		consumer.assert_next("foo/bar/baz", &broadcast.consume());
855
856		// Consumer from foo_bar_producer sees just "baz"
857		let mut foo_bar_consumer = foo_bar_producer.consume();
858		foo_bar_consumer.assert_next("baz", &broadcast.consume());
859	}
860
861	#[tokio::test]
862	async fn test_publish_only_allows() {
863		let origin = Origin::produce();
864		let broadcast = Broadcast::produce();
865
866		// Create a producer that can only publish to "allowed" paths
867		let limited_producer = origin
868			.publish_only(&["allowed/path1".into(), "allowed/path2".into()])
869			.expect("should create limited producer");
870
871		// Should be able to publish to allowed paths
872		assert!(limited_producer.publish_broadcast("allowed/path1", broadcast.consume()));
873		assert!(limited_producer.publish_broadcast("allowed/path1/nested", broadcast.consume()));
874		assert!(limited_producer.publish_broadcast("allowed/path2", broadcast.consume()));
875
876		// Should not be able to publish to disallowed paths
877		assert!(!limited_producer.publish_broadcast("notallowed", broadcast.consume()));
878		assert!(!limited_producer.publish_broadcast("allowed", broadcast.consume())); // Parent of allowed path
879		assert!(!limited_producer.publish_broadcast("other/path", broadcast.consume()));
880	}
881
882	#[tokio::test]
883	async fn test_publish_only_empty() {
884		let origin = Origin::produce();
885
886		// Creating a producer with no allowed paths should return None
887		assert!(origin.publish_only(&[]).is_none());
888	}
889
890	#[tokio::test]
891	async fn test_consume_only_filters() {
892		let origin = Origin::produce();
893		let broadcast1 = Broadcast::produce();
894		let broadcast2 = Broadcast::produce();
895		let broadcast3 = Broadcast::produce();
896
897		let mut consumer = origin.consume();
898
899		// Publish to different paths
900		origin.publish_broadcast("allowed", broadcast1.consume());
901		origin.publish_broadcast("allowed/nested", broadcast2.consume());
902		origin.publish_broadcast("notallowed", broadcast3.consume());
903
904		// Create a consumer that only sees "allowed" paths
905		let mut limited_consumer = origin
906			.consume_only(&["allowed".into()])
907			.expect("should create limited consumer");
908
909		// Should only receive broadcasts under "allowed"
910		limited_consumer.assert_next("allowed", &broadcast1.consume());
911		limited_consumer.assert_next("allowed/nested", &broadcast2.consume());
912		limited_consumer.assert_next_wait(); // Should not see "notallowed"
913
914		// Unscoped consumer should see all
915		consumer.assert_next("allowed", &broadcast1.consume());
916		consumer.assert_next("allowed/nested", &broadcast2.consume());
917		consumer.assert_next("notallowed", &broadcast3.consume());
918	}
919
920	#[tokio::test]
921	async fn test_consume_only_multiple_prefixes() {
922		let origin = Origin::produce();
923		let broadcast1 = Broadcast::produce();
924		let broadcast2 = Broadcast::produce();
925		let broadcast3 = Broadcast::produce();
926
927		origin.publish_broadcast("foo/test", broadcast1.consume());
928		origin.publish_broadcast("bar/test", broadcast2.consume());
929		origin.publish_broadcast("baz/test", broadcast3.consume());
930
931		// Consumer that only sees "foo" and "bar" paths
932		let mut limited_consumer = origin
933			.consume_only(&["foo".into(), "bar".into()])
934			.expect("should create limited consumer");
935
936		// Order depends on PathPrefixes canonical sort (lexicographic for same length)
937		limited_consumer.assert_next("bar/test", &broadcast2.consume());
938		limited_consumer.assert_next("foo/test", &broadcast1.consume());
939		limited_consumer.assert_next_wait(); // Should not see "baz/test"
940	}
941
942	#[tokio::test]
943	async fn test_with_root_and_publish_only() {
944		let origin = Origin::produce();
945		let broadcast = Broadcast::produce();
946
947		// User connects to /foo root
948		let foo_producer = origin.with_root("foo").expect("should create foo root");
949
950		// Limit them to publish only to "bar" and "goop/pee" within /foo
951		let limited_producer = foo_producer
952			.publish_only(&["bar".into(), "goop/pee".into()])
953			.expect("should create limited producer");
954
955		let mut consumer = origin.consume();
956
957		// Should be able to publish to foo/bar and foo/goop/pee (but user sees as bar and goop/pee)
958		assert!(limited_producer.publish_broadcast("bar", broadcast.consume()));
959		assert!(limited_producer.publish_broadcast("bar/nested", broadcast.consume()));
960		assert!(limited_producer.publish_broadcast("goop/pee", broadcast.consume()));
961		assert!(limited_producer.publish_broadcast("goop/pee/nested", broadcast.consume()));
962
963		// Should not be able to publish outside allowed paths
964		assert!(!limited_producer.publish_broadcast("baz", broadcast.consume()));
965		assert!(!limited_producer.publish_broadcast("goop", broadcast.consume())); // Parent of allowed
966		assert!(!limited_producer.publish_broadcast("goop/other", broadcast.consume()));
967
968		// Original consumer sees full paths
969		consumer.assert_next("foo/bar", &broadcast.consume());
970		consumer.assert_next("foo/bar/nested", &broadcast.consume());
971		consumer.assert_next("foo/goop/pee", &broadcast.consume());
972		consumer.assert_next("foo/goop/pee/nested", &broadcast.consume());
973	}
974
975	#[tokio::test]
976	async fn test_with_root_and_consume_only() {
977		let origin = Origin::produce();
978		let broadcast1 = Broadcast::produce();
979		let broadcast2 = Broadcast::produce();
980		let broadcast3 = Broadcast::produce();
981
982		// Publish broadcasts
983		origin.publish_broadcast("foo/bar/test", broadcast1.consume());
984		origin.publish_broadcast("foo/goop/pee/test", broadcast2.consume());
985		origin.publish_broadcast("foo/other/test", broadcast3.consume());
986
987		// User connects to /foo root
988		let foo_producer = origin.with_root("foo").expect("should create foo root");
989
990		// Create consumer limited to "bar" and "goop/pee" within /foo
991		let mut limited_consumer = foo_producer
992			.consume_only(&["bar".into(), "goop/pee".into()])
993			.expect("should create limited consumer");
994
995		// Should only see allowed paths (without foo prefix)
996		limited_consumer.assert_next("bar/test", &broadcast1.consume());
997		limited_consumer.assert_next("goop/pee/test", &broadcast2.consume());
998		limited_consumer.assert_next_wait(); // Should not see "other/test"
999	}
1000
1001	#[tokio::test]
1002	async fn test_with_root_unauthorized() {
1003		let origin = Origin::produce();
1004
1005		// First limit the producer to specific paths
1006		let limited_producer = origin
1007			.publish_only(&["allowed".into()])
1008			.expect("should create limited producer");
1009
1010		// Trying to create a root outside allowed paths should fail
1011		assert!(limited_producer.with_root("notallowed").is_none());
1012
1013		// But creating a root within allowed paths should work
1014		let allowed_root = limited_producer
1015			.with_root("allowed")
1016			.expect("should create allowed root");
1017		assert_eq!(allowed_root.root().as_str(), "allowed");
1018	}
1019
1020	#[tokio::test]
1021	async fn test_wildcard_permission() {
1022		let origin = Origin::produce();
1023		let broadcast = Broadcast::produce();
1024
1025		// Producer with root access (empty string means wildcard)
1026		let root_producer = origin.clone();
1027
1028		// Should be able to publish anywhere
1029		assert!(root_producer.publish_broadcast("any/path", broadcast.consume()));
1030		assert!(root_producer.publish_broadcast("other/path", broadcast.consume()));
1031
1032		// Can create any root
1033		let foo_producer = root_producer.with_root("foo").expect("should create any root");
1034		assert_eq!(foo_producer.root().as_str(), "foo");
1035	}
1036
1037	#[tokio::test]
1038	async fn test_consume_broadcast_with_permissions() {
1039		let origin = Origin::produce();
1040		let broadcast1 = Broadcast::produce();
1041		let broadcast2 = Broadcast::produce();
1042
1043		origin.publish_broadcast("allowed/test", broadcast1.consume());
1044		origin.publish_broadcast("notallowed/test", broadcast2.consume());
1045
1046		// Create limited consumer
1047		let limited_consumer = origin
1048			.consume_only(&["allowed".into()])
1049			.expect("should create limited consumer");
1050
1051		// Should be able to get allowed broadcast
1052		let result = limited_consumer.consume_broadcast("allowed/test");
1053		assert!(result.is_some());
1054		assert!(result.unwrap().is_clone(&broadcast1.consume()));
1055
1056		// Should not be able to get disallowed broadcast
1057		assert!(limited_consumer.consume_broadcast("notallowed/test").is_none());
1058
1059		// Original consumer can get both
1060		let consumer = origin.consume();
1061		assert!(consumer.consume_broadcast("allowed/test").is_some());
1062		assert!(consumer.consume_broadcast("notallowed/test").is_some());
1063	}
1064
1065	#[tokio::test]
1066	async fn test_nested_paths_with_permissions() {
1067		let origin = Origin::produce();
1068		let broadcast = Broadcast::produce();
1069
1070		// Create producer limited to "a/b/c"
1071		let limited_producer = origin
1072			.publish_only(&["a/b/c".into()])
1073			.expect("should create limited producer");
1074
1075		// Should be able to publish to exact path and nested paths
1076		assert!(limited_producer.publish_broadcast("a/b/c", broadcast.consume()));
1077		assert!(limited_producer.publish_broadcast("a/b/c/d", broadcast.consume()));
1078		assert!(limited_producer.publish_broadcast("a/b/c/d/e", broadcast.consume()));
1079
1080		// Should not be able to publish to parent or sibling paths
1081		assert!(!limited_producer.publish_broadcast("a", broadcast.consume()));
1082		assert!(!limited_producer.publish_broadcast("a/b", broadcast.consume()));
1083		assert!(!limited_producer.publish_broadcast("a/b/other", broadcast.consume()));
1084	}
1085
1086	#[tokio::test]
1087	async fn test_multiple_consumers_with_different_permissions() {
1088		let origin = Origin::produce();
1089		let broadcast1 = Broadcast::produce();
1090		let broadcast2 = Broadcast::produce();
1091		let broadcast3 = Broadcast::produce();
1092
1093		// Publish to different paths
1094		origin.publish_broadcast("foo/test", broadcast1.consume());
1095		origin.publish_broadcast("bar/test", broadcast2.consume());
1096		origin.publish_broadcast("baz/test", broadcast3.consume());
1097
1098		// Create consumers with different permissions
1099		let mut foo_consumer = origin
1100			.consume_only(&["foo".into()])
1101			.expect("should create foo consumer");
1102
1103		let mut bar_consumer = origin
1104			.consume_only(&["bar".into()])
1105			.expect("should create bar consumer");
1106
1107		let mut foobar_consumer = origin
1108			.consume_only(&["foo".into(), "bar".into()])
1109			.expect("should create foobar consumer");
1110
1111		// Each consumer should only see their allowed paths
1112		foo_consumer.assert_next("foo/test", &broadcast1.consume());
1113		foo_consumer.assert_next_wait();
1114
1115		bar_consumer.assert_next("bar/test", &broadcast2.consume());
1116		bar_consumer.assert_next_wait();
1117
1118		foobar_consumer.assert_next("bar/test", &broadcast2.consume());
1119		foobar_consumer.assert_next("foo/test", &broadcast1.consume());
1120		foobar_consumer.assert_next_wait();
1121	}
1122
1123	#[tokio::test]
1124	async fn test_select_with_empty_prefix() {
1125		let origin = Origin::produce();
1126		let broadcast1 = Broadcast::produce();
1127		let broadcast2 = Broadcast::produce();
1128
1129		// User with root "demo" allowed to subscribe to "worm-node" and "foobar"
1130		let demo_producer = origin.with_root("demo").expect("should create demo root");
1131		let limited_producer = demo_producer
1132			.publish_only(&["worm-node".into(), "foobar".into()])
1133			.expect("should create limited producer");
1134
1135		// Publish some broadcasts
1136		assert!(limited_producer.publish_broadcast("worm-node/test", broadcast1.consume()));
1137		assert!(limited_producer.publish_broadcast("foobar/test", broadcast2.consume()));
1138
1139		// consume_only with empty prefix should keep the exact same "worm-node" and "foobar" nodes
1140		let mut consumer = limited_producer
1141			.consume_only(&["".into()])
1142			.expect("should create consumer with empty prefix");
1143
1144		// Should see both broadcasts (order depends on PathPrefixes sort)
1145		let a1 = consumer.try_announced().expect("expected first announcement");
1146		let a2 = consumer.try_announced().expect("expected second announcement");
1147		consumer.assert_next_wait();
1148
1149		let mut paths: Vec<_> = [&a1, &a2].iter().map(|(p, _)| p.to_string()).collect();
1150		paths.sort();
1151		assert_eq!(paths, ["foobar/test", "worm-node/test"]);
1152	}
1153
1154	#[tokio::test]
1155	async fn test_select_narrowing_scope() {
1156		let origin = Origin::produce();
1157		let broadcast1 = Broadcast::produce();
1158		let broadcast2 = Broadcast::produce();
1159		let broadcast3 = Broadcast::produce();
1160
1161		// User with root "demo" allowed to subscribe to "worm-node" and "foobar"
1162		let demo_producer = origin.with_root("demo").expect("should create demo root");
1163		let limited_producer = demo_producer
1164			.publish_only(&["worm-node".into(), "foobar".into()])
1165			.expect("should create limited producer");
1166
1167		// Publish broadcasts at different levels
1168		assert!(limited_producer.publish_broadcast("worm-node", broadcast1.consume()));
1169		assert!(limited_producer.publish_broadcast("worm-node/foo", broadcast2.consume()));
1170		assert!(limited_producer.publish_broadcast("foobar/bar", broadcast3.consume()));
1171
1172		// Test 1: consume_only("worm-node") should result in a single "" node with contents of "worm-node" ONLY
1173		let mut worm_consumer = limited_producer
1174			.consume_only(&["worm-node".into()])
1175			.expect("should create worm-node consumer");
1176
1177		// Should see worm-node content with paths stripped to ""
1178		worm_consumer.assert_next("worm-node", &broadcast1.consume());
1179		worm_consumer.assert_next("worm-node/foo", &broadcast2.consume());
1180		worm_consumer.assert_next_wait(); // Should NOT see foobar content
1181
1182		// Test 2: consume_only("worm-node/foo") should result in a "" node with contents of "worm-node/foo"
1183		let mut foo_consumer = limited_producer
1184			.consume_only(&["worm-node/foo".into()])
1185			.expect("should create worm-node/foo consumer");
1186
1187		foo_consumer.assert_next("worm-node/foo", &broadcast2.consume());
1188		foo_consumer.assert_next_wait(); // Should NOT see other content
1189	}
1190
1191	#[tokio::test]
1192	async fn test_select_multiple_roots_with_empty_prefix() {
1193		let origin = Origin::produce();
1194		let broadcast1 = Broadcast::produce();
1195		let broadcast2 = Broadcast::produce();
1196		let broadcast3 = Broadcast::produce();
1197
1198		// Producer with multiple allowed roots
1199		let limited_producer = origin
1200			.publish_only(&["app1".into(), "app2".into(), "shared".into()])
1201			.expect("should create limited producer");
1202
1203		// Publish to each root
1204		assert!(limited_producer.publish_broadcast("app1/data", broadcast1.consume()));
1205		assert!(limited_producer.publish_broadcast("app2/config", broadcast2.consume()));
1206		assert!(limited_producer.publish_broadcast("shared/resource", broadcast3.consume()));
1207
1208		// consume_only with empty prefix should maintain all roots
1209		let mut consumer = limited_producer
1210			.consume_only(&["".into()])
1211			.expect("should create consumer with empty prefix");
1212
1213		// Should see all broadcasts from all roots
1214		consumer.assert_next("app1/data", &broadcast1.consume());
1215		consumer.assert_next("app2/config", &broadcast2.consume());
1216		consumer.assert_next("shared/resource", &broadcast3.consume());
1217		consumer.assert_next_wait();
1218	}
1219
1220	#[tokio::test]
1221	async fn test_publish_only_with_empty_prefix() {
1222		let origin = Origin::produce();
1223		let broadcast = Broadcast::produce();
1224
1225		// Producer with specific allowed paths
1226		let limited_producer = origin
1227			.publish_only(&["services/api".into(), "services/web".into()])
1228			.expect("should create limited producer");
1229
1230		// publish_only with empty prefix should keep the same restrictions
1231		let same_producer = limited_producer
1232			.publish_only(&["".into()])
1233			.expect("should create producer with empty prefix");
1234
1235		// Should still have the same publishing restrictions
1236		assert!(same_producer.publish_broadcast("services/api", broadcast.consume()));
1237		assert!(same_producer.publish_broadcast("services/web", broadcast.consume()));
1238		assert!(!same_producer.publish_broadcast("services/db", broadcast.consume()));
1239		assert!(!same_producer.publish_broadcast("other", broadcast.consume()));
1240	}
1241
1242	#[tokio::test]
1243	async fn test_select_narrowing_to_deeper_path() {
1244		let origin = Origin::produce();
1245		let broadcast1 = Broadcast::produce();
1246		let broadcast2 = Broadcast::produce();
1247		let broadcast3 = Broadcast::produce();
1248
1249		// Producer with broad permission
1250		let limited_producer = origin
1251			.publish_only(&["org".into()])
1252			.expect("should create limited producer");
1253
1254		// Publish at various depths
1255		assert!(limited_producer.publish_broadcast("org/team1/project1", broadcast1.consume()));
1256		assert!(limited_producer.publish_broadcast("org/team1/project2", broadcast2.consume()));
1257		assert!(limited_producer.publish_broadcast("org/team2/project1", broadcast3.consume()));
1258
1259		// Narrow down to team2 only
1260		let mut team2_consumer = limited_producer
1261			.consume_only(&["org/team2".into()])
1262			.expect("should create team2 consumer");
1263
1264		team2_consumer.assert_next("org/team2/project1", &broadcast3.consume());
1265		team2_consumer.assert_next_wait(); // Should NOT see team1 content
1266
1267		// Further narrow down to team1/project1
1268		let mut project1_consumer = limited_producer
1269			.consume_only(&["org/team1/project1".into()])
1270			.expect("should create project1 consumer");
1271
1272		// Should only see project1 content at root
1273		project1_consumer.assert_next("org/team1/project1", &broadcast1.consume());
1274		project1_consumer.assert_next_wait();
1275	}
1276
1277	#[tokio::test]
1278	async fn test_select_with_non_matching_prefix() {
1279		let origin = Origin::produce();
1280
1281		// Producer with specific allowed paths
1282		let limited_producer = origin
1283			.publish_only(&["allowed/path".into()])
1284			.expect("should create limited producer");
1285
1286		// Trying to consume_only with a completely different prefix should return None
1287		assert!(limited_producer.consume_only(&["different/path".into()]).is_none());
1288
1289		// Similarly for publish_only
1290		assert!(limited_producer.publish_only(&["other/path".into()]).is_none());
1291	}
1292
1293	// Regression test for https://github.com/moq-dev/moq/issues/910
1294	// with_root panics when String has trailing slash (AsPath for String skips normalization)
1295	#[tokio::test]
1296	async fn test_with_root_trailing_slash_consumer() {
1297		let origin = Origin::produce();
1298
1299		// Use an owned String so the trailing slash is NOT normalized away.
1300		let prefix = "some_prefix/".to_string();
1301		let mut consumer = origin.consume().with_root(prefix).unwrap();
1302
1303		let b = origin.create_broadcast("some_prefix/test").unwrap();
1304		consumer.assert_next("test", &b.consume());
1305	}
1306
1307	// Same issue but for the producer side of with_root
1308	#[tokio::test]
1309	async fn test_with_root_trailing_slash_producer() {
1310		let origin = Origin::produce();
1311
1312		// Use an owned String so the trailing slash is NOT normalized away.
1313		let prefix = "some_prefix/".to_string();
1314		let rooted = origin.with_root(prefix).unwrap();
1315
1316		let b = rooted.create_broadcast("test").unwrap();
1317
1318		let mut consumer = rooted.consume();
1319		consumer.assert_next("test", &b.consume());
1320	}
1321
1322	// Verify unannounce also doesn't panic with trailing slash
1323	#[tokio::test]
1324	async fn test_with_root_trailing_slash_unannounce() {
1325		let origin = Origin::produce();
1326
1327		let prefix = "some_prefix/".to_string();
1328		let mut consumer = origin.consume().with_root(prefix).unwrap();
1329
1330		let b = origin.create_broadcast("some_prefix/test").unwrap();
1331		consumer.assert_next("test", &b.consume());
1332
1333		// Drop the broadcast producer to trigger unannounce
1334		drop(b);
1335		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1336
1337		// unannounce also calls strip_prefix(&self.root).unwrap()
1338		consumer.assert_next_none("test");
1339	}
1340
1341	#[tokio::test]
1342	async fn test_select_maintains_access_with_wider_prefix() {
1343		let origin = Origin::produce();
1344		let broadcast1 = Broadcast::produce();
1345		let broadcast2 = Broadcast::produce();
1346
1347		// Setup: user with root "demo" allowed to subscribe to specific paths
1348		let demo_producer = origin.with_root("demo").expect("should create demo root");
1349		let user_producer = demo_producer
1350			.publish_only(&["worm-node".into(), "foobar".into()])
1351			.expect("should create user producer");
1352
1353		// Publish some data
1354		assert!(user_producer.publish_broadcast("worm-node/data", broadcast1.consume()));
1355		assert!(user_producer.publish_broadcast("foobar", broadcast2.consume()));
1356
1357		// Key test: consume_only with "" should maintain access to allowed roots
1358		let mut consumer = user_producer
1359			.consume_only(&["".into()])
1360			.expect("consume_only with empty prefix should not fail when user has specific permissions");
1361
1362		// Should still receive broadcasts from allowed paths (order not guaranteed)
1363		let a1 = consumer.try_announced().expect("expected first announcement");
1364		let a2 = consumer.try_announced().expect("expected second announcement");
1365		consumer.assert_next_wait();
1366
1367		let mut paths: Vec<_> = [&a1, &a2].iter().map(|(p, _)| p.to_string()).collect();
1368		paths.sort();
1369		assert_eq!(paths, ["foobar", "worm-node/data"]);
1370
1371		// Also test that we can still narrow the scope
1372		let mut narrow_consumer = user_producer
1373			.consume_only(&["worm-node".into()])
1374			.expect("should be able to narrow scope to worm-node");
1375
1376		narrow_consumer.assert_next("worm-node/data", &broadcast1.consume());
1377		narrow_consumer.assert_next_wait(); // Should not see foobar
1378	}
1379
1380	#[tokio::test]
1381	async fn test_duplicate_prefixes_deduped() {
1382		let origin = Origin::produce();
1383		let broadcast = Broadcast::produce();
1384
1385		// publish_only with duplicate prefixes should work (deduped internally)
1386		let producer = origin
1387			.publish_only(&["demo".into(), "demo".into()])
1388			.expect("should create producer");
1389
1390		assert!(producer.publish_broadcast("demo/stream", broadcast.consume()));
1391
1392		let mut consumer = producer.consume();
1393		consumer.assert_next("demo/stream", &broadcast.consume());
1394		consumer.assert_next_wait();
1395	}
1396
1397	#[tokio::test]
1398	async fn test_overlapping_prefixes_deduped() {
1399		let origin = Origin::produce();
1400		let broadcast = Broadcast::produce();
1401
1402		// "demo" and "demo/foo" — "demo/foo" is redundant, only "demo" should remain
1403		let producer = origin
1404			.publish_only(&["demo".into(), "demo/foo".into()])
1405			.expect("should create producer");
1406
1407		// Can still publish under "demo/bar" since "demo" covers everything
1408		assert!(producer.publish_broadcast("demo/bar/stream", broadcast.consume()));
1409
1410		let mut consumer = producer.consume();
1411		consumer.assert_next("demo/bar/stream", &broadcast.consume());
1412		consumer.assert_next_wait();
1413	}
1414
1415	#[tokio::test]
1416	async fn test_overlapping_prefixes_no_duplicate_announcements() {
1417		let origin = Origin::produce();
1418		let broadcast = Broadcast::produce();
1419
1420		// Both "demo" and "demo/foo" are requested — should only have one node
1421		let producer = origin
1422			.publish_only(&["demo".into(), "demo/foo".into()])
1423			.expect("should create producer");
1424
1425		assert!(producer.publish_broadcast("demo/foo/stream", broadcast.consume()));
1426
1427		let mut consumer = producer.consume();
1428		// Should only get ONE announcement (not two from overlapping nodes)
1429		consumer.assert_next("demo/foo/stream", &broadcast.consume());
1430		consumer.assert_next_wait();
1431	}
1432
1433	#[tokio::test]
1434	async fn test_allowed_returns_deduped_prefixes() {
1435		let origin = Origin::produce();
1436
1437		let producer = origin
1438			.publish_only(&["demo".into(), "demo/foo".into(), "anon".into()])
1439			.expect("should create producer");
1440
1441		let allowed: Vec<_> = producer.allowed().collect();
1442		assert_eq!(allowed.len(), 2, "demo/foo should be subsumed by demo");
1443	}
1444}