kaspa_consensus_notify/
service.rs1use crate::{
2 collector::{ConsensusCollector, ConsensusConverter},
3 notification::Notification,
4 notifier::ConsensusNotifier,
5 root::ConsensusNotificationRoot,
6};
7use async_channel::Receiver;
8use kaspa_core::{
9 task::service::{AsyncService, AsyncServiceError, AsyncServiceFuture},
10 trace, warn,
11};
12use kaspa_notify::{
13 events::{EventSwitches, EVENT_TYPE_ARRAY},
14 subscriber::Subscriber,
15 subscription::{context::SubscriptionContext, MutationPolicies, UtxosChangedMutationPolicy},
16};
17use kaspa_utils::triggers::SingleTrigger;
18use std::sync::Arc;
19
/// Identifier of this service; also passed as the name of the collector,
/// subscriber and notifier it creates, and used in its log messages.
const NOTIFY_SERVICE: &str = "notify-service";
21
22pub struct NotifyService {
23 notifier: Arc<ConsensusNotifier>,
24 shutdown: SingleTrigger,
25}
26
27impl NotifyService {
28 pub fn new(
29 root: Arc<ConsensusNotificationRoot>,
30 notification_receiver: Receiver<Notification>,
31 subscription_context: SubscriptionContext,
32 ) -> Self {
33 let root_events: EventSwitches = EVENT_TYPE_ARRAY[..].into();
34 let collector = Arc::new(ConsensusCollector::new(NOTIFY_SERVICE, notification_receiver, Arc::new(ConsensusConverter::new())));
35 let subscriber = Arc::new(Subscriber::new(NOTIFY_SERVICE, root_events, root, 0));
36 let policies = MutationPolicies::new(UtxosChangedMutationPolicy::Wildcard);
37 let notifier = Arc::new(ConsensusNotifier::new(
38 NOTIFY_SERVICE,
39 root_events,
40 vec![collector],
41 vec![subscriber],
42 subscription_context,
43 1,
44 policies,
45 ));
46 Self { notifier, shutdown: SingleTrigger::default() }
47 }
48
49 pub fn notifier(&self) -> Arc<ConsensusNotifier> {
50 self.notifier.clone()
51 }
52}
53
54impl AsyncService for NotifyService {
55 fn ident(self: Arc<Self>) -> &'static str {
56 NOTIFY_SERVICE
57 }
58
59 fn start(self: Arc<Self>) -> AsyncServiceFuture {
60 trace!("{} starting", NOTIFY_SERVICE);
61
62 let shutdown_signal = self.shutdown.listener.clone();
64
65 Box::pin(async move {
67 self.notifier.clone().start();
68
69 shutdown_signal.await;
71 match self.notifier.join().await {
72 Ok(_) => {
73 trace!("{} terminated the notifier", NOTIFY_SERVICE);
74 Ok(())
75 }
76 Err(err) => {
77 warn!("Error while stopping {}: {}", NOTIFY_SERVICE, err);
78 Err(AsyncServiceError::Service(err.to_string()))
79 }
80 }
81 })
82 }
83
84 fn signal_exit(self: Arc<Self>) {
85 trace!("sending an exit signal to {}", NOTIFY_SERVICE);
86 self.shutdown.trigger.trigger();
87 }
88
89 fn stop(self: Arc<Self>) -> AsyncServiceFuture {
90 Box::pin(async move {
91 trace!("{} stopped", NOTIFY_SERVICE);
92 Ok(())
93 })
94 }
95}