moq_lite/model/
origin.rs

1use std::{
2	collections::HashMap,
3	sync::atomic::{AtomicU64, Ordering},
4};
5use tokio::sync::mpsc;
6use web_async::Lock;
7
8use super::BroadcastConsumer;
9use crate::{AsPath, Path, PathOwned, Produce};
10
11static NEXT_CONSUMER_ID: AtomicU64 = AtomicU64::new(0);
12
13#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
14struct ConsumerId(u64);
15
16impl ConsumerId {
17	fn new() -> Self {
18		Self(NEXT_CONSUMER_ID.fetch_add(1, Ordering::Relaxed))
19	}
20}
21
22// If there are multiple broadcasts with the same path, we use the most recent one but keep the others around.
23struct OriginBroadcast {
24	path: PathOwned,
25	active: BroadcastConsumer,
26	backup: Vec<BroadcastConsumer>,
27}
28
29#[derive(Clone)]
30struct OriginConsumerNotify {
31	root: PathOwned,
32	tx: mpsc::UnboundedSender<OriginAnnounce>,
33}
34
35impl OriginConsumerNotify {
36	fn announce(&self, path: impl AsPath, broadcast: BroadcastConsumer) {
37		let path = path.as_path().strip_prefix(&self.root).unwrap().to_owned();
38		self.tx.send((path, Some(broadcast))).expect("consumer closed");
39	}
40
41	fn reannounce(&self, path: impl AsPath, broadcast: BroadcastConsumer) {
42		let path = path.as_path().strip_prefix(&self.root).unwrap().to_owned();
43		self.tx.send((path.clone(), None)).expect("consumer closed");
44		self.tx.send((path, Some(broadcast))).expect("consumer closed");
45	}
46
47	fn unannounce(&self, path: impl AsPath) {
48		let path = path.as_path().strip_prefix(&self.root).unwrap().to_owned();
49		self.tx.send((path, None)).expect("consumer closed");
50	}
51}
52
53struct NotifyNode {
54	parent: Option<Lock<NotifyNode>>,
55
56	// Consumers that are subscribed to this node.
57	// We store a consumer ID so we can remove it easily when it closes.
58	consumers: HashMap<ConsumerId, OriginConsumerNotify>,
59}
60
61impl NotifyNode {
62	fn new(parent: Option<Lock<NotifyNode>>) -> Self {
63		Self {
64			parent,
65			consumers: HashMap::new(),
66		}
67	}
68
69	fn announce(&mut self, path: impl AsPath, broadcast: &BroadcastConsumer) {
70		for consumer in self.consumers.values() {
71			consumer.announce(path.as_path(), broadcast.clone());
72		}
73
74		if let Some(parent) = &self.parent {
75			parent.lock().announce(path, broadcast);
76		}
77	}
78
79	fn reannounce(&mut self, path: impl AsPath, broadcast: &BroadcastConsumer) {
80		for consumer in self.consumers.values() {
81			consumer.reannounce(path.as_path(), broadcast.clone());
82		}
83
84		if let Some(parent) = &self.parent {
85			parent.lock().reannounce(path, broadcast);
86		}
87	}
88
89	fn unannounce(&mut self, path: impl AsPath) {
90		for consumer in self.consumers.values() {
91			consumer.unannounce(path.as_path());
92		}
93
94		if let Some(parent) = &self.parent {
95			parent.lock().unannounce(path);
96		}
97	}
98}
99
100struct OriginNode {
101	// The broadcast that is published to this node.
102	broadcast: Option<OriginBroadcast>,
103
104	// Nested nodes, one level down the tree.
105	nested: HashMap<String, Lock<OriginNode>>,
106
107	// Unfortunately, to notify consumers we need to traverse back up the tree.
108	notify: Lock<NotifyNode>,
109}
110
111impl OriginNode {
112	fn new(parent: Option<Lock<NotifyNode>>) -> Self {
113		Self {
114			broadcast: None,
115			nested: HashMap::new(),
116			notify: Lock::new(NotifyNode::new(parent)),
117		}
118	}
119
120	fn leaf(&mut self, path: &Path) -> Lock<OriginNode> {
121		let (dir, rest) = path.next_part().expect("leaf called with empty path");
122
123		let next = self.entry(dir);
124		if rest.is_empty() {
125			next
126		} else {
127			next.lock().leaf(&rest)
128		}
129	}
130
131	fn entry(&mut self, dir: &str) -> Lock<OriginNode> {
132		match self.nested.get(dir) {
133			Some(next) => next.clone(),
134			None => {
135				let next = Lock::new(OriginNode::new(Some(self.notify.clone())));
136				self.nested.insert(dir.to_string(), next.clone());
137				next
138			}
139		}
140	}
141
142	fn publish(&mut self, full: impl AsPath, broadcast: &BroadcastConsumer, relative: impl AsPath) {
143		let full = full.as_path();
144		let rest = relative.as_path();
145
146		// If the path has a directory component, then publish it to the nested node.
147		if let Some((dir, relative)) = rest.next_part() {
148			// Not using entry to avoid allocating a string most of the time.
149			self.entry(dir).lock().publish(&full, broadcast, &relative);
150		} else if let Some(existing) = &mut self.broadcast {
151			// This node is a leaf with an existing broadcast.
152			let old = existing.active.clone();
153			existing.active = broadcast.clone();
154			existing.backup.push(old);
155
156			self.notify.lock().reannounce(full, broadcast);
157		} else {
158			// This node is a leaf with no existing broadcast.
159			self.broadcast = Some(OriginBroadcast {
160				path: full.to_owned(),
161				active: broadcast.clone(),
162				backup: Vec::new(),
163			});
164			self.notify.lock().announce(full, broadcast);
165		}
166	}
167
168	fn consume(&mut self, id: ConsumerId, mut notify: OriginConsumerNotify) {
169		self.consume_initial(&mut notify);
170		self.notify.lock().consumers.insert(id, notify);
171	}
172
173	fn consume_initial(&mut self, notify: &mut OriginConsumerNotify) {
174		if let Some(broadcast) = &self.broadcast {
175			notify.announce(&broadcast.path, broadcast.active.clone());
176		}
177
178		// Recursively subscribe to all nested nodes.
179		for nested in self.nested.values() {
180			nested.lock().consume_initial(notify);
181		}
182	}
183
184	fn consume_broadcast(&self, rest: impl AsPath) -> Option<BroadcastConsumer> {
185		let rest = rest.as_path();
186
187		if let Some((dir, rest)) = rest.next_part() {
188			let node = self.nested.get(dir)?.lock();
189			node.consume_broadcast(&rest)
190		} else {
191			self.broadcast.as_ref().map(|b| b.active.clone())
192		}
193	}
194
195	fn unconsume(&mut self, id: ConsumerId) {
196		self.notify.lock().consumers.remove(&id).expect("consumer not found");
197		if self.is_empty() {
198			//tracing::warn!("TODO: empty node; memory leak");
199			// This happens when consuming a path that is not being broadcasted.
200		}
201	}
202
203	// Returns true if the broadcast should be unannounced.
204	fn remove(&mut self, full: impl AsPath, broadcast: BroadcastConsumer, relative: impl AsPath) {
205		let full = full.as_path();
206		let relative = relative.as_path();
207
208		if let Some((dir, relative)) = relative.next_part() {
209			let nested = self.entry(dir);
210			let mut locked = nested.lock();
211			locked.remove(&full, broadcast, &relative);
212
213			if locked.is_empty() {
214				drop(locked);
215				self.nested.remove(dir);
216			}
217		} else {
218			let entry = match &mut self.broadcast {
219				Some(existing) => existing,
220				None => return,
221			};
222
223			// See if we can remove the broadcast from the backup list.
224			let pos = entry.backup.iter().position(|b| b.is_clone(&broadcast));
225			if let Some(pos) = pos {
226				entry.backup.remove(pos);
227				// Nothing else to do
228				return;
229			}
230
231			// Okay so it must be the active broadcast or else we fucked up.
232			assert!(entry.active.is_clone(&broadcast));
233
234			// If there's a backup broadcast, then announce it.
235			if let Some(active) = entry.backup.pop() {
236				entry.active = active;
237				self.notify.lock().reannounce(full, &entry.active);
238			} else {
239				// No more backups, so remove the entry.
240				self.broadcast = None;
241				self.notify.lock().unannounce(full);
242			}
243		}
244	}
245
246	fn is_empty(&self) -> bool {
247		self.broadcast.is_none() && self.nested.is_empty() && self.notify.lock().consumers.is_empty()
248	}
249}
250
251#[derive(Clone)]
252struct OriginNodes {
253	nodes: Vec<(PathOwned, Lock<OriginNode>)>,
254}
255
256impl OriginNodes {
257	// Returns nested roots that match the prefixes.
258	// TODO enforce that prefixes can't overlap.
259	pub fn select(&self, prefixes: &[Path]) -> Option<Self> {
260		let mut roots = Vec::new();
261
262		for (root, state) in &self.nodes {
263			for prefix in prefixes {
264				if root.has_prefix(prefix) {
265					// Keep the existing node if we're allowed to access it.
266					roots.push((root.to_owned(), state.clone()));
267					continue;
268				}
269
270				if let Some(suffix) = prefix.strip_prefix(root) {
271					// If the requested prefix is larger than the allowed prefix, then we further scope it.
272					let nested = state.lock().leaf(&suffix);
273					roots.push((prefix.to_owned(), nested));
274				}
275			}
276		}
277
278		if roots.is_empty() {
279			None
280		} else {
281			Some(Self { nodes: roots })
282		}
283	}
284
285	pub fn root(&self, new_root: impl AsPath) -> Option<Self> {
286		let new_root = new_root.as_path();
287		let mut roots = Vec::new();
288
289		if new_root.is_empty() {
290			return Some(self.clone());
291		}
292
293		for (root, state) in &self.nodes {
294			if let Some(suffix) = root.strip_prefix(&new_root) {
295				// If the old root is longer than the new root, shorten the keys.
296				roots.push((suffix.to_owned(), state.clone()));
297			} else if let Some(suffix) = new_root.strip_prefix(root) {
298				// If the new root is longer than the old root, add a new root.
299				// NOTE: suffix can't be empty
300				let nested = state.lock().leaf(&suffix);
301				roots.push(("".into(), nested));
302			}
303		}
304
305		if roots.is_empty() {
306			None
307		} else {
308			Some(Self { nodes: roots })
309		}
310	}
311
312	// Returns the root that has this prefix.
313	pub fn get(&self, path: impl AsPath) -> Option<(Lock<OriginNode>, PathOwned)> {
314		let path = path.as_path();
315
316		for (root, state) in &self.nodes {
317			if let Some(suffix) = path.strip_prefix(root) {
318				return Some((state.clone(), suffix.to_owned()));
319			}
320		}
321
322		None
323	}
324}
325
326impl Default for OriginNodes {
327	fn default() -> Self {
328		Self {
329			nodes: vec![("".into(), Lock::new(OriginNode::new(None)))],
330		}
331	}
332}
333
334/// A broadcast path and its associated consumer, or None if closed.
335pub type OriginAnnounce = (PathOwned, Option<BroadcastConsumer>);
336
337pub struct Origin {}
338
339impl Origin {
340	pub fn produce() -> Produce<OriginProducer, OriginConsumer> {
341		let producer = OriginProducer::default();
342		let consumer = producer.consume();
343		Produce { producer, consumer }
344	}
345}
346
347/// Announces broadcasts to consumers over the network.
348#[derive(Clone, Default)]
349pub struct OriginProducer {
350	// The roots of the tree that we are allowed to publish.
351	// A path of "" means we can publish anything.
352	nodes: OriginNodes,
353
354	/// The prefix that is automatically stripped from all paths.
355	root: PathOwned,
356}
357
358impl OriginProducer {
359	/// Publish a broadcast, announcing it to all consumers.
360	///
361	/// The broadcast will be unannounced when it is closed.
362	/// If there is already a broadcast with the same path, then it will be replaced and reannounced.
363	/// If the old broadcast is closed before the new one, then nothing will happen.
364	/// If the new broadcast is closed before the old one, then the old broadcast will be reannounced.
365	///
366	/// Returns false if the broadcast is not allowed to be published.
367	pub fn publish_broadcast(&self, path: impl AsPath, broadcast: BroadcastConsumer) -> bool {
368		let path = path.as_path();
369
370		let (root, rest) = match self.nodes.get(&path) {
371			Some(root) => root,
372			None => return false,
373		};
374
375		let full = self.root.join(&path);
376
377		root.lock().publish(&full, &broadcast, &rest);
378		let root = root.clone();
379
380		web_async::spawn(async move {
381			broadcast.closed().await;
382			root.lock().remove(&full, broadcast, &rest);
383		});
384
385		true
386	}
387
388	/// Returns a new OriginProducer where all published broadcasts MUST match one of the prefixes.
389	///
390	/// Returns None if there are no legal prefixes.
391	pub fn publish_only(&self, prefixes: &[Path]) -> Option<OriginProducer> {
392		Some(OriginProducer {
393			nodes: self.nodes.select(prefixes)?,
394			root: self.root.clone(),
395		})
396	}
397
398	/// Subscribe to all announced broadcasts.
399	pub fn consume(&self) -> OriginConsumer {
400		OriginConsumer::new(self.root.clone(), self.nodes.clone())
401	}
402
403	/// Subscribe to all announced broadcasts matching the prefix.
404	///
405	/// TODO: Don't use overlapping prefixes or duplicates will be published.
406	///
407	/// Returns None if there are no legal prefixes.
408	pub fn consume_only(&self, prefixes: &[Path]) -> Option<OriginConsumer> {
409		Some(OriginConsumer::new(self.root.clone(), self.nodes.select(prefixes)?))
410	}
411
412	/// Returns a new OriginProducer that automatically strips out the provided prefix.
413	///
414	/// Returns None if the provided root is not authorized; when publish_only was already used without a wildcard.
415	pub fn with_root(&self, prefix: impl AsPath) -> Option<Self> {
416		let prefix = prefix.as_path();
417
418		Some(Self {
419			root: self.root.join(&prefix).to_owned(),
420			nodes: self.nodes.root(&prefix)?,
421		})
422	}
423
424	/// Returns the root that is automatically stripped from all paths.
425	pub fn root(&self) -> &Path<'_> {
426		&self.root
427	}
428
429	pub fn allowed(&self) -> impl Iterator<Item = &Path<'_>> {
430		self.nodes.nodes.iter().map(|(root, _)| root)
431	}
432
433	/// Converts a relative path to an absolute path.
434	pub fn absolute(&self, path: impl AsPath) -> Path<'_> {
435		self.root.join(path)
436	}
437}
438
439/// Consumes announced broadcasts matching against an optional prefix.
440///
441/// NOTE: Clone is expensive, try to avoid it.
442pub struct OriginConsumer {
443	id: ConsumerId,
444	nodes: OriginNodes,
445	updates: mpsc::UnboundedReceiver<OriginAnnounce>,
446
447	/// A prefix that is automatically stripped from all paths.
448	root: PathOwned,
449}
450
451impl OriginConsumer {
452	fn new(root: PathOwned, nodes: OriginNodes) -> Self {
453		let (tx, rx) = mpsc::unbounded_channel();
454
455		let id = ConsumerId::new();
456
457		for (_, state) in &nodes.nodes {
458			let notify = OriginConsumerNotify {
459				root: root.clone(),
460				tx: tx.clone(),
461			};
462			state.lock().consume(id, notify);
463		}
464
465		Self {
466			id,
467			nodes,
468			updates: rx,
469			root,
470		}
471	}
472
473	/// Returns the next (un)announced broadcast and the absolute path.
474	///
475	/// The broadcast will only be announced if it was previously unannounced.
476	/// The same path won't be announced/unannounced twice, instead it will toggle.
477	/// Returns None if the consumer is closed.
478	///
479	/// Note: The returned path is absolute and will always match this consumer's prefix.
480	pub async fn announced(&mut self) -> Option<OriginAnnounce> {
481		self.updates.recv().await
482	}
483
484	/// Returns the next (un)announced broadcast and the absolute path without blocking.
485	///
486	/// Returns None if there is no update available; NOT because the consumer is closed.
487	/// You have to use `is_closed` to check if the consumer is closed.
488	pub fn try_announced(&mut self) -> Option<OriginAnnounce> {
489		self.updates.try_recv().ok()
490	}
491
492	pub fn consume(&self) -> Self {
493		self.clone()
494	}
495
496	/// Get a specific broadcast by path.
497	///
498	/// TODO This should include announcement support.
499	///
500	/// Returns None if the path hasn't been announced yet.
501	pub fn consume_broadcast(&self, path: impl AsPath) -> Option<BroadcastConsumer> {
502		let path = path.as_path();
503		let (root, rest) = self.nodes.get(&path)?;
504		let state = root.lock();
505		state.consume_broadcast(&rest)
506	}
507
508	/// Returns a new OriginConsumer that only consumes broadcasts matching one of the prefixes.
509	///
510	/// Returns None if there are no legal prefixes (would always return None).
511	pub fn consume_only(&self, prefixes: &[Path]) -> Option<OriginConsumer> {
512		Some(OriginConsumer::new(self.root.clone(), self.nodes.select(prefixes)?))
513	}
514
515	/// Returns the prefix that is automatically stripped from all paths.
516	pub fn root(&self) -> &Path<'_> {
517		&self.root
518	}
519
520	pub fn allowed(&self) -> impl Iterator<Item = &Path<'_>> {
521		self.nodes.nodes.iter().map(|(root, _)| root)
522	}
523
524	/// Converts a relative path to an absolute path.
525	pub fn absolute(&self, path: impl AsPath) -> Path<'_> {
526		self.root.join(path)
527	}
528}
529
530impl Drop for OriginConsumer {
531	fn drop(&mut self) {
532		for (_, root) in &self.nodes.nodes {
533			root.lock().unconsume(self.id);
534		}
535	}
536}
537
538impl Clone for OriginConsumer {
539	fn clone(&self) -> Self {
540		OriginConsumer::new(self.root.clone(), self.nodes.clone())
541	}
542}
543
544#[cfg(test)]
545use futures::FutureExt;
546
547#[cfg(test)]
548impl OriginConsumer {
549	pub fn assert_next(&mut self, expected: impl AsPath, broadcast: &BroadcastConsumer) {
550		let expected = expected.as_path();
551		let (path, active) = self.announced().now_or_never().expect("next blocked").expect("no next");
552		assert_eq!(path, expected, "wrong path");
553		assert!(active.unwrap().is_clone(broadcast), "should be the same broadcast");
554	}
555
556	pub fn assert_try_next(&mut self, expected: impl AsPath, broadcast: &BroadcastConsumer) {
557		let expected = expected.as_path();
558		let (path, active) = self.try_announced().expect("no next");
559		assert_eq!(path, expected, "wrong path");
560		assert!(active.unwrap().is_clone(broadcast), "should be the same broadcast");
561	}
562
563	pub fn assert_next_none(&mut self, expected: impl AsPath) {
564		let expected = expected.as_path();
565		let (path, active) = self.announced().now_or_never().expect("next blocked").expect("no next");
566		assert_eq!(path, expected, "wrong path");
567		assert!(active.is_none(), "should be unannounced");
568	}
569
570	pub fn assert_next_wait(&mut self) {
571		if let Some(res) = self.announced().now_or_never() {
572			panic!("next should block: got {:?}", res.map(|(path, _)| path));
573		}
574	}
575
576	/*
577	pub fn assert_next_closed(&mut self) {
578		assert!(
579			self.announced().now_or_never().expect("next blocked").is_none(),
580			"next should be closed"
581		);
582	}
583	*/
584}
585
586#[cfg(test)]
587mod tests {
588	use crate::Broadcast;
589
590	use super::*;
591
592	#[tokio::test]
593	async fn test_announce() {
594		let origin = Origin::produce();
595		let broadcast1 = Broadcast::produce();
596		let broadcast2 = Broadcast::produce();
597
598		let mut consumer1 = origin.consumer;
599		// Make a new consumer that should get it.
600		consumer1.assert_next_wait();
601
602		// Publish the first broadcast.
603		origin.producer.publish_broadcast("test1", broadcast1.consumer);
604
605		consumer1.assert_next("test1", &broadcast1.producer.consume());
606		consumer1.assert_next_wait();
607
608		// Make a new consumer that should get the existing broadcast.
609		// But we don't consume it yet.
610		let mut consumer2 = origin.producer.consume();
611
612		// Publish the second broadcast.
613		origin.producer.publish_broadcast("test2", broadcast2.consumer);
614
615		consumer1.assert_next("test2", &broadcast2.producer.consume());
616		consumer1.assert_next_wait();
617
618		consumer2.assert_next("test1", &broadcast1.producer.consume());
619		consumer2.assert_next("test2", &broadcast2.producer.consume());
620		consumer2.assert_next_wait();
621
622		// Close the first broadcast.
623		drop(broadcast1.producer);
624
625		// Wait for the async task to run.
626		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
627
628		// All consumers should get a None now.
629		consumer1.assert_next_none("test1");
630		consumer2.assert_next_none("test1");
631		consumer1.assert_next_wait();
632		consumer2.assert_next_wait();
633
634		// And a new consumer only gets the last broadcast.
635		let mut consumer3 = origin.producer.consume();
636		consumer3.assert_next("test2", &broadcast2.producer.consume());
637		consumer3.assert_next_wait();
638
639		// Close the other producer and make sure it cleans up
640		drop(broadcast2.producer);
641
642		// Wait for the async task to run.
643		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
644
645		consumer1.assert_next_none("test2");
646		consumer2.assert_next_none("test2");
647		consumer3.assert_next_none("test2");
648
649		/* TODO close the origin consumer when the producer is dropped
650		consumer1.assert_next_closed();
651		consumer2.assert_next_closed();
652		consumer3.assert_next_closed();
653		*/
654	}
655
656	#[tokio::test]
657	async fn test_duplicate() {
658		let mut origin = Origin::produce();
659
660		let broadcast1 = Broadcast::produce();
661		let broadcast2 = Broadcast::produce();
662		let broadcast3 = Broadcast::produce();
663
664		let consumer1 = broadcast1.consumer;
665		let consumer2 = broadcast2.consumer;
666		let consumer3 = broadcast3.consumer;
667
668		origin.producer.publish_broadcast("test", consumer1.clone());
669		origin.producer.publish_broadcast("test", consumer2.clone());
670		origin.producer.publish_broadcast("test", consumer3.clone());
671
672		assert!(origin.consumer.consume_broadcast("test").is_some());
673
674		origin.consumer.assert_next("test", &consumer1);
675		origin.consumer.assert_next_none("test");
676		origin.consumer.assert_next("test", &consumer2);
677		origin.consumer.assert_next_none("test");
678		origin.consumer.assert_next("test", &consumer3);
679
680		// Drop the backup, nothing should change.
681		drop(broadcast2.producer);
682
683		// Wait for the async task to run.
684		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
685
686		assert!(origin.consumer.consume_broadcast("test").is_some());
687		origin.consumer.assert_next_wait();
688
689		// Drop the active, we should reannounce.
690		drop(broadcast3.producer);
691
692		// Wait for the async task to run.
693		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
694
695		assert!(origin.consumer.consume_broadcast("test").is_some());
696		origin.consumer.assert_next_none("test");
697		origin.consumer.assert_next("test", &consumer1);
698
699		// Drop the final broadcast, we should unannounce.
700		drop(broadcast1.producer);
701
702		// Wait for the async task to run.
703		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
704		assert!(origin.consumer.consume_broadcast("test").is_none());
705
706		origin.consumer.assert_next_none("test");
707		origin.consumer.assert_next_wait();
708	}
709
710	#[tokio::test]
711	async fn test_duplicate_reverse() {
712		let origin = Origin::produce();
713		let broadcast1 = Broadcast::produce();
714		let broadcast2 = Broadcast::produce();
715
716		origin.producer.publish_broadcast("test", broadcast1.consumer);
717		origin.producer.publish_broadcast("test", broadcast2.consumer);
718		assert!(origin.consumer.consume_broadcast("test").is_some());
719
720		// This is harder, dropping the new broadcast first.
721		drop(broadcast2.producer);
722
723		// Wait for the cleanup async task to run.
724		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
725		assert!(origin.consumer.consume_broadcast("test").is_some());
726
727		drop(broadcast1.producer);
728
729		// Wait for the cleanup async task to run.
730		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
731		assert!(origin.consumer.consume_broadcast("test").is_none());
732	}
733
734	#[tokio::test]
735	async fn test_double_publish() {
736		let origin = Origin::produce();
737		let broadcast = Broadcast::produce();
738
739		// Ensure it doesn't crash.
740		origin.producer.publish_broadcast("test", broadcast.producer.consume());
741		origin.producer.publish_broadcast("test", broadcast.producer.consume());
742
743		assert!(origin.consumer.consume_broadcast("test").is_some());
744
745		drop(broadcast.producer);
746
747		// Wait for the async task to run.
748		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
749		assert!(origin.consumer.consume_broadcast("test").is_none());
750	}
751	// There was a tokio bug where only the first 127 broadcasts would be received instantly.
752	#[tokio::test]
753	#[should_panic]
754	async fn test_128() {
755		let mut origin = Origin::produce();
756		let broadcast = Broadcast::produce();
757
758		for i in 0..256 {
759			origin
760				.producer
761				.publish_broadcast(format!("test{i}"), broadcast.consumer.clone());
762		}
763
764		for i in 0..256 {
765			origin.consumer.assert_next(format!("test{i}"), &broadcast.consumer);
766		}
767	}
768
769	#[tokio::test]
770	async fn test_128_fix() {
771		let mut origin = Origin::produce();
772		let broadcast = Broadcast::produce();
773
774		for i in 0..256 {
775			origin
776				.producer
777				.publish_broadcast(format!("test{i}"), broadcast.consumer.clone());
778		}
779
780		for i in 0..256 {
781			// try_next does not have the same issue because it's synchronous.
782			origin.consumer.assert_try_next(format!("test{i}"), &broadcast.consumer);
783		}
784	}
785
786	#[tokio::test]
787	async fn test_with_root_basic() {
788		let mut origin = Origin::produce();
789		let broadcast = Broadcast::produce();
790
791		// Create a producer with root "/foo"
792		let foo_producer = origin.producer.with_root("foo").expect("should create root");
793		assert_eq!(foo_producer.root().as_str(), "foo");
794
795		// When publishing to "bar/baz", it should actually publish to "foo/bar/baz"
796		assert!(foo_producer.publish_broadcast("bar/baz", broadcast.consumer.clone()));
797
798		// The original consumer should see the full path
799		origin.consumer.assert_next("foo/bar/baz", &broadcast.consumer);
800
801		// A consumer created from the rooted producer should see the stripped path
802		let mut foo_consumer = foo_producer.consume();
803		foo_consumer.assert_next("bar/baz", &broadcast.consumer);
804	}
805
806	#[tokio::test]
807	async fn test_with_root_nested() {
808		let mut origin = Origin::produce();
809		let broadcast = Broadcast::produce();
810
811		// Create nested roots
812		let foo_producer = origin.producer.with_root("foo").expect("should create foo root");
813		let foo_bar_producer = foo_producer.with_root("bar").expect("should create bar root");
814		assert_eq!(foo_bar_producer.root().as_str(), "foo/bar");
815
816		// Publishing to "baz" should actually publish to "foo/bar/baz"
817		assert!(foo_bar_producer.publish_broadcast("baz", broadcast.consumer.clone()));
818
819		// The original consumer sees the full path
820		origin.consumer.assert_next("foo/bar/baz", &broadcast.consumer);
821
822		// Consumer from foo_bar_producer sees just "baz"
823		let mut foo_bar_consumer = foo_bar_producer.consume();
824		foo_bar_consumer.assert_next("baz", &broadcast.consumer);
825	}
826
827	#[tokio::test]
828	async fn test_publish_only_allows() {
829		let origin = Origin::produce();
830		let broadcast = Broadcast::produce();
831
832		// Create a producer that can only publish to "allowed" paths
833		let limited_producer = origin
834			.producer
835			.publish_only(&["allowed/path1".into(), "allowed/path2".into()])
836			.expect("should create limited producer");
837
838		// Should be able to publish to allowed paths
839		assert!(limited_producer.publish_broadcast("allowed/path1", broadcast.consumer.clone()));
840		assert!(limited_producer.publish_broadcast("allowed/path1/nested", broadcast.consumer.clone()));
841		assert!(limited_producer.publish_broadcast("allowed/path2", broadcast.consumer.clone()));
842
843		// Should not be able to publish to disallowed paths
844		assert!(!limited_producer.publish_broadcast("notallowed", broadcast.consumer.clone()));
845		assert!(!limited_producer.publish_broadcast("allowed", broadcast.consumer.clone())); // Parent of allowed path
846		assert!(!limited_producer.publish_broadcast("other/path", broadcast.consumer.clone()));
847	}
848
849	#[tokio::test]
850	async fn test_publish_only_empty() {
851		let origin = Origin::produce();
852
853		// Creating a producer with no allowed paths should return None
854		assert!(origin.producer.publish_only(&[]).is_none());
855	}
856
857	#[tokio::test]
858	async fn test_consume_only_filters() {
859		let mut origin = Origin::produce();
860		let broadcast1 = Broadcast::produce();
861		let broadcast2 = Broadcast::produce();
862		let broadcast3 = Broadcast::produce();
863
864		// Publish to different paths
865		origin
866			.producer
867			.publish_broadcast("allowed", broadcast1.consumer.clone());
868		origin
869			.producer
870			.publish_broadcast("allowed/nested", broadcast2.consumer.clone());
871		origin
872			.producer
873			.publish_broadcast("notallowed", broadcast3.consumer.clone());
874
875		// Create a consumer that only sees "allowed" paths
876		let mut limited_consumer = origin
877			.consumer
878			.consume_only(&["allowed".into()])
879			.expect("should create limited consumer");
880
881		// Should only receive broadcasts under "allowed"
882		limited_consumer.assert_next("allowed", &broadcast1.consumer);
883		limited_consumer.assert_next("allowed/nested", &broadcast2.consumer);
884		limited_consumer.assert_next_wait(); // Should not see "notallowed"
885
886		// Original consumer should see all
887		origin.consumer.assert_next("allowed", &broadcast1.consumer);
888		origin.consumer.assert_next("allowed/nested", &broadcast2.consumer);
889		origin.consumer.assert_next("notallowed", &broadcast3.consumer);
890	}
891
892	#[tokio::test]
893	async fn test_consume_only_multiple_prefixes() {
894		let origin = Origin::produce();
895		let broadcast1 = Broadcast::produce();
896		let broadcast2 = Broadcast::produce();
897		let broadcast3 = Broadcast::produce();
898
899		origin
900			.producer
901			.publish_broadcast("foo/test", broadcast1.consumer.clone());
902		origin
903			.producer
904			.publish_broadcast("bar/test", broadcast2.consumer.clone());
905		origin
906			.producer
907			.publish_broadcast("baz/test", broadcast3.consumer.clone());
908
909		// Consumer that only sees "foo" and "bar" paths
910		let mut limited_consumer = origin
911			.consumer
912			.consume_only(&["foo".into(), "bar".into()])
913			.expect("should create limited consumer");
914
915		limited_consumer.assert_next("foo/test", &broadcast1.consumer);
916		limited_consumer.assert_next("bar/test", &broadcast2.consumer);
917		limited_consumer.assert_next_wait(); // Should not see "baz/test"
918	}
919
920	#[tokio::test]
921	async fn test_with_root_and_publish_only() {
922		let mut origin = Origin::produce();
923		let broadcast = Broadcast::produce();
924
925		// User connects to /foo root
926		let foo_producer = origin.producer.with_root("foo").expect("should create foo root");
927
928		// Limit them to publish only to "bar" and "goop/pee" within /foo
929		let limited_producer = foo_producer
930			.publish_only(&["bar".into(), "goop/pee".into()])
931			.expect("should create limited producer");
932
933		// Should be able to publish to foo/bar and foo/goop/pee (but user sees as bar and goop/pee)
934		assert!(limited_producer.publish_broadcast("bar", broadcast.consumer.clone()));
935		assert!(limited_producer.publish_broadcast("bar/nested", broadcast.consumer.clone()));
936		assert!(limited_producer.publish_broadcast("goop/pee", broadcast.consumer.clone()));
937		assert!(limited_producer.publish_broadcast("goop/pee/nested", broadcast.consumer.clone()));
938
939		// Should not be able to publish outside allowed paths
940		assert!(!limited_producer.publish_broadcast("baz", broadcast.consumer.clone()));
941		assert!(!limited_producer.publish_broadcast("goop", broadcast.consumer.clone())); // Parent of allowed
942		assert!(!limited_producer.publish_broadcast("goop/other", broadcast.consumer.clone()));
943
944		// Original consumer sees full paths
945		origin.consumer.assert_next("foo/bar", &broadcast.consumer);
946		origin.consumer.assert_next("foo/bar/nested", &broadcast.consumer);
947		origin.consumer.assert_next("foo/goop/pee", &broadcast.consumer);
948		origin.consumer.assert_next("foo/goop/pee/nested", &broadcast.consumer);
949	}
950
951	#[tokio::test]
952	async fn test_with_root_and_consume_only() {
953		let origin = Origin::produce();
954		let broadcast1 = Broadcast::produce();
955		let broadcast2 = Broadcast::produce();
956		let broadcast3 = Broadcast::produce();
957
958		// Publish broadcasts
959		origin
960			.producer
961			.publish_broadcast("foo/bar/test", broadcast1.consumer.clone());
962		origin
963			.producer
964			.publish_broadcast("foo/goop/pee/test", broadcast2.consumer.clone());
965		origin
966			.producer
967			.publish_broadcast("foo/other/test", broadcast3.consumer.clone());
968
969		// User connects to /foo root
970		let foo_producer = origin.producer.with_root("foo").expect("should create foo root");
971
972		// Create consumer limited to "bar" and "goop/pee" within /foo
973		let mut limited_consumer = foo_producer
974			.consume_only(&["bar".into(), "goop/pee".into()])
975			.expect("should create limited consumer");
976
977		// Should only see allowed paths (without foo prefix)
978		limited_consumer.assert_next("bar/test", &broadcast1.consumer);
979		limited_consumer.assert_next("goop/pee/test", &broadcast2.consumer);
980		limited_consumer.assert_next_wait(); // Should not see "other/test"
981	}
982
983	#[tokio::test]
984	async fn test_with_root_unauthorized() {
985		let origin = Origin::produce();
986
987		// First limit the producer to specific paths
988		let limited_producer = origin
989			.producer
990			.publish_only(&["allowed".into()])
991			.expect("should create limited producer");
992
993		// Trying to create a root outside allowed paths should fail
994		assert!(limited_producer.with_root("notallowed").is_none());
995
996		// But creating a root within allowed paths should work
997		let allowed_root = limited_producer
998			.with_root("allowed")
999			.expect("should create allowed root");
1000		assert_eq!(allowed_root.root().as_str(), "allowed");
1001	}
1002
1003	#[tokio::test]
1004	async fn test_wildcard_permission() {
1005		let origin = Origin::produce();
1006		let broadcast = Broadcast::produce();
1007
1008		// Producer with root access (empty string means wildcard)
1009		let root_producer = origin.producer.clone();
1010
1011		// Should be able to publish anywhere
1012		assert!(root_producer.publish_broadcast("any/path", broadcast.consumer.clone()));
1013		assert!(root_producer.publish_broadcast("other/path", broadcast.consumer.clone()));
1014
1015		// Can create any root
1016		let foo_producer = root_producer.with_root("foo").expect("should create any root");
1017		assert_eq!(foo_producer.root().as_str(), "foo");
1018	}
1019
1020	#[tokio::test]
1021	async fn test_consume_broadcast_with_permissions() {
1022		let origin = Origin::produce();
1023		let broadcast1 = Broadcast::produce();
1024		let broadcast2 = Broadcast::produce();
1025
1026		origin
1027			.producer
1028			.publish_broadcast("allowed/test", broadcast1.consumer.clone());
1029		origin
1030			.producer
1031			.publish_broadcast("notallowed/test", broadcast2.consumer.clone());
1032
1033		// Create limited consumer
1034		let limited_consumer = origin
1035			.consumer
1036			.consume_only(&["allowed".into()])
1037			.expect("should create limited consumer");
1038
1039		// Should be able to get allowed broadcast
1040		let result = limited_consumer.consume_broadcast("allowed/test");
1041		assert!(result.is_some());
1042		assert!(result.unwrap().is_clone(&broadcast1.consumer));
1043
1044		// Should not be able to get disallowed broadcast
1045		assert!(limited_consumer.consume_broadcast("notallowed/test").is_none());
1046
1047		// Original consumer can get both
1048		assert!(origin.consumer.consume_broadcast("allowed/test").is_some());
1049		assert!(origin.consumer.consume_broadcast("notallowed/test").is_some());
1050	}
1051
1052	#[tokio::test]
1053	async fn test_nested_paths_with_permissions() {
1054		let origin = Origin::produce();
1055		let broadcast = Broadcast::produce();
1056
1057		// Create producer limited to "a/b/c"
1058		let limited_producer = origin
1059			.producer
1060			.publish_only(&["a/b/c".into()])
1061			.expect("should create limited producer");
1062
1063		// Should be able to publish to exact path and nested paths
1064		assert!(limited_producer.publish_broadcast("a/b/c", broadcast.consumer.clone()));
1065		assert!(limited_producer.publish_broadcast("a/b/c/d", broadcast.consumer.clone()));
1066		assert!(limited_producer.publish_broadcast("a/b/c/d/e", broadcast.consumer.clone()));
1067
1068		// Should not be able to publish to parent or sibling paths
1069		assert!(!limited_producer.publish_broadcast("a", broadcast.consumer.clone()));
1070		assert!(!limited_producer.publish_broadcast("a/b", broadcast.consumer.clone()));
1071		assert!(!limited_producer.publish_broadcast("a/b/other", broadcast.consumer.clone()));
1072	}
1073
1074	#[tokio::test]
1075	async fn test_multiple_consumers_with_different_permissions() {
1076		let origin = Origin::produce();
1077		let broadcast1 = Broadcast::produce();
1078		let broadcast2 = Broadcast::produce();
1079		let broadcast3 = Broadcast::produce();
1080
1081		// Publish to different paths
1082		origin
1083			.producer
1084			.publish_broadcast("foo/test", broadcast1.consumer.clone());
1085		origin
1086			.producer
1087			.publish_broadcast("bar/test", broadcast2.consumer.clone());
1088		origin
1089			.producer
1090			.publish_broadcast("baz/test", broadcast3.consumer.clone());
1091
1092		// Create consumers with different permissions
1093		let mut foo_consumer = origin
1094			.consumer
1095			.consume_only(&["foo".into()])
1096			.expect("should create foo consumer");
1097
1098		let mut bar_consumer = origin
1099			.consumer
1100			.consume_only(&["bar".into()])
1101			.expect("should create bar consumer");
1102
1103		let mut foobar_consumer = origin
1104			.consumer
1105			.consume_only(&["foo".into(), "bar".into()])
1106			.expect("should create foobar consumer");
1107
1108		// Each consumer should only see their allowed paths
1109		foo_consumer.assert_next("foo/test", &broadcast1.consumer);
1110		foo_consumer.assert_next_wait();
1111
1112		bar_consumer.assert_next("bar/test", &broadcast2.consumer);
1113		bar_consumer.assert_next_wait();
1114
1115		foobar_consumer.assert_next("foo/test", &broadcast1.consumer);
1116		foobar_consumer.assert_next("bar/test", &broadcast2.consumer);
1117		foobar_consumer.assert_next_wait();
1118	}
1119
1120	#[tokio::test]
1121	async fn test_select_with_empty_prefix() {
1122		let origin = Origin::produce();
1123		let broadcast1 = Broadcast::produce();
1124		let broadcast2 = Broadcast::produce();
1125
1126		// User with root "demo" allowed to subscribe to "worm-node" and "foobar"
1127		let demo_producer = origin.producer.with_root("demo").expect("should create demo root");
1128		let limited_producer = demo_producer
1129			.publish_only(&["worm-node".into(), "foobar".into()])
1130			.expect("should create limited producer");
1131
1132		// Publish some broadcasts
1133		assert!(limited_producer.publish_broadcast("worm-node/test", broadcast1.consumer.clone()));
1134		assert!(limited_producer.publish_broadcast("foobar/test", broadcast2.consumer.clone()));
1135
1136		// consume_only with empty prefix should keep the exact same "worm-node" and "foobar" nodes
1137		let mut consumer = limited_producer
1138			.consume_only(&["".into()])
1139			.expect("should create consumer with empty prefix");
1140
1141		// Should still see both broadcasts
1142		consumer.assert_next("worm-node/test", &broadcast1.consumer);
1143		consumer.assert_next("foobar/test", &broadcast2.consumer);
1144		consumer.assert_next_wait();
1145	}
1146
1147	#[tokio::test]
1148	async fn test_select_narrowing_scope() {
1149		let origin = Origin::produce();
1150		let broadcast1 = Broadcast::produce();
1151		let broadcast2 = Broadcast::produce();
1152		let broadcast3 = Broadcast::produce();
1153
1154		// User with root "demo" allowed to subscribe to "worm-node" and "foobar"
1155		let demo_producer = origin.producer.with_root("demo").expect("should create demo root");
1156		let limited_producer = demo_producer
1157			.publish_only(&["worm-node".into(), "foobar".into()])
1158			.expect("should create limited producer");
1159
1160		// Publish broadcasts at different levels
1161		assert!(limited_producer.publish_broadcast("worm-node", broadcast1.consumer.clone()));
1162		assert!(limited_producer.publish_broadcast("worm-node/foo", broadcast2.consumer.clone()));
1163		assert!(limited_producer.publish_broadcast("foobar/bar", broadcast3.consumer.clone()));
1164
1165		// Test 1: consume_only("worm-node") should result in a single "" node with contents of "worm-node" ONLY
1166		let mut worm_consumer = limited_producer
1167			.consume_only(&["worm-node".into()])
1168			.expect("should create worm-node consumer");
1169
1170		// Should see worm-node content with paths stripped to ""
1171		worm_consumer.assert_next("worm-node", &broadcast1.consumer);
1172		worm_consumer.assert_next("worm-node/foo", &broadcast2.consumer);
1173		worm_consumer.assert_next_wait(); // Should NOT see foobar content
1174
1175		// Test 2: consume_only("worm-node/foo") should result in a "" node with contents of "worm-node/foo"
1176		let mut foo_consumer = limited_producer
1177			.consume_only(&["worm-node/foo".into()])
1178			.expect("should create worm-node/foo consumer");
1179
1180		foo_consumer.assert_next("worm-node/foo", &broadcast2.consumer);
1181		foo_consumer.assert_next_wait(); // Should NOT see other content
1182	}
1183
1184	#[tokio::test]
1185	async fn test_select_multiple_roots_with_empty_prefix() {
1186		let origin = Origin::produce();
1187		let broadcast1 = Broadcast::produce();
1188		let broadcast2 = Broadcast::produce();
1189		let broadcast3 = Broadcast::produce();
1190
1191		// Producer with multiple allowed roots
1192		let limited_producer = origin
1193			.producer
1194			.publish_only(&["app1".into(), "app2".into(), "shared".into()])
1195			.expect("should create limited producer");
1196
1197		// Publish to each root
1198		assert!(limited_producer.publish_broadcast("app1/data", broadcast1.consumer.clone()));
1199		assert!(limited_producer.publish_broadcast("app2/config", broadcast2.consumer.clone()));
1200		assert!(limited_producer.publish_broadcast("shared/resource", broadcast3.consumer.clone()));
1201
1202		// consume_only with empty prefix should maintain all roots
1203		let mut consumer = limited_producer
1204			.consume_only(&["".into()])
1205			.expect("should create consumer with empty prefix");
1206
1207		// Should see all broadcasts from all roots
1208		consumer.assert_next("app1/data", &broadcast1.consumer);
1209		consumer.assert_next("app2/config", &broadcast2.consumer);
1210		consumer.assert_next("shared/resource", &broadcast3.consumer);
1211		consumer.assert_next_wait();
1212	}
1213
1214	#[tokio::test]
1215	async fn test_publish_only_with_empty_prefix() {
1216		let origin = Origin::produce();
1217		let broadcast = Broadcast::produce();
1218
1219		// Producer with specific allowed paths
1220		let limited_producer = origin
1221			.producer
1222			.publish_only(&["services/api".into(), "services/web".into()])
1223			.expect("should create limited producer");
1224
1225		// publish_only with empty prefix should keep the same restrictions
1226		let same_producer = limited_producer
1227			.publish_only(&["".into()])
1228			.expect("should create producer with empty prefix");
1229
1230		// Should still have the same publishing restrictions
1231		assert!(same_producer.publish_broadcast("services/api", broadcast.consumer.clone()));
1232		assert!(same_producer.publish_broadcast("services/web", broadcast.consumer.clone()));
1233		assert!(!same_producer.publish_broadcast("services/db", broadcast.consumer.clone()));
1234		assert!(!same_producer.publish_broadcast("other", broadcast.consumer.clone()));
1235	}
1236
1237	#[tokio::test]
1238	async fn test_select_narrowing_to_deeper_path() {
1239		let origin = Origin::produce();
1240		let broadcast1 = Broadcast::produce();
1241		let broadcast2 = Broadcast::produce();
1242		let broadcast3 = Broadcast::produce();
1243
1244		// Producer with broad permission
1245		let limited_producer = origin
1246			.producer
1247			.publish_only(&["org".into()])
1248			.expect("should create limited producer");
1249
1250		// Publish at various depths
1251		assert!(limited_producer.publish_broadcast("org/team1/project1", broadcast1.consumer.clone()));
1252		assert!(limited_producer.publish_broadcast("org/team1/project2", broadcast2.consumer.clone()));
1253		assert!(limited_producer.publish_broadcast("org/team2/project1", broadcast3.consumer.clone()));
1254
1255		// Narrow down to team1 only
1256		let mut team1_consumer = limited_producer
1257			.consume_only(&["org/team2".into()])
1258			.expect("should create team1 consumer");
1259
1260		team1_consumer.assert_next("org/team2/project1", &broadcast3.consumer);
1261		team1_consumer.assert_next_wait(); // Should NOT see team1 content
1262
1263		// Further narrow down to team1/project1
1264		let mut project1_consumer = limited_producer
1265			.consume_only(&["org/team1/project1".into()])
1266			.expect("should create project1 consumer");
1267
1268		// Should only see project1 content at root
1269		project1_consumer.assert_next("org/team1/project1", &broadcast1.consumer);
1270		project1_consumer.assert_next_wait();
1271	}
1272
1273	#[tokio::test]
1274	async fn test_select_with_non_matching_prefix() {
1275		let origin = Origin::produce();
1276
1277		// Producer with specific allowed paths
1278		let limited_producer = origin
1279			.producer
1280			.publish_only(&["allowed/path".into()])
1281			.expect("should create limited producer");
1282
1283		// Trying to consume_only with a completely different prefix should return None
1284		assert!(limited_producer.consume_only(&["different/path".into()]).is_none());
1285
1286		// Similarly for publish_only
1287		assert!(limited_producer.publish_only(&["other/path".into()]).is_none());
1288	}
1289
1290	#[tokio::test]
1291	async fn test_select_maintains_access_with_wider_prefix() {
1292		let origin = Origin::produce();
1293		let broadcast1 = Broadcast::produce();
1294		let broadcast2 = Broadcast::produce();
1295
1296		// Setup: user with root "demo" allowed to subscribe to specific paths
1297		let demo_producer = origin.producer.with_root("demo").expect("should create demo root");
1298		let user_producer = demo_producer
1299			.publish_only(&["worm-node".into(), "foobar".into()])
1300			.expect("should create user producer");
1301
1302		// Publish some data
1303		assert!(user_producer.publish_broadcast("worm-node/data", broadcast1.consumer.clone()));
1304		assert!(user_producer.publish_broadcast("foobar", broadcast2.consumer.clone()));
1305
1306		// Key test: consume_only with "" should maintain access to allowed roots
1307		let mut consumer = user_producer
1308			.consume_only(&["".into()])
1309			.expect("consume_only with empty prefix should not fail when user has specific permissions");
1310
1311		// Should still receive broadcasts from allowed paths
1312		consumer.assert_next("worm-node/data", &broadcast1.consumer);
1313		consumer.assert_next("foobar", &broadcast2.consumer);
1314		consumer.assert_next_wait();
1315
1316		// Also test that we can still narrow the scope
1317		let mut narrow_consumer = user_producer
1318			.consume_only(&["worm-node".into()])
1319			.expect("should be able to narrow scope to worm-node");
1320
1321		narrow_consumer.assert_next("worm-node/data", &broadcast1.consumer);
1322		narrow_consumer.assert_next_wait(); // Should not see foobar
1323	}
1324}