otx_pool/
notify.rs

1use ckb_async_runtime::Handle;
2use ckb_stop_handler::{SignalSender, StopHandler};
3use ckb_types::H256;
4use otx_format::jsonrpc_types::OpenTransaction;
5use tokio::sync::{
6    mpsc::{self, Receiver, Sender},
7    oneshot,
8};
9
10use std::collections::HashMap;
11
12pub type RuntimeHandle = Handle;
13
14/// Asynchronous request sent to the service.
15pub struct Request<A, R> {
16    /// Oneshot channel for the service to send back the response.
17    pub responder: oneshot::Sender<R>,
18    /// Request arguments.
19    pub arguments: A,
20}
21
22impl<A, R> Request<A, R> {
23    /// Call the service with the arguments and wait for the response.
24    pub async fn call(sender: &Sender<Request<A, R>>, arguments: A) -> Option<R> {
25        let (responder, response) = oneshot::channel();
26        let _ = sender
27            .send(Request {
28                responder,
29                arguments,
30            })
31            .await;
32        response.await.ok()
33    }
34}
35
36pub const SIGNAL_CHANNEL_SIZE: usize = 1;
37pub const REGISTER_CHANNEL_SIZE: usize = 2;
38pub const NOTIFY_CHANNEL_SIZE: usize = 128;
39
40pub type NotifyRegister<M> = Sender<Request<String, Receiver<M>>>;
41
42#[derive(Clone)]
43pub struct NotifyController {
44    stop: StopHandler<()>,
45    new_open_tx_register: NotifyRegister<OpenTransaction>,
46    new_open_tx_notifier: Sender<OpenTransaction>,
47    commit_open_tx_register: NotifyRegister<Vec<H256>>,
48    commit_open_tx_notifier: Sender<Vec<H256>>,
49    interval_register: NotifyRegister<u64>,
50    interval_notifier: Sender<u64>,
51    start_register: NotifyRegister<()>,
52    start_notifier: Sender<()>,
53    stop_register: NotifyRegister<()>,
54    stop_notifier: Sender<()>,
55    handle: Handle,
56}
57
58impl Drop for NotifyController {
59    fn drop(&mut self) {
60        self.stop.try_send(());
61    }
62}
63
64pub struct NotifyService {
65    new_open_tx_subscribers: HashMap<String, Sender<OpenTransaction>>,
66    commit_open_tx_subscribers: HashMap<String, Sender<Vec<H256>>>,
67    interval_subscribers: HashMap<String, Sender<u64>>,
68    start_subscribers: HashMap<String, Sender<()>>,
69    stop_subscribers: HashMap<String, Sender<()>>,
70}
71
72impl Default for NotifyService {
73    fn default() -> Self {
74        Self::new()
75    }
76}
77
78impl NotifyService {
79    pub fn new() -> Self {
80        Self {
81            new_open_tx_subscribers: HashMap::default(),
82            commit_open_tx_subscribers: HashMap::default(),
83            interval_subscribers: HashMap::default(),
84            start_subscribers: HashMap::default(),
85            stop_subscribers: HashMap::default(),
86        }
87    }
88
89    /// start background tokio spawned task.
90    pub fn start(mut self, handle: Handle) -> NotifyController {
91        let (signal_sender, mut signal_receiver) = oneshot::channel();
92
93        let (new_open_tx_register, mut new_open_tx_register_receiver) =
94            mpsc::channel(REGISTER_CHANNEL_SIZE);
95        let (new_open_tx_sender, mut new_open_tx_receiver) = mpsc::channel(NOTIFY_CHANNEL_SIZE);
96
97        let (commit_open_tx_register, mut commit_open_tx_register_receiver) =
98            mpsc::channel(REGISTER_CHANNEL_SIZE);
99        let (commit_open_tx_sender, mut commit_open_tx_receiver) =
100            mpsc::channel(NOTIFY_CHANNEL_SIZE);
101
102        let (interval_register, mut interval_register_receiver) =
103            mpsc::channel(REGISTER_CHANNEL_SIZE);
104        let (interval_sender, mut interval_receiver) = mpsc::channel(NOTIFY_CHANNEL_SIZE);
105
106        let (start_register, mut start_register_receiver) = mpsc::channel(REGISTER_CHANNEL_SIZE);
107        let (start_sender, mut start_receiver) = mpsc::channel(NOTIFY_CHANNEL_SIZE);
108
109        let (stop_register, mut stop_register_receiver) = mpsc::channel(REGISTER_CHANNEL_SIZE);
110        let (stop_sender, mut stop_receiver) = mpsc::channel(NOTIFY_CHANNEL_SIZE);
111
112        handle.spawn(async move {
113            loop {
114                tokio::select! {
115                    _ = &mut signal_receiver => {
116                        break;
117                    }
118                    Some(msg) = new_open_tx_register_receiver.recv() => { self.handle_register_new_open_tx(msg) },
119                    Some(msg) = new_open_tx_receiver.recv() => { self.handle_notify_new_open_tx(msg).await },
120                    Some(msg) = commit_open_tx_register_receiver.recv() => { self.handle_register_commit_open_tx(msg) },
121                    Some(msg) = commit_open_tx_receiver.recv() => { self.handle_notify_commit_open_tx(msg).await },
122                    Some(msg) = interval_register_receiver.recv() => { self.handle_register_interval(msg) },
123                    Some(msg) = interval_receiver.recv() => { self.handle_notify_interval(msg).await },
124                    Some(msg) = start_register_receiver.recv() => { self.handle_register_start(msg) },
125                    Some(()) = start_receiver.recv() => { self.handle_notify_start().await },
126                    Some(msg) = stop_register_receiver.recv() => { self.handle_register_stop(msg) },
127                    Some(()) = stop_receiver.recv() => { self.handle_notify_stop().await },
128                    else => break,
129                }
130            }
131        });
132
133        NotifyController {
134            new_open_tx_register,
135            new_open_tx_notifier: new_open_tx_sender,
136            commit_open_tx_register,
137            commit_open_tx_notifier: commit_open_tx_sender,
138            interval_register,
139            interval_notifier: interval_sender,
140            start_register,
141            start_notifier: start_sender,
142            stop_register,
143            stop_notifier: stop_sender,
144            stop: StopHandler::new(
145                SignalSender::Tokio(signal_sender),
146                None,
147                "notify".to_string(),
148            ),
149            handle,
150        }
151    }
152
153    fn handle_register_new_open_tx(&mut self, msg: Request<String, Receiver<OpenTransaction>>) {
154        let Request {
155            responder,
156            arguments: name,
157        } = msg;
158        log::debug!("Register new_open_tx {:?}", name);
159        let (sender, receiver) = mpsc::channel(NOTIFY_CHANNEL_SIZE);
160        self.new_open_tx_subscribers.insert(name, sender);
161        let _ = responder.send(receiver);
162    }
163
164    async fn handle_notify_new_open_tx(&mut self, otx: OpenTransaction) {
165        log::trace!("event new open tx {:?}", otx);
166        // notify all subscribers
167        for subscriber in self.new_open_tx_subscribers.values() {
168            let _ = subscriber.send(otx.clone()).await;
169        }
170    }
171
172    fn handle_register_commit_open_tx(&mut self, msg: Request<String, Receiver<Vec<H256>>>) {
173        let Request {
174            responder,
175            arguments: name,
176        } = msg;
177        log::debug!("Register commit_open_tx {:?}", name);
178        let (sender, receiver) = mpsc::channel(NOTIFY_CHANNEL_SIZE);
179        self.commit_open_tx_subscribers.insert(name, sender);
180        let _ = responder.send(receiver);
181    }
182
183    async fn handle_notify_commit_open_tx(&mut self, otx_hashes: Vec<H256>) {
184        log::trace!("event commit open tx {:?}", otx_hashes);
185        // notify all subscribers
186        for subscriber in self.commit_open_tx_subscribers.values() {
187            let _ = subscriber.send(otx_hashes.clone()).await;
188        }
189    }
190
191    fn handle_register_interval(&mut self, msg: Request<String, Receiver<u64>>) {
192        let Request {
193            responder,
194            arguments: name,
195        } = msg;
196        log::debug!("Register interval event plugin: {:?}", name);
197        let (sender, receiver) = mpsc::channel(NOTIFY_CHANNEL_SIZE);
198        self.interval_subscribers.insert(name, sender);
199        let _ = responder.send(receiver);
200    }
201
202    async fn handle_notify_interval(&mut self, elapsed_secs: u64) {
203        log::trace!("event interval");
204        // notify all subscribers
205        for subscriber in self.interval_subscribers.values() {
206            let _ = subscriber.send(elapsed_secs).await;
207        }
208    }
209
210    fn handle_register_start(&mut self, msg: Request<String, Receiver<()>>) {
211        let Request {
212            responder,
213            arguments: name,
214        } = msg;
215        log::debug!("Register start {:?}", name);
216        let (sender, receiver) = mpsc::channel(NOTIFY_CHANNEL_SIZE);
217        self.start_subscribers.insert(name, sender);
218        let _ = responder.send(receiver);
219    }
220
221    async fn handle_notify_start(&mut self) {
222        log::trace!("event start");
223        // notify all subscribers
224        for subscriber in self.start_subscribers.values() {
225            let _ = subscriber.send(()).await;
226        }
227    }
228
229    fn handle_register_stop(&mut self, msg: Request<String, Receiver<()>>) {
230        let Request {
231            responder,
232            arguments: name,
233        } = msg;
234        log::debug!("Register stop {:?}", name);
235        let (sender, receiver) = mpsc::channel(NOTIFY_CHANNEL_SIZE);
236        self.stop_subscribers.insert(name, sender);
237        let _ = responder.send(receiver);
238    }
239
240    async fn handle_notify_stop(&mut self) {
241        log::trace!("event stop");
242        // notify all subscribers
243        for subscriber in self.stop_subscribers.values() {
244            let _ = subscriber.send(()).await;
245        }
246    }
247}
248
249impl NotifyController {
250    pub async fn subscribe_new_open_tx<S: ToString>(&self, name: S) -> Receiver<OpenTransaction> {
251        Request::call(&self.new_open_tx_register, name.to_string())
252            .await
253            .expect("Subscribe new open tx should be OK")
254    }
255
256    pub fn notify_new_open_tx(&self, otx: OpenTransaction) {
257        let new_open_tx_notifier = self.new_open_tx_notifier.clone();
258        self.handle.spawn(async move {
259            let _ = new_open_tx_notifier.send(otx).await;
260        });
261    }
262
263    pub async fn subscribe_commit_open_tx<S: ToString>(&self, name: S) -> Receiver<Vec<H256>> {
264        Request::call(&self.commit_open_tx_register, name.to_string())
265            .await
266            .expect("Subscribe commit open tx should be OK")
267    }
268
269    pub fn notify_commit_open_tx(&self, otx_hashes: Vec<H256>) {
270        let commit_open_tx_notifier = self.commit_open_tx_notifier.clone();
271        self.handle.spawn(async move {
272            let _ = commit_open_tx_notifier.send(otx_hashes).await;
273        });
274    }
275
276    pub async fn subscribe_interval<S: ToString>(&self, name: S) -> Receiver<u64> {
277        Request::call(&self.interval_register, name.to_string())
278            .await
279            .expect("Subscribe interval should be OK")
280    }
281
282    pub fn notify_interval(&self, elapsed_secs: u64) {
283        let interval_notifier = self.interval_notifier.clone();
284        self.handle.spawn(async move {
285            let _ = interval_notifier.send(elapsed_secs).await;
286        });
287    }
288
289    pub async fn subscribe_start<S: ToString>(&self, name: S) -> Receiver<()> {
290        Request::call(&self.start_register, name.to_string())
291            .await
292            .expect("Subscribe start should be OK")
293    }
294
295    pub fn notify_start(&self) {
296        let start_notifier = self.start_notifier.clone();
297        self.handle.spawn(async move {
298            let _ = start_notifier.send(()).await;
299        });
300    }
301
302    pub async fn subscribe_stop<S: ToString>(&self, name: S) -> Receiver<()> {
303        Request::call(&self.stop_register, name.to_string())
304            .await
305            .expect("Subscribe stop should be OK")
306    }
307
308    pub fn notify_stop(&self) {
309        let stop_notifier = self.stop_notifier.clone();
310        self.handle.spawn(async move {
311            let _ = stop_notifier.send(()).await;
312        });
313    }
314}