1use crate::{
2 events::EVENT_TYPE_ARRAY,
3 listener::ListenerLifespan,
4 subscription::{context::SubscriptionContext, MutationPolicies, UtxosChangedMutationPolicy},
5};
6
7use super::{
8 broadcaster::Broadcaster,
9 collector::DynCollector,
10 connection::Connection,
11 error::{Error, Result},
12 events::{EventArray, EventSwitches, EventType},
13 listener::{Listener, ListenerId},
14 notification::Notification,
15 scope::Scope,
16 subscriber::{Subscriber, SubscriptionManager},
17 subscription::{array::ArrayBuilder, Command, CompoundedSubscription, Mutation},
18};
19use async_channel::Sender;
20use async_trait::async_trait;
21use core::fmt::Debug;
22use futures::future::join_all;
23use itertools::Itertools;
24use kaspa_core::{debug, trace};
25use parking_lot::Mutex;
26use std::{
27 collections::{hash_map::Entry, HashMap},
28 sync::{
29 atomic::{AtomicBool, Ordering},
30 Arc,
31 },
32};
33use workflow_core::channel::Channel;
34
35pub trait Notify<N>: Send + Sync + Debug
36where
37 N: Notification,
38{
39 fn notify(&self, notification: N) -> Result<()>;
40}
41
42pub type DynNotify<N> = Arc<dyn Notify<N>>;
43
44#[derive(Debug)]
109pub struct Notifier<N, C>
110where
111 N: Notification,
112 C: Connection<Notification = N>,
113{
114 inner: Arc<Inner<N, C>>,
115}
116
117impl<N, C> Notifier<N, C>
118where
119 N: Notification,
120 C: Connection<Notification = N>,
121{
122 pub fn new(
123 name: &'static str,
124 enabled_events: EventSwitches,
125 collectors: Vec<DynCollector<N>>,
126 subscribers: Vec<Arc<Subscriber>>,
127 subscription_context: SubscriptionContext,
128 broadcasters: usize,
129 policies: MutationPolicies,
130 ) -> Self {
131 Self::with_sync(name, enabled_events, collectors, subscribers, subscription_context, broadcasters, policies, None)
132 }
133
134 pub fn with_sync(
135 name: &'static str,
136 enabled_events: EventSwitches,
137 collectors: Vec<DynCollector<N>>,
138 subscribers: Vec<Arc<Subscriber>>,
139 subscription_context: SubscriptionContext,
140 broadcasters: usize,
141 policies: MutationPolicies,
142 _sync: Option<Sender<()>>,
143 ) -> Self {
144 Self {
145 inner: Arc::new(Inner::new(
146 name,
147 enabled_events,
148 collectors,
149 subscribers,
150 subscription_context,
151 broadcasters,
152 policies,
153 _sync,
154 )),
155 }
156 }
157
158 pub fn subscription_context(&self) -> &SubscriptionContext {
159 &self.inner.subscription_context
160 }
161
162 pub fn enabled_events(&self) -> &EventSwitches {
163 &self.inner.enabled_events
164 }
165
166 pub fn start(self: Arc<Self>) {
167 self.inner.clone().start(self.clone());
168 }
169
170 pub fn register_new_listener(&self, connection: C, lifespan: ListenerLifespan) -> ListenerId {
171 self.inner.register_new_listener(connection, lifespan)
172 }
173
174 pub fn try_renew_subscriptions(&self) -> Result<()> {
178 self.inner.clone().renew_subscriptions()
179 }
180
181 pub fn try_start_notify(&self, id: ListenerId, scope: Scope) -> Result<()> {
182 self.inner.clone().start_notify(id, scope)
183 }
184
185 pub fn try_execute_subscribe_command(&self, id: ListenerId, scope: Scope, command: Command) -> Result<()> {
186 self.inner.clone().execute_subscribe_command(id, scope, command)
187 }
188
189 pub fn try_stop_notify(&self, id: ListenerId, scope: Scope) -> Result<()> {
190 self.inner.clone().stop_notify(id, scope)
191 }
192
193 pub fn unregister_listener(&self, id: ListenerId) -> Result<()> {
194 self.inner.unregister_listener(id)
195 }
196
197 pub async fn join(&self) -> Result<()> {
198 self.inner.clone().join().await
199 }
200}
201
202impl<N, C> Notify<N> for Notifier<N, C>
203where
204 N: Notification,
205 C: Connection<Notification = N>,
206{
207 fn notify(&self, notification: N) -> Result<()> {
208 self.inner.notify(notification)
209 }
210}
211
212#[async_trait]
213impl<N, C> SubscriptionManager for Notifier<N, C>
214where
215 N: Notification,
216 C: Connection<Notification = N>,
217{
218 async fn start_notify(&self, id: ListenerId, scope: Scope) -> Result<()> {
219 trace!("[Notifier {}] start sending to listener {} notifications of scope {:?}", self.inner.name, id, scope);
220 self.inner.start_notify(id, scope)?;
221 Ok(())
222 }
223
224 async fn stop_notify(&self, id: ListenerId, scope: Scope) -> Result<()> {
225 trace!("[Notifier {}] stop sending to listener {} notifications of scope {:?}", self.inner.name, id, scope);
226 self.inner.stop_notify(id, scope)?;
227 Ok(())
228 }
229}
230
231#[derive(Debug)]
232struct Inner<N, C>
233where
234 N: Notification,
235 C: Connection,
236{
237 enabled_events: EventSwitches,
239
240 listeners: Mutex<HashMap<ListenerId, Listener<C>>>,
242
243 subscriptions: Mutex<EventArray<CompoundedSubscription>>,
245
246 started: Arc<AtomicBool>,
248
249 notification_channel: Channel<N>,
251
252 broadcasters: Vec<Arc<Broadcaster<N, C>>>,
254
255 collectors: Vec<DynCollector<N>>,
257
258 subscribers: Vec<Arc<Subscriber>>,
260
261 enabled_subscriber: EventArray<Option<Arc<Subscriber>>>,
263
264 subscription_context: SubscriptionContext,
266
267 policies: MutationPolicies,
269
270 pub name: &'static str,
272
273 _sync: Option<Sender<()>>,
275}
276
277impl<N, C> Inner<N, C>
278where
279 N: Notification,
280 C: Connection<Notification = N>,
281{
282 fn new(
283 name: &'static str,
284 enabled_events: EventSwitches,
285 collectors: Vec<DynCollector<N>>,
286 subscribers: Vec<Arc<Subscriber>>,
287 subscription_context: SubscriptionContext,
288 broadcasters: usize,
289 policies: MutationPolicies,
290 _sync: Option<Sender<()>>,
291 ) -> Self {
292 assert!(broadcasters > 0, "a notifier requires a minimum of one broadcaster");
293 let notification_channel = Channel::unbounded();
294 let broadcasters = (0..broadcasters)
295 .map(|idx| {
296 Arc::new(Broadcaster::new(
297 name,
298 idx,
299 subscription_context.clone(),
300 notification_channel.receiver.clone(),
301 _sync.clone(),
302 ))
303 })
304 .collect::<Vec<_>>();
305 let enabled_subscriber = EventArray::from_fn(|index| {
306 let event: EventType = index.try_into().unwrap();
307 let mut iter = subscribers.iter().filter(|&x| x.handles_event_type(event)).cloned();
308 let subscriber = iter.next();
309 assert!(iter.next().is_none(), "A notifier is not allowed to have more than one subscriber per event type");
310 subscriber
311 });
312 let utxos_changed_capacity = match policies.utxo_changed {
313 UtxosChangedMutationPolicy::AddressSet => subscription_context.address_tracker.addresses_preallocation(),
314 UtxosChangedMutationPolicy::Wildcard => None,
315 };
316 Self {
317 enabled_events,
318 listeners: Mutex::new(HashMap::new()),
319 subscriptions: Mutex::new(ArrayBuilder::compounded(utxos_changed_capacity)),
320 started: Arc::new(AtomicBool::new(false)),
321 notification_channel,
322 broadcasters,
323 collectors,
324 subscribers,
325 enabled_subscriber,
326 subscription_context,
327 policies,
328 name,
329 _sync,
330 }
331 }
332
333 fn start(&self, notifier: Arc<Notifier<N, C>>) {
334 if self.started.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst).is_ok() {
335 trace!("[Notifier {}] starting", self.name);
336 self.subscribers.iter().for_each(|x| x.start());
337 self.collectors.iter().for_each(|x| x.clone().start(notifier.clone()));
338 self.broadcasters.iter().for_each(|x| x.start());
339 trace!("[Notifier {}] started", self.name);
340 } else {
341 trace!("[Notifier {}] start ignored since already started", self.name);
342 }
343 }
344
345 fn register_new_listener(self: &Arc<Self>, connection: C, lifespan: ListenerLifespan) -> ListenerId {
346 let mut listeners = self.listeners.lock();
347 loop {
348 let id = u64::from_le_bytes(rand::random::<[u8; 8]>());
349
350 if let Entry::Vacant(e) = listeners.entry(id) {
352 trace!("[Notifier {}] registering listener {id}", self.name);
353 let listener = match lifespan {
354 ListenerLifespan::Static(policies) => Listener::new_static(id, connection, &self.subscription_context, policies),
355 ListenerLifespan::Dynamic => Listener::new(id, connection),
356 };
357 e.insert(listener);
358 return id;
359 }
360 }
361 }
362
363 fn unregister_listener(self: &Arc<Self>, id: ListenerId) -> Result<()> {
364 let listener = self.listeners.lock().remove(&id);
366 if let Some(mut listener) = listener {
367 trace!("[Notifier {}] unregistering listener {id}", self.name);
368
369 let mut events = listener
371 .subscriptions
372 .iter()
373 .filter_map(|subscription| if subscription.active() { Some(subscription.event_type()) } else { None })
374 .collect_vec();
375 events.drain(..).for_each(|event| {
376 let _ = self.execute_subscribe_command_impl(id, &mut listener, event.into(), Command::Stop);
377 });
378
379 trace!("[Notifier {}] closing listener {id}", self.name);
381 listener.close();
382 } else {
383 trace!("[Notifier {}] unregistering listener {id} error: unknown listener id", self.name);
384 }
385 Ok(())
386 }
387
388 pub fn execute_subscribe_command(&self, id: ListenerId, scope: Scope, command: Command) -> Result<()> {
389 let event = scope.event_type();
390 if self.enabled_events[event] {
391 let mut listeners = self.listeners.lock();
392 if let Some(listener) = listeners.get_mut(&id) {
393 self.execute_subscribe_command_impl(id, listener, scope, command)?;
394 } else {
395 trace!("[Notifier {}] {command} notifying listener {id} about {scope} error: listener id not found", self.name);
396 }
397 } else {
398 trace!("[Notifier {}] {command} notifying listener {id} about {scope} error: event type {event:?} is disabled", self.name);
399 return Err(Error::EventTypeDisabled);
400 }
401 Ok(())
402 }
403
404 fn execute_subscribe_command_impl(
405 &self,
406 id: ListenerId,
407 listener: &mut Listener<C>,
408 scope: Scope,
409 command: Command,
410 ) -> Result<()> {
411 let mut sync_feedback: bool = false;
412 let event = scope.event_type();
413 let scope_trace = format!("{scope}");
414 debug!("[Notifier {}] {command} notifying about {scope_trace} to listener {id} - {}", self.name, listener.connection());
415 let outcome = listener.mutate(Mutation::new(command, scope), self.policies, &self.subscription_context)?;
416 if outcome.has_changes() {
417 trace!(
418 "[Notifier {}] {command} notifying listener {id} about {scope_trace} involves {} mutations",
419 self.name,
420 outcome.mutations.len(),
421 );
422 match (listener.subscriptions[event].active(), outcome.mutated) {
424 (true, Some(subscription)) => {
425 self.broadcasters
426 .iter()
427 .try_for_each(|broadcaster| broadcaster.register(subscription.clone(), id, listener.connection()))?;
428 }
429 (true, None) => {
430 sync_feedback = true;
431 }
432 (false, _) => {
433 self.broadcasters.iter().try_for_each(|broadcaster| broadcaster.unregister(event, id))?;
434 }
435 }
436 self.apply_mutations(event, outcome.mutations, &self.subscription_context)?;
437 } else {
438 trace!("[Notifier {}] {command} notifying listener {id} about {scope_trace} is ignored (no mutation)", self.name);
439 sync_feedback = true;
440 }
441 if sync_feedback {
442 if let Some(ref sync) = self._sync {
445 let _ = sync.try_send(());
446 }
447 }
448 Ok(())
449 }
450
451 fn apply_mutations(&self, event: EventType, mutations: Vec<Mutation>, context: &SubscriptionContext) -> Result<()> {
452 let mut subscriptions = self.subscriptions.lock();
453 let mut compound_result = None;
455 for mutation in mutations {
456 compound_result = subscriptions[event].compound(mutation, context);
457 }
458 if let Some(mutation) = compound_result {
460 if let Some(ref subscriber) = self.enabled_subscriber[event] {
461 subscriber.mutate(mutation)?;
462 }
463 }
464 Ok(())
465 }
466
467 fn start_notify(&self, id: ListenerId, scope: Scope) -> Result<()> {
468 self.execute_subscribe_command(id, scope, Command::Start)
469 }
470
471 fn notify(&self, notification: N) -> Result<()> {
472 if self.enabled_events[notification.event_type()] {
473 self.notification_channel.try_send(notification)?;
474 }
475 Ok(())
476 }
477
478 fn stop_notify(&self, id: ListenerId, scope: Scope) -> Result<()> {
479 self.execute_subscribe_command(id, scope, Command::Stop)
480 }
481
482 fn renew_subscriptions(&self) -> Result<()> {
483 let subscriptions = self.subscriptions.lock();
484 EVENT_TYPE_ARRAY.iter().copied().filter(|x| self.enabled_events[*x] && subscriptions[*x].active()).try_for_each(|x| {
485 let mutation = Mutation::new(Command::Start, subscriptions[x].scope(&self.subscription_context));
486 self.subscribers.iter().try_for_each(|subscriber| subscriber.mutate(mutation.clone()))?;
487 Ok(())
488 })
489 }
490
491 async fn join(self: Arc<Self>) -> Result<()> {
492 trace!("[Notifier {}] joining", self.name);
493 if self.started.load(Ordering::SeqCst) {
494 debug!("[Notifier {}] stopping collectors", self.name);
495 join_all(self.collectors.iter().map(|x| x.clone().join()))
496 .await
497 .into_iter()
498 .collect::<std::result::Result<Vec<()>, _>>()?;
499 debug!("[Notifier {}] stopped collectors", self.name);
500
501 self.notification_channel.sender.close();
503
504 debug!("[Notifier {}] stopping broadcasters", self.name);
505 join_all(self.broadcasters.iter().map(|x| x.join())).await.into_iter().collect::<std::result::Result<Vec<()>, _>>()?;
506
507 self.subscribers.iter().for_each(|s| s.close());
509
510 debug!("[Notifier {}] stopping subscribers", self.name);
511 join_all(self.subscribers.iter().map(|x| x.join())).await.into_iter().collect::<std::result::Result<Vec<()>, _>>()?;
512
513 debug!("[Notifier {}] closing listeners", self.name);
516 let listener_ids = self.listeners.lock().keys().cloned().collect_vec();
517 listener_ids.iter().for_each(|id| {
518 let listener = self.listeners.lock().remove(id);
519 if let Some(listener) = listener {
520 listener.close();
521 }
522 });
523 } else {
524 trace!("[Notifier {}] join ignored since it was never started", self.name);
525 return Err(Error::AlreadyStoppedError);
526 }
527 debug!("[Notifier {}] terminated", self.name);
528 Ok(())
529 }
530}
531
532pub mod test_helpers {
534 use super::*;
535 use crate::{
536 address::test_helpers::get_3_addresses,
537 connection::ChannelConnection,
538 notification::test_helpers::{
539 BlockAddedNotification, Data, TestNotification, UtxosChangedNotification, VirtualChainChangedNotification,
540 },
541 scope::{BlockAddedScope, UtxosChangedScope, VirtualChainChangedScope},
542 subscriber::test_helpers::SubscriptionMessage,
543 };
544 use async_channel::Sender;
545 use std::time::Duration;
546
547 pub const SYNC_MAX_DELAY: Duration = Duration::from_secs(2);
548
549 pub type TestConnection = ChannelConnection<TestNotification>;
550 pub type TestNotifier = Notifier<TestNotification, ChannelConnection<TestNotification>>;
551
552 #[derive(Debug)]
553 pub struct NotifyMock<N>
554 where
555 N: Notification,
556 {
557 sender: Sender<N>,
558 }
559
560 impl<N> NotifyMock<N>
561 where
562 N: Notification,
563 {
564 pub fn new(sender: Sender<N>) -> Self {
565 Self { sender }
566 }
567 }
568
569 impl<N> Notify<N> for NotifyMock<N>
570 where
571 N: Notification,
572 {
573 fn notify(&self, notification: N) -> Result<()> {
574 Ok(self.sender.try_send(notification)?)
575 }
576 }
577
578 #[derive(Debug)]
579 pub struct Step {
580 pub name: &'static str,
581 pub mutations: Vec<Option<Mutation>>,
582 pub expected_subscriptions: Vec<Option<SubscriptionMessage>>,
583 pub notification: TestNotification,
584 pub expected_notifications: Vec<Option<TestNotification>>,
585 }
586
587 impl Step {
588 pub fn set_data(&mut self, data: u64) {
589 *self.notification.data_mut() = data;
590 self.expected_notifications.iter_mut().for_each(|x| {
591 if let Some(notification) = x.as_mut() {
592 *notification.data_mut() = data;
593 }
594 });
595 }
596 }
597
598 pub fn overall_test_steps(listener_id: ListenerId) -> Vec<Step> {
599 fn m(command: Command) -> Option<Mutation> {
600 Some(Mutation { command, scope: Scope::BlockAdded(BlockAddedScope {}) })
601 }
602 let s = |command: Command| -> Option<SubscriptionMessage> {
603 Some(SubscriptionMessage { listener_id, mutation: Mutation { command, scope: Scope::BlockAdded(BlockAddedScope {}) } })
604 };
605 fn n() -> TestNotification {
606 TestNotification::BlockAdded(BlockAddedNotification::default())
607 }
608 fn e() -> Option<TestNotification> {
609 Some(TestNotification::BlockAdded(BlockAddedNotification::default()))
610 }
611
612 set_steps_data(vec![
613 Step {
614 name: "do nothing",
615 mutations: vec![],
616 expected_subscriptions: vec![],
617 notification: n(),
618 expected_notifications: vec![None, None],
619 },
620 Step {
621 name: "L0 on",
622 mutations: vec![m(Command::Start), None],
623 expected_subscriptions: vec![s(Command::Start), None],
624 notification: n(),
625 expected_notifications: vec![e(), None],
626 },
627 Step {
628 name: "L0 & L1 on",
629 mutations: vec![None, m(Command::Start)],
630 expected_subscriptions: vec![None, None],
631 notification: n(),
632 expected_notifications: vec![e(), e()],
633 },
634 Step {
635 name: "L1 on",
636 mutations: vec![m(Command::Stop), None],
637 expected_subscriptions: vec![None, None],
638 notification: n(),
639 expected_notifications: vec![None, e()],
640 },
641 Step {
642 name: "all off",
643 mutations: vec![None, m(Command::Stop)],
644 expected_subscriptions: vec![None, s(Command::Stop)],
645 notification: n(),
646 expected_notifications: vec![None, None],
647 },
648 ])
649 }
650
651 pub fn virtual_chain_changed_test_steps(listener_id: ListenerId) -> Vec<Step> {
652 fn m(command: Command, include_accepted_transaction_ids: bool) -> Option<Mutation> {
653 Some(Mutation {
654 command,
655 scope: Scope::VirtualChainChanged(VirtualChainChangedScope::new(include_accepted_transaction_ids)),
656 })
657 }
658 let s = |command: Command, include_accepted_transaction_ids: bool| -> Option<SubscriptionMessage> {
659 Some(SubscriptionMessage {
660 listener_id,
661 mutation: Mutation {
662 command,
663 scope: Scope::VirtualChainChanged(VirtualChainChangedScope::new(include_accepted_transaction_ids)),
664 },
665 })
666 };
667 fn n(accepted_transaction_ids: Option<u64>) -> TestNotification {
668 TestNotification::VirtualChainChanged(VirtualChainChangedNotification { data: 0, accepted_transaction_ids })
669 }
670 fn e(accepted_transaction_ids: Option<u64>) -> Option<TestNotification> {
671 Some(TestNotification::VirtualChainChanged(VirtualChainChangedNotification { data: 0, accepted_transaction_ids }))
672 }
673
674 set_steps_data(vec![
675 Step {
676 name: "do nothing",
677 mutations: vec![],
678 expected_subscriptions: vec![],
679 notification: n(None),
680 expected_notifications: vec![None, None],
681 },
682 Step {
683 name: "L0+ on",
684 mutations: vec![m(Command::Start, true), None],
685 expected_subscriptions: vec![s(Command::Start, true), None],
686 notification: n(Some(21)),
687 expected_notifications: vec![e(Some(21)), None],
688 },
689 Step {
690 name: "L0+ & L1- on",
691 mutations: vec![None, m(Command::Start, false)],
692 expected_subscriptions: vec![None, None],
693 notification: n(Some(42)),
694 expected_notifications: vec![e(Some(42)), e(None)],
695 },
696 Step {
697 name: "L0- & L1+ on",
698 mutations: vec![m(Command::Start, false), m(Command::Start, true)],
699 expected_subscriptions: vec![s(Command::Start, false), s(Command::Start, true)],
700 notification: n(Some(63)),
701 expected_notifications: vec![e(None), e(Some(63))],
702 },
703 Step {
704 name: "L1+ on",
705 mutations: vec![m(Command::Stop, false), None],
706 expected_subscriptions: vec![None, None],
707 notification: n(Some(84)),
708 expected_notifications: vec![None, e(Some(84))],
709 },
710 Step {
711 name: "all off",
712 mutations: vec![None, m(Command::Stop, true)],
713 expected_subscriptions: vec![None, s(Command::Stop, true)],
714 notification: n(Some(21)),
715 expected_notifications: vec![None, None],
716 },
717 ])
718 }
719
720 pub fn utxos_changed_test_steps(listener_id: ListenerId) -> Vec<Step> {
721 let a_stock = get_3_addresses(true);
722
723 let a = |indexes: &[usize]| indexes.iter().map(|idx| (a_stock[*idx]).clone()).collect::<Vec<_>>();
724 let m = |command: Command, indexes: &[usize]| {
725 Some(Mutation { command, scope: Scope::UtxosChanged(UtxosChangedScope::new(a(indexes))) })
726 };
727 let s = |command: Command, indexes: &[usize]| {
728 Some(SubscriptionMessage {
729 listener_id,
730 mutation: Mutation { command, scope: Scope::UtxosChanged(UtxosChangedScope::new(a(indexes))) },
731 })
732 };
733 let n =
734 |indexes: &[usize]| TestNotification::UtxosChanged(UtxosChangedNotification { data: 0, addresses: Arc::new(a(indexes)) });
735 let e = |indexes: &[usize]| {
736 Some(TestNotification::UtxosChanged(UtxosChangedNotification { data: 0, addresses: Arc::new(a(indexes)) }))
737 };
738
739 set_steps_data(vec![
740 Step {
741 name: "do nothing",
742 mutations: vec![],
743 expected_subscriptions: vec![],
744 notification: n(&[]),
745 expected_notifications: vec![None, None, None],
746 },
747 Step {
748 name: "L0[0] <= N[0]",
749 mutations: vec![m(Command::Start, &[0]), None, None],
750 expected_subscriptions: vec![s(Command::Start, &[0]), None, None],
751 notification: n(&[0]),
752 expected_notifications: vec![e(&[0]), None, None],
753 },
754 Step {
755 name: "L0[0] <= N[0,1,2]",
756 mutations: vec![m(Command::Start, &[0]), None, None],
757 expected_subscriptions: vec![None, None, None],
758 notification: n(&[0, 1, 2]),
759 expected_notifications: vec![e(&[0]), None, None],
760 },
761 Step {
762 name: "L0[0], L1[1] <= N[0,1,2]",
763 mutations: vec![None, m(Command::Start, &[1]), None],
764 expected_subscriptions: vec![None, s(Command::Start, &[1]), None],
765 notification: n(&[0, 1, 2]),
766 expected_notifications: vec![e(&[0]), e(&[1]), None],
767 },
768 Step {
769 name: "L0[0], L1[1], L2[2] <= N[0,1,2]",
770 mutations: vec![None, None, m(Command::Start, &[2])],
771 expected_subscriptions: vec![None, None, s(Command::Start, &[2])],
772 notification: n(&[0, 1, 2]),
773 expected_notifications: vec![e(&[0]), e(&[1]), e(&[2])],
774 },
775 Step {
776 name: "L0[0, 2], L1[*], L2[1, 2] <= N[0,1,2]",
777 mutations: vec![m(Command::Start, &[2]), m(Command::Start, &[]), m(Command::Start, &[1])],
778 expected_subscriptions: vec![None, s(Command::Start, &[]), None],
779 notification: n(&[0, 1, 2]),
780 expected_notifications: vec![e(&[0, 2]), e(&[0, 1, 2]), e(&[1, 2])],
781 },
782 Step {
783 name: "L0[0, 2], L1[*], L2[1, 2] <= N[0]",
784 mutations: vec![None, None, None],
785 expected_subscriptions: vec![None, None, None],
786 notification: n(&[0]),
787 expected_notifications: vec![e(&[0]), e(&[0]), None],
788 },
789 Step {
790 name: "L0[2], L1[1], L2[*] <= N[0, 1]",
791 mutations: vec![m(Command::Stop, &[0]), m(Command::Start, &[1]), m(Command::Start, &[])],
792 expected_subscriptions: vec![None, s(Command::Start, &[1, 2]), s(Command::Start, &[])],
793 notification: n(&[0, 1]),
794 expected_notifications: vec![None, e(&[1]), e(&[0, 1])],
795 },
796 Step {
797 name: "L2[*] <= N[0, 1, 2]",
798 mutations: vec![m(Command::Stop, &[]), m(Command::Stop, &[1]), m(Command::Stop, &[1])],
799 expected_subscriptions: vec![None, None, None],
800 notification: n(&[0, 1, 2]),
801 expected_notifications: vec![None, None, e(&[0, 1, 2])],
802 },
803 Step {
804 name: "all off",
805 mutations: vec![None, None, m(Command::Stop, &[])],
806 expected_subscriptions: vec![None, None, s(Command::Stop, &[])],
807 notification: n(&[0, 1, 2]),
808 expected_notifications: vec![None, None, None],
809 },
810 ])
811 }
812
813 fn set_steps_data(mut steps: Vec<Step>) -> Vec<Step> {
814 for (idx, step) in steps.iter_mut().enumerate() {
816 step.set_data(idx as u64);
817 }
818 steps
819 }
820}
821
822#[cfg(test)]
823mod tests {
824 use super::{test_helpers::*, *};
825 use crate::{
826 collector::CollectorFrom,
827 connection::ChannelType,
828 converter::ConverterFrom,
829 events::EVENT_TYPE_ARRAY,
830 notification::test_helpers::*,
831 subscriber::test_helpers::{SubscriptionManagerMock, SubscriptionMessage},
832 };
833 use async_channel::{unbounded, Receiver, Sender};
834 use tokio::time::timeout;
835
836 const SUBSCRIPTION_MANAGER_ID: u64 = 0;
837
838 struct Test {
839 name: &'static str,
840 notifier: Arc<TestNotifier>,
841 subscription_receiver: Receiver<SubscriptionMessage>,
842 listeners: Vec<ListenerId>,
843 notification_sender: Sender<TestNotification>,
844 notification_receivers: Vec<Receiver<TestNotification>>,
845 sync_receiver: Receiver<()>,
846 steps: Vec<Step>,
847 }
848
849 impl Test {
850 fn new(name: &'static str, listener_count: usize, steps: Vec<Step>) -> Self {
851 const IDENT: &str = "test";
852 type TestConverter = ConverterFrom<TestNotification, TestNotification>;
853 type TestCollector = CollectorFrom<TestConverter>;
854 let (sync_sender, sync_receiver) = unbounded();
856 let (notification_sender, notification_receiver) = unbounded();
857 let (subscription_sender, subscription_receiver) = unbounded();
858 let collector = Arc::new(TestCollector::new(IDENT, notification_receiver, Arc::new(TestConverter::new())));
859 let subscription_manager = Arc::new(SubscriptionManagerMock::new(subscription_sender));
860 let subscription_context = SubscriptionContext::new();
861 let subscriber =
862 Arc::new(Subscriber::new("test", EVENT_TYPE_ARRAY[..].into(), subscription_manager, SUBSCRIPTION_MANAGER_ID));
863 let notifier = Arc::new(TestNotifier::with_sync(
864 "test",
865 EVENT_TYPE_ARRAY[..].into(),
866 vec![collector],
867 vec![subscriber],
868 subscription_context,
869 1,
870 Default::default(),
871 Some(sync_sender),
872 ));
873 let mut listeners = Vec::with_capacity(listener_count);
875 let mut notification_receivers = Vec::with_capacity(listener_count);
876 for _ in 0..listener_count {
877 let (sender, receiver) = unbounded();
878 let connection = TestConnection::new(IDENT, sender, ChannelType::Closable);
879 listeners.push(notifier.register_new_listener(connection, ListenerLifespan::Dynamic));
880 notification_receivers.push(receiver);
881 }
882 Self {
884 name,
885 notifier,
886 subscription_receiver,
887 listeners,
888 notification_sender,
889 notification_receivers,
890 sync_receiver,
891 steps,
892 }
893 }
894
895 async fn run(&self) {
896 self.notifier.clone().start();
897
898 for (step_idx, step) in self.steps.iter().enumerate() {
900 trace!("Execute test step #{step_idx}: {}", step.name);
901 for (idx, mutation) in step.mutations.iter().enumerate() {
904 if let Some(ref mutation) = mutation {
905 trace!("Mutation #{idx}");
906 assert!(
907 self.notifier
908 .execute_subscribe_command(self.listeners[idx], mutation.scope.clone(), mutation.command)
909 .await
910 .is_ok(),
911 "executing the subscription command {mutation:?} failed"
912 );
913 trace!("Receiving sync message #{step_idx} after subscribing");
914 assert!(
915 timeout(SYNC_MAX_DELAY, self.sync_receiver.recv()).await.unwrap().is_ok(),
916 "{} - {}: receiving a sync message failed",
917 self.name,
918 step.name
919 );
920 if let Some(ref expected_subscription) = step.expected_subscriptions[idx] {
921 let subscription = self.subscription_receiver.recv().await.unwrap();
922 assert_eq!(
923 *expected_subscription, subscription,
924 "{} - {}: the listener[{}] mutation {mutation:?} yielded the wrong subscription",
925 self.name, step.name, idx
926 );
927 assert!(
928 self.subscription_receiver.is_empty(),
929 "{} - {}: listener[{}] mutation {mutation:?} yielded an extra subscription but should not",
930 self.name,
931 step.name,
932 idx
933 );
934 } else {
935 assert!(
936 self.subscription_receiver.is_empty(),
937 "{} - {}: listener[{}] mutation {mutation:?} yielded a subscription but should not",
938 self.name,
939 step.name,
940 idx
941 );
942 }
943 }
944 }
945
946 trace!("Sending notification #{step_idx}");
948 assert!(
949 self.notification_sender.send_blocking(step.notification.clone()).is_ok(),
950 "{} - {}: sending the notification failed",
951 self.name,
952 step.name
953 );
954 trace!("Receiving sync message #{step_idx} after notifying");
955 assert!(
956 timeout(SYNC_MAX_DELAY, self.sync_receiver.recv()).await.unwrap().is_ok(),
957 "{} - {}: receiving a sync message failed",
958 self.name,
959 step.name
960 );
961
962 for (idx, expected_notifications) in step.expected_notifications.iter().enumerate() {
964 if let Some(ref expected_notifications) = expected_notifications {
965 let notification = self.notification_receivers[idx].recv().await.unwrap();
966 assert_eq!(
967 *expected_notifications, notification,
968 "{} - {}: listener[{}] got wrong notification",
969 self.name, step.name, idx
970 );
971 } else {
972 assert!(
973 self.notification_receivers[idx].is_empty(),
974 "{} - {}: listener[{}] has a notification in its channel but should not",
975 self.name,
976 step.name,
977 idx
978 );
979 }
980 }
981 }
982 self.notification_sender.close();
983 assert!(self.notifier.join().await.is_ok(), "notifier failed to stop");
984 }
985 }
986
987 #[tokio::test]
988 async fn test_overall() {
989 kaspa_core::log::try_init_logger("trace,kaspa_notify=trace");
990 let test = Test::new("BlockAdded broadcast (OverallSubscription type)", 2, overall_test_steps(SUBSCRIPTION_MANAGER_ID));
991 test.run().await;
992 }
993
994 #[tokio::test]
995 async fn test_virtual_chain_changed() {
996 kaspa_core::log::try_init_logger("trace,kaspa_notify=trace");
997 let test = Test::new("VirtualChainChanged broadcast", 2, virtual_chain_changed_test_steps(SUBSCRIPTION_MANAGER_ID));
998 test.run().await;
999 }
1000
1001 #[tokio::test]
1002 async fn test_utxos_changed() {
1003 kaspa_core::log::try_init_logger("trace,kaspa_notify=trace");
1004 let test = Test::new("UtxosChanged broadcast", 3, utxos_changed_test_steps(SUBSCRIPTION_MANAGER_ID));
1005 test.run().await;
1006 }
1007}