kaspa_notify/
root.rs

1use crate::{
2    error::Result,
3    events::{EventArray, EventType},
4    listener::ListenerId,
5    notification::Notification,
6    notifier::Notify,
7    scope::Scope,
8    subscriber::SubscriptionManager,
9    subscription::{
10        array::ArrayBuilder, context::SubscriptionContext, Command, DynSubscription, MutateSingle, Mutation, MutationPolicies,
11        UtxosChangedMutationPolicy,
12    },
13};
14use async_channel::Sender;
15use async_trait::async_trait;
16use kaspa_core::{debug, trace};
17use parking_lot::RwLock;
18use std::sync::Arc;
19
20/// Root of a notification system
21///
22/// The [`Root`] receives new notifications via its `notify` function, transforms them according
23/// to its internal subscription scope and, when any, sends them through a channel.
24///
25/// It is a [`SubscriptionManager`], so the notification scope can be dynamically configured
26/// according to the needs of the whole notification system.
27#[derive(Clone, Debug)]
28pub struct Root<N>
29where
30    N: Notification,
31{
32    inner: Arc<Inner<N>>,
33}
34
35impl<N> Root<N>
36where
37    N: Notification,
38{
39    pub fn new(sender: Sender<N>) -> Self {
40        let subscription_context = SubscriptionContext::new();
41        Self::with_context(sender, subscription_context)
42    }
43
44    pub fn with_context(sender: Sender<N>, subscription_context: SubscriptionContext) -> Self {
45        let inner = Arc::new(Inner::new(sender, subscription_context));
46        Self { inner }
47    }
48
49    pub fn send(&self, notification: N) -> Result<()> {
50        self.inner.send(notification)
51    }
52
53    pub fn close(&self) -> bool {
54        debug!("[Notification root] closing");
55        self.inner.sender.close()
56    }
57
58    pub fn is_closed(&self) -> bool {
59        self.inner.sender.is_closed()
60    }
61
62    pub fn has_subscription(&self, event: EventType) -> bool {
63        self.inner.has_subscription(event)
64    }
65}
66
67impl<N> Notify<N> for Root<N>
68where
69    N: Notification,
70{
71    fn notify(&self, notification: N) -> Result<()> {
72        self.inner.notify(notification)
73    }
74}
75
76#[async_trait]
77impl<N> SubscriptionManager for Root<N>
78where
79    N: Notification,
80{
81    async fn start_notify(&self, _: ListenerId, scope: Scope) -> Result<()> {
82        trace!("[Notification root] start sending notifications of scope {scope:?}");
83        self.inner.start_notify(scope)?;
84        Ok(())
85    }
86
87    async fn stop_notify(&self, _: ListenerId, scope: Scope) -> Result<()> {
88        trace!("[Notification root] stop notifications of scope {scope:?}");
89        self.inner.stop_notify(scope)?;
90        Ok(())
91    }
92}
93
94#[derive(Debug)]
95struct Inner<N>
96where
97    N: Notification,
98{
99    sender: Sender<N>,
100    subscriptions: RwLock<EventArray<DynSubscription>>,
101    subscription_context: SubscriptionContext,
102    policies: MutationPolicies,
103}
104
105impl<N> Inner<N>
106where
107    N: Notification,
108{
109    const ROOT_LISTENER_ID: ListenerId = 1;
110
111    fn new(sender: Sender<N>, subscription_context: SubscriptionContext) -> Self {
112        let subscriptions = RwLock::new(ArrayBuilder::single(Self::ROOT_LISTENER_ID, None));
113        let policies = MutationPolicies::new(UtxosChangedMutationPolicy::Wildcard);
114        Self { sender, subscriptions, subscription_context, policies }
115    }
116
117    fn send(&self, notification: N) -> Result<()> {
118        let event = notification.event_type();
119        let subscription = &self.subscriptions.read()[event];
120        if let Some(applied_notification) = notification.apply_subscription(&**subscription, &self.subscription_context) {
121            self.sender.try_send(applied_notification)?;
122        }
123        Ok(())
124    }
125
126    pub fn execute_subscribe_command(&self, scope: Scope, command: Command) -> Result<()> {
127        let mutation = Mutation::new(command, scope);
128        let mut subscriptions = self.subscriptions.write();
129        subscriptions[mutation.event_type()].mutate(mutation, self.policies, &self.subscription_context)?;
130        Ok(())
131    }
132
133    fn start_notify(&self, scope: Scope) -> Result<()> {
134        self.execute_subscribe_command(scope, Command::Start)
135    }
136
137    fn notify(&self, notification: N) -> Result<()> {
138        let event = notification.event_type();
139        let subscription = &self.subscriptions.read()[event];
140        if subscription.active() {
141            if let Some(applied_notification) = notification.apply_subscription(&**subscription, &self.subscription_context) {
142                self.sender.try_send(applied_notification)?;
143            }
144        }
145        Ok(())
146    }
147
148    fn stop_notify(&self, scope: Scope) -> Result<()> {
149        self.execute_subscribe_command(scope, Command::Stop)
150    }
151
152    fn has_subscription(&self, event: EventType) -> bool {
153        let subscription = &self.subscriptions.read()[event];
154        subscription.active()
155    }
156}
157
158// TODO: tests