const MATCHERS_TASK_CHANNEL_BUFFER_SIZE: usize = 80_000;
const SUBSCRIPTION_BUFFER_SIZE: usize = 128;
use futures::{Stream, StreamExt};
use itertools::Itertools;
use super::LOG_TARGET;
use soil_client::utils::id_sequence::SeqID;
pub use soil_statement_store::StatementStore;
use soil_statement_store::{
OptimizedTopicFilter, Result, Statement, StatementEvent, Topic, MAX_TOPICS,
};
use std::{
collections::{hash_map::Entry, HashMap, HashSet},
sync::atomic::AtomicU64,
};
use subsoil::core::{traits::SpawnNamed, Bytes, Encode};
pub trait StatementStoreSubscriptionApi: Send + Sync {
fn subscribe_statement(
&self,
topic_filter: OptimizedTopicFilter,
) -> Result<(Vec<Vec<u8>>, async_channel::Sender<StatementEvent>, SubscriptionStatementsStream)>;
}
#[derive(Clone, Debug)]
pub enum MatcherMessage {
NewStatement(Statement),
Subscribe(SubscriptionInfo),
Unsubscribe(SeqID),
}
pub struct SubscriptionsHandle {
id_sequence: AtomicU64,
matchers: SubscriptionsMatchersHandlers,
}
impl SubscriptionsHandle {
pub(crate) fn new(
task_spawner: Box<dyn SpawnNamed>,
num_matcher_workers: usize,
) -> SubscriptionsHandle {
let mut subscriptions_matchers_senders = Vec::with_capacity(num_matcher_workers);
for task in 0..num_matcher_workers {
let (subscription_matcher_sender, subscription_matcher_receiver) =
async_channel::bounded(MATCHERS_TASK_CHANNEL_BUFFER_SIZE);
subscriptions_matchers_senders.push(subscription_matcher_sender);
task_spawner.spawn_blocking(
"statement-store-subscription-filters",
Some("statement-store"),
Box::pin(async move {
let mut subscriptions = SubscriptionsInfo::new();
log::debug!(
target: LOG_TARGET,
"Started statement subscription matcher task: {task}"
);
loop {
let res = subscription_matcher_receiver.recv().await;
match res {
Ok(MatcherMessage::NewStatement(statement)) => {
subscriptions.notify_matching_filters(&statement);
},
Ok(MatcherMessage::Subscribe(info)) => {
subscriptions.subscribe(info);
},
Ok(MatcherMessage::Unsubscribe(seq_id)) => {
subscriptions.unsubscribe(seq_id);
},
Err(_) => {
log::error!(
target: LOG_TARGET,
"Statement subscription matcher channel closed: {task}"
);
break;
},
};
}
}),
);
}
SubscriptionsHandle {
id_sequence: AtomicU64::new(0),
matchers: SubscriptionsMatchersHandlers::new(subscriptions_matchers_senders),
}
}
fn next_id(&self) -> SeqID {
let id = self.id_sequence.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
SeqID::from(id)
}
pub(crate) fn subscribe(
&self,
topic_filter: OptimizedTopicFilter,
) -> (async_channel::Sender<StatementEvent>, SubscriptionStatementsStream) {
let next_id = self.next_id();
let (tx, rx) = async_channel::bounded(SUBSCRIPTION_BUFFER_SIZE);
let subscription_info =
SubscriptionInfo { topic_filter: topic_filter.clone(), seq_id: next_id, tx };
let result = (
subscription_info.tx.clone(),
SubscriptionStatementsStream {
rx,
sub_id: subscription_info.seq_id,
matchers: self.matchers.clone(),
},
);
self.matchers
.send_by_seq_id(subscription_info.seq_id, MatcherMessage::Subscribe(subscription_info));
result
}
pub(crate) fn notify(&self, statement: Statement) {
self.matchers.send_all(MatcherMessage::NewStatement(statement));
}
}
struct SubscriptionsInfo {
subscriptions_match_all_by_topic:
HashMap<Topic, [HashMap<SeqID, SubscriptionInfo>; MAX_TOPICS]>,
subscriptions_match_any_by_topic: HashMap<Topic, HashMap<SeqID, SubscriptionInfo>>,
subscriptions_any: HashMap<SeqID, SubscriptionInfo>,
by_sub_id: HashMap<SeqID, OptimizedTopicFilter>,
}
#[derive(Clone, Debug)]
pub(crate) struct SubscriptionInfo {
topic_filter: OptimizedTopicFilter,
seq_id: SeqID,
tx: async_channel::Sender<StatementEvent>,
}
impl SubscriptionsInfo {
fn new() -> SubscriptionsInfo {
SubscriptionsInfo {
subscriptions_match_all_by_topic: HashMap::new(),
subscriptions_match_any_by_topic: HashMap::new(),
subscriptions_any: HashMap::new(),
by_sub_id: HashMap::new(),
}
}
fn subscribe(&mut self, subscription_info: SubscriptionInfo) {
self.by_sub_id
.insert(subscription_info.seq_id, subscription_info.topic_filter.clone());
match &subscription_info.topic_filter {
OptimizedTopicFilter::Any => {
self.subscriptions_any.insert(subscription_info.seq_id, subscription_info);
},
OptimizedTopicFilter::MatchAll(topics) => {
for topic in topics {
self.subscriptions_match_all_by_topic.entry(*topic).or_default()
[topics.len() - 1]
.insert(subscription_info.seq_id, subscription_info.clone());
}
},
OptimizedTopicFilter::MatchAny(topics) => {
for topic in topics {
self.subscriptions_match_any_by_topic
.entry(*topic)
.or_default()
.insert(subscription_info.seq_id, subscription_info.clone());
}
},
};
}
fn notify_subscriber(
&self,
subscription: &SubscriptionInfo,
bytes_to_send: Bytes,
needs_unsubscribing: &mut HashSet<SeqID>,
) {
if let Err(err) = subscription.tx.try_send(StatementEvent::NewStatements {
statements: vec![bytes_to_send],
remaining: None,
}) {
log::debug!(
target: LOG_TARGET,
"Failed to send statement to subscriber {:?}: {:?} unsubscribing it", subscription.seq_id, err
);
needs_unsubscribing.insert(subscription.seq_id);
}
}
fn notify_matching_filters(&mut self, statement: &Statement) {
self.notify_match_all_subscribers_best(statement);
self.notify_match_any_subscribers(statement);
self.notify_any_subscribers(statement);
}
fn notify_match_any_subscribers(&mut self, statement: &Statement) {
let mut needs_unsubscribing: HashSet<SeqID> = HashSet::new();
let mut already_notified: HashSet<SeqID> = HashSet::new();
let bytes_to_send: Bytes = statement.encode().into();
for statement_topic in statement.topics() {
if let Some(subscriptions) = self.subscriptions_match_any_by_topic.get(statement_topic)
{
for subscription in subscriptions
.values()
.filter(|subscription| already_notified.insert(subscription.seq_id))
{
self.notify_subscriber(
subscription,
bytes_to_send.clone(),
&mut needs_unsubscribing,
);
}
}
}
for sub_id in needs_unsubscribing {
self.unsubscribe(sub_id);
}
}
fn notify_match_all_subscribers_best(&mut self, statement: &Statement) {
let bytes_to_send: Bytes = statement.encode().into();
let mut needs_unsubscribing: HashSet<SeqID> = HashSet::new();
let num_topics = statement.topics().len();
for num_topics_to_check in 1..=num_topics {
for topics_combination in statement.topics().iter().combinations(num_topics_to_check) {
let Some(Some(topic_with_fewest)) = topics_combination
.iter()
.map(|topic| self.subscriptions_match_all_by_topic.get(*topic))
.min_by_key(|subscriptions| {
subscriptions.map_or(0, |subscriptions_by_length| {
subscriptions_by_length[num_topics_to_check - 1].len()
})
})
else {
continue;
};
for subscription in topic_with_fewest[num_topics_to_check - 1]
.values()
.filter(|subscription| subscription.topic_filter.matches(statement))
{
self.notify_subscriber(
subscription,
bytes_to_send.clone(),
&mut needs_unsubscribing,
);
}
}
}
for sub_id in needs_unsubscribing {
self.unsubscribe(sub_id);
}
}
fn notify_any_subscribers(&mut self, statement: &Statement) {
let mut needs_unsubscribing: HashSet<SeqID> = HashSet::new();
let bytes_to_send: Bytes = statement.encode().into();
for subscription in self.subscriptions_any.values() {
self.notify_subscriber(subscription, bytes_to_send.clone(), &mut needs_unsubscribing);
}
for sub_id in needs_unsubscribing {
self.unsubscribe(sub_id);
}
}
fn unsubscribe(&mut self, id: SeqID) {
let Some(entry) = self.by_sub_id.remove(&id) else {
return;
};
let topics = match &entry {
OptimizedTopicFilter::Any => {
self.subscriptions_any.remove(&id);
return;
},
OptimizedTopicFilter::MatchAll(topics) => topics,
OptimizedTopicFilter::MatchAny(topics) => topics,
};
for topic in topics {
if let Entry::Occupied(mut entry) = self.subscriptions_match_any_by_topic.entry(*topic)
{
entry.get_mut().remove(&id);
if entry.get().is_empty() {
entry.remove();
}
}
if let Entry::Occupied(mut entry) = self.subscriptions_match_all_by_topic.entry(*topic)
{
for subscriptions in entry.get_mut().iter_mut() {
if subscriptions.remove(&id).is_some() {
break;
}
}
if entry.get().iter().all(|s| s.is_empty()) {
entry.remove();
}
}
}
}
}
#[derive(Clone)]
pub struct SubscriptionsMatchersHandlers {
matchers: Vec<async_channel::Sender<MatcherMessage>>,
}
impl SubscriptionsMatchersHandlers {
fn new(matchers: Vec<async_channel::Sender<MatcherMessage>>) -> SubscriptionsMatchersHandlers {
SubscriptionsMatchersHandlers { matchers }
}
fn send_by_seq_id(&self, id: SeqID, message: MatcherMessage) {
let index: u64 = id.into();
if let Err(err) = self.matchers[index as usize % self.matchers.len()].send_blocking(message)
{
log::error!(
target: LOG_TARGET,
"Failed to send statement to matcher task: {:?}", err
);
}
}
fn send_all(&self, message: MatcherMessage) {
for sender in &self.matchers {
if let Err(err) = sender.send_blocking(message.clone()) {
log::error!(
target: LOG_TARGET,
"Failed to send message to matcher task: {:?}", err
);
}
}
}
}
pub struct SubscriptionStatementsStream {
pub rx: async_channel::Receiver<StatementEvent>,
sub_id: SeqID,
matchers: SubscriptionsMatchersHandlers,
}
impl Drop for SubscriptionStatementsStream {
fn drop(&mut self) {
self.matchers
.send_by_seq_id(self.sub_id, MatcherMessage::Unsubscribe(self.sub_id));
}
}
impl Stream for SubscriptionStatementsStream {
type Item = StatementEvent;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
self.rx.poll_next_unpin(cx)
}
}
#[cfg(test)]
mod tests {
use super::super::tests::signed_statement;
use super::*;
use soil_statement_store::Topic;
use subsoil::core::Decode;
fn unwrap_statement(item: StatementEvent) -> Bytes {
match item {
StatementEvent::NewStatements { mut statements, .. } => {
assert_eq!(statements.len(), 1, "Expected exactly one statement in batch");
statements.remove(0)
},
}
}
#[test]
fn test_subscribe_unsubscribe() {
let mut subscriptions = SubscriptionsInfo::new();
let (tx1, _rx1) = async_channel::bounded::<StatementEvent>(10);
let topic1 = Topic::from([8u8; 32]);
let topic2 = Topic::from([9u8; 32]);
let sub_info1 = SubscriptionInfo {
topic_filter: OptimizedTopicFilter::MatchAll(
vec![topic1, topic2].into_iter().collect(),
),
seq_id: SeqID::from(1),
tx: tx1,
};
subscriptions.subscribe(sub_info1.clone());
assert!(subscriptions.subscriptions_match_all_by_topic.contains_key(&topic1));
assert!(subscriptions.subscriptions_match_all_by_topic.contains_key(&topic2));
assert!(subscriptions.by_sub_id.contains_key(&sub_info1.seq_id));
assert!(!subscriptions.subscriptions_any.contains_key(&sub_info1.seq_id));
subscriptions.unsubscribe(sub_info1.seq_id);
assert!(!subscriptions.subscriptions_match_all_by_topic.contains_key(&topic1));
assert!(!subscriptions.subscriptions_match_all_by_topic.contains_key(&topic2));
}
#[test]
fn test_subscribe_any() {
let mut subscriptions = SubscriptionsInfo::new();
let (tx1, _rx1) = async_channel::bounded::<StatementEvent>(10);
let sub_info1 = SubscriptionInfo {
topic_filter: OptimizedTopicFilter::Any,
seq_id: SeqID::from(1),
tx: tx1,
};
subscriptions.subscribe(sub_info1.clone());
assert!(subscriptions.subscriptions_any.contains_key(&sub_info1.seq_id));
assert!(subscriptions.by_sub_id.contains_key(&sub_info1.seq_id));
subscriptions.unsubscribe(sub_info1.seq_id);
assert!(!subscriptions.subscriptions_any.contains_key(&sub_info1.seq_id));
}
#[test]
fn test_subscribe_match_any() {
let mut subscriptions = SubscriptionsInfo::new();
let (tx1, _rx1) = async_channel::bounded::<StatementEvent>(10);
let topic1 = Topic::from([8u8; 32]);
let topic2 = Topic::from([9u8; 32]);
let sub_info1 = SubscriptionInfo {
topic_filter: OptimizedTopicFilter::MatchAny(
vec![topic1, topic2].into_iter().collect(),
),
seq_id: SeqID::from(1),
tx: tx1,
};
subscriptions.subscribe(sub_info1.clone());
assert!(subscriptions.subscriptions_match_any_by_topic.contains_key(&topic1));
assert!(subscriptions.subscriptions_match_any_by_topic.contains_key(&topic2));
assert!(subscriptions.by_sub_id.contains_key(&sub_info1.seq_id));
assert!(!subscriptions.subscriptions_any.contains_key(&sub_info1.seq_id));
subscriptions.unsubscribe(sub_info1.seq_id);
assert!(!subscriptions.subscriptions_match_all_by_topic.contains_key(&topic1));
assert!(!subscriptions.subscriptions_match_all_by_topic.contains_key(&topic2));
}
#[test]
fn test_notify_any_subscribers() {
let mut subscriptions = SubscriptionsInfo::new();
let (tx1, rx1) = async_channel::bounded::<StatementEvent>(10);
let sub_info1 = SubscriptionInfo {
topic_filter: OptimizedTopicFilter::Any,
seq_id: SeqID::from(1),
tx: tx1,
};
subscriptions.subscribe(sub_info1.clone());
let statement = signed_statement(1);
subscriptions.notify_matching_filters(&statement);
let received = unwrap_statement(rx1.try_recv().expect("Should receive statement"));
let decoded_statement: Statement =
Statement::decode(&mut &received.0[..]).expect("Should decode statement");
assert_eq!(decoded_statement, statement);
}
#[test]
fn test_notify_match_all_subscribers() {
let mut subscriptions = SubscriptionsInfo::new();
let (tx1, rx1) = async_channel::bounded::<StatementEvent>(10);
let topic1 = Topic::from([8u8; 32]);
let topic2 = Topic::from([9u8; 32]);
let sub_info1 = SubscriptionInfo {
topic_filter: OptimizedTopicFilter::MatchAll(
vec![topic1, topic2].into_iter().collect(),
),
seq_id: SeqID::from(1),
tx: tx1,
};
subscriptions.subscribe(sub_info1.clone());
let mut statement = signed_statement(1);
statement.set_topic(0, topic2);
subscriptions.notify_matching_filters(&statement);
assert!(rx1.try_recv().is_err());
statement.set_topic(1, topic1);
subscriptions.notify_matching_filters(&statement);
let received = unwrap_statement(rx1.try_recv().expect("Should receive statement"));
let decoded_statement: Statement =
Statement::decode(&mut &received.0[..]).expect("Should decode statement");
assert_eq!(decoded_statement, statement);
}
#[test]
fn test_notify_match_any_subscribers() {
let mut subscriptions = SubscriptionsInfo::new();
let (tx1, rx1) = async_channel::bounded::<StatementEvent>(10);
let (tx2, rx2) = async_channel::bounded::<StatementEvent>(10);
let topic1 = Topic::from([8u8; 32]);
let topic2 = Topic::from([9u8; 32]);
let sub_info1 = SubscriptionInfo {
topic_filter: OptimizedTopicFilter::MatchAny(
vec![topic1, topic2].into_iter().collect(),
),
seq_id: SeqID::from(1),
tx: tx1,
};
let sub_info2 = SubscriptionInfo {
topic_filter: OptimizedTopicFilter::MatchAny(vec![topic2].into_iter().collect()),
seq_id: SeqID::from(2),
tx: tx2,
};
subscriptions.subscribe(sub_info1.clone());
subscriptions.subscribe(sub_info2.clone());
let mut statement = signed_statement(1);
statement.set_topic(0, topic1);
statement.set_topic(1, topic2);
subscriptions.notify_match_any_subscribers(&statement);
let received = unwrap_statement(rx1.try_recv().expect("Should receive statement"));
let decoded_statement: Statement =
Statement::decode(&mut &received.0[..]).expect("Should decode statement");
assert_eq!(decoded_statement, statement);
let received = unwrap_statement(rx2.try_recv().expect("Should receive statement"));
let decoded_statement: Statement =
Statement::decode(&mut &received.0[..]).expect("Should decode statement");
assert_eq!(decoded_statement, statement);
}
#[tokio::test]
async fn test_subscription_handle_with_different_workers_number() {
for num_workers in 1..5 {
let subscriptions_handle = SubscriptionsHandle::new(
Box::new(subsoil::core::testing::TaskExecutor::new()),
num_workers,
);
let topic1 = Topic::from([8u8; 32]);
let topic2 = Topic::from([9u8; 32]);
let streams = (0..5)
.into_iter()
.map(|_| {
subscriptions_handle.subscribe(OptimizedTopicFilter::MatchAll(
vec![topic1, topic2].into_iter().collect(),
))
})
.collect::<Vec<_>>();
let mut statement = signed_statement(1);
statement.set_topic(0, topic2);
subscriptions_handle.notify(statement.clone());
statement.set_topic(1, topic1);
subscriptions_handle.notify(statement.clone());
for (_tx, mut stream) in streams {
let received =
unwrap_statement(stream.next().await.expect("Should receive statement"));
let decoded_statement: Statement =
Statement::decode(&mut &received.0[..]).expect("Should decode statement");
assert_eq!(decoded_statement, statement);
}
}
}
#[tokio::test]
async fn test_handle_unsubscribe() {
let subscriptions_handle =
SubscriptionsHandle::new(Box::new(subsoil::core::testing::TaskExecutor::new()), 2);
let topic1 = Topic::from([8u8; 32]);
let topic2 = Topic::from([9u8; 32]);
let (tx, mut stream) = subscriptions_handle
.subscribe(OptimizedTopicFilter::MatchAll(vec![topic1, topic2].into_iter().collect()));
let mut statement = signed_statement(1);
statement.set_topic(0, topic1);
statement.set_topic(1, topic2);
subscriptions_handle.notify(statement.clone());
let received = unwrap_statement(stream.next().await.expect("Should receive statement"));
let decoded_statement: Statement =
Statement::decode(&mut &received.0[..]).expect("Should decode statement");
assert_eq!(decoded_statement, statement);
drop(stream);
tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
let mut statement2 = signed_statement(2);
statement2.set_topic(0, topic1);
statement2.set_topic(1, topic2);
subscriptions_handle.notify(statement2.clone());
tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
assert!(tx.is_closed(), "Sender should be closed after unsubscribe");
}
#[test]
fn test_unsubscribe_nonexistent() {
let mut subscriptions = SubscriptionsInfo::new();
subscriptions.unsubscribe(SeqID::from(999));
assert!(subscriptions.by_sub_id.is_empty());
assert!(subscriptions.subscriptions_any.is_empty());
assert!(subscriptions.subscriptions_match_all_by_topic.is_empty());
assert!(subscriptions.subscriptions_match_any_by_topic.is_empty());
}
#[test]
fn test_multiple_subscriptions_same_topic() {
let mut subscriptions = SubscriptionsInfo::new();
let (tx1, rx1) = async_channel::bounded::<StatementEvent>(10);
let (tx2, rx2) = async_channel::bounded::<StatementEvent>(10);
let topic1 = Topic::from([8u8; 32]);
let topic2 = Topic::from([9u8; 32]);
let sub_info1 = SubscriptionInfo {
topic_filter: OptimizedTopicFilter::MatchAll(
vec![topic1, topic2].into_iter().collect(),
),
seq_id: SeqID::from(1),
tx: tx1,
};
let sub_info2 = SubscriptionInfo {
topic_filter: OptimizedTopicFilter::MatchAll(
vec![topic1, topic2].into_iter().collect(),
),
seq_id: SeqID::from(2),
tx: tx2,
};
subscriptions.subscribe(sub_info1.clone());
subscriptions.subscribe(sub_info2.clone());
assert_eq!(
subscriptions
.subscriptions_match_all_by_topic
.get(&topic1)
.unwrap()
.iter()
.map(|s| s.len())
.sum::<usize>(),
2
);
assert_eq!(
subscriptions
.subscriptions_match_all_by_topic
.get(&topic2)
.unwrap()
.iter()
.map(|s| s.len())
.sum::<usize>(),
2
);
let mut statement = signed_statement(1);
statement.set_topic(0, topic1);
statement.set_topic(1, topic2);
subscriptions.notify_matching_filters(&statement);
assert!(rx1.try_recv().is_ok());
assert!(rx2.try_recv().is_ok());
subscriptions.unsubscribe(sub_info1.seq_id);
assert_eq!(
subscriptions
.subscriptions_match_all_by_topic
.get(&topic1)
.unwrap()
.iter()
.map(|s| s.len())
.sum::<usize>(),
1
);
assert_eq!(
subscriptions
.subscriptions_match_all_by_topic
.get(&topic2)
.unwrap()
.iter()
.map(|s| s.len())
.sum::<usize>(),
1
);
assert!(!subscriptions.by_sub_id.contains_key(&sub_info1.seq_id));
assert!(subscriptions.by_sub_id.contains_key(&sub_info2.seq_id));
subscriptions.notify_matching_filters(&statement);
assert!(rx2.try_recv().is_ok());
assert!(rx1.try_recv().is_err());
}
#[test]
fn test_subscriber_auto_unsubscribe_on_channel_full() {
let mut subscriptions = SubscriptionsInfo::new();
let (tx1, rx1) = async_channel::bounded::<StatementEvent>(1);
let topic1 = Topic::from([8u8; 32]);
let sub_info1 = SubscriptionInfo {
topic_filter: OptimizedTopicFilter::MatchAny(vec![topic1].into_iter().collect()),
seq_id: SeqID::from(1),
tx: tx1,
};
subscriptions.subscribe(sub_info1.clone());
let mut statement = signed_statement(1);
statement.set_topic(0, topic1);
subscriptions.notify_matching_filters(&statement);
assert!(rx1.try_recv().is_ok());
subscriptions.notify_matching_filters(&statement);
subscriptions.notify_matching_filters(&statement);
assert!(!subscriptions.by_sub_id.contains_key(&sub_info1.seq_id));
assert!(!subscriptions.subscriptions_match_any_by_topic.contains_key(&topic1));
}
#[test]
fn test_match_any_receives_once_per_statement() {
let mut subscriptions = SubscriptionsInfo::new();
let (tx1, rx1) = async_channel::bounded::<StatementEvent>(10);
let topic1 = Topic::from([8u8; 32]);
let topic2 = Topic::from([9u8; 32]);
let sub_info1 = SubscriptionInfo {
topic_filter: OptimizedTopicFilter::MatchAny(
vec![topic1, topic2].into_iter().collect(),
),
seq_id: SeqID::from(1),
tx: tx1,
};
subscriptions.subscribe(sub_info1.clone());
let mut statement = signed_statement(1);
statement.set_topic(0, topic1);
statement.set_topic(1, topic2);
subscriptions.notify_match_any_subscribers(&statement);
let received = unwrap_statement(rx1.try_recv().expect("Should receive statement"));
let decoded_statement: Statement =
Statement::decode(&mut &received.0[..]).expect("Should decode statement");
assert_eq!(decoded_statement, statement);
assert!(rx1.try_recv().is_err());
}
#[test]
fn test_match_all_with_single_topic_matches_statement_with_two_topics() {
let mut subscriptions = SubscriptionsInfo::new();
let (tx1, rx1) = async_channel::bounded::<StatementEvent>(10);
let topic1 = Topic::from([8u8; 32]);
let topic2 = Topic::from([9u8; 32]);
let sub_info1 = SubscriptionInfo {
topic_filter: OptimizedTopicFilter::MatchAll(vec![topic1].into_iter().collect()),
seq_id: SeqID::from(1),
tx: tx1,
};
subscriptions.subscribe(sub_info1.clone());
let mut statement = signed_statement(1);
statement.set_topic(0, topic1);
statement.set_topic(1, topic2);
subscriptions.notify_matching_filters(&statement);
let received = unwrap_statement(rx1.try_recv().expect("Should receive statement"));
let decoded_statement: Statement =
Statement::decode(&mut &received.0[..]).expect("Should decode statement");
assert_eq!(decoded_statement, statement);
assert!(rx1.try_recv().is_err());
}
#[test]
fn test_match_all_no_matching_topics() {
let mut subscriptions = SubscriptionsInfo::new();
let (tx1, rx1) = async_channel::bounded::<StatementEvent>(10);
let topic1 = Topic::from([8u8; 32]);
let topic2 = Topic::from([9u8; 32]);
let topic3 = Topic::from([10u8; 32]);
let sub_info1 = SubscriptionInfo {
topic_filter: OptimizedTopicFilter::MatchAll(
vec![topic1, topic2].into_iter().collect(),
),
seq_id: SeqID::from(1),
tx: tx1,
};
subscriptions.subscribe(sub_info1.clone());
let mut statement = signed_statement(1);
statement.set_topic(0, topic3);
subscriptions.notify_matching_filters(&statement);
assert!(rx1.try_recv().is_err());
}
#[test]
fn test_match_all_with_unsubscribed_topic_first_in_statement() {
let mut subscriptions = SubscriptionsInfo::new();
let (tx1, rx1) = async_channel::bounded::<StatementEvent>(10);
let topic1 = Topic::from([1u8; 32]);
let topic2 = Topic::from([2u8; 32]);
let sub_info1 = SubscriptionInfo {
topic_filter: OptimizedTopicFilter::MatchAll(vec![topic2].into_iter().collect()),
seq_id: SeqID::from(1),
tx: tx1,
};
subscriptions.subscribe(sub_info1);
let mut statement = signed_statement(1);
statement.set_topic(0, topic1);
statement.set_topic(1, topic2);
subscriptions.notify_match_all_subscribers_best(&statement);
let received = unwrap_statement(rx1.try_recv().expect(
"Should receive statement - if this fails, the `return` bug in \
notify_match_all_subscribers_best is present (should be `continue`)",
));
let decoded_statement: Statement =
Statement::decode(&mut &received.0[..]).expect("Should decode statement");
assert_eq!(decoded_statement, statement);
}
#[tokio::test]
async fn test_handle_with_match_any_filter() {
let subscriptions_handle =
SubscriptionsHandle::new(Box::new(subsoil::core::testing::TaskExecutor::new()), 2);
let topic1 = Topic::from([8u8; 32]);
let topic2 = Topic::from([9u8; 32]);
let (_tx, mut stream) = subscriptions_handle
.subscribe(OptimizedTopicFilter::MatchAny(vec![topic1, topic2].into_iter().collect()));
let mut statement1 = signed_statement(1);
statement1.set_topic(0, topic1);
subscriptions_handle.notify(statement1.clone());
let received = unwrap_statement(stream.next().await.expect("Should receive statement"));
let decoded_statement: Statement =
Statement::decode(&mut &received.0[..]).expect("Should decode statement");
assert_eq!(decoded_statement, statement1);
let mut statement2 = signed_statement(2);
statement2.set_topic(0, topic2);
subscriptions_handle.notify(statement2.clone());
let received = unwrap_statement(stream.next().await.expect("Should receive statement"));
let decoded_statement: Statement =
Statement::decode(&mut &received.0[..]).expect("Should decode statement");
assert_eq!(decoded_statement, statement2);
}
#[tokio::test]
async fn test_handle_with_any_filter() {
let subscriptions_handle =
SubscriptionsHandle::new(Box::new(subsoil::core::testing::TaskExecutor::new()), 2);
let (_tx, mut stream) = subscriptions_handle.subscribe(OptimizedTopicFilter::Any);
let statement1 = signed_statement(1);
subscriptions_handle.notify(statement1.clone());
let received = unwrap_statement(stream.next().await.expect("Should receive statement"));
let decoded_statement: Statement =
Statement::decode(&mut &received.0[..]).expect("Should decode statement");
assert_eq!(decoded_statement, statement1);
let mut statement2 = signed_statement(2);
statement2.set_topic(0, Topic::from([99u8; 32]));
subscriptions_handle.notify(statement2.clone());
let received = unwrap_statement(stream.next().await.expect("Should receive statement"));
let decoded_statement: Statement =
Statement::decode(&mut &received.0[..]).expect("Should decode statement");
assert_eq!(decoded_statement, statement2);
}
#[tokio::test]
async fn test_handle_multiple_subscribers_different_filters() {
let subscriptions_handle =
SubscriptionsHandle::new(Box::new(subsoil::core::testing::TaskExecutor::new()), 2);
let topic1 = Topic::from([8u8; 32]);
let topic2 = Topic::from([9u8; 32]);
let (_tx1, mut stream1) = subscriptions_handle
.subscribe(OptimizedTopicFilter::MatchAll(vec![topic1, topic2].into_iter().collect()));
let (_tx2, mut stream2) = subscriptions_handle
.subscribe(OptimizedTopicFilter::MatchAny(vec![topic1].into_iter().collect()));
let (_tx3, mut stream3) = subscriptions_handle.subscribe(OptimizedTopicFilter::Any);
let mut statement1 = signed_statement(1);
statement1.set_topic(0, topic1);
subscriptions_handle.notify(statement1.clone());
let received2 = unwrap_statement(stream2.next().await.expect("stream2 should receive"));
let decoded2: Statement = Statement::decode(&mut &received2.0[..]).unwrap();
assert_eq!(decoded2, statement1);
let received3 = unwrap_statement(stream3.next().await.expect("stream3 should receive"));
let decoded3: Statement = Statement::decode(&mut &received3.0[..]).unwrap();
assert_eq!(decoded3, statement1);
let mut statement2 = signed_statement(2);
statement2.set_topic(0, topic1);
statement2.set_topic(1, topic2);
subscriptions_handle.notify(statement2.clone());
let received1 = unwrap_statement(stream1.next().await.expect("stream1 should receive"));
let decoded1: Statement = Statement::decode(&mut &received1.0[..]).unwrap();
assert_eq!(decoded1, statement2);
let received2 = unwrap_statement(stream2.next().await.expect("stream2 should receive"));
let decoded2: Statement = Statement::decode(&mut &received2.0[..]).unwrap();
assert_eq!(decoded2, statement2);
let received3 = unwrap_statement(stream3.next().await.expect("stream3 should receive"));
let decoded3: Statement = Statement::decode(&mut &received3.0[..]).unwrap();
assert_eq!(decoded3, statement2);
}
#[test]
fn test_statement_without_topics_matches_only_any_filter() {
let mut subscriptions = SubscriptionsInfo::new();
let (tx_match_all, rx_match_all) = async_channel::bounded::<StatementEvent>(10);
let (tx_match_any, rx_match_any) = async_channel::bounded::<StatementEvent>(10);
let (tx_any, rx_any) = async_channel::bounded::<StatementEvent>(10);
let topic1 = Topic::from([8u8; 32]);
let topic2 = Topic::from([9u8; 32]);
let sub_match_all = SubscriptionInfo {
topic_filter: OptimizedTopicFilter::MatchAll(
vec![topic1, topic2].into_iter().collect(),
),
seq_id: SeqID::from(1),
tx: tx_match_all,
};
subscriptions.subscribe(sub_match_all);
let sub_match_any = SubscriptionInfo {
topic_filter: OptimizedTopicFilter::MatchAny(
vec![topic1, topic2].into_iter().collect(),
),
seq_id: SeqID::from(2),
tx: tx_match_any,
};
subscriptions.subscribe(sub_match_any);
let sub_any = SubscriptionInfo {
topic_filter: OptimizedTopicFilter::Any,
seq_id: SeqID::from(3),
tx: tx_any,
};
subscriptions.subscribe(sub_any);
let statement = signed_statement(1);
assert!(statement.topics().is_empty(), "Statement should have no topics");
subscriptions.notify_matching_filters(&statement);
let received =
unwrap_statement(rx_any.try_recv().expect("Any filter should receive statement"));
let decoded_statement: Statement =
Statement::decode(&mut &received.0[..]).expect("Should decode statement");
assert_eq!(decoded_statement, statement);
assert!(
rx_match_all.try_recv().is_err(),
"MatchAll should not receive statement without topics"
);
assert!(
rx_match_any.try_recv().is_err(),
"MatchAny should not receive statement without topics"
);
}
}