ckb_notify/
lib.rs

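//! Notification service that fans out new-block, transaction-pool and network-alert events
//! to registered subscribers and to the optional notify scripts from the configuration.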
2use ckb_app_config::NotifyConfig;
3use ckb_async_runtime::Handle;
4use ckb_logger::{debug, error, info, trace};
5use ckb_stop_handler::{CancellationToken, new_tokio_exit_rx};
6use ckb_types::packed::Byte32;
7use ckb_types::{
8    core::{BlockView, tx_pool::Reject},
9    packed::Alert,
10};
11use std::{collections::HashMap, time::Duration};
12use tokio::process::Command;
13use tokio::sync::watch;
14use tokio::sync::{
15    mpsc::{self, Receiver, Sender},
16    oneshot,
17};
18use tokio::time::timeout;
19
20pub use ckb_types::core::tx_pool::PoolTransactionEntry;
21
22/// Asynchronous request sent to the service.
23pub struct Request<A, R> {
24    /// Oneshot channel for the service to send back the response.
25    pub responder: oneshot::Sender<R>,
26    /// Request arguments.
27    pub arguments: A,
28}
29
30impl<A, R> Request<A, R> {
31    /// Call the service with the arguments and wait for the response.
32    pub async fn call(sender: &Sender<Request<A, R>>, arguments: A) -> Option<R> {
33        let (responder, response) = oneshot::channel();
34        let _ = sender
35            .send(Request {
36                responder,
37                arguments,
38            })
39            .await;
40        response.await.ok()
41    }
42}
43
/// Capacity intended for signal channels.
///
/// NOTE(review): not referenced in the visible part of this file — confirm usage elsewhere.
pub const SIGNAL_CHANNEL_SIZE: usize = 1;
/// Capacity of the channels that carry subscribe/watch registration requests to the service.
pub const REGISTER_CHANNEL_SIZE: usize = 2;
/// Capacity of the channels that carry notifications, both into the service and from the
/// service to each registered subscriber.
pub const NOTIFY_CHANNEL_SIZE: usize = 128;
50
51/// TODO(doc): @quake
52pub type NotifyRegister<M> = Sender<Request<String, Receiver<M>>>;
53
54/// watcher request type alias
55pub type NotifyWatcher<M> = Sender<Request<String, watch::Receiver<M>>>;
56
/// Time limits applied by the notify service.
#[derive(Clone, Copy)]
pub(crate) struct NotifyTimeout {
    /// Limit for delivering a transaction event to one subscriber.
    pub(crate) tx: Duration,
    /// Limit for delivering a network alert to one subscriber.
    pub(crate) alert: Duration,
    /// Limit for waiting on a notify script to finish.
    pub(crate) script: Duration,
}
64
/// Transaction notify timeout used when the configuration does not provide one (300 ms).
const DEFAULT_TX_NOTIFY_TIMEOUT: Duration = Duration::from_millis(300);
/// Alert notify timeout used when the configuration does not provide one (10 s).
const DEFAULT_ALERT_NOTIFY_TIMEOUT: Duration = Duration::from_secs(10);
/// Notify script timeout used when the configuration does not provide one (10 s).
const DEFAULT_SCRIPT_TIMEOUT: Duration = Duration::from_secs(10);
68
69impl NotifyTimeout {
70    pub(crate) fn new(config: &NotifyConfig) -> Self {
71        NotifyTimeout {
72            tx: config
73                .notify_tx_timeout
74                .map(Duration::from_millis)
75                .unwrap_or(DEFAULT_TX_NOTIFY_TIMEOUT),
76            alert: config
77                .notify_alert_timeout
78                .map(Duration::from_millis)
79                .unwrap_or(DEFAULT_ALERT_NOTIFY_TIMEOUT),
80            script: config
81                .script_timeout
82                .map(Duration::from_millis)
83                .unwrap_or(DEFAULT_SCRIPT_TIMEOUT),
84        }
85    }
86}
87
88/// TODO(doc): @quake
89#[derive(Clone)]
90pub struct NotifyController {
91    new_block_register: NotifyRegister<BlockView>,
92    new_block_watcher: NotifyWatcher<Byte32>,
93    new_block_notifier: Sender<BlockView>,
94    new_transaction_register: NotifyRegister<PoolTransactionEntry>,
95    new_transaction_notifier: Sender<PoolTransactionEntry>,
96    proposed_transaction_register: NotifyRegister<PoolTransactionEntry>,
97    proposed_transaction_notifier: Sender<PoolTransactionEntry>,
98    reject_transaction_register: NotifyRegister<(PoolTransactionEntry, Reject)>,
99    reject_transaction_notifier: Sender<(PoolTransactionEntry, Reject)>,
100    network_alert_register: NotifyRegister<Alert>,
101    network_alert_notifier: Sender<Alert>,
102    handle: Handle,
103}
104
105/// TODO(doc): @quake
106pub struct NotifyService {
107    config: NotifyConfig,
108    new_block_subscribers: HashMap<String, Sender<BlockView>>,
109    new_block_watchers: HashMap<String, watch::Sender<Byte32>>,
110    new_transaction_subscribers: HashMap<String, Sender<PoolTransactionEntry>>,
111    proposed_transaction_subscribers: HashMap<String, Sender<PoolTransactionEntry>>,
112    reject_transaction_subscribers: HashMap<String, Sender<(PoolTransactionEntry, Reject)>>,
113    network_alert_subscribers: HashMap<String, Sender<Alert>>,
114    timeout: NotifyTimeout,
115    handle: Handle,
116}
117
118impl NotifyService {
119    /// TODO(doc): @quake
120    pub fn new(config: NotifyConfig, handle: Handle) -> Self {
121        let timeout = NotifyTimeout::new(&config);
122
123        Self {
124            config,
125            new_block_subscribers: HashMap::default(),
126            new_block_watchers: HashMap::default(),
127            new_transaction_subscribers: HashMap::default(),
128            proposed_transaction_subscribers: HashMap::default(),
129            reject_transaction_subscribers: HashMap::default(),
130            network_alert_subscribers: HashMap::default(),
131            timeout,
132            handle,
133        }
134    }
135
136    /// start background tokio spawned task.
137    pub fn start(mut self) -> NotifyController {
138        let signal_receiver: CancellationToken = new_tokio_exit_rx();
139        let handle = self.handle.clone();
140
141        let (new_block_register, mut new_block_register_receiver) =
142            mpsc::channel(REGISTER_CHANNEL_SIZE);
143        let (new_block_watcher, mut new_block_watcher_receiver) =
144            mpsc::channel(REGISTER_CHANNEL_SIZE);
145        let (new_block_sender, mut new_block_receiver) = mpsc::channel(NOTIFY_CHANNEL_SIZE);
146
147        let (new_transaction_register, mut new_transaction_register_receiver) =
148            mpsc::channel(REGISTER_CHANNEL_SIZE);
149        let (new_transaction_sender, mut new_transaction_receiver) =
150            mpsc::channel(NOTIFY_CHANNEL_SIZE);
151
152        let (proposed_transaction_register, mut proposed_transaction_register_receiver) =
153            mpsc::channel(REGISTER_CHANNEL_SIZE);
154        let (proposed_transaction_sender, mut proposed_transaction_receiver) =
155            mpsc::channel(NOTIFY_CHANNEL_SIZE);
156
157        let (reject_transaction_register, mut reject_transaction_register_receiver) =
158            mpsc::channel(REGISTER_CHANNEL_SIZE);
159        let (reject_transaction_sender, mut reject_transaction_receiver) =
160            mpsc::channel(NOTIFY_CHANNEL_SIZE);
161
162        let (network_alert_register, mut network_alert_register_receiver) =
163            mpsc::channel(REGISTER_CHANNEL_SIZE);
164        let (network_alert_sender, mut network_alert_receiver) = mpsc::channel(NOTIFY_CHANNEL_SIZE);
165
166        handle.spawn(async move {
167            loop {
168                tokio::select! {
169                    Some(msg) = new_block_register_receiver.recv() => { self.handle_register_new_block(msg) },
170                    Some(msg) = new_block_watcher_receiver.recv() => { self.handle_watch_new_block(msg) },
171                    Some(msg) = new_block_receiver.recv() => { self.handle_notify_new_block(msg) },
172                    Some(msg) = new_transaction_register_receiver.recv() => { self.handle_register_new_transaction(msg) },
173                    Some(msg) = new_transaction_receiver.recv() => { self.handle_notify_new_transaction(msg) },
174                    Some(msg) = proposed_transaction_register_receiver.recv() => { self.handle_register_proposed_transaction(msg) },
175                    Some(msg) = proposed_transaction_receiver.recv() => { self.handle_notify_proposed_transaction(msg) },
176                    Some(msg) = reject_transaction_register_receiver.recv() => { self.handle_register_reject_transaction(msg) },
177                    Some(msg) = reject_transaction_receiver.recv() => { self.handle_notify_reject_transaction(msg) },
178                    Some(msg) = network_alert_register_receiver.recv() => { self.handle_register_network_alert(msg) },
179                    Some(msg) = network_alert_receiver.recv() => { self.handle_notify_network_alert(msg) },
180                    _ = signal_receiver.cancelled() => {
181                        info!("NotifyService received exit signal, exit now");
182                        break;
183                    }
184                    else => break,
185                }
186            }
187        });
188
189        NotifyController {
190            new_block_register,
191            new_block_watcher,
192            new_block_notifier: new_block_sender,
193            new_transaction_register,
194            new_transaction_notifier: new_transaction_sender,
195            proposed_transaction_register,
196            proposed_transaction_notifier: proposed_transaction_sender,
197            reject_transaction_register,
198            reject_transaction_notifier: reject_transaction_sender,
199            network_alert_register,
200            network_alert_notifier: network_alert_sender,
201            handle,
202        }
203    }
204
205    fn handle_watch_new_block(&mut self, msg: Request<String, watch::Receiver<Byte32>>) {
206        let Request {
207            responder,
208            arguments: name,
209        } = msg;
210        debug!("Watch new_block {:?}", name);
211        let (sender, receiver) = watch::channel(Byte32::zero());
212        self.new_block_watchers.insert(name, sender);
213        let _ = responder.send(receiver);
214    }
215
216    fn handle_register_new_block(&mut self, msg: Request<String, Receiver<BlockView>>) {
217        let Request {
218            responder,
219            arguments: name,
220        } = msg;
221        debug!("Register new_block {:?}", name);
222        let (sender, receiver) = mpsc::channel(NOTIFY_CHANNEL_SIZE);
223        self.new_block_subscribers.insert(name, sender);
224        let _ = responder.send(receiver);
225    }
226
227    fn handle_notify_new_block(&self, block: BlockView) {
228        trace!("New block event {:?}", block);
229        let block_hash = block.hash();
230        // notify all subscribers
231        for subscriber in self.new_block_subscribers.values() {
232            let block = block.clone();
233            let subscriber = subscriber.clone();
234            self.handle.spawn(async move {
235                if let Err(e) = subscriber.send(block).await {
236                    error!("Failed to notify new block, error: {}", e);
237                }
238            });
239        }
240
241        // notify all watchers
242        for watcher in self.new_block_watchers.values() {
243            if let Err(e) = watcher.send(block_hash.clone()) {
244                error!("Failed to notify new block watcher, error: {}", e);
245            }
246        }
247
248        // notify script
249        if let Some(script) = self.config.new_block_notify_script.clone() {
250            let script_timeout = self.timeout.script;
251            self.handle.spawn(async move {
252                let args = [format!("{block_hash:#x}")];
253                match timeout(script_timeout, Command::new(&script).args(&args).status()).await {
254                    Ok(ret) => match ret {
255                        Ok(status) => debug!("The new_block_notify script exited with: {status}"),
256                        Err(e) => error!(
257                            "Failed to run new_block_notify_script: {} {:?}, error: {}",
258                            script, args[0], e
259                        ),
260                    },
261                    Err(_) => ckb_logger::warn!("new_block_notify_script {script} timed out"),
262                }
263            });
264        }
265    }
266
267    fn handle_register_new_transaction(
268        &mut self,
269        msg: Request<String, Receiver<PoolTransactionEntry>>,
270    ) {
271        let Request {
272            responder,
273            arguments: name,
274        } = msg;
275        debug!("Register new_transaction {:?}", name);
276        let (sender, receiver) = mpsc::channel(NOTIFY_CHANNEL_SIZE);
277        self.new_transaction_subscribers.insert(name, sender);
278        let _ = responder.send(receiver);
279    }
280
281    fn handle_notify_new_transaction(&self, tx_entry: PoolTransactionEntry) {
282        trace!("New tx event {:?}", tx_entry);
283        // notify all subscribers
284        let tx_timeout = self.timeout.tx;
285        // notify all subscribers
286        for subscriber in self.new_transaction_subscribers.values() {
287            let tx_entry = tx_entry.clone();
288            let subscriber = subscriber.clone();
289            self.handle.spawn(async move {
290                if let Err(e) = subscriber.send_timeout(tx_entry, tx_timeout).await {
291                    error!("Failed to notify new transaction, error: {}", e);
292                }
293            });
294        }
295    }
296
297    fn handle_register_proposed_transaction(
298        &mut self,
299        msg: Request<String, Receiver<PoolTransactionEntry>>,
300    ) {
301        let Request {
302            responder,
303            arguments: name,
304        } = msg;
305        debug!("Register proposed_transaction {:?}", name);
306        let (sender, receiver) = mpsc::channel(NOTIFY_CHANNEL_SIZE);
307        self.proposed_transaction_subscribers.insert(name, sender);
308        let _ = responder.send(receiver);
309    }
310
311    fn handle_notify_proposed_transaction(&self, tx_entry: PoolTransactionEntry) {
312        trace!("Proposed tx event {:?}", tx_entry);
313        // notify all subscribers
314        let tx_timeout = self.timeout.tx;
315        // notify all subscribers
316        for subscriber in self.proposed_transaction_subscribers.values() {
317            let tx_entry = tx_entry.clone();
318            let subscriber = subscriber.clone();
319            self.handle.spawn(async move {
320                if let Err(e) = subscriber.send_timeout(tx_entry, tx_timeout).await {
321                    error!("Failed to notify proposed transaction, error {}", e);
322                }
323            });
324        }
325    }
326
327    fn handle_register_reject_transaction(
328        &mut self,
329        msg: Request<String, Receiver<(PoolTransactionEntry, Reject)>>,
330    ) {
331        let Request {
332            responder,
333            arguments: name,
334        } = msg;
335        debug!("Register reject_transaction {:?}", name);
336        let (sender, receiver) = mpsc::channel(NOTIFY_CHANNEL_SIZE);
337        self.reject_transaction_subscribers.insert(name, sender);
338        let _ = responder.send(receiver);
339    }
340
341    fn handle_notify_reject_transaction(&self, tx_entry: (PoolTransactionEntry, Reject)) {
342        trace!("Tx reject event {:?}", tx_entry);
343        // notify all subscribers
344        let tx_timeout = self.timeout.tx;
345        // notify all subscribers
346        for subscriber in self.reject_transaction_subscribers.values() {
347            let tx_entry = tx_entry.clone();
348            let subscriber = subscriber.clone();
349            self.handle.spawn(async move {
350                if let Err(e) = subscriber.send_timeout(tx_entry, tx_timeout).await {
351                    error!("Failed to notify transaction reject, error: {}", e);
352                }
353            });
354        }
355    }
356
357    fn handle_register_network_alert(&mut self, msg: Request<String, Receiver<Alert>>) {
358        let Request {
359            responder,
360            arguments: name,
361        } = msg;
362        debug!("Register network_alert {:?}", name);
363        let (sender, receiver) = mpsc::channel(NOTIFY_CHANNEL_SIZE);
364        self.network_alert_subscribers.insert(name, sender);
365        let _ = responder.send(receiver);
366    }
367
368    fn handle_notify_network_alert(&self, alert: Alert) {
369        trace!("Network alert event {:?}", alert);
370        let alert_timeout = self.timeout.alert;
371        let message = alert
372            .as_reader()
373            .raw()
374            .message()
375            .as_utf8()
376            .expect("alert message should be utf8")
377            .to_owned();
378        // notify all subscribers
379        for subscriber in self.network_alert_subscribers.values() {
380            let subscriber = subscriber.clone();
381            let alert = alert.clone();
382            self.handle.spawn(async move {
383                if let Err(e) = subscriber.send_timeout(alert, alert_timeout).await {
384                    error!("Failed to notify network_alert, error: {}", e);
385                }
386            });
387        }
388
389        // notify script
390        if let Some(script) = self.config.network_alert_notify_script.clone() {
391            let script_timeout = self.timeout.script;
392            self.handle.spawn(async move {
393                let args = [message];
394                match timeout(script_timeout, Command::new(&script).args(&args).status()).await {
395                    Ok(ret) => match ret {
396                        Ok(status) => {
397                            debug!("the network_alert_notify script exited with: {}", status)
398                        }
399                        Err(e) => error!(
400                            "failed to run network_alert_notify_script: {} {}, error: {}",
401                            script, args[0], e
402                        ),
403                    },
404                    Err(_) => ckb_logger::warn!("network_alert_notify_script {} timed out", script),
405                }
406            });
407        }
408    }
409}
410
411impl NotifyController {
412    /// TODO(doc): @quake
413    pub async fn subscribe_new_block<S: ToString>(&self, name: S) -> Receiver<BlockView> {
414        Request::call(&self.new_block_register, name.to_string())
415            .await
416            .expect("Subscribe new block should be OK")
417    }
418
419    /// watch new block notify
420    pub async fn watch_new_block<S: ToString>(&self, name: S) -> watch::Receiver<Byte32> {
421        Request::call(&self.new_block_watcher, name.to_string())
422            .await
423            .expect("Watch new block should be OK")
424    }
425
426    /// TODO(doc): @quake
427    pub fn notify_new_block(&self, block: BlockView) {
428        let new_block_notifier = self.new_block_notifier.clone();
429        self.handle.spawn(async move {
430            if let Err(e) = new_block_notifier.send(block).await {
431                error!("notify_new_block channel is closed: {}", e);
432            }
433        });
434    }
435
436    /// TODO(doc): @quake
437    pub async fn subscribe_new_transaction<S: ToString>(
438        &self,
439        name: S,
440    ) -> Receiver<PoolTransactionEntry> {
441        Request::call(&self.new_transaction_register, name.to_string())
442            .await
443            .expect("Subscribe new transaction should be OK")
444    }
445
446    /// TODO(doc): @quake
447    pub fn notify_new_transaction(&self, tx_entry: PoolTransactionEntry) {
448        let new_transaction_notifier = self.new_transaction_notifier.clone();
449        self.handle.spawn(async move {
450            if let Err(e) = new_transaction_notifier.send(tx_entry).await {
451                error!("notify_new_transaction channel is closed: {}", e);
452            }
453        });
454    }
455
456    /// TODO(doc): @quake
457    pub async fn subscribe_proposed_transaction<S: ToString>(
458        &self,
459        name: S,
460    ) -> Receiver<PoolTransactionEntry> {
461        Request::call(&self.proposed_transaction_register, name.to_string())
462            .await
463            .expect("Subscribe proposed transaction should be OK")
464    }
465
466    /// TODO(doc): @quake
467    pub fn notify_proposed_transaction(&self, tx_entry: PoolTransactionEntry) {
468        let proposed_transaction_notifier = self.proposed_transaction_notifier.clone();
469        self.handle.spawn(async move {
470            if let Err(e) = proposed_transaction_notifier.send(tx_entry).await {
471                error!("notify_proposed_transaction channel is closed: {}", e);
472            }
473        });
474    }
475
476    /// TODO(doc): @quake
477    pub async fn subscribe_reject_transaction<S: ToString>(
478        &self,
479        name: S,
480    ) -> Receiver<(PoolTransactionEntry, Reject)> {
481        Request::call(&self.reject_transaction_register, name.to_string())
482            .await
483            .expect("Subscribe rejected transaction should be OK")
484    }
485
486    /// TODO(doc): @quake
487    pub fn notify_reject_transaction(&self, tx_entry: PoolTransactionEntry, reject: Reject) {
488        let reject_transaction_notifier = self.reject_transaction_notifier.clone();
489        self.handle.spawn(async move {
490            if let Err(e) = reject_transaction_notifier.send((tx_entry, reject)).await {
491                error!("notify_reject_transaction channel is closed: {}", e);
492            }
493        });
494    }
495
496    /// TODO(doc): @quake
497    pub async fn subscribe_network_alert<S: ToString>(&self, name: S) -> Receiver<Alert> {
498        Request::call(&self.network_alert_register, name.to_string())
499            .await
500            .expect("Subscribe network alert should be OK")
501    }
502
503    /// TODO(doc): @quake
504    pub fn notify_network_alert(&self, alert: Alert) {
505        let network_alert_notifier = self.network_alert_notifier.clone();
506        self.handle.spawn(async move {
507            if let Err(e) = network_alert_notifier.send(alert).await {
508                error!("notify_network_alert channel is closed: {}", e);
509            }
510        });
511    }
512}