Skip to main content

moq_lite/model/
origin.rs

1use std::{
2	collections::{HashMap, VecDeque},
3	sync::atomic::{AtomicU64, Ordering},
4};
5use tokio::sync::mpsc;
6use web_async::Lock;
7
8use super::BroadcastConsumer;
9use crate::{AsPath, Broadcast, BroadcastProducer, Path, PathOwned, PathPrefixes};
10
/// Process-wide counter backing [`ConsumerId::new`]; starts at zero and is bumped once per allocation.
static NEXT_CONSUMER_ID: AtomicU64 = AtomicU64::new(0);

/// Opaque key identifying one consumer registration.
///
/// Used as the map key under which a consumer's notifier is stored, so it can be removed again.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct ConsumerId(u64);

impl ConsumerId {
	/// Allocates the next identifier from the global counter.
	fn new() -> Self {
		// Relaxed ordering: the counter is not used to synchronize any other memory.
		let raw = NEXT_CONSUMER_ID.fetch_add(1, Ordering::Relaxed);
		ConsumerId(raw)
	}
}
21
// Everything published to a single path.
// If there are multiple broadcasts with the same path, we keep the oldest active and queue the others.
struct OriginBroadcast {
	// Full path of the broadcast, as passed to `OriginNode::publish`; this is what gets announced.
	path: PathOwned,
	// The broadcast currently announced for this path.
	active: BroadcastConsumer,
	// Later publishes to the same path, oldest first; the front is promoted when `active` is removed.
	backup: VecDeque<BroadcastConsumer>,
}
28
29#[derive(Clone)]
30struct OriginConsumerNotify {
31	root: PathOwned,
32	tx: mpsc::UnboundedSender<OriginAnnounce>,
33}
34
35impl OriginConsumerNotify {
36	fn announce(&self, path: impl AsPath, broadcast: BroadcastConsumer) {
37		let path = path.as_path().strip_prefix(&self.root).unwrap().to_owned();
38		self.tx.send((path, Some(broadcast))).expect("consumer closed");
39	}
40
41	fn reannounce(&self, path: impl AsPath, broadcast: BroadcastConsumer) {
42		let path = path.as_path().strip_prefix(&self.root).unwrap().to_owned();
43		self.tx.send((path.clone(), None)).expect("consumer closed");
44		self.tx.send((path, Some(broadcast))).expect("consumer closed");
45	}
46
47	fn unannounce(&self, path: impl AsPath) {
48		let path = path.as_path().strip_prefix(&self.root).unwrap().to_owned();
49		self.tx.send((path, None)).expect("consumer closed");
50	}
51}
52
53struct NotifyNode {
54	parent: Option<Lock<NotifyNode>>,
55
56	// Consumers that are subscribed to this node.
57	// We store a consumer ID so we can remove it easily when it closes.
58	consumers: HashMap<ConsumerId, OriginConsumerNotify>,
59}
60
61impl NotifyNode {
62	fn new(parent: Option<Lock<NotifyNode>>) -> Self {
63		Self {
64			parent,
65			consumers: HashMap::new(),
66		}
67	}
68
69	fn announce(&mut self, path: impl AsPath, broadcast: &BroadcastConsumer) {
70		for consumer in self.consumers.values() {
71			consumer.announce(path.as_path(), broadcast.clone());
72		}
73
74		if let Some(parent) = &self.parent {
75			parent.lock().announce(path, broadcast);
76		}
77	}
78
79	fn reannounce(&mut self, path: impl AsPath, broadcast: &BroadcastConsumer) {
80		for consumer in self.consumers.values() {
81			consumer.reannounce(path.as_path(), broadcast.clone());
82		}
83
84		if let Some(parent) = &self.parent {
85			parent.lock().reannounce(path, broadcast);
86		}
87	}
88
89	fn unannounce(&mut self, path: impl AsPath) {
90		for consumer in self.consumers.values() {
91			consumer.unannounce(path.as_path());
92		}
93
94		if let Some(parent) = &self.parent {
95			parent.lock().unannounce(path);
96		}
97	}
98}
99
100struct OriginNode {
101	// The broadcast that is published to this node.
102	broadcast: Option<OriginBroadcast>,
103
104	// Nested nodes, one level down the tree.
105	nested: HashMap<String, Lock<OriginNode>>,
106
107	// Unfortunately, to notify consumers we need to traverse back up the tree.
108	notify: Lock<NotifyNode>,
109}
110
111impl OriginNode {
112	fn new(parent: Option<Lock<NotifyNode>>) -> Self {
113		Self {
114			broadcast: None,
115			nested: HashMap::new(),
116			notify: Lock::new(NotifyNode::new(parent)),
117		}
118	}
119
120	fn leaf(&mut self, path: &Path) -> Lock<OriginNode> {
121		let (dir, rest) = path.next_part().expect("leaf called with empty path");
122
123		let next = self.entry(dir);
124		if rest.is_empty() { next } else { next.lock().leaf(&rest) }
125	}
126
127	fn entry(&mut self, dir: &str) -> Lock<OriginNode> {
128		match self.nested.get(dir) {
129			Some(next) => next.clone(),
130			None => {
131				let next = Lock::new(OriginNode::new(Some(self.notify.clone())));
132				self.nested.insert(dir.to_string(), next.clone());
133				next
134			}
135		}
136	}
137
138	fn publish(&mut self, full: impl AsPath, broadcast: &BroadcastConsumer, relative: impl AsPath) {
139		let full = full.as_path();
140		let rest = relative.as_path();
141
142		// If the path has a directory component, then publish it to the nested node.
143		if let Some((dir, relative)) = rest.next_part() {
144			// Not using entry to avoid allocating a string most of the time.
145			self.entry(dir).lock().publish(&full, broadcast, &relative);
146		} else if let Some(existing) = &mut self.broadcast {
147			// This node is a leaf with an existing broadcast.
148			// Keep the older broadcast active; queue the new one as a backup.
149			// This avoids reannouncing and potentially disrupting subscribers.
150			existing.backup.push_back(broadcast.clone());
151		} else {
152			// This node is a leaf with no existing broadcast.
153			self.broadcast = Some(OriginBroadcast {
154				path: full.to_owned(),
155				active: broadcast.clone(),
156				backup: VecDeque::new(),
157			});
158			self.notify.lock().announce(full, broadcast);
159		}
160	}
161
162	fn consume(&mut self, id: ConsumerId, mut notify: OriginConsumerNotify) {
163		self.consume_initial(&mut notify);
164		self.notify.lock().consumers.insert(id, notify);
165	}
166
167	fn consume_initial(&mut self, notify: &mut OriginConsumerNotify) {
168		if let Some(broadcast) = &self.broadcast {
169			notify.announce(&broadcast.path, broadcast.active.clone());
170		}
171
172		// Recursively subscribe to all nested nodes.
173		for nested in self.nested.values() {
174			nested.lock().consume_initial(notify);
175		}
176	}
177
178	fn consume_broadcast(&self, rest: impl AsPath) -> Option<BroadcastConsumer> {
179		let rest = rest.as_path();
180
181		if let Some((dir, rest)) = rest.next_part() {
182			let node = self.nested.get(dir)?.lock();
183			node.consume_broadcast(&rest)
184		} else {
185			self.broadcast.as_ref().map(|b| b.active.clone())
186		}
187	}
188
189	fn unconsume(&mut self, id: ConsumerId) {
190		self.notify.lock().consumers.remove(&id).expect("consumer not found");
191		if self.is_empty() {
192			//tracing::warn!("TODO: empty node; memory leak");
193			// This happens when consuming a path that is not being broadcasted.
194		}
195	}
196
197	// Returns true if the broadcast should be unannounced.
198	fn remove(&mut self, full: impl AsPath, broadcast: BroadcastConsumer, relative: impl AsPath) {
199		let full = full.as_path();
200		let relative = relative.as_path();
201
202		if let Some((dir, relative)) = relative.next_part() {
203			let nested = self.entry(dir);
204			let mut locked = nested.lock();
205			locked.remove(&full, broadcast, &relative);
206
207			if locked.is_empty() {
208				drop(locked);
209				self.nested.remove(dir);
210			}
211		} else {
212			let entry = match &mut self.broadcast {
213				Some(existing) => existing,
214				None => return,
215			};
216
217			// See if we can remove the broadcast from the backup list.
218			let pos = entry.backup.iter().position(|b| b.is_clone(&broadcast));
219			if let Some(pos) = pos {
220				entry.backup.remove(pos);
221				// Nothing else to do
222				return;
223			}
224
225			// Okay so it must be the active broadcast or else we fucked up.
226			assert!(entry.active.is_clone(&broadcast));
227
228			// If there's a backup broadcast, promote the oldest one.
229			if let Some(next) = entry.backup.pop_front() {
230				entry.active = next;
231				self.notify.lock().reannounce(full, &entry.active);
232			} else {
233				// No more backups, so remove the entry.
234				self.broadcast = None;
235				self.notify.lock().unannounce(full);
236			}
237		}
238	}
239
240	fn is_empty(&self) -> bool {
241		self.broadcast.is_none() && self.nested.is_empty() && self.notify.lock().consumers.is_empty()
242	}
243}
244
245#[derive(Clone)]
246struct OriginNodes {
247	nodes: Vec<(PathOwned, Lock<OriginNode>)>,
248}
249
250impl OriginNodes {
251	// Returns nested roots that match the prefixes.
252	// PathPrefixes guarantees no duplicates or overlapping prefixes.
253	pub fn select(&self, prefixes: &PathPrefixes) -> Option<Self> {
254		let mut roots = Vec::new();
255
256		for (root, state) in &self.nodes {
257			for prefix in prefixes {
258				if root.has_prefix(prefix) {
259					// Keep the existing node if we're allowed to access it.
260					roots.push((root.to_owned(), state.clone()));
261					continue;
262				}
263
264				if let Some(suffix) = prefix.strip_prefix(root) {
265					// If the requested prefix is larger than the allowed prefix, then we further scope it.
266					let nested = state.lock().leaf(&suffix);
267					roots.push((prefix.to_owned(), nested));
268				}
269			}
270		}
271
272		if roots.is_empty() {
273			None
274		} else {
275			Some(Self { nodes: roots })
276		}
277	}
278
279	pub fn root(&self, new_root: impl AsPath) -> Option<Self> {
280		let new_root = new_root.as_path();
281		let mut roots = Vec::new();
282
283		if new_root.is_empty() {
284			return Some(self.clone());
285		}
286
287		for (root, state) in &self.nodes {
288			if let Some(suffix) = root.strip_prefix(&new_root) {
289				// If the old root is longer than the new root, shorten the keys.
290				roots.push((suffix.to_owned(), state.clone()));
291			} else if let Some(suffix) = new_root.strip_prefix(root) {
292				// If the new root is longer than the old root, add a new root.
293				// NOTE: suffix can't be empty
294				let nested = state.lock().leaf(&suffix);
295				roots.push(("".into(), nested));
296			}
297		}
298
299		if roots.is_empty() {
300			None
301		} else {
302			Some(Self { nodes: roots })
303		}
304	}
305
306	// Returns the root that has this prefix.
307	pub fn get(&self, path: impl AsPath) -> Option<(Lock<OriginNode>, PathOwned)> {
308		let path = path.as_path();
309
310		for (root, state) in &self.nodes {
311			if let Some(suffix) = path.strip_prefix(root) {
312				return Some((state.clone(), suffix.to_owned()));
313			}
314		}
315
316		None
317	}
318}
319
320impl Default for OriginNodes {
321	fn default() -> Self {
322		Self {
323			nodes: vec![("".into(), Lock::new(OriginNode::new(None)))],
324		}
325	}
326}
327
/// A single announcement update: the broadcast path and its consumer.
///
/// The second element is `Some(consumer)` when the path is announced and `None` when it is unannounced.
pub type OriginAnnounce = (PathOwned, Option<BroadcastConsumer>);
330
/// A collection of broadcasts that can be published and subscribed to.
///
/// This type carries no state of its own; it only serves as the entry point via [`Origin::produce`].
pub struct Origin {}

