Skip to main content

ckb_notify/
lib.rs

1//! Notification service for blockchain events.
2//!
3//! This crate provides a publish-subscribe notification system for CKB blockchain events,
4//! including new blocks, transactions, and network alerts. Components can register to receive
5//! notifications about these events asynchronously.
6use ckb_app_config::NotifyConfig;
7use ckb_async_runtime::Handle;
8use ckb_logger::{Level, debug, error, info, trace};
9use ckb_stop_handler::{CancellationToken, new_tokio_exit_rx};
10use ckb_types::packed::Byte32;
11use ckb_types::{
12    core::{BlockView, tx_pool::Reject},
13    packed::Alert,
14};
15use std::{collections::HashMap, time::Duration};
16use tokio::process::Command;
17use tokio::sync::watch;
18use tokio::sync::{
19    mpsc::{self, Receiver, Sender},
20    oneshot,
21};
22use tokio::time::timeout;
23
24pub use ckb_types::core::tx_pool::PoolTransactionEntry;
25
26/// A log entry containing the message and log level.
27#[derive(Clone, Debug)]
28pub struct LogEntry {
29    /// The log message.
30    pub message: String,
31    /// The log level.
32    pub level: Level,
33    /// The log target
34    pub target: String,
35    /// The date
36    pub date: String,
37}
38
39/// Asynchronous request sent to the service.
40pub struct Request<A, R> {
41    /// Oneshot channel for the service to send back the response.
42    pub responder: oneshot::Sender<R>,
43    /// Request arguments.
44    pub arguments: A,
45}
46
47impl<A, R> Request<A, R> {
48    /// Call the service with the arguments and wait for the response.
49    pub async fn call(sender: &Sender<Request<A, R>>, arguments: A) -> Option<R> {
50        let (responder, response) = oneshot::channel();
51        let _ = sender
52            .send(Request {
53                responder,
54                arguments,
55            })
56            .await;
57        response.await.ok()
58    }
59}
60
61/// Channel size for signal communication.
62pub const SIGNAL_CHANNEL_SIZE: usize = 1;
63/// Channel size for registration requests.
64pub const REGISTER_CHANNEL_SIZE: usize = 2;
65/// Channel size for notification messages.
66pub const NOTIFY_CHANNEL_SIZE: usize = 128;
67
68/// Type alias for notification registration sender.
69pub type NotifyRegister<M> = Sender<Request<String, Receiver<M>>>;
70
71/// watcher request type alias
72pub type NotifyWatcher<M> = Sender<Request<String, watch::Receiver<M>>>;
73
74/// Notify timeout config
75#[derive(Copy, Clone)]
76pub(crate) struct NotifyTimeout {
77    pub(crate) tx: Duration,
78    pub(crate) alert: Duration,
79    pub(crate) script: Duration,
80}
81
82const DEFAULT_TX_NOTIFY_TIMEOUT: Duration = Duration::from_millis(300);
83const DEFAULT_ALERT_NOTIFY_TIMEOUT: Duration = Duration::from_millis(10_000);
84const DEFAULT_SCRIPT_TIMEOUT: Duration = Duration::from_millis(10_000);
85
86impl NotifyTimeout {
87    pub(crate) fn new(config: &NotifyConfig) -> Self {
88        NotifyTimeout {
89            tx: config
90                .notify_tx_timeout
91                .map(Duration::from_millis)
92                .unwrap_or(DEFAULT_TX_NOTIFY_TIMEOUT),
93            alert: config
94                .notify_alert_timeout
95                .map(Duration::from_millis)
96                .unwrap_or(DEFAULT_ALERT_NOTIFY_TIMEOUT),
97            script: config
98                .script_timeout
99                .map(Duration::from_millis)
100                .unwrap_or(DEFAULT_SCRIPT_TIMEOUT),
101        }
102    }
103}
104
105/// Controller for the notification service.
106///
107/// Provides methods to subscribe to various blockchain events and notify subscribers
108/// of new blocks, transactions, and network alerts.
109#[derive(Clone)]
110pub struct NotifyController {
111    new_block_register: NotifyRegister<BlockView>,
112    new_block_watcher: NotifyWatcher<Byte32>,
113    new_block_notifier: Sender<BlockView>,
114    new_transaction_register: NotifyRegister<PoolTransactionEntry>,
115    new_transaction_notifier: Sender<PoolTransactionEntry>,
116    proposed_transaction_register: NotifyRegister<PoolTransactionEntry>,
117    proposed_transaction_notifier: Sender<PoolTransactionEntry>,
118    reject_transaction_register: NotifyRegister<(PoolTransactionEntry, Reject)>,
119    reject_transaction_notifier: Sender<(PoolTransactionEntry, Reject)>,
120    network_alert_register: NotifyRegister<Alert>,
121    network_alert_notifier: Sender<Alert>,
122    log_register: NotifyRegister<LogEntry>,
123    log_notifier: Sender<LogEntry>,
124    handle: Handle,
125}
126
127/// Background service that manages event subscriptions and notifications.
128///
129/// Runs asynchronously and dispatches events to registered subscribers.
130pub struct NotifyService {
131    config: NotifyConfig,
132    new_block_subscribers: HashMap<String, Sender<BlockView>>,
133    new_block_watchers: HashMap<String, watch::Sender<Byte32>>,
134    new_transaction_subscribers: HashMap<String, Sender<PoolTransactionEntry>>,
135    proposed_transaction_subscribers: HashMap<String, Sender<PoolTransactionEntry>>,
136    reject_transaction_subscribers: HashMap<String, Sender<(PoolTransactionEntry, Reject)>>,
137    network_alert_subscribers: HashMap<String, Sender<Alert>>,
138    log_subscribers: HashMap<String, Sender<LogEntry>>,
139    timeout: NotifyTimeout,
140    handle: Handle,
141}
142
143impl NotifyService {
144    /// Creates a new notification service with the given configuration and async runtime handle.
145    pub fn new(config: NotifyConfig, handle: Handle) -> Self {
146        let timeout = NotifyTimeout::new(&config);
147
148        Self {
149            config,
150            new_block_subscribers: HashMap::default(),
151            new_block_watchers: HashMap::default(),
152            new_transaction_subscribers: HashMap::default(),
153            proposed_transaction_subscribers: HashMap::default(),
154            reject_transaction_subscribers: HashMap::default(),
155            network_alert_subscribers: HashMap::default(),
156            log_subscribers: HashMap::default(),
157            timeout,
158            handle,
159        }
160    }
161
162    /// start background tokio spawned task.
163    pub fn start(mut self) -> NotifyController {
164        let stop_token: CancellationToken = new_tokio_exit_rx();
165        let handle = self.handle.clone();
166
167        let (new_block_register, mut new_block_register_receiver) =
168            mpsc::channel(REGISTER_CHANNEL_SIZE);
169        let (new_block_watcher, mut new_block_watcher_receiver) =
170            mpsc::channel(REGISTER_CHANNEL_SIZE);
171        let (new_block_sender, mut new_block_receiver) = mpsc::channel(NOTIFY_CHANNEL_SIZE);
172
173        let (new_transaction_register, mut new_transaction_register_receiver) =
174            mpsc::channel(REGISTER_CHANNEL_SIZE);
175        let (new_transaction_sender, mut new_transaction_receiver) =
176            mpsc::channel(NOTIFY_CHANNEL_SIZE);
177
178        let (proposed_transaction_register, mut proposed_transaction_register_receiver) =
179            mpsc::channel(REGISTER_CHANNEL_SIZE);
180        let (proposed_transaction_sender, mut proposed_transaction_receiver) =
181            mpsc::channel(NOTIFY_CHANNEL_SIZE);
182
183        let (reject_transaction_register, mut reject_transaction_register_receiver) =
184            mpsc::channel(REGISTER_CHANNEL_SIZE);
185        let (reject_transaction_sender, mut reject_transaction_receiver) =
186            mpsc::channel(NOTIFY_CHANNEL_SIZE);
187
188        let (network_alert_register, mut network_alert_register_receiver) =
189            mpsc::channel(REGISTER_CHANNEL_SIZE);
190        let (network_alert_sender, mut network_alert_receiver) = mpsc::channel(NOTIFY_CHANNEL_SIZE);
191
192        let (log_register, mut log_register_receiver) = mpsc::channel(REGISTER_CHANNEL_SIZE);
193        let (log_sender, mut log_receiver) = mpsc::channel(NOTIFY_CHANNEL_SIZE);
194
195        let stop_token_clone = stop_token;
196        handle.spawn(async move {
197            loop {
198                tokio::select! {
199                    Some(msg) = new_block_register_receiver.recv() => { self.handle_register_new_block(msg) },
200                    Some(msg) = new_block_watcher_receiver.recv() => { self.handle_watch_new_block(msg) },
201                    Some(msg) = new_block_receiver.recv() => { self.handle_notify_new_block(msg) },
202                    Some(msg) = new_transaction_register_receiver.recv() => { self.handle_register_new_transaction(msg) },
203                    Some(msg) = new_transaction_receiver.recv() => { self.handle_notify_new_transaction(msg) },
204                    Some(msg) = proposed_transaction_register_receiver.recv() => { self.handle_register_proposed_transaction(msg) },
205                    Some(msg) = proposed_transaction_receiver.recv() => { self.handle_notify_proposed_transaction(msg) },
206                    Some(msg) = reject_transaction_register_receiver.recv() => { self.handle_register_reject_transaction(msg) },
207                    Some(msg) = reject_transaction_receiver.recv() => { self.handle_notify_reject_transaction(msg) },
208                    Some(msg) = network_alert_register_receiver.recv() => { self.handle_register_network_alert(msg) },
209                    Some(msg) = network_alert_receiver.recv() => { self.handle_notify_network_alert(msg) },
210                    Some(msg) = log_register_receiver.recv() => { self.handle_register_log(msg) },
211                    Some(msg) = log_receiver.recv() => { self.handle_notify_log(msg) },
212                    _ = stop_token_clone.cancelled() => {
213                        info!("NotifyService received exit signal, exit now");
214                        break;
215                    }
216                    else => break,
217                }
218            }
219        });
220
221        NotifyController {
222            new_block_register,
223            new_block_watcher,
224            new_block_notifier: new_block_sender,
225            new_transaction_register,
226            new_transaction_notifier: new_transaction_sender,
227            proposed_transaction_register,
228            proposed_transaction_notifier: proposed_transaction_sender,
229            reject_transaction_register,
230            reject_transaction_notifier: reject_transaction_sender,
231            network_alert_register,
232            network_alert_notifier: network_alert_sender,
233            log_register,
234            log_notifier: log_sender,
235            handle,
236        }
237    }
238
239    fn handle_watch_new_block(&mut self, msg: Request<String, watch::Receiver<Byte32>>) {
240        let Request {
241            responder,
242            arguments: name,
243        } = msg;
244        debug!("Watch new_block {:?}", name);
245        let (sender, receiver) = watch::channel(Byte32::zero());
246        self.new_block_watchers.insert(name, sender);
247        let _ = responder.send(receiver);
248    }
249
250    fn handle_register_new_block(&mut self, msg: Request<String, Receiver<BlockView>>) {
251        let Request {
252            responder,
253            arguments: name,
254        } = msg;
255        debug!("Register new_block {:?}", name);
256        let (sender, receiver) = mpsc::channel(NOTIFY_CHANNEL_SIZE);
257        self.new_block_subscribers.insert(name, sender);
258        let _ = responder.send(receiver);
259    }
260
261    fn handle_notify_new_block(&self, block: BlockView) {
262        trace!("New block event {:?}", block);
263        let block_hash = block.hash();
264        // notify all subscribers
265        for subscriber in self.new_block_subscribers.values() {
266            let block = block.clone();
267            let subscriber = subscriber.clone();
268            self.handle.spawn(async move {
269                if let Err(e) = subscriber.send(block).await {
270                    error!("Failed to notify new block, error: {}", e);
271                }
272            });
273        }
274
275        // notify all watchers
276        for watcher in self.new_block_watchers.values() {
277            if let Err(e) = watcher.send(block_hash.clone()) {
278                error!("Failed to notify new block watcher, error: {}", e);
279            }
280        }
281
282        // notify script
283        if let Some(script) = self.config.new_block_notify_script.clone() {
284            let script_timeout = self.timeout.script;
285            self.handle.spawn(async move {
286                let args = [format!("{block_hash:#x}")];
287                match timeout(script_timeout, Command::new(&script).args(&args).status()).await {
288                    Ok(ret) => match ret {
289                        Ok(status) => debug!("The new_block_notify script exited with: {status}"),
290                        Err(e) => error!(
291                            "Failed to run new_block_notify_script: {} {:?}, error: {}",
292                            script, args[0], e
293                        ),
294                    },
295                    Err(_) => ckb_logger::warn!("new_block_notify_script {script} timed out"),
296                }
297            });
298        }
299    }
300
301    fn handle_register_new_transaction(
302        &mut self,
303        msg: Request<String, Receiver<PoolTransactionEntry>>,
304    ) {
305        let Request {
306            responder,
307            arguments: name,
308        } = msg;
309        debug!("Register new_transaction {:?}", name);
310        let (sender, receiver) = mpsc::channel(NOTIFY_CHANNEL_SIZE);
311        self.new_transaction_subscribers.insert(name, sender);
312        let _ = responder.send(receiver);
313    }
314
315    fn handle_notify_new_transaction(&self, tx_entry: PoolTransactionEntry) {
316        trace!("New tx event {:?}", tx_entry);
317        // notify all subscribers
318        let tx_timeout = self.timeout.tx;
319        // notify all subscribers
320        for subscriber in self.new_transaction_subscribers.values() {
321            let tx_entry = tx_entry.clone();
322            let subscriber = subscriber.clone();
323            self.handle.spawn(async move {
324                if let Err(e) = subscriber.send_timeout(tx_entry, tx_timeout).await {
325                    error!("Failed to notify new transaction, error: {}", e);
326                }
327            });
328        }
329    }
330
331    fn handle_register_proposed_transaction(
332        &mut self,
333        msg: Request<String, Receiver<PoolTransactionEntry>>,
334    ) {
335        let Request {
336            responder,
337            arguments: name,
338        } = msg;
339        debug!("Register proposed_transaction {:?}", name);
340        let (sender, receiver) = mpsc::channel(NOTIFY_CHANNEL_SIZE);
341        self.proposed_transaction_subscribers.insert(name, sender);
342        let _ = responder.send(receiver);
343    }
344
345    fn handle_notify_proposed_transaction(&self, tx_entry: PoolTransactionEntry) {
346        trace!("Proposed tx event {:?}", tx_entry);
347        // notify all subscribers
348        let tx_timeout = self.timeout.tx;
349        // notify all subscribers
350        for subscriber in self.proposed_transaction_subscribers.values() {
351            let tx_entry = tx_entry.clone();
352            let subscriber = subscriber.clone();
353            self.handle.spawn(async move {
354                if let Err(e) = subscriber.send_timeout(tx_entry, tx_timeout).await {
355                    error!("Failed to notify proposed transaction, error {}", e);
356                }
357            });
358        }
359    }
360
361    fn handle_register_reject_transaction(
362        &mut self,
363        msg: Request<String, Receiver<(PoolTransactionEntry, Reject)>>,
364    ) {
365        let Request {
366            responder,
367            arguments: name,
368        } = msg;
369        debug!("Register reject_transaction {:?}", name);
370        let (sender, receiver) = mpsc::channel(NOTIFY_CHANNEL_SIZE);
371        self.reject_transaction_subscribers.insert(name, sender);
372        let _ = responder.send(receiver);
373    }
374
375    fn handle_notify_reject_transaction(&self, tx_entry: (PoolTransactionEntry, Reject)) {
376        trace!("Tx reject event {:?}", tx_entry);
377        // notify all subscribers
378        let tx_timeout = self.timeout.tx;
379        // notify all subscribers
380        for subscriber in self.reject_transaction_subscribers.values() {
381            let tx_entry = tx_entry.clone();
382            let subscriber = subscriber.clone();
383            self.handle.spawn(async move {
384                if let Err(e) = subscriber.send_timeout(tx_entry, tx_timeout).await {
385                    error!("Failed to notify transaction reject, error: {}", e);
386                }
387            });
388        }
389    }
390
391    fn handle_register_network_alert(&mut self, msg: Request<String, Receiver<Alert>>) {
392        let Request {
393            responder,
394            arguments: name,
395        } = msg;
396        debug!("Register network_alert {:?}", name);
397        let (sender, receiver) = mpsc::channel(NOTIFY_CHANNEL_SIZE);
398        self.network_alert_subscribers.insert(name, sender);
399        let _ = responder.send(receiver);
400    }
401
402    fn handle_notify_network_alert(&self, alert: Alert) {
403        trace!("Network alert event {:?}", alert);
404        let alert_timeout = self.timeout.alert;
405        let message = alert
406            .as_reader()
407            .raw()
408            .message()
409            .as_utf8()
410            .expect("alert message should be utf8")
411            .to_owned();
412        // notify all subscribers
413        for subscriber in self.network_alert_subscribers.values() {
414            let subscriber = subscriber.clone();
415            let alert = alert.clone();
416            self.handle.spawn(async move {
417                if let Err(e) = subscriber.send_timeout(alert, alert_timeout).await {
418                    error!("Failed to notify network_alert, error: {}", e);
419                }
420            });
421        }
422
423        // notify script
424        if let Some(script) = self.config.network_alert_notify_script.clone() {
425            let script_timeout = self.timeout.script;
426            self.handle.spawn(async move {
427                let args = [message];
428                match timeout(script_timeout, Command::new(&script).args(&args).status()).await {
429                    Ok(ret) => match ret {
430                        Ok(status) => {
431                            debug!("the network_alert_notify script exited with: {}", status)
432                        }
433                        Err(e) => error!(
434                            "failed to run network_alert_notify_script: {} {}, error: {}",
435                            script, args[0], e
436                        ),
437                    },
438                    Err(_) => ckb_logger::warn!("network_alert_notify_script {} timed out", script),
439                }
440            });
441        }
442    }
443
444    fn handle_register_log(&mut self, msg: Request<String, Receiver<LogEntry>>) {
445        let Request {
446            responder,
447            arguments: name,
448        } = msg;
449        debug!("Register log {:?}", name);
450        let (sender, receiver) = mpsc::channel(NOTIFY_CHANNEL_SIZE);
451        self.log_subscribers.insert(name, sender);
452        let _ = responder.send(receiver);
453    }
454
455    fn handle_notify_log(&self, log_entry: LogEntry) {
456        for subscriber in self.log_subscribers.values() {
457            let log_entry = log_entry.clone();
458            let subscriber = subscriber.clone();
459            // Ignore failures
460            subscriber.try_send(log_entry).ok();
461        }
462    }
463}
464
465impl NotifyController {
466    /// Subscribes to new block notifications with the given name.
467    ///
468    /// Returns a receiver channel that will receive new block events.
469    pub async fn subscribe_new_block<S: ToString>(&self, name: S) -> Receiver<BlockView> {
470        Request::call(&self.new_block_register, name.to_string())
471            .await
472            .expect("Subscribe new block should be OK")
473    }
474
475    /// watch new block notify
476    pub async fn watch_new_block<S: ToString>(&self, name: S) -> watch::Receiver<Byte32> {
477        Request::call(&self.new_block_watcher, name.to_string())
478            .await
479            .expect("Watch new block should be OK")
480    }
481
482    /// Notifies all subscribers of a new block.
483    pub fn notify_new_block(&self, block: BlockView) {
484        let new_block_notifier = self.new_block_notifier.clone();
485        self.handle.spawn(async move {
486            if let Err(e) = new_block_notifier.send(block).await {
487                error!("notify_new_block channel is closed: {}", e);
488            }
489        });
490    }
491
492    /// Subscribes to new transaction notifications with the given name.
493    ///
494    /// Returns a receiver channel that will receive new transaction events from the transaction pool.
495    pub async fn subscribe_new_transaction<S: ToString>(
496        &self,
497        name: S,
498    ) -> Receiver<PoolTransactionEntry> {
499        Request::call(&self.new_transaction_register, name.to_string())
500            .await
501            .expect("Subscribe new transaction should be OK")
502    }
503
504    /// Notifies all subscribers of a new transaction in the transaction pool.
505    pub fn notify_new_transaction(&self, tx_entry: PoolTransactionEntry) {
506        let new_transaction_notifier = self.new_transaction_notifier.clone();
507        self.handle.spawn(async move {
508            if let Err(e) = new_transaction_notifier.send(tx_entry).await {
509                error!("notify_new_transaction channel is closed: {}", e);
510            }
511        });
512    }
513
514    /// Subscribes to proposed transaction notifications with the given name.
515    ///
516    /// Returns a receiver channel that will receive proposed transaction events.
517    pub async fn subscribe_proposed_transaction<S: ToString>(
518        &self,
519        name: S,
520    ) -> Receiver<PoolTransactionEntry> {
521        Request::call(&self.proposed_transaction_register, name.to_string())
522            .await
523            .expect("Subscribe proposed transaction should be OK")
524    }
525
526    /// Notifies all subscribers of a proposed transaction.
527    pub fn notify_proposed_transaction(&self, tx_entry: PoolTransactionEntry) {
528        let proposed_transaction_notifier = self.proposed_transaction_notifier.clone();
529        self.handle.spawn(async move {
530            if let Err(e) = proposed_transaction_notifier.send(tx_entry).await {
531                error!("notify_proposed_transaction channel is closed: {}", e);
532            }
533        });
534    }
535
536    /// Subscribes to rejected transaction notifications with the given name.
537    ///
538    /// Returns a receiver channel that will receive rejected transaction events.
539    pub async fn subscribe_reject_transaction<S: ToString>(
540        &self,
541        name: S,
542    ) -> Receiver<(PoolTransactionEntry, Reject)> {
543        Request::call(&self.reject_transaction_register, name.to_string())
544            .await
545            .expect("Subscribe rejected transaction should be OK")
546    }
547
548    /// Notifies all subscribers of a rejected transaction.
549    pub fn notify_reject_transaction(&self, tx_entry: PoolTransactionEntry, reject: Reject) {
550        let reject_transaction_notifier = self.reject_transaction_notifier.clone();
551        self.handle.spawn(async move {
552            if let Err(e) = reject_transaction_notifier.send((tx_entry, reject)).await {
553                error!("notify_reject_transaction channel is closed: {}", e);
554            }
555        });
556    }
557
558    /// Subscribes to network alert notifications with the given name.
559    ///
560    /// Returns a receiver channel that will receive network alert events.
561    pub async fn subscribe_network_alert<S: ToString>(&self, name: S) -> Receiver<Alert> {
562        Request::call(&self.network_alert_register, name.to_string())
563            .await
564            .expect("Subscribe network alert should be OK")
565    }
566
567    /// Notifies all subscribers of a network alert.
568    pub fn notify_network_alert(&self, alert: Alert) {
569        let network_alert_notifier = self.network_alert_notifier.clone();
570        self.handle.spawn(async move {
571            if let Err(e) = network_alert_notifier.send(alert).await {
572                error!("notify_network_alert channel is closed: {}", e);
573            }
574        });
575    }
576
577    /// Subscribes to log notifications with the given name.
578    ///
579    /// Returns a receiver channel that will receive log events.
580    pub async fn subscribe_log<S: ToString>(&self, name: S) -> Receiver<LogEntry> {
581        Request::call(&self.log_register, name.to_string())
582            .await
583            .expect("Subscribe log should be OK")
584    }
585
586    /// Notifies all subscribers of a log entry.
587    pub fn notify_log(&self, log_entry: LogEntry) {
588        let log_notifier = self.log_notifier.clone();
589        // Ignore failures
590        log_notifier.try_send(log_entry).ok();
591    }
592}