kaspa_notify/
notifier.rs

1use crate::{
2    events::EVENT_TYPE_ARRAY,
3    listener::ListenerLifespan,
4    subscription::{context::SubscriptionContext, MutationPolicies, UtxosChangedMutationPolicy},
5};
6
7use super::{
8    broadcaster::Broadcaster,
9    collector::DynCollector,
10    connection::Connection,
11    error::{Error, Result},
12    events::{EventArray, EventSwitches, EventType},
13    listener::{Listener, ListenerId},
14    notification::Notification,
15    scope::Scope,
16    subscriber::{Subscriber, SubscriptionManager},
17    subscription::{array::ArrayBuilder, Command, CompoundedSubscription, Mutation},
18};
19use async_channel::Sender;
20use async_trait::async_trait;
21use core::fmt::Debug;
22use futures::future::join_all;
23use itertools::Itertools;
24use kaspa_core::{debug, trace};
25use parking_lot::Mutex;
26use std::{
27    collections::{hash_map::Entry, HashMap},
28    sync::{
29        atomic::{AtomicBool, Ordering},
30        Arc,
31    },
32};
33use workflow_core::channel::Channel;
34
35pub trait Notify<N>: Send + Sync + Debug
36where
37    N: Notification,
38{
39    fn notify(&self, notification: N) -> Result<()>;
40}
41
42pub type DynNotify<N> = Arc<dyn Notify<N>>;
43
44// pub trait Registrar<N>: Send + Sync + Debug
45// where
46//     N: Notification,
47// {
48// }
49
50// pub type DynRegistrar<N> = Arc<dyn Registrar<N>>;
51
52/// A notifier is a notification broadcaster. It receives notifications from upstream _parents_ and
53/// broadcasts those downstream to its _children_ listeners. Symmetrically, it receives subscriptions
54/// from its downward listeners, compounds those internally and pushes upward the subscriptions resulting
55/// of the compounding, if any, to the _parents_.
56///
57/// ### Enabled event types
58///
59/// A notifier has a set of enabled event type (see [`EventType`]). It only broadcasts notifications whose
60/// event type is enabled and drops the others. The same goes for subscriptions.
61///
62/// Each subscriber has a set of enabled event type. No two subscribers may have the same event type enabled.
63/// The union of the sets of all subscribers should match the set of the notifier, though this is not mandatory.
64///
65/// ### Mutation policies
66///
67/// The notifier is built with some mutation policies defining how an processed listener mutation must be propagated
68/// to the _parent_.
69///
70/// ### Architecture
71///
72/// #### Internal structure
73///
74/// The notifier notably owns:
75///
76/// - a vector of [`DynCollector`]
77/// - a vector of [`Subscriber`]
78/// - a pool of `Broadcaster`
79/// - a map of `Listener`
80///
81/// Collectors and subscribers form the scaffold. They are provided to the ctor, are immutable and share its
82/// lifespan. Both do materialize a connection to the notifier _parents_, collectors for incoming notifications
83/// and subscribers for outgoing subscriptions. They may usually be paired by index in their respective
84/// vector but this by no means is mandatory, opening a field for special edge cases.
85///
86/// The broadcasters are built in the ctor according to a provided count. They act as a pool of workers competing
87/// for the processing of an incoming notification.
88///
89/// The listeners are managed dynamically through registration/unregistration calls.
90///
91/// #### External conformation
92///
93/// The notifier is designed so that many instances can be interconnected and form a DAG of notifiers.
94///
95/// However, the notifications path from the root all the way downstream to the final clients is forming a tree,
96/// not a DAG. This is because, for a given type of notification (see [`EventType`]), a notifier has at most a single
97/// _parent_ provider.
98///
99/// The same is symmetrically true about subscriptions which travel upstream from clients to the root along a tree,
100/// meaning that, for a given type of subscription (see [`EventType`]), a notifier has at most a single subscriber,
101/// targeting a single _parent_.
102///
103/// ### Special considerations
104///
105/// A notifier is built with a specific set of enabled event types. It is however possible to manually subscribe
106/// to a disabled scope and thus have a custom-made collector of the notifier receive notifications of this disabled scope,
107/// allowing some handling of the notification into the collector before it gets dropped by the notifier.
108#[derive(Debug)]
109pub struct Notifier<N, C>
110where
111    N: Notification,
112    C: Connection<Notification = N>,
113{
114    inner: Arc<Inner<N, C>>,
115}
116
117impl<N, C> Notifier<N, C>
118where
119    N: Notification,
120    C: Connection<Notification = N>,
121{
122    pub fn new(
123        name: &'static str,
124        enabled_events: EventSwitches,
125        collectors: Vec<DynCollector<N>>,
126        subscribers: Vec<Arc<Subscriber>>,
127        subscription_context: SubscriptionContext,
128        broadcasters: usize,
129        policies: MutationPolicies,
130    ) -> Self {
131        Self::with_sync(name, enabled_events, collectors, subscribers, subscription_context, broadcasters, policies, None)
132    }
133
134    pub fn with_sync(
135        name: &'static str,
136        enabled_events: EventSwitches,
137        collectors: Vec<DynCollector<N>>,
138        subscribers: Vec<Arc<Subscriber>>,
139        subscription_context: SubscriptionContext,
140        broadcasters: usize,
141        policies: MutationPolicies,
142        _sync: Option<Sender<()>>,
143    ) -> Self {
144        Self {
145            inner: Arc::new(Inner::new(
146                name,
147                enabled_events,
148                collectors,
149                subscribers,
150                subscription_context,
151                broadcasters,
152                policies,
153                _sync,
154            )),
155        }
156    }
157
158    pub fn subscription_context(&self) -> &SubscriptionContext {
159        &self.inner.subscription_context
160    }
161
162    pub fn enabled_events(&self) -> &EventSwitches {
163        &self.inner.enabled_events
164    }
165
166    pub fn start(self: Arc<Self>) {
167        self.inner.clone().start(self.clone());
168    }
169
170    pub fn register_new_listener(&self, connection: C, lifespan: ListenerLifespan) -> ListenerId {
171        self.inner.register_new_listener(connection, lifespan)
172    }
173
174    /// Resend the compounded subscription state of the notifier to its subscribers (its parents).
175    ///
176    /// The typical use case is a RPC client reconnecting to a server and resending the compounded subscriptions of its listeners.
177    pub fn try_renew_subscriptions(&self) -> Result<()> {
178        self.inner.clone().renew_subscriptions()
179    }
180
181    pub fn try_start_notify(&self, id: ListenerId, scope: Scope) -> Result<()> {
182        self.inner.clone().start_notify(id, scope)
183    }
184
185    pub fn try_execute_subscribe_command(&self, id: ListenerId, scope: Scope, command: Command) -> Result<()> {
186        self.inner.clone().execute_subscribe_command(id, scope, command)
187    }
188
189    pub fn try_stop_notify(&self, id: ListenerId, scope: Scope) -> Result<()> {
190        self.inner.clone().stop_notify(id, scope)
191    }
192
193    pub fn unregister_listener(&self, id: ListenerId) -> Result<()> {
194        self.inner.unregister_listener(id)
195    }
196
197    pub async fn join(&self) -> Result<()> {
198        self.inner.clone().join().await
199    }
200}
201
202impl<N, C> Notify<N> for Notifier<N, C>
203where
204    N: Notification,
205    C: Connection<Notification = N>,
206{
207    fn notify(&self, notification: N) -> Result<()> {
208        self.inner.notify(notification)
209    }
210}
211
212#[async_trait]
213impl<N, C> SubscriptionManager for Notifier<N, C>
214where
215    N: Notification,
216    C: Connection<Notification = N>,
217{
218    async fn start_notify(&self, id: ListenerId, scope: Scope) -> Result<()> {
219        trace!("[Notifier {}] start sending to listener {} notifications of scope {:?}", self.inner.name, id, scope);
220        self.inner.start_notify(id, scope)?;
221        Ok(())
222    }
223
224    async fn stop_notify(&self, id: ListenerId, scope: Scope) -> Result<()> {
225        trace!("[Notifier {}] stop sending to listener {} notifications of scope {:?}", self.inner.name, id, scope);
226        self.inner.stop_notify(id, scope)?;
227        Ok(())
228    }
229}
230
231#[derive(Debug)]
232struct Inner<N, C>
233where
234    N: Notification,
235    C: Connection,
236{
237    /// Event types this notifier is configured to accept, broadcast and subscribe to
238    enabled_events: EventSwitches,
239
240    /// Map of registered listeners
241    listeners: Mutex<HashMap<ListenerId, Listener<C>>>,
242
243    /// Compounded subscriptions by event type
244    subscriptions: Mutex<EventArray<CompoundedSubscription>>,
245
246    /// Has this notifier been started?
247    started: Arc<AtomicBool>,
248
249    /// Channel used to send the notifications to the broadcasters
250    notification_channel: Channel<N>,
251
252    /// Array of notification broadcasters
253    broadcasters: Vec<Arc<Broadcaster<N, C>>>,
254
255    /// Collectors
256    collectors: Vec<DynCollector<N>>,
257
258    /// Subscribers
259    subscribers: Vec<Arc<Subscriber>>,
260
261    /// Enabled Subscriber by event type
262    enabled_subscriber: EventArray<Option<Arc<Subscriber>>>,
263
264    /// Subscription context
265    subscription_context: SubscriptionContext,
266
267    /// Mutation policies
268    policies: MutationPolicies,
269
270    /// Name of the notifier, used in logs
271    pub name: &'static str,
272
273    /// Sync channel, for handling of messages in predictable sequence; exclusively intended for tests.
274    _sync: Option<Sender<()>>,
275}
276
277impl<N, C> Inner<N, C>
278where
279    N: Notification,
280    C: Connection<Notification = N>,
281{
282    fn new(
283        name: &'static str,
284        enabled_events: EventSwitches,
285        collectors: Vec<DynCollector<N>>,
286        subscribers: Vec<Arc<Subscriber>>,
287        subscription_context: SubscriptionContext,
288        broadcasters: usize,
289        policies: MutationPolicies,
290        _sync: Option<Sender<()>>,
291    ) -> Self {
292        assert!(broadcasters > 0, "a notifier requires a minimum of one broadcaster");
293        let notification_channel = Channel::unbounded();
294        let broadcasters = (0..broadcasters)
295            .map(|idx| {
296                Arc::new(Broadcaster::new(
297                    name,
298                    idx,
299                    subscription_context.clone(),
300                    notification_channel.receiver.clone(),
301                    _sync.clone(),
302                ))
303            })
304            .collect::<Vec<_>>();
305        let enabled_subscriber = EventArray::from_fn(|index| {
306            let event: EventType = index.try_into().unwrap();
307            let mut iter = subscribers.iter().filter(|&x| x.handles_event_type(event)).cloned();
308            let subscriber = iter.next();
309            assert!(iter.next().is_none(), "A notifier is not allowed to have more than one subscriber per event type");
310            subscriber
311        });
312        let utxos_changed_capacity = match policies.utxo_changed {
313            UtxosChangedMutationPolicy::AddressSet => subscription_context.address_tracker.addresses_preallocation(),
314            UtxosChangedMutationPolicy::Wildcard => None,
315        };
316        Self {
317            enabled_events,
318            listeners: Mutex::new(HashMap::new()),
319            subscriptions: Mutex::new(ArrayBuilder::compounded(utxos_changed_capacity)),
320            started: Arc::new(AtomicBool::new(false)),
321            notification_channel,
322            broadcasters,
323            collectors,
324            subscribers,
325            enabled_subscriber,
326            subscription_context,
327            policies,
328            name,
329            _sync,
330        }
331    }
332
333    fn start(&self, notifier: Arc<Notifier<N, C>>) {
334        if self.started.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst).is_ok() {
335            trace!("[Notifier {}] starting", self.name);
336            self.subscribers.iter().for_each(|x| x.start());
337            self.collectors.iter().for_each(|x| x.clone().start(notifier.clone()));
338            self.broadcasters.iter().for_each(|x| x.start());
339            trace!("[Notifier {}] started", self.name);
340        } else {
341            trace!("[Notifier {}] start ignored since already started", self.name);
342        }
343    }
344
345    fn register_new_listener(self: &Arc<Self>, connection: C, lifespan: ListenerLifespan) -> ListenerId {
346        let mut listeners = self.listeners.lock();
347        loop {
348            let id = u64::from_le_bytes(rand::random::<[u8; 8]>());
349
350            // This is very unlikely to happen but still, check for duplicates
351            if let Entry::Vacant(e) = listeners.entry(id) {
352                trace!("[Notifier {}] registering listener {id}", self.name);
353                let listener = match lifespan {
354                    ListenerLifespan::Static(policies) => Listener::new_static(id, connection, &self.subscription_context, policies),
355                    ListenerLifespan::Dynamic => Listener::new(id, connection),
356                };
357                e.insert(listener);
358                return id;
359            }
360        }
361    }
362
363    fn unregister_listener(self: &Arc<Self>, id: ListenerId) -> Result<()> {
364        // Try to remove the listener, preventing any possible new subscription
365        let listener = self.listeners.lock().remove(&id);
366        if let Some(mut listener) = listener {
367            trace!("[Notifier {}] unregistering listener {id}", self.name);
368
369            // Cancel all remaining active subscriptions
370            let mut events = listener
371                .subscriptions
372                .iter()
373                .filter_map(|subscription| if subscription.active() { Some(subscription.event_type()) } else { None })
374                .collect_vec();
375            events.drain(..).for_each(|event| {
376                let _ = self.execute_subscribe_command_impl(id, &mut listener, event.into(), Command::Stop);
377            });
378
379            // Close the listener
380            trace!("[Notifier {}] closing listener {id}", self.name);
381            listener.close();
382        } else {
383            trace!("[Notifier {}] unregistering listener {id} error: unknown listener id", self.name);
384        }
385        Ok(())
386    }
387
388    pub fn execute_subscribe_command(&self, id: ListenerId, scope: Scope, command: Command) -> Result<()> {
389        let event = scope.event_type();
390        if self.enabled_events[event] {
391            let mut listeners = self.listeners.lock();
392            if let Some(listener) = listeners.get_mut(&id) {
393                self.execute_subscribe_command_impl(id, listener, scope, command)?;
394            } else {
395                trace!("[Notifier {}] {command} notifying listener {id} about {scope} error: listener id not found", self.name);
396            }
397        } else {
398            trace!("[Notifier {}] {command} notifying listener {id} about {scope} error: event type {event:?} is disabled", self.name);
399            return Err(Error::EventTypeDisabled);
400        }
401        Ok(())
402    }
403
404    fn execute_subscribe_command_impl(
405        &self,
406        id: ListenerId,
407        listener: &mut Listener<C>,
408        scope: Scope,
409        command: Command,
410    ) -> Result<()> {
411        let mut sync_feedback: bool = false;
412        let event = scope.event_type();
413        let scope_trace = format!("{scope}");
414        debug!("[Notifier {}] {command} notifying about {scope_trace} to listener {id} - {}", self.name, listener.connection());
415        let outcome = listener.mutate(Mutation::new(command, scope), self.policies, &self.subscription_context)?;
416        if outcome.has_changes() {
417            trace!(
418                "[Notifier {}] {command} notifying listener {id} about {scope_trace} involves {} mutations",
419                self.name,
420                outcome.mutations.len(),
421            );
422            // Update broadcasters
423            match (listener.subscriptions[event].active(), outcome.mutated) {
424                (true, Some(subscription)) => {
425                    self.broadcasters
426                        .iter()
427                        .try_for_each(|broadcaster| broadcaster.register(subscription.clone(), id, listener.connection()))?;
428                }
429                (true, None) => {
430                    sync_feedback = true;
431                }
432                (false, _) => {
433                    self.broadcasters.iter().try_for_each(|broadcaster| broadcaster.unregister(event, id))?;
434                }
435            }
436            self.apply_mutations(event, outcome.mutations, &self.subscription_context)?;
437        } else {
438            trace!("[Notifier {}] {command} notifying listener {id} about {scope_trace} is ignored (no mutation)", self.name);
439            sync_feedback = true;
440        }
441        if sync_feedback {
442            // In case we have a sync channel, report that the command was processed.
443            // This is for test only.
444            if let Some(ref sync) = self._sync {
445                let _ = sync.try_send(());
446            }
447        }
448        Ok(())
449    }
450
451    fn apply_mutations(&self, event: EventType, mutations: Vec<Mutation>, context: &SubscriptionContext) -> Result<()> {
452        let mut subscriptions = self.subscriptions.lock();
453        // Compound mutations
454        let mut compound_result = None;
455        for mutation in mutations {
456            compound_result = subscriptions[event].compound(mutation, context);
457        }
458        // Report to the parent if any
459        if let Some(mutation) = compound_result {
460            if let Some(ref subscriber) = self.enabled_subscriber[event] {
461                subscriber.mutate(mutation)?;
462            }
463        }
464        Ok(())
465    }
466
467    fn start_notify(&self, id: ListenerId, scope: Scope) -> Result<()> {
468        self.execute_subscribe_command(id, scope, Command::Start)
469    }
470
471    fn notify(&self, notification: N) -> Result<()> {
472        if self.enabled_events[notification.event_type()] {
473            self.notification_channel.try_send(notification)?;
474        }
475        Ok(())
476    }
477
478    fn stop_notify(&self, id: ListenerId, scope: Scope) -> Result<()> {
479        self.execute_subscribe_command(id, scope, Command::Stop)
480    }
481
482    fn renew_subscriptions(&self) -> Result<()> {
483        let subscriptions = self.subscriptions.lock();
484        EVENT_TYPE_ARRAY.iter().copied().filter(|x| self.enabled_events[*x] && subscriptions[*x].active()).try_for_each(|x| {
485            let mutation = Mutation::new(Command::Start, subscriptions[x].scope(&self.subscription_context));
486            self.subscribers.iter().try_for_each(|subscriber| subscriber.mutate(mutation.clone()))?;
487            Ok(())
488        })
489    }
490
491    async fn join(self: Arc<Self>) -> Result<()> {
492        trace!("[Notifier {}] joining", self.name);
493        if self.started.load(Ordering::SeqCst) {
494            debug!("[Notifier {}] stopping collectors", self.name);
495            join_all(self.collectors.iter().map(|x| x.clone().join()))
496                .await
497                .into_iter()
498                .collect::<std::result::Result<Vec<()>, _>>()?;
499            debug!("[Notifier {}] stopped collectors", self.name);
500
501            // Once collectors exit, we can signal broadcasters
502            self.notification_channel.sender.close();
503
504            debug!("[Notifier {}] stopping broadcasters", self.name);
505            join_all(self.broadcasters.iter().map(|x| x.join())).await.into_iter().collect::<std::result::Result<Vec<()>, _>>()?;
506
507            // Once broadcasters exit, we can close the subscribers
508            self.subscribers.iter().for_each(|s| s.close());
509
510            debug!("[Notifier {}] stopping subscribers", self.name);
511            join_all(self.subscribers.iter().map(|x| x.join())).await.into_iter().collect::<std::result::Result<Vec<()>, _>>()?;
512
513            // Finally, we close all listeners, propagating shutdown by closing their channel when they have one
514            // Note that unregistering listeners is no longer meaningful since both broadcasters and subscribers were stopped
515            debug!("[Notifier {}] closing listeners", self.name);
516            let listener_ids = self.listeners.lock().keys().cloned().collect_vec();
517            listener_ids.iter().for_each(|id| {
518                let listener = self.listeners.lock().remove(id);
519                if let Some(listener) = listener {
520                    listener.close();
521                }
522            });
523        } else {
524            trace!("[Notifier {}] join ignored since it was never started", self.name);
525            return Err(Error::AlreadyStoppedError);
526        }
527        debug!("[Notifier {}] terminated", self.name);
528        Ok(())
529    }
530}
531
532// #[cfg(test)]
533pub mod test_helpers {
534    use super::*;
535    use crate::{
536        address::test_helpers::get_3_addresses,
537        connection::ChannelConnection,
538        notification::test_helpers::{
539            BlockAddedNotification, Data, TestNotification, UtxosChangedNotification, VirtualChainChangedNotification,
540        },
541        scope::{BlockAddedScope, UtxosChangedScope, VirtualChainChangedScope},
542        subscriber::test_helpers::SubscriptionMessage,
543    };
544    use async_channel::Sender;
545    use std::time::Duration;
546
547    pub const SYNC_MAX_DELAY: Duration = Duration::from_secs(2);
548
549    pub type TestConnection = ChannelConnection<TestNotification>;
550    pub type TestNotifier = Notifier<TestNotification, ChannelConnection<TestNotification>>;
551
552    #[derive(Debug)]
553    pub struct NotifyMock<N>
554    where
555        N: Notification,
556    {
557        sender: Sender<N>,
558    }
559
560    impl<N> NotifyMock<N>
561    where
562        N: Notification,
563    {
564        pub fn new(sender: Sender<N>) -> Self {
565            Self { sender }
566        }
567    }
568
569    impl<N> Notify<N> for NotifyMock<N>
570    where
571        N: Notification,
572    {
573        fn notify(&self, notification: N) -> Result<()> {
574            Ok(self.sender.try_send(notification)?)
575        }
576    }
577
578    #[derive(Debug)]
579    pub struct Step {
580        pub name: &'static str,
581        pub mutations: Vec<Option<Mutation>>,
582        pub expected_subscriptions: Vec<Option<SubscriptionMessage>>,
583        pub notification: TestNotification,
584        pub expected_notifications: Vec<Option<TestNotification>>,
585    }
586
587    impl Step {
588        pub fn set_data(&mut self, data: u64) {
589            *self.notification.data_mut() = data;
590            self.expected_notifications.iter_mut().for_each(|x| {
591                if let Some(notification) = x.as_mut() {
592                    *notification.data_mut() = data;
593                }
594            });
595        }
596    }
597
598    pub fn overall_test_steps(listener_id: ListenerId) -> Vec<Step> {
599        fn m(command: Command) -> Option<Mutation> {
600            Some(Mutation { command, scope: Scope::BlockAdded(BlockAddedScope {}) })
601        }
602        let s = |command: Command| -> Option<SubscriptionMessage> {
603            Some(SubscriptionMessage { listener_id, mutation: Mutation { command, scope: Scope::BlockAdded(BlockAddedScope {}) } })
604        };
605        fn n() -> TestNotification {
606            TestNotification::BlockAdded(BlockAddedNotification::default())
607        }
608        fn e() -> Option<TestNotification> {
609            Some(TestNotification::BlockAdded(BlockAddedNotification::default()))
610        }
611
612        set_steps_data(vec![
613            Step {
614                name: "do nothing",
615                mutations: vec![],
616                expected_subscriptions: vec![],
617                notification: n(),
618                expected_notifications: vec![None, None],
619            },
620            Step {
621                name: "L0 on",
622                mutations: vec![m(Command::Start), None],
623                expected_subscriptions: vec![s(Command::Start), None],
624                notification: n(),
625                expected_notifications: vec![e(), None],
626            },
627            Step {
628                name: "L0 & L1 on",
629                mutations: vec![None, m(Command::Start)],
630                expected_subscriptions: vec![None, None],
631                notification: n(),
632                expected_notifications: vec![e(), e()],
633            },
634            Step {
635                name: "L1 on",
636                mutations: vec![m(Command::Stop), None],
637                expected_subscriptions: vec![None, None],
638                notification: n(),
639                expected_notifications: vec![None, e()],
640            },
641            Step {
642                name: "all off",
643                mutations: vec![None, m(Command::Stop)],
644                expected_subscriptions: vec![None, s(Command::Stop)],
645                notification: n(),
646                expected_notifications: vec![None, None],
647            },
648        ])
649    }
650
651    pub fn virtual_chain_changed_test_steps(listener_id: ListenerId) -> Vec<Step> {
652        fn m(command: Command, include_accepted_transaction_ids: bool) -> Option<Mutation> {
653            Some(Mutation {
654                command,
655                scope: Scope::VirtualChainChanged(VirtualChainChangedScope::new(include_accepted_transaction_ids)),
656            })
657        }
658        let s = |command: Command, include_accepted_transaction_ids: bool| -> Option<SubscriptionMessage> {
659            Some(SubscriptionMessage {
660                listener_id,
661                mutation: Mutation {
662                    command,
663                    scope: Scope::VirtualChainChanged(VirtualChainChangedScope::new(include_accepted_transaction_ids)),
664                },
665            })
666        };
667        fn n(accepted_transaction_ids: Option<u64>) -> TestNotification {
668            TestNotification::VirtualChainChanged(VirtualChainChangedNotification { data: 0, accepted_transaction_ids })
669        }
670        fn e(accepted_transaction_ids: Option<u64>) -> Option<TestNotification> {
671            Some(TestNotification::VirtualChainChanged(VirtualChainChangedNotification { data: 0, accepted_transaction_ids }))
672        }
673
674        set_steps_data(vec![
675            Step {
676                name: "do nothing",
677                mutations: vec![],
678                expected_subscriptions: vec![],
679                notification: n(None),
680                expected_notifications: vec![None, None],
681            },
682            Step {
683                name: "L0+ on",
684                mutations: vec![m(Command::Start, true), None],
685                expected_subscriptions: vec![s(Command::Start, true), None],
686                notification: n(Some(21)),
687                expected_notifications: vec![e(Some(21)), None],
688            },
689            Step {
690                name: "L0+ & L1- on",
691                mutations: vec![None, m(Command::Start, false)],
692                expected_subscriptions: vec![None, None],
693                notification: n(Some(42)),
694                expected_notifications: vec![e(Some(42)), e(None)],
695            },
696            Step {
697                name: "L0- & L1+ on",
698                mutations: vec![m(Command::Start, false), m(Command::Start, true)],
699                expected_subscriptions: vec![s(Command::Start, false), s(Command::Start, true)],
700                notification: n(Some(63)),
701                expected_notifications: vec![e(None), e(Some(63))],
702            },
703            Step {
704                name: "L1+ on",
705                mutations: vec![m(Command::Stop, false), None],
706                expected_subscriptions: vec![None, None],
707                notification: n(Some(84)),
708                expected_notifications: vec![None, e(Some(84))],
709            },
710            Step {
711                name: "all off",
712                mutations: vec![None, m(Command::Stop, true)],
713                expected_subscriptions: vec![None, s(Command::Stop, true)],
714                notification: n(Some(21)),
715                expected_notifications: vec![None, None],
716            },
717        ])
718    }
719
720    pub fn utxos_changed_test_steps(listener_id: ListenerId) -> Vec<Step> {
721        let a_stock = get_3_addresses(true);
722
723        let a = |indexes: &[usize]| indexes.iter().map(|idx| (a_stock[*idx]).clone()).collect::<Vec<_>>();
724        let m = |command: Command, indexes: &[usize]| {
725            Some(Mutation { command, scope: Scope::UtxosChanged(UtxosChangedScope::new(a(indexes))) })
726        };
727        let s = |command: Command, indexes: &[usize]| {
728            Some(SubscriptionMessage {
729                listener_id,
730                mutation: Mutation { command, scope: Scope::UtxosChanged(UtxosChangedScope::new(a(indexes))) },
731            })
732        };
733        let n =
734            |indexes: &[usize]| TestNotification::UtxosChanged(UtxosChangedNotification { data: 0, addresses: Arc::new(a(indexes)) });
735        let e = |indexes: &[usize]| {
736            Some(TestNotification::UtxosChanged(UtxosChangedNotification { data: 0, addresses: Arc::new(a(indexes)) }))
737        };
738
739        set_steps_data(vec![
740            Step {
741                name: "do nothing",
742                mutations: vec![],
743                expected_subscriptions: vec![],
744                notification: n(&[]),
745                expected_notifications: vec![None, None, None],
746            },
747            Step {
748                name: "L0[0] <= N[0]",
749                mutations: vec![m(Command::Start, &[0]), None, None],
750                expected_subscriptions: vec![s(Command::Start, &[0]), None, None],
751                notification: n(&[0]),
752                expected_notifications: vec![e(&[0]), None, None],
753            },
754            Step {
755                name: "L0[0] <= N[0,1,2]",
756                mutations: vec![m(Command::Start, &[0]), None, None],
757                expected_subscriptions: vec![None, None, None],
758                notification: n(&[0, 1, 2]),
759                expected_notifications: vec![e(&[0]), None, None],
760            },
761            Step {
762                name: "L0[0], L1[1] <= N[0,1,2]",
763                mutations: vec![None, m(Command::Start, &[1]), None],
764                expected_subscriptions: vec![None, s(Command::Start, &[1]), None],
765                notification: n(&[0, 1, 2]),
766                expected_notifications: vec![e(&[0]), e(&[1]), None],
767            },
768            Step {
769                name: "L0[0], L1[1], L2[2] <= N[0,1,2]",
770                mutations: vec![None, None, m(Command::Start, &[2])],
771                expected_subscriptions: vec![None, None, s(Command::Start, &[2])],
772                notification: n(&[0, 1, 2]),
773                expected_notifications: vec![e(&[0]), e(&[1]), e(&[2])],
774            },
775            Step {
776                name: "L0[0, 2], L1[*], L2[1, 2] <= N[0,1,2]",
777                mutations: vec![m(Command::Start, &[2]), m(Command::Start, &[]), m(Command::Start, &[1])],
778                expected_subscriptions: vec![None, s(Command::Start, &[]), None],
779                notification: n(&[0, 1, 2]),
780                expected_notifications: vec![e(&[0, 2]), e(&[0, 1, 2]), e(&[1, 2])],
781            },
782            Step {
783                name: "L0[0, 2], L1[*], L2[1, 2] <= N[0]",
784                mutations: vec![None, None, None],
785                expected_subscriptions: vec![None, None, None],
786                notification: n(&[0]),
787                expected_notifications: vec![e(&[0]), e(&[0]), None],
788            },
789            Step {
790                name: "L0[2], L1[1], L2[*] <= N[0, 1]",
791                mutations: vec![m(Command::Stop, &[0]), m(Command::Start, &[1]), m(Command::Start, &[])],
792                expected_subscriptions: vec![None, s(Command::Start, &[1, 2]), s(Command::Start, &[])],
793                notification: n(&[0, 1]),
794                expected_notifications: vec![None, e(&[1]), e(&[0, 1])],
795            },
796            Step {
797                name: "L2[*] <= N[0, 1, 2]",
798                mutations: vec![m(Command::Stop, &[]), m(Command::Stop, &[1]), m(Command::Stop, &[1])],
799                expected_subscriptions: vec![None, None, None],
800                notification: n(&[0, 1, 2]),
801                expected_notifications: vec![None, None, e(&[0, 1, 2])],
802            },
803            Step {
804                name: "all off",
805                mutations: vec![None, None, m(Command::Stop, &[])],
806                expected_subscriptions: vec![None, None, s(Command::Stop, &[])],
807                notification: n(&[0, 1, 2]),
808                expected_notifications: vec![None, None, None],
809            },
810        ])
811    }
812
813    fn set_steps_data(mut steps: Vec<Step>) -> Vec<Step> {
814        // Prepare the notification data markers for the test
815        for (idx, step) in steps.iter_mut().enumerate() {
816            step.set_data(idx as u64);
817        }
818        steps
819    }
820}
821
822#[cfg(test)]
823mod tests {
824    use super::{test_helpers::*, *};
825    use crate::{
826        collector::CollectorFrom,
827        connection::ChannelType,
828        converter::ConverterFrom,
829        events::EVENT_TYPE_ARRAY,
830        notification::test_helpers::*,
831        subscriber::test_helpers::{SubscriptionManagerMock, SubscriptionMessage},
832    };
833    use async_channel::{unbounded, Receiver, Sender};
834    use tokio::time::timeout;
835
836    const SUBSCRIPTION_MANAGER_ID: u64 = 0;
837
838    struct Test {
839        name: &'static str,
840        notifier: Arc<TestNotifier>,
841        subscription_receiver: Receiver<SubscriptionMessage>,
842        listeners: Vec<ListenerId>,
843        notification_sender: Sender<TestNotification>,
844        notification_receivers: Vec<Receiver<TestNotification>>,
845        sync_receiver: Receiver<()>,
846        steps: Vec<Step>,
847    }
848
849    impl Test {
850        fn new(name: &'static str, listener_count: usize, steps: Vec<Step>) -> Self {
851            const IDENT: &str = "test";
852            type TestConverter = ConverterFrom<TestNotification, TestNotification>;
853            type TestCollector = CollectorFrom<TestConverter>;
854            // Build the full-featured notifier
855            let (sync_sender, sync_receiver) = unbounded();
856            let (notification_sender, notification_receiver) = unbounded();
857            let (subscription_sender, subscription_receiver) = unbounded();
858            let collector = Arc::new(TestCollector::new(IDENT, notification_receiver, Arc::new(TestConverter::new())));
859            let subscription_manager = Arc::new(SubscriptionManagerMock::new(subscription_sender));
860            let subscription_context = SubscriptionContext::new();
861            let subscriber =
862                Arc::new(Subscriber::new("test", EVENT_TYPE_ARRAY[..].into(), subscription_manager, SUBSCRIPTION_MANAGER_ID));
863            let notifier = Arc::new(TestNotifier::with_sync(
864                "test",
865                EVENT_TYPE_ARRAY[..].into(),
866                vec![collector],
867                vec![subscriber],
868                subscription_context,
869                1,
870                Default::default(),
871                Some(sync_sender),
872            ));
873            // Create the listeners
874            let mut listeners = Vec::with_capacity(listener_count);
875            let mut notification_receivers = Vec::with_capacity(listener_count);
876            for _ in 0..listener_count {
877                let (sender, receiver) = unbounded();
878                let connection = TestConnection::new(IDENT, sender, ChannelType::Closable);
879                listeners.push(notifier.register_new_listener(connection, ListenerLifespan::Dynamic));
880                notification_receivers.push(receiver);
881            }
882            // Return the built test object
883            Self {
884                name,
885                notifier,
886                subscription_receiver,
887                listeners,
888                notification_sender,
889                notification_receivers,
890                sync_receiver,
891                steps,
892            }
893        }
894
895        async fn run(&self) {
896            self.notifier.clone().start();
897
898            // Execute the test steps
899            for (step_idx, step) in self.steps.iter().enumerate() {
900                trace!("Execute test step #{step_idx}: {}", step.name);
901                // Apply the subscription mutations and check the yielded subscriptions messages
902                // the subscription manager gets
903                for (idx, mutation) in step.mutations.iter().enumerate() {
904                    if let Some(ref mutation) = mutation {
905                        trace!("Mutation #{idx}");
906                        assert!(
907                            self.notifier
908                                .execute_subscribe_command(self.listeners[idx], mutation.scope.clone(), mutation.command)
909                                .await
910                                .is_ok(),
911                            "executing the subscription command {mutation:?} failed"
912                        );
913                        trace!("Receiving sync message #{step_idx} after subscribing");
914                        assert!(
915                            timeout(SYNC_MAX_DELAY, self.sync_receiver.recv()).await.unwrap().is_ok(),
916                            "{} - {}: receiving a sync message failed",
917                            self.name,
918                            step.name
919                        );
920                        if let Some(ref expected_subscription) = step.expected_subscriptions[idx] {
921                            let subscription = self.subscription_receiver.recv().await.unwrap();
922                            assert_eq!(
923                                *expected_subscription, subscription,
924                                "{} - {}: the listener[{}] mutation {mutation:?} yielded the wrong subscription",
925                                self.name, step.name, idx
926                            );
927                            assert!(
928                                self.subscription_receiver.is_empty(),
929                                "{} - {}: listener[{}] mutation {mutation:?} yielded an extra subscription but should not",
930                                self.name,
931                                step.name,
932                                idx
933                            );
934                        } else {
935                            assert!(
936                                self.subscription_receiver.is_empty(),
937                                "{} - {}: listener[{}] mutation {mutation:?} yielded a subscription but should not",
938                                self.name,
939                                step.name,
940                                idx
941                            );
942                        }
943                    }
944                }
945
946                // Send the notification
947                trace!("Sending notification #{step_idx}");
948                assert!(
949                    self.notification_sender.send_blocking(step.notification.clone()).is_ok(),
950                    "{} - {}: sending the notification failed",
951                    self.name,
952                    step.name
953                );
954                trace!("Receiving sync message #{step_idx} after notifying");
955                assert!(
956                    timeout(SYNC_MAX_DELAY, self.sync_receiver.recv()).await.unwrap().is_ok(),
957                    "{} - {}: receiving a sync message failed",
958                    self.name,
959                    step.name
960                );
961
962                // Check what the listeners do receive
963                for (idx, expected_notifications) in step.expected_notifications.iter().enumerate() {
964                    if let Some(ref expected_notifications) = expected_notifications {
965                        let notification = self.notification_receivers[idx].recv().await.unwrap();
966                        assert_eq!(
967                            *expected_notifications, notification,
968                            "{} - {}: listener[{}] got wrong notification",
969                            self.name, step.name, idx
970                        );
971                    } else {
972                        assert!(
973                            self.notification_receivers[idx].is_empty(),
974                            "{} - {}: listener[{}] has a notification in its channel but should not",
975                            self.name,
976                            step.name,
977                            idx
978                        );
979                    }
980                }
981            }
982            self.notification_sender.close();
983            assert!(self.notifier.join().await.is_ok(), "notifier failed to stop");
984        }
985    }
986
987    #[tokio::test]
988    async fn test_overall() {
989        kaspa_core::log::try_init_logger("trace,kaspa_notify=trace");
990        let test = Test::new("BlockAdded broadcast (OverallSubscription type)", 2, overall_test_steps(SUBSCRIPTION_MANAGER_ID));
991        test.run().await;
992    }
993
994    #[tokio::test]
995    async fn test_virtual_chain_changed() {
996        kaspa_core::log::try_init_logger("trace,kaspa_notify=trace");
997        let test = Test::new("VirtualChainChanged broadcast", 2, virtual_chain_changed_test_steps(SUBSCRIPTION_MANAGER_ID));
998        test.run().await;
999    }
1000
1001    #[tokio::test]
1002    async fn test_utxos_changed() {
1003        kaspa_core::log::try_init_logger("trace,kaspa_notify=trace");
1004        let test = Test::new("UtxosChanged broadcast", 3, utxos_changed_test_steps(SUBSCRIPTION_MANAGER_ID));
1005        test.run().await;
1006    }
1007}