kaspa_consensus_notify/
service.rs

1use crate::{
2    collector::{ConsensusCollector, ConsensusConverter},
3    notification::Notification,
4    notifier::ConsensusNotifier,
5    root::ConsensusNotificationRoot,
6};
7use async_channel::Receiver;
8use kaspa_core::{
9    task::service::{AsyncService, AsyncServiceError, AsyncServiceFuture},
10    trace, warn,
11};
12use kaspa_notify::{
13    events::{EventSwitches, EVENT_TYPE_ARRAY},
14    subscriber::Subscriber,
15    subscription::{context::SubscriptionContext, MutationPolicies, UtxosChangedMutationPolicy},
16};
17use kaspa_utils::triggers::SingleTrigger;
18use std::sync::Arc;
19
20const NOTIFY_SERVICE: &str = "notify-service";
21
22pub struct NotifyService {
23    notifier: Arc<ConsensusNotifier>,
24    shutdown: SingleTrigger,
25}
26
27impl NotifyService {
28    pub fn new(
29        root: Arc<ConsensusNotificationRoot>,
30        notification_receiver: Receiver<Notification>,
31        subscription_context: SubscriptionContext,
32    ) -> Self {
33        let root_events: EventSwitches = EVENT_TYPE_ARRAY[..].into();
34        let collector = Arc::new(ConsensusCollector::new(NOTIFY_SERVICE, notification_receiver, Arc::new(ConsensusConverter::new())));
35        let subscriber = Arc::new(Subscriber::new(NOTIFY_SERVICE, root_events, root, 0));
36        let policies = MutationPolicies::new(UtxosChangedMutationPolicy::Wildcard);
37        let notifier = Arc::new(ConsensusNotifier::new(
38            NOTIFY_SERVICE,
39            root_events,
40            vec![collector],
41            vec![subscriber],
42            subscription_context,
43            1,
44            policies,
45        ));
46        Self { notifier, shutdown: SingleTrigger::default() }
47    }
48
49    pub fn notifier(&self) -> Arc<ConsensusNotifier> {
50        self.notifier.clone()
51    }
52}
53
54impl AsyncService for NotifyService {
55    fn ident(self: Arc<Self>) -> &'static str {
56        NOTIFY_SERVICE
57    }
58
59    fn start(self: Arc<Self>) -> AsyncServiceFuture {
60        trace!("{} starting", NOTIFY_SERVICE);
61
62        // Prepare a shutdown signal receiver
63        let shutdown_signal = self.shutdown.listener.clone();
64
65        // Launch the service and wait for a shutdown signal
66        Box::pin(async move {
67            self.notifier.clone().start();
68
69            // Keep the notifier running until a service shutdown signal is received
70            shutdown_signal.await;
71            match self.notifier.join().await {
72                Ok(_) => {
73                    trace!("{} terminated the notifier", NOTIFY_SERVICE);
74                    Ok(())
75                }
76                Err(err) => {
77                    warn!("Error while stopping {}: {}", NOTIFY_SERVICE, err);
78                    Err(AsyncServiceError::Service(err.to_string()))
79                }
80            }
81        })
82    }
83
84    fn signal_exit(self: Arc<Self>) {
85        trace!("sending an exit signal to {}", NOTIFY_SERVICE);
86        self.shutdown.trigger.trigger();
87    }
88
89    fn stop(self: Arc<Self>) -> AsyncServiceFuture {
90        Box::pin(async move {
91            trace!("{} stopped", NOTIFY_SERVICE);
92            Ok(())
93        })
94    }
95}