ckb_notify/
lib.rs

1//! Notification service for blockchain events.
2//!
3//! This crate provides a publish-subscribe notification system for CKB blockchain events,
4//! including new blocks, transactions, and network alerts. Components can register to receive
5//! notifications about these events asynchronously.
6use ckb_app_config::NotifyConfig;
7use ckb_async_runtime::Handle;
8use ckb_logger::{debug, error, info, trace};
9use ckb_stop_handler::{CancellationToken, new_tokio_exit_rx};
10use ckb_types::packed::Byte32;
11use ckb_types::{
12    core::{BlockView, tx_pool::Reject},
13    packed::Alert,
14};
15use std::{collections::HashMap, time::Duration};
16use tokio::process::Command;
17use tokio::sync::watch;
18use tokio::sync::{
19    mpsc::{self, Receiver, Sender},
20    oneshot,
21};
22use tokio::time::timeout;
23
24pub use ckb_types::core::tx_pool::PoolTransactionEntry;
25
/// Asynchronous request sent to the service.
///
/// `A` is the type of the request arguments and `R` is the type of the
/// response that the service sends back through `responder`.
pub struct Request<A, R> {
    /// Oneshot channel for the service to send back the response.
    pub responder: oneshot::Sender<R>,
    /// Request arguments.
    pub arguments: A,
}
33
34impl<A, R> Request<A, R> {
35    /// Call the service with the arguments and wait for the response.
36    pub async fn call(sender: &Sender<Request<A, R>>, arguments: A) -> Option<R> {
37        let (responder, response) = oneshot::channel();
38        let _ = sender
39            .send(Request {
40                responder,
41                arguments,
42            })
43            .await;
44        response.await.ok()
45    }
46}
47
/// Channel size for signal communication.
pub const SIGNAL_CHANNEL_SIZE: usize = 1;
/// Channel size for registration requests.
///
/// Used as the capacity of every subscriber/watcher registration channel
/// created by `NotifyService::start`.
pub const REGISTER_CHANNEL_SIZE: usize = 2;
/// Channel size for notification messages.
///
/// Used as the capacity of the notification channels created by
/// `NotifyService::start` and of each per-subscriber channel handed out on
/// registration.
pub const NOTIFY_CHANNEL_SIZE: usize = 128;
54
/// Type alias for notification registration sender.
///
/// The request carries the subscriber name and is answered with the
/// `Receiver` on which messages of type `M` will be delivered.
pub type NotifyRegister<M> = Sender<Request<String, Receiver<M>>>;

