1use ckb_app_config::NotifyConfig;
7use ckb_async_runtime::Handle;
8use ckb_logger::{debug, error, info, trace};
9use ckb_stop_handler::{CancellationToken, new_tokio_exit_rx};
10use ckb_types::packed::Byte32;
11use ckb_types::{
12 core::{BlockView, tx_pool::Reject},
13 packed::Alert,
14};
15use std::{collections::HashMap, time::Duration};
16use tokio::process::Command;
17use tokio::sync::watch;
18use tokio::sync::{
19 mpsc::{self, Receiver, Sender},
20 oneshot,
21};
22use tokio::time::timeout;
23
24pub use ckb_types::core::tx_pool::PoolTransactionEntry;
25
26pub struct Request<A, R> {
28 pub responder: oneshot::Sender<R>,
30 pub arguments: A,
32}
33
34impl<A, R> Request<A, R> {
35 pub async fn call(sender: &Sender<Request<A, R>>, arguments: A) -> Option<R> {
37 let (responder, response) = oneshot::channel();
38 let _ = sender
39 .send(Request {
40 responder,
41 arguments,
42 })
43 .await;
44 response.await.ok()
45 }
46}
47
/// Buffer size for signal channels (not referenced elsewhere in this file).
pub const SIGNAL_CHANNEL_SIZE: usize = 1;
/// Buffer size of the channels that carry subscribe/watch registration requests.
pub const REGISTER_CHANNEL_SIZE: usize = 2;
/// Buffer size of the channels that carry notification events.
pub const NOTIFY_CHANNEL_SIZE: usize = 128;
54
55pub type NotifyRegister<M> = Sender<Request<String, Receiver<M>>>;
57
58pub type NotifyWatcher<M> = Sender<Request<String, watch::Receiver<M>>>;
60
/// Timeouts applied by the notify service.
#[derive(Copy, Clone)]
pub(crate) struct NotifyTimeout {
    /// Maximum time to wait when delivering a transaction event to a subscriber.
    pub(crate) tx: Duration,
    /// Maximum time to wait when delivering a network alert to a subscriber.
    pub(crate) alert: Duration,
    /// Maximum time a notify script is awaited before it is reported as timed out.
    pub(crate) script: Duration,
}
68
/// Transaction notification timeout used when the config does not set one.
const DEFAULT_TX_NOTIFY_TIMEOUT: Duration = Duration::from_millis(300);
/// Network alert notification timeout used when the config does not set one.
const DEFAULT_ALERT_NOTIFY_TIMEOUT: Duration = Duration::from_secs(10);
/// Notify script timeout used when the config does not set one.
const DEFAULT_SCRIPT_TIMEOUT: Duration = Duration::from_secs(10);
72
73impl NotifyTimeout {
74 pub(crate) fn new(config: &NotifyConfig) -> Self {
75 NotifyTimeout {
76 tx: config
77 .notify_tx_timeout
78 .map(Duration::from_millis)
79 .unwrap_or(DEFAULT_TX_NOTIFY_TIMEOUT),
80 alert: config
81 .notify_alert_timeout
82 .map(Duration::from_millis)
83 .unwrap_or(DEFAULT_ALERT_NOTIFY_TIMEOUT),
84 script: config
85 .script_timeout
86 .map(Duration::from_millis)
87 .unwrap_or(DEFAULT_SCRIPT_TIMEOUT),
88 }
89 }
90}
91
92#[derive(Clone)]
97pub struct NotifyController {
98 new_block_register: NotifyRegister<BlockView>,
99 new_block_watcher: NotifyWatcher<Byte32>,
100 new_block_notifier: Sender<BlockView>,
101 new_transaction_register: NotifyRegister<PoolTransactionEntry>,
102 new_transaction_notifier: Sender<PoolTransactionEntry>,
103 proposed_transaction_register: NotifyRegister<PoolTransactionEntry>,
104 proposed_transaction_notifier: Sender<PoolTransactionEntry>,
105 reject_transaction_register: NotifyRegister<(PoolTransactionEntry, Reject)>,
106 reject_transaction_notifier: Sender<(PoolTransactionEntry, Reject)>,
107 network_alert_register: NotifyRegister<Alert>,
108 network_alert_notifier: Sender<Alert>,
109 handle: Handle,
110}
111
112pub struct NotifyService {
116 config: NotifyConfig,
117 new_block_subscribers: HashMap<String, Sender<BlockView>>,
118 new_block_watchers: HashMap<String, watch::Sender<Byte32>>,
119 new_transaction_subscribers: HashMap<String, Sender<PoolTransactionEntry>>,
120 proposed_transaction_subscribers: HashMap<String, Sender<PoolTransactionEntry>>,
121 reject_transaction_subscribers: HashMap<String, Sender<(PoolTransactionEntry, Reject)>>,
122 network_alert_subscribers: HashMap<String, Sender<Alert>>,
123 timeout: NotifyTimeout,
124 handle: Handle,
125}
126
127impl NotifyService {
128 pub fn new(config: NotifyConfig, handle: Handle) -> Self {
130 let timeout = NotifyTimeout::new(&config);
131
132 Self {
133 config,
134 new_block_subscribers: HashMap::default(),
135 new_block_watchers: HashMap::default(),
136 new_transaction_subscribers: HashMap::default(),
137 proposed_transaction_subscribers: HashMap::default(),
138 reject_transaction_subscribers: HashMap::default(),
139 network_alert_subscribers: HashMap::default(),
140 timeout,
141 handle,
142 }
143 }
144
145 pub fn start(mut self) -> NotifyController {
147 let signal_receiver: CancellationToken = new_tokio_exit_rx();
148 let handle = self.handle.clone();
149
150 let (new_block_register, mut new_block_register_receiver) =
151 mpsc::channel(REGISTER_CHANNEL_SIZE);
152 let (new_block_watcher, mut new_block_watcher_receiver) =
153 mpsc::channel(REGISTER_CHANNEL_SIZE);
154 let (new_block_sender, mut new_block_receiver) = mpsc::channel(NOTIFY_CHANNEL_SIZE);
155
156 let (new_transaction_register, mut new_transaction_register_receiver) =
157 mpsc::channel(REGISTER_CHANNEL_SIZE);
158 let (new_transaction_sender, mut new_transaction_receiver) =
159 mpsc::channel(NOTIFY_CHANNEL_SIZE);
160
161 let (proposed_transaction_register, mut proposed_transaction_register_receiver) =
162 mpsc::channel(REGISTER_CHANNEL_SIZE);
163 let (proposed_transaction_sender, mut proposed_transaction_receiver) =
164 mpsc::channel(NOTIFY_CHANNEL_SIZE);
165
166 let (reject_transaction_register, mut reject_transaction_register_receiver) =
167 mpsc::channel(REGISTER_CHANNEL_SIZE);
168 let (reject_transaction_sender, mut reject_transaction_receiver) =
169 mpsc::channel(NOTIFY_CHANNEL_SIZE);
170
171 let (network_alert_register, mut network_alert_register_receiver) =
172 mpsc::channel(REGISTER_CHANNEL_SIZE);
173 let (network_alert_sender, mut network_alert_receiver) = mpsc::channel(NOTIFY_CHANNEL_SIZE);
174
175 handle.spawn(async move {
176 loop {
177 tokio::select! {
178 Some(msg) = new_block_register_receiver.recv() => { self.handle_register_new_block(msg) },
179 Some(msg) = new_block_watcher_receiver.recv() => { self.handle_watch_new_block(msg) },
180 Some(msg) = new_block_receiver.recv() => { self.handle_notify_new_block(msg) },
181 Some(msg) = new_transaction_register_receiver.recv() => { self.handle_register_new_transaction(msg) },
182 Some(msg) = new_transaction_receiver.recv() => { self.handle_notify_new_transaction(msg) },
183 Some(msg) = proposed_transaction_register_receiver.recv() => { self.handle_register_proposed_transaction(msg) },
184 Some(msg) = proposed_transaction_receiver.recv() => { self.handle_notify_proposed_transaction(msg) },
185 Some(msg) = reject_transaction_register_receiver.recv() => { self.handle_register_reject_transaction(msg) },
186 Some(msg) = reject_transaction_receiver.recv() => { self.handle_notify_reject_transaction(msg) },
187 Some(msg) = network_alert_register_receiver.recv() => { self.handle_register_network_alert(msg) },
188 Some(msg) = network_alert_receiver.recv() => { self.handle_notify_network_alert(msg) },
189 _ = signal_receiver.cancelled() => {
190 info!("NotifyService received exit signal, exit now");
191 break;
192 }
193 else => break,
194 }
195 }
196 });
197
198 NotifyController {
199 new_block_register,
200 new_block_watcher,
201 new_block_notifier: new_block_sender,
202 new_transaction_register,
203 new_transaction_notifier: new_transaction_sender,
204 proposed_transaction_register,
205 proposed_transaction_notifier: proposed_transaction_sender,
206 reject_transaction_register,
207 reject_transaction_notifier: reject_transaction_sender,
208 network_alert_register,
209 network_alert_notifier: network_alert_sender,
210 handle,
211 }
212 }
213
214 fn handle_watch_new_block(&mut self, msg: Request<String, watch::Receiver<Byte32>>) {
215 let Request {
216 responder,
217 arguments: name,
218 } = msg;
219 debug!("Watch new_block {:?}", name);
220 let (sender, receiver) = watch::channel(Byte32::zero());
221 self.new_block_watchers.insert(name, sender);
222 let _ = responder.send(receiver);
223 }
224
225 fn handle_register_new_block(&mut self, msg: Request<String, Receiver<BlockView>>) {
226 let Request {
227 responder,
228 arguments: name,
229 } = msg;
230 debug!("Register new_block {:?}", name);
231 let (sender, receiver) = mpsc::channel(NOTIFY_CHANNEL_SIZE);
232 self.new_block_subscribers.insert(name, sender);
233 let _ = responder.send(receiver);
234 }
235
236 fn handle_notify_new_block(&self, block: BlockView) {
237 trace!("New block event {:?}", block);
238 let block_hash = block.hash();
239 for subscriber in self.new_block_subscribers.values() {
241 let block = block.clone();
242 let subscriber = subscriber.clone();
243 self.handle.spawn(async move {
244 if let Err(e) = subscriber.send(block).await {
245 error!("Failed to notify new block, error: {}", e);
246 }
247 });
248 }
249
250 for watcher in self.new_block_watchers.values() {
252 if let Err(e) = watcher.send(block_hash.clone()) {
253 error!("Failed to notify new block watcher, error: {}", e);
254 }
255 }
256
257 if let Some(script) = self.config.new_block_notify_script.clone() {
259 let script_timeout = self.timeout.script;
260 self.handle.spawn(async move {
261 let args = [format!("{block_hash:#x}")];
262 match timeout(script_timeout, Command::new(&script).args(&args).status()).await {
263 Ok(ret) => match ret {
264 Ok(status) => debug!("The new_block_notify script exited with: {status}"),
265 Err(e) => error!(
266 "Failed to run new_block_notify_script: {} {:?}, error: {}",
267 script, args[0], e
268 ),
269 },
270 Err(_) => ckb_logger::warn!("new_block_notify_script {script} timed out"),
271 }
272 });
273 }
274 }
275
276 fn handle_register_new_transaction(
277 &mut self,
278 msg: Request<String, Receiver<PoolTransactionEntry>>,
279 ) {
280 let Request {
281 responder,
282 arguments: name,
283 } = msg;
284 debug!("Register new_transaction {:?}", name);
285 let (sender, receiver) = mpsc::channel(NOTIFY_CHANNEL_SIZE);
286 self.new_transaction_subscribers.insert(name, sender);
287 let _ = responder.send(receiver);
288 }
289
290 fn handle_notify_new_transaction(&self, tx_entry: PoolTransactionEntry) {
291 trace!("New tx event {:?}", tx_entry);
292 let tx_timeout = self.timeout.tx;
294 for subscriber in self.new_transaction_subscribers.values() {
296 let tx_entry = tx_entry.clone();
297 let subscriber = subscriber.clone();
298 self.handle.spawn(async move {
299 if let Err(e) = subscriber.send_timeout(tx_entry, tx_timeout).await {
300 error!("Failed to notify new transaction, error: {}", e);
301 }
302 });
303 }
304 }
305
306 fn handle_register_proposed_transaction(
307 &mut self,
308 msg: Request<String, Receiver<PoolTransactionEntry>>,
309 ) {
310 let Request {
311 responder,
312 arguments: name,
313 } = msg;
314 debug!("Register proposed_transaction {:?}", name);
315 let (sender, receiver) = mpsc::channel(NOTIFY_CHANNEL_SIZE);
316 self.proposed_transaction_subscribers.insert(name, sender);
317 let _ = responder.send(receiver);
318 }
319
320 fn handle_notify_proposed_transaction(&self, tx_entry: PoolTransactionEntry) {
321 trace!("Proposed tx event {:?}", tx_entry);
322 let tx_timeout = self.timeout.tx;
324 for subscriber in self.proposed_transaction_subscribers.values() {
326 let tx_entry = tx_entry.clone();
327 let subscriber = subscriber.clone();
328 self.handle.spawn(async move {
329 if let Err(e) = subscriber.send_timeout(tx_entry, tx_timeout).await {
330 error!("Failed to notify proposed transaction, error {}", e);
331 }
332 });
333 }
334 }
335
336 fn handle_register_reject_transaction(
337 &mut self,
338 msg: Request<String, Receiver<(PoolTransactionEntry, Reject)>>,
339 ) {
340 let Request {
341 responder,
342 arguments: name,
343 } = msg;
344 debug!("Register reject_transaction {:?}", name);
345 let (sender, receiver) = mpsc::channel(NOTIFY_CHANNEL_SIZE);
346 self.reject_transaction_subscribers.insert(name, sender);
347 let _ = responder.send(receiver);
348 }
349
350 fn handle_notify_reject_transaction(&self, tx_entry: (PoolTransactionEntry, Reject)) {
351 trace!("Tx reject event {:?}", tx_entry);
352 let tx_timeout = self.timeout.tx;
354 for subscriber in self.reject_transaction_subscribers.values() {
356 let tx_entry = tx_entry.clone();
357 let subscriber = subscriber.clone();
358 self.handle.spawn(async move {
359 if let Err(e) = subscriber.send_timeout(tx_entry, tx_timeout).await {
360 error!("Failed to notify transaction reject, error: {}", e);
361 }
362 });
363 }
364 }
365
366 fn handle_register_network_alert(&mut self, msg: Request<String, Receiver<Alert>>) {
367 let Request {
368 responder,
369 arguments: name,
370 } = msg;
371 debug!("Register network_alert {:?}", name);
372 let (sender, receiver) = mpsc::channel(NOTIFY_CHANNEL_SIZE);
373 self.network_alert_subscribers.insert(name, sender);
374 let _ = responder.send(receiver);
375 }
376
377 fn handle_notify_network_alert(&self, alert: Alert) {
378 trace!("Network alert event {:?}", alert);
379 let alert_timeout = self.timeout.alert;
380 let message = alert
381 .as_reader()
382 .raw()
383 .message()
384 .as_utf8()
385 .expect("alert message should be utf8")
386 .to_owned();
387 for subscriber in self.network_alert_subscribers.values() {
389 let subscriber = subscriber.clone();
390 let alert = alert.clone();
391 self.handle.spawn(async move {
392 if let Err(e) = subscriber.send_timeout(alert, alert_timeout).await {
393 error!("Failed to notify network_alert, error: {}", e);
394 }
395 });
396 }
397
398 if let Some(script) = self.config.network_alert_notify_script.clone() {
400 let script_timeout = self.timeout.script;
401 self.handle.spawn(async move {
402 let args = [message];
403 match timeout(script_timeout, Command::new(&script).args(&args).status()).await {
404 Ok(ret) => match ret {
405 Ok(status) => {
406 debug!("the network_alert_notify script exited with: {}", status)
407 }
408 Err(e) => error!(
409 "failed to run network_alert_notify_script: {} {}, error: {}",
410 script, args[0], e
411 ),
412 },
413 Err(_) => ckb_logger::warn!("network_alert_notify_script {} timed out", script),
414 }
415 });
416 }
417 }
418}
419
420impl NotifyController {
421 pub async fn subscribe_new_block<S: ToString>(&self, name: S) -> Receiver<BlockView> {
425 Request::call(&self.new_block_register, name.to_string())
426 .await
427 .expect("Subscribe new block should be OK")
428 }
429
430 pub async fn watch_new_block<S: ToString>(&self, name: S) -> watch::Receiver<Byte32> {
432 Request::call(&self.new_block_watcher, name.to_string())
433 .await
434 .expect("Watch new block should be OK")
435 }
436
437 pub fn notify_new_block(&self, block: BlockView) {
439 let new_block_notifier = self.new_block_notifier.clone();
440 self.handle.spawn(async move {
441 if let Err(e) = new_block_notifier.send(block).await {
442 error!("notify_new_block channel is closed: {}", e);
443 }
444 });
445 }
446
447 pub async fn subscribe_new_transaction<S: ToString>(
451 &self,
452 name: S,
453 ) -> Receiver<PoolTransactionEntry> {
454 Request::call(&self.new_transaction_register, name.to_string())
455 .await
456 .expect("Subscribe new transaction should be OK")
457 }
458
459 pub fn notify_new_transaction(&self, tx_entry: PoolTransactionEntry) {
461 let new_transaction_notifier = self.new_transaction_notifier.clone();
462 self.handle.spawn(async move {
463 if let Err(e) = new_transaction_notifier.send(tx_entry).await {
464 error!("notify_new_transaction channel is closed: {}", e);
465 }
466 });
467 }
468
469 pub async fn subscribe_proposed_transaction<S: ToString>(
473 &self,
474 name: S,
475 ) -> Receiver<PoolTransactionEntry> {
476 Request::call(&self.proposed_transaction_register, name.to_string())
477 .await
478 .expect("Subscribe proposed transaction should be OK")
479 }
480
481 pub fn notify_proposed_transaction(&self, tx_entry: PoolTransactionEntry) {
483 let proposed_transaction_notifier = self.proposed_transaction_notifier.clone();
484 self.handle.spawn(async move {
485 if let Err(e) = proposed_transaction_notifier.send(tx_entry).await {
486 error!("notify_proposed_transaction channel is closed: {}", e);
487 }
488 });
489 }
490
491 pub async fn subscribe_reject_transaction<S: ToString>(
495 &self,
496 name: S,
497 ) -> Receiver<(PoolTransactionEntry, Reject)> {
498 Request::call(&self.reject_transaction_register, name.to_string())
499 .await
500 .expect("Subscribe rejected transaction should be OK")
501 }
502
503 pub fn notify_reject_transaction(&self, tx_entry: PoolTransactionEntry, reject: Reject) {
505 let reject_transaction_notifier = self.reject_transaction_notifier.clone();
506 self.handle.spawn(async move {
507 if let Err(e) = reject_transaction_notifier.send((tx_entry, reject)).await {
508 error!("notify_reject_transaction channel is closed: {}", e);
509 }
510 });
511 }
512
513 pub async fn subscribe_network_alert<S: ToString>(&self, name: S) -> Receiver<Alert> {
517 Request::call(&self.network_alert_register, name.to_string())
518 .await
519 .expect("Subscribe network alert should be OK")
520 }
521
522 pub fn notify_network_alert(&self, alert: Alert) {
524 let network_alert_notifier = self.network_alert_notifier.clone();
525 self.handle.spawn(async move {
526 if let Err(e) = network_alert_notifier.send(alert).await {
527 error!("notify_network_alert channel is closed: {}", e);
528 }
529 });
530 }
531}