1use ckb_app_config::NotifyConfig;
7use ckb_async_runtime::Handle;
8use ckb_logger::{Level, debug, error, info, trace};
9use ckb_stop_handler::{CancellationToken, new_tokio_exit_rx};
10use ckb_types::packed::Byte32;
11use ckb_types::{
12 core::{BlockView, tx_pool::Reject},
13 packed::Alert,
14};
15use std::{collections::HashMap, time::Duration};
16use tokio::process::Command;
17use tokio::sync::watch;
18use tokio::sync::{
19 mpsc::{self, Receiver, Sender},
20 oneshot,
21};
22use tokio::time::timeout;
23
24pub use ckb_types::core::tx_pool::PoolTransactionEntry;
25
/// A single log record handed to log subscribers.
#[derive(Clone, Debug)]
pub struct LogEntry {
    /// The text of the log record.
    pub message: String,
    /// The severity level of the record.
    pub level: Level,
    /// The log target of the record (presumably the emitting module; confirm against the producer).
    pub target: String,
    /// The record's date as a string; the format is chosen by the producer and is not visible here.
    pub date: String,
}
38
/// A request bundled with the one-shot channel on which its reply is delivered.
///
/// `A` is the argument type, `R` is the response type.
pub struct Request<A, R> {
    /// One-shot sender the handler uses to deliver the response.
    pub responder: oneshot::Sender<R>,
    /// The arguments of the request.
    pub arguments: A,
}
46
47impl<A, R> Request<A, R> {
48 pub async fn call(sender: &Sender<Request<A, R>>, arguments: A) -> Option<R> {
50 let (responder, response) = oneshot::channel();
51 let _ = sender
52 .send(Request {
53 responder,
54 arguments,
55 })
56 .await;
57 response.await.ok()
58 }
59}
60
/// Capacity for signal channels (not referenced in the visible part of this file).
pub const SIGNAL_CHANNEL_SIZE: usize = 1;
/// Capacity of the registration channels created in `NotifyService::start`.
pub const REGISTER_CHANNEL_SIZE: usize = 2;
/// Capacity of the notification channels, including every per-subscriber channel.
pub const NOTIFY_CHANNEL_SIZE: usize = 128;
67
/// Sender used to register a named subscriber; the reply is the `mpsc::Receiver`
/// on which messages of type `M` will arrive.
pub type NotifyRegister<M> = Sender<Request<String, Receiver<M>>>;

/// Sender used to register a named watcher; the reply is a `watch::Receiver`
/// carrying values of type `M`.
pub type NotifyWatcher<M> = Sender<Request<String, watch::Receiver<M>>>;
73
/// Timeouts applied by the notify service.
#[derive(Copy, Clone)]
pub(crate) struct NotifyTimeout {
    /// Maximum time to wait when delivering a transaction event to a subscriber.
    pub(crate) tx: Duration,
    /// Maximum time to wait when delivering a network alert to a subscriber.
    pub(crate) alert: Duration,
    /// Maximum time a notify script is awaited before it is reported as timed out.
    pub(crate) script: Duration,
}
81
/// Transaction notify timeout used when the config leaves `notify_tx_timeout` unset.
const DEFAULT_TX_NOTIFY_TIMEOUT: Duration = Duration::from_millis(300);
/// Alert notify timeout used when the config leaves `notify_alert_timeout` unset.
const DEFAULT_ALERT_NOTIFY_TIMEOUT: Duration = Duration::from_millis(10_000);
/// Script timeout used when the config leaves `script_timeout` unset.
const DEFAULT_SCRIPT_TIMEOUT: Duration = Duration::from_millis(10_000);
85
86impl NotifyTimeout {
87 pub(crate) fn new(config: &NotifyConfig) -> Self {
88 NotifyTimeout {
89 tx: config
90 .notify_tx_timeout
91 .map(Duration::from_millis)
92 .unwrap_or(DEFAULT_TX_NOTIFY_TIMEOUT),
93 alert: config
94 .notify_alert_timeout
95 .map(Duration::from_millis)
96 .unwrap_or(DEFAULT_ALERT_NOTIFY_TIMEOUT),
97 script: config
98 .script_timeout
99 .map(Duration::from_millis)
100 .unwrap_or(DEFAULT_SCRIPT_TIMEOUT),
101 }
102 }
103}
104
/// Cloneable handle for talking to a running `NotifyService`.
///
/// For each topic it holds a `*_register` sender, which carries subscribe requests,
/// and a `*_notifier` sender, which carries the events to broadcast.
#[derive(Clone)]
pub struct NotifyController {
    /// Subscribe requests for new-block events.
    new_block_register: NotifyRegister<BlockView>,
    /// Watch requests for the hash of the newest block.
    new_block_watcher: NotifyWatcher<Byte32>,
    /// New blocks to broadcast.
    new_block_notifier: Sender<BlockView>,
    /// Subscribe requests for new-transaction events.
    new_transaction_register: NotifyRegister<PoolTransactionEntry>,
    /// New transactions to broadcast.
    new_transaction_notifier: Sender<PoolTransactionEntry>,
    /// Subscribe requests for proposed-transaction events.
    proposed_transaction_register: NotifyRegister<PoolTransactionEntry>,
    /// Proposed transactions to broadcast.
    proposed_transaction_notifier: Sender<PoolTransactionEntry>,
    /// Subscribe requests for rejected-transaction events.
    reject_transaction_register: NotifyRegister<(PoolTransactionEntry, Reject)>,
    /// Rejected transactions, with their reject reason, to broadcast.
    reject_transaction_notifier: Sender<(PoolTransactionEntry, Reject)>,
    /// Subscribe requests for network alerts.
    network_alert_register: NotifyRegister<Alert>,
    /// Network alerts to broadcast.
    network_alert_notifier: Sender<Alert>,
    /// Subscribe requests for log entries.
    log_register: NotifyRegister<LogEntry>,
    /// Log entries to broadcast.
    log_notifier: Sender<LogEntry>,
    /// Runtime handle on which the `notify_*` methods spawn their send tasks.
    handle: Handle,
}
126
/// State of the notify service: its configuration plus one table of named
/// subscribers per topic.
pub struct NotifyService {
    /// Service configuration; the optional notify script paths are read from it.
    config: NotifyConfig,
    /// New-block subscribers, keyed by subscriber name.
    new_block_subscribers: HashMap<String, Sender<BlockView>>,
    /// New-block watchers, keyed by watcher name; each receives the block hash.
    new_block_watchers: HashMap<String, watch::Sender<Byte32>>,
    /// New-transaction subscribers, keyed by subscriber name.
    new_transaction_subscribers: HashMap<String, Sender<PoolTransactionEntry>>,
    /// Proposed-transaction subscribers, keyed by subscriber name.
    proposed_transaction_subscribers: HashMap<String, Sender<PoolTransactionEntry>>,
    /// Rejected-transaction subscribers, keyed by subscriber name.
    reject_transaction_subscribers: HashMap<String, Sender<(PoolTransactionEntry, Reject)>>,
    /// Network-alert subscribers, keyed by subscriber name.
    network_alert_subscribers: HashMap<String, Sender<Alert>>,
    /// Log subscribers, keyed by subscriber name.
    log_subscribers: HashMap<String, Sender<LogEntry>>,
    /// Timeouts derived from `config` when the service is constructed.
    timeout: NotifyTimeout,
    /// Runtime handle on which the event loop and delivery tasks are spawned.
    handle: Handle,
}
142
143impl NotifyService {
144 pub fn new(config: NotifyConfig, handle: Handle) -> Self {
146 let timeout = NotifyTimeout::new(&config);
147
148 Self {
149 config,
150 new_block_subscribers: HashMap::default(),
151 new_block_watchers: HashMap::default(),
152 new_transaction_subscribers: HashMap::default(),
153 proposed_transaction_subscribers: HashMap::default(),
154 reject_transaction_subscribers: HashMap::default(),
155 network_alert_subscribers: HashMap::default(),
156 log_subscribers: HashMap::default(),
157 timeout,
158 handle,
159 }
160 }
161
162 pub fn start(mut self) -> NotifyController {
164 let stop_token: CancellationToken = new_tokio_exit_rx();
165 let handle = self.handle.clone();
166
167 let (new_block_register, mut new_block_register_receiver) =
168 mpsc::channel(REGISTER_CHANNEL_SIZE);
169 let (new_block_watcher, mut new_block_watcher_receiver) =
170 mpsc::channel(REGISTER_CHANNEL_SIZE);
171 let (new_block_sender, mut new_block_receiver) = mpsc::channel(NOTIFY_CHANNEL_SIZE);
172
173 let (new_transaction_register, mut new_transaction_register_receiver) =
174 mpsc::channel(REGISTER_CHANNEL_SIZE);
175 let (new_transaction_sender, mut new_transaction_receiver) =
176 mpsc::channel(NOTIFY_CHANNEL_SIZE);
177
178 let (proposed_transaction_register, mut proposed_transaction_register_receiver) =
179 mpsc::channel(REGISTER_CHANNEL_SIZE);
180 let (proposed_transaction_sender, mut proposed_transaction_receiver) =
181 mpsc::channel(NOTIFY_CHANNEL_SIZE);
182
183 let (reject_transaction_register, mut reject_transaction_register_receiver) =
184 mpsc::channel(REGISTER_CHANNEL_SIZE);
185 let (reject_transaction_sender, mut reject_transaction_receiver) =
186 mpsc::channel(NOTIFY_CHANNEL_SIZE);
187
188 let (network_alert_register, mut network_alert_register_receiver) =
189 mpsc::channel(REGISTER_CHANNEL_SIZE);
190 let (network_alert_sender, mut network_alert_receiver) = mpsc::channel(NOTIFY_CHANNEL_SIZE);
191
192 let (log_register, mut log_register_receiver) = mpsc::channel(REGISTER_CHANNEL_SIZE);
193 let (log_sender, mut log_receiver) = mpsc::channel(NOTIFY_CHANNEL_SIZE);
194
195 let stop_token_clone = stop_token;
196 handle.spawn(async move {
197 loop {
198 tokio::select! {
199 Some(msg) = new_block_register_receiver.recv() => { self.handle_register_new_block(msg) },
200 Some(msg) = new_block_watcher_receiver.recv() => { self.handle_watch_new_block(msg) },
201 Some(msg) = new_block_receiver.recv() => { self.handle_notify_new_block(msg) },
202 Some(msg) = new_transaction_register_receiver.recv() => { self.handle_register_new_transaction(msg) },
203 Some(msg) = new_transaction_receiver.recv() => { self.handle_notify_new_transaction(msg) },
204 Some(msg) = proposed_transaction_register_receiver.recv() => { self.handle_register_proposed_transaction(msg) },
205 Some(msg) = proposed_transaction_receiver.recv() => { self.handle_notify_proposed_transaction(msg) },
206 Some(msg) = reject_transaction_register_receiver.recv() => { self.handle_register_reject_transaction(msg) },
207 Some(msg) = reject_transaction_receiver.recv() => { self.handle_notify_reject_transaction(msg) },
208 Some(msg) = network_alert_register_receiver.recv() => { self.handle_register_network_alert(msg) },
209 Some(msg) = network_alert_receiver.recv() => { self.handle_notify_network_alert(msg) },
210 Some(msg) = log_register_receiver.recv() => { self.handle_register_log(msg) },
211 Some(msg) = log_receiver.recv() => { self.handle_notify_log(msg) },
212 _ = stop_token_clone.cancelled() => {
213 info!("NotifyService received exit signal, exit now");
214 break;
215 }
216 else => break,
217 }
218 }
219 });
220
221 NotifyController {
222 new_block_register,
223 new_block_watcher,
224 new_block_notifier: new_block_sender,
225 new_transaction_register,
226 new_transaction_notifier: new_transaction_sender,
227 proposed_transaction_register,
228 proposed_transaction_notifier: proposed_transaction_sender,
229 reject_transaction_register,
230 reject_transaction_notifier: reject_transaction_sender,
231 network_alert_register,
232 network_alert_notifier: network_alert_sender,
233 log_register,
234 log_notifier: log_sender,
235 handle,
236 }
237 }
238
239 fn handle_watch_new_block(&mut self, msg: Request<String, watch::Receiver<Byte32>>) {
240 let Request {
241 responder,
242 arguments: name,
243 } = msg;
244 debug!("Watch new_block {:?}", name);
245 let (sender, receiver) = watch::channel(Byte32::zero());
246 self.new_block_watchers.insert(name, sender);
247 let _ = responder.send(receiver);
248 }
249
250 fn handle_register_new_block(&mut self, msg: Request<String, Receiver<BlockView>>) {
251 let Request {
252 responder,
253 arguments: name,
254 } = msg;
255 debug!("Register new_block {:?}", name);
256 let (sender, receiver) = mpsc::channel(NOTIFY_CHANNEL_SIZE);
257 self.new_block_subscribers.insert(name, sender);
258 let _ = responder.send(receiver);
259 }
260
261 fn handle_notify_new_block(&self, block: BlockView) {
262 trace!("New block event {:?}", block);
263 let block_hash = block.hash();
264 for subscriber in self.new_block_subscribers.values() {
266 let block = block.clone();
267 let subscriber = subscriber.clone();
268 self.handle.spawn(async move {
269 if let Err(e) = subscriber.send(block).await {
270 error!("Failed to notify new block, error: {}", e);
271 }
272 });
273 }
274
275 for watcher in self.new_block_watchers.values() {
277 if let Err(e) = watcher.send(block_hash.clone()) {
278 error!("Failed to notify new block watcher, error: {}", e);
279 }
280 }
281
282 if let Some(script) = self.config.new_block_notify_script.clone() {
284 let script_timeout = self.timeout.script;
285 self.handle.spawn(async move {
286 let args = [format!("{block_hash:#x}")];
287 match timeout(script_timeout, Command::new(&script).args(&args).status()).await {
288 Ok(ret) => match ret {
289 Ok(status) => debug!("The new_block_notify script exited with: {status}"),
290 Err(e) => error!(
291 "Failed to run new_block_notify_script: {} {:?}, error: {}",
292 script, args[0], e
293 ),
294 },
295 Err(_) => ckb_logger::warn!("new_block_notify_script {script} timed out"),
296 }
297 });
298 }
299 }
300
301 fn handle_register_new_transaction(
302 &mut self,
303 msg: Request<String, Receiver<PoolTransactionEntry>>,
304 ) {
305 let Request {
306 responder,
307 arguments: name,
308 } = msg;
309 debug!("Register new_transaction {:?}", name);
310 let (sender, receiver) = mpsc::channel(NOTIFY_CHANNEL_SIZE);
311 self.new_transaction_subscribers.insert(name, sender);
312 let _ = responder.send(receiver);
313 }
314
315 fn handle_notify_new_transaction(&self, tx_entry: PoolTransactionEntry) {
316 trace!("New tx event {:?}", tx_entry);
317 let tx_timeout = self.timeout.tx;
319 for subscriber in self.new_transaction_subscribers.values() {
321 let tx_entry = tx_entry.clone();
322 let subscriber = subscriber.clone();
323 self.handle.spawn(async move {
324 if let Err(e) = subscriber.send_timeout(tx_entry, tx_timeout).await {
325 error!("Failed to notify new transaction, error: {}", e);
326 }
327 });
328 }
329 }
330
331 fn handle_register_proposed_transaction(
332 &mut self,
333 msg: Request<String, Receiver<PoolTransactionEntry>>,
334 ) {
335 let Request {
336 responder,
337 arguments: name,
338 } = msg;
339 debug!("Register proposed_transaction {:?}", name);
340 let (sender, receiver) = mpsc::channel(NOTIFY_CHANNEL_SIZE);
341 self.proposed_transaction_subscribers.insert(name, sender);
342 let _ = responder.send(receiver);
343 }
344
345 fn handle_notify_proposed_transaction(&self, tx_entry: PoolTransactionEntry) {
346 trace!("Proposed tx event {:?}", tx_entry);
347 let tx_timeout = self.timeout.tx;
349 for subscriber in self.proposed_transaction_subscribers.values() {
351 let tx_entry = tx_entry.clone();
352 let subscriber = subscriber.clone();
353 self.handle.spawn(async move {
354 if let Err(e) = subscriber.send_timeout(tx_entry, tx_timeout).await {
355 error!("Failed to notify proposed transaction, error {}", e);
356 }
357 });
358 }
359 }
360
361 fn handle_register_reject_transaction(
362 &mut self,
363 msg: Request<String, Receiver<(PoolTransactionEntry, Reject)>>,
364 ) {
365 let Request {
366 responder,
367 arguments: name,
368 } = msg;
369 debug!("Register reject_transaction {:?}", name);
370 let (sender, receiver) = mpsc::channel(NOTIFY_CHANNEL_SIZE);
371 self.reject_transaction_subscribers.insert(name, sender);
372 let _ = responder.send(receiver);
373 }
374
375 fn handle_notify_reject_transaction(&self, tx_entry: (PoolTransactionEntry, Reject)) {
376 trace!("Tx reject event {:?}", tx_entry);
377 let tx_timeout = self.timeout.tx;
379 for subscriber in self.reject_transaction_subscribers.values() {
381 let tx_entry = tx_entry.clone();
382 let subscriber = subscriber.clone();
383 self.handle.spawn(async move {
384 if let Err(e) = subscriber.send_timeout(tx_entry, tx_timeout).await {
385 error!("Failed to notify transaction reject, error: {}", e);
386 }
387 });
388 }
389 }
390
391 fn handle_register_network_alert(&mut self, msg: Request<String, Receiver<Alert>>) {
392 let Request {
393 responder,
394 arguments: name,
395 } = msg;
396 debug!("Register network_alert {:?}", name);
397 let (sender, receiver) = mpsc::channel(NOTIFY_CHANNEL_SIZE);
398 self.network_alert_subscribers.insert(name, sender);
399 let _ = responder.send(receiver);
400 }
401
402 fn handle_notify_network_alert(&self, alert: Alert) {
403 trace!("Network alert event {:?}", alert);
404 let alert_timeout = self.timeout.alert;
405 let message = alert
406 .as_reader()
407 .raw()
408 .message()
409 .as_utf8()
410 .expect("alert message should be utf8")
411 .to_owned();
412 for subscriber in self.network_alert_subscribers.values() {
414 let subscriber = subscriber.clone();
415 let alert = alert.clone();
416 self.handle.spawn(async move {
417 if let Err(e) = subscriber.send_timeout(alert, alert_timeout).await {
418 error!("Failed to notify network_alert, error: {}", e);
419 }
420 });
421 }
422
423 if let Some(script) = self.config.network_alert_notify_script.clone() {
425 let script_timeout = self.timeout.script;
426 self.handle.spawn(async move {
427 let args = [message];
428 match timeout(script_timeout, Command::new(&script).args(&args).status()).await {
429 Ok(ret) => match ret {
430 Ok(status) => {
431 debug!("the network_alert_notify script exited with: {}", status)
432 }
433 Err(e) => error!(
434 "failed to run network_alert_notify_script: {} {}, error: {}",
435 script, args[0], e
436 ),
437 },
438 Err(_) => ckb_logger::warn!("network_alert_notify_script {} timed out", script),
439 }
440 });
441 }
442 }
443
444 fn handle_register_log(&mut self, msg: Request<String, Receiver<LogEntry>>) {
445 let Request {
446 responder,
447 arguments: name,
448 } = msg;
449 debug!("Register log {:?}", name);
450 let (sender, receiver) = mpsc::channel(NOTIFY_CHANNEL_SIZE);
451 self.log_subscribers.insert(name, sender);
452 let _ = responder.send(receiver);
453 }
454
455 fn handle_notify_log(&self, log_entry: LogEntry) {
456 for subscriber in self.log_subscribers.values() {
457 let log_entry = log_entry.clone();
458 let subscriber = subscriber.clone();
459 subscriber.try_send(log_entry).ok();
461 }
462 }
463}
464
465impl NotifyController {
466 pub async fn subscribe_new_block<S: ToString>(&self, name: S) -> Receiver<BlockView> {
470 Request::call(&self.new_block_register, name.to_string())
471 .await
472 .expect("Subscribe new block should be OK")
473 }
474
475 pub async fn watch_new_block<S: ToString>(&self, name: S) -> watch::Receiver<Byte32> {
477 Request::call(&self.new_block_watcher, name.to_string())
478 .await
479 .expect("Watch new block should be OK")
480 }
481
482 pub fn notify_new_block(&self, block: BlockView) {
484 let new_block_notifier = self.new_block_notifier.clone();
485 self.handle.spawn(async move {
486 if let Err(e) = new_block_notifier.send(block).await {
487 error!("notify_new_block channel is closed: {}", e);
488 }
489 });
490 }
491
492 pub async fn subscribe_new_transaction<S: ToString>(
496 &self,
497 name: S,
498 ) -> Receiver<PoolTransactionEntry> {
499 Request::call(&self.new_transaction_register, name.to_string())
500 .await
501 .expect("Subscribe new transaction should be OK")
502 }
503
504 pub fn notify_new_transaction(&self, tx_entry: PoolTransactionEntry) {
506 let new_transaction_notifier = self.new_transaction_notifier.clone();
507 self.handle.spawn(async move {
508 if let Err(e) = new_transaction_notifier.send(tx_entry).await {
509 error!("notify_new_transaction channel is closed: {}", e);
510 }
511 });
512 }
513
514 pub async fn subscribe_proposed_transaction<S: ToString>(
518 &self,
519 name: S,
520 ) -> Receiver<PoolTransactionEntry> {
521 Request::call(&self.proposed_transaction_register, name.to_string())
522 .await
523 .expect("Subscribe proposed transaction should be OK")
524 }
525
526 pub fn notify_proposed_transaction(&self, tx_entry: PoolTransactionEntry) {
528 let proposed_transaction_notifier = self.proposed_transaction_notifier.clone();
529 self.handle.spawn(async move {
530 if let Err(e) = proposed_transaction_notifier.send(tx_entry).await {
531 error!("notify_proposed_transaction channel is closed: {}", e);
532 }
533 });
534 }
535
536 pub async fn subscribe_reject_transaction<S: ToString>(
540 &self,
541 name: S,
542 ) -> Receiver<(PoolTransactionEntry, Reject)> {
543 Request::call(&self.reject_transaction_register, name.to_string())
544 .await
545 .expect("Subscribe rejected transaction should be OK")
546 }
547
548 pub fn notify_reject_transaction(&self, tx_entry: PoolTransactionEntry, reject: Reject) {
550 let reject_transaction_notifier = self.reject_transaction_notifier.clone();
551 self.handle.spawn(async move {
552 if let Err(e) = reject_transaction_notifier.send((tx_entry, reject)).await {
553 error!("notify_reject_transaction channel is closed: {}", e);
554 }
555 });
556 }
557
558 pub async fn subscribe_network_alert<S: ToString>(&self, name: S) -> Receiver<Alert> {
562 Request::call(&self.network_alert_register, name.to_string())
563 .await
564 .expect("Subscribe network alert should be OK")
565 }
566
567 pub fn notify_network_alert(&self, alert: Alert) {
569 let network_alert_notifier = self.network_alert_notifier.clone();
570 self.handle.spawn(async move {
571 if let Err(e) = network_alert_notifier.send(alert).await {
572 error!("notify_network_alert channel is closed: {}", e);
573 }
574 });
575 }
576
577 pub async fn subscribe_log<S: ToString>(&self, name: S) -> Receiver<LogEntry> {
581 Request::call(&self.log_register, name.to_string())
582 .await
583 .expect("Subscribe log should be OK")
584 }
585
586 pub fn notify_log(&self, log_entry: LogEntry) {
588 let log_notifier = self.log_notifier.clone();
589 log_notifier.try_send(log_entry).ok();
591 }
592}