impl Origin {
	/// Creates a new, unrestricted [`OriginProducer`] with an empty root.
	pub fn produce() -> OriginProducer {
		OriginProducer::new()
	}
}
339
340/// Announces broadcasts to consumers over the network.
341#[derive(Clone, Default)]
342pub struct OriginProducer {
343	// The roots of the tree that we are allowed to publish.
344	// A path of "" means we can publish anything.
345	nodes: OriginNodes,
346
347	/// The prefix that is automatically stripped from all paths.
348	root: PathOwned,
349}
350
351impl OriginProducer {
352	pub fn new() -> Self {
353		Self::default()
354	}
355
356	/// Create and publish a new broadcast, returning the producer.
357	///
358	/// This is a helper method when you only want to publish a broadcast to a single origin.
359	/// Returns [None] if the broadcast is not allowed to be published.
360	pub fn create_broadcast(&self, path: impl AsPath) -> Option<BroadcastProducer> {
361		let broadcast = Broadcast::produce();
362		self.publish_broadcast(path, broadcast.consume()).then_some(broadcast)
363	}
364
365	/// Publish a broadcast, announcing it to all consumers.
366	///
367	/// The broadcast will be unannounced when it is closed.
368	/// If there is already a broadcast with the same path, then the older broadcast remains active
369	/// and the new one is queued as a backup (no reannounce is triggered).
370	/// When the active broadcast closes, the oldest queued backup is promoted and reannounced.
371	/// A queued backup that closes before it is promoted is silently dropped with no announcement.
372	///
373	/// Returns false if the broadcast is not allowed to be published.
374	pub fn publish_broadcast(&self, path: impl AsPath, broadcast: BroadcastConsumer) -> bool {
375		let path = path.as_path();
376
377		let (root, rest) = match self.nodes.get(&path) {
378			Some(root) => root,
379			None => return false,
380		};
381
382		let full = self.root.join(&path);
383
384		root.lock().publish(&full, &broadcast, &rest);
385		let root = root.clone();
386
387		web_async::spawn(async move {
388			broadcast.closed().await;
389			root.lock().remove(&full, broadcast, &rest);
390		});
391
392		true
393	}
394
395	/// Returns a new OriginProducer where all published broadcasts MUST match one of the prefixes.
396	///
397	/// Returns None if there are no legal prefixes.
398	// TODO accept PathPrefixes instead of &[Path]
399	pub fn publish_only(&self, prefixes: &[Path]) -> Option<OriginProducer> {
400		let prefixes = PathPrefixes::new(prefixes);
401		Some(OriginProducer {
402			nodes: self.nodes.select(&prefixes)?,
403			root: self.root.clone(),
404		})
405	}
406
407	/// Subscribe to all announced broadcasts.
408	pub fn consume(&self) -> OriginConsumer {
409		OriginConsumer::new(self.root.clone(), self.nodes.clone())
410	}
411
412	/// Subscribe to all announced broadcasts matching the prefix.
413	///
414	/// Returns None if there are no legal prefixes.
415	// TODO accept PathPrefixes instead of &[Path]
416	pub fn consume_only(&self, prefixes: &[Path]) -> Option<OriginConsumer> {
417		let prefixes = PathPrefixes::new(prefixes);
418		Some(OriginConsumer::new(self.root.clone(), self.nodes.select(&prefixes)?))
419	}
420
421	/// Subscribe to a specific broadcast.
422	///
423	/// Returns None if the broadcast is not found.
424	pub fn consume_broadcast(&self, path: impl AsPath) -> Option<BroadcastConsumer> {
425		let path = path.as_path();
426		let (root, rest) = self.nodes.get(&path)?;
427		let state = root.lock();
428		state.consume_broadcast(&rest)
429	}
430
431	/// Returns a new OriginProducer that automatically strips out the provided prefix.
432	///
433	/// Returns None if the provided root is not authorized; when publish_only was already used without a wildcard.
434	pub fn with_root(&self, prefix: impl AsPath) -> Option<Self> {
435		let prefix = prefix.as_path();
436
437		Some(Self {
438			root: self.root.join(&prefix).to_owned(),
439			nodes: self.nodes.root(&prefix)?,
440		})
441	}
442
443	/// Returns the root that is automatically stripped from all paths.
444	pub fn root(&self) -> &Path<'_> {
445		&self.root
446	}
447
448	// TODO return PathPrefixes
449	pub fn allowed(&self) -> impl Iterator<Item = &Path<'_>> {
450		self.nodes.nodes.iter().map(|(root, _)| root)
451	}
452
453	/// Converts a relative path to an absolute path.
454	pub fn absolute(&self, path: impl AsPath) -> Path<'_> {
455		self.root.join(path)
456	}
457}
458
459/// Consumes announced broadcasts matching against an optional prefix.
460///
461/// NOTE: Clone is expensive, try to avoid it.
462pub struct OriginConsumer {
463	id: ConsumerId,
464	nodes: OriginNodes,
465	updates: mpsc::UnboundedReceiver<OriginAnnounce>,
466
467	/// A prefix that is automatically stripped from all paths.
468	root: PathOwned,
469}
470
471impl OriginConsumer {
472	fn new(root: PathOwned, nodes: OriginNodes) -> Self {
473		let (tx, rx) = mpsc::unbounded_channel();
474
475		let id = ConsumerId::new();
476
477		for (_, state) in &nodes.nodes {
478			let notify = OriginConsumerNotify {
479				root: root.clone(),
480				tx: tx.clone(),
481			};
482			state.lock().consume(id, notify);
483		}
484
485		Self {
486			id,
487			nodes,
488			updates: rx,
489			root,
490		}
491	}
492
493	/// Returns the next (un)announced broadcast and the absolute path.
494	///
495	/// The broadcast will only be announced if it was previously unannounced.
496	/// The same path won't be announced/unannounced twice, instead it will toggle.
497	/// Returns None if the consumer is closed.
498	///
499	/// Note: The returned path is absolute and will always match this consumer's prefix.
500	pub async fn announced(&mut self) -> Option<OriginAnnounce> {
501		self.updates.recv().await
502	}
503
504	/// Returns the next (un)announced broadcast and the absolute path without blocking.
505	///
506	/// Returns None if there is no update available; NOT because the consumer is closed.
507	/// You have to use `is_closed` to check if the consumer is closed.
508	pub fn try_announced(&mut self) -> Option<OriginAnnounce> {
509		self.updates.try_recv().ok()
510	}
511
512	pub fn consume(&self) -> Self {
513		self.clone()
514	}
515
516	/// Get a specific broadcast by path.
517	///
518	/// TODO This should include announcement support.
519	///
520	/// Returns None if the path hasn't been announced yet.
521	pub fn consume_broadcast(&self, path: impl AsPath) -> Option<BroadcastConsumer> {
522		let path = path.as_path();
523		let (root, rest) = self.nodes.get(&path)?;
524		let state = root.lock();
525		state.consume_broadcast(&rest)
526	}
527
528	/// Returns a new OriginConsumer that only consumes broadcasts matching one of the prefixes.
529	///
530	/// Returns None if there are no legal prefixes (would always return None).
531	// TODO accept PathPrefixes instead of &[Path]
532	pub fn consume_only(&self, prefixes: &[Path]) -> Option<OriginConsumer> {
533		let prefixes = PathPrefixes::new(prefixes);
534		Some(OriginConsumer::new(self.root.clone(), self.nodes.select(&prefixes)?))
535	}
536
537	/// Returns a new OriginConsumer that automatically strips out the provided prefix.
538	///
539	/// Returns None if the provided root is not authorized; when consume_only was already used without a wildcard.
540	pub fn with_root(&self, prefix: impl AsPath) -> Option<Self> {
541		let prefix = prefix.as_path();
542
543		Some(Self::new(self.root.join(&prefix).to_owned(), self.nodes.root(&prefix)?))
544	}
545
546	/// Returns the prefix that is automatically stripped from all paths.
547	pub fn root(&self) -> &Path<'_> {
548		&self.root
549	}
550
551	// TODO return PathPrefixes
552	pub fn allowed(&self) -> impl Iterator<Item = &Path<'_>> {
553		self.nodes.nodes.iter().map(|(root, _)| root)
554	}
555
556	/// Converts a relative path to an absolute path.
557	pub fn absolute(&self, path: impl AsPath) -> Path<'_> {
558		self.root.join(path)
559	}
560}
561
562impl Drop for OriginConsumer {
563	fn drop(&mut self) {
564		for (_, root) in &self.nodes.nodes {
565			root.lock().unconsume(self.id);
566		}
567	}
568}
569
570impl Clone for OriginConsumer {
571	fn clone(&self) -> Self {
572		OriginConsumer::new(self.root.clone(), self.nodes.clone())
573	}
574}
575
576#[cfg(test)]
577use futures::FutureExt;
578
#[cfg(test)]
impl OriginConsumer {
	/// Asserts that an announce of `broadcast` at `expected` is immediately available.
	pub fn assert_next(&mut self, expected: impl AsPath, broadcast: &BroadcastConsumer) {
		let expected = expected.as_path();
		let polled = self.announced().now_or_never().expect("next blocked");
		let (path, active) = polled.expect("no next");
		assert_eq!(path, expected, "wrong path");
		assert!(active.unwrap().is_clone(broadcast), "should be the same broadcast");
	}

	/// Same check as `assert_next`, but through the synchronous `try_announced`.
	pub fn assert_try_next(&mut self, expected: impl AsPath, broadcast: &BroadcastConsumer) {
		let expected = expected.as_path();
		let (path, active) = self.try_announced().expect("no next");
		assert_eq!(path, expected, "wrong path");
		assert!(active.unwrap().is_clone(broadcast), "should be the same broadcast");
	}

	/// Asserts that an unannounce for `expected` is immediately available.
	pub fn assert_next_none(&mut self, expected: impl AsPath) {
		let expected = expected.as_path();
		let polled = self.announced().now_or_never().expect("next blocked");
		let (path, active) = polled.expect("no next");
		assert_eq!(path, expected, "wrong path");
		assert!(active.is_none(), "should be unannounced");
	}

	/// Asserts that no update is ready yet.
	pub fn assert_next_wait(&mut self) {
		match self.announced().now_or_never() {
			None => {}
			Some(res) => panic!("next should block: got {:?}", res.map(|(path, _)| path)),
		}
	}

	/*
	pub fn assert_next_closed(&mut self) {
		assert!(
			self.announced().now_or_never().expect("next blocked").is_none(),
			"next should be closed"
		);
	}
	*/
}
617
618#[cfg(test)]
619mod tests {
620	use crate::Broadcast;
621
622	use super::*;
623
	// Announce/unannounce flow with several consumers that join at different times.
	#[tokio::test]
	async fn test_announce() {
		tokio::time::pause();

		let origin = Origin::produce();
		let broadcast1 = Broadcast::produce();
		let broadcast2 = Broadcast::produce();

		let mut consumer1 = origin.consume();
		// Nothing has been published yet, so the fresh consumer has nothing to read.
		consumer1.assert_next_wait();

		// Publish the first broadcast.
		origin.publish_broadcast("test1", broadcast1.consume());

		consumer1.assert_next("test1", &broadcast1.consume());
		consumer1.assert_next_wait();

		// Make a new consumer that should get the existing broadcast.
		// But we don't consume it yet.
		let mut consumer2 = origin.consume();

		// Publish the second broadcast.
		origin.publish_broadcast("test2", broadcast2.consume());

		consumer1.assert_next("test2", &broadcast2.consume());
		consumer1.assert_next_wait();

		// The late consumer sees the pre-existing broadcast first, then the new one.
		consumer2.assert_next("test1", &broadcast1.consume());
		consumer2.assert_next("test2", &broadcast2.consume());
		consumer2.assert_next_wait();

		// Close the first broadcast.
		drop(broadcast1);

		// Wait for the async task to run.
		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;

		// All consumers should get a None now.
		consumer1.assert_next_none("test1");
		consumer2.assert_next_none("test1");
		consumer1.assert_next_wait();
		consumer2.assert_next_wait();

		// And a new consumer only gets the last broadcast.
		let mut consumer3 = origin.consume();
		consumer3.assert_next("test2", &broadcast2.consume());
		consumer3.assert_next_wait();

		// Close the other producer and make sure it cleans up
		drop(broadcast2);

		// Wait for the async task to run.
		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;

		consumer1.assert_next_none("test2");
		consumer2.assert_next_none("test2");
		consumer3.assert_next_none("test2");

		/* TODO close the origin consumer when the producer is dropped
		consumer1.assert_next_closed();
		consumer2.assert_next_closed();
		consumer3.assert_next_closed();
		*/
	}
689
	// Three broadcasts published to one path: only the oldest is announced, a dropped backup is
	// silent, and dropping the active one promotes the oldest remaining backup.
	#[tokio::test]
	async fn test_duplicate() {
		tokio::time::pause();

		let origin = Origin::produce();

		let broadcast1 = Broadcast::produce();
		let broadcast2 = Broadcast::produce();
		let broadcast3 = Broadcast::produce();

		let consumer1 = broadcast1.consume();
		let consumer2 = broadcast2.consume();
		let consumer3 = broadcast3.consume();

		let mut consumer = origin.consume();

		origin.publish_broadcast("test", consumer1.clone());
		origin.publish_broadcast("test", consumer2.clone());
		origin.publish_broadcast("test", consumer3.clone());
		assert!(consumer.consume_broadcast("test").is_some());

		// Only the oldest broadcast is announced; later publishes go to the backup queue.
		consumer.assert_next("test", &consumer1);
		consumer.assert_next_wait();

		// Drop a backup, nothing should change.
		drop(broadcast2);

		// Wait for the async task to run.
		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;

		assert!(consumer.consume_broadcast("test").is_some());
		consumer.assert_next_wait();

		// Drop the active, we should reannounce with the oldest remaining backup.
		drop(broadcast1);

		// Wait for the async task to run.
		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;

		// A reannounce arrives as an unannounce followed by an announce of the promoted backup.
		assert!(consumer.consume_broadcast("test").is_some());
		consumer.assert_next_none("test");
		consumer.assert_next("test", &consumer3);

		// Drop the final broadcast, we should unannounce.
		drop(broadcast3);

		// Wait for the async task to run.
		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
		assert!(consumer.consume_broadcast("test").is_none());

		consumer.assert_next_none("test");
		consumer.assert_next_wait();
	}
744
	// Backups for one path are promoted in publish order (first in, first out).
	#[tokio::test]
	async fn test_duplicate_fifo_order() {
		tokio::time::pause();

		let origin = Origin::produce();

		let broadcast1 = Broadcast::produce();
		let broadcast2 = Broadcast::produce();
		let broadcast3 = Broadcast::produce();

		let consumer1 = broadcast1.consume();
		let consumer2 = broadcast2.consume();
		let consumer3 = broadcast3.consume();

		let mut consumer = origin.consume();

		origin.publish_broadcast("test", consumer1.clone());
		origin.publish_broadcast("test", consumer2.clone());
		origin.publish_broadcast("test", consumer3.clone());

		// The oldest broadcast is active; the rest are queued in publish order.
		consumer.assert_next("test", &consumer1);
		consumer.assert_next_wait();

		// Drop the active; the next-oldest (not the newest) should be promoted.
		drop(broadcast1);
		// Give the spawned cleanup task a chance to run.
		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
		consumer.assert_next_none("test");
		consumer.assert_next("test", &consumer2);
		consumer.assert_next_wait();

		// Drop the now-active; the remaining backup is promoted.
		drop(broadcast2);
		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
		consumer.assert_next_none("test");
		consumer.assert_next("test", &consumer3);
		consumer.assert_next_wait();

		// Drop the last broadcast; the entry is fully unannounced.
		drop(broadcast3);
		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
		consumer.assert_next_none("test");
		consumer.assert_next_wait();
	}
789
	// Two broadcasts on one path, closed newest-first: the path stays available until both are gone.
	#[tokio::test]
	async fn test_duplicate_reverse() {
		tokio::time::pause();

		let origin = Origin::produce();
		let broadcast1 = Broadcast::produce();
		let broadcast2 = Broadcast::produce();

		origin.publish_broadcast("test", broadcast1.consume());
		origin.publish_broadcast("test", broadcast2.consume());
		assert!(origin.consume_broadcast("test").is_some());

		// This is harder, dropping the new broadcast first.
		// It is a queued backup at this point, so the active broadcast must be left untouched.
		drop(broadcast2);

		// Wait for the cleanup async task to run.
		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
		assert!(origin.consume_broadcast("test").is_some());

		drop(broadcast1);

		// Wait for the cleanup async task to run.
		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
		assert!(origin.consume_broadcast("test").is_none());
	}
815
	// Publishing the very same broadcast twice to one path must neither crash nor leave a stale entry.
	#[tokio::test]
	async fn test_double_publish() {
		tokio::time::pause();

		let origin = Origin::produce();
		let broadcast = Broadcast::produce();

		// Ensure it doesn't crash.
		origin.publish_broadcast("test", broadcast.consume());
		origin.publish_broadcast("test", broadcast.consume());

		assert!(origin.consume_broadcast("test").is_some());

		// Closing the single producer ends both published consumers.
		drop(broadcast);

		// Wait for the async task to run.
		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
		assert!(origin.consume_broadcast("test").is_none());
	}
	// There was a tokio bug where only the first 127 broadcasts would be received instantly.
	// NOTE(review): presumably this is tokio's cooperative scheduling budget making the channel
	// report "not ready" after enough polls within one task tick — TODO confirm.
	// The test is expected to panic: `assert_next` panics with "next blocked" once that happens.
	#[tokio::test]
	#[should_panic]
	async fn test_128() {
		let origin = Origin::produce();
		let broadcast = Broadcast::produce();

		let mut consumer = origin.consume();
		for i in 0..256 {
			origin.publish_broadcast(format!("test{i}"), broadcast.consume());
		}

		// Reading through the async path without yielding eventually reports "blocked".
		for i in 0..256 {
			consumer.assert_next(format!("test{i}"), &broadcast.consume());
		}
	}
851
	// Counterpart of `test_128`: all 256 announcements can be read through `try_announced`.
	#[tokio::test]
	async fn test_128_fix() {
		let origin = Origin::produce();
		let broadcast = Broadcast::produce();

		let mut consumer = origin.consume();
		for i in 0..256 {
			origin.publish_broadcast(format!("test{i}"), broadcast.consume());
		}

		for i in 0..256 {
			// try_next does not have the same issue because it's synchronous.
			consumer.assert_try_next(format!("test{i}"), &broadcast.consume());
		}
	}
867
	// A rooted producer publishes below its root; consumers see paths relative to their own root.
	#[tokio::test]
	async fn test_with_root_basic() {
		let origin = Origin::produce();
		let broadcast = Broadcast::produce();

		// Create a producer with root "foo"
		let foo_producer = origin.with_root("foo").expect("should create root");
		assert_eq!(foo_producer.root().as_str(), "foo");

		let mut consumer = origin.consume();

		// When publishing to "bar/baz", it should actually publish to "foo/bar/baz"
		assert!(foo_producer.publish_broadcast("bar/baz", broadcast.consume()));
		// The original consumer should see the full path
		consumer.assert_next("foo/bar/baz", &broadcast.consume());

		// A consumer created from the rooted producer should see the stripped path
		let mut foo_consumer = foo_producer.consume();
		foo_consumer.assert_next("bar/baz", &broadcast.consume());
	}
888
	// Roots applied one after another accumulate: "foo" then "bar" yields "foo/bar".
	#[tokio::test]
	async fn test_with_root_nested() {
		let origin = Origin::produce();
		let broadcast = Broadcast::produce();

		// Create nested roots
		let foo_producer = origin.with_root("foo").expect("should create foo root");
		let foo_bar_producer = foo_producer.with_root("bar").expect("should create bar root");
		assert_eq!(foo_bar_producer.root().as_str(), "foo/bar");

		let mut consumer = origin.consume();

		// Publishing to "baz" should actually publish to "foo/bar/baz"
		assert!(foo_bar_producer.publish_broadcast("baz", broadcast.consume()));
		// The original consumer sees the full path
		consumer.assert_next("foo/bar/baz", &broadcast.consume());

		// Consumer from foo_bar_producer sees just "baz"
		let mut foo_bar_consumer = foo_bar_producer.consume();
		foo_bar_consumer.assert_next("baz", &broadcast.consume());
	}
910
	// A producer limited with `publish_only` accepts the allowed prefixes and anything nested
	// below them, and rejects everything else, including parents of an allowed prefix.
	#[tokio::test]
	async fn test_publish_only_allows() {
		let origin = Origin::produce();
		let broadcast = Broadcast::produce();

		// Create a producer that can only publish to "allowed" paths
		let limited_producer = origin
			.publish_only(&["allowed/path1".into(), "allowed/path2".into()])
			.expect("should create limited producer");

		// Should be able to publish to allowed paths
		assert!(limited_producer.publish_broadcast("allowed/path1", broadcast.consume()));
		assert!(limited_producer.publish_broadcast("allowed/path1/nested", broadcast.consume()));
		assert!(limited_producer.publish_broadcast("allowed/path2", broadcast.consume()));

		// Should not be able to publish to disallowed paths
		assert!(!limited_producer.publish_broadcast("notallowed", broadcast.consume()));
		assert!(!limited_producer.publish_broadcast("allowed", broadcast.consume())); // Parent of allowed path
		assert!(!limited_producer.publish_broadcast("other/path", broadcast.consume()));
	}
931
	// An empty prefix list selects nothing, so no limited producer can be created.
	#[tokio::test]
	async fn test_publish_only_empty() {
		let origin = Origin::produce();

		// Creating a producer with no allowed paths should return None
		assert!(origin.publish_only(&[]).is_none());
	}
939
	// A consumer limited with `consume_only` only sees broadcasts under its prefix, while an
	// unrestricted consumer on the same origin sees everything.
	#[tokio::test]
	async fn test_consume_only_filters() {
		let origin = Origin::produce();
		let broadcast1 = Broadcast::produce();
		let broadcast2 = Broadcast::produce();
		let broadcast3 = Broadcast::produce();

		let mut consumer = origin.consume();

		// Publish to different paths
		origin.publish_broadcast("allowed", broadcast1.consume());
		origin.publish_broadcast("allowed/nested", broadcast2.consume());
		origin.publish_broadcast("notallowed", broadcast3.consume());

		// Create a consumer that only sees "allowed" paths
		let mut limited_consumer = origin
			.consume_only(&["allowed".into()])
			.expect("should create limited consumer");

		// Should only receive broadcasts under "allowed"
		limited_consumer.assert_next("allowed", &broadcast1.consume());
		limited_consumer.assert_next("allowed/nested", &broadcast2.consume());
		limited_consumer.assert_next_wait(); // Should not see "notallowed"

		// Unscoped consumer should see all
		consumer.assert_next("allowed", &broadcast1.consume());
		consumer.assert_next("allowed/nested", &broadcast2.consume());
		consumer.assert_next("notallowed", &broadcast3.consume());
	}
969
970	#[tokio::test]
971	async fn test_consume_only_multiple_prefixes() {
972		let origin = Origin::produce();
973		let broadcast1 = Broadcast::produce();
974		let broadcast2 = Broadcast::produce();
975		let broadcast3 = Broadcast::produce();
976
977		origin.publish_broadcast("foo/test", broadcast1.consume());
978		origin.publish_broadcast("bar/test", broadcast2.consume());
979		origin.publish_broadcast("baz/test", broadcast3.consume());
980
981		// Consumer that only sees "foo" and "bar" paths
982		let mut limited_consumer = origin
983			.consume_only(&["foo".into(), "bar".into()])
984			.expect("should create limited consumer");
985
986		// Order depends on PathPrefixes canonical sort (lexicographic for same length)
987		limited_consumer.assert_next("bar/test", &broadcast2.consume());
988		limited_consumer.assert_next("foo/test", &broadcast1.consume());
989		limited_consumer.assert_next_wait(); // Should not see "baz/test"
990	}
991
992	#[tokio::test]
993	async fn test_with_root_and_publish_only() {
994		let origin = Origin::produce();
995		let broadcast = Broadcast::produce();
996
997		// User connects to /foo root
998		let foo_producer = origin.with_root("foo").expect("should create foo root");
999
1000		// Limit them to publish only to "bar" and "goop/pee" within /foo
1001		let limited_producer = foo_producer
1002			.publish_only(&["bar".into(), "goop/pee".into()])
1003			.expect("should create limited producer");
1004
1005		let mut consumer = origin.consume();
1006
1007		// Should be able to publish to foo/bar and foo/goop/pee (but user sees as bar and goop/pee)
1008		assert!(limited_producer.publish_broadcast("bar", broadcast.consume()));
1009		assert!(limited_producer.publish_broadcast("bar/nested", broadcast.consume()));
1010		assert!(limited_producer.publish_broadcast("goop/pee", broadcast.consume()));
1011		assert!(limited_producer.publish_broadcast("goop/pee/nested", broadcast.consume()));
1012
1013		// Should not be able to publish outside allowed paths
1014		assert!(!limited_producer.publish_broadcast("baz", broadcast.consume()));
1015		assert!(!limited_producer.publish_broadcast("goop", broadcast.consume())); // Parent of allowed
1016		assert!(!limited_producer.publish_broadcast("goop/other", broadcast.consume()));
1017
1018		// Original consumer sees full paths
1019		consumer.assert_next("foo/bar", &broadcast.consume());
1020		consumer.assert_next("foo/bar/nested", &broadcast.consume());
1021		consumer.assert_next("foo/goop/pee", &broadcast.consume());
1022		consumer.assert_next("foo/goop/pee/nested", &broadcast.consume());
1023	}
1024
1025	#[tokio::test]
1026	async fn test_with_root_and_consume_only() {
1027		let origin = Origin::produce();
1028		let broadcast1 = Broadcast::produce();
1029		let broadcast2 = Broadcast::produce();
1030		let broadcast3 = Broadcast::produce();
1031
1032		// Publish broadcasts
1033		origin.publish_broadcast("foo/bar/test", broadcast1.consume());
1034		origin.publish_broadcast("foo/goop/pee/test", broadcast2.consume());
1035		origin.publish_broadcast("foo/other/test", broadcast3.consume());
1036
1037		// User connects to /foo root
1038		let foo_producer = origin.with_root("foo").expect("should create foo root");
1039
1040		// Create consumer limited to "bar" and "goop/pee" within /foo
1041		let mut limited_consumer = foo_producer
1042			.consume_only(&["bar".into(), "goop/pee".into()])
1043			.expect("should create limited consumer");
1044
1045		// Should only see allowed paths (without foo prefix)
1046		limited_consumer.assert_next("bar/test", &broadcast1.consume());
1047		limited_consumer.assert_next("goop/pee/test", &broadcast2.consume());
1048		limited_consumer.assert_next_wait(); // Should not see "other/test"
1049	}
1050
1051	#[tokio::test]
1052	async fn test_with_root_unauthorized() {
1053		let origin = Origin::produce();
1054
1055		// First limit the producer to specific paths
1056		let limited_producer = origin
1057			.publish_only(&["allowed".into()])
1058			.expect("should create limited producer");
1059
1060		// Trying to create a root outside allowed paths should fail
1061		assert!(limited_producer.with_root("notallowed").is_none());
1062
1063		// But creating a root within allowed paths should work
1064		let allowed_root = limited_producer
1065			.with_root("allowed")
1066			.expect("should create allowed root");
1067		assert_eq!(allowed_root.root().as_str(), "allowed");
1068	}
1069
1070	#[tokio::test]
1071	async fn test_wildcard_permission() {
1072		let origin = Origin::produce();
1073		let broadcast = Broadcast::produce();
1074
1075		// Producer with root access (empty string means wildcard)
1076		let root_producer = origin.clone();
1077
1078		// Should be able to publish anywhere
1079		assert!(root_producer.publish_broadcast("any/path", broadcast.consume()));
1080		assert!(root_producer.publish_broadcast("other/path", broadcast.consume()));
1081
1082		// Can create any root
1083		let foo_producer = root_producer.with_root("foo").expect("should create any root");
1084		assert_eq!(foo_producer.root().as_str(), "foo");
1085	}
1086
1087	#[tokio::test]
1088	async fn test_consume_broadcast_with_permissions() {
1089		let origin = Origin::produce();
1090		let broadcast1 = Broadcast::produce();
1091		let broadcast2 = Broadcast::produce();
1092
1093		origin.publish_broadcast("allowed/test", broadcast1.consume());
1094		origin.publish_broadcast("notallowed/test", broadcast2.consume());
1095
1096		// Create limited consumer
1097		let limited_consumer = origin
1098			.consume_only(&["allowed".into()])
1099			.expect("should create limited consumer");
1100
1101		// Should be able to get allowed broadcast
1102		let result = limited_consumer.consume_broadcast("allowed/test");
1103		assert!(result.is_some());
1104		assert!(result.unwrap().is_clone(&broadcast1.consume()));
1105
1106		// Should not be able to get disallowed broadcast
1107		assert!(limited_consumer.consume_broadcast("notallowed/test").is_none());
1108
1109		// Original consumer can get both
1110		let consumer = origin.consume();
1111		assert!(consumer.consume_broadcast("allowed/test").is_some());
1112		assert!(consumer.consume_broadcast("notallowed/test").is_some());
1113	}
1114
1115	#[tokio::test]
1116	async fn test_nested_paths_with_permissions() {
1117		let origin = Origin::produce();
1118		let broadcast = Broadcast::produce();
1119
1120		// Create producer limited to "a/b/c"
1121		let limited_producer = origin
1122			.publish_only(&["a/b/c".into()])
1123			.expect("should create limited producer");
1124
1125		// Should be able to publish to exact path and nested paths
1126		assert!(limited_producer.publish_broadcast("a/b/c", broadcast.consume()));
1127		assert!(limited_producer.publish_broadcast("a/b/c/d", broadcast.consume()));
1128		assert!(limited_producer.publish_broadcast("a/b/c/d/e", broadcast.consume()));
1129
1130		// Should not be able to publish to parent or sibling paths
1131		assert!(!limited_producer.publish_broadcast("a", broadcast.consume()));
1132		assert!(!limited_producer.publish_broadcast("a/b", broadcast.consume()));
1133		assert!(!limited_producer.publish_broadcast("a/b/other", broadcast.consume()));
1134	}
1135
1136	#[tokio::test]
1137	async fn test_multiple_consumers_with_different_permissions() {
1138		let origin = Origin::produce();
1139		let broadcast1 = Broadcast::produce();
1140		let broadcast2 = Broadcast::produce();
1141		let broadcast3 = Broadcast::produce();
1142
1143		// Publish to different paths
1144		origin.publish_broadcast("foo/test", broadcast1.consume());
1145		origin.publish_broadcast("bar/test", broadcast2.consume());
1146		origin.publish_broadcast("baz/test", broadcast3.consume());
1147
1148		// Create consumers with different permissions
1149		let mut foo_consumer = origin
1150			.consume_only(&["foo".into()])
1151			.expect("should create foo consumer");
1152
1153		let mut bar_consumer = origin
1154			.consume_only(&["bar".into()])
1155			.expect("should create bar consumer");
1156
1157		let mut foobar_consumer = origin
1158			.consume_only(&["foo".into(), "bar".into()])
1159			.expect("should create foobar consumer");
1160
1161		// Each consumer should only see their allowed paths
1162		foo_consumer.assert_next("foo/test", &broadcast1.consume());
1163		foo_consumer.assert_next_wait();
1164
1165		bar_consumer.assert_next("bar/test", &broadcast2.consume());
1166		bar_consumer.assert_next_wait();
1167
1168		foobar_consumer.assert_next("bar/test", &broadcast2.consume());
1169		foobar_consumer.assert_next("foo/test", &broadcast1.consume());
1170		foobar_consumer.assert_next_wait();
1171	}
1172
1173	#[tokio::test]
1174	async fn test_select_with_empty_prefix() {
1175		let origin = Origin::produce();
1176		let broadcast1 = Broadcast::produce();
1177		let broadcast2 = Broadcast::produce();
1178
1179		// User with root "demo" allowed to subscribe to "worm-node" and "foobar"
1180		let demo_producer = origin.with_root("demo").expect("should create demo root");
1181		let limited_producer = demo_producer
1182			.publish_only(&["worm-node".into(), "foobar".into()])
1183			.expect("should create limited producer");
1184
1185		// Publish some broadcasts
1186		assert!(limited_producer.publish_broadcast("worm-node/test", broadcast1.consume()));
1187		assert!(limited_producer.publish_broadcast("foobar/test", broadcast2.consume()));
1188
1189		// consume_only with empty prefix should keep the exact same "worm-node" and "foobar" nodes
1190		let mut consumer = limited_producer
1191			.consume_only(&["".into()])
1192			.expect("should create consumer with empty prefix");
1193
1194		// Should see both broadcasts (order depends on PathPrefixes sort)
1195		let a1 = consumer.try_announced().expect("expected first announcement");
1196		let a2 = consumer.try_announced().expect("expected second announcement");
1197		consumer.assert_next_wait();
1198
1199		let mut paths: Vec<_> = [&a1, &a2].iter().map(|(p, _)| p.to_string()).collect();
1200		paths.sort();
1201		assert_eq!(paths, ["foobar/test", "worm-node/test"]);
1202	}
1203
1204	#[tokio::test]
1205	async fn test_select_narrowing_scope() {
1206		let origin = Origin::produce();
1207		let broadcast1 = Broadcast::produce();
1208		let broadcast2 = Broadcast::produce();
1209		let broadcast3 = Broadcast::produce();
1210
1211		// User with root "demo" allowed to subscribe to "worm-node" and "foobar"
1212		let demo_producer = origin.with_root("demo").expect("should create demo root");
1213		let limited_producer = demo_producer
1214			.publish_only(&["worm-node".into(), "foobar".into()])
1215			.expect("should create limited producer");
1216
1217		// Publish broadcasts at different levels
1218		assert!(limited_producer.publish_broadcast("worm-node", broadcast1.consume()));
1219		assert!(limited_producer.publish_broadcast("worm-node/foo", broadcast2.consume()));
1220		assert!(limited_producer.publish_broadcast("foobar/bar", broadcast3.consume()));
1221
1222		// Test 1: consume_only("worm-node") should result in a single "" node with contents of "worm-node" ONLY
1223		let mut worm_consumer = limited_producer
1224			.consume_only(&["worm-node".into()])
1225			.expect("should create worm-node consumer");
1226
1227		// Should see worm-node content with paths stripped to ""
1228		worm_consumer.assert_next("worm-node", &broadcast1.consume());
1229		worm_consumer.assert_next("worm-node/foo", &broadcast2.consume());
1230		worm_consumer.assert_next_wait(); // Should NOT see foobar content
1231
1232		// Test 2: consume_only("worm-node/foo") should result in a "" node with contents of "worm-node/foo"
1233		let mut foo_consumer = limited_producer
1234			.consume_only(&["worm-node/foo".into()])
1235			.expect("should create worm-node/foo consumer");
1236
1237		foo_consumer.assert_next("worm-node/foo", &broadcast2.consume());
1238		foo_consumer.assert_next_wait(); // Should NOT see other content
1239	}
1240
1241	#[tokio::test]
1242	async fn test_select_multiple_roots_with_empty_prefix() {
1243		let origin = Origin::produce();
1244		let broadcast1 = Broadcast::produce();
1245		let broadcast2 = Broadcast::produce();
1246		let broadcast3 = Broadcast::produce();
1247
1248		// Producer with multiple allowed roots
1249		let limited_producer = origin
1250			.publish_only(&["app1".into(), "app2".into(), "shared".into()])
1251			.expect("should create limited producer");
1252
1253		// Publish to each root
1254		assert!(limited_producer.publish_broadcast("app1/data", broadcast1.consume()));
1255		assert!(limited_producer.publish_broadcast("app2/config", broadcast2.consume()));
1256		assert!(limited_producer.publish_broadcast("shared/resource", broadcast3.consume()));
1257
1258		// consume_only with empty prefix should maintain all roots
1259		let mut consumer = limited_producer
1260			.consume_only(&["".into()])
1261			.expect("should create consumer with empty prefix");
1262
1263		// Should see all broadcasts from all roots
1264		consumer.assert_next("app1/data", &broadcast1.consume());
1265		consumer.assert_next("app2/config", &broadcast2.consume());
1266		consumer.assert_next("shared/resource", &broadcast3.consume());
1267		consumer.assert_next_wait();
1268	}
1269
1270	#[tokio::test]
1271	async fn test_publish_only_with_empty_prefix() {
1272		let origin = Origin::produce();
1273		let broadcast = Broadcast::produce();
1274
1275		// Producer with specific allowed paths
1276		let limited_producer = origin
1277			.publish_only(&["services/api".into(), "services/web".into()])
1278			.expect("should create limited producer");
1279
1280		// publish_only with empty prefix should keep the same restrictions
1281		let same_producer = limited_producer
1282			.publish_only(&["".into()])
1283			.expect("should create producer with empty prefix");
1284
1285		// Should still have the same publishing restrictions
1286		assert!(same_producer.publish_broadcast("services/api", broadcast.consume()));
1287		assert!(same_producer.publish_broadcast("services/web", broadcast.consume()));
1288		assert!(!same_producer.publish_broadcast("services/db", broadcast.consume()));
1289		assert!(!same_producer.publish_broadcast("other", broadcast.consume()));
1290	}
1291
1292	#[tokio::test]
1293	async fn test_select_narrowing_to_deeper_path() {
1294		let origin = Origin::produce();
1295		let broadcast1 = Broadcast::produce();
1296		let broadcast2 = Broadcast::produce();
1297		let broadcast3 = Broadcast::produce();
1298
1299		// Producer with broad permission
1300		let limited_producer = origin
1301			.publish_only(&["org".into()])
1302			.expect("should create limited producer");
1303
1304		// Publish at various depths
1305		assert!(limited_producer.publish_broadcast("org/team1/project1", broadcast1.consume()));
1306		assert!(limited_producer.publish_broadcast("org/team1/project2", broadcast2.consume()));
1307		assert!(limited_producer.publish_broadcast("org/team2/project1", broadcast3.consume()));
1308
1309		// Narrow down to team2 only
1310		let mut team2_consumer = limited_producer
1311			.consume_only(&["org/team2".into()])
1312			.expect("should create team2 consumer");
1313
1314		team2_consumer.assert_next("org/team2/project1", &broadcast3.consume());
1315		team2_consumer.assert_next_wait(); // Should NOT see team1 content
1316
1317		// Further narrow down to team1/project1
1318		let mut project1_consumer = limited_producer
1319			.consume_only(&["org/team1/project1".into()])
1320			.expect("should create project1 consumer");
1321
1322		// Should only see project1 content at root
1323		project1_consumer.assert_next("org/team1/project1", &broadcast1.consume());
1324		project1_consumer.assert_next_wait();
1325	}
1326
1327	#[tokio::test]
1328	async fn test_select_with_non_matching_prefix() {
1329		let origin = Origin::produce();
1330
1331		// Producer with specific allowed paths
1332		let limited_producer = origin
1333			.publish_only(&["allowed/path".into()])
1334			.expect("should create limited producer");
1335
1336		// Trying to consume_only with a completely different prefix should return None
1337		assert!(limited_producer.consume_only(&["different/path".into()]).is_none());
1338
1339		// Similarly for publish_only
1340		assert!(limited_producer.publish_only(&["other/path".into()]).is_none());
1341	}
1342
1343	// Regression test for https://github.com/moq-dev/moq/issues/910
1344	// with_root panics when String has trailing slash (AsPath for String skips normalization)
1345	#[tokio::test]
1346	async fn test_with_root_trailing_slash_consumer() {
1347		let origin = Origin::produce();
1348
1349		// Use an owned String so the trailing slash is NOT normalized away.
1350		let prefix = "some_prefix/".to_string();
1351		let mut consumer = origin.consume().with_root(prefix).unwrap();
1352
1353		let b = origin.create_broadcast("some_prefix/test").unwrap();
1354		consumer.assert_next("test", &b.consume());
1355	}
1356
1357	// Same issue but for the producer side of with_root
1358	#[tokio::test]
1359	async fn test_with_root_trailing_slash_producer() {
1360		let origin = Origin::produce();
1361
1362		// Use an owned String so the trailing slash is NOT normalized away.
1363		let prefix = "some_prefix/".to_string();
1364		let rooted = origin.with_root(prefix).unwrap();
1365
1366		let b = rooted.create_broadcast("test").unwrap();
1367
1368		let mut consumer = rooted.consume();
1369		consumer.assert_next("test", &b.consume());
1370	}
1371
1372	// Verify unannounce also doesn't panic with trailing slash
1373	#[tokio::test]
1374	async fn test_with_root_trailing_slash_unannounce() {
1375		tokio::time::pause();
1376
1377		let origin = Origin::produce();
1378
1379		let prefix = "some_prefix/".to_string();
1380		let mut consumer = origin.consume().with_root(prefix).unwrap();
1381
1382		let b = origin.create_broadcast("some_prefix/test").unwrap();
1383		consumer.assert_next("test", &b.consume());
1384
1385		// Drop the broadcast producer to trigger unannounce
1386		drop(b);
1387		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1388
1389		// unannounce also calls strip_prefix(&self.root).unwrap()
1390		consumer.assert_next_none("test");
1391	}
1392
1393	#[tokio::test]
1394	async fn test_select_maintains_access_with_wider_prefix() {
1395		let origin = Origin::produce();
1396		let broadcast1 = Broadcast::produce();
1397		let broadcast2 = Broadcast::produce();
1398
1399		// Setup: user with root "demo" allowed to subscribe to specific paths
1400		let demo_producer = origin.with_root("demo").expect("should create demo root");
1401		let user_producer = demo_producer
1402			.publish_only(&["worm-node".into(), "foobar".into()])
1403			.expect("should create user producer");
1404
1405		// Publish some data
1406		assert!(user_producer.publish_broadcast("worm-node/data", broadcast1.consume()));
1407		assert!(user_producer.publish_broadcast("foobar", broadcast2.consume()));
1408
1409		// Key test: consume_only with "" should maintain access to allowed roots
1410		let mut consumer = user_producer
1411			.consume_only(&["".into()])
1412			.expect("consume_only with empty prefix should not fail when user has specific permissions");
1413
1414		// Should still receive broadcasts from allowed paths (order not guaranteed)
1415		let a1 = consumer.try_announced().expect("expected first announcement");
1416		let a2 = consumer.try_announced().expect("expected second announcement");
1417		consumer.assert_next_wait();
1418
1419		let mut paths: Vec<_> = [&a1, &a2].iter().map(|(p, _)| p.to_string()).collect();
1420		paths.sort();
1421		assert_eq!(paths, ["foobar", "worm-node/data"]);
1422
1423		// Also test that we can still narrow the scope
1424		let mut narrow_consumer = user_producer
1425			.consume_only(&["worm-node".into()])
1426			.expect("should be able to narrow scope to worm-node");
1427
1428		narrow_consumer.assert_next("worm-node/data", &broadcast1.consume());
1429		narrow_consumer.assert_next_wait(); // Should not see foobar
1430	}
1431
1432	#[tokio::test]
1433	async fn test_duplicate_prefixes_deduped() {
1434		let origin = Origin::produce();
1435		let broadcast = Broadcast::produce();
1436
1437		// publish_only with duplicate prefixes should work (deduped internally)
1438		let producer = origin
1439			.publish_only(&["demo".into(), "demo".into()])
1440			.expect("should create producer");
1441
1442		assert!(producer.publish_broadcast("demo/stream", broadcast.consume()));
1443
1444		let mut consumer = producer.consume();
1445		consumer.assert_next("demo/stream", &broadcast.consume());
1446		consumer.assert_next_wait();
1447	}
1448
1449	#[tokio::test]
1450	async fn test_overlapping_prefixes_deduped() {
1451		let origin = Origin::produce();
1452		let broadcast = Broadcast::produce();
1453
1454		// "demo" and "demo/foo" — "demo/foo" is redundant, only "demo" should remain
1455		let producer = origin
1456			.publish_only(&["demo".into(), "demo/foo".into()])
1457			.expect("should create producer");
1458
1459		// Can still publish under "demo/bar" since "demo" covers everything
1460		assert!(producer.publish_broadcast("demo/bar/stream", broadcast.consume()));
1461
1462		let mut consumer = producer.consume();
1463		consumer.assert_next("demo/bar/stream", &broadcast.consume());
1464		consumer.assert_next_wait();
1465	}
1466
1467	#[tokio::test]
1468	async fn test_overlapping_prefixes_no_duplicate_announcements() {
1469		let origin = Origin::produce();
1470		let broadcast = Broadcast::produce();
1471
1472		// Both "demo" and "demo/foo" are requested — should only have one node
1473		let producer = origin
1474			.publish_only(&["demo".into(), "demo/foo".into()])
1475			.expect("should create producer");
1476
1477		assert!(producer.publish_broadcast("demo/foo/stream", broadcast.consume()));
1478
1479		let mut consumer = producer.consume();
1480		// Should only get ONE announcement (not two from overlapping nodes)
1481		consumer.assert_next("demo/foo/stream", &broadcast.consume());
1482		consumer.assert_next_wait();
1483	}
1484
1485	#[tokio::test]
1486	async fn test_allowed_returns_deduped_prefixes() {
1487		let origin = Origin::produce();
1488
1489		let producer = origin
1490			.publish_only(&["demo".into(), "demo/foo".into(), "anon".into()])
1491			.expect("should create producer");
1492
1493		let allowed: Vec<_> = producer.allowed().collect();
1494		assert_eq!(allowed.len(), 2, "demo/foo should be subsumed by demo");
1495	}
1496}