kaspa_notify/
subscriber.rs1use async_trait::async_trait;
2use core::fmt::Debug;
3use kaspa_core::{debug, trace};
4use std::sync::{
5 atomic::{AtomicBool, Ordering},
6 Arc,
7};
8extern crate derive_more;
9use crate::events::{EventSwitches, EventType};
10
11use super::{
12 error::Result,
13 listener::ListenerId,
14 scope::Scope,
15 subscription::{Command, Mutation},
16};
17use workflow_core::channel::Channel;
18
19#[async_trait]
21pub trait SubscriptionManager: Send + Sync + Debug {
22 async fn start_notify(&self, id: ListenerId, scope: Scope) -> Result<()>;
23 async fn stop_notify(&self, id: ListenerId, scope: Scope) -> Result<()>;
24
25 async fn execute_subscribe_command(&self, id: ListenerId, scope: Scope, command: Command) -> Result<()> {
26 match command {
27 Command::Start => self.start_notify(id, scope).await,
28 Command::Stop => self.stop_notify(id, scope).await,
29 }
30 }
31}
32
33pub type DynSubscriptionManager = Arc<dyn SubscriptionManager>;
34
35#[derive(Debug)]
43pub struct Subscriber {
44 name: &'static str,
45
46 enabled_events: EventSwitches,
48
49 subscription_manager: DynSubscriptionManager,
51
52 listener_id: ListenerId,
54
55 started: Arc<AtomicBool>,
57
58 incoming: Channel<Mutation>,
59 shutdown: Channel<()>,
60}
61
62impl Subscriber {
63 pub fn new(
64 name: &'static str,
65 enabled_events: EventSwitches,
66 subscription_manager: DynSubscriptionManager,
67 listener_id: ListenerId,
68 ) -> Self {
69 Self {
70 name,
71 enabled_events,
72 subscription_manager,
73 listener_id,
74 started: Arc::new(AtomicBool::default()),
75 incoming: Channel::unbounded(),
76 shutdown: Channel::oneshot(),
77 }
78 }
79
80 pub fn handles_event_type(&self, event_type: EventType) -> bool {
81 self.enabled_events[event_type]
82 }
83
84 pub fn start(self: &Arc<Self>) {
85 self.clone().spawn_subscription_receiver_task();
86 }
87
88 fn spawn_subscription_receiver_task(self: Arc<Self>) {
90 if self.started.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst).is_err() {
92 return;
93 }
94 trace!("[Subscriber {}] starting subscription receiving task", self.name);
95 workflow_core::task::spawn(async move {
96 while let Ok(mutation) = self.incoming.recv().await {
97 if self.handles_event_type(mutation.event_type()) {
98 if let Err(err) = self
99 .subscription_manager
100 .clone()
101 .execute_subscribe_command(self.listener_id, mutation.scope, mutation.command)
102 .await
103 {
104 trace!("[Subscriber {}] the subscription command returned an error: {:?}", self.name, err);
105 }
106 }
107 }
108
109 debug!("[Subscriber {}] subscription stream ended", self.name);
110 let _ = self.shutdown.drain();
111 let _ = self.shutdown.try_send(());
112 });
113 }
114
115 pub fn mutate(self: &Arc<Self>, mutation: Mutation) -> Result<()> {
116 self.incoming.try_send(mutation)?;
117 Ok(())
118 }
119
120 async fn join_subscription_receiver_task(self: &Arc<Self>) -> Result<()> {
121 self.shutdown.recv().await?;
122 Ok(())
123 }
124
125 pub async fn join(self: &Arc<Self>) -> Result<()> {
126 trace!("[Subscriber {}] joining", self.name);
127 let result = self.join_subscription_receiver_task().await;
128 debug!("[Subscriber {}] terminated", self.name);
129 result
130 }
131
132 pub fn close(&self) {
133 self.incoming.sender.close();
134 }
135}
136
137pub mod test_helpers {
138 use super::*;
139 use async_channel::Sender;
140
141 #[derive(Clone, Debug, PartialEq, Eq)]
142 pub struct SubscriptionMessage {
143 pub listener_id: ListenerId,
144 pub mutation: Mutation,
145 }
146
147 impl SubscriptionMessage {
148 pub fn new(listener_id: ListenerId, command: Command, scope: Scope) -> Self {
149 Self { listener_id, mutation: Mutation::new(command, scope) }
150 }
151 }
152
153 #[derive(Debug)]
154 pub struct SubscriptionManagerMock {
155 sender: Sender<SubscriptionMessage>,
156 }
157
158 impl SubscriptionManagerMock {
159 pub fn new(sender: Sender<SubscriptionMessage>) -> Self {
160 Self { sender }
161 }
162 }
163
164 #[async_trait]
165 impl SubscriptionManager for SubscriptionManagerMock {
166 async fn start_notify(&self, id: ListenerId, scope: Scope) -> Result<()> {
167 Ok(self.sender.send(SubscriptionMessage::new(id, Command::Start, scope)).await?)
168 }
169
170 async fn stop_notify(&self, id: ListenerId, scope: Scope) -> Result<()> {
171 Ok(self.sender.send(SubscriptionMessage::new(id, Command::Stop, scope)).await?)
172 }
173 }
174}