Skip to main content

moq_lite/model/
origin.rs

1use std::{
2	collections::{HashMap, VecDeque},
3	fmt,
4	sync::atomic::{AtomicU64, Ordering},
5};
6
7use rand::Rng;
8use tokio::sync::mpsc;
9use web_async::Lock;
10
11use super::BroadcastConsumer;
12use crate::{
13	AsPath, Broadcast, BroadcastProducer, Path, PathOwned, PathPrefixes,
14	coding::{Decode, DecodeError, Encode, EncodeError},
15};
16
17/// A relay origin, identified by a 62-bit varint on the wire.
18///
19/// `id` must be non-zero for a real origin; `id == 0` is reserved as a
20/// placeholder for Lite03-style hops where the actual value isn't carried.
21/// Encoding a value outside the 62-bit range (>= 2^62) will fail at the
22/// varint layer; [`Origin::random`] picks a valid random nonzero id.
23#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
24#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
25pub struct Origin {
26	/// Non-zero 62-bit identifier. Encoded as a QUIC varint on the wire.
27	pub id: u64,
28}
29
30impl Origin {
31	/// Placeholder for hop entries whose actual id is not on the wire (Lite03).
32	/// Never encoded for Lite04+: violates the non-zero invariant and would fail to round-trip.
33	pub(crate) const UNKNOWN: Self = Self { id: 0 };
34
35	/// Generate a fresh origin with a random non-zero 62-bit id. Use this for any
36	/// origin that does not need a stable identity across restarts.
37	pub fn random() -> Self {
38		let mut rng = rand::rng();
39		let id = rng.random_range(1..(1u64 << 62));
40		Self { id }
41	}
42
43	/// Consume this [Origin] to create a producer that carries its id.
44	pub fn produce(self) -> OriginProducer {
45		OriginProducer::new(self)
46	}
47}
48
49impl From<u64> for Origin {
50	fn from(id: u64) -> Self {
51		Self { id }
52	}
53}
54
55impl fmt::Display for Origin {
56	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
57		self.id.fmt(f)
58	}
59}
60
61impl<V: Copy> Encode<V> for Origin
62where
63	u64: Encode<V>,
64{
65	fn encode<W: bytes::BufMut>(&self, w: &mut W, version: V) -> Result<(), EncodeError> {
66		self.id.encode(w, version)
67	}
68}
69
70impl<V: Copy> Decode<V> for Origin
71where
72	u64: Decode<V>,
73{
74	fn decode<R: bytes::Buf>(r: &mut R, version: V) -> Result<Self, DecodeError> {
75		let id = u64::decode(r, version)?;
76		if id >= 1u64 << 62 {
77			return Err(DecodeError::InvalidValue);
78		}
79		Ok(Self { id })
80	}
81}
82
83/// Maximum number of origins (hops) an [`OriginList`] can hold.
84///
85/// Caps pathological or loop-induced announcements at a reasonable cluster
86/// diameter; appending past this limit returns [`TooManyOrigins`] rather than
87/// silently truncating.
88pub(crate) const MAX_HOPS: usize = 32;
89
90/// Bounded list of [`Origin`] entries, typically the hop chain of a broadcast.
91///
92/// Guarantees `len() <= MAX_HOPS`. Construct via [`OriginList::new`] +
93/// [`OriginList::push`], or fall back to the fallible [`TryFrom<Vec<Origin>>`].
94#[derive(Debug, Clone, Default, PartialEq, Eq)]
95#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
96pub struct OriginList(Vec<Origin>);
97
98/// Returned when an operation would grow an [`OriginList`] past its hop-count cap.
99#[derive(Debug, Clone, Copy, PartialEq, Eq)]
100#[non_exhaustive]
101pub struct TooManyOrigins;
102
103impl fmt::Display for TooManyOrigins {
104	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
105		write!(f, "too many origins (max {MAX_HOPS})")
106	}
107}
108
109impl std::error::Error for TooManyOrigins {}
110
111impl From<TooManyOrigins> for DecodeError {
112	fn from(_: TooManyOrigins) -> Self {
113		DecodeError::BoundsExceeded
114	}
115}
116
117impl OriginList {
118	/// Create an empty list.
119	pub fn new() -> Self {
120		Self(Vec::new())
121	}
122
123	/// Append an [`Origin`]. Returns [`TooManyOrigins`] if the list is full.
124	pub fn push(&mut self, origin: Origin) -> Result<(), TooManyOrigins> {
125		if self.0.len() >= MAX_HOPS {
126			return Err(TooManyOrigins);
127		}
128		self.0.push(origin);
129		Ok(())
130	}
131
132	/// Returns true if any entry matches `origin`.
133	pub fn contains(&self, origin: &Origin) -> bool {
134		self.0.contains(origin)
135	}
136
137	/// Number of entries currently in the list (always `<= MAX_HOPS`).
138	pub fn len(&self) -> usize {
139		self.0.len()
140	}
141
142	/// Whether the list contains no entries.
143	pub fn is_empty(&self) -> bool {
144		self.0.is_empty()
145	}
146
147	/// Iterate over the entries in hop order (oldest first).
148	pub fn iter(&self) -> std::slice::Iter<'_, Origin> {
149		self.0.iter()
150	}
151
152	/// Borrow the entries as a slice.
153	pub fn as_slice(&self) -> &[Origin] {
154		&self.0
155	}
156}
157
158impl TryFrom<Vec<Origin>> for OriginList {
159	type Error = TooManyOrigins;
160
161	fn try_from(v: Vec<Origin>) -> Result<Self, Self::Error> {
162		if v.len() > MAX_HOPS {
163			return Err(TooManyOrigins);
164		}
165		Ok(Self(v))
166	}
167}
168
169impl<'a> IntoIterator for &'a OriginList {
170	type Item = &'a Origin;
171	type IntoIter = std::slice::Iter<'a, Origin>;
172
173	fn into_iter(self) -> Self::IntoIter {
174		self.iter()
175	}
176}
177
178impl<V: Copy> Encode<V> for OriginList
179where
180	u64: Encode<V>,
181	Origin: Encode<V>,
182{
183	fn encode<W: bytes::BufMut>(&self, w: &mut W, version: V) -> Result<(), EncodeError> {
184		(self.0.len() as u64).encode(w, version)?;
185		for origin in &self.0 {
186			origin.encode(w, version)?;
187		}
188		Ok(())
189	}
190}
191
192impl<V: Copy> Decode<V> for OriginList
193where
194	u64: Decode<V>,
195	Origin: Decode<V>,
196{
197	fn decode<R: bytes::Buf>(r: &mut R, version: V) -> Result<Self, DecodeError> {
198		let count = u64::decode(r, version)? as usize;
199		if count > MAX_HOPS {
200			return Err(DecodeError::BoundsExceeded);
201		}
202		let mut list = Vec::with_capacity(count);
203		for _ in 0..count {
204			list.push(Origin::decode(r, version)?);
205		}
206		Ok(Self(list))
207	}
208}
209
210static NEXT_CONSUMER_ID: AtomicU64 = AtomicU64::new(0);
211
212#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
213struct ConsumerId(u64);
214
215impl ConsumerId {
216	fn new() -> Self {
217		Self(NEXT_CONSUMER_ID.fetch_add(1, Ordering::Relaxed))
218	}
219}
220
221// If there are multiple broadcasts with the same path, we keep the oldest active and queue the others.
222struct OriginBroadcast {
223	path: PathOwned,
224	active: BroadcastConsumer,
225	backup: VecDeque<BroadcastConsumer>,
226}
227
228#[derive(Clone)]
229struct OriginConsumerNotify {
230	root: PathOwned,
231	tx: mpsc::UnboundedSender<OriginAnnounce>,
232}
233
234impl OriginConsumerNotify {
235	fn announce(&self, path: impl AsPath, broadcast: BroadcastConsumer) {
236		let path = path.as_path().strip_prefix(&self.root).unwrap().to_owned();
237		self.tx.send((path, Some(broadcast))).expect("consumer closed");
238	}
239
240	fn reannounce(&self, path: impl AsPath, broadcast: BroadcastConsumer) {
241		let path = path.as_path().strip_prefix(&self.root).unwrap().to_owned();
242		self.tx.send((path.clone(), None)).expect("consumer closed");
243		self.tx.send((path, Some(broadcast))).expect("consumer closed");
244	}
245
246	fn unannounce(&self, path: impl AsPath) {
247		let path = path.as_path().strip_prefix(&self.root).unwrap().to_owned();
248		self.tx.send((path, None)).expect("consumer closed");
249	}
250}
251
252struct NotifyNode {
253	parent: Option<Lock<NotifyNode>>,
254
255	// Consumers that are subscribed to this node.
256	// We store a consumer ID so we can remove it easily when it closes.
257	consumers: HashMap<ConsumerId, OriginConsumerNotify>,
258}
259
260impl NotifyNode {
261	fn new(parent: Option<Lock<NotifyNode>>) -> Self {
262		Self {
263			parent,
264			consumers: HashMap::new(),
265		}
266	}
267
268	fn announce(&mut self, path: impl AsPath, broadcast: &BroadcastConsumer) {
269		for consumer in self.consumers.values() {
270			consumer.announce(path.as_path(), broadcast.clone());
271		}
272
273		if let Some(parent) = &self.parent {
274			parent.lock().announce(path, broadcast);
275		}
276	}
277
278	fn reannounce(&mut self, path: impl AsPath, broadcast: &BroadcastConsumer) {
279		for consumer in self.consumers.values() {
280			consumer.reannounce(path.as_path(), broadcast.clone());
281		}
282
283		if let Some(parent) = &self.parent {
284			parent.lock().reannounce(path, broadcast);
285		}
286	}
287
288	fn unannounce(&mut self, path: impl AsPath) {
289		for consumer in self.consumers.values() {
290			consumer.unannounce(path.as_path());
291		}
292
293		if let Some(parent) = &self.parent {
294			parent.lock().unannounce(path);
295		}
296	}
297}
298
299struct OriginNode {
300	// The broadcast that is published to this node.
301	broadcast: Option<OriginBroadcast>,
302
303	// Nested nodes, one level down the tree.
304	nested: HashMap<String, Lock<OriginNode>>,
305
306	// Unfortunately, to notify consumers we need to traverse back up the tree.
307	notify: Lock<NotifyNode>,
308}
309
310impl OriginNode {
311	fn new(parent: Option<Lock<NotifyNode>>) -> Self {
312		Self {
313			broadcast: None,
314			nested: HashMap::new(),
315			notify: Lock::new(NotifyNode::new(parent)),
316		}
317	}
318
319	fn leaf(&mut self, path: &Path) -> Lock<OriginNode> {
320		let (dir, rest) = path.next_part().expect("leaf called with empty path");
321
322		let next = self.entry(dir);
323		if rest.is_empty() { next } else { next.lock().leaf(&rest) }
324	}
325
326	fn entry(&mut self, dir: &str) -> Lock<OriginNode> {
327		match self.nested.get(dir) {
328			Some(next) => next.clone(),
329			None => {
330				let next = Lock::new(OriginNode::new(Some(self.notify.clone())));
331				self.nested.insert(dir.to_string(), next.clone());
332				next
333			}
334		}
335	}
336
337	fn publish(&mut self, full: impl AsPath, broadcast: &BroadcastConsumer, relative: impl AsPath) {
338		let full = full.as_path();
339		let rest = relative.as_path();
340
341		// If the path has a directory component, then publish it to the nested node.
342		if let Some((dir, relative)) = rest.next_part() {
343			// Not using entry to avoid allocating a string most of the time.
344			self.entry(dir).lock().publish(&full, broadcast, &relative);
345		} else if let Some(existing) = &mut self.broadcast {
346			// This node is a leaf with an existing broadcast. Prefer the shorter or equal hop path;
347			// on ties, the newer broadcast wins, since the previous one may be about to close.
348			//
349			// Drop duplicates (same underlying broadcast delivered via multiple links) so the
350			// backup queue can't accumulate clones of the active entry and trigger redundant
351			// reannouncements when a peer churns.
352			if existing.active.is_clone(broadcast) || existing.backup.iter().any(|b| b.is_clone(broadcast)) {
353				return;
354			}
355
356			if broadcast.hops.len() <= existing.active.hops.len() {
357				let old = existing.active.clone();
358				existing.active = broadcast.clone();
359				existing.backup.push_back(old);
360
361				self.notify.lock().reannounce(full, broadcast);
362			} else {
363				// Longer path: keep as a backup in case the active one drops.
364				existing.backup.push_back(broadcast.clone());
365			}
366		} else {
367			// This node is a leaf with no existing broadcast.
368			self.broadcast = Some(OriginBroadcast {
369				path: full.to_owned(),
370				active: broadcast.clone(),
371				backup: VecDeque::new(),
372			});
373			self.notify.lock().announce(full, broadcast);
374		}
375	}
376
377	fn consume(&mut self, id: ConsumerId, mut notify: OriginConsumerNotify) {
378		self.consume_initial(&mut notify);
379		self.notify.lock().consumers.insert(id, notify);
380	}
381
382	fn consume_initial(&mut self, notify: &mut OriginConsumerNotify) {
383		if let Some(broadcast) = &self.broadcast {
384			notify.announce(&broadcast.path, broadcast.active.clone());
385		}
386
387		// Recursively subscribe to all nested nodes.
388		for nested in self.nested.values() {
389			nested.lock().consume_initial(notify);
390		}
391	}
392
393	fn consume_broadcast(&self, rest: impl AsPath) -> Option<BroadcastConsumer> {
394		let rest = rest.as_path();
395
396		if let Some((dir, rest)) = rest.next_part() {
397			let node = self.nested.get(dir)?.lock();
398			node.consume_broadcast(&rest)
399		} else {
400			self.broadcast.as_ref().map(|b| b.active.clone())
401		}
402	}
403
404	fn unconsume(&mut self, id: ConsumerId) {
405		self.notify.lock().consumers.remove(&id).expect("consumer not found");
406		if self.is_empty() {
407			//tracing::warn!("TODO: empty node; memory leak");
408			// This happens when consuming a path that is not being broadcasted.
409		}
410	}
411
412	// Returns true if the broadcast should be unannounced.
413	fn remove(&mut self, full: impl AsPath, broadcast: BroadcastConsumer, relative: impl AsPath) {
414		let full = full.as_path();
415		let relative = relative.as_path();
416
417		if let Some((dir, relative)) = relative.next_part() {
418			let nested = self.entry(dir);
419			let mut locked = nested.lock();
420			locked.remove(&full, broadcast, &relative);
421
422			if locked.is_empty() {
423				drop(locked);
424				self.nested.remove(dir);
425			}
426		} else {
427			let entry = match &mut self.broadcast {
428				Some(existing) => existing,
429				None => return,
430			};
431
432			// See if we can remove the broadcast from the backup list.
433			let pos = entry.backup.iter().position(|b| b.is_clone(&broadcast));
434			if let Some(pos) = pos {
435				entry.backup.remove(pos);
436				// Nothing else to do
437				return;
438			}
439
440			// Okay so it must be the active broadcast or else we fucked up.
441			assert!(entry.active.is_clone(&broadcast));
442
443			// Promote the backup with the shortest hop chain so we keep preferring short paths.
444			// Ties break toward the oldest (FIFO) since min_by_key returns the first minimum.
445			let best = entry
446				.backup
447				.iter()
448				.enumerate()
449				.min_by_key(|(_, b)| b.hops.len())
450				.map(|(i, _)| i);
451			if let Some(idx) = best {
452				let active = entry.backup.remove(idx).expect("index in range");
453				entry.active = active;
454				self.notify.lock().reannounce(full, &entry.active);
455			} else {
456				// No more backups, so remove the entry.
457				self.broadcast = None;
458				self.notify.lock().unannounce(full);
459			}
460		}
461	}
462
463	fn is_empty(&self) -> bool {
464		self.broadcast.is_none() && self.nested.is_empty() && self.notify.lock().consumers.is_empty()
465	}
466}
467
468#[derive(Clone)]
469struct OriginNodes {
470	nodes: Vec<(PathOwned, Lock<OriginNode>)>,
471}
472
473impl OriginNodes {
474	// Returns nested roots that match the prefixes.
475	// PathPrefixes guarantees no duplicates or overlapping prefixes.
476	pub fn select(&self, prefixes: &PathPrefixes) -> Option<Self> {
477		let mut roots = Vec::new();
478
479		for (root, state) in &self.nodes {
480			for prefix in prefixes {
481				if root.has_prefix(prefix) {
482					// Keep the existing node if we're allowed to access it.
483					roots.push((root.to_owned(), state.clone()));
484					continue;
485				}
486
487				if let Some(suffix) = prefix.strip_prefix(root) {
488					// If the requested prefix is larger than the allowed prefix, then we further scope it.
489					let nested = state.lock().leaf(&suffix);
490					roots.push((prefix.to_owned(), nested));
491				}
492			}
493		}
494
495		if roots.is_empty() {
496			None
497		} else {
498			Some(Self { nodes: roots })
499		}
500	}
501
502	pub fn root(&self, new_root: impl AsPath) -> Option<Self> {
503		let new_root = new_root.as_path();
504		let mut roots = Vec::new();
505
506		if new_root.is_empty() {
507			return Some(self.clone());
508		}
509
510		for (root, state) in &self.nodes {
511			if let Some(suffix) = root.strip_prefix(&new_root) {
512				// If the old root is longer than the new root, shorten the keys.
513				roots.push((suffix.to_owned(), state.clone()));
514			} else if let Some(suffix) = new_root.strip_prefix(root) {
515				// If the new root is longer than the old root, add a new root.
516				// NOTE: suffix can't be empty
517				let nested = state.lock().leaf(&suffix);
518				roots.push(("".into(), nested));
519			}
520		}
521
522		if roots.is_empty() {
523			None
524		} else {
525			Some(Self { nodes: roots })
526		}
527	}
528
529	// Returns the root that has this prefix.
530	pub fn get(&self, path: impl AsPath) -> Option<(Lock<OriginNode>, PathOwned)> {
531		let path = path.as_path();
532
533		for (root, state) in &self.nodes {
534			if let Some(suffix) = path.strip_prefix(root) {
535				return Some((state.clone(), suffix.to_owned()));
536			}
537		}
538
539		None
540	}
541}
542
543impl Default for OriginNodes {
544	fn default() -> Self {
545		Self {
546			nodes: vec![("".into(), Lock::new(OriginNode::new(None)))],
547		}
548	}
549}
550
551/// A broadcast path and its associated consumer, or None if closed.
552pub type OriginAnnounce = (PathOwned, Option<BroadcastConsumer>);
553
554/// Announces broadcasts to consumers over the network.
555#[derive(Clone)]
556pub struct OriginProducer {
557	// Identity for this origin. Appended to broadcast hops when
558	// re-announcing so downstream relays can detect loops and prefer the
559	// shortest path.
560	info: Origin,
561
562	// The roots of the tree that we are allowed to publish.
563	// A path of "" means we can publish anything.
564	nodes: OriginNodes,
565
566	// The prefix that is automatically stripped from all paths.
567	root: PathOwned,
568}
569
570impl std::ops::Deref for OriginProducer {
571	type Target = Origin;
572
573	fn deref(&self) -> &Self::Target {
574		&self.info
575	}
576}
577
578impl OriginProducer {
579	/// Build a producer for the given origin id with no scoped prefix and no
580	/// pre-existing broadcasts. Prefer [`Origin::produce`].
581	pub fn new(info: Origin) -> Self {
582		Self {
583			info,
584			nodes: OriginNodes::default(),
585			root: PathOwned::default(),
586		}
587	}
588
589	/// Create and publish a new broadcast, returning the producer.
590	///
591	/// This is a helper method when you only want to publish a broadcast to a single origin.
592	/// Returns [None] if the broadcast is not allowed to be published.
593	pub fn create_broadcast(&self, path: impl AsPath) -> Option<BroadcastProducer> {
594		let broadcast = Broadcast::new().produce();
595		self.publish_broadcast(path, broadcast.consume()).then_some(broadcast)
596	}
597
598	/// Publish a broadcast, announcing it to all consumers.
599	///
600	/// The broadcast will be unannounced when it is closed.
601	/// If there is already a broadcast with the same path, the new one replaces the active only
602	/// if its hop path is shorter or equal; otherwise it is queued as a backup.
603	/// When the active broadcast closes, the backup with the shortest hop path is promoted and
604	/// reannounced. Backups that close before being promoted are silently dropped.
605	///
606	/// Returns false if the broadcast is not allowed to be published.
607	pub fn publish_broadcast(&self, path: impl AsPath, broadcast: BroadcastConsumer) -> bool {
608		let path = path.as_path();
609
610		// Loop detection: refuse broadcasts whose hop chain already contains our id.
611		if broadcast.hops.contains(&self.info) {
612			return false;
613		}
614
615		let (root, rest) = match self.nodes.get(&path) {
616			Some(root) => root,
617			None => return false,
618		};
619
620		let full = self.root.join(&path);
621
622		root.lock().publish(&full, &broadcast, &rest);
623		let root = root.clone();
624
625		web_async::spawn(async move {
626			broadcast.closed().await;
627			root.lock().remove(&full, broadcast, &rest);
628		});
629
630		true
631	}
632
633	/// Returns a new OriginProducer restricted to publishing under one of `prefixes`.
634	///
635	/// Returns None if there are no legal prefixes (the requested prefixes are
636	/// disjoint from this producer's current scope).
637	// TODO accept PathPrefixes instead of &[Path]
638	pub fn scope(&self, prefixes: &[Path]) -> Option<OriginProducer> {
639		let prefixes = PathPrefixes::new(prefixes);
640		Some(OriginProducer {
641			info: self.info,
642			nodes: self.nodes.select(&prefixes)?,
643			root: self.root.clone(),
644		})
645	}
646
647	/// Subscribe to all announced broadcasts.
648	pub fn consume(&self) -> OriginConsumer {
649		OriginConsumer::new(self.info, self.root.clone(), self.nodes.clone())
650	}
651
652	/// Get a broadcast by path if it has *already* been published.
653	///
654	/// Equivalent to `self.consume().get_broadcast(path)` but skips the
655	/// announcement-cursor allocation, which is currently relatively expensive.
656	#[deprecated(note = "use `consume().get_broadcast(path)` once `consume()` is cheap")]
657	pub fn get_broadcast(&self, path: impl AsPath) -> Option<BroadcastConsumer> {
658		let path = path.as_path();
659		let (root, rest) = self.nodes.get(&path)?;
660		let state = root.lock();
661		state.consume_broadcast(&rest)
662	}
663
664	/// Returns a new OriginProducer that automatically strips out the provided prefix.
665	///
666	/// Returns None if the provided root is not authorized; when [`Self::scope`]
667	/// was already used without a wildcard.
668	pub fn with_root(&self, prefix: impl AsPath) -> Option<Self> {
669		let prefix = prefix.as_path();
670
671		Some(Self {
672			info: self.info,
673			root: self.root.join(&prefix).to_owned(),
674			nodes: self.nodes.root(&prefix)?,
675		})
676	}
677
678	/// Returns the root that is automatically stripped from all paths.
679	pub fn root(&self) -> &Path<'_> {
680		&self.root
681	}
682
683	/// Iterate over the path prefixes this handle is permitted to publish or subscribe under.
684	// TODO return PathPrefixes
685	pub fn allowed(&self) -> impl Iterator<Item = &Path<'_>> {
686		self.nodes.nodes.iter().map(|(root, _)| root)
687	}
688
689	/// Converts a relative path to an absolute path.
690	pub fn absolute(&self, path: impl AsPath) -> Path<'_> {
691		self.root.join(path)
692	}
693}
694
695/// Consumes announced broadcasts matching against an optional prefix.
696///
697/// NOTE: Clone is expensive, try to avoid it.
698pub struct OriginConsumer {
699	id: ConsumerId,
700	// Identity of the origin this consumer was derived from.
701	info: Origin,
702	nodes: OriginNodes,
703	updates: mpsc::UnboundedReceiver<OriginAnnounce>,
704
705	// A prefix that is automatically stripped from all paths.
706	root: PathOwned,
707}
708
709impl std::ops::Deref for OriginConsumer {
710	type Target = Origin;
711
712	fn deref(&self) -> &Self::Target {
713		&self.info
714	}
715}
716
717impl OriginConsumer {
718	fn new(info: Origin, root: PathOwned, nodes: OriginNodes) -> Self {
719		let (tx, rx) = mpsc::unbounded_channel();
720
721		let id = ConsumerId::new();
722
723		for (_, state) in &nodes.nodes {
724			let notify = OriginConsumerNotify {
725				root: root.clone(),
726				tx: tx.clone(),
727			};
728			state.lock().consume(id, notify);
729		}
730
731		Self {
732			id,
733			info,
734			nodes,
735			updates: rx,
736			root,
737		}
738	}
739
740	/// Returns the next (un)announced broadcast and the absolute path.
741	///
742	/// The broadcast will only be announced if it was previously unannounced.
743	/// The same path won't be announced/unannounced twice, instead it will toggle.
744	/// Returns None if the consumer is closed.
745	///
746	/// Note: The returned path is absolute and will always match this consumer's prefix.
747	pub async fn announced(&mut self) -> Option<OriginAnnounce> {
748		self.updates.recv().await
749	}
750
751	/// Returns the next (un)announced broadcast and the absolute path without blocking.
752	///
753	/// Returns None if there is no update available; NOT because the consumer is closed.
754	/// You have to use `is_closed` to check if the consumer is closed.
755	pub fn try_announced(&mut self) -> Option<OriginAnnounce> {
756		self.updates.try_recv().ok()
757	}
758
759	/// Create another consumer with its own announcement cursor over the same origin.
760	pub fn consume(&self) -> Self {
761		self.clone()
762	}
763
764	/// Get a broadcast by path if it has *already* been announced.
765	///
766	/// Returns `None` when the path is unknown to this consumer right now. Synchronous
767	/// lookup races announcement gossip — a freshly-connected consumer will see `None`
768	/// even when the broadcast is about to arrive. Prefer [`Self::announced_broadcast`]
769	/// (blocks until announced) unless you can guarantee the announcement has already
770	/// landed (e.g. you're responding to an `announced()` callback).
771	pub fn get_broadcast(&self, path: impl AsPath) -> Option<BroadcastConsumer> {
772		let path = path.as_path();
773		let (root, rest) = self.nodes.get(&path)?;
774		let state = root.lock();
775		state.consume_broadcast(&rest)
776	}
777
778	/// Block until a broadcast with the given path is announced and return it.
779	///
780	/// Returns `None` if the path is outside this consumer's allowed prefixes or if the consumer
781	/// is closed before the broadcast is announced. The returned broadcast may itself be closed
782	/// later — subscribers should watch [`BroadcastConsumer::closed`] to react to that.
783	///
784	/// Prefer this over [`Self::get_broadcast`] when you know the exact path you want but
785	/// cannot guarantee the announcement has already been received.
786	pub async fn announced_broadcast(&self, path: impl AsPath) -> Option<BroadcastConsumer> {
787		let path = path.as_path();
788
789		// Scope a fresh consumer down to this path so we only wake up for relevant announcements.
790		let mut consumer = self.scope(std::slice::from_ref(&path))?;
791
792		// `scope` keeps narrower permissions intact: if we ask for `foo` on a consumer limited
793		// to `foo/specific`, `scope` returns a consumer scoped to `foo/specific` — no
794		// announcement at the exact path `foo` can ever arrive. Bail rather than loop forever.
795		if !consumer.allowed().any(|allowed| path.has_prefix(allowed)) {
796			return None;
797		}
798
799		loop {
800			let (announced, broadcast) = consumer.announced().await?;
801			// `scope` narrows by prefix, but we only want an exact-path match.
802			if announced.as_path() == path {
803				if let Some(broadcast) = broadcast {
804					return Some(broadcast);
805				}
806			}
807		}
808	}
809
810	/// Returns a new OriginConsumer restricted to broadcasts under one of `prefixes`.
811	///
812	/// Returns None if there are no legal prefixes (the requested prefixes are
813	/// disjoint from this consumer's current scope, so it would always return None).
814	// TODO accept PathPrefixes instead of &[Path]
815	pub fn scope(&self, prefixes: &[Path]) -> Option<OriginConsumer> {
816		let prefixes = PathPrefixes::new(prefixes);
817		Some(OriginConsumer::new(
818			self.info,
819			self.root.clone(),
820			self.nodes.select(&prefixes)?,
821		))
822	}
823
824	/// Returns a new OriginConsumer that automatically strips out the provided prefix.
825	///
826	/// Returns None if the provided root is not authorized; when [`Self::scope`] was
827	/// already used without a wildcard.
828	pub fn with_root(&self, prefix: impl AsPath) -> Option<Self> {
829		let prefix = prefix.as_path();
830
831		Some(Self::new(
832			self.info,
833			self.root.join(&prefix).to_owned(),
834			self.nodes.root(&prefix)?,
835		))
836	}
837
838	/// Returns the prefix that is automatically stripped from all paths.
839	pub fn root(&self) -> &Path<'_> {
840		&self.root
841	}
842
843	/// Iterate over the path prefixes this handle is permitted to publish or subscribe under.
844	// TODO return PathPrefixes
845	pub fn allowed(&self) -> impl Iterator<Item = &Path<'_>> {
846		self.nodes.nodes.iter().map(|(root, _)| root)
847	}
848
849	/// Converts a relative path to an absolute path.
850	pub fn absolute(&self, path: impl AsPath) -> Path<'_> {
851		self.root.join(path)
852	}
853}
854
855impl Drop for OriginConsumer {
856	fn drop(&mut self) {
857		for (_, root) in &self.nodes.nodes {
858			root.lock().unconsume(self.id);
859		}
860	}
861}
862
863impl Clone for OriginConsumer {
864	fn clone(&self) -> Self {
865		OriginConsumer::new(self.info, self.root.clone(), self.nodes.clone())
866	}
867}
868
869#[cfg(test)]
870use futures::FutureExt;
871
872#[cfg(test)]
873impl OriginConsumer {
874	pub fn assert_next(&mut self, expected: impl AsPath, broadcast: &BroadcastConsumer) {
875		let expected = expected.as_path();
876		let (path, active) = self.announced().now_or_never().expect("next blocked").expect("no next");
877		assert_eq!(path, expected, "wrong path");
878		assert!(active.unwrap().is_clone(broadcast), "should be the same broadcast");
879	}
880
881	pub fn assert_try_next(&mut self, expected: impl AsPath, broadcast: &BroadcastConsumer) {
882		let expected = expected.as_path();
883		let (path, active) = self.try_announced().expect("no next");
884		assert_eq!(path, expected, "wrong path");
885		assert!(active.unwrap().is_clone(broadcast), "should be the same broadcast");
886	}
887
888	pub fn assert_next_none(&mut self, expected: impl AsPath) {
889		let expected = expected.as_path();
890		let (path, active) = self.announced().now_or_never().expect("next blocked").expect("no next");
891		assert_eq!(path, expected, "wrong path");
892		assert!(active.is_none(), "should be unannounced");
893	}
894
895	pub fn assert_next_wait(&mut self) {
896		if let Some(res) = self.announced().now_or_never() {
897			panic!("next should block: got {:?}", res.map(|(path, _)| path));
898		}
899	}
900
901	/*
902	pub fn assert_next_closed(&mut self) {
903		assert!(
904			self.announced().now_or_never().expect("next blocked").is_none(),
905			"next should be closed"
906		);
907	}
908	*/
909}
910
911#[cfg(test)]
912mod tests {
913	use crate::Broadcast;
914
915	use super::*;
916
917	#[test]
918	fn origin_list_push_fails_at_limit() {
919		let mut list = OriginList::new();
920		for _ in 0..MAX_HOPS {
921			list.push(Origin::random()).unwrap();
922		}
923		assert_eq!(list.len(), MAX_HOPS);
924		assert_eq!(list.push(Origin::random()), Err(TooManyOrigins));
925	}
926
927	#[test]
928	fn origin_list_try_from_vec_enforces_limit() {
929		let under: Vec<Origin> = (0..MAX_HOPS).map(|_| Origin::random()).collect();
930		assert!(OriginList::try_from(under).is_ok());
931
932		let over: Vec<Origin> = (0..MAX_HOPS + 1).map(|_| Origin::random()).collect();
933		assert_eq!(OriginList::try_from(over), Err(TooManyOrigins));
934	}
935
936	#[tokio::test]
937	async fn test_announce() {
938		tokio::time::pause();
939
940		let origin = Origin::random().produce();
941		let broadcast1 = Broadcast::new().produce();
942		let broadcast2 = Broadcast::new().produce();
943
944		let mut consumer1 = origin.consume();
945		// Make a new consumer that should get it.
946		consumer1.assert_next_wait();
947
948		// Publish the first broadcast.
949		origin.publish_broadcast("test1", broadcast1.consume());
950
951		consumer1.assert_next("test1", &broadcast1.consume());
952		consumer1.assert_next_wait();
953
954		// Make a new consumer that should get the existing broadcast.
955		// But we don't consume it yet.
956		let mut consumer2 = origin.consume();
957
958		// Publish the second broadcast.
959		origin.publish_broadcast("test2", broadcast2.consume());
960
961		consumer1.assert_next("test2", &broadcast2.consume());
962		consumer1.assert_next_wait();
963
964		consumer2.assert_next("test1", &broadcast1.consume());
965		consumer2.assert_next("test2", &broadcast2.consume());
966		consumer2.assert_next_wait();
967
968		// Close the first broadcast.
969		drop(broadcast1);
970
971		// Wait for the async task to run.
972		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
973
974		// All consumers should get a None now.
975		consumer1.assert_next_none("test1");
976		consumer2.assert_next_none("test1");
977		consumer1.assert_next_wait();
978		consumer2.assert_next_wait();
979
980		// And a new consumer only gets the last broadcast.
981		let mut consumer3 = origin.consume();
982		consumer3.assert_next("test2", &broadcast2.consume());
983		consumer3.assert_next_wait();
984
985		// Close the other producer and make sure it cleans up
986		drop(broadcast2);
987
988		// Wait for the async task to run.
989		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
990
991		consumer1.assert_next_none("test2");
992		consumer2.assert_next_none("test2");
993		consumer3.assert_next_none("test2");
994
995		/* TODO close the origin consumer when the producer is dropped
996		consumer1.assert_next_closed();
997		consumer2.assert_next_closed();
998		consumer3.assert_next_closed();
999		*/
1000	}
1001
1002	#[tokio::test]
1003	async fn test_duplicate() {
1004		tokio::time::pause();
1005
1006		let origin = Origin::random().produce();
1007
1008		let broadcast1 = Broadcast::new().produce();
1009		let broadcast2 = Broadcast::new().produce();
1010		let broadcast3 = Broadcast::new().produce();
1011
1012		let consumer1 = broadcast1.consume();
1013		let consumer2 = broadcast2.consume();
1014		let consumer3 = broadcast3.consume();
1015
1016		let mut consumer = origin.consume();
1017
1018		origin.publish_broadcast("test", consumer1.clone());
1019		origin.publish_broadcast("test", consumer2.clone());
1020		origin.publish_broadcast("test", consumer3.clone());
1021		assert!(consumer.get_broadcast("test").is_some());
1022
1023		// On equal hop lengths, each new publish replaces the active and reannounces.
1024		consumer.assert_next("test", &consumer1);
1025		consumer.assert_next_none("test");
1026		consumer.assert_next("test", &consumer2);
1027		consumer.assert_next_none("test");
1028		consumer.assert_next("test", &consumer3);
1029
1030		// Drop a backup, nothing should change.
1031		drop(broadcast2);
1032
1033		// Wait for the async task to run.
1034		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1035
1036		assert!(consumer.get_broadcast("test").is_some());
1037		consumer.assert_next_wait();
1038
1039		// Drop the active, we should reannounce with the remaining backup.
1040		drop(broadcast3);
1041
1042		// Wait for the async task to run.
1043		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1044
1045		assert!(consumer.get_broadcast("test").is_some());
1046		consumer.assert_next_none("test");
1047		consumer.assert_next("test", &consumer1);
1048
1049		// Drop the final broadcast, we should unannounce.
1050		drop(broadcast1);
1051
1052		// Wait for the async task to run.
1053		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1054		assert!(consumer.get_broadcast("test").is_none());
1055
1056		consumer.assert_next_none("test");
1057		consumer.assert_next_wait();
1058	}
1059
1060	#[tokio::test]
1061	async fn test_duplicate_reverse() {
1062		tokio::time::pause();
1063
1064		let origin = Origin::random().produce();
1065		let broadcast1 = Broadcast::new().produce();
1066		let broadcast2 = Broadcast::new().produce();
1067
1068		origin.publish_broadcast("test", broadcast1.consume());
1069		origin.publish_broadcast("test", broadcast2.consume());
1070		assert!(origin.consume().get_broadcast("test").is_some());
1071
1072		// This is harder, dropping the new broadcast first.
1073		drop(broadcast2);
1074
1075		// Wait for the cleanup async task to run.
1076		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1077		assert!(origin.consume().get_broadcast("test").is_some());
1078
1079		drop(broadcast1);
1080
1081		// Wait for the cleanup async task to run.
1082		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1083		assert!(origin.consume().get_broadcast("test").is_none());
1084	}
1085
1086	#[tokio::test]
1087	async fn test_double_publish() {
1088		tokio::time::pause();
1089
1090		let origin = Origin::random().produce();
1091		let broadcast = Broadcast::new().produce();
1092
1093		// Ensure it doesn't crash.
1094		origin.publish_broadcast("test", broadcast.consume());
1095		origin.publish_broadcast("test", broadcast.consume());
1096
1097		assert!(origin.consume().get_broadcast("test").is_some());
1098
1099		drop(broadcast);
1100
1101		// Wait for the async task to run.
1102		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1103		assert!(origin.consume().get_broadcast("test").is_none());
1104	}
1105	// There was a tokio bug where only the first 127 broadcasts would be received instantly.
1106	#[tokio::test]
1107	#[should_panic]
1108	async fn test_128() {
1109		let origin = Origin::random().produce();
1110		let broadcast = Broadcast::new().produce();
1111
1112		let mut consumer = origin.consume();
1113		for i in 0..256 {
1114			origin.publish_broadcast(format!("test{i}"), broadcast.consume());
1115		}
1116
1117		for i in 0..256 {
1118			consumer.assert_next(format!("test{i}"), &broadcast.consume());
1119		}
1120	}
1121
1122	#[tokio::test]
1123	async fn test_128_fix() {
1124		let origin = Origin::random().produce();
1125		let broadcast = Broadcast::new().produce();
1126
1127		let mut consumer = origin.consume();
1128		for i in 0..256 {
1129			origin.publish_broadcast(format!("test{i}"), broadcast.consume());
1130		}
1131
1132		for i in 0..256 {
1133			// try_next does not have the same issue because it's synchronous.
1134			consumer.assert_try_next(format!("test{i}"), &broadcast.consume());
1135		}
1136	}
1137
1138	#[tokio::test]
1139	async fn test_with_root_basic() {
1140		let origin = Origin::random().produce();
1141		let broadcast = Broadcast::new().produce();
1142
1143		// Create a producer with root "/foo"
1144		let foo_producer = origin.with_root("foo").expect("should create root");
1145		assert_eq!(foo_producer.root().as_str(), "foo");
1146
1147		let mut consumer = origin.consume();
1148
1149		// When publishing to "bar/baz", it should actually publish to "foo/bar/baz"
1150		assert!(foo_producer.publish_broadcast("bar/baz", broadcast.consume()));
1151		// The original consumer should see the full path
1152		consumer.assert_next("foo/bar/baz", &broadcast.consume());
1153
1154		// A consumer created from the rooted producer should see the stripped path
1155		let mut foo_consumer = foo_producer.consume();
1156		foo_consumer.assert_next("bar/baz", &broadcast.consume());
1157	}
1158
1159	#[tokio::test]
1160	async fn test_with_root_nested() {
1161		let origin = Origin::random().produce();
1162		let broadcast = Broadcast::new().produce();
1163
1164		// Create nested roots
1165		let foo_producer = origin.with_root("foo").expect("should create foo root");
1166		let foo_bar_producer = foo_producer.with_root("bar").expect("should create bar root");
1167		assert_eq!(foo_bar_producer.root().as_str(), "foo/bar");
1168
1169		let mut consumer = origin.consume();
1170
1171		// Publishing to "baz" should actually publish to "foo/bar/baz"
1172		assert!(foo_bar_producer.publish_broadcast("baz", broadcast.consume()));
1173		// The original consumer sees the full path
1174		consumer.assert_next("foo/bar/baz", &broadcast.consume());
1175
1176		// Consumer from foo_bar_producer sees just "baz"
1177		let mut foo_bar_consumer = foo_bar_producer.consume();
1178		foo_bar_consumer.assert_next("baz", &broadcast.consume());
1179	}
1180
1181	#[tokio::test]
1182	async fn test_publish_scope_allows() {
1183		let origin = Origin::random().produce();
1184		let broadcast = Broadcast::new().produce();
1185
1186		// Create a producer that can only publish to "allowed" paths
1187		let limited_producer = origin
1188			.scope(&["allowed/path1".into(), "allowed/path2".into()])
1189			.expect("should create limited producer");
1190
1191		// Should be able to publish to allowed paths
1192		assert!(limited_producer.publish_broadcast("allowed/path1", broadcast.consume()));
1193		assert!(limited_producer.publish_broadcast("allowed/path1/nested", broadcast.consume()));
1194		assert!(limited_producer.publish_broadcast("allowed/path2", broadcast.consume()));
1195
1196		// Should not be able to publish to disallowed paths
1197		assert!(!limited_producer.publish_broadcast("notallowed", broadcast.consume()));
1198		assert!(!limited_producer.publish_broadcast("allowed", broadcast.consume())); // Parent of allowed path
1199		assert!(!limited_producer.publish_broadcast("other/path", broadcast.consume()));
1200	}
1201
1202	#[tokio::test]
1203	async fn test_publish_scope_empty() {
1204		let origin = Origin::random().produce();
1205
1206		// Creating a producer with no allowed paths should return None
1207		assert!(origin.scope(&[]).is_none());
1208	}
1209
1210	#[tokio::test]
1211	async fn test_consume_scope_filters() {
1212		let origin = Origin::random().produce();
1213		let broadcast1 = Broadcast::new().produce();
1214		let broadcast2 = Broadcast::new().produce();
1215		let broadcast3 = Broadcast::new().produce();
1216
1217		let mut consumer = origin.consume();
1218
1219		// Publish to different paths
1220		origin.publish_broadcast("allowed", broadcast1.consume());
1221		origin.publish_broadcast("allowed/nested", broadcast2.consume());
1222		origin.publish_broadcast("notallowed", broadcast3.consume());
1223
1224		// Create a consumer that only sees "allowed" paths
1225		let mut limited_consumer = origin
1226			.consume()
1227			.scope(&["allowed".into()])
1228			.expect("should create limited consumer");
1229
1230		// Should only receive broadcasts under "allowed"
1231		limited_consumer.assert_next("allowed", &broadcast1.consume());
1232		limited_consumer.assert_next("allowed/nested", &broadcast2.consume());
1233		limited_consumer.assert_next_wait(); // Should not see "notallowed"
1234
1235		// Unscoped consumer should see all
1236		consumer.assert_next("allowed", &broadcast1.consume());
1237		consumer.assert_next("allowed/nested", &broadcast2.consume());
1238		consumer.assert_next("notallowed", &broadcast3.consume());
1239	}
1240
1241	#[tokio::test]
1242	async fn test_consume_scope_multiple_prefixes() {
1243		let origin = Origin::random().produce();
1244		let broadcast1 = Broadcast::new().produce();
1245		let broadcast2 = Broadcast::new().produce();
1246		let broadcast3 = Broadcast::new().produce();
1247
1248		origin.publish_broadcast("foo/test", broadcast1.consume());
1249		origin.publish_broadcast("bar/test", broadcast2.consume());
1250		origin.publish_broadcast("baz/test", broadcast3.consume());
1251
1252		// Consumer that only sees "foo" and "bar" paths
1253		let mut limited_consumer = origin
1254			.consume()
1255			.scope(&["foo".into(), "bar".into()])
1256			.expect("should create limited consumer");
1257
1258		// Order depends on PathPrefixes canonical sort (lexicographic for same length)
1259		limited_consumer.assert_next("bar/test", &broadcast2.consume());
1260		limited_consumer.assert_next("foo/test", &broadcast1.consume());
1261		limited_consumer.assert_next_wait(); // Should not see "baz/test"
1262	}
1263
1264	#[tokio::test]
1265	async fn test_with_root_and_publish_scope() {
1266		let origin = Origin::random().produce();
1267		let broadcast = Broadcast::new().produce();
1268
1269		// User connects to /foo root
1270		let foo_producer = origin.with_root("foo").expect("should create foo root");
1271
1272		// Limit them to publish only to "bar" and "goop/pee" within /foo
1273		let limited_producer = foo_producer
1274			.scope(&["bar".into(), "goop/pee".into()])
1275			.expect("should create limited producer");
1276
1277		let mut consumer = origin.consume();
1278
1279		// Should be able to publish to foo/bar and foo/goop/pee (but user sees as bar and goop/pee)
1280		assert!(limited_producer.publish_broadcast("bar", broadcast.consume()));
1281		assert!(limited_producer.publish_broadcast("bar/nested", broadcast.consume()));
1282		assert!(limited_producer.publish_broadcast("goop/pee", broadcast.consume()));
1283		assert!(limited_producer.publish_broadcast("goop/pee/nested", broadcast.consume()));
1284
1285		// Should not be able to publish outside allowed paths
1286		assert!(!limited_producer.publish_broadcast("baz", broadcast.consume()));
1287		assert!(!limited_producer.publish_broadcast("goop", broadcast.consume())); // Parent of allowed
1288		assert!(!limited_producer.publish_broadcast("goop/other", broadcast.consume()));
1289
1290		// Original consumer sees full paths
1291		consumer.assert_next("foo/bar", &broadcast.consume());
1292		consumer.assert_next("foo/bar/nested", &broadcast.consume());
1293		consumer.assert_next("foo/goop/pee", &broadcast.consume());
1294		consumer.assert_next("foo/goop/pee/nested", &broadcast.consume());
1295	}
1296
1297	#[tokio::test]
1298	async fn test_with_root_and_consume_scope() {
1299		let origin = Origin::random().produce();
1300		let broadcast1 = Broadcast::new().produce();
1301		let broadcast2 = Broadcast::new().produce();
1302		let broadcast3 = Broadcast::new().produce();
1303
1304		// Publish broadcasts
1305		origin.publish_broadcast("foo/bar/test", broadcast1.consume());
1306		origin.publish_broadcast("foo/goop/pee/test", broadcast2.consume());
1307		origin.publish_broadcast("foo/other/test", broadcast3.consume());
1308
1309		// User connects to /foo root
1310		let foo_producer = origin.with_root("foo").expect("should create foo root");
1311
1312		// Create consumer limited to "bar" and "goop/pee" within /foo
1313		let mut limited_consumer = foo_producer
1314			.consume()
1315			.scope(&["bar".into(), "goop/pee".into()])
1316			.expect("should create limited consumer");
1317
1318		// Should only see allowed paths (without foo prefix)
1319		limited_consumer.assert_next("bar/test", &broadcast1.consume());
1320		limited_consumer.assert_next("goop/pee/test", &broadcast2.consume());
1321		limited_consumer.assert_next_wait(); // Should not see "other/test"
1322	}
1323
1324	#[tokio::test]
1325	async fn test_with_root_unauthorized() {
1326		let origin = Origin::random().produce();
1327
1328		// First limit the producer to specific paths
1329		let limited_producer = origin
1330			.scope(&["allowed".into()])
1331			.expect("should create limited producer");
1332
1333		// Trying to create a root outside allowed paths should fail
1334		assert!(limited_producer.with_root("notallowed").is_none());
1335
1336		// But creating a root within allowed paths should work
1337		let allowed_root = limited_producer
1338			.with_root("allowed")
1339			.expect("should create allowed root");
1340		assert_eq!(allowed_root.root().as_str(), "allowed");
1341	}
1342
1343	#[tokio::test]
1344	async fn test_wildcard_permission() {
1345		let origin = Origin::random().produce();
1346		let broadcast = Broadcast::new().produce();
1347
1348		// Producer with root access (empty string means wildcard)
1349		let root_producer = origin.clone();
1350
1351		// Should be able to publish anywhere
1352		assert!(root_producer.publish_broadcast("any/path", broadcast.consume()));
1353		assert!(root_producer.publish_broadcast("other/path", broadcast.consume()));
1354
1355		// Can create any root
1356		let foo_producer = root_producer.with_root("foo").expect("should create any root");
1357		assert_eq!(foo_producer.root().as_str(), "foo");
1358	}
1359
1360	#[tokio::test]
1361	async fn test_consume_broadcast_with_permissions() {
1362		let origin = Origin::random().produce();
1363		let broadcast1 = Broadcast::new().produce();
1364		let broadcast2 = Broadcast::new().produce();
1365
1366		origin.publish_broadcast("allowed/test", broadcast1.consume());
1367		origin.publish_broadcast("notallowed/test", broadcast2.consume());
1368
1369		// Create limited consumer
1370		let limited_consumer = origin
1371			.consume()
1372			.scope(&["allowed".into()])
1373			.expect("should create limited consumer");
1374
1375		// Should be able to get allowed broadcast
1376		let result = limited_consumer.get_broadcast("allowed/test");
1377		assert!(result.is_some());
1378		assert!(result.unwrap().is_clone(&broadcast1.consume()));
1379
1380		// Should not be able to get disallowed broadcast
1381		assert!(limited_consumer.get_broadcast("notallowed/test").is_none());
1382
1383		// Original consumer can get both
1384		let consumer = origin.consume();
1385		assert!(consumer.get_broadcast("allowed/test").is_some());
1386		assert!(consumer.get_broadcast("notallowed/test").is_some());
1387	}
1388
1389	#[tokio::test]
1390	async fn test_nested_paths_with_permissions() {
1391		let origin = Origin::random().produce();
1392		let broadcast = Broadcast::new().produce();
1393
1394		// Create producer limited to "a/b/c"
1395		let limited_producer = origin.scope(&["a/b/c".into()]).expect("should create limited producer");
1396
1397		// Should be able to publish to exact path and nested paths
1398		assert!(limited_producer.publish_broadcast("a/b/c", broadcast.consume()));
1399		assert!(limited_producer.publish_broadcast("a/b/c/d", broadcast.consume()));
1400		assert!(limited_producer.publish_broadcast("a/b/c/d/e", broadcast.consume()));
1401
1402		// Should not be able to publish to parent or sibling paths
1403		assert!(!limited_producer.publish_broadcast("a", broadcast.consume()));
1404		assert!(!limited_producer.publish_broadcast("a/b", broadcast.consume()));
1405		assert!(!limited_producer.publish_broadcast("a/b/other", broadcast.consume()));
1406	}
1407
1408	#[tokio::test]
1409	async fn test_multiple_consumers_with_different_permissions() {
1410		let origin = Origin::random().produce();
1411		let broadcast1 = Broadcast::new().produce();
1412		let broadcast2 = Broadcast::new().produce();
1413		let broadcast3 = Broadcast::new().produce();
1414
1415		// Publish to different paths
1416		origin.publish_broadcast("foo/test", broadcast1.consume());
1417		origin.publish_broadcast("bar/test", broadcast2.consume());
1418		origin.publish_broadcast("baz/test", broadcast3.consume());
1419
1420		// Create consumers with different permissions
1421		let mut foo_consumer = origin
1422			.consume()
1423			.scope(&["foo".into()])
1424			.expect("should create foo consumer");
1425
1426		let mut bar_consumer = origin
1427			.consume()
1428			.scope(&["bar".into()])
1429			.expect("should create bar consumer");
1430
1431		let mut foobar_consumer = origin
1432			.consume()
1433			.scope(&["foo".into(), "bar".into()])
1434			.expect("should create foobar consumer");
1435
1436		// Each consumer should only see their allowed paths
1437		foo_consumer.assert_next("foo/test", &broadcast1.consume());
1438		foo_consumer.assert_next_wait();
1439
1440		bar_consumer.assert_next("bar/test", &broadcast2.consume());
1441		bar_consumer.assert_next_wait();
1442
1443		foobar_consumer.assert_next("bar/test", &broadcast2.consume());
1444		foobar_consumer.assert_next("foo/test", &broadcast1.consume());
1445		foobar_consumer.assert_next_wait();
1446	}
1447
1448	#[tokio::test]
1449	async fn test_select_with_empty_prefix() {
1450		let origin = Origin::random().produce();
1451		let broadcast1 = Broadcast::new().produce();
1452		let broadcast2 = Broadcast::new().produce();
1453
1454		// User with root "demo" allowed to subscribe to "worm-node" and "foobar"
1455		let demo_producer = origin.with_root("demo").expect("should create demo root");
1456		let limited_producer = demo_producer
1457			.scope(&["worm-node".into(), "foobar".into()])
1458			.expect("should create limited producer");
1459
1460		// Publish some broadcasts
1461		assert!(limited_producer.publish_broadcast("worm-node/test", broadcast1.consume()));
1462		assert!(limited_producer.publish_broadcast("foobar/test", broadcast2.consume()));
1463
1464		// scope with empty prefix should keep the exact same "worm-node" and "foobar" nodes
1465		let mut consumer = limited_producer
1466			.consume()
1467			.scope(&["".into()])
1468			.expect("should create consumer with empty prefix");
1469
1470		// Should see both broadcasts (order depends on PathPrefixes sort)
1471		let a1 = consumer.try_announced().expect("expected first announcement");
1472		let a2 = consumer.try_announced().expect("expected second announcement");
1473		consumer.assert_next_wait();
1474
1475		let mut paths: Vec<_> = [&a1, &a2].iter().map(|(p, _)| p.to_string()).collect();
1476		paths.sort();
1477		assert_eq!(paths, ["foobar/test", "worm-node/test"]);
1478	}
1479
1480	#[tokio::test]
1481	async fn test_select_narrowing_scope() {
1482		let origin = Origin::random().produce();
1483		let broadcast1 = Broadcast::new().produce();
1484		let broadcast2 = Broadcast::new().produce();
1485		let broadcast3 = Broadcast::new().produce();
1486
1487		// User with root "demo" allowed to subscribe to "worm-node" and "foobar"
1488		let demo_producer = origin.with_root("demo").expect("should create demo root");
1489		let limited_producer = demo_producer
1490			.scope(&["worm-node".into(), "foobar".into()])
1491			.expect("should create limited producer");
1492
1493		// Publish broadcasts at different levels
1494		assert!(limited_producer.publish_broadcast("worm-node", broadcast1.consume()));
1495		assert!(limited_producer.publish_broadcast("worm-node/foo", broadcast2.consume()));
1496		assert!(limited_producer.publish_broadcast("foobar/bar", broadcast3.consume()));
1497
1498		// Test 1: scope("worm-node") should result in a single "" node with contents of "worm-node" ONLY
1499		let mut worm_consumer = limited_producer
1500			.consume()
1501			.scope(&["worm-node".into()])
1502			.expect("should create worm-node consumer");
1503
1504		// Should see worm-node content with paths stripped to ""
1505		worm_consumer.assert_next("worm-node", &broadcast1.consume());
1506		worm_consumer.assert_next("worm-node/foo", &broadcast2.consume());
1507		worm_consumer.assert_next_wait(); // Should NOT see foobar content
1508
1509		// Test 2: scope("worm-node/foo") should result in a "" node with contents of "worm-node/foo"
1510		let mut foo_consumer = limited_producer
1511			.consume()
1512			.scope(&["worm-node/foo".into()])
1513			.expect("should create worm-node/foo consumer");
1514
1515		foo_consumer.assert_next("worm-node/foo", &broadcast2.consume());
1516		foo_consumer.assert_next_wait(); // Should NOT see other content
1517	}
1518
1519	#[tokio::test]
1520	async fn test_select_multiple_roots_with_empty_prefix() {
1521		let origin = Origin::random().produce();
1522		let broadcast1 = Broadcast::new().produce();
1523		let broadcast2 = Broadcast::new().produce();
1524		let broadcast3 = Broadcast::new().produce();
1525
1526		// Producer with multiple allowed roots
1527		let limited_producer = origin
1528			.scope(&["app1".into(), "app2".into(), "shared".into()])
1529			.expect("should create limited producer");
1530
1531		// Publish to each root
1532		assert!(limited_producer.publish_broadcast("app1/data", broadcast1.consume()));
1533		assert!(limited_producer.publish_broadcast("app2/config", broadcast2.consume()));
1534		assert!(limited_producer.publish_broadcast("shared/resource", broadcast3.consume()));
1535
1536		// scope with empty prefix should maintain all roots
1537		let mut consumer = limited_producer
1538			.consume()
1539			.scope(&["".into()])
1540			.expect("should create consumer with empty prefix");
1541
1542		// Should see all broadcasts from all roots
1543		consumer.assert_next("app1/data", &broadcast1.consume());
1544		consumer.assert_next("app2/config", &broadcast2.consume());
1545		consumer.assert_next("shared/resource", &broadcast3.consume());
1546		consumer.assert_next_wait();
1547	}
1548
1549	#[tokio::test]
1550	async fn test_publish_scope_with_empty_prefix() {
1551		let origin = Origin::random().produce();
1552		let broadcast = Broadcast::new().produce();
1553
1554		// Producer with specific allowed paths
1555		let limited_producer = origin
1556			.scope(&["services/api".into(), "services/web".into()])
1557			.expect("should create limited producer");
1558
1559		// scope with empty prefix should keep the same restrictions
1560		let same_producer = limited_producer
1561			.scope(&["".into()])
1562			.expect("should create producer with empty prefix");
1563
1564		// Should still have the same publishing restrictions
1565		assert!(same_producer.publish_broadcast("services/api", broadcast.consume()));
1566		assert!(same_producer.publish_broadcast("services/web", broadcast.consume()));
1567		assert!(!same_producer.publish_broadcast("services/db", broadcast.consume()));
1568		assert!(!same_producer.publish_broadcast("other", broadcast.consume()));
1569	}
1570
1571	#[tokio::test]
1572	async fn test_select_narrowing_to_deeper_path() {
1573		let origin = Origin::random().produce();
1574		let broadcast1 = Broadcast::new().produce();
1575		let broadcast2 = Broadcast::new().produce();
1576		let broadcast3 = Broadcast::new().produce();
1577
1578		// Producer with broad permission
1579		let limited_producer = origin.scope(&["org".into()]).expect("should create limited producer");
1580
1581		// Publish at various depths
1582		assert!(limited_producer.publish_broadcast("org/team1/project1", broadcast1.consume()));
1583		assert!(limited_producer.publish_broadcast("org/team1/project2", broadcast2.consume()));
1584		assert!(limited_producer.publish_broadcast("org/team2/project1", broadcast3.consume()));
1585
1586		// Narrow down to team2 only
1587		let mut team2_consumer = limited_producer
1588			.consume()
1589			.scope(&["org/team2".into()])
1590			.expect("should create team2 consumer");
1591
1592		team2_consumer.assert_next("org/team2/project1", &broadcast3.consume());
1593		team2_consumer.assert_next_wait(); // Should NOT see team1 content
1594
1595		// Further narrow down to team1/project1
1596		let mut project1_consumer = limited_producer
1597			.consume()
1598			.scope(&["org/team1/project1".into()])
1599			.expect("should create project1 consumer");
1600
1601		// Should only see project1 content at root
1602		project1_consumer.assert_next("org/team1/project1", &broadcast1.consume());
1603		project1_consumer.assert_next_wait();
1604	}
1605
1606	#[tokio::test]
1607	async fn test_select_with_non_matching_prefix() {
1608		let origin = Origin::random().produce();
1609
1610		// Producer with specific allowed paths
1611		let limited_producer = origin
1612			.scope(&["allowed/path".into()])
1613			.expect("should create limited producer");
1614
1615		// Trying to scope with a completely different prefix should return None
1616		assert!(limited_producer.consume().scope(&["different/path".into()]).is_none());
1617
1618		// Similarly for scope
1619		assert!(limited_producer.scope(&["other/path".into()]).is_none());
1620	}
1621
1622	// Regression test for https://github.com/moq-dev/moq/issues/910
1623	// with_root panics when String has trailing slash (AsPath for String skips normalization)
1624	#[tokio::test]
1625	async fn test_with_root_trailing_slash_consumer() {
1626		let origin = Origin::random().produce();
1627
1628		// Use an owned String so the trailing slash is NOT normalized away.
1629		let prefix = "some_prefix/".to_string();
1630		let mut consumer = origin.consume().with_root(prefix).unwrap();
1631
1632		let b = origin.create_broadcast("some_prefix/test").unwrap();
1633		consumer.assert_next("test", &b.consume());
1634	}
1635
1636	// Same issue but for the producer side of with_root
1637	#[tokio::test]
1638	async fn test_with_root_trailing_slash_producer() {
1639		let origin = Origin::random().produce();
1640
1641		// Use an owned String so the trailing slash is NOT normalized away.
1642		let prefix = "some_prefix/".to_string();
1643		let rooted = origin.with_root(prefix).unwrap();
1644
1645		let b = rooted.create_broadcast("test").unwrap();
1646
1647		let mut consumer = rooted.consume();
1648		consumer.assert_next("test", &b.consume());
1649	}
1650
1651	// Verify unannounce also doesn't panic with trailing slash
1652	#[tokio::test]
1653	async fn test_with_root_trailing_slash_unannounce() {
1654		tokio::time::pause();
1655
1656		let origin = Origin::random().produce();
1657
1658		let prefix = "some_prefix/".to_string();
1659		let mut consumer = origin.consume().with_root(prefix).unwrap();
1660
1661		let b = origin.create_broadcast("some_prefix/test").unwrap();
1662		consumer.assert_next("test", &b.consume());
1663
1664		// Drop the broadcast producer to trigger unannounce
1665		drop(b);
1666		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1667
1668		// unannounce also calls strip_prefix(&self.root).unwrap()
1669		consumer.assert_next_none("test");
1670	}
1671
1672	#[tokio::test]
1673	async fn test_select_maintains_access_with_wider_prefix() {
1674		let origin = Origin::random().produce();
1675		let broadcast1 = Broadcast::new().produce();
1676		let broadcast2 = Broadcast::new().produce();
1677
1678		// Setup: user with root "demo" allowed to subscribe to specific paths
1679		let demo_producer = origin.with_root("demo").expect("should create demo root");
1680		let user_producer = demo_producer
1681			.scope(&["worm-node".into(), "foobar".into()])
1682			.expect("should create user producer");
1683
1684		// Publish some data
1685		assert!(user_producer.publish_broadcast("worm-node/data", broadcast1.consume()));
1686		assert!(user_producer.publish_broadcast("foobar", broadcast2.consume()));
1687
1688		// Key test: scope with "" should maintain access to allowed roots
1689		let mut consumer = user_producer
1690			.consume()
1691			.scope(&["".into()])
1692			.expect("scope with empty prefix should not fail when user has specific permissions");
1693
1694		// Should still receive broadcasts from allowed paths (order not guaranteed)
1695		let a1 = consumer.try_announced().expect("expected first announcement");
1696		let a2 = consumer.try_announced().expect("expected second announcement");
1697		consumer.assert_next_wait();
1698
1699		let mut paths: Vec<_> = [&a1, &a2].iter().map(|(p, _)| p.to_string()).collect();
1700		paths.sort();
1701		assert_eq!(paths, ["foobar", "worm-node/data"]);
1702
1703		// Also test that we can still narrow the scope
1704		let mut narrow_consumer = user_producer
1705			.consume()
1706			.scope(&["worm-node".into()])
1707			.expect("should be able to narrow scope to worm-node");
1708
1709		narrow_consumer.assert_next("worm-node/data", &broadcast1.consume());
1710		narrow_consumer.assert_next_wait(); // Should not see foobar
1711	}
1712
1713	#[tokio::test]
1714	async fn test_duplicate_prefixes_deduped() {
1715		let origin = Origin::random().produce();
1716		let broadcast = Broadcast::new().produce();
1717
1718		// scope with duplicate prefixes should work (deduped internally)
1719		let producer = origin
1720			.scope(&["demo".into(), "demo".into()])
1721			.expect("should create producer");
1722
1723		assert!(producer.publish_broadcast("demo/stream", broadcast.consume()));
1724
1725		let mut consumer = producer.consume();
1726		consumer.assert_next("demo/stream", &broadcast.consume());
1727		consumer.assert_next_wait();
1728	}
1729
1730	#[tokio::test]
1731	async fn test_overlapping_prefixes_deduped() {
1732		let origin = Origin::random().produce();
1733		let broadcast = Broadcast::new().produce();
1734
1735		// "demo" and "demo/foo" — "demo/foo" is redundant, only "demo" should remain
1736		let producer = origin
1737			.scope(&["demo".into(), "demo/foo".into()])
1738			.expect("should create producer");
1739
1740		// Can still publish under "demo/bar" since "demo" covers everything
1741		assert!(producer.publish_broadcast("demo/bar/stream", broadcast.consume()));
1742
1743		let mut consumer = producer.consume();
1744		consumer.assert_next("demo/bar/stream", &broadcast.consume());
1745		consumer.assert_next_wait();
1746	}
1747
1748	#[tokio::test]
1749	async fn test_overlapping_prefixes_no_duplicate_announcements() {
1750		let origin = Origin::random().produce();
1751		let broadcast = Broadcast::new().produce();
1752
1753		// Both "demo" and "demo/foo" are requested — should only have one node
1754		let producer = origin
1755			.scope(&["demo".into(), "demo/foo".into()])
1756			.expect("should create producer");
1757
1758		assert!(producer.publish_broadcast("demo/foo/stream", broadcast.consume()));
1759
1760		let mut consumer = producer.consume();
1761		// Should only get ONE announcement (not two from overlapping nodes)
1762		consumer.assert_next("demo/foo/stream", &broadcast.consume());
1763		consumer.assert_next_wait();
1764	}
1765
1766	#[tokio::test]
1767	async fn test_allowed_returns_deduped_prefixes() {
1768		let origin = Origin::random().produce();
1769
1770		let producer = origin
1771			.scope(&["demo".into(), "demo/foo".into(), "anon".into()])
1772			.expect("should create producer");
1773
1774		let allowed: Vec<_> = producer.allowed().collect();
1775		assert_eq!(allowed.len(), 2, "demo/foo should be subsumed by demo");
1776	}
1777
1778	#[tokio::test]
1779	async fn test_announced_broadcast_already_announced() {
1780		let origin = Origin::random().produce();
1781		let broadcast = Broadcast::new().produce();
1782
1783		origin.publish_broadcast("test", broadcast.consume());
1784
1785		let consumer = origin.consume();
1786		let result = consumer.announced_broadcast("test").await.expect("should find it");
1787		assert!(result.is_clone(&broadcast.consume()));
1788	}
1789
1790	#[tokio::test]
1791	async fn test_announced_broadcast_delayed() {
1792		tokio::time::pause();
1793
1794		let origin = Origin::random().produce();
1795		let broadcast = Broadcast::new().produce();
1796
1797		let consumer = origin.consume();
1798
1799		// Start waiting before it's announced.
1800		let wait = tokio::spawn({
1801			let consumer = consumer.clone();
1802			async move { consumer.announced_broadcast("test").await }
1803		});
1804
1805		// Give the spawned task a chance to subscribe.
1806		tokio::task::yield_now().await;
1807
1808		origin.publish_broadcast("test", broadcast.consume());
1809
1810		let result = wait.await.unwrap().expect("should find it");
1811		assert!(result.is_clone(&broadcast.consume()));
1812	}
1813
1814	#[tokio::test]
1815	async fn test_announced_broadcast_ignores_unrelated_paths() {
1816		tokio::time::pause();
1817
1818		let origin = Origin::random().produce();
1819		let other = Broadcast::new().produce();
1820		let target = Broadcast::new().produce();
1821
1822		let consumer = origin.consume();
1823
1824		let wait = tokio::spawn({
1825			let consumer = consumer.clone();
1826			async move { consumer.announced_broadcast("target").await }
1827		});
1828
1829		tokio::task::yield_now().await;
1830
1831		// Publish an unrelated broadcast first — announced_broadcast should skip it.
1832		origin.publish_broadcast("other", other.consume());
1833		tokio::task::yield_now().await;
1834		assert!(!wait.is_finished(), "must not resolve on unrelated path");
1835
1836		origin.publish_broadcast("target", target.consume());
1837		let result = wait.await.unwrap().expect("should find target");
1838		assert!(result.is_clone(&target.consume()));
1839	}
1840
1841	#[tokio::test]
1842	async fn test_announced_broadcast_skips_nested_paths() {
1843		tokio::time::pause();
1844
1845		let origin = Origin::random().produce();
1846		let nested = Broadcast::new().produce();
1847		let exact = Broadcast::new().produce();
1848
1849		let consumer = origin.consume();
1850
1851		let wait = tokio::spawn({
1852			let consumer = consumer.clone();
1853			async move { consumer.announced_broadcast("foo").await }
1854		});
1855
1856		tokio::task::yield_now().await;
1857
1858		// "foo/bar" is under the prefix scope, but it's not the exact path — skip it.
1859		origin.publish_broadcast("foo/bar", nested.consume());
1860		tokio::task::yield_now().await;
1861		assert!(!wait.is_finished(), "must not resolve on a nested path");
1862
1863		origin.publish_broadcast("foo", exact.consume());
1864		let result = wait.await.unwrap().expect("should find foo exactly");
1865		assert!(result.is_clone(&exact.consume()));
1866	}
1867
1868	#[tokio::test]
1869	async fn test_announced_broadcast_disallowed() {
1870		let origin = Origin::random().produce();
1871		let limited = origin
1872			.consume()
1873			.scope(&["allowed".into()])
1874			.expect("should create limited");
1875
1876		// Path is outside allowed prefixes — should return None immediately.
1877		assert!(limited.announced_broadcast("notallowed").await.is_none());
1878	}
1879
1880	#[tokio::test]
1881	async fn test_announced_broadcast_scope_too_narrow() {
1882		// Consumer's scope is narrower than the requested path: asking for `foo` on a consumer
1883		// limited to `foo/specific` can never resolve. Must return None, not loop forever.
1884		let origin = Origin::random().produce();
1885		let limited = origin
1886			.consume()
1887			.scope(&["foo/specific".into()])
1888			.expect("should create limited");
1889
1890		// now_or_never so we fail fast instead of hanging if the guard regresses.
1891		let result = limited
1892			.announced_broadcast("foo")
1893			.now_or_never()
1894			.expect("must not block");
1895		assert!(result.is_none());
1896	}
1897}