1use crate::{
2 error::Result,
3 events::{EventArray, EventType},
4 listener::ListenerId,
5 notification::Notification,
6 notifier::Notify,
7 scope::Scope,
8 subscriber::SubscriptionManager,
9 subscription::{
10 array::ArrayBuilder, context::SubscriptionContext, Command, DynSubscription, MutateSingle, Mutation, MutationPolicies,
11 UtxosChangedMutationPolicy,
12 },
13};
14use async_channel::Sender;
15use async_trait::async_trait;
16use kaspa_core::{debug, trace};
17use parking_lot::RwLock;
18use std::sync::Arc;
19
20#[derive(Clone, Debug)]
28pub struct Root<N>
29where
30 N: Notification,
31{
32 inner: Arc<Inner<N>>,
33}
34
35impl<N> Root<N>
36where
37 N: Notification,
38{
39 pub fn new(sender: Sender<N>) -> Self {
40 let subscription_context = SubscriptionContext::new();
41 Self::with_context(sender, subscription_context)
42 }
43
44 pub fn with_context(sender: Sender<N>, subscription_context: SubscriptionContext) -> Self {
45 let inner = Arc::new(Inner::new(sender, subscription_context));
46 Self { inner }
47 }
48
49 pub fn send(&self, notification: N) -> Result<()> {
50 self.inner.send(notification)
51 }
52
53 pub fn close(&self) -> bool {
54 debug!("[Notification root] closing");
55 self.inner.sender.close()
56 }
57
58 pub fn is_closed(&self) -> bool {
59 self.inner.sender.is_closed()
60 }
61
62 pub fn has_subscription(&self, event: EventType) -> bool {
63 self.inner.has_subscription(event)
64 }
65}
66
67impl<N> Notify<N> for Root<N>
68where
69 N: Notification,
70{
71 fn notify(&self, notification: N) -> Result<()> {
72 self.inner.notify(notification)
73 }
74}
75
76#[async_trait]
77impl<N> SubscriptionManager for Root<N>
78where
79 N: Notification,
80{
81 async fn start_notify(&self, _: ListenerId, scope: Scope) -> Result<()> {
82 trace!("[Notification root] start sending notifications of scope {scope:?}");
83 self.inner.start_notify(scope)?;
84 Ok(())
85 }
86
87 async fn stop_notify(&self, _: ListenerId, scope: Scope) -> Result<()> {
88 trace!("[Notification root] stop notifications of scope {scope:?}");
89 self.inner.stop_notify(scope)?;
90 Ok(())
91 }
92}
93
94#[derive(Debug)]
95struct Inner<N>
96where
97 N: Notification,
98{
99 sender: Sender<N>,
100 subscriptions: RwLock<EventArray<DynSubscription>>,
101 subscription_context: SubscriptionContext,
102 policies: MutationPolicies,
103}
104
105impl<N> Inner<N>
106where
107 N: Notification,
108{
109 const ROOT_LISTENER_ID: ListenerId = 1;
110
111 fn new(sender: Sender<N>, subscription_context: SubscriptionContext) -> Self {
112 let subscriptions = RwLock::new(ArrayBuilder::single(Self::ROOT_LISTENER_ID, None));
113 let policies = MutationPolicies::new(UtxosChangedMutationPolicy::Wildcard);
114 Self { sender, subscriptions, subscription_context, policies }
115 }
116
117 fn send(&self, notification: N) -> Result<()> {
118 let event = notification.event_type();
119 let subscription = &self.subscriptions.read()[event];
120 if let Some(applied_notification) = notification.apply_subscription(&**subscription, &self.subscription_context) {
121 self.sender.try_send(applied_notification)?;
122 }
123 Ok(())
124 }
125
126 pub fn execute_subscribe_command(&self, scope: Scope, command: Command) -> Result<()> {
127 let mutation = Mutation::new(command, scope);
128 let mut subscriptions = self.subscriptions.write();
129 subscriptions[mutation.event_type()].mutate(mutation, self.policies, &self.subscription_context)?;
130 Ok(())
131 }
132
133 fn start_notify(&self, scope: Scope) -> Result<()> {
134 self.execute_subscribe_command(scope, Command::Start)
135 }
136
137 fn notify(&self, notification: N) -> Result<()> {
138 let event = notification.event_type();
139 let subscription = &self.subscriptions.read()[event];
140 if subscription.active() {
141 if let Some(applied_notification) = notification.apply_subscription(&**subscription, &self.subscription_context) {
142 self.sender.try_send(applied_notification)?;
143 }
144 }
145 Ok(())
146 }
147
148 fn stop_notify(&self, scope: Scope) -> Result<()> {
149 self.execute_subscribe_command(scope, Command::Stop)
150 }
151
152 fn has_subscription(&self, event: EventType) -> bool {
153 let subscription = &self.subscriptions.read()[event];
154 subscription.active()
155 }
156}
157
158