moq_lite/model/
origin.rs

1use std::{
2	collections::HashMap,
3	sync::atomic::{AtomicU64, Ordering},
4};
5use tokio::sync::mpsc;
6use web_async::Lock;
7
8use super::BroadcastConsumer;
9use crate::{AsPath, Path, PathOwned, Produce};
10
/// Process-wide counter that hands out the next [`ConsumerId`].
static NEXT_CONSUMER_ID: AtomicU64 = AtomicU64::new(0);

/// Opaque identifier for one registered consumer.
///
/// Used as the map key under which a consumer's notify handle is stored,
/// so the registration can be found again and removed.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct ConsumerId(u64);

impl ConsumerId {
	/// Allocates a fresh identifier by bumping the global counter.
	fn new() -> Self {
		// Only the fetched value is used, so no ordering with other memory is required.
		let raw = NEXT_CONSUMER_ID.fetch_add(1, Ordering::Relaxed);
		Self(raw)
	}
}
21
22// If there are multiple broadcasts with the same path, we use the most recent one but keep the others around.
23struct OriginBroadcast {
24	path: PathOwned,
25	active: BroadcastConsumer,
26	backup: Vec<BroadcastConsumer>,
27}
28
29#[derive(Clone)]
30struct OriginConsumerNotify {
31	root: PathOwned,
32	tx: mpsc::UnboundedSender<OriginAnnounce>,
33}
34
35impl OriginConsumerNotify {
36	fn announce(&self, path: impl AsPath, broadcast: BroadcastConsumer) {
37		let path = path.as_path().strip_prefix(&self.root).unwrap().to_owned();
38		self.tx.send((path, Some(broadcast))).expect("consumer closed");
39	}
40
41	fn reannounce(&self, path: impl AsPath, broadcast: BroadcastConsumer) {
42		let path = path.as_path().strip_prefix(&self.root).unwrap().to_owned();
43		self.tx.send((path.clone(), None)).expect("consumer closed");
44		self.tx.send((path, Some(broadcast))).expect("consumer closed");
45	}
46
47	fn unannounce(&self, path: impl AsPath) {
48		let path = path.as_path().strip_prefix(&self.root).unwrap().to_owned();
49		self.tx.send((path, None)).expect("consumer closed");
50	}
51}
52
53struct NotifyNode {
54	parent: Option<Lock<NotifyNode>>,
55
56	// Consumers that are subscribed to this node.
57	// We store a consumer ID so we can remove it easily when it closes.
58	consumers: HashMap<ConsumerId, OriginConsumerNotify>,
59}
60
61impl NotifyNode {
62	fn new(parent: Option<Lock<NotifyNode>>) -> Self {
63		Self {
64			parent,
65			consumers: HashMap::new(),
66		}
67	}
68
69	fn announce(&mut self, path: impl AsPath, broadcast: &BroadcastConsumer) {
70		for consumer in self.consumers.values() {
71			consumer.announce(path.as_path(), broadcast.clone());
72		}
73
74		if let Some(parent) = &self.parent {
75			parent.lock().announce(path, broadcast);
76		}
77	}
78
79	fn reannounce(&mut self, path: impl AsPath, broadcast: &BroadcastConsumer) {
80		for consumer in self.consumers.values() {
81			consumer.reannounce(path.as_path(), broadcast.clone());
82		}
83
84		if let Some(parent) = &self.parent {
85			parent.lock().reannounce(path, broadcast);
86		}
87	}
88
89	fn unannounce(&mut self, path: impl AsPath) {
90		for consumer in self.consumers.values() {
91			consumer.unannounce(path.as_path());
92		}
93
94		if let Some(parent) = &self.parent {
95			parent.lock().unannounce(path);
96		}
97	}
98}
99
100struct OriginNode {
101	// The broadcast that is published to this node.
102	broadcast: Option<OriginBroadcast>,
103
104	// Nested nodes, one level down the tree.
105	nested: HashMap<String, Lock<OriginNode>>,
106
107	// Unfortunately, to notify consumers we need to traverse back up the tree.
108	notify: Lock<NotifyNode>,
109}
110
111impl OriginNode {
112	fn new(parent: Option<Lock<NotifyNode>>) -> Self {
113		Self {
114			broadcast: None,
115			nested: HashMap::new(),
116			notify: Lock::new(NotifyNode::new(parent)),
117		}
118	}
119
120	fn leaf(&mut self, path: &Path) -> Lock<OriginNode> {
121		let (dir, rest) = path.next_part().expect("leaf called with empty path");
122
123		let next = self.entry(dir);
124		if rest.is_empty() {
125			next
126		} else {
127			next.lock().leaf(&rest)
128		}
129	}
130
131	fn entry(&mut self, dir: &str) -> Lock<OriginNode> {
132		match self.nested.get(dir) {
133			Some(next) => next.clone(),
134			None => {
135				let next = Lock::new(OriginNode::new(Some(self.notify.clone())));
136				self.nested.insert(dir.to_string(), next.clone());
137				next
138			}
139		}
140	}
141
142	fn publish(&mut self, full: impl AsPath, broadcast: &BroadcastConsumer, relative: impl AsPath) {
143		let full = full.as_path();
144		let rest = relative.as_path();
145
146		// If the path has a directory component, then publish it to the nested node.
147		if let Some((dir, relative)) = rest.next_part() {
148			// Not using entry to avoid allocating a string most of the time.
149			self.entry(dir).lock().publish(&full, broadcast, &relative);
150		} else if let Some(existing) = &mut self.broadcast {
151			// This node is a leaf with an existing broadcast.
152			let old = existing.active.clone();
153			existing.active = broadcast.clone();
154			existing.backup.push(old);
155
156			self.notify.lock().reannounce(full, broadcast);
157		} else {
158			// This node is a leaf with no existing broadcast.
159			self.broadcast = Some(OriginBroadcast {
160				path: full.to_owned(),
161				active: broadcast.clone(),
162				backup: Vec::new(),
163			});
164			self.notify.lock().announce(full, broadcast);
165		}
166	}
167
168	fn consume(&mut self, id: ConsumerId, mut notify: OriginConsumerNotify) {
169		self.consume_initial(&mut notify);
170		self.notify.lock().consumers.insert(id, notify);
171	}
172
173	fn consume_initial(&mut self, notify: &mut OriginConsumerNotify) {
174		if let Some(broadcast) = &self.broadcast {
175			notify.announce(&broadcast.path, broadcast.active.clone());
176		}
177
178		// Recursively subscribe to all nested nodes.
179		for nested in self.nested.values() {
180			nested.lock().consume_initial(notify);
181		}
182	}
183
184	fn consume_broadcast(&self, rest: impl AsPath) -> Option<BroadcastConsumer> {
185		let rest = rest.as_path();
186
187		if let Some((dir, rest)) = rest.next_part() {
188			let node = self.nested.get(dir)?.lock();
189			node.consume_broadcast(&rest)
190		} else {
191			self.broadcast.as_ref().map(|b| b.active.clone())
192		}
193	}
194
195	fn unconsume(&mut self, id: ConsumerId) {
196		self.notify.lock().consumers.remove(&id).expect("consumer not found");
197		if self.is_empty() {
198			//tracing::warn!("TODO: empty node; memory leak");
199			// This happens when consuming a path that is not being broadcasted.
200		}
201	}
202
203	// Returns true if the broadcast should be unannounced.
204	fn remove(&mut self, full: impl AsPath, broadcast: BroadcastConsumer, relative: impl AsPath) {
205		let full = full.as_path();
206		let relative = relative.as_path();
207
208		if let Some((dir, relative)) = relative.next_part() {
209			let nested = self.entry(dir);
210			let mut locked = nested.lock();
211			locked.remove(&full, broadcast, &relative);
212
213			if locked.is_empty() {
214				drop(locked);
215				self.nested.remove(dir);
216			}
217		} else {
218			let entry = match &mut self.broadcast {
219				Some(existing) => existing,
220				None => return,
221			};
222
223			// See if we can remove the broadcast from the backup list.
224			let pos = entry.backup.iter().position(|b| b.is_clone(&broadcast));
225			if let Some(pos) = pos {
226				entry.backup.remove(pos);
227				// Nothing else to do
228				return;
229			}
230
231			// Okay so it must be the active broadcast or else we fucked up.
232			assert!(entry.active.is_clone(&broadcast));
233
234			// If there's a backup broadcast, then announce it.
235			if let Some(active) = entry.backup.pop() {
236				entry.active = active;
237				self.notify.lock().reannounce(full, &entry.active);
238			} else {
239				// No more backups, so remove the entry.
240				self.broadcast = None;
241				self.notify.lock().unannounce(full);
242			}
243		}
244	}
245
246	fn is_empty(&self) -> bool {
247		self.broadcast.is_none() && self.nested.is_empty() && self.notify.lock().consumers.is_empty()
248	}
249}
250
251#[derive(Clone)]
252struct OriginNodes {
253	nodes: Vec<(PathOwned, Lock<OriginNode>)>,
254}
255
256impl OriginNodes {
257	// Returns nested roots that match the prefixes.
258	// TODO enforce that prefixes can't overlap.
259	pub fn select(&self, prefixes: &[Path]) -> Option<Self> {
260		let mut roots = Vec::new();
261
262		for (root, state) in &self.nodes {
263			for prefix in prefixes {
264				if root.has_prefix(prefix) {
265					// Keep the existing node if we're allowed to access it.
266					roots.push((root.to_owned(), state.clone()));
267					continue;
268				}
269
270				if let Some(suffix) = prefix.strip_prefix(root) {
271					// If the requested prefix is larger than the allowed prefix, then we further scope it.
272					let nested = state.lock().leaf(&suffix);
273					roots.push((prefix.to_owned(), nested));
274				}
275			}
276		}
277
278		if roots.is_empty() {
279			None
280		} else {
281			Some(Self { nodes: roots })
282		}
283	}
284
285	pub fn root(&self, new_root: impl AsPath) -> Option<Self> {
286		let new_root = new_root.as_path();
287		let mut roots = Vec::new();
288
289		if new_root.is_empty() {
290			return Some(self.clone());
291		}
292
293		for (root, state) in &self.nodes {
294			if let Some(suffix) = root.strip_prefix(&new_root) {
295				// If the old root is longer than the new root, shorten the keys.
296				roots.push((suffix.to_owned(), state.clone()));
297			} else if let Some(suffix) = new_root.strip_prefix(root) {
298				// If the new root is longer than the old root, add a new root.
299				// NOTE: suffix can't be empty
300				let nested = state.lock().leaf(&suffix);
301				roots.push(("".into(), nested));
302			}
303		}
304
305		if roots.is_empty() {
306			None
307		} else {
308			Some(Self { nodes: roots })
309		}
310	}
311
312	// Returns the root that has this prefix.
313	pub fn get(&self, path: impl AsPath) -> Option<(Lock<OriginNode>, PathOwned)> {
314		let path = path.as_path();
315
316		for (root, state) in &self.nodes {
317			if let Some(suffix) = path.strip_prefix(root) {
318				return Some((state.clone(), suffix.to_owned()));
319			}
320		}
321
322		None
323	}
324}
325
326impl Default for OriginNodes {
327	fn default() -> Self {
328		Self {
329			nodes: vec![("".into(), Lock::new(OriginNode::new(None)))],
330		}
331	}
332}
333
334/// A broadcast path and its associated consumer, or None if closed.
335pub type OriginAnnounce = (PathOwned, Option<BroadcastConsumer>);
336
337pub struct Origin {}
338
339impl Origin {
340	pub fn produce() -> Produce<OriginProducer, OriginConsumer> {
341		let producer = OriginProducer::default();
342		let consumer = producer.consume();
343		Produce { producer, consumer }
344	}
345}
346
347/// Announces broadcasts to consumers over the network.
348#[derive(Clone, Default)]
349pub struct OriginProducer {
350	// The roots of the tree that we are allowed to publish.
351	// A path of "" means we can publish anything.
352	nodes: OriginNodes,
353
354	/// The prefix that is automatically stripped from all paths.
355	root: PathOwned,
356}
357
358impl OriginProducer {
359	/// Publish a broadcast, announcing it to all consumers.
360	///
361	/// The broadcast will be unannounced when it is closed.
362	/// If there is already a broadcast with the same path, then it will be replaced and reannounced.
363	/// If the old broadcast is closed before the new one, then nothing will happen.
364	/// If the new broadcast is closed before the old one, then the old broadcast will be reannounced.
365	///
366	/// Returns false if the broadcast is not allowed to be published.
367	pub fn publish_broadcast(&self, path: impl AsPath, broadcast: BroadcastConsumer) -> bool {
368		let path = path.as_path();
369
370		let (root, rest) = match self.nodes.get(&path) {
371			Some(root) => root,
372			None => return false,
373		};
374
375		let full = self.root.join(&path);
376
377		root.lock().publish(&full, &broadcast, &rest);
378		let root = root.clone();
379
380		web_async::spawn(async move {
381			broadcast.closed().await;
382			root.lock().remove(&full, broadcast, &rest);
383		});
384
385		true
386	}
387
388	/// Returns a new OriginProducer where all published broadcasts MUST match one of the prefixes.
389	///
390	/// Returns None if there are no legal prefixes.
391	pub fn publish_only(&self, prefixes: &[Path]) -> Option<OriginProducer> {
392		Some(OriginProducer {
393			nodes: self.nodes.select(prefixes)?,
394			root: self.root.clone(),
395		})
396	}
397
398	/// Subscribe to all announced broadcasts.
399	pub fn consume(&self) -> OriginConsumer {
400		OriginConsumer::new(self.root.clone(), self.nodes.clone())
401	}
402
403	/// Subscribe to all announced broadcasts matching the prefix.
404	///
405	/// TODO: Don't use overlapping prefixes or duplicates will be published.
406	///
407	/// Returns None if there are no legal prefixes.
408	pub fn consume_only(&self, prefixes: &[Path]) -> Option<OriginConsumer> {
409		Some(OriginConsumer::new(self.root.clone(), self.nodes.select(prefixes)?))
410	}
411
412	/// Returns a new OriginProducer that automatically strips out the provided prefix.
413	///
414	/// Returns None if the provided root is not authorized; when publish_only was already used without a wildcard.
415	pub fn with_root(&self, prefix: impl AsPath) -> Option<Self> {
416		let prefix = prefix.as_path();
417
418		Some(Self {
419			root: self.root.join(&prefix).to_owned(),
420			nodes: self.nodes.root(&prefix)?,
421		})
422	}
423
424	/// Returns the root that is automatically stripped from all paths.
425	pub fn root(&self) -> &Path {
426		&self.root
427	}
428
429	pub fn allowed(&self) -> impl Iterator<Item = &Path> {
430		self.nodes.nodes.iter().map(|(root, _)| root)
431	}
432
433	/// Converts a relative path to an absolute path.
434	pub fn absolute(&self, path: impl AsPath) -> Path {
435		self.root.join(path)
436	}
437}
438
439/// Consumes announced broadcasts matching against an optional prefix.
440// Not clone because it's expensive; call `consume` instead.
441pub struct OriginConsumer {
442	id: ConsumerId,
443	nodes: OriginNodes,
444	updates: mpsc::UnboundedReceiver<OriginAnnounce>,
445
446	/// A prefix that is automatically stripped from all paths.
447	root: PathOwned,
448}
449
450impl OriginConsumer {
451	fn new(root: PathOwned, nodes: OriginNodes) -> Self {
452		let (tx, rx) = mpsc::unbounded_channel();
453
454		let id = ConsumerId::new();
455
456		for (_, state) in &nodes.nodes {
457			let notify = OriginConsumerNotify {
458				root: root.clone(),
459				tx: tx.clone(),
460			};
461			state.lock().consume(id, notify);
462		}
463
464		Self {
465			id,
466			nodes,
467			updates: rx,
468			root,
469		}
470	}
471
472	/// Returns the next (un)announced broadcast and the absolute path.
473	///
474	/// The broadcast will only be announced if it was previously unannounced.
475	/// The same path won't be announced/unannounced twice, instead it will toggle.
476	/// Returns None if the consumer is closed.
477	///
478	/// Note: The returned path is absolute and will always match this consumer's prefix.
479	pub async fn announced(&mut self) -> Option<OriginAnnounce> {
480		self.updates.recv().await
481	}
482
483	/// Returns the next (un)announced broadcast and the absolute path without blocking.
484	///
485	/// Returns None if there is no update available; NOT because the consumer is closed.
486	/// You have to use `is_closed` to check if the consumer is closed.
487	pub fn try_announced(&mut self) -> Option<OriginAnnounce> {
488		self.updates.try_recv().ok()
489	}
490
491	pub fn consume(&self) -> OriginConsumer {
492		OriginConsumer::new(self.root.clone(), self.nodes.clone())
493	}
494
495	/// Get a specific broadcast by path.
496	///
497	/// TODO This should include announcement support.
498	///
499	/// Returns None if the path hasn't been announced yet.
500	pub fn consume_broadcast(&self, path: impl AsPath) -> Option<BroadcastConsumer> {
501		let path = path.as_path();
502		let (root, rest) = self.nodes.get(&path)?;
503		let state = root.lock();
504		state.consume_broadcast(&rest)
505	}
506
507	/// Returns a new OriginConsumer that only consumes broadcasts matching one of the prefixes.
508	///
509	/// Returns None if there are no legal prefixes (would always return None).
510	pub fn consume_only(&self, prefixes: &[Path]) -> Option<OriginConsumer> {
511		Some(OriginConsumer::new(self.root.clone(), self.nodes.select(prefixes)?))
512	}
513
514	/// Returns the prefix that is automatically stripped from all paths.
515	pub fn root(&self) -> &Path {
516		&self.root
517	}
518
519	pub fn allowed(&self) -> impl Iterator<Item = &Path> {
520		self.nodes.nodes.iter().map(|(root, _)| root)
521	}
522
523	/// Converts a relative path to an absolute path.
524	pub fn absolute(&self, path: impl AsPath) -> Path {
525		self.root.join(path)
526	}
527}
528
529impl Drop for OriginConsumer {
530	fn drop(&mut self) {
531		for (_, root) in &self.nodes.nodes {
532			root.lock().unconsume(self.id);
533		}
534	}
535}
536
537#[cfg(test)]
538use futures::FutureExt;
539
#[cfg(test)]
impl OriginConsumer {
	/// Asserts that an update is already queued and that it announces `broadcast` at `expected`.
	pub fn assert_next(&mut self, expected: impl AsPath, broadcast: &BroadcastConsumer) {
		let expected = expected.as_path();
		let ready = self.announced().now_or_never().expect("next blocked");
		let (path, active) = ready.expect("no next");
		assert_eq!(path, expected, "wrong path");
		assert!(active.unwrap().is_clone(broadcast), "should be the same broadcast");
	}

	/// Same check as `assert_next`, but goes through the non-blocking `try_announced`.
	pub fn assert_try_next(&mut self, expected: impl AsPath, broadcast: &BroadcastConsumer) {
		let expected = expected.as_path();
		let update = self.try_announced();
		let (path, active) = update.expect("no next");
		assert_eq!(path, expected, "wrong path");
		assert!(active.unwrap().is_clone(broadcast), "should be the same broadcast");
	}

	/// Asserts that an update is already queued and that it unannounces `expected`.
	pub fn assert_next_none(&mut self, expected: impl AsPath) {
		let expected = expected.as_path();
		let ready = self.announced().now_or_never().expect("next blocked");
		let (path, active) = ready.expect("no next");
		assert_eq!(path, expected, "wrong path");
		assert!(active.is_none(), "should be unannounced");
	}

	/// Asserts that no update is available right now.
	pub fn assert_next_wait(&mut self) {
		match self.announced().now_or_never() {
			None => {}
			Some(res) => panic!("next should block: got {:?}", res.map(|(path, _)| path)),
		}
	}

	/*
	pub fn assert_next_closed(&mut self) {
		assert!(
			self.announced().now_or_never().expect("next blocked").is_none(),
			"next should be closed"
		);
	}
	*/
}
578
579#[cfg(test)]
580mod tests {
581	use crate::Broadcast;
582
583	use super::*;
584
585	#[tokio::test]
586	async fn test_announce() {
587		let origin = Origin::produce();
588		let broadcast1 = Broadcast::produce();
589		let broadcast2 = Broadcast::produce();
590
591		let mut consumer1 = origin.consumer;
592		// Make a new consumer that should get it.
593		consumer1.assert_next_wait();
594
595		// Publish the first broadcast.
596		origin.producer.publish_broadcast("test1", broadcast1.consumer);
597
598		consumer1.assert_next("test1", &broadcast1.producer.consume());
599		consumer1.assert_next_wait();
600
601		// Make a new consumer that should get the existing broadcast.
602		// But we don't consume it yet.
603		let mut consumer2 = origin.producer.consume();
604
605		// Publish the second broadcast.
606		origin.producer.publish_broadcast("test2", broadcast2.consumer);
607
608		consumer1.assert_next("test2", &broadcast2.producer.consume());
609		consumer1.assert_next_wait();
610
611		consumer2.assert_next("test1", &broadcast1.producer.consume());
612		consumer2.assert_next("test2", &broadcast2.producer.consume());
613		consumer2.assert_next_wait();
614
615		// Close the first broadcast.
616		drop(broadcast1.producer);
617
618		// Wait for the async task to run.
619		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
620
621		// All consumers should get a None now.
622		consumer1.assert_next_none("test1");
623		consumer2.assert_next_none("test1");
624		consumer1.assert_next_wait();
625		consumer2.assert_next_wait();
626
627		// And a new consumer only gets the last broadcast.
628		let mut consumer3 = origin.producer.consume();
629		consumer3.assert_next("test2", &broadcast2.producer.consume());
630		consumer3.assert_next_wait();
631
632		// Close the other producer and make sure it cleans up
633		drop(broadcast2.producer);
634
635		// Wait for the async task to run.
636		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
637
638		consumer1.assert_next_none("test2");
639		consumer2.assert_next_none("test2");
640		consumer3.assert_next_none("test2");
641
642		/* TODO close the origin consumer when the producer is dropped
643		consumer1.assert_next_closed();
644		consumer2.assert_next_closed();
645		consumer3.assert_next_closed();
646		*/
647	}
648
649	#[tokio::test]
650	async fn test_duplicate() {
651		let mut origin = Origin::produce();
652
653		let broadcast1 = Broadcast::produce();
654		let broadcast2 = Broadcast::produce();
655		let broadcast3 = Broadcast::produce();
656
657		let consumer1 = broadcast1.consumer;
658		let consumer2 = broadcast2.consumer;
659		let consumer3 = broadcast3.consumer;
660
661		origin.producer.publish_broadcast("test", consumer1.clone());
662		origin.producer.publish_broadcast("test", consumer2.clone());
663		origin.producer.publish_broadcast("test", consumer3.clone());
664
665		assert!(origin.consumer.consume_broadcast("test").is_some());
666
667		origin.consumer.assert_next("test", &consumer1);
668		origin.consumer.assert_next_none("test");
669		origin.consumer.assert_next("test", &consumer2);
670		origin.consumer.assert_next_none("test");
671		origin.consumer.assert_next("test", &consumer3);
672
673		// Drop the backup, nothing should change.
674		drop(broadcast2.producer);
675
676		// Wait for the async task to run.
677		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
678
679		assert!(origin.consumer.consume_broadcast("test").is_some());
680		origin.consumer.assert_next_wait();
681
682		// Drop the active, we should reannounce.
683		drop(broadcast3.producer);
684
685		// Wait for the async task to run.
686		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
687
688		assert!(origin.consumer.consume_broadcast("test").is_some());
689		origin.consumer.assert_next_none("test");
690		origin.consumer.assert_next("test", &consumer1);
691
692		// Drop the final broadcast, we should unannounce.
693		drop(broadcast1.producer);
694
695		// Wait for the async task to run.
696		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
697		assert!(origin.consumer.consume_broadcast("test").is_none());
698
699		origin.consumer.assert_next_none("test");
700		origin.consumer.assert_next_wait();
701	}
702
703	#[tokio::test]
704	async fn test_duplicate_reverse() {
705		let origin = Origin::produce();
706		let broadcast1 = Broadcast::produce();
707		let broadcast2 = Broadcast::produce();
708
709		origin.producer.publish_broadcast("test", broadcast1.consumer);
710		origin.producer.publish_broadcast("test", broadcast2.consumer);
711		assert!(origin.consumer.consume_broadcast("test").is_some());
712
713		// This is harder, dropping the new broadcast first.
714		drop(broadcast2.producer);
715
716		// Wait for the cleanup async task to run.
717		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
718		assert!(origin.consumer.consume_broadcast("test").is_some());
719
720		drop(broadcast1.producer);
721
722		// Wait for the cleanup async task to run.
723		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
724		assert!(origin.consumer.consume_broadcast("test").is_none());
725	}
726
727	#[tokio::test]
728	async fn test_double_publish() {
729		let origin = Origin::produce();
730		let broadcast = Broadcast::produce();
731
732		// Ensure it doesn't crash.
733		origin.producer.publish_broadcast("test", broadcast.producer.consume());
734		origin.producer.publish_broadcast("test", broadcast.producer.consume());
735
736		assert!(origin.consumer.consume_broadcast("test").is_some());
737
738		drop(broadcast.producer);
739
740		// Wait for the async task to run.
741		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
742		assert!(origin.consumer.consume_broadcast("test").is_none());
743	}
744	// There was a tokio bug where only the first 127 broadcasts would be received instantly.
745	#[tokio::test]
746	#[should_panic]
747	async fn test_128() {
748		let mut origin = Origin::produce();
749		let broadcast = Broadcast::produce();
750
751		for i in 0..256 {
752			origin
753				.producer
754				.publish_broadcast(format!("test{i}"), broadcast.consumer.clone());
755		}
756
757		for i in 0..256 {
758			origin.consumer.assert_next(format!("test{i}"), &broadcast.consumer);
759		}
760	}
761
762	#[tokio::test]
763	async fn test_128_fix() {
764		let mut origin = Origin::produce();
765		let broadcast = Broadcast::produce();
766
767		for i in 0..256 {
768			origin
769				.producer
770				.publish_broadcast(format!("test{i}"), broadcast.consumer.clone());
771		}
772
773		for i in 0..256 {
774			// try_next does not have the same issue because it's synchronous.
775			origin.consumer.assert_try_next(format!("test{i}"), &broadcast.consumer);
776		}
777	}
778
779	#[tokio::test]
780	async fn test_with_root_basic() {
781		let mut origin = Origin::produce();
782		let broadcast = Broadcast::produce();
783
784		// Create a producer with root "/foo"
785		let foo_producer = origin.producer.with_root("foo").expect("should create root");
786		assert_eq!(foo_producer.root().as_str(), "foo");
787
788		// When publishing to "bar/baz", it should actually publish to "foo/bar/baz"
789		assert!(foo_producer.publish_broadcast("bar/baz", broadcast.consumer.clone()));
790
791		// The original consumer should see the full path
792		origin.consumer.assert_next("foo/bar/baz", &broadcast.consumer);
793
794		// A consumer created from the rooted producer should see the stripped path
795		let mut foo_consumer = foo_producer.consume();
796		foo_consumer.assert_next("bar/baz", &broadcast.consumer);
797	}
798
799	#[tokio::test]
800	async fn test_with_root_nested() {
801		let mut origin = Origin::produce();
802		let broadcast = Broadcast::produce();
803
804		// Create nested roots
805		let foo_producer = origin.producer.with_root("foo").expect("should create foo root");
806		let foo_bar_producer = foo_producer.with_root("bar").expect("should create bar root");
807		assert_eq!(foo_bar_producer.root().as_str(), "foo/bar");
808
809		// Publishing to "baz" should actually publish to "foo/bar/baz"
810		assert!(foo_bar_producer.publish_broadcast("baz", broadcast.consumer.clone()));
811
812		// The original consumer sees the full path
813		origin.consumer.assert_next("foo/bar/baz", &broadcast.consumer);
814
815		// Consumer from foo_bar_producer sees just "baz"
816		let mut foo_bar_consumer = foo_bar_producer.consume();
817		foo_bar_consumer.assert_next("baz", &broadcast.consumer);
818	}
819
820	#[tokio::test]
821	async fn test_publish_only_allows() {
822		let origin = Origin::produce();
823		let broadcast = Broadcast::produce();
824
825		// Create a producer that can only publish to "allowed" paths
826		let limited_producer = origin
827			.producer
828			.publish_only(&["allowed/path1".into(), "allowed/path2".into()])
829			.expect("should create limited producer");
830
831		// Should be able to publish to allowed paths
832		assert!(limited_producer.publish_broadcast("allowed/path1", broadcast.consumer.clone()));
833		assert!(limited_producer.publish_broadcast("allowed/path1/nested", broadcast.consumer.clone()));
834		assert!(limited_producer.publish_broadcast("allowed/path2", broadcast.consumer.clone()));
835
836		// Should not be able to publish to disallowed paths
837		assert!(!limited_producer.publish_broadcast("notallowed", broadcast.consumer.clone()));
838		assert!(!limited_producer.publish_broadcast("allowed", broadcast.consumer.clone())); // Parent of allowed path
839		assert!(!limited_producer.publish_broadcast("other/path", broadcast.consumer.clone()));
840	}
841
842	#[tokio::test]
843	async fn test_publish_only_empty() {
844		let origin = Origin::produce();
845
846		// Creating a producer with no allowed paths should return None
847		assert!(origin.producer.publish_only(&[]).is_none());
848	}
849
850	#[tokio::test]
851	async fn test_consume_only_filters() {
852		let mut origin = Origin::produce();
853		let broadcast1 = Broadcast::produce();
854		let broadcast2 = Broadcast::produce();
855		let broadcast3 = Broadcast::produce();
856
857		// Publish to different paths
858		origin
859			.producer
860			.publish_broadcast("allowed", broadcast1.consumer.clone());
861		origin
862			.producer
863			.publish_broadcast("allowed/nested", broadcast2.consumer.clone());
864		origin
865			.producer
866			.publish_broadcast("notallowed", broadcast3.consumer.clone());
867
868		// Create a consumer that only sees "allowed" paths
869		let mut limited_consumer = origin
870			.consumer
871			.consume_only(&["allowed".into()])
872			.expect("should create limited consumer");
873
874		// Should only receive broadcasts under "allowed"
875		limited_consumer.assert_next("allowed", &broadcast1.consumer);
876		limited_consumer.assert_next("allowed/nested", &broadcast2.consumer);
877		limited_consumer.assert_next_wait(); // Should not see "notallowed"
878
879		// Original consumer should see all
880		origin.consumer.assert_next("allowed", &broadcast1.consumer);
881		origin.consumer.assert_next("allowed/nested", &broadcast2.consumer);
882		origin.consumer.assert_next("notallowed", &broadcast3.consumer);
883	}
884
885	#[tokio::test]
886	async fn test_consume_only_multiple_prefixes() {
887		let origin = Origin::produce();
888		let broadcast1 = Broadcast::produce();
889		let broadcast2 = Broadcast::produce();
890		let broadcast3 = Broadcast::produce();
891
892		origin
893			.producer
894			.publish_broadcast("foo/test", broadcast1.consumer.clone());
895		origin
896			.producer
897			.publish_broadcast("bar/test", broadcast2.consumer.clone());
898		origin
899			.producer
900			.publish_broadcast("baz/test", broadcast3.consumer.clone());
901
902		// Consumer that only sees "foo" and "bar" paths
903		let mut limited_consumer = origin
904			.consumer
905			.consume_only(&["foo".into(), "bar".into()])
906			.expect("should create limited consumer");
907
908		limited_consumer.assert_next("foo/test", &broadcast1.consumer);
909		limited_consumer.assert_next("bar/test", &broadcast2.consumer);
910		limited_consumer.assert_next_wait(); // Should not see "baz/test"
911	}
912
913	#[tokio::test]
914	async fn test_with_root_and_publish_only() {
915		let mut origin = Origin::produce();
916		let broadcast = Broadcast::produce();
917
918		// User connects to /foo root
919		let foo_producer = origin.producer.with_root("foo").expect("should create foo root");
920
921		// Limit them to publish only to "bar" and "goop/pee" within /foo
922		let limited_producer = foo_producer
923			.publish_only(&["bar".into(), "goop/pee".into()])
924			.expect("should create limited producer");
925
926		// Should be able to publish to foo/bar and foo/goop/pee (but user sees as bar and goop/pee)
927		assert!(limited_producer.publish_broadcast("bar", broadcast.consumer.clone()));
928		assert!(limited_producer.publish_broadcast("bar/nested", broadcast.consumer.clone()));
929		assert!(limited_producer.publish_broadcast("goop/pee", broadcast.consumer.clone()));
930		assert!(limited_producer.publish_broadcast("goop/pee/nested", broadcast.consumer.clone()));
931
932		// Should not be able to publish outside allowed paths
933		assert!(!limited_producer.publish_broadcast("baz", broadcast.consumer.clone()));
934		assert!(!limited_producer.publish_broadcast("goop", broadcast.consumer.clone())); // Parent of allowed
935		assert!(!limited_producer.publish_broadcast("goop/other", broadcast.consumer.clone()));
936
937		// Original consumer sees full paths
938		origin.consumer.assert_next("foo/bar", &broadcast.consumer);
939		origin.consumer.assert_next("foo/bar/nested", &broadcast.consumer);
940		origin.consumer.assert_next("foo/goop/pee", &broadcast.consumer);
941		origin.consumer.assert_next("foo/goop/pee/nested", &broadcast.consumer);
942	}
943
944	#[tokio::test]
	// Publishes three broadcasts under "foo/…" on the unrestricted producer, then derives
	// a consumer via `with_root("foo")` + `consume_only(["bar", "goop/pee"])` and checks
	// that it is told only about the two matching broadcasts, with the "foo" prefix removed.
945	async fn test_with_root_and_consume_only() {
946		let origin = Origin::produce();
947		let broadcast1 = Broadcast::produce();
948		let broadcast2 = Broadcast::produce();
949		let broadcast3 = Broadcast::produce();
950
951		// Publish broadcasts
		// (the boolean results are deliberately not checked here; the producer is unrestricted)
952		origin
953			.producer
954			.publish_broadcast("foo/bar/test", broadcast1.consumer.clone());
955		origin
956			.producer
957			.publish_broadcast("foo/goop/pee/test", broadcast2.consumer.clone());
958		origin
959			.producer
960			.publish_broadcast("foo/other/test", broadcast3.consumer.clone());
961
962		// User connects to /foo root
963		let foo_producer = origin.producer.with_root("foo").expect("should create foo root");
964
965		// Create consumer limited to "bar" and "goop/pee" within /foo
		// The consumer is created after the publishes, so it must learn about existing broadcasts.
966		let mut limited_consumer = foo_producer
967			.consume_only(&["bar".into(), "goop/pee".into()])
968			.expect("should create limited consumer");
969
970		// Should only see allowed paths (without foo prefix)
971		limited_consumer.assert_next("bar/test", &broadcast1.consumer);
972		limited_consumer.assert_next("goop/pee/test", &broadcast2.consumer);
		// NOTE(review): `assert_next_wait` is defined outside this chunk; presumably it asserts
		// that no further announcement is pending.
973		limited_consumer.assert_next_wait(); // Should not see "other/test"
974	}
975
976	#[tokio::test]
	// Checks that `with_root` honours an earlier `publish_only` restriction: it yields
	// `None` for a root outside the allowed prefix and a producer rooted at "allowed" otherwise.
977	async fn test_with_root_unauthorized() {
978		let origin = Origin::produce();
979
980		// First limit the producer to specific paths
981		let limited_producer = origin
982			.producer
983			.publish_only(&["allowed".into()])
984			.expect("should create limited producer");
985
986		// Trying to create a root outside allowed paths should fail
987		assert!(limited_producer.with_root("notallowed").is_none());
988
989		// But creating a root within allowed paths should work
990		let allowed_root = limited_producer
991			.with_root("allowed")
992			.expect("should create allowed root");
		// The new producer reports the root it was created with.
993		assert_eq!(allowed_root.root().as_str(), "allowed");
994	}
995
996	#[tokio::test]
	// Checks that the producer handed out by `Origin::produce()` is unrestricted: it can
	// publish to arbitrary paths and `with_root` succeeds for an arbitrary root.
997	async fn test_wildcard_permission() {
998		let origin = Origin::produce();
999		let broadcast = Broadcast::produce();
1000
1001		// Producer straight from the origin, with no publish_only/with_root applied.
		// NOTE(review): the original comment said "empty string means wildcard"; that
		// representation is not visible in this test — confirm against Origin::produce.
1002		let root_producer = origin.producer.clone();
1003
1004		// Should be able to publish anywhere
1005		assert!(root_producer.publish_broadcast("any/path", broadcast.consumer.clone()));
1006		assert!(root_producer.publish_broadcast("other/path", broadcast.consumer.clone()));
1007
1008		// Can create any root
1009		let foo_producer = root_producer.with_root("foo").expect("should create any root");
1010		assert_eq!(foo_producer.root().as_str(), "foo");
1011	}
1012
1013	#[tokio::test]
	// Checks that the direct lookup `consume_broadcast(path)` respects `consume_only`:
	// a restricted consumer gets the broadcast under an allowed prefix and `None` for one
	// outside it, while the unrestricted origin consumer can look up both.
1014	async fn test_consume_broadcast_with_permissions() {
1015		let origin = Origin::produce();
1016		let broadcast1 = Broadcast::produce();
1017		let broadcast2 = Broadcast::produce();
1018
		// One broadcast inside and one outside the prefix that will be allowed below.
1019		origin
1020			.producer
1021			.publish_broadcast("allowed/test", broadcast1.consumer.clone());
1022		origin
1023			.producer
1024			.publish_broadcast("notallowed/test", broadcast2.consumer.clone());
1025
1026		// Create limited consumer
1027		let limited_consumer = origin
1028			.consumer
1029			.consume_only(&["allowed".into()])
1030			.expect("should create limited consumer");
1031
1032		// Should be able to get allowed broadcast
1033		let result = limited_consumer.consume_broadcast("allowed/test");
1034		assert!(result.is_some());
		// ...and it must be the very broadcast that was published, not merely "some" broadcast.
1035		assert!(result.unwrap().is_clone(&broadcast1.consumer));
1036
1037		// Should not be able to get disallowed broadcast
1038		assert!(limited_consumer.consume_broadcast("notallowed/test").is_none());
1039
1040		// Original consumer can get both
1041		assert!(origin.consumer.consume_broadcast("allowed/test").is_some());
1042		assert!(origin.consumer.consume_broadcast("notallowed/test").is_some());
1043	}
1044
1045	#[tokio::test]
	// Checks prefix semantics of `publish_only` for a multi-segment prefix "a/b/c":
	// the prefix itself and anything below it is accepted; its ancestors and siblings are refused.
1046	async fn test_nested_paths_with_permissions() {
1047		let origin = Origin::produce();
1048		let broadcast = Broadcast::produce();
1049
1050		// Create producer limited to "a/b/c"
1051		let limited_producer = origin
1052			.producer
1053			.publish_only(&["a/b/c".into()])
1054			.expect("should create limited producer");
1055
1056		// Should be able to publish to exact path and nested paths
1057		assert!(limited_producer.publish_broadcast("a/b/c", broadcast.consumer.clone()));
1058		assert!(limited_producer.publish_broadcast("a/b/c/d", broadcast.consumer.clone()));
1059		assert!(limited_producer.publish_broadcast("a/b/c/d/e", broadcast.consumer.clone()));
1060
1061		// Should not be able to publish to parent or sibling paths
1062		assert!(!limited_producer.publish_broadcast("a", broadcast.consumer.clone()));
1063		assert!(!limited_producer.publish_broadcast("a/b", broadcast.consumer.clone()));
1064		assert!(!limited_producer.publish_broadcast("a/b/other", broadcast.consumer.clone()));
1065	}
1066
1067	#[tokio::test]
	// Derives three independently restricted consumers from the same origin consumer and
	// checks that each one is told only about the broadcasts under its own allowed prefixes.
1068	async fn test_multiple_consumers_with_different_permissions() {
1069		let origin = Origin::produce();
1070		let broadcast1 = Broadcast::produce();
1071		let broadcast2 = Broadcast::produce();
1072		let broadcast3 = Broadcast::produce();
1073
1074		// Publish to different paths
1075		origin
1076			.producer
1077			.publish_broadcast("foo/test", broadcast1.consumer.clone());
1078		origin
1079			.producer
1080			.publish_broadcast("bar/test", broadcast2.consumer.clone());
1081		origin
1082			.producer
1083			.publish_broadcast("baz/test", broadcast3.consumer.clone());
1084
1085		// Create consumers with different permissions
1086		let mut foo_consumer = origin
1087			.consumer
1088			.consume_only(&["foo".into()])
1089			.expect("should create foo consumer");
1090
1091		let mut bar_consumer = origin
1092			.consumer
1093			.consume_only(&["bar".into()])
1094			.expect("should create bar consumer");
1095
1096		let mut foobar_consumer = origin
1097			.consumer
1098			.consume_only(&["foo".into(), "bar".into()])
1099			.expect("should create foobar consumer");
1100
1101		// Each consumer should only see their allowed paths
		// ("baz/test" is published but allowed for none of them, so it must never show up.)
1102		foo_consumer.assert_next("foo/test", &broadcast1.consumer);
1103		foo_consumer.assert_next_wait();
1104
1105		bar_consumer.assert_next("bar/test", &broadcast2.consumer);
1106		bar_consumer.assert_next_wait();
1107
		// With two allowed prefixes the announcements are expected in the listed order, foo then bar.
1108		foobar_consumer.assert_next("foo/test", &broadcast1.consumer);
1109		foobar_consumer.assert_next("bar/test", &broadcast2.consumer);
1110		foobar_consumer.assert_next_wait();
1111	}
1112
1113	#[tokio::test]
	// Checks that `consume_only([""])` on an already restricted producer does not fail and
	// does not change what is visible: both previously allowed prefixes are still announced.
1114	async fn test_select_with_empty_prefix() {
1115		let origin = Origin::produce();
1116		let broadcast1 = Broadcast::produce();
1117		let broadcast2 = Broadcast::produce();
1118
1119		// User with root "demo" allowed to publish to "worm-node" and "foobar"
1120		let demo_producer = origin.producer.with_root("demo").expect("should create demo root");
1121		let limited_producer = demo_producer
1122			.publish_only(&["worm-node".into(), "foobar".into()])
1123			.expect("should create limited producer");
1124
1125		// Publish some broadcasts
1126		assert!(limited_producer.publish_broadcast("worm-node/test", broadcast1.consumer.clone()));
1127		assert!(limited_producer.publish_broadcast("foobar/test", broadcast2.consumer.clone()));
1128
1129		// consume_only with empty prefix should keep the exact same "worm-node" and "foobar" nodes
1130		let mut consumer = limited_producer
1131			.consume_only(&["".into()])
1132			.expect("should create consumer with empty prefix");
1133
1134		// Should still see both broadcasts
		// Paths are relative to the "demo" root, i.e. without a "demo/" prefix.
1135		consumer.assert_next("worm-node/test", &broadcast1.consumer);
1136		consumer.assert_next("foobar/test", &broadcast2.consumer);
1137		consumer.assert_next_wait();
1138	}
1139
1140	#[tokio::test]
	// Checks that `consume_only` can narrow a restricted producer's scope to one of its
	// allowed prefixes, or to a path below one, and that the narrowed consumer still
	// receives the full, unstripped paths.
1141	async fn test_select_narrowing_scope() {
1142		let origin = Origin::produce();
1143		let broadcast1 = Broadcast::produce();
1144		let broadcast2 = Broadcast::produce();
1145		let broadcast3 = Broadcast::produce();
1146
1147		// User with root "demo" allowed to publish to "worm-node" and "foobar"
1148		let demo_producer = origin.producer.with_root("demo").expect("should create demo root");
1149		let limited_producer = demo_producer
1150			.publish_only(&["worm-node".into(), "foobar".into()])
1151			.expect("should create limited producer");
1152
1153		// Publish broadcasts at different levels
1154		assert!(limited_producer.publish_broadcast("worm-node", broadcast1.consumer.clone()));
1155		assert!(limited_producer.publish_broadcast("worm-node/foo", broadcast2.consumer.clone()));
1156		assert!(limited_producer.publish_broadcast("foobar/bar", broadcast3.consumer.clone()));
1157
1158		// Test 1: consume_only("worm-node") should expose the contents of "worm-node" ONLY
1159		let mut worm_consumer = limited_producer
1160			.consume_only(&["worm-node".into()])
1161			.expect("should create worm-node consumer");
1162
1163		// Should see worm-node content; the "worm-node" prefix is NOT stripped from the paths
1164		worm_consumer.assert_next("worm-node", &broadcast1.consumer);
1165		worm_consumer.assert_next("worm-node/foo", &broadcast2.consumer);
1166		worm_consumer.assert_next_wait(); // Should NOT see foobar content
1167
1168		// Test 2: consume_only("worm-node/foo") should expose the contents of "worm-node/foo" ONLY
1169		let mut foo_consumer = limited_producer
1170			.consume_only(&["worm-node/foo".into()])
1171			.expect("should create worm-node/foo consumer");
1172
		// The broadcast at "worm-node" itself is a parent of the prefix and must not be announced.
1173		foo_consumer.assert_next("worm-node/foo", &broadcast2.consumer);
1174		foo_consumer.assert_next_wait(); // Should NOT see other content
1175	}
1176
1177	#[tokio::test]
	// Checks that `consume_only([""])` on a producer limited to three prefixes yields a
	// consumer that is told about the broadcasts under all three of them.
1178	async fn test_select_multiple_roots_with_empty_prefix() {
1179		let origin = Origin::produce();
1180		let broadcast1 = Broadcast::produce();
1181		let broadcast2 = Broadcast::produce();
1182		let broadcast3 = Broadcast::produce();
1183
1184		// Producer with multiple allowed roots
1185		let limited_producer = origin
1186			.producer
1187			.publish_only(&["app1".into(), "app2".into(), "shared".into()])
1188			.expect("should create limited producer");
1189
1190		// Publish to each root
1191		assert!(limited_producer.publish_broadcast("app1/data", broadcast1.consumer.clone()));
1192		assert!(limited_producer.publish_broadcast("app2/config", broadcast2.consumer.clone()));
1193		assert!(limited_producer.publish_broadcast("shared/resource", broadcast3.consumer.clone()));
1194
1195		// consume_only with empty prefix should maintain all roots
1196		let mut consumer = limited_producer
1197			.consume_only(&["".into()])
1198			.expect("should create consumer with empty prefix");
1199
1200		// Should see all broadcasts from all roots
		// The expected order follows the order of the prefixes given to publish_only above.
1201		consumer.assert_next("app1/data", &broadcast1.consumer);
1202		consumer.assert_next("app2/config", &broadcast2.consumer);
1203		consumer.assert_next("shared/resource", &broadcast3.consumer);
1204		consumer.assert_next_wait();
1205	}
1206
1207	#[tokio::test]
	// Checks that re-applying `publish_only([""])` to an already restricted producer
	// succeeds and neither widens nor narrows the existing publish restrictions.
1208	async fn test_publish_only_with_empty_prefix() {
1209		let origin = Origin::produce();
1210		let broadcast = Broadcast::produce();
1211
1212		// Producer with specific allowed paths
1213		let limited_producer = origin
1214			.producer
1215			.publish_only(&["services/api".into(), "services/web".into()])
1216			.expect("should create limited producer");
1217
1218		// publish_only with empty prefix should keep the same restrictions
1219		let same_producer = limited_producer
1220			.publish_only(&["".into()])
1221			.expect("should create producer with empty prefix");
1222
1223		// Should still have the same publishing restrictions
1224		assert!(same_producer.publish_broadcast("services/api", broadcast.consumer.clone()));
1225		assert!(same_producer.publish_broadcast("services/web", broadcast.consumer.clone()));
		// A sibling under the shared "services" parent and an unrelated path both stay forbidden.
1226		assert!(!same_producer.publish_broadcast("services/db", broadcast.consumer.clone()));
1227		assert!(!same_producer.publish_broadcast("other", broadcast.consumer.clone()));
1228	}
1229
1230	#[tokio::test]
	// Checks that a consumer derived from a producer limited to "org" can be narrowed to a
	// deeper prefix ("org/team2", then "org/team1/project1") and is then told only about
	// the broadcasts under that prefix, still under their full paths.
1231	async fn test_select_narrowing_to_deeper_path() {
1232		let origin = Origin::produce();
1233		let broadcast1 = Broadcast::produce();
1234		let broadcast2 = Broadcast::produce();
1235		let broadcast3 = Broadcast::produce();
1236
1237		// Producer with broad permission
1238		let limited_producer = origin
1239			.producer
1240			.publish_only(&["org".into()])
1241			.expect("should create limited producer");
1242
1243		// Publish at various depths
1244		assert!(limited_producer.publish_broadcast("org/team1/project1", broadcast1.consumer.clone()));
1245		assert!(limited_producer.publish_broadcast("org/team1/project2", broadcast2.consumer.clone()));
1246		assert!(limited_producer.publish_broadcast("org/team2/project1", broadcast3.consumer.clone()));
1247
1248		// Narrow down to team2 only
1249		let mut team2_consumer = limited_producer
1250			.consume_only(&["org/team2".into()])
1251			.expect("should create team2 consumer");
1252
1253		team2_consumer.assert_next("org/team2/project1", &broadcast3.consumer);
1254		team2_consumer.assert_next_wait(); // Should NOT see team1 content
1255
1256		// Narrow even further, down to the single broadcast at team1/project1
1257		let mut project1_consumer = limited_producer
1258			.consume_only(&["org/team1/project1".into()])
1259			.expect("should create project1 consumer");
1260
1261		// Should only see project1 content, announced under its full path (not at the root)
1262		project1_consumer.assert_next("org/team1/project1", &broadcast1.consumer);
1263		project1_consumer.assert_next_wait();
1264	}
1265
1266	#[tokio::test]
	// Checks that neither `consume_only` nor `publish_only` can be used to reach a prefix
	// disjoint from the producer's existing permissions: both return `None`.
1267	async fn test_select_with_non_matching_prefix() {
1268		let origin = Origin::produce();
1269
1270		// Producer with specific allowed paths
1271		let limited_producer = origin
1272			.producer
1273			.publish_only(&["allowed/path".into()])
1274			.expect("should create limited producer");
1275
1276		// Trying to consume_only with a completely different prefix should return None
1277		assert!(limited_producer.consume_only(&["different/path".into()]).is_none());
1278
1279		// Similarly for publish_only
1280		assert!(limited_producer.publish_only(&["other/path".into()]).is_none());
1281	}
1282
1283	#[tokio::test]
	// Checks that asking for a wider scope than the producer holds (`consume_only([""])`)
	// does not fail but yields exactly the already allowed prefixes, and that the same
	// producer can still hand out a consumer narrowed to just one of them.
1284	async fn test_select_maintains_access_with_wider_prefix() {
1285		let origin = Origin::produce();
1286		let broadcast1 = Broadcast::produce();
1287		let broadcast2 = Broadcast::produce();
1288
1289		// Setup: user with root "demo" allowed to publish to specific paths
1290		let demo_producer = origin.producer.with_root("demo").expect("should create demo root");
1291		let user_producer = demo_producer
1292			.publish_only(&["worm-node".into(), "foobar".into()])
1293			.expect("should create user producer");
1294
1295		// Publish some data
		// (one broadcast below an allowed prefix, one exactly at an allowed prefix)
1296		assert!(user_producer.publish_broadcast("worm-node/data", broadcast1.consumer.clone()));
1297		assert!(user_producer.publish_broadcast("foobar", broadcast2.consumer.clone()));
1298
1299		// Key test: consume_only with "" should maintain access to allowed roots
1300		let mut consumer = user_producer
1301			.consume_only(&["".into()])
1302			.expect("consume_only with empty prefix should not fail when user has specific permissions");
1303
1304		// Should still receive broadcasts from allowed paths
1305		consumer.assert_next("worm-node/data", &broadcast1.consumer);
1306		consumer.assert_next("foobar", &broadcast2.consumer);
1307		consumer.assert_next_wait();
1308
1309		// Also test that we can still narrow the scope
1310		let mut narrow_consumer = user_producer
1311			.consume_only(&["worm-node".into()])
1312			.expect("should be able to narrow scope to worm-node");
1313
1314		narrow_consumer.assert_next("worm-node/data", &broadcast1.consumer);
1315		narrow_consumer.assert_next_wait(); // Should not see foobar
1316	}
1317}