Skip to main content

soil_network/statement_store/
subscription.rs

1// This file is part of Soil.
2
3// Copyright (C) Soil contributors.
4// Copyright (C) Parity Technologies (UK) Ltd.
5// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
6
7//! Subscription logic for statement store.
8//!
9//! Manages subscriptions to statement topics and notifies subscribers when new statements arrive.
10//! Uses multiple matcher tasks to handle subscriptions concurrently, each responsible for a subset
11//! of subscriptions. Each matcher task maintains its own list of subscriptions and matches incoming
12//! statements against them. When a new statement is submitted, it is sent to all matcher tasks for
13//! processing. If a statement matches a subscription's filter, it is sent to the subscriber via an
14//! async channel.
15//!
16//! This design allows for efficient handling of a large number of subscriptions and statements and
17//! can be scaled by adjusting the number of matcher tasks.
18
// Capacity of the bounded channel that feeds each matcher task. Once a matcher has this many
// pending messages, senders block, which backpressures the submission side.
// This value is generous to allow for bursts of statements without dropping any or backpressuring
// too early.
const MATCHERS_TASK_CHANNEL_BUFFER_SIZE: usize = 80_000;

// Capacity of the bounded channel created for each individual subscription.
const SUBSCRIPTION_BUFFER_SIZE: usize = 128;
26
27use futures::{Stream, StreamExt};
28use itertools::Itertools;
29
30use super::LOG_TARGET;
31use soil_client::utils::id_sequence::SeqID;
32pub use soil_statement_store::StatementStore;
33use soil_statement_store::{
34	OptimizedTopicFilter, Result, Statement, StatementEvent, Topic, MAX_TOPICS,
35};
36use std::{
37	collections::{hash_map::Entry, HashMap, HashSet},
38	sync::atomic::AtomicU64,
39};
40use subsoil::core::{traits::SpawnNamed, Bytes, Encode};
41
/// Trait for initiating statement store subscriptions from the RPC module.
pub trait StatementStoreSubscriptionApi: Send + Sync {
	/// Subscribe to statements matching the topic filter.
	///
	/// On success returns a tuple of:
	/// - the existing matching statements as raw bytes (presumably the encoded statements;
	///   confirm against the implementors),
	/// - a sender for the subscription's [`StatementEvent`] channel,
	/// - a [`SubscriptionStatementsStream`] for receiving matched statements when they arrive.
	fn subscribe_statement(
		&self,
		topic_filter: OptimizedTopicFilter,
	) -> Result<(Vec<Vec<u8>>, async_channel::Sender<StatementEvent>, SubscriptionStatementsStream)>;
}
53
/// Messages sent to matcher tasks.
///
/// Each matcher task receives these over its own bounded channel and applies them to the
/// subscriptions it is responsible for.
#[derive(Clone, Debug)]
pub enum MatcherMessage {
	/// A new statement has been submitted; the receiving matcher notifies its matching
	/// subscriptions.
	NewStatement(Statement),
	/// A new subscription has been created and must be registered by the receiving matcher.
	Subscribe(SubscriptionInfo),
	/// Unsubscribe the subscription with the given ID.
	Unsubscribe(SeqID),
}
64
/// Handle to manage all subscriptions.
///
/// Owns the subscription ID sequence and the senders used to reach the matcher tasks.
pub struct SubscriptionsHandle {
	// Sequence generator for subscription IDs, atomic for thread safety.
	// Subscription creation is expensive enough that we don't worry about overflow here.
	id_sequence: AtomicU64,
	// Senders used to communicate with the subscription matcher tasks.
	matchers: SubscriptionsMatchersHandlers,
}
73
74impl SubscriptionsHandle {
75	/// Create a new SubscriptionsHandle with the given task spawner and number of filter workers.
76	pub(crate) fn new(
77		task_spawner: Box<dyn SpawnNamed>,
78		num_matcher_workers: usize,
79	) -> SubscriptionsHandle {
80		let mut subscriptions_matchers_senders = Vec::with_capacity(num_matcher_workers);
81
82		for task in 0..num_matcher_workers {
83			let (subscription_matcher_sender, subscription_matcher_receiver) =
84				async_channel::bounded(MATCHERS_TASK_CHANNEL_BUFFER_SIZE);
85			subscriptions_matchers_senders.push(subscription_matcher_sender);
86			task_spawner.spawn_blocking(
87				"statement-store-subscription-filters",
88				Some("statement-store"),
89				Box::pin(async move {
90					let mut subscriptions = SubscriptionsInfo::new();
91					log::debug!(
92						target: LOG_TARGET,
93						"Started statement subscription matcher task: {task}"
94					);
95					loop {
96						let res = subscription_matcher_receiver.recv().await;
97						match res {
98							Ok(MatcherMessage::NewStatement(statement)) => {
99								subscriptions.notify_matching_filters(&statement);
100							},
101							Ok(MatcherMessage::Subscribe(info)) => {
102								subscriptions.subscribe(info);
103							},
104							Ok(MatcherMessage::Unsubscribe(seq_id)) => {
105								subscriptions.unsubscribe(seq_id);
106							},
107							Err(_) => {
108								// Expected when the subscription manager is dropped at shutdown.
109								log::error!(
110									target: LOG_TARGET,
111									"Statement subscription matcher channel closed: {task}"
112								);
113								break;
114							},
115						};
116					}
117				}),
118			);
119		}
120		SubscriptionsHandle {
121			id_sequence: AtomicU64::new(0),
122			matchers: SubscriptionsMatchersHandlers::new(subscriptions_matchers_senders),
123		}
124	}
125
126	// Generate the next unique subscription ID.
127	fn next_id(&self) -> SeqID {
128		let id = self.id_sequence.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
129		SeqID::from(id)
130	}
131
132	/// Subscribe to statements matching the topic filter.
133	pub(crate) fn subscribe(
134		&self,
135		topic_filter: OptimizedTopicFilter,
136	) -> (async_channel::Sender<StatementEvent>, SubscriptionStatementsStream) {
137		let next_id = self.next_id();
138		let (tx, rx) = async_channel::bounded(SUBSCRIPTION_BUFFER_SIZE);
139		let subscription_info =
140			SubscriptionInfo { topic_filter: topic_filter.clone(), seq_id: next_id, tx };
141
142		let result = (
143			subscription_info.tx.clone(),
144			SubscriptionStatementsStream {
145				rx,
146				sub_id: subscription_info.seq_id,
147				matchers: self.matchers.clone(),
148			},
149		);
150
151		self.matchers
152			.send_by_seq_id(subscription_info.seq_id, MatcherMessage::Subscribe(subscription_info));
153		result
154	}
155
156	pub(crate) fn notify(&self, statement: Statement) {
157		self.matchers.send_all(MatcherMessage::NewStatement(statement));
158	}
159}
160
// Information about all subscriptions handled by one matcher task.
// Each matcher task will have its own instance of this struct.
struct SubscriptionsInfo {
	// Subscriptions organized by topic for MatchAll filters.
	//
	// Maps each topic to an array of HashMaps, where the array is indexed by
	// `(number_of_topics_in_filter - 1)`. For example, a subscription requiring
	// topics [A, B] (2 topics) will be stored at index 1 under both topic A and B.
	//
	// This structure allows efficient matching: when a statement arrives with N topics,
	// we only need to check subscriptions that require exactly N or fewer topics.
	subscriptions_match_all_by_topic:
		HashMap<Topic, [HashMap<SeqID, SubscriptionInfo>; MAX_TOPICS]>,
	// Subscriptions organized by topic for MatchAny filters. A subscription is stored under
	// every topic of its filter.
	subscriptions_match_any_by_topic: HashMap<Topic, HashMap<SeqID, SubscriptionInfo>>,
	// Subscriptions that listen with Any filter (i.e., no topic filtering).
	subscriptions_any: HashMap<SeqID, SubscriptionInfo>,
	// Mapping from subscription ID to topic filter, used to find the entries to remove when
	// unsubscribing.
	by_sub_id: HashMap<SeqID, OptimizedTopicFilter>,
}
181
// Information about a single subscription.
//
// Cloned into every per-topic map the subscription is registered under.
#[derive(Clone, Debug)]
pub(crate) struct SubscriptionInfo {
	// The filter used for this subscription.
	topic_filter: OptimizedTopicFilter,
	// The unique ID of this subscription.
	seq_id: SeqID,
	// Channel to send matched statements to the subscriber.
	tx: async_channel::Sender<StatementEvent>,
}
192
193impl SubscriptionsInfo {
194	fn new() -> SubscriptionsInfo {
195		SubscriptionsInfo {
196			subscriptions_match_all_by_topic: HashMap::new(),
197			subscriptions_match_any_by_topic: HashMap::new(),
198			subscriptions_any: HashMap::new(),
199			by_sub_id: HashMap::new(),
200		}
201	}
202
203	// Subscribe a new subscription.
204	fn subscribe(&mut self, subscription_info: SubscriptionInfo) {
205		self.by_sub_id
206			.insert(subscription_info.seq_id, subscription_info.topic_filter.clone());
207		match &subscription_info.topic_filter {
208			OptimizedTopicFilter::Any => {
209				self.subscriptions_any.insert(subscription_info.seq_id, subscription_info);
210			},
211			OptimizedTopicFilter::MatchAll(topics) => {
212				for topic in topics {
213					self.subscriptions_match_all_by_topic.entry(*topic).or_default()
214						[topics.len() - 1]
215						.insert(subscription_info.seq_id, subscription_info.clone());
216				}
217			},
218			OptimizedTopicFilter::MatchAny(topics) => {
219				for topic in topics {
220					self.subscriptions_match_any_by_topic
221						.entry(*topic)
222						.or_default()
223						.insert(subscription_info.seq_id, subscription_info.clone());
224				}
225			},
226		};
227	}
228
229	// Notify a single subscriber, marking it for unsubscribing if sending fails.
230	fn notify_subscriber(
231		&self,
232		subscription: &SubscriptionInfo,
233		bytes_to_send: Bytes,
234		needs_unsubscribing: &mut HashSet<SeqID>,
235	) {
236		if let Err(err) = subscription.tx.try_send(StatementEvent::NewStatements {
237			statements: vec![bytes_to_send],
238			remaining: None,
239		}) {
240			log::debug!(
241				target: LOG_TARGET,
242				"Failed to send statement to subscriber {:?}: {:?} unsubscribing it", subscription.seq_id, err
243			);
244			// Mark subscription for unsubscribing, to give it a chance to recover the buffers are
245			// generous enough, if subscription cannot keep up we unsubscribe it.
246			needs_unsubscribing.insert(subscription.seq_id);
247		}
248	}
249
	// Run the statement through every kind of filter: MatchAll subscriptions first, then
	// MatchAny ones, then the subscriptions without topic filtering. Each pass unsubscribes the
	// subscribers it failed to deliver to.
	fn notify_matching_filters(&mut self, statement: &Statement) {
		self.notify_match_all_subscribers_best(statement);
		self.notify_match_any_subscribers(statement);
		self.notify_any_subscribers(statement);
	}
255
256	// Notify all subscribers with MatchAny filters that match the given statement.
257	fn notify_match_any_subscribers(&mut self, statement: &Statement) {
258		let mut needs_unsubscribing: HashSet<SeqID> = HashSet::new();
259		let mut already_notified: HashSet<SeqID> = HashSet::new();
260
261		let bytes_to_send: Bytes = statement.encode().into();
262		for statement_topic in statement.topics() {
263			if let Some(subscriptions) = self.subscriptions_match_any_by_topic.get(statement_topic)
264			{
265				for subscription in subscriptions
266					.values()
267					.filter(|subscription| already_notified.insert(subscription.seq_id))
268				{
269					self.notify_subscriber(
270						subscription,
271						bytes_to_send.clone(),
272						&mut needs_unsubscribing,
273					);
274				}
275			}
276		}
277
278		// Unsubscribe any subscriptions that failed to receive messages, to give them a chance to
279		// recover and not miss statements.
280		for sub_id in needs_unsubscribing {
281			self.unsubscribe(sub_id);
282		}
283	}
284
285	// Notify all subscribers with MatchAll filters that match the given statement.
286	fn notify_match_all_subscribers_best(&mut self, statement: &Statement) {
287		let bytes_to_send: Bytes = statement.encode().into();
288		let mut needs_unsubscribing: HashSet<SeqID> = HashSet::new();
289		let num_topics = statement.topics().len();
290
291		// Check all combinations of topics in the statement to find matching subscriptions.
292		// This works well because the maximum allowed topics is small (MAX_TOPICS = 4).
293		for num_topics_to_check in 1..=num_topics {
294			for topics_combination in statement.topics().iter().combinations(num_topics_to_check) {
295				// Find the topic with the fewest subscriptions to minimize the number of checks.
296				let Some(Some(topic_with_fewest)) = topics_combination
297					.iter()
298					.map(|topic| self.subscriptions_match_all_by_topic.get(*topic))
299					.min_by_key(|subscriptions| {
300						subscriptions.map_or(0, |subscriptions_by_length| {
301							subscriptions_by_length[num_topics_to_check - 1].len()
302						})
303					})
304				else {
305					continue;
306				};
307
308				for subscription in topic_with_fewest[num_topics_to_check - 1]
309					.values()
310					.filter(|subscription| subscription.topic_filter.matches(statement))
311				{
312					self.notify_subscriber(
313						subscription,
314						bytes_to_send.clone(),
315						&mut needs_unsubscribing,
316					);
317				}
318			}
319		}
320		// Unsubscribe any subscriptions that failed to receive messages, to give them a chance to
321		// recover and not miss statements.
322		for sub_id in needs_unsubscribing {
323			self.unsubscribe(sub_id);
324		}
325	}
326
327	// Notify all subscribers that don't filter by topic and want to receive all statements.
328	fn notify_any_subscribers(&mut self, statement: &Statement) {
329		let mut needs_unsubscribing: HashSet<SeqID> = HashSet::new();
330
331		let bytes_to_send: Bytes = statement.encode().into();
332		for subscription in self.subscriptions_any.values() {
333			self.notify_subscriber(subscription, bytes_to_send.clone(), &mut needs_unsubscribing);
334		}
335
336		// Unsubscribe any subscriptions that failed to receive messages, to give them a chance to
337		// recover and not miss statements.
338		for sub_id in needs_unsubscribing {
339			self.unsubscribe(sub_id);
340		}
341	}
342
343	// Unsubscribe a subscription by its ID.
344	fn unsubscribe(&mut self, id: SeqID) {
345		let Some(entry) = self.by_sub_id.remove(&id) else {
346			return;
347		};
348
349		let topics = match &entry {
350			OptimizedTopicFilter::Any => {
351				self.subscriptions_any.remove(&id);
352				return;
353			},
354			OptimizedTopicFilter::MatchAll(topics) => topics,
355			OptimizedTopicFilter::MatchAny(topics) => topics,
356		};
357
358		// Remove subscription from relevant maps.
359		for topic in topics {
360			// Check MatchAny map.
361			if let Entry::Occupied(mut entry) = self.subscriptions_match_any_by_topic.entry(*topic)
362			{
363				entry.get_mut().remove(&id);
364				if entry.get().is_empty() {
365					entry.remove();
366				}
367			}
368			// Check MatchAll map.
369			if let Entry::Occupied(mut entry) = self.subscriptions_match_all_by_topic.entry(*topic)
370			{
371				for subscriptions in entry.get_mut().iter_mut() {
372					if subscriptions.remove(&id).is_some() {
373						break;
374					}
375				}
376				if entry.get().iter().all(|s| s.is_empty()) {
377					entry.remove();
378				}
379			}
380		}
381	}
382}
383
/// Handlers to communicate with subscription matcher tasks.
///
/// Cloning is cheap in the sense that only the channel senders are cloned; all clones reach
/// the same matcher tasks.
#[derive(Clone)]
pub struct SubscriptionsMatchersHandlers {
	// Channels to send messages to matcher tasks, one sender per task.
	matchers: Vec<async_channel::Sender<MatcherMessage>>,
}
390
391impl SubscriptionsMatchersHandlers {
392	/// Create new SubscriptionsMatchersHandlers with the given matcher task senders.
393	fn new(matchers: Vec<async_channel::Sender<MatcherMessage>>) -> SubscriptionsMatchersHandlers {
394		SubscriptionsMatchersHandlers { matchers }
395	}
396
397	// Send a message to the matcher task responsible for the given subscription ID.
398	fn send_by_seq_id(&self, id: SeqID, message: MatcherMessage) {
399		let index: u64 = id.into();
400		// If matchers channels are full we backpressure the sender, in this case it will be the
401		// processing of new statements.
402		if let Err(err) = self.matchers[index as usize % self.matchers.len()].send_blocking(message)
403		{
404			log::error!(
405				target: LOG_TARGET,
406				"Failed to send statement to matcher task: {:?}", err
407			);
408		}
409	}
410
411	// Send a message to all matcher tasks.
412	fn send_all(&self, message: MatcherMessage) {
413		for sender in &self.matchers {
414			if let Err(err) = sender.send_blocking(message.clone()) {
415				log::error!(
416					target: LOG_TARGET,
417					"Failed to send message to matcher task: {:?}", err
418				);
419			}
420		}
421	}
422}
423
/// Stream of statements for a subscription.
///
/// Dropping the stream sends an unsubscribe message for the subscription to its matcher task.
pub struct SubscriptionStatementsStream {
	/// Channel to receive statements.
	pub rx: async_channel::Receiver<StatementEvent>,
	// Subscription ID, used for cleanup on drop.
	sub_id: SeqID,
	// Handlers of the matcher tasks, used for cleanup on drop.
	matchers: SubscriptionsMatchersHandlers,
}
433
434// When the stream is dropped, unsubscribe from the matchers.
435impl Drop for SubscriptionStatementsStream {
436	fn drop(&mut self) {
437		self.matchers
438			.send_by_seq_id(self.sub_id, MatcherMessage::Unsubscribe(self.sub_id));
439	}
440}
441
442impl Stream for SubscriptionStatementsStream {
443	type Item = StatementEvent;
444
445	fn poll_next(
446		mut self: std::pin::Pin<&mut Self>,
447		cx: &mut std::task::Context<'_>,
448	) -> std::task::Poll<Option<Self::Item>> {
449		self.rx.poll_next_unpin(cx)
450	}
451}
452
453#[cfg(test)]
454mod tests {
455
456	use super::super::tests::signed_statement;
457
458	use super::*;
459	use soil_statement_store::Topic;
460	use subsoil::core::Decode;
461
462	fn unwrap_statement(item: StatementEvent) -> Bytes {
463		match item {
464			StatementEvent::NewStatements { mut statements, .. } => {
465				assert_eq!(statements.len(), 1, "Expected exactly one statement in batch");
466				statements.remove(0)
467			},
468		}
469	}
470	#[test]
471	fn test_subscribe_unsubscribe() {
472		let mut subscriptions = SubscriptionsInfo::new();
473
474		let (tx1, _rx1) = async_channel::bounded::<StatementEvent>(10);
475		let topic1 = Topic::from([8u8; 32]);
476		let topic2 = Topic::from([9u8; 32]);
477		let sub_info1 = SubscriptionInfo {
478			topic_filter: OptimizedTopicFilter::MatchAll(
479				vec![topic1, topic2].into_iter().collect(),
480			),
481			seq_id: SeqID::from(1),
482			tx: tx1,
483		};
484		subscriptions.subscribe(sub_info1.clone());
485		assert!(subscriptions.subscriptions_match_all_by_topic.contains_key(&topic1));
486		assert!(subscriptions.subscriptions_match_all_by_topic.contains_key(&topic2));
487		assert!(subscriptions.by_sub_id.contains_key(&sub_info1.seq_id));
488		assert!(!subscriptions.subscriptions_any.contains_key(&sub_info1.seq_id));
489
490		subscriptions.unsubscribe(sub_info1.seq_id);
491		assert!(!subscriptions.subscriptions_match_all_by_topic.contains_key(&topic1));
492		assert!(!subscriptions.subscriptions_match_all_by_topic.contains_key(&topic2));
493	}
494
495	#[test]
496	fn test_subscribe_any() {
497		let mut subscriptions = SubscriptionsInfo::new();
498		let (tx1, _rx1) = async_channel::bounded::<StatementEvent>(10);
499		let sub_info1 = SubscriptionInfo {
500			topic_filter: OptimizedTopicFilter::Any,
501			seq_id: SeqID::from(1),
502			tx: tx1,
503		};
504		subscriptions.subscribe(sub_info1.clone());
505		assert!(subscriptions.subscriptions_any.contains_key(&sub_info1.seq_id));
506		assert!(subscriptions.by_sub_id.contains_key(&sub_info1.seq_id));
507		subscriptions.unsubscribe(sub_info1.seq_id);
508		assert!(!subscriptions.subscriptions_any.contains_key(&sub_info1.seq_id));
509	}
510
511	#[test]
512	fn test_subscribe_match_any() {
513		let mut subscriptions = SubscriptionsInfo::new();
514
515		let (tx1, _rx1) = async_channel::bounded::<StatementEvent>(10);
516		let topic1 = Topic::from([8u8; 32]);
517		let topic2 = Topic::from([9u8; 32]);
518		let sub_info1 = SubscriptionInfo {
519			topic_filter: OptimizedTopicFilter::MatchAny(
520				vec![topic1, topic2].into_iter().collect(),
521			),
522			seq_id: SeqID::from(1),
523			tx: tx1,
524		};
525		subscriptions.subscribe(sub_info1.clone());
526		assert!(subscriptions.subscriptions_match_any_by_topic.contains_key(&topic1));
527		assert!(subscriptions.subscriptions_match_any_by_topic.contains_key(&topic2));
528		assert!(subscriptions.by_sub_id.contains_key(&sub_info1.seq_id));
529		assert!(!subscriptions.subscriptions_any.contains_key(&sub_info1.seq_id));
530
531		subscriptions.unsubscribe(sub_info1.seq_id);
532		assert!(!subscriptions.subscriptions_match_all_by_topic.contains_key(&topic1));
533		assert!(!subscriptions.subscriptions_match_all_by_topic.contains_key(&topic2));
534	}
535
536	#[test]
537	fn test_notify_any_subscribers() {
538		let mut subscriptions = SubscriptionsInfo::new();
539
540		let (tx1, rx1) = async_channel::bounded::<StatementEvent>(10);
541		let sub_info1 = SubscriptionInfo {
542			topic_filter: OptimizedTopicFilter::Any,
543			seq_id: SeqID::from(1),
544			tx: tx1,
545		};
546		subscriptions.subscribe(sub_info1.clone());
547
548		let statement = signed_statement(1);
549		subscriptions.notify_matching_filters(&statement);
550
551		let received = unwrap_statement(rx1.try_recv().expect("Should receive statement"));
552		let decoded_statement: Statement =
553			Statement::decode(&mut &received.0[..]).expect("Should decode statement");
554		assert_eq!(decoded_statement, statement);
555	}
556
557	#[test]
558	fn test_notify_match_all_subscribers() {
559		let mut subscriptions = SubscriptionsInfo::new();
560
561		let (tx1, rx1) = async_channel::bounded::<StatementEvent>(10);
562		let topic1 = Topic::from([8u8; 32]);
563		let topic2 = Topic::from([9u8; 32]);
564		let sub_info1 = SubscriptionInfo {
565			topic_filter: OptimizedTopicFilter::MatchAll(
566				vec![topic1, topic2].into_iter().collect(),
567			),
568			seq_id: SeqID::from(1),
569			tx: tx1,
570		};
571		subscriptions.subscribe(sub_info1.clone());
572
573		let mut statement = signed_statement(1);
574		statement.set_topic(0, topic2);
575		subscriptions.notify_matching_filters(&statement);
576
577		// Should not receive yet, only one topic matched.
578		assert!(rx1.try_recv().is_err());
579
580		statement.set_topic(1, topic1);
581		subscriptions.notify_matching_filters(&statement);
582
583		let received = unwrap_statement(rx1.try_recv().expect("Should receive statement"));
584		let decoded_statement: Statement =
585			Statement::decode(&mut &received.0[..]).expect("Should decode statement");
586		assert_eq!(decoded_statement, statement);
587	}
588
589	#[test]
590	fn test_notify_match_any_subscribers() {
591		let mut subscriptions = SubscriptionsInfo::new();
592		let (tx1, rx1) = async_channel::bounded::<StatementEvent>(10);
593		let (tx2, rx2) = async_channel::bounded::<StatementEvent>(10);
594
595		let topic1 = Topic::from([8u8; 32]);
596		let topic2 = Topic::from([9u8; 32]);
597		let sub_info1 = SubscriptionInfo {
598			topic_filter: OptimizedTopicFilter::MatchAny(
599				vec![topic1, topic2].into_iter().collect(),
600			),
601			seq_id: SeqID::from(1),
602			tx: tx1,
603		};
604
605		let sub_info2 = SubscriptionInfo {
606			topic_filter: OptimizedTopicFilter::MatchAny(vec![topic2].into_iter().collect()),
607			seq_id: SeqID::from(2),
608			tx: tx2,
609		};
610
611		subscriptions.subscribe(sub_info1.clone());
612		subscriptions.subscribe(sub_info2.clone());
613
614		let mut statement = signed_statement(1);
615		statement.set_topic(0, topic1);
616		statement.set_topic(1, topic2);
617		subscriptions.notify_match_any_subscribers(&statement);
618
619		let received = unwrap_statement(rx1.try_recv().expect("Should receive statement"));
620		let decoded_statement: Statement =
621			Statement::decode(&mut &received.0[..]).expect("Should decode statement");
622		assert_eq!(decoded_statement, statement);
623
624		let received = unwrap_statement(rx2.try_recv().expect("Should receive statement"));
625		let decoded_statement: Statement =
626			Statement::decode(&mut &received.0[..]).expect("Should decode statement");
627		assert_eq!(decoded_statement, statement);
628	}
629
630	#[tokio::test]
631	async fn test_subscription_handle_with_different_workers_number() {
632		for num_workers in 1..5 {
633			let subscriptions_handle = SubscriptionsHandle::new(
634				Box::new(subsoil::core::testing::TaskExecutor::new()),
635				num_workers,
636			);
637
638			let topic1 = Topic::from([8u8; 32]);
639			let topic2 = Topic::from([9u8; 32]);
640
641			let streams = (0..5)
642				.into_iter()
643				.map(|_| {
644					subscriptions_handle.subscribe(OptimizedTopicFilter::MatchAll(
645						vec![topic1, topic2].into_iter().collect(),
646					))
647				})
648				.collect::<Vec<_>>();
649
650			let mut statement = signed_statement(1);
651			statement.set_topic(0, topic2);
652			subscriptions_handle.notify(statement.clone());
653
654			statement.set_topic(1, topic1);
655			subscriptions_handle.notify(statement.clone());
656
657			for (_tx, mut stream) in streams {
658				let received =
659					unwrap_statement(stream.next().await.expect("Should receive statement"));
660				let decoded_statement: Statement =
661					Statement::decode(&mut &received.0[..]).expect("Should decode statement");
662				assert_eq!(decoded_statement, statement);
663			}
664		}
665	}
666
667	#[tokio::test]
668	async fn test_handle_unsubscribe() {
669		let subscriptions_handle =
670			SubscriptionsHandle::new(Box::new(subsoil::core::testing::TaskExecutor::new()), 2);
671
672		let topic1 = Topic::from([8u8; 32]);
673		let topic2 = Topic::from([9u8; 32]);
674
675		let (tx, mut stream) = subscriptions_handle
676			.subscribe(OptimizedTopicFilter::MatchAll(vec![topic1, topic2].into_iter().collect()));
677
678		let mut statement = signed_statement(1);
679		statement.set_topic(0, topic1);
680		statement.set_topic(1, topic2);
681
682		// Send a statement and verify it's received.
683		subscriptions_handle.notify(statement.clone());
684
685		let received = unwrap_statement(stream.next().await.expect("Should receive statement"));
686		let decoded_statement: Statement =
687			Statement::decode(&mut &received.0[..]).expect("Should decode statement");
688		assert_eq!(decoded_statement, statement);
689
690		// Drop the stream to trigger unsubscribe.
691		drop(stream);
692
693		// Give some time for the unsubscribe message to be processed.
694		tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
695
696		// Send another statement after unsubscribe.
697		let mut statement2 = signed_statement(2);
698		statement2.set_topic(0, topic1);
699		statement2.set_topic(1, topic2);
700		subscriptions_handle.notify(statement2.clone());
701
702		// The tx channel should be closed/disconnected since the subscription was removed.
703		// Give some time for the notification to potentially arrive (it shouldn't).
704		tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
705
706		// The sender should fail to send since the subscription is gone.
707		// We verify by checking that the tx channel is disconnected.
708		assert!(tx.is_closed(), "Sender should be closed after unsubscribe");
709	}
710
711	#[test]
712	fn test_unsubscribe_nonexistent() {
713		let mut subscriptions = SubscriptionsInfo::new();
714		// Unsubscribing a non-existent subscription should not panic.
715		subscriptions.unsubscribe(SeqID::from(999));
716		// Verify internal state is still valid.
717		assert!(subscriptions.by_sub_id.is_empty());
718		assert!(subscriptions.subscriptions_any.is_empty());
719		assert!(subscriptions.subscriptions_match_all_by_topic.is_empty());
720		assert!(subscriptions.subscriptions_match_any_by_topic.is_empty());
721	}
722
723	#[test]
724	fn test_multiple_subscriptions_same_topic() {
725		let mut subscriptions = SubscriptionsInfo::new();
726
727		let (tx1, rx1) = async_channel::bounded::<StatementEvent>(10);
728		let (tx2, rx2) = async_channel::bounded::<StatementEvent>(10);
729		let topic1 = Topic::from([8u8; 32]);
730		let topic2 = Topic::from([9u8; 32]);
731
732		let sub_info1 = SubscriptionInfo {
733			topic_filter: OptimizedTopicFilter::MatchAll(
734				vec![topic1, topic2].into_iter().collect(),
735			),
736			seq_id: SeqID::from(1),
737			tx: tx1,
738		};
739		let sub_info2 = SubscriptionInfo {
740			topic_filter: OptimizedTopicFilter::MatchAll(
741				vec![topic1, topic2].into_iter().collect(),
742			),
743			seq_id: SeqID::from(2),
744			tx: tx2,
745		};
746
747		subscriptions.subscribe(sub_info1.clone());
748		subscriptions.subscribe(sub_info2.clone());
749
750		// Both subscriptions should be registered under each topic.
751		assert_eq!(
752			subscriptions
753				.subscriptions_match_all_by_topic
754				.get(&topic1)
755				.unwrap()
756				.iter()
757				.map(|s| s.len())
758				.sum::<usize>(),
759			2
760		);
761		assert_eq!(
762			subscriptions
763				.subscriptions_match_all_by_topic
764				.get(&topic2)
765				.unwrap()
766				.iter()
767				.map(|s| s.len())
768				.sum::<usize>(),
769			2
770		);
771
772		// Send a matching statement.
773		let mut statement = signed_statement(1);
774		statement.set_topic(0, topic1);
775		statement.set_topic(1, topic2);
776		subscriptions.notify_matching_filters(&statement);
777
778		// Both should receive.
779		assert!(rx1.try_recv().is_ok());
780		assert!(rx2.try_recv().is_ok());
781
782		// Unsubscribe one.
783		subscriptions.unsubscribe(sub_info1.seq_id);
784
785		// Only one subscription should remain.
786		assert_eq!(
787			subscriptions
788				.subscriptions_match_all_by_topic
789				.get(&topic1)
790				.unwrap()
791				.iter()
792				.map(|s| s.len())
793				.sum::<usize>(),
794			1
795		);
796		assert_eq!(
797			subscriptions
798				.subscriptions_match_all_by_topic
799				.get(&topic2)
800				.unwrap()
801				.iter()
802				.map(|s| s.len())
803				.sum::<usize>(),
804			1
805		);
806		assert!(!subscriptions.by_sub_id.contains_key(&sub_info1.seq_id));
807		assert!(subscriptions.by_sub_id.contains_key(&sub_info2.seq_id));
808
809		// Send another statement.
810		subscriptions.notify_matching_filters(&statement);
811
812		// Only sub2 should receive.
813		assert!(rx2.try_recv().is_ok());
814		assert!(rx1.try_recv().is_err());
815	}
816
817	#[test]
818	fn test_subscriber_auto_unsubscribe_on_channel_full() {
819		let mut subscriptions = SubscriptionsInfo::new();
820
821		// Create a channel with capacity 1.
822		let (tx1, rx1) = async_channel::bounded::<StatementEvent>(1);
823		let topic1 = Topic::from([8u8; 32]);
824
825		let sub_info1 = SubscriptionInfo {
826			topic_filter: OptimizedTopicFilter::MatchAny(vec![topic1].into_iter().collect()),
827			seq_id: SeqID::from(1),
828			tx: tx1,
829		};
830		subscriptions.subscribe(sub_info1.clone());
831
832		let mut statement = signed_statement(1);
833		statement.set_topic(0, topic1);
834
835		// First notification should succeed.
836		subscriptions.notify_matching_filters(&statement);
837		assert!(rx1.try_recv().is_ok());
838
839		// Fill the channel.
840		subscriptions.notify_matching_filters(&statement);
841		// Channel is now full.
842
843		// Next notification should trigger auto-unsubscribe.
844		subscriptions.notify_matching_filters(&statement);
845
846		// Subscription should be removed.
847		assert!(!subscriptions.by_sub_id.contains_key(&sub_info1.seq_id));
848		assert!(!subscriptions.subscriptions_match_any_by_topic.contains_key(&topic1));
849	}
850
851	#[test]
852	fn test_match_any_receives_once_per_statement() {
853		let mut subscriptions = SubscriptionsInfo::new();
854
855		let (tx1, rx1) = async_channel::bounded::<StatementEvent>(10);
856		let topic1 = Topic::from([8u8; 32]);
857		let topic2 = Topic::from([9u8; 32]);
858
859		// Subscribe to MatchAny with both topics.
860		let sub_info1 = SubscriptionInfo {
861			topic_filter: OptimizedTopicFilter::MatchAny(
862				vec![topic1, topic2].into_iter().collect(),
863			),
864			seq_id: SeqID::from(1),
865			tx: tx1,
866		};
867		subscriptions.subscribe(sub_info1.clone());
868
869		// Create a statement that matches BOTH topics.
870		let mut statement = signed_statement(1);
871		statement.set_topic(0, topic1);
872		statement.set_topic(1, topic2);
873
874		subscriptions.notify_match_any_subscribers(&statement);
875
876		// Should receive exactly once, not twice.
877		let received = unwrap_statement(rx1.try_recv().expect("Should receive statement"));
878		let decoded_statement: Statement =
879			Statement::decode(&mut &received.0[..]).expect("Should decode statement");
880		assert_eq!(decoded_statement, statement);
881
882		// No more messages.
883		assert!(rx1.try_recv().is_err());
884	}
885
886	#[test]
887	fn test_match_all_with_single_topic_matches_statement_with_two_topics() {
888		let mut subscriptions = SubscriptionsInfo::new();
889
890		let (tx1, rx1) = async_channel::bounded::<StatementEvent>(10);
891		let topic1 = Topic::from([8u8; 32]);
892		let topic2 = Topic::from([9u8; 32]);
893
894		// Subscribe with MatchAll on only topic1.
895		let sub_info1 = SubscriptionInfo {
896			topic_filter: OptimizedTopicFilter::MatchAll(vec![topic1].into_iter().collect()),
897			seq_id: SeqID::from(1),
898			tx: tx1,
899		};
900		subscriptions.subscribe(sub_info1.clone());
901
902		// Create a statement that has BOTH topic1 and topic2.
903		let mut statement = signed_statement(1);
904		statement.set_topic(0, topic1);
905		statement.set_topic(1, topic2);
906
907		subscriptions.notify_matching_filters(&statement);
908
909		// Should receive because the statement contains topic1 (which is the only required topic).
910		let received = unwrap_statement(rx1.try_recv().expect("Should receive statement"));
911		let decoded_statement: Statement =
912			Statement::decode(&mut &received.0[..]).expect("Should decode statement");
913		assert_eq!(decoded_statement, statement);
914
915		// No more messages.
916		assert!(rx1.try_recv().is_err());
917	}
918
919	#[test]
920	fn test_match_all_no_matching_topics() {
921		let mut subscriptions = SubscriptionsInfo::new();
922
923		let (tx1, rx1) = async_channel::bounded::<StatementEvent>(10);
924		let topic1 = Topic::from([8u8; 32]);
925		let topic2 = Topic::from([9u8; 32]);
926		let topic3 = Topic::from([10u8; 32]);
927
928		let sub_info1 = SubscriptionInfo {
929			topic_filter: OptimizedTopicFilter::MatchAll(
930				vec![topic1, topic2].into_iter().collect(),
931			),
932			seq_id: SeqID::from(1),
933			tx: tx1,
934		};
935		subscriptions.subscribe(sub_info1.clone());
936
937		// Statement with completely different topics.
938		let mut statement = signed_statement(1);
939		statement.set_topic(0, topic3);
940
941		subscriptions.notify_matching_filters(&statement);
942
943		// Should not receive anything.
944		assert!(rx1.try_recv().is_err());
945	}
946
947	#[test]
948	fn test_match_all_with_unsubscribed_topic_first_in_statement() {
949		// This test exposes a bug where `return` is used instead of `continue` in
950		// `notify_match_all_subscribers_best`. When a statement has a topic that has no
951		// subscriptions (not in the map), the function returns early instead of checking
952		// subsequent topic combinations.
953		let mut subscriptions = SubscriptionsInfo::new();
954
955		let (tx1, rx1) = async_channel::bounded::<StatementEvent>(10);
956		// topic1 will have NO subscriptions
957		let topic1 = Topic::from([1u8; 32]);
958		// topic2 WILL have a subscription
959		let topic2 = Topic::from([2u8; 32]);
960
961		// Subscribe only to topic2 with MatchAll filter.
962		let sub_info1 = SubscriptionInfo {
963			topic_filter: OptimizedTopicFilter::MatchAll(vec![topic2].into_iter().collect()),
964			seq_id: SeqID::from(1),
965			tx: tx1,
966		};
967		subscriptions.subscribe(sub_info1);
968
969		// Create a statement with BOTH topics. topic1 comes first (lower bytes).
970		// When iterating combinations(1), [topic1] is checked before [topic2].
971		// Since topic1 has no subscriptions, the buggy `return` exits early,
972		// preventing the [topic2] combination from being checked.
973		let mut statement = signed_statement(1);
974		statement.set_topic(0, topic1);
975		statement.set_topic(1, topic2);
976
977		subscriptions.notify_match_all_subscribers_best(&statement);
978
979		// With the bug: rx1.try_recv() fails because the function returned early.
980		// With the fix: rx1.try_recv() succeeds because [topic2] combination is checked.
981		let received = unwrap_statement(rx1.try_recv().expect(
982			"Should receive statement - if this fails, the `return` bug in \
983			 notify_match_all_subscribers_best is present (should be `continue`)",
984		));
985		let decoded_statement: Statement =
986			Statement::decode(&mut &received.0[..]).expect("Should decode statement");
987		assert_eq!(decoded_statement, statement);
988	}
989
990	#[tokio::test]
991	async fn test_handle_with_match_any_filter() {
992		let subscriptions_handle =
993			SubscriptionsHandle::new(Box::new(subsoil::core::testing::TaskExecutor::new()), 2);
994
995		let topic1 = Topic::from([8u8; 32]);
996		let topic2 = Topic::from([9u8; 32]);
997
998		let (_tx, mut stream) = subscriptions_handle
999			.subscribe(OptimizedTopicFilter::MatchAny(vec![topic1, topic2].into_iter().collect()));
1000
1001		// Statement matching only topic1.
1002		let mut statement1 = signed_statement(1);
1003		statement1.set_topic(0, topic1);
1004		subscriptions_handle.notify(statement1.clone());
1005
1006		let received = unwrap_statement(stream.next().await.expect("Should receive statement"));
1007		let decoded_statement: Statement =
1008			Statement::decode(&mut &received.0[..]).expect("Should decode statement");
1009		assert_eq!(decoded_statement, statement1);
1010
1011		// Statement matching only topic2.
1012		let mut statement2 = signed_statement(2);
1013		statement2.set_topic(0, topic2);
1014		subscriptions_handle.notify(statement2.clone());
1015
1016		let received = unwrap_statement(stream.next().await.expect("Should receive statement"));
1017		let decoded_statement: Statement =
1018			Statement::decode(&mut &received.0[..]).expect("Should decode statement");
1019		assert_eq!(decoded_statement, statement2);
1020	}
1021
1022	#[tokio::test]
1023	async fn test_handle_with_any_filter() {
1024		let subscriptions_handle =
1025			SubscriptionsHandle::new(Box::new(subsoil::core::testing::TaskExecutor::new()), 2);
1026
1027		let (_tx, mut stream) = subscriptions_handle.subscribe(OptimizedTopicFilter::Any);
1028
1029		// Send statements with various topics.
1030		let statement1 = signed_statement(1);
1031		subscriptions_handle.notify(statement1.clone());
1032
1033		let received = unwrap_statement(stream.next().await.expect("Should receive statement"));
1034		let decoded_statement: Statement =
1035			Statement::decode(&mut &received.0[..]).expect("Should decode statement");
1036		assert_eq!(decoded_statement, statement1);
1037
1038		let mut statement2 = signed_statement(2);
1039		statement2.set_topic(0, Topic::from([99u8; 32]));
1040		subscriptions_handle.notify(statement2.clone());
1041
1042		let received = unwrap_statement(stream.next().await.expect("Should receive statement"));
1043		let decoded_statement: Statement =
1044			Statement::decode(&mut &received.0[..]).expect("Should decode statement");
1045		assert_eq!(decoded_statement, statement2);
1046	}
1047
1048	#[tokio::test]
1049	async fn test_handle_multiple_subscribers_different_filters() {
1050		let subscriptions_handle =
1051			SubscriptionsHandle::new(Box::new(subsoil::core::testing::TaskExecutor::new()), 2);
1052
1053		let topic1 = Topic::from([8u8; 32]);
1054		let topic2 = Topic::from([9u8; 32]);
1055
1056		// Subscriber 1: MatchAll on topic1 and topic2.
1057		let (_tx1, mut stream1) = subscriptions_handle
1058			.subscribe(OptimizedTopicFilter::MatchAll(vec![topic1, topic2].into_iter().collect()));
1059
1060		// Subscriber 2: MatchAny on topic1.
1061		let (_tx2, mut stream2) = subscriptions_handle
1062			.subscribe(OptimizedTopicFilter::MatchAny(vec![topic1].into_iter().collect()));
1063
1064		// Subscriber 3: Any.
1065		let (_tx3, mut stream3) = subscriptions_handle.subscribe(OptimizedTopicFilter::Any);
1066
1067		// Statement matching only topic1.
1068		let mut statement1 = signed_statement(1);
1069		statement1.set_topic(0, topic1);
1070		subscriptions_handle.notify(statement1.clone());
1071
1072		// stream1 should NOT receive (needs both topics).
1073		// stream2 should receive (MatchAny topic1).
1074		// stream3 should receive (Any).
1075
1076		let received2 = unwrap_statement(stream2.next().await.expect("stream2 should receive"));
1077		let decoded2: Statement = Statement::decode(&mut &received2.0[..]).unwrap();
1078		assert_eq!(decoded2, statement1);
1079
1080		let received3 = unwrap_statement(stream3.next().await.expect("stream3 should receive"));
1081		let decoded3: Statement = Statement::decode(&mut &received3.0[..]).unwrap();
1082		assert_eq!(decoded3, statement1);
1083
1084		// Statement matching both topics.
1085		let mut statement2 = signed_statement(2);
1086		statement2.set_topic(0, topic1);
1087		statement2.set_topic(1, topic2);
1088		subscriptions_handle.notify(statement2.clone());
1089
1090		// All should receive.
1091		let received1 = unwrap_statement(stream1.next().await.expect("stream1 should receive"));
1092		let decoded1: Statement = Statement::decode(&mut &received1.0[..]).unwrap();
1093		assert_eq!(decoded1, statement2);
1094
1095		let received2 = unwrap_statement(stream2.next().await.expect("stream2 should receive"));
1096		let decoded2: Statement = Statement::decode(&mut &received2.0[..]).unwrap();
1097		assert_eq!(decoded2, statement2);
1098
1099		let received3 = unwrap_statement(stream3.next().await.expect("stream3 should receive"));
1100		let decoded3: Statement = Statement::decode(&mut &received3.0[..]).unwrap();
1101		assert_eq!(decoded3, statement2);
1102	}
1103
1104	#[test]
1105	fn test_statement_without_topics_matches_only_any_filter() {
1106		let mut subscriptions = SubscriptionsInfo::new();
1107
1108		let (tx_match_all, rx_match_all) = async_channel::bounded::<StatementEvent>(10);
1109		let (tx_match_any, rx_match_any) = async_channel::bounded::<StatementEvent>(10);
1110		let (tx_any, rx_any) = async_channel::bounded::<StatementEvent>(10);
1111
1112		let topic1 = Topic::from([8u8; 32]);
1113		let topic2 = Topic::from([9u8; 32]);
1114
1115		// Subscribe with MatchAll filter.
1116		let sub_match_all = SubscriptionInfo {
1117			topic_filter: OptimizedTopicFilter::MatchAll(
1118				vec![topic1, topic2].into_iter().collect(),
1119			),
1120			seq_id: SeqID::from(1),
1121			tx: tx_match_all,
1122		};
1123		subscriptions.subscribe(sub_match_all);
1124
1125		// Subscribe with MatchAny filter.
1126		let sub_match_any = SubscriptionInfo {
1127			topic_filter: OptimizedTopicFilter::MatchAny(
1128				vec![topic1, topic2].into_iter().collect(),
1129			),
1130			seq_id: SeqID::from(2),
1131			tx: tx_match_any,
1132		};
1133		subscriptions.subscribe(sub_match_any);
1134
1135		// Subscribe with Any filter.
1136		let sub_any = SubscriptionInfo {
1137			topic_filter: OptimizedTopicFilter::Any,
1138			seq_id: SeqID::from(3),
1139			tx: tx_any,
1140		};
1141		subscriptions.subscribe(sub_any);
1142
1143		// Create a statement without any topics set.
1144		let statement = signed_statement(1);
1145		assert!(statement.topics().is_empty(), "Statement should have no topics");
1146
1147		// Notify all matching filters.
1148		subscriptions.notify_matching_filters(&statement);
1149
1150		// Any should receive (matches all statements regardless of topics).
1151		let received =
1152			unwrap_statement(rx_any.try_recv().expect("Any filter should receive statement"));
1153		let decoded_statement: Statement =
1154			Statement::decode(&mut &received.0[..]).expect("Should decode statement");
1155		assert_eq!(decoded_statement, statement);
1156
1157		// MatchAll should NOT receive (statement has no topics, filter requires topic1 AND topic2).
1158		assert!(
1159			rx_match_all.try_recv().is_err(),
1160			"MatchAll should not receive statement without topics"
1161		);
1162
1163		// MatchAny should NOT receive (statement has no topics, filter requires topic1 OR topic2).
1164		assert!(
1165			rx_match_any.try_recv().is_err(),
1166			"MatchAny should not receive statement without topics"
1167		);
1168	}
1169}