/// Type alias for watcher registration sender.
///
/// The request carries the watcher name and is answered with a
/// `watch::Receiver` for values of type `M`.
pub type NotifyWatcher<M> = Sender<Request<String, watch::Receiver<M>>>;
60
/// Timeouts applied by the notification service.
#[derive(Copy, Clone)]
pub(crate) struct NotifyTimeout {
    /// Timeout for delivering a transaction event (new, proposed or rejected)
    /// to a single subscriber.
    pub(crate) tx: Duration,
    /// Timeout for delivering a network alert to a single subscriber.
    pub(crate) alert: Duration,
    /// Timeout for waiting on a notify script to finish.
    pub(crate) script: Duration,
}
68
// Fallbacks used by `NotifyTimeout::new` when the corresponding
// `NotifyConfig` option is not set.
/// Default transaction notify timeout: 300 ms.
const DEFAULT_TX_NOTIFY_TIMEOUT: Duration = Duration::from_millis(300);
/// Default network alert notify timeout: 10 s.
const DEFAULT_ALERT_NOTIFY_TIMEOUT: Duration = Duration::from_millis(10_000);
/// Default notify script timeout: 10 s.
const DEFAULT_SCRIPT_TIMEOUT: Duration = Duration::from_millis(10_000);
72
73impl NotifyTimeout {
74    pub(crate) fn new(config: &NotifyConfig) -> Self {
75        NotifyTimeout {
76            tx: config
77                .notify_tx_timeout
78                .map(Duration::from_millis)
79                .unwrap_or(DEFAULT_TX_NOTIFY_TIMEOUT),
80            alert: config
81                .notify_alert_timeout
82                .map(Duration::from_millis)
83                .unwrap_or(DEFAULT_ALERT_NOTIFY_TIMEOUT),
84            script: config
85                .script_timeout
86                .map(Duration::from_millis)
87                .unwrap_or(DEFAULT_SCRIPT_TIMEOUT),
88        }
89    }
90}
91
/// Controller for the notification service.
///
/// Provides methods to subscribe to various blockchain events and notify subscribers
/// of new blocks, transactions, and network alerts.
///
/// The controller is `Clone`; it holds only channel senders and a runtime handle.
#[derive(Clone)]
pub struct NotifyController {
    /// Sends registration requests for new block subscribers.
    new_block_register: NotifyRegister<BlockView>,
    /// Sends registration requests for new block watchers (block hash only).
    new_block_watcher: NotifyWatcher<Byte32>,
    /// Sends new block events to the service.
    new_block_notifier: Sender<BlockView>,
    /// Sends registration requests for new transaction subscribers.
    new_transaction_register: NotifyRegister<PoolTransactionEntry>,
    /// Sends new transaction events to the service.
    new_transaction_notifier: Sender<PoolTransactionEntry>,
    /// Sends registration requests for proposed transaction subscribers.
    proposed_transaction_register: NotifyRegister<PoolTransactionEntry>,
    /// Sends proposed transaction events to the service.
    proposed_transaction_notifier: Sender<PoolTransactionEntry>,
    /// Sends registration requests for rejected transaction subscribers.
    reject_transaction_register: NotifyRegister<(PoolTransactionEntry, Reject)>,
    /// Sends rejected transaction events to the service.
    reject_transaction_notifier: Sender<(PoolTransactionEntry, Reject)>,
    /// Sends registration requests for network alert subscribers.
    network_alert_register: NotifyRegister<Alert>,
    /// Sends network alert events to the service.
    network_alert_notifier: Sender<Alert>,
    /// Runtime handle used by the `notify_*` methods to spawn the sending tasks.
    handle: Handle,
}
111
/// Background service that manages event subscriptions and notifications.
///
/// Runs asynchronously and dispatches events to registered subscribers.
/// Subscribers and watchers are keyed by the name they registered with.
pub struct NotifyService {
    /// Service configuration; supplies the optional notify scripts.
    config: NotifyConfig,
    /// New block subscribers, keyed by name.
    new_block_subscribers: HashMap<String, Sender<BlockView>>,
    /// New block watchers (receive the block hash only), keyed by name.
    new_block_watchers: HashMap<String, watch::Sender<Byte32>>,
    /// New transaction subscribers, keyed by name.
    new_transaction_subscribers: HashMap<String, Sender<PoolTransactionEntry>>,
    /// Proposed transaction subscribers, keyed by name.
    proposed_transaction_subscribers: HashMap<String, Sender<PoolTransactionEntry>>,
    /// Rejected transaction subscribers, keyed by name.
    reject_transaction_subscribers: HashMap<String, Sender<(PoolTransactionEntry, Reject)>>,
    /// Network alert subscribers, keyed by name.
    network_alert_subscribers: HashMap<String, Sender<Alert>>,
    /// Timeouts derived from `config` when the service is created.
    timeout: NotifyTimeout,
    /// Runtime handle used to spawn the service loop and the delivery tasks.
    handle: Handle,
}
126
127impl NotifyService {
128    /// Creates a new notification service with the given configuration and async runtime handle.
129    pub fn new(config: NotifyConfig, handle: Handle) -> Self {
130        let timeout = NotifyTimeout::new(&config);
131
132        Self {
133            config,
134            new_block_subscribers: HashMap::default(),
135            new_block_watchers: HashMap::default(),
136            new_transaction_subscribers: HashMap::default(),
137            proposed_transaction_subscribers: HashMap::default(),
138            reject_transaction_subscribers: HashMap::default(),
139            network_alert_subscribers: HashMap::default(),
140            timeout,
141            handle,
142        }
143    }
144
    /// Starts the service as a background task and returns its controller.
    ///
    /// Consumes `self`: the service is moved into a task spawned on its own
    /// runtime handle. That task loops over the registration and notification
    /// channels created here and passes every received message to the
    /// matching `handle_*` method. The loop ends once the exit signal obtained
    /// from `new_tokio_exit_rx()` is cancelled.
    ///
    /// The returned [`NotifyController`] owns the sending halves of all
    /// channels plus a clone of the runtime handle.
    pub fn start(mut self) -> NotifyController {
        let signal_receiver: CancellationToken = new_tokio_exit_rx();
        let handle = self.handle.clone();

        // New block: subscriber registration, watcher registration, events.
        let (new_block_register, mut new_block_register_receiver) =
            mpsc::channel(REGISTER_CHANNEL_SIZE);
        let (new_block_watcher, mut new_block_watcher_receiver) =
            mpsc::channel(REGISTER_CHANNEL_SIZE);
        let (new_block_sender, mut new_block_receiver) = mpsc::channel(NOTIFY_CHANNEL_SIZE);

        // New transaction: registration and events.
        let (new_transaction_register, mut new_transaction_register_receiver) =
            mpsc::channel(REGISTER_CHANNEL_SIZE);
        let (new_transaction_sender, mut new_transaction_receiver) =
            mpsc::channel(NOTIFY_CHANNEL_SIZE);

        // Proposed transaction: registration and events.
        let (proposed_transaction_register, mut proposed_transaction_register_receiver) =
            mpsc::channel(REGISTER_CHANNEL_SIZE);
        let (proposed_transaction_sender, mut proposed_transaction_receiver) =
            mpsc::channel(NOTIFY_CHANNEL_SIZE);

        // Rejected transaction: registration and events.
        let (reject_transaction_register, mut reject_transaction_register_receiver) =
            mpsc::channel(REGISTER_CHANNEL_SIZE);
        let (reject_transaction_sender, mut reject_transaction_receiver) =
            mpsc::channel(NOTIFY_CHANNEL_SIZE);

        // Network alert: registration and events.
        let (network_alert_register, mut network_alert_register_receiver) =
            mpsc::channel(REGISTER_CHANNEL_SIZE);
        let (network_alert_sender, mut network_alert_receiver) = mpsc::channel(NOTIFY_CHANNEL_SIZE);

        // The service loop: owns `self` and all receiving halves.
        handle.spawn(async move {
            loop {
                // A branch whose `Some(msg)` pattern does not match (its
                // channel returned `None`) is skipped for this iteration.
                tokio::select! {
                    Some(msg) = new_block_register_receiver.recv() => { self.handle_register_new_block(msg) },
                    Some(msg) = new_block_watcher_receiver.recv() => { self.handle_watch_new_block(msg) },
                    Some(msg) = new_block_receiver.recv() => { self.handle_notify_new_block(msg) },
                    Some(msg) = new_transaction_register_receiver.recv() => { self.handle_register_new_transaction(msg) },
                    Some(msg) = new_transaction_receiver.recv() => { self.handle_notify_new_transaction(msg) },
                    Some(msg) = proposed_transaction_register_receiver.recv() => { self.handle_register_proposed_transaction(msg) },
                    Some(msg) = proposed_transaction_receiver.recv() => { self.handle_notify_proposed_transaction(msg) },
                    Some(msg) = reject_transaction_register_receiver.recv() => { self.handle_register_reject_transaction(msg) },
                    Some(msg) = reject_transaction_receiver.recv() => { self.handle_notify_reject_transaction(msg) },
                    Some(msg) = network_alert_register_receiver.recv() => { self.handle_register_network_alert(msg) },
                    Some(msg) = network_alert_receiver.recv() => { self.handle_notify_network_alert(msg) },
                    _ = signal_receiver.cancelled() => {
                        info!("NotifyService received exit signal, exit now");
                        break;
                    }
                    // Fallback taken when no other branch can make progress.
                    else => break,
                }
            }
        });

        NotifyController {
            new_block_register,
            new_block_watcher,
            new_block_notifier: new_block_sender,
            new_transaction_register,
            new_transaction_notifier: new_transaction_sender,
            proposed_transaction_register,
            proposed_transaction_notifier: proposed_transaction_sender,
            reject_transaction_register,
            reject_transaction_notifier: reject_transaction_sender,
            network_alert_register,
            network_alert_notifier: network_alert_sender,
            handle,
        }
    }
213
214    fn handle_watch_new_block(&mut self, msg: Request<String, watch::Receiver<Byte32>>) {
215        let Request {
216            responder,
217            arguments: name,
218        } = msg;
219        debug!("Watch new_block {:?}", name);
220        let (sender, receiver) = watch::channel(Byte32::zero());
221        self.new_block_watchers.insert(name, sender);
222        let _ = responder.send(receiver);
223    }
224
225    fn handle_register_new_block(&mut self, msg: Request<String, Receiver<BlockView>>) {
226        let Request {
227            responder,
228            arguments: name,
229        } = msg;
230        debug!("Register new_block {:?}", name);
231        let (sender, receiver) = mpsc::channel(NOTIFY_CHANNEL_SIZE);
232        self.new_block_subscribers.insert(name, sender);
233        let _ = responder.send(receiver);
234    }
235
236    fn handle_notify_new_block(&self, block: BlockView) {
237        trace!("New block event {:?}", block);
238        let block_hash = block.hash();
239        // notify all subscribers
240        for subscriber in self.new_block_subscribers.values() {
241            let block = block.clone();
242            let subscriber = subscriber.clone();
243            self.handle.spawn(async move {
244                if let Err(e) = subscriber.send(block).await {
245                    error!("Failed to notify new block, error: {}", e);
246                }
247            });
248        }
249
250        // notify all watchers
251        for watcher in self.new_block_watchers.values() {
252            if let Err(e) = watcher.send(block_hash.clone()) {
253                error!("Failed to notify new block watcher, error: {}", e);
254            }
255        }
256
257        // notify script
258        if let Some(script) = self.config.new_block_notify_script.clone() {
259            let script_timeout = self.timeout.script;
260            self.handle.spawn(async move {
261                let args = [format!("{block_hash:#x}")];
262                match timeout(script_timeout, Command::new(&script).args(&args).status()).await {
263                    Ok(ret) => match ret {
264                        Ok(status) => debug!("The new_block_notify script exited with: {status}"),
265                        Err(e) => error!(
266                            "Failed to run new_block_notify_script: {} {:?}, error: {}",
267                            script, args[0], e
268                        ),
269                    },
270                    Err(_) => ckb_logger::warn!("new_block_notify_script {script} timed out"),
271                }
272            });
273        }
274    }
275
276    fn handle_register_new_transaction(
277        &mut self,
278        msg: Request<String, Receiver<PoolTransactionEntry>>,
279    ) {
280        let Request {
281            responder,
282            arguments: name,
283        } = msg;
284        debug!("Register new_transaction {:?}", name);
285        let (sender, receiver) = mpsc::channel(NOTIFY_CHANNEL_SIZE);
286        self.new_transaction_subscribers.insert(name, sender);
287        let _ = responder.send(receiver);
288    }
289
290    fn handle_notify_new_transaction(&self, tx_entry: PoolTransactionEntry) {
291        trace!("New tx event {:?}", tx_entry);
292        // notify all subscribers
293        let tx_timeout = self.timeout.tx;
294        // notify all subscribers
295        for subscriber in self.new_transaction_subscribers.values() {
296            let tx_entry = tx_entry.clone();
297            let subscriber = subscriber.clone();
298            self.handle.spawn(async move {
299                if let Err(e) = subscriber.send_timeout(tx_entry, tx_timeout).await {
300                    error!("Failed to notify new transaction, error: {}", e);
301                }
302            });
303        }
304    }
305
306    fn handle_register_proposed_transaction(
307        &mut self,
308        msg: Request<String, Receiver<PoolTransactionEntry>>,
309    ) {
310        let Request {
311            responder,
312            arguments: name,
313        } = msg;
314        debug!("Register proposed_transaction {:?}", name);
315        let (sender, receiver) = mpsc::channel(NOTIFY_CHANNEL_SIZE);
316        self.proposed_transaction_subscribers.insert(name, sender);
317        let _ = responder.send(receiver);
318    }
319
320    fn handle_notify_proposed_transaction(&self, tx_entry: PoolTransactionEntry) {
321        trace!("Proposed tx event {:?}", tx_entry);
322        // notify all subscribers
323        let tx_timeout = self.timeout.tx;
324        // notify all subscribers
325        for subscriber in self.proposed_transaction_subscribers.values() {
326            let tx_entry = tx_entry.clone();
327            let subscriber = subscriber.clone();
328            self.handle.spawn(async move {
329                if let Err(e) = subscriber.send_timeout(tx_entry, tx_timeout).await {
330                    error!("Failed to notify proposed transaction, error {}", e);
331                }
332            });
333        }
334    }
335
336    fn handle_register_reject_transaction(
337        &mut self,
338        msg: Request<String, Receiver<(PoolTransactionEntry, Reject)>>,
339    ) {
340        let Request {
341            responder,
342            arguments: name,
343        } = msg;
344        debug!("Register reject_transaction {:?}", name);
345        let (sender, receiver) = mpsc::channel(NOTIFY_CHANNEL_SIZE);
346        self.reject_transaction_subscribers.insert(name, sender);
347        let _ = responder.send(receiver);
348    }
349
350    fn handle_notify_reject_transaction(&self, tx_entry: (PoolTransactionEntry, Reject)) {
351        trace!("Tx reject event {:?}", tx_entry);
352        // notify all subscribers
353        let tx_timeout = self.timeout.tx;
354        // notify all subscribers
355        for subscriber in self.reject_transaction_subscribers.values() {
356            let tx_entry = tx_entry.clone();
357            let subscriber = subscriber.clone();
358            self.handle.spawn(async move {
359                if let Err(e) = subscriber.send_timeout(tx_entry, tx_timeout).await {
360                    error!("Failed to notify transaction reject, error: {}", e);
361                }
362            });
363        }
364    }
365
366    fn handle_register_network_alert(&mut self, msg: Request<String, Receiver<Alert>>) {
367        let Request {
368            responder,
369            arguments: name,
370        } = msg;
371        debug!("Register network_alert {:?}", name);
372        let (sender, receiver) = mpsc::channel(NOTIFY_CHANNEL_SIZE);
373        self.network_alert_subscribers.insert(name, sender);
374        let _ = responder.send(receiver);
375    }
376
377    fn handle_notify_network_alert(&self, alert: Alert) {
378        trace!("Network alert event {:?}", alert);
379        let alert_timeout = self.timeout.alert;
380        let message = alert
381            .as_reader()
382            .raw()
383            .message()
384            .as_utf8()
385            .expect("alert message should be utf8")
386            .to_owned();
387        // notify all subscribers
388        for subscriber in self.network_alert_subscribers.values() {
389            let subscriber = subscriber.clone();
390            let alert = alert.clone();
391            self.handle.spawn(async move {
392                if let Err(e) = subscriber.send_timeout(alert, alert_timeout).await {
393                    error!("Failed to notify network_alert, error: {}", e);
394                }
395            });
396        }
397
398        // notify script
399        if let Some(script) = self.config.network_alert_notify_script.clone() {
400            let script_timeout = self.timeout.script;
401            self.handle.spawn(async move {
402                let args = [message];
403                match timeout(script_timeout, Command::new(&script).args(&args).status()).await {
404                    Ok(ret) => match ret {
405                        Ok(status) => {
406                            debug!("the network_alert_notify script exited with: {}", status)
407                        }
408                        Err(e) => error!(
409                            "failed to run network_alert_notify_script: {} {}, error: {}",
410                            script, args[0], e
411                        ),
412                    },
413                    Err(_) => ckb_logger::warn!("network_alert_notify_script {} timed out", script),
414                }
415            });
416        }
417    }
418}
419
420impl NotifyController {
421    /// Subscribes to new block notifications with the given name.
422    ///
423    /// Returns a receiver channel that will receive new block events.
424    pub async fn subscribe_new_block<S: ToString>(&self, name: S) -> Receiver<BlockView> {
425        Request::call(&self.new_block_register, name.to_string())
426            .await
427            .expect("Subscribe new block should be OK")
428    }
429
430    /// watch new block notify
431    pub async fn watch_new_block<S: ToString>(&self, name: S) -> watch::Receiver<Byte32> {
432        Request::call(&self.new_block_watcher, name.to_string())
433            .await
434            .expect("Watch new block should be OK")
435    }
436
437    /// Notifies all subscribers of a new block.
438    pub fn notify_new_block(&self, block: BlockView) {
439        let new_block_notifier = self.new_block_notifier.clone();
440        self.handle.spawn(async move {
441            if let Err(e) = new_block_notifier.send(block).await {
442                error!("notify_new_block channel is closed: {}", e);
443            }
444        });
445    }
446
447    /// Subscribes to new transaction notifications with the given name.
448    ///
449    /// Returns a receiver channel that will receive new transaction events from the transaction pool.
450    pub async fn subscribe_new_transaction<S: ToString>(
451        &self,
452        name: S,
453    ) -> Receiver<PoolTransactionEntry> {
454        Request::call(&self.new_transaction_register, name.to_string())
455            .await
456            .expect("Subscribe new transaction should be OK")
457    }
458
459    /// Notifies all subscribers of a new transaction in the transaction pool.
460    pub fn notify_new_transaction(&self, tx_entry: PoolTransactionEntry) {
461        let new_transaction_notifier = self.new_transaction_notifier.clone();
462        self.handle.spawn(async move {
463            if let Err(e) = new_transaction_notifier.send(tx_entry).await {
464                error!("notify_new_transaction channel is closed: {}", e);
465            }
466        });
467    }
468
469    /// Subscribes to proposed transaction notifications with the given name.
470    ///
471    /// Returns a receiver channel that will receive proposed transaction events.
472    pub async fn subscribe_proposed_transaction<S: ToString>(
473        &self,
474        name: S,
475    ) -> Receiver<PoolTransactionEntry> {
476        Request::call(&self.proposed_transaction_register, name.to_string())
477            .await
478            .expect("Subscribe proposed transaction should be OK")
479    }
480
481    /// Notifies all subscribers of a proposed transaction.
482    pub fn notify_proposed_transaction(&self, tx_entry: PoolTransactionEntry) {
483        let proposed_transaction_notifier = self.proposed_transaction_notifier.clone();
484        self.handle.spawn(async move {
485            if let Err(e) = proposed_transaction_notifier.send(tx_entry).await {
486                error!("notify_proposed_transaction channel is closed: {}", e);
487            }
488        });
489    }
490
491    /// Subscribes to rejected transaction notifications with the given name.
492    ///
493    /// Returns a receiver channel that will receive rejected transaction events.
494    pub async fn subscribe_reject_transaction<S: ToString>(
495        &self,
496        name: S,
497    ) -> Receiver<(PoolTransactionEntry, Reject)> {
498        Request::call(&self.reject_transaction_register, name.to_string())
499            .await
500            .expect("Subscribe rejected transaction should be OK")
501    }
502
503    /// Notifies all subscribers of a rejected transaction.
504    pub fn notify_reject_transaction(&self, tx_entry: PoolTransactionEntry, reject: Reject) {
505        let reject_transaction_notifier = self.reject_transaction_notifier.clone();
506        self.handle.spawn(async move {
507            if let Err(e) = reject_transaction_notifier.send((tx_entry, reject)).await {
508                error!("notify_reject_transaction channel is closed: {}", e);
509            }
510        });
511    }
512
513    /// Subscribes to network alert notifications with the given name.
514    ///
515    /// Returns a receiver channel that will receive network alert events.
516    pub async fn subscribe_network_alert<S: ToString>(&self, name: S) -> Receiver<Alert> {
517        Request::call(&self.network_alert_register, name.to_string())
518            .await
519            .expect("Subscribe network alert should be OK")
520    }
521
522    /// Notifies all subscribers of a network alert.
523    pub fn notify_network_alert(&self, alert: Alert) {
524        let network_alert_notifier = self.network_alert_notifier.clone();
525        self.handle.spawn(async move {
526            if let Err(e) = network_alert_notifier.send(alert).await {
527                error!("notify_network_alert channel is closed: {}", e);
528            }
529        });
530    }
531}