1const MATCHERS_TASK_CHANNEL_BUFFER_SIZE: usize = 80_000;
23
24const SUBSCRIPTION_BUFFER_SIZE: usize = 128;
26
27use futures::{Stream, StreamExt};
28use itertools::Itertools;
29
30use super::LOG_TARGET;
31use soil_client::utils::id_sequence::SeqID;
32pub use soil_statement_store::StatementStore;
33use soil_statement_store::{
34 OptimizedTopicFilter, Result, Statement, StatementEvent, Topic, MAX_TOPICS,
35};
36use std::{
37 collections::{hash_map::Entry, HashMap, HashSet},
38 sync::atomic::AtomicU64,
39};
40use subsoil::core::{traits::SpawnNamed, Bytes, Encode};
41
42pub trait StatementStoreSubscriptionApi: Send + Sync {
44 fn subscribe_statement(
49 &self,
50 topic_filter: OptimizedTopicFilter,
51 ) -> Result<(Vec<Vec<u8>>, async_channel::Sender<StatementEvent>, SubscriptionStatementsStream)>;
52}
53
54#[derive(Clone, Debug)]
56pub enum MatcherMessage {
57 NewStatement(Statement),
59 Subscribe(SubscriptionInfo),
61 Unsubscribe(SeqID),
63}
64
65pub struct SubscriptionsHandle {
67 id_sequence: AtomicU64,
70 matchers: SubscriptionsMatchersHandlers,
72}
73
74impl SubscriptionsHandle {
75 pub(crate) fn new(
77 task_spawner: Box<dyn SpawnNamed>,
78 num_matcher_workers: usize,
79 ) -> SubscriptionsHandle {
80 let mut subscriptions_matchers_senders = Vec::with_capacity(num_matcher_workers);
81
82 for task in 0..num_matcher_workers {
83 let (subscription_matcher_sender, subscription_matcher_receiver) =
84 async_channel::bounded(MATCHERS_TASK_CHANNEL_BUFFER_SIZE);
85 subscriptions_matchers_senders.push(subscription_matcher_sender);
86 task_spawner.spawn_blocking(
87 "statement-store-subscription-filters",
88 Some("statement-store"),
89 Box::pin(async move {
90 let mut subscriptions = SubscriptionsInfo::new();
91 log::debug!(
92 target: LOG_TARGET,
93 "Started statement subscription matcher task: {task}"
94 );
95 loop {
96 let res = subscription_matcher_receiver.recv().await;
97 match res {
98 Ok(MatcherMessage::NewStatement(statement)) => {
99 subscriptions.notify_matching_filters(&statement);
100 },
101 Ok(MatcherMessage::Subscribe(info)) => {
102 subscriptions.subscribe(info);
103 },
104 Ok(MatcherMessage::Unsubscribe(seq_id)) => {
105 subscriptions.unsubscribe(seq_id);
106 },
107 Err(_) => {
108 log::error!(
110 target: LOG_TARGET,
111 "Statement subscription matcher channel closed: {task}"
112 );
113 break;
114 },
115 };
116 }
117 }),
118 );
119 }
120 SubscriptionsHandle {
121 id_sequence: AtomicU64::new(0),
122 matchers: SubscriptionsMatchersHandlers::new(subscriptions_matchers_senders),
123 }
124 }
125
126 fn next_id(&self) -> SeqID {
128 let id = self.id_sequence.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
129 SeqID::from(id)
130 }
131
132 pub(crate) fn subscribe(
134 &self,
135 topic_filter: OptimizedTopicFilter,
136 ) -> (async_channel::Sender<StatementEvent>, SubscriptionStatementsStream) {
137 let next_id = self.next_id();
138 let (tx, rx) = async_channel::bounded(SUBSCRIPTION_BUFFER_SIZE);
139 let subscription_info =
140 SubscriptionInfo { topic_filter: topic_filter.clone(), seq_id: next_id, tx };
141
142 let result = (
143 subscription_info.tx.clone(),
144 SubscriptionStatementsStream {
145 rx,
146 sub_id: subscription_info.seq_id,
147 matchers: self.matchers.clone(),
148 },
149 );
150
151 self.matchers
152 .send_by_seq_id(subscription_info.seq_id, MatcherMessage::Subscribe(subscription_info));
153 result
154 }
155
156 pub(crate) fn notify(&self, statement: Statement) {
157 self.matchers.send_all(MatcherMessage::NewStatement(statement));
158 }
159}
160
161struct SubscriptionsInfo {
164 subscriptions_match_all_by_topic:
173 HashMap<Topic, [HashMap<SeqID, SubscriptionInfo>; MAX_TOPICS]>,
174 subscriptions_match_any_by_topic: HashMap<Topic, HashMap<SeqID, SubscriptionInfo>>,
176 subscriptions_any: HashMap<SeqID, SubscriptionInfo>,
178 by_sub_id: HashMap<SeqID, OptimizedTopicFilter>,
180}
181
182#[derive(Clone, Debug)]
184pub(crate) struct SubscriptionInfo {
185 topic_filter: OptimizedTopicFilter,
187 seq_id: SeqID,
189 tx: async_channel::Sender<StatementEvent>,
191}
192
193impl SubscriptionsInfo {
194 fn new() -> SubscriptionsInfo {
195 SubscriptionsInfo {
196 subscriptions_match_all_by_topic: HashMap::new(),
197 subscriptions_match_any_by_topic: HashMap::new(),
198 subscriptions_any: HashMap::new(),
199 by_sub_id: HashMap::new(),
200 }
201 }
202
203 fn subscribe(&mut self, subscription_info: SubscriptionInfo) {
205 self.by_sub_id
206 .insert(subscription_info.seq_id, subscription_info.topic_filter.clone());
207 match &subscription_info.topic_filter {
208 OptimizedTopicFilter::Any => {
209 self.subscriptions_any.insert(subscription_info.seq_id, subscription_info);
210 },
211 OptimizedTopicFilter::MatchAll(topics) => {
212 for topic in topics {
213 self.subscriptions_match_all_by_topic.entry(*topic).or_default()
214 [topics.len() - 1]
215 .insert(subscription_info.seq_id, subscription_info.clone());
216 }
217 },
218 OptimizedTopicFilter::MatchAny(topics) => {
219 for topic in topics {
220 self.subscriptions_match_any_by_topic
221 .entry(*topic)
222 .or_default()
223 .insert(subscription_info.seq_id, subscription_info.clone());
224 }
225 },
226 };
227 }
228
229 fn notify_subscriber(
231 &self,
232 subscription: &SubscriptionInfo,
233 bytes_to_send: Bytes,
234 needs_unsubscribing: &mut HashSet<SeqID>,
235 ) {
236 if let Err(err) = subscription.tx.try_send(StatementEvent::NewStatements {
237 statements: vec![bytes_to_send],
238 remaining: None,
239 }) {
240 log::debug!(
241 target: LOG_TARGET,
242 "Failed to send statement to subscriber {:?}: {:?} unsubscribing it", subscription.seq_id, err
243 );
244 needs_unsubscribing.insert(subscription.seq_id);
247 }
248 }
249
250 fn notify_matching_filters(&mut self, statement: &Statement) {
251 self.notify_match_all_subscribers_best(statement);
252 self.notify_match_any_subscribers(statement);
253 self.notify_any_subscribers(statement);
254 }
255
256 fn notify_match_any_subscribers(&mut self, statement: &Statement) {
258 let mut needs_unsubscribing: HashSet<SeqID> = HashSet::new();
259 let mut already_notified: HashSet<SeqID> = HashSet::new();
260
261 let bytes_to_send: Bytes = statement.encode().into();
262 for statement_topic in statement.topics() {
263 if let Some(subscriptions) = self.subscriptions_match_any_by_topic.get(statement_topic)
264 {
265 for subscription in subscriptions
266 .values()
267 .filter(|subscription| already_notified.insert(subscription.seq_id))
268 {
269 self.notify_subscriber(
270 subscription,
271 bytes_to_send.clone(),
272 &mut needs_unsubscribing,
273 );
274 }
275 }
276 }
277
278 for sub_id in needs_unsubscribing {
281 self.unsubscribe(sub_id);
282 }
283 }
284
285 fn notify_match_all_subscribers_best(&mut self, statement: &Statement) {
287 let bytes_to_send: Bytes = statement.encode().into();
288 let mut needs_unsubscribing: HashSet<SeqID> = HashSet::new();
289 let num_topics = statement.topics().len();
290
291 for num_topics_to_check in 1..=num_topics {
294 for topics_combination in statement.topics().iter().combinations(num_topics_to_check) {
295 let Some(Some(topic_with_fewest)) = topics_combination
297 .iter()
298 .map(|topic| self.subscriptions_match_all_by_topic.get(*topic))
299 .min_by_key(|subscriptions| {
300 subscriptions.map_or(0, |subscriptions_by_length| {
301 subscriptions_by_length[num_topics_to_check - 1].len()
302 })
303 })
304 else {
305 continue;
306 };
307
308 for subscription in topic_with_fewest[num_topics_to_check - 1]
309 .values()
310 .filter(|subscription| subscription.topic_filter.matches(statement))
311 {
312 self.notify_subscriber(
313 subscription,
314 bytes_to_send.clone(),
315 &mut needs_unsubscribing,
316 );
317 }
318 }
319 }
320 for sub_id in needs_unsubscribing {
323 self.unsubscribe(sub_id);
324 }
325 }
326
327 fn notify_any_subscribers(&mut self, statement: &Statement) {
329 let mut needs_unsubscribing: HashSet<SeqID> = HashSet::new();
330
331 let bytes_to_send: Bytes = statement.encode().into();
332 for subscription in self.subscriptions_any.values() {
333 self.notify_subscriber(subscription, bytes_to_send.clone(), &mut needs_unsubscribing);
334 }
335
336 for sub_id in needs_unsubscribing {
339 self.unsubscribe(sub_id);
340 }
341 }
342
343 fn unsubscribe(&mut self, id: SeqID) {
345 let Some(entry) = self.by_sub_id.remove(&id) else {
346 return;
347 };
348
349 let topics = match &entry {
350 OptimizedTopicFilter::Any => {
351 self.subscriptions_any.remove(&id);
352 return;
353 },
354 OptimizedTopicFilter::MatchAll(topics) => topics,
355 OptimizedTopicFilter::MatchAny(topics) => topics,
356 };
357
358 for topic in topics {
360 if let Entry::Occupied(mut entry) = self.subscriptions_match_any_by_topic.entry(*topic)
362 {
363 entry.get_mut().remove(&id);
364 if entry.get().is_empty() {
365 entry.remove();
366 }
367 }
368 if let Entry::Occupied(mut entry) = self.subscriptions_match_all_by_topic.entry(*topic)
370 {
371 for subscriptions in entry.get_mut().iter_mut() {
372 if subscriptions.remove(&id).is_some() {
373 break;
374 }
375 }
376 if entry.get().iter().all(|s| s.is_empty()) {
377 entry.remove();
378 }
379 }
380 }
381 }
382}
383
384#[derive(Clone)]
386pub struct SubscriptionsMatchersHandlers {
387 matchers: Vec<async_channel::Sender<MatcherMessage>>,
389}
390
391impl SubscriptionsMatchersHandlers {
392 fn new(matchers: Vec<async_channel::Sender<MatcherMessage>>) -> SubscriptionsMatchersHandlers {
394 SubscriptionsMatchersHandlers { matchers }
395 }
396
397 fn send_by_seq_id(&self, id: SeqID, message: MatcherMessage) {
399 let index: u64 = id.into();
400 if let Err(err) = self.matchers[index as usize % self.matchers.len()].send_blocking(message)
403 {
404 log::error!(
405 target: LOG_TARGET,
406 "Failed to send statement to matcher task: {:?}", err
407 );
408 }
409 }
410
411 fn send_all(&self, message: MatcherMessage) {
413 for sender in &self.matchers {
414 if let Err(err) = sender.send_blocking(message.clone()) {
415 log::error!(
416 target: LOG_TARGET,
417 "Failed to send message to matcher task: {:?}", err
418 );
419 }
420 }
421 }
422}
423
424pub struct SubscriptionStatementsStream {
426 pub rx: async_channel::Receiver<StatementEvent>,
428 sub_id: SeqID,
430 matchers: SubscriptionsMatchersHandlers,
432}
433
434impl Drop for SubscriptionStatementsStream {
436 fn drop(&mut self) {
437 self.matchers
438 .send_by_seq_id(self.sub_id, MatcherMessage::Unsubscribe(self.sub_id));
439 }
440}
441
442impl Stream for SubscriptionStatementsStream {
443 type Item = StatementEvent;
444
445 fn poll_next(
446 mut self: std::pin::Pin<&mut Self>,
447 cx: &mut std::task::Context<'_>,
448 ) -> std::task::Poll<Option<Self::Item>> {
449 self.rx.poll_next_unpin(cx)
450 }
451}
452
453#[cfg(test)]
454mod tests {
455
456 use super::super::tests::signed_statement;
457
458 use super::*;
459 use soil_statement_store::Topic;
460 use subsoil::core::Decode;
461
462 fn unwrap_statement(item: StatementEvent) -> Bytes {
463 match item {
464 StatementEvent::NewStatements { mut statements, .. } => {
465 assert_eq!(statements.len(), 1, "Expected exactly one statement in batch");
466 statements.remove(0)
467 },
468 }
469 }
470 #[test]
471 fn test_subscribe_unsubscribe() {
472 let mut subscriptions = SubscriptionsInfo::new();
473
474 let (tx1, _rx1) = async_channel::bounded::<StatementEvent>(10);
475 let topic1 = Topic::from([8u8; 32]);
476 let topic2 = Topic::from([9u8; 32]);
477 let sub_info1 = SubscriptionInfo {
478 topic_filter: OptimizedTopicFilter::MatchAll(
479 vec![topic1, topic2].into_iter().collect(),
480 ),
481 seq_id: SeqID::from(1),
482 tx: tx1,
483 };
484 subscriptions.subscribe(sub_info1.clone());
485 assert!(subscriptions.subscriptions_match_all_by_topic.contains_key(&topic1));
486 assert!(subscriptions.subscriptions_match_all_by_topic.contains_key(&topic2));
487 assert!(subscriptions.by_sub_id.contains_key(&sub_info1.seq_id));
488 assert!(!subscriptions.subscriptions_any.contains_key(&sub_info1.seq_id));
489
490 subscriptions.unsubscribe(sub_info1.seq_id);
491 assert!(!subscriptions.subscriptions_match_all_by_topic.contains_key(&topic1));
492 assert!(!subscriptions.subscriptions_match_all_by_topic.contains_key(&topic2));
493 }
494
495 #[test]
496 fn test_subscribe_any() {
497 let mut subscriptions = SubscriptionsInfo::new();
498 let (tx1, _rx1) = async_channel::bounded::<StatementEvent>(10);
499 let sub_info1 = SubscriptionInfo {
500 topic_filter: OptimizedTopicFilter::Any,
501 seq_id: SeqID::from(1),
502 tx: tx1,
503 };
504 subscriptions.subscribe(sub_info1.clone());
505 assert!(subscriptions.subscriptions_any.contains_key(&sub_info1.seq_id));
506 assert!(subscriptions.by_sub_id.contains_key(&sub_info1.seq_id));
507 subscriptions.unsubscribe(sub_info1.seq_id);
508 assert!(!subscriptions.subscriptions_any.contains_key(&sub_info1.seq_id));
509 }
510
511 #[test]
512 fn test_subscribe_match_any() {
513 let mut subscriptions = SubscriptionsInfo::new();
514
515 let (tx1, _rx1) = async_channel::bounded::<StatementEvent>(10);
516 let topic1 = Topic::from([8u8; 32]);
517 let topic2 = Topic::from([9u8; 32]);
518 let sub_info1 = SubscriptionInfo {
519 topic_filter: OptimizedTopicFilter::MatchAny(
520 vec![topic1, topic2].into_iter().collect(),
521 ),
522 seq_id: SeqID::from(1),
523 tx: tx1,
524 };
525 subscriptions.subscribe(sub_info1.clone());
526 assert!(subscriptions.subscriptions_match_any_by_topic.contains_key(&topic1));
527 assert!(subscriptions.subscriptions_match_any_by_topic.contains_key(&topic2));
528 assert!(subscriptions.by_sub_id.contains_key(&sub_info1.seq_id));
529 assert!(!subscriptions.subscriptions_any.contains_key(&sub_info1.seq_id));
530
531 subscriptions.unsubscribe(sub_info1.seq_id);
532 assert!(!subscriptions.subscriptions_match_all_by_topic.contains_key(&topic1));
533 assert!(!subscriptions.subscriptions_match_all_by_topic.contains_key(&topic2));
534 }
535
536 #[test]
537 fn test_notify_any_subscribers() {
538 let mut subscriptions = SubscriptionsInfo::new();
539
540 let (tx1, rx1) = async_channel::bounded::<StatementEvent>(10);
541 let sub_info1 = SubscriptionInfo {
542 topic_filter: OptimizedTopicFilter::Any,
543 seq_id: SeqID::from(1),
544 tx: tx1,
545 };
546 subscriptions.subscribe(sub_info1.clone());
547
548 let statement = signed_statement(1);
549 subscriptions.notify_matching_filters(&statement);
550
551 let received = unwrap_statement(rx1.try_recv().expect("Should receive statement"));
552 let decoded_statement: Statement =
553 Statement::decode(&mut &received.0[..]).expect("Should decode statement");
554 assert_eq!(decoded_statement, statement);
555 }
556
557 #[test]
558 fn test_notify_match_all_subscribers() {
559 let mut subscriptions = SubscriptionsInfo::new();
560
561 let (tx1, rx1) = async_channel::bounded::<StatementEvent>(10);
562 let topic1 = Topic::from([8u8; 32]);
563 let topic2 = Topic::from([9u8; 32]);
564 let sub_info1 = SubscriptionInfo {
565 topic_filter: OptimizedTopicFilter::MatchAll(
566 vec![topic1, topic2].into_iter().collect(),
567 ),
568 seq_id: SeqID::from(1),
569 tx: tx1,
570 };
571 subscriptions.subscribe(sub_info1.clone());
572
573 let mut statement = signed_statement(1);
574 statement.set_topic(0, topic2);
575 subscriptions.notify_matching_filters(&statement);
576
577 assert!(rx1.try_recv().is_err());
579
580 statement.set_topic(1, topic1);
581 subscriptions.notify_matching_filters(&statement);
582
583 let received = unwrap_statement(rx1.try_recv().expect("Should receive statement"));
584 let decoded_statement: Statement =
585 Statement::decode(&mut &received.0[..]).expect("Should decode statement");
586 assert_eq!(decoded_statement, statement);
587 }
588
589 #[test]
590 fn test_notify_match_any_subscribers() {
591 let mut subscriptions = SubscriptionsInfo::new();
592 let (tx1, rx1) = async_channel::bounded::<StatementEvent>(10);
593 let (tx2, rx2) = async_channel::bounded::<StatementEvent>(10);
594
595 let topic1 = Topic::from([8u8; 32]);
596 let topic2 = Topic::from([9u8; 32]);
597 let sub_info1 = SubscriptionInfo {
598 topic_filter: OptimizedTopicFilter::MatchAny(
599 vec![topic1, topic2].into_iter().collect(),
600 ),
601 seq_id: SeqID::from(1),
602 tx: tx1,
603 };
604
605 let sub_info2 = SubscriptionInfo {
606 topic_filter: OptimizedTopicFilter::MatchAny(vec![topic2].into_iter().collect()),
607 seq_id: SeqID::from(2),
608 tx: tx2,
609 };
610
611 subscriptions.subscribe(sub_info1.clone());
612 subscriptions.subscribe(sub_info2.clone());
613
614 let mut statement = signed_statement(1);
615 statement.set_topic(0, topic1);
616 statement.set_topic(1, topic2);
617 subscriptions.notify_match_any_subscribers(&statement);
618
619 let received = unwrap_statement(rx1.try_recv().expect("Should receive statement"));
620 let decoded_statement: Statement =
621 Statement::decode(&mut &received.0[..]).expect("Should decode statement");
622 assert_eq!(decoded_statement, statement);
623
624 let received = unwrap_statement(rx2.try_recv().expect("Should receive statement"));
625 let decoded_statement: Statement =
626 Statement::decode(&mut &received.0[..]).expect("Should decode statement");
627 assert_eq!(decoded_statement, statement);
628 }
629
630 #[tokio::test]
631 async fn test_subscription_handle_with_different_workers_number() {
632 for num_workers in 1..5 {
633 let subscriptions_handle = SubscriptionsHandle::new(
634 Box::new(subsoil::core::testing::TaskExecutor::new()),
635 num_workers,
636 );
637
638 let topic1 = Topic::from([8u8; 32]);
639 let topic2 = Topic::from([9u8; 32]);
640
641 let streams = (0..5)
642 .into_iter()
643 .map(|_| {
644 subscriptions_handle.subscribe(OptimizedTopicFilter::MatchAll(
645 vec![topic1, topic2].into_iter().collect(),
646 ))
647 })
648 .collect::<Vec<_>>();
649
650 let mut statement = signed_statement(1);
651 statement.set_topic(0, topic2);
652 subscriptions_handle.notify(statement.clone());
653
654 statement.set_topic(1, topic1);
655 subscriptions_handle.notify(statement.clone());
656
657 for (_tx, mut stream) in streams {
658 let received =
659 unwrap_statement(stream.next().await.expect("Should receive statement"));
660 let decoded_statement: Statement =
661 Statement::decode(&mut &received.0[..]).expect("Should decode statement");
662 assert_eq!(decoded_statement, statement);
663 }
664 }
665 }
666
667 #[tokio::test]
668 async fn test_handle_unsubscribe() {
669 let subscriptions_handle =
670 SubscriptionsHandle::new(Box::new(subsoil::core::testing::TaskExecutor::new()), 2);
671
672 let topic1 = Topic::from([8u8; 32]);
673 let topic2 = Topic::from([9u8; 32]);
674
675 let (tx, mut stream) = subscriptions_handle
676 .subscribe(OptimizedTopicFilter::MatchAll(vec![topic1, topic2].into_iter().collect()));
677
678 let mut statement = signed_statement(1);
679 statement.set_topic(0, topic1);
680 statement.set_topic(1, topic2);
681
682 subscriptions_handle.notify(statement.clone());
684
685 let received = unwrap_statement(stream.next().await.expect("Should receive statement"));
686 let decoded_statement: Statement =
687 Statement::decode(&mut &received.0[..]).expect("Should decode statement");
688 assert_eq!(decoded_statement, statement);
689
690 drop(stream);
692
693 tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
695
696 let mut statement2 = signed_statement(2);
698 statement2.set_topic(0, topic1);
699 statement2.set_topic(1, topic2);
700 subscriptions_handle.notify(statement2.clone());
701
702 tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
705
706 assert!(tx.is_closed(), "Sender should be closed after unsubscribe");
709 }
710
711 #[test]
712 fn test_unsubscribe_nonexistent() {
713 let mut subscriptions = SubscriptionsInfo::new();
714 subscriptions.unsubscribe(SeqID::from(999));
716 assert!(subscriptions.by_sub_id.is_empty());
718 assert!(subscriptions.subscriptions_any.is_empty());
719 assert!(subscriptions.subscriptions_match_all_by_topic.is_empty());
720 assert!(subscriptions.subscriptions_match_any_by_topic.is_empty());
721 }
722
723 #[test]
724 fn test_multiple_subscriptions_same_topic() {
725 let mut subscriptions = SubscriptionsInfo::new();
726
727 let (tx1, rx1) = async_channel::bounded::<StatementEvent>(10);
728 let (tx2, rx2) = async_channel::bounded::<StatementEvent>(10);
729 let topic1 = Topic::from([8u8; 32]);
730 let topic2 = Topic::from([9u8; 32]);
731
732 let sub_info1 = SubscriptionInfo {
733 topic_filter: OptimizedTopicFilter::MatchAll(
734 vec![topic1, topic2].into_iter().collect(),
735 ),
736 seq_id: SeqID::from(1),
737 tx: tx1,
738 };
739 let sub_info2 = SubscriptionInfo {
740 topic_filter: OptimizedTopicFilter::MatchAll(
741 vec![topic1, topic2].into_iter().collect(),
742 ),
743 seq_id: SeqID::from(2),
744 tx: tx2,
745 };
746
747 subscriptions.subscribe(sub_info1.clone());
748 subscriptions.subscribe(sub_info2.clone());
749
750 assert_eq!(
752 subscriptions
753 .subscriptions_match_all_by_topic
754 .get(&topic1)
755 .unwrap()
756 .iter()
757 .map(|s| s.len())
758 .sum::<usize>(),
759 2
760 );
761 assert_eq!(
762 subscriptions
763 .subscriptions_match_all_by_topic
764 .get(&topic2)
765 .unwrap()
766 .iter()
767 .map(|s| s.len())
768 .sum::<usize>(),
769 2
770 );
771
772 let mut statement = signed_statement(1);
774 statement.set_topic(0, topic1);
775 statement.set_topic(1, topic2);
776 subscriptions.notify_matching_filters(&statement);
777
778 assert!(rx1.try_recv().is_ok());
780 assert!(rx2.try_recv().is_ok());
781
782 subscriptions.unsubscribe(sub_info1.seq_id);
784
785 assert_eq!(
787 subscriptions
788 .subscriptions_match_all_by_topic
789 .get(&topic1)
790 .unwrap()
791 .iter()
792 .map(|s| s.len())
793 .sum::<usize>(),
794 1
795 );
796 assert_eq!(
797 subscriptions
798 .subscriptions_match_all_by_topic
799 .get(&topic2)
800 .unwrap()
801 .iter()
802 .map(|s| s.len())
803 .sum::<usize>(),
804 1
805 );
806 assert!(!subscriptions.by_sub_id.contains_key(&sub_info1.seq_id));
807 assert!(subscriptions.by_sub_id.contains_key(&sub_info2.seq_id));
808
809 subscriptions.notify_matching_filters(&statement);
811
812 assert!(rx2.try_recv().is_ok());
814 assert!(rx1.try_recv().is_err());
815 }
816
817 #[test]
818 fn test_subscriber_auto_unsubscribe_on_channel_full() {
819 let mut subscriptions = SubscriptionsInfo::new();
820
821 let (tx1, rx1) = async_channel::bounded::<StatementEvent>(1);
823 let topic1 = Topic::from([8u8; 32]);
824
825 let sub_info1 = SubscriptionInfo {
826 topic_filter: OptimizedTopicFilter::MatchAny(vec![topic1].into_iter().collect()),
827 seq_id: SeqID::from(1),
828 tx: tx1,
829 };
830 subscriptions.subscribe(sub_info1.clone());
831
832 let mut statement = signed_statement(1);
833 statement.set_topic(0, topic1);
834
835 subscriptions.notify_matching_filters(&statement);
837 assert!(rx1.try_recv().is_ok());
838
839 subscriptions.notify_matching_filters(&statement);
841 subscriptions.notify_matching_filters(&statement);
845
846 assert!(!subscriptions.by_sub_id.contains_key(&sub_info1.seq_id));
848 assert!(!subscriptions.subscriptions_match_any_by_topic.contains_key(&topic1));
849 }
850
851 #[test]
852 fn test_match_any_receives_once_per_statement() {
853 let mut subscriptions = SubscriptionsInfo::new();
854
855 let (tx1, rx1) = async_channel::bounded::<StatementEvent>(10);
856 let topic1 = Topic::from([8u8; 32]);
857 let topic2 = Topic::from([9u8; 32]);
858
859 let sub_info1 = SubscriptionInfo {
861 topic_filter: OptimizedTopicFilter::MatchAny(
862 vec![topic1, topic2].into_iter().collect(),
863 ),
864 seq_id: SeqID::from(1),
865 tx: tx1,
866 };
867 subscriptions.subscribe(sub_info1.clone());
868
869 let mut statement = signed_statement(1);
871 statement.set_topic(0, topic1);
872 statement.set_topic(1, topic2);
873
874 subscriptions.notify_match_any_subscribers(&statement);
875
876 let received = unwrap_statement(rx1.try_recv().expect("Should receive statement"));
878 let decoded_statement: Statement =
879 Statement::decode(&mut &received.0[..]).expect("Should decode statement");
880 assert_eq!(decoded_statement, statement);
881
882 assert!(rx1.try_recv().is_err());
884 }
885
886 #[test]
887 fn test_match_all_with_single_topic_matches_statement_with_two_topics() {
888 let mut subscriptions = SubscriptionsInfo::new();
889
890 let (tx1, rx1) = async_channel::bounded::<StatementEvent>(10);
891 let topic1 = Topic::from([8u8; 32]);
892 let topic2 = Topic::from([9u8; 32]);
893
894 let sub_info1 = SubscriptionInfo {
896 topic_filter: OptimizedTopicFilter::MatchAll(vec![topic1].into_iter().collect()),
897 seq_id: SeqID::from(1),
898 tx: tx1,
899 };
900 subscriptions.subscribe(sub_info1.clone());
901
902 let mut statement = signed_statement(1);
904 statement.set_topic(0, topic1);
905 statement.set_topic(1, topic2);
906
907 subscriptions.notify_matching_filters(&statement);
908
909 let received = unwrap_statement(rx1.try_recv().expect("Should receive statement"));
911 let decoded_statement: Statement =
912 Statement::decode(&mut &received.0[..]).expect("Should decode statement");
913 assert_eq!(decoded_statement, statement);
914
915 assert!(rx1.try_recv().is_err());
917 }
918
919 #[test]
920 fn test_match_all_no_matching_topics() {
921 let mut subscriptions = SubscriptionsInfo::new();
922
923 let (tx1, rx1) = async_channel::bounded::<StatementEvent>(10);
924 let topic1 = Topic::from([8u8; 32]);
925 let topic2 = Topic::from([9u8; 32]);
926 let topic3 = Topic::from([10u8; 32]);
927
928 let sub_info1 = SubscriptionInfo {
929 topic_filter: OptimizedTopicFilter::MatchAll(
930 vec![topic1, topic2].into_iter().collect(),
931 ),
932 seq_id: SeqID::from(1),
933 tx: tx1,
934 };
935 subscriptions.subscribe(sub_info1.clone());
936
937 let mut statement = signed_statement(1);
939 statement.set_topic(0, topic3);
940
941 subscriptions.notify_matching_filters(&statement);
942
943 assert!(rx1.try_recv().is_err());
945 }
946
947 #[test]
948 fn test_match_all_with_unsubscribed_topic_first_in_statement() {
949 let mut subscriptions = SubscriptionsInfo::new();
954
955 let (tx1, rx1) = async_channel::bounded::<StatementEvent>(10);
956 let topic1 = Topic::from([1u8; 32]);
958 let topic2 = Topic::from([2u8; 32]);
960
961 let sub_info1 = SubscriptionInfo {
963 topic_filter: OptimizedTopicFilter::MatchAll(vec![topic2].into_iter().collect()),
964 seq_id: SeqID::from(1),
965 tx: tx1,
966 };
967 subscriptions.subscribe(sub_info1);
968
969 let mut statement = signed_statement(1);
974 statement.set_topic(0, topic1);
975 statement.set_topic(1, topic2);
976
977 subscriptions.notify_match_all_subscribers_best(&statement);
978
979 let received = unwrap_statement(rx1.try_recv().expect(
982 "Should receive statement - if this fails, the `return` bug in \
983 notify_match_all_subscribers_best is present (should be `continue`)",
984 ));
985 let decoded_statement: Statement =
986 Statement::decode(&mut &received.0[..]).expect("Should decode statement");
987 assert_eq!(decoded_statement, statement);
988 }
989
990 #[tokio::test]
991 async fn test_handle_with_match_any_filter() {
992 let subscriptions_handle =
993 SubscriptionsHandle::new(Box::new(subsoil::core::testing::TaskExecutor::new()), 2);
994
995 let topic1 = Topic::from([8u8; 32]);
996 let topic2 = Topic::from([9u8; 32]);
997
998 let (_tx, mut stream) = subscriptions_handle
999 .subscribe(OptimizedTopicFilter::MatchAny(vec![topic1, topic2].into_iter().collect()));
1000
1001 let mut statement1 = signed_statement(1);
1003 statement1.set_topic(0, topic1);
1004 subscriptions_handle.notify(statement1.clone());
1005
1006 let received = unwrap_statement(stream.next().await.expect("Should receive statement"));
1007 let decoded_statement: Statement =
1008 Statement::decode(&mut &received.0[..]).expect("Should decode statement");
1009 assert_eq!(decoded_statement, statement1);
1010
1011 let mut statement2 = signed_statement(2);
1013 statement2.set_topic(0, topic2);
1014 subscriptions_handle.notify(statement2.clone());
1015
1016 let received = unwrap_statement(stream.next().await.expect("Should receive statement"));
1017 let decoded_statement: Statement =
1018 Statement::decode(&mut &received.0[..]).expect("Should decode statement");
1019 assert_eq!(decoded_statement, statement2);
1020 }
1021
1022 #[tokio::test]
1023 async fn test_handle_with_any_filter() {
1024 let subscriptions_handle =
1025 SubscriptionsHandle::new(Box::new(subsoil::core::testing::TaskExecutor::new()), 2);
1026
1027 let (_tx, mut stream) = subscriptions_handle.subscribe(OptimizedTopicFilter::Any);
1028
1029 let statement1 = signed_statement(1);
1031 subscriptions_handle.notify(statement1.clone());
1032
1033 let received = unwrap_statement(stream.next().await.expect("Should receive statement"));
1034 let decoded_statement: Statement =
1035 Statement::decode(&mut &received.0[..]).expect("Should decode statement");
1036 assert_eq!(decoded_statement, statement1);
1037
1038 let mut statement2 = signed_statement(2);
1039 statement2.set_topic(0, Topic::from([99u8; 32]));
1040 subscriptions_handle.notify(statement2.clone());
1041
1042 let received = unwrap_statement(stream.next().await.expect("Should receive statement"));
1043 let decoded_statement: Statement =
1044 Statement::decode(&mut &received.0[..]).expect("Should decode statement");
1045 assert_eq!(decoded_statement, statement2);
1046 }
1047
1048 #[tokio::test]
1049 async fn test_handle_multiple_subscribers_different_filters() {
1050 let subscriptions_handle =
1051 SubscriptionsHandle::new(Box::new(subsoil::core::testing::TaskExecutor::new()), 2);
1052
1053 let topic1 = Topic::from([8u8; 32]);
1054 let topic2 = Topic::from([9u8; 32]);
1055
1056 let (_tx1, mut stream1) = subscriptions_handle
1058 .subscribe(OptimizedTopicFilter::MatchAll(vec![topic1, topic2].into_iter().collect()));
1059
1060 let (_tx2, mut stream2) = subscriptions_handle
1062 .subscribe(OptimizedTopicFilter::MatchAny(vec![topic1].into_iter().collect()));
1063
1064 let (_tx3, mut stream3) = subscriptions_handle.subscribe(OptimizedTopicFilter::Any);
1066
1067 let mut statement1 = signed_statement(1);
1069 statement1.set_topic(0, topic1);
1070 subscriptions_handle.notify(statement1.clone());
1071
1072 let received2 = unwrap_statement(stream2.next().await.expect("stream2 should receive"));
1077 let decoded2: Statement = Statement::decode(&mut &received2.0[..]).unwrap();
1078 assert_eq!(decoded2, statement1);
1079
1080 let received3 = unwrap_statement(stream3.next().await.expect("stream3 should receive"));
1081 let decoded3: Statement = Statement::decode(&mut &received3.0[..]).unwrap();
1082 assert_eq!(decoded3, statement1);
1083
1084 let mut statement2 = signed_statement(2);
1086 statement2.set_topic(0, topic1);
1087 statement2.set_topic(1, topic2);
1088 subscriptions_handle.notify(statement2.clone());
1089
1090 let received1 = unwrap_statement(stream1.next().await.expect("stream1 should receive"));
1092 let decoded1: Statement = Statement::decode(&mut &received1.0[..]).unwrap();
1093 assert_eq!(decoded1, statement2);
1094
1095 let received2 = unwrap_statement(stream2.next().await.expect("stream2 should receive"));
1096 let decoded2: Statement = Statement::decode(&mut &received2.0[..]).unwrap();
1097 assert_eq!(decoded2, statement2);
1098
1099 let received3 = unwrap_statement(stream3.next().await.expect("stream3 should receive"));
1100 let decoded3: Statement = Statement::decode(&mut &received3.0[..]).unwrap();
1101 assert_eq!(decoded3, statement2);
1102 }
1103
1104 #[test]
1105 fn test_statement_without_topics_matches_only_any_filter() {
1106 let mut subscriptions = SubscriptionsInfo::new();
1107
1108 let (tx_match_all, rx_match_all) = async_channel::bounded::<StatementEvent>(10);
1109 let (tx_match_any, rx_match_any) = async_channel::bounded::<StatementEvent>(10);
1110 let (tx_any, rx_any) = async_channel::bounded::<StatementEvent>(10);
1111
1112 let topic1 = Topic::from([8u8; 32]);
1113 let topic2 = Topic::from([9u8; 32]);
1114
1115 let sub_match_all = SubscriptionInfo {
1117 topic_filter: OptimizedTopicFilter::MatchAll(
1118 vec![topic1, topic2].into_iter().collect(),
1119 ),
1120 seq_id: SeqID::from(1),
1121 tx: tx_match_all,
1122 };
1123 subscriptions.subscribe(sub_match_all);
1124
1125 let sub_match_any = SubscriptionInfo {
1127 topic_filter: OptimizedTopicFilter::MatchAny(
1128 vec![topic1, topic2].into_iter().collect(),
1129 ),
1130 seq_id: SeqID::from(2),
1131 tx: tx_match_any,
1132 };
1133 subscriptions.subscribe(sub_match_any);
1134
1135 let sub_any = SubscriptionInfo {
1137 topic_filter: OptimizedTopicFilter::Any,
1138 seq_id: SeqID::from(3),
1139 tx: tx_any,
1140 };
1141 subscriptions.subscribe(sub_any);
1142
1143 let statement = signed_statement(1);
1145 assert!(statement.topics().is_empty(), "Statement should have no topics");
1146
1147 subscriptions.notify_matching_filters(&statement);
1149
1150 let received =
1152 unwrap_statement(rx_any.try_recv().expect("Any filter should receive statement"));
1153 let decoded_statement: Statement =
1154 Statement::decode(&mut &received.0[..]).expect("Should decode statement");
1155 assert_eq!(decoded_statement, statement);
1156
1157 assert!(
1159 rx_match_all.try_recv().is_err(),
1160 "MatchAll should not receive statement without topics"
1161 );
1162
1163 assert!(
1165 rx_match_any.try_recv().is_err(),
1166 "MatchAny should not receive statement without topics"
1167 );
1168 }
1169}