1use ckb_async_runtime::Handle;
2use ckb_stop_handler::{SignalSender, StopHandler};
3use ckb_types::H256;
4use otx_format::jsonrpc_types::OpenTransaction;
5use tokio::sync::{
6 mpsc::{self, Receiver, Sender},
7 oneshot,
8};
9
10use std::collections::HashMap;
11
12pub type RuntimeHandle = Handle;
13
14pub struct Request<A, R> {
16 pub responder: oneshot::Sender<R>,
18 pub arguments: A,
20}
21
22impl<A, R> Request<A, R> {
23 pub async fn call(sender: &Sender<Request<A, R>>, arguments: A) -> Option<R> {
25 let (responder, response) = oneshot::channel();
26 let _ = sender
27 .send(Request {
28 responder,
29 arguments,
30 })
31 .await;
32 response.await.ok()
33 }
34}
35
/// Size intended for a signal channel.
// NOTE(review): not referenced by the code visible in this file — confirm it is still used elsewhere.
pub const SIGNAL_CHANNEL_SIZE: usize = 1;
/// Capacity of each subscriber-registration channel.
pub const REGISTER_CHANNEL_SIZE: usize = 2;
/// Capacity of each event-notification channel and of every per-subscriber channel.
pub const NOTIFY_CHANNEL_SIZE: usize = 128;
39
40pub type NotifyRegister<M> = Sender<Request<String, Receiver<M>>>;
41
42#[derive(Clone)]
43pub struct NotifyController {
44 stop: StopHandler<()>,
45 new_open_tx_register: NotifyRegister<OpenTransaction>,
46 new_open_tx_notifier: Sender<OpenTransaction>,
47 commit_open_tx_register: NotifyRegister<Vec<H256>>,
48 commit_open_tx_notifier: Sender<Vec<H256>>,
49 interval_register: NotifyRegister<u64>,
50 interval_notifier: Sender<u64>,
51 start_register: NotifyRegister<()>,
52 start_notifier: Sender<()>,
53 stop_register: NotifyRegister<()>,
54 stop_notifier: Sender<()>,
55 handle: Handle,
56}
57
58impl Drop for NotifyController {
59 fn drop(&mut self) {
60 self.stop.try_send(());
61 }
62}
63
64pub struct NotifyService {
65 new_open_tx_subscribers: HashMap<String, Sender<OpenTransaction>>,
66 commit_open_tx_subscribers: HashMap<String, Sender<Vec<H256>>>,
67 interval_subscribers: HashMap<String, Sender<u64>>,
68 start_subscribers: HashMap<String, Sender<()>>,
69 stop_subscribers: HashMap<String, Sender<()>>,
70}
71
72impl Default for NotifyService {
73 fn default() -> Self {
74 Self::new()
75 }
76}
77
78impl NotifyService {
79 pub fn new() -> Self {
80 Self {
81 new_open_tx_subscribers: HashMap::default(),
82 commit_open_tx_subscribers: HashMap::default(),
83 interval_subscribers: HashMap::default(),
84 start_subscribers: HashMap::default(),
85 stop_subscribers: HashMap::default(),
86 }
87 }
88
89 pub fn start(mut self, handle: Handle) -> NotifyController {
91 let (signal_sender, mut signal_receiver) = oneshot::channel();
92
93 let (new_open_tx_register, mut new_open_tx_register_receiver) =
94 mpsc::channel(REGISTER_CHANNEL_SIZE);
95 let (new_open_tx_sender, mut new_open_tx_receiver) = mpsc::channel(NOTIFY_CHANNEL_SIZE);
96
97 let (commit_open_tx_register, mut commit_open_tx_register_receiver) =
98 mpsc::channel(REGISTER_CHANNEL_SIZE);
99 let (commit_open_tx_sender, mut commit_open_tx_receiver) =
100 mpsc::channel(NOTIFY_CHANNEL_SIZE);
101
102 let (interval_register, mut interval_register_receiver) =
103 mpsc::channel(REGISTER_CHANNEL_SIZE);
104 let (interval_sender, mut interval_receiver) = mpsc::channel(NOTIFY_CHANNEL_SIZE);
105
106 let (start_register, mut start_register_receiver) = mpsc::channel(REGISTER_CHANNEL_SIZE);
107 let (start_sender, mut start_receiver) = mpsc::channel(NOTIFY_CHANNEL_SIZE);
108
109 let (stop_register, mut stop_register_receiver) = mpsc::channel(REGISTER_CHANNEL_SIZE);
110 let (stop_sender, mut stop_receiver) = mpsc::channel(NOTIFY_CHANNEL_SIZE);
111
112 handle.spawn(async move {
113 loop {
114 tokio::select! {
115 _ = &mut signal_receiver => {
116 break;
117 }
118 Some(msg) = new_open_tx_register_receiver.recv() => { self.handle_register_new_open_tx(msg) },
119 Some(msg) = new_open_tx_receiver.recv() => { self.handle_notify_new_open_tx(msg).await },
120 Some(msg) = commit_open_tx_register_receiver.recv() => { self.handle_register_commit_open_tx(msg) },
121 Some(msg) = commit_open_tx_receiver.recv() => { self.handle_notify_commit_open_tx(msg).await },
122 Some(msg) = interval_register_receiver.recv() => { self.handle_register_interval(msg) },
123 Some(msg) = interval_receiver.recv() => { self.handle_notify_interval(msg).await },
124 Some(msg) = start_register_receiver.recv() => { self.handle_register_start(msg) },
125 Some(()) = start_receiver.recv() => { self.handle_notify_start().await },
126 Some(msg) = stop_register_receiver.recv() => { self.handle_register_stop(msg) },
127 Some(()) = stop_receiver.recv() => { self.handle_notify_stop().await },
128 else => break,
129 }
130 }
131 });
132
133 NotifyController {
134 new_open_tx_register,
135 new_open_tx_notifier: new_open_tx_sender,
136 commit_open_tx_register,
137 commit_open_tx_notifier: commit_open_tx_sender,
138 interval_register,
139 interval_notifier: interval_sender,
140 start_register,
141 start_notifier: start_sender,
142 stop_register,
143 stop_notifier: stop_sender,
144 stop: StopHandler::new(
145 SignalSender::Tokio(signal_sender),
146 None,
147 "notify".to_string(),
148 ),
149 handle,
150 }
151 }
152
153 fn handle_register_new_open_tx(&mut self, msg: Request<String, Receiver<OpenTransaction>>) {
154 let Request {
155 responder,
156 arguments: name,
157 } = msg;
158 log::debug!("Register new_open_tx {:?}", name);
159 let (sender, receiver) = mpsc::channel(NOTIFY_CHANNEL_SIZE);
160 self.new_open_tx_subscribers.insert(name, sender);
161 let _ = responder.send(receiver);
162 }
163
164 async fn handle_notify_new_open_tx(&mut self, otx: OpenTransaction) {
165 log::trace!("event new open tx {:?}", otx);
166 for subscriber in self.new_open_tx_subscribers.values() {
168 let _ = subscriber.send(otx.clone()).await;
169 }
170 }
171
172 fn handle_register_commit_open_tx(&mut self, msg: Request<String, Receiver<Vec<H256>>>) {
173 let Request {
174 responder,
175 arguments: name,
176 } = msg;
177 log::debug!("Register commit_open_tx {:?}", name);
178 let (sender, receiver) = mpsc::channel(NOTIFY_CHANNEL_SIZE);
179 self.commit_open_tx_subscribers.insert(name, sender);
180 let _ = responder.send(receiver);
181 }
182
183 async fn handle_notify_commit_open_tx(&mut self, otx_hashes: Vec<H256>) {
184 log::trace!("event commit open tx {:?}", otx_hashes);
185 for subscriber in self.commit_open_tx_subscribers.values() {
187 let _ = subscriber.send(otx_hashes.clone()).await;
188 }
189 }
190
191 fn handle_register_interval(&mut self, msg: Request<String, Receiver<u64>>) {
192 let Request {
193 responder,
194 arguments: name,
195 } = msg;
196 log::debug!("Register interval event plugin: {:?}", name);
197 let (sender, receiver) = mpsc::channel(NOTIFY_CHANNEL_SIZE);
198 self.interval_subscribers.insert(name, sender);
199 let _ = responder.send(receiver);
200 }
201
202 async fn handle_notify_interval(&mut self, elapsed_secs: u64) {
203 log::trace!("event interval");
204 for subscriber in self.interval_subscribers.values() {
206 let _ = subscriber.send(elapsed_secs).await;
207 }
208 }
209
210 fn handle_register_start(&mut self, msg: Request<String, Receiver<()>>) {
211 let Request {
212 responder,
213 arguments: name,
214 } = msg;
215 log::debug!("Register start {:?}", name);
216 let (sender, receiver) = mpsc::channel(NOTIFY_CHANNEL_SIZE);
217 self.start_subscribers.insert(name, sender);
218 let _ = responder.send(receiver);
219 }
220
221 async fn handle_notify_start(&mut self) {
222 log::trace!("event start");
223 for subscriber in self.start_subscribers.values() {
225 let _ = subscriber.send(()).await;
226 }
227 }
228
229 fn handle_register_stop(&mut self, msg: Request<String, Receiver<()>>) {
230 let Request {
231 responder,
232 arguments: name,
233 } = msg;
234 log::debug!("Register stop {:?}", name);
235 let (sender, receiver) = mpsc::channel(NOTIFY_CHANNEL_SIZE);
236 self.stop_subscribers.insert(name, sender);
237 let _ = responder.send(receiver);
238 }
239
240 async fn handle_notify_stop(&mut self) {
241 log::trace!("event stop");
242 for subscriber in self.stop_subscribers.values() {
244 let _ = subscriber.send(()).await;
245 }
246 }
247}
248
249impl NotifyController {
250 pub async fn subscribe_new_open_tx<S: ToString>(&self, name: S) -> Receiver<OpenTransaction> {
251 Request::call(&self.new_open_tx_register, name.to_string())
252 .await
253 .expect("Subscribe new open tx should be OK")
254 }
255
256 pub fn notify_new_open_tx(&self, otx: OpenTransaction) {
257 let new_open_tx_notifier = self.new_open_tx_notifier.clone();
258 self.handle.spawn(async move {
259 let _ = new_open_tx_notifier.send(otx).await;
260 });
261 }
262
263 pub async fn subscribe_commit_open_tx<S: ToString>(&self, name: S) -> Receiver<Vec<H256>> {
264 Request::call(&self.commit_open_tx_register, name.to_string())
265 .await
266 .expect("Subscribe commit open tx should be OK")
267 }
268
269 pub fn notify_commit_open_tx(&self, otx_hashes: Vec<H256>) {
270 let commit_open_tx_notifier = self.commit_open_tx_notifier.clone();
271 self.handle.spawn(async move {
272 let _ = commit_open_tx_notifier.send(otx_hashes).await;
273 });
274 }
275
276 pub async fn subscribe_interval<S: ToString>(&self, name: S) -> Receiver<u64> {
277 Request::call(&self.interval_register, name.to_string())
278 .await
279 .expect("Subscribe interval should be OK")
280 }
281
282 pub fn notify_interval(&self, elapsed_secs: u64) {
283 let interval_notifier = self.interval_notifier.clone();
284 self.handle.spawn(async move {
285 let _ = interval_notifier.send(elapsed_secs).await;
286 });
287 }
288
289 pub async fn subscribe_start<S: ToString>(&self, name: S) -> Receiver<()> {
290 Request::call(&self.start_register, name.to_string())
291 .await
292 .expect("Subscribe start should be OK")
293 }
294
295 pub fn notify_start(&self) {
296 let start_notifier = self.start_notifier.clone();
297 self.handle.spawn(async move {
298 let _ = start_notifier.send(()).await;
299 });
300 }
301
302 pub async fn subscribe_stop<S: ToString>(&self, name: S) -> Receiver<()> {
303 Request::call(&self.stop_register, name.to_string())
304 .await
305 .expect("Subscribe stop should be OK")
306 }
307
308 pub fn notify_stop(&self) {
309 let stop_notifier = self.stop_notifier.clone();
310 self.handle.spawn(async move {
311 let _ = stop_notifier.send(()).await;
312 });
313 }
314}