kaspa_notify/
subscriber.rs

1use async_trait::async_trait;
2use core::fmt::Debug;
3use kaspa_core::{debug, trace};
4use std::sync::{
5    atomic::{AtomicBool, Ordering},
6    Arc,
7};
8extern crate derive_more;
9use crate::events::{EventSwitches, EventType};
10
11use super::{
12    error::Result,
13    listener::ListenerId,
14    scope::Scope,
15    subscription::{Command, Mutation},
16};
17use workflow_core::channel::Channel;
18
19/// A manager of subscriptions (see [`Scope`]) for registered listeners
20#[async_trait]
21pub trait SubscriptionManager: Send + Sync + Debug {
22    async fn start_notify(&self, id: ListenerId, scope: Scope) -> Result<()>;
23    async fn stop_notify(&self, id: ListenerId, scope: Scope) -> Result<()>;
24
25    async fn execute_subscribe_command(&self, id: ListenerId, scope: Scope, command: Command) -> Result<()> {
26        match command {
27            Command::Start => self.start_notify(id, scope).await,
28            Command::Stop => self.stop_notify(id, scope).await,
29        }
30    }
31}
32
33pub type DynSubscriptionManager = Arc<dyn SubscriptionManager>;
34
35/// A subscriber handling subscription messages as [`Mutation`] and executing them into a [SubscriptionManager]
36///
37/// A subscriber has a set of enabled event type (see [`EventType`]). It only handles subscriptions
38/// whose event type is enabled and drops all others.
39///
40/// A subscriber has a listener ID identifying its owner (usually a [`Notifier`](crate::notifier::Notifier)) as a listener of its manager
41/// (usually also a [`Notifier`](crate::notifier::Notifier)).
42#[derive(Debug)]
43pub struct Subscriber {
44    name: &'static str,
45
46    /// Event types this subscriber is configured to subscribe to
47    enabled_events: EventSwitches,
48
49    /// Subscription manager
50    subscription_manager: DynSubscriptionManager,
51
52    /// Listener ID
53    listener_id: ListenerId,
54
55    /// Has this subscriber been started?
56    started: Arc<AtomicBool>,
57
58    incoming: Channel<Mutation>,
59    shutdown: Channel<()>,
60}
61
62impl Subscriber {
63    pub fn new(
64        name: &'static str,
65        enabled_events: EventSwitches,
66        subscription_manager: DynSubscriptionManager,
67        listener_id: ListenerId,
68    ) -> Self {
69        Self {
70            name,
71            enabled_events,
72            subscription_manager,
73            listener_id,
74            started: Arc::new(AtomicBool::default()),
75            incoming: Channel::unbounded(),
76            shutdown: Channel::oneshot(),
77        }
78    }
79
80    pub fn handles_event_type(&self, event_type: EventType) -> bool {
81        self.enabled_events[event_type]
82    }
83
84    pub fn start(self: &Arc<Self>) {
85        self.clone().spawn_subscription_receiver_task();
86    }
87
88    /// Launch the subscription receiver
89    fn spawn_subscription_receiver_task(self: Arc<Self>) {
90        // The task can only be spawned once
91        if self.started.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst).is_err() {
92            return;
93        }
94        trace!("[Subscriber {}] starting subscription receiving task", self.name);
95        workflow_core::task::spawn(async move {
96            while let Ok(mutation) = self.incoming.recv().await {
97                if self.handles_event_type(mutation.event_type()) {
98                    if let Err(err) = self
99                        .subscription_manager
100                        .clone()
101                        .execute_subscribe_command(self.listener_id, mutation.scope, mutation.command)
102                        .await
103                    {
104                        trace!("[Subscriber {}] the subscription command returned an error: {:?}", self.name, err);
105                    }
106                }
107            }
108
109            debug!("[Subscriber {}] subscription stream ended", self.name);
110            let _ = self.shutdown.drain();
111            let _ = self.shutdown.try_send(());
112        });
113    }
114
115    pub fn mutate(self: &Arc<Self>, mutation: Mutation) -> Result<()> {
116        self.incoming.try_send(mutation)?;
117        Ok(())
118    }
119
120    async fn join_subscription_receiver_task(self: &Arc<Self>) -> Result<()> {
121        self.shutdown.recv().await?;
122        Ok(())
123    }
124
125    pub async fn join(self: &Arc<Self>) -> Result<()> {
126        trace!("[Subscriber {}] joining", self.name);
127        let result = self.join_subscription_receiver_task().await;
128        debug!("[Subscriber {}] terminated", self.name);
129        result
130    }
131
132    pub fn close(&self) {
133        self.incoming.sender.close();
134    }
135}
136
137pub mod test_helpers {
138    use super::*;
139    use async_channel::Sender;
140
141    #[derive(Clone, Debug, PartialEq, Eq)]
142    pub struct SubscriptionMessage {
143        pub listener_id: ListenerId,
144        pub mutation: Mutation,
145    }
146
147    impl SubscriptionMessage {
148        pub fn new(listener_id: ListenerId, command: Command, scope: Scope) -> Self {
149            Self { listener_id, mutation: Mutation::new(command, scope) }
150        }
151    }
152
153    #[derive(Debug)]
154    pub struct SubscriptionManagerMock {
155        sender: Sender<SubscriptionMessage>,
156    }
157
158    impl SubscriptionManagerMock {
159        pub fn new(sender: Sender<SubscriptionMessage>) -> Self {
160            Self { sender }
161        }
162    }
163
164    #[async_trait]
165    impl SubscriptionManager for SubscriptionManagerMock {
166        async fn start_notify(&self, id: ListenerId, scope: Scope) -> Result<()> {
167            Ok(self.sender.send(SubscriptionMessage::new(id, Command::Start, scope)).await?)
168        }
169
170        async fn stop_notify(&self, id: ListenerId, scope: Scope) -> Result<()> {
171            Ok(self.sender.send(SubscriptionMessage::new(id, Command::Stop, scope)).await?)
172        }
173    }
174}