1use ckb_app_config::NotifyConfig;
3use ckb_async_runtime::Handle;
4use ckb_logger::{debug, error, info, trace};
5use ckb_stop_handler::{CancellationToken, new_tokio_exit_rx};
6use ckb_types::packed::Byte32;
7use ckb_types::{
8 core::{BlockView, tx_pool::Reject},
9 packed::Alert,
10};
11use std::{collections::HashMap, time::Duration};
12use tokio::process::Command;
13use tokio::sync::watch;
14use tokio::sync::{
15 mpsc::{self, Receiver, Sender},
16 oneshot,
17};
18use tokio::time::timeout;
19
20pub use ckb_types::core::tx_pool::PoolTransactionEntry;
21
/// A request travelling to a service over an mpsc channel, bundled with the
/// oneshot channel on which the service sends its reply.
pub struct Request<A, R> {
    /// Oneshot sender the receiving side uses to return the response of type `R`.
    pub responder: oneshot::Sender<R>,
    /// Payload of the request.
    pub arguments: A,
}
29
30impl<A, R> Request<A, R> {
31 pub async fn call(sender: &Sender<Request<A, R>>, arguments: A) -> Option<R> {
33 let (responder, response) = oneshot::channel();
34 let _ = sender
35 .send(Request {
36 responder,
37 arguments,
38 })
39 .await;
40 response.await.ok()
41 }
42}
43
/// Channel capacity of 1. Not referenced in the visible part of this file;
/// presumably sizes a signal channel elsewhere — TODO confirm against callers.
pub const SIGNAL_CHANNEL_SIZE: usize = 1;
/// Capacity of the mpsc channels that carry subscribe/watch registration
/// requests to the `NotifyService` loop.
pub const REGISTER_CHANNEL_SIZE: usize = 2;
/// Capacity of the mpsc channels that carry notifications: both the channels
/// feeding the `NotifyService` loop and the per-subscriber channels it creates.
pub const NOTIFY_CHANNEL_SIZE: usize = 128;
50
/// Sender used to register a named subscriber; the reply is the mpsc
/// `Receiver` on which messages of type `M` will arrive.
pub type NotifyRegister<M> = Sender<Request<String, Receiver<M>>>;

/// Sender used to register a named watcher; the reply is a `watch::Receiver`
/// holding values of type `M`.
pub type NotifyWatcher<M> = Sender<Request<String, watch::Receiver<M>>>;
56
/// Timeouts applied by `NotifyService` when delivering notifications.
#[derive(Copy, Clone)]
pub(crate) struct NotifyTimeout {
    /// Limit for sending a new/proposed/rejected transaction notification to
    /// one subscriber.
    pub(crate) tx: Duration,
    /// Limit for sending a network alert to one subscriber.
    pub(crate) alert: Duration,
    /// How long to wait for a notify script to exit before logging a timeout.
    pub(crate) script: Duration,
}
64
/// Fallback for `NotifyTimeout::tx` when `notify_tx_timeout` is not configured (300 ms).
const DEFAULT_TX_NOTIFY_TIMEOUT: Duration = Duration::from_millis(300);
/// Fallback for `NotifyTimeout::alert` when `notify_alert_timeout` is not configured (10 s).
const DEFAULT_ALERT_NOTIFY_TIMEOUT: Duration = Duration::from_millis(10_000);
/// Fallback for `NotifyTimeout::script` when `script_timeout` is not configured (10 s).
const DEFAULT_SCRIPT_TIMEOUT: Duration = Duration::from_millis(10_000);
68
69impl NotifyTimeout {
70 pub(crate) fn new(config: &NotifyConfig) -> Self {
71 NotifyTimeout {
72 tx: config
73 .notify_tx_timeout
74 .map(Duration::from_millis)
75 .unwrap_or(DEFAULT_TX_NOTIFY_TIMEOUT),
76 alert: config
77 .notify_alert_timeout
78 .map(Duration::from_millis)
79 .unwrap_or(DEFAULT_ALERT_NOTIFY_TIMEOUT),
80 script: config
81 .script_timeout
82 .map(Duration::from_millis)
83 .unwrap_or(DEFAULT_SCRIPT_TIMEOUT),
84 }
85 }
86}
87
/// Cloneable handle to a running `NotifyService`.
///
/// It holds the sending halves of the channels created in
/// `NotifyService::start`: `*_register` / `*_watcher` senders carry
/// subscription requests, `*_notifier` senders carry the events themselves.
#[derive(Clone)]
pub struct NotifyController {
    // Requests for a new-block `Receiver<BlockView>`.
    new_block_register: NotifyRegister<BlockView>,
    // Requests for a new-block `watch::Receiver<Byte32>`.
    new_block_watcher: NotifyWatcher<Byte32>,
    // New-block events pushed to the service.
    new_block_notifier: Sender<BlockView>,
    // Requests for a new-transaction receiver.
    new_transaction_register: NotifyRegister<PoolTransactionEntry>,
    // New-transaction events pushed to the service.
    new_transaction_notifier: Sender<PoolTransactionEntry>,
    // Requests for a proposed-transaction receiver.
    proposed_transaction_register: NotifyRegister<PoolTransactionEntry>,
    // Proposed-transaction events pushed to the service.
    proposed_transaction_notifier: Sender<PoolTransactionEntry>,
    // Requests for a rejected-transaction receiver.
    reject_transaction_register: NotifyRegister<(PoolTransactionEntry, Reject)>,
    // Rejected-transaction events (entry plus reject reason) pushed to the service.
    reject_transaction_notifier: Sender<(PoolTransactionEntry, Reject)>,
    // Requests for a network-alert receiver.
    network_alert_register: NotifyRegister<Alert>,
    // Network-alert events pushed to the service.
    network_alert_notifier: Sender<Alert>,
    // Runtime handle on which the `notify_*` methods spawn their send tasks.
    handle: Handle,
}
104
/// Notification hub: keeps the registered subscribers of every event kind and
/// forwards incoming events to them (and to the optional notify scripts).
pub struct NotifyService {
    // Source of the optional notify script paths (and of the timeouts in `timeout`).
    config: NotifyConfig,
    // Subscriber name -> channel receiving full new blocks.
    new_block_subscribers: HashMap<String, Sender<BlockView>>,
    // Watcher name -> watch channel receiving new block hashes.
    new_block_watchers: HashMap<String, watch::Sender<Byte32>>,
    // Subscriber name -> channel receiving new pool transactions.
    new_transaction_subscribers: HashMap<String, Sender<PoolTransactionEntry>>,
    // Subscriber name -> channel receiving proposed transactions.
    proposed_transaction_subscribers: HashMap<String, Sender<PoolTransactionEntry>>,
    // Subscriber name -> channel receiving rejected transactions with their reason.
    reject_transaction_subscribers: HashMap<String, Sender<(PoolTransactionEntry, Reject)>>,
    // Subscriber name -> channel receiving network alerts.
    network_alert_subscribers: HashMap<String, Sender<Alert>>,
    // Delivery timeouts resolved from `config` in `new`.
    timeout: NotifyTimeout,
    // Runtime handle used to spawn the service loop and the delivery tasks.
    handle: Handle,
}
117
118impl NotifyService {
119 pub fn new(config: NotifyConfig, handle: Handle) -> Self {
121 let timeout = NotifyTimeout::new(&config);
122
123 Self {
124 config,
125 new_block_subscribers: HashMap::default(),
126 new_block_watchers: HashMap::default(),
127 new_transaction_subscribers: HashMap::default(),
128 proposed_transaction_subscribers: HashMap::default(),
129 reject_transaction_subscribers: HashMap::default(),
130 network_alert_subscribers: HashMap::default(),
131 timeout,
132 handle,
133 }
134 }
135
136 pub fn start(mut self) -> NotifyController {
138 let signal_receiver: CancellationToken = new_tokio_exit_rx();
139 let handle = self.handle.clone();
140
141 let (new_block_register, mut new_block_register_receiver) =
142 mpsc::channel(REGISTER_CHANNEL_SIZE);
143 let (new_block_watcher, mut new_block_watcher_receiver) =
144 mpsc::channel(REGISTER_CHANNEL_SIZE);
145 let (new_block_sender, mut new_block_receiver) = mpsc::channel(NOTIFY_CHANNEL_SIZE);
146
147 let (new_transaction_register, mut new_transaction_register_receiver) =
148 mpsc::channel(REGISTER_CHANNEL_SIZE);
149 let (new_transaction_sender, mut new_transaction_receiver) =
150 mpsc::channel(NOTIFY_CHANNEL_SIZE);
151
152 let (proposed_transaction_register, mut proposed_transaction_register_receiver) =
153 mpsc::channel(REGISTER_CHANNEL_SIZE);
154 let (proposed_transaction_sender, mut proposed_transaction_receiver) =
155 mpsc::channel(NOTIFY_CHANNEL_SIZE);
156
157 let (reject_transaction_register, mut reject_transaction_register_receiver) =
158 mpsc::channel(REGISTER_CHANNEL_SIZE);
159 let (reject_transaction_sender, mut reject_transaction_receiver) =
160 mpsc::channel(NOTIFY_CHANNEL_SIZE);
161
162 let (network_alert_register, mut network_alert_register_receiver) =
163 mpsc::channel(REGISTER_CHANNEL_SIZE);
164 let (network_alert_sender, mut network_alert_receiver) = mpsc::channel(NOTIFY_CHANNEL_SIZE);
165
166 handle.spawn(async move {
167 loop {
168 tokio::select! {
169 Some(msg) = new_block_register_receiver.recv() => { self.handle_register_new_block(msg) },
170 Some(msg) = new_block_watcher_receiver.recv() => { self.handle_watch_new_block(msg) },
171 Some(msg) = new_block_receiver.recv() => { self.handle_notify_new_block(msg) },
172 Some(msg) = new_transaction_register_receiver.recv() => { self.handle_register_new_transaction(msg) },
173 Some(msg) = new_transaction_receiver.recv() => { self.handle_notify_new_transaction(msg) },
174 Some(msg) = proposed_transaction_register_receiver.recv() => { self.handle_register_proposed_transaction(msg) },
175 Some(msg) = proposed_transaction_receiver.recv() => { self.handle_notify_proposed_transaction(msg) },
176 Some(msg) = reject_transaction_register_receiver.recv() => { self.handle_register_reject_transaction(msg) },
177 Some(msg) = reject_transaction_receiver.recv() => { self.handle_notify_reject_transaction(msg) },
178 Some(msg) = network_alert_register_receiver.recv() => { self.handle_register_network_alert(msg) },
179 Some(msg) = network_alert_receiver.recv() => { self.handle_notify_network_alert(msg) },
180 _ = signal_receiver.cancelled() => {
181 info!("NotifyService received exit signal, exit now");
182 break;
183 }
184 else => break,
185 }
186 }
187 });
188
189 NotifyController {
190 new_block_register,
191 new_block_watcher,
192 new_block_notifier: new_block_sender,
193 new_transaction_register,
194 new_transaction_notifier: new_transaction_sender,
195 proposed_transaction_register,
196 proposed_transaction_notifier: proposed_transaction_sender,
197 reject_transaction_register,
198 reject_transaction_notifier: reject_transaction_sender,
199 network_alert_register,
200 network_alert_notifier: network_alert_sender,
201 handle,
202 }
203 }
204
205 fn handle_watch_new_block(&mut self, msg: Request<String, watch::Receiver<Byte32>>) {
206 let Request {
207 responder,
208 arguments: name,
209 } = msg;
210 debug!("Watch new_block {:?}", name);
211 let (sender, receiver) = watch::channel(Byte32::zero());
212 self.new_block_watchers.insert(name, sender);
213 let _ = responder.send(receiver);
214 }
215
216 fn handle_register_new_block(&mut self, msg: Request<String, Receiver<BlockView>>) {
217 let Request {
218 responder,
219 arguments: name,
220 } = msg;
221 debug!("Register new_block {:?}", name);
222 let (sender, receiver) = mpsc::channel(NOTIFY_CHANNEL_SIZE);
223 self.new_block_subscribers.insert(name, sender);
224 let _ = responder.send(receiver);
225 }
226
227 fn handle_notify_new_block(&self, block: BlockView) {
228 trace!("New block event {:?}", block);
229 let block_hash = block.hash();
230 for subscriber in self.new_block_subscribers.values() {
232 let block = block.clone();
233 let subscriber = subscriber.clone();
234 self.handle.spawn(async move {
235 if let Err(e) = subscriber.send(block).await {
236 error!("Failed to notify new block, error: {}", e);
237 }
238 });
239 }
240
241 for watcher in self.new_block_watchers.values() {
243 if let Err(e) = watcher.send(block_hash.clone()) {
244 error!("Failed to notify new block watcher, error: {}", e);
245 }
246 }
247
248 if let Some(script) = self.config.new_block_notify_script.clone() {
250 let script_timeout = self.timeout.script;
251 self.handle.spawn(async move {
252 let args = [format!("{block_hash:#x}")];
253 match timeout(script_timeout, Command::new(&script).args(&args).status()).await {
254 Ok(ret) => match ret {
255 Ok(status) => debug!("The new_block_notify script exited with: {status}"),
256 Err(e) => error!(
257 "Failed to run new_block_notify_script: {} {:?}, error: {}",
258 script, args[0], e
259 ),
260 },
261 Err(_) => ckb_logger::warn!("new_block_notify_script {script} timed out"),
262 }
263 });
264 }
265 }
266
267 fn handle_register_new_transaction(
268 &mut self,
269 msg: Request<String, Receiver<PoolTransactionEntry>>,
270 ) {
271 let Request {
272 responder,
273 arguments: name,
274 } = msg;
275 debug!("Register new_transaction {:?}", name);
276 let (sender, receiver) = mpsc::channel(NOTIFY_CHANNEL_SIZE);
277 self.new_transaction_subscribers.insert(name, sender);
278 let _ = responder.send(receiver);
279 }
280
281 fn handle_notify_new_transaction(&self, tx_entry: PoolTransactionEntry) {
282 trace!("New tx event {:?}", tx_entry);
283 let tx_timeout = self.timeout.tx;
285 for subscriber in self.new_transaction_subscribers.values() {
287 let tx_entry = tx_entry.clone();
288 let subscriber = subscriber.clone();
289 self.handle.spawn(async move {
290 if let Err(e) = subscriber.send_timeout(tx_entry, tx_timeout).await {
291 error!("Failed to notify new transaction, error: {}", e);
292 }
293 });
294 }
295 }
296
297 fn handle_register_proposed_transaction(
298 &mut self,
299 msg: Request<String, Receiver<PoolTransactionEntry>>,
300 ) {
301 let Request {
302 responder,
303 arguments: name,
304 } = msg;
305 debug!("Register proposed_transaction {:?}", name);
306 let (sender, receiver) = mpsc::channel(NOTIFY_CHANNEL_SIZE);
307 self.proposed_transaction_subscribers.insert(name, sender);
308 let _ = responder.send(receiver);
309 }
310
311 fn handle_notify_proposed_transaction(&self, tx_entry: PoolTransactionEntry) {
312 trace!("Proposed tx event {:?}", tx_entry);
313 let tx_timeout = self.timeout.tx;
315 for subscriber in self.proposed_transaction_subscribers.values() {
317 let tx_entry = tx_entry.clone();
318 let subscriber = subscriber.clone();
319 self.handle.spawn(async move {
320 if let Err(e) = subscriber.send_timeout(tx_entry, tx_timeout).await {
321 error!("Failed to notify proposed transaction, error {}", e);
322 }
323 });
324 }
325 }
326
327 fn handle_register_reject_transaction(
328 &mut self,
329 msg: Request<String, Receiver<(PoolTransactionEntry, Reject)>>,
330 ) {
331 let Request {
332 responder,
333 arguments: name,
334 } = msg;
335 debug!("Register reject_transaction {:?}", name);
336 let (sender, receiver) = mpsc::channel(NOTIFY_CHANNEL_SIZE);
337 self.reject_transaction_subscribers.insert(name, sender);
338 let _ = responder.send(receiver);
339 }
340
341 fn handle_notify_reject_transaction(&self, tx_entry: (PoolTransactionEntry, Reject)) {
342 trace!("Tx reject event {:?}", tx_entry);
343 let tx_timeout = self.timeout.tx;
345 for subscriber in self.reject_transaction_subscribers.values() {
347 let tx_entry = tx_entry.clone();
348 let subscriber = subscriber.clone();
349 self.handle.spawn(async move {
350 if let Err(e) = subscriber.send_timeout(tx_entry, tx_timeout).await {
351 error!("Failed to notify transaction reject, error: {}", e);
352 }
353 });
354 }
355 }
356
357 fn handle_register_network_alert(&mut self, msg: Request<String, Receiver<Alert>>) {
358 let Request {
359 responder,
360 arguments: name,
361 } = msg;
362 debug!("Register network_alert {:?}", name);
363 let (sender, receiver) = mpsc::channel(NOTIFY_CHANNEL_SIZE);
364 self.network_alert_subscribers.insert(name, sender);
365 let _ = responder.send(receiver);
366 }
367
368 fn handle_notify_network_alert(&self, alert: Alert) {
369 trace!("Network alert event {:?}", alert);
370 let alert_timeout = self.timeout.alert;
371 let message = alert
372 .as_reader()
373 .raw()
374 .message()
375 .as_utf8()
376 .expect("alert message should be utf8")
377 .to_owned();
378 for subscriber in self.network_alert_subscribers.values() {
380 let subscriber = subscriber.clone();
381 let alert = alert.clone();
382 self.handle.spawn(async move {
383 if let Err(e) = subscriber.send_timeout(alert, alert_timeout).await {
384 error!("Failed to notify network_alert, error: {}", e);
385 }
386 });
387 }
388
389 if let Some(script) = self.config.network_alert_notify_script.clone() {
391 let script_timeout = self.timeout.script;
392 self.handle.spawn(async move {
393 let args = [message];
394 match timeout(script_timeout, Command::new(&script).args(&args).status()).await {
395 Ok(ret) => match ret {
396 Ok(status) => {
397 debug!("the network_alert_notify script exited with: {}", status)
398 }
399 Err(e) => error!(
400 "failed to run network_alert_notify_script: {} {}, error: {}",
401 script, args[0], e
402 ),
403 },
404 Err(_) => ckb_logger::warn!("network_alert_notify_script {} timed out", script),
405 }
406 });
407 }
408 }
409}
410
411impl NotifyController {
412 pub async fn subscribe_new_block<S: ToString>(&self, name: S) -> Receiver<BlockView> {
414 Request::call(&self.new_block_register, name.to_string())
415 .await
416 .expect("Subscribe new block should be OK")
417 }
418
419 pub async fn watch_new_block<S: ToString>(&self, name: S) -> watch::Receiver<Byte32> {
421 Request::call(&self.new_block_watcher, name.to_string())
422 .await
423 .expect("Watch new block should be OK")
424 }
425
426 pub fn notify_new_block(&self, block: BlockView) {
428 let new_block_notifier = self.new_block_notifier.clone();
429 self.handle.spawn(async move {
430 if let Err(e) = new_block_notifier.send(block).await {
431 error!("notify_new_block channel is closed: {}", e);
432 }
433 });
434 }
435
436 pub async fn subscribe_new_transaction<S: ToString>(
438 &self,
439 name: S,
440 ) -> Receiver<PoolTransactionEntry> {
441 Request::call(&self.new_transaction_register, name.to_string())
442 .await
443 .expect("Subscribe new transaction should be OK")
444 }
445
446 pub fn notify_new_transaction(&self, tx_entry: PoolTransactionEntry) {
448 let new_transaction_notifier = self.new_transaction_notifier.clone();
449 self.handle.spawn(async move {
450 if let Err(e) = new_transaction_notifier.send(tx_entry).await {
451 error!("notify_new_transaction channel is closed: {}", e);
452 }
453 });
454 }
455
456 pub async fn subscribe_proposed_transaction<S: ToString>(
458 &self,
459 name: S,
460 ) -> Receiver<PoolTransactionEntry> {
461 Request::call(&self.proposed_transaction_register, name.to_string())
462 .await
463 .expect("Subscribe proposed transaction should be OK")
464 }
465
466 pub fn notify_proposed_transaction(&self, tx_entry: PoolTransactionEntry) {
468 let proposed_transaction_notifier = self.proposed_transaction_notifier.clone();
469 self.handle.spawn(async move {
470 if let Err(e) = proposed_transaction_notifier.send(tx_entry).await {
471 error!("notify_proposed_transaction channel is closed: {}", e);
472 }
473 });
474 }
475
476 pub async fn subscribe_reject_transaction<S: ToString>(
478 &self,
479 name: S,
480 ) -> Receiver<(PoolTransactionEntry, Reject)> {
481 Request::call(&self.reject_transaction_register, name.to_string())
482 .await
483 .expect("Subscribe rejected transaction should be OK")
484 }
485
486 pub fn notify_reject_transaction(&self, tx_entry: PoolTransactionEntry, reject: Reject) {
488 let reject_transaction_notifier = self.reject_transaction_notifier.clone();
489 self.handle.spawn(async move {
490 if let Err(e) = reject_transaction_notifier.send((tx_entry, reject)).await {
491 error!("notify_reject_transaction channel is closed: {}", e);
492 }
493 });
494 }
495
496 pub async fn subscribe_network_alert<S: ToString>(&self, name: S) -> Receiver<Alert> {
498 Request::call(&self.network_alert_register, name.to_string())
499 .await
500 .expect("Subscribe network alert should be OK")
501 }
502
503 pub fn notify_network_alert(&self, alert: Alert) {
505 let network_alert_notifier = self.network_alert_notifier.clone();
506 self.handle.spawn(async move {
507 if let Err(e) = network_alert_notifier.send(alert).await {
508 error!("notify_network_alert channel is closed: {}", e);
509 }
510 });
511 }
512}