1use std::sync::Arc;
2
3use async_std::channel::Sender;
4use atm0s_sdn_identity::{ConnId, NodeId};
5use atm0s_sdn_network::{
6 behaviour::{ConnectionContext, ConnectionHandler, ConnectionHandlerAction},
7 transport::ConnectionEvent,
8};
9use atm0s_sdn_utils::error_handle::ErrorUtils;
10use parking_lot::Mutex;
11
12use crate::{
13 rpc_msg::{RpcError, RpcMsg},
14 rpc_queue::RpcQueue,
15};
16
17pub struct RpcHandler {
18 pub(crate) rpc_queue: Arc<Mutex<RpcQueue<Sender<Result<RpcMsg, RpcError>>>>>,
19 pub(crate) tx: Sender<RpcMsg>,
20}
21
22impl<BE, HE> ConnectionHandler<BE, HE> for RpcHandler {
23 fn on_opened(&mut self, _ctx: &ConnectionContext, _now_ms: u64) {}
24
25 fn on_tick(&mut self, _ctx: &ConnectionContext, _now_ms: u64, _interval_ms: u64) {}
26
27 fn on_awake(&mut self, _ctx: &ConnectionContext, _now_ms: u64) {}
28
29 fn on_event(&mut self, _ctx: &ConnectionContext, now_ms: u64, event: ConnectionEvent) {
30 if let ConnectionEvent::Msg(msg) = event {
31 let mut rpc_queue = self.rpc_queue.lock();
32 if let Some(msg) = rpc_queue.on_msg(now_ms, msg) {
33 if msg.is_answer() {
34 let req_id = msg.req_id().expect("Should has");
35 if let Some(tx) = rpc_queue.take_request(req_id) {
36 log::info!("[RpcHandler] on answer {}", msg.cmd);
37 tx.try_send(Ok(msg)).print_error("Should send");
38 } else {
39 log::warn!("[RpcHandler] on answer {} without tx", msg.cmd);
40 }
41 } else {
42 self.tx.try_send(msg).print_error("Should send");
43 }
44 }
45 }
46 }
47
48 fn on_other_handler_event(&mut self, _ctx: &ConnectionContext, _now_ms: u64, _from_node: NodeId, _from_conn: ConnId, _event: HE) {}
49
50 fn on_behavior_event(&mut self, _ctx: &ConnectionContext, _now_ms: u64, _event: HE) {}
51
52 fn on_closed(&mut self, _ctx: &ConnectionContext, _now_ms: u64) {}
53
54 fn pop_action(&mut self) -> Option<ConnectionHandlerAction<BE, HE>> {
55 None
56 }
57}