atm0s_sdn_rpc/
handler.rs

1use std::sync::Arc;
2
3use async_std::channel::Sender;
4use atm0s_sdn_identity::{ConnId, NodeId};
5use atm0s_sdn_network::{
6    behaviour::{ConnectionContext, ConnectionHandler, ConnectionHandlerAction},
7    transport::ConnectionEvent,
8};
9use atm0s_sdn_utils::error_handle::ErrorUtils;
10use parking_lot::Mutex;
11
12use crate::{
13    rpc_msg::{RpcError, RpcMsg},
14    rpc_queue::RpcQueue,
15};
16
17pub struct RpcHandler {
18    pub(crate) rpc_queue: Arc<Mutex<RpcQueue<Sender<Result<RpcMsg, RpcError>>>>>,
19    pub(crate) tx: Sender<RpcMsg>,
20}
21
22impl<BE, HE> ConnectionHandler<BE, HE> for RpcHandler {
23    fn on_opened(&mut self, _ctx: &ConnectionContext, _now_ms: u64) {}
24
25    fn on_tick(&mut self, _ctx: &ConnectionContext, _now_ms: u64, _interval_ms: u64) {}
26
27    fn on_awake(&mut self, _ctx: &ConnectionContext, _now_ms: u64) {}
28
29    fn on_event(&mut self, _ctx: &ConnectionContext, now_ms: u64, event: ConnectionEvent) {
30        if let ConnectionEvent::Msg(msg) = event {
31            let mut rpc_queue = self.rpc_queue.lock();
32            if let Some(msg) = rpc_queue.on_msg(now_ms, msg) {
33                if msg.is_answer() {
34                    let req_id = msg.req_id().expect("Should has");
35                    if let Some(tx) = rpc_queue.take_request(req_id) {
36                        log::info!("[RpcHandler] on answer {}", msg.cmd);
37                        tx.try_send(Ok(msg)).print_error("Should send");
38                    } else {
39                        log::warn!("[RpcHandler] on answer {} without tx", msg.cmd);
40                    }
41                } else {
42                    self.tx.try_send(msg).print_error("Should send");
43                }
44            }
45        }
46    }
47
48    fn on_other_handler_event(&mut self, _ctx: &ConnectionContext, _now_ms: u64, _from_node: NodeId, _from_conn: ConnId, _event: HE) {}
49
50    fn on_behavior_event(&mut self, _ctx: &ConnectionContext, _now_ms: u64, _event: HE) {}
51
52    fn on_closed(&mut self, _ctx: &ConnectionContext, _now_ms: u64) {}
53
54    fn pop_action(&mut self) -> Option<ConnectionHandlerAction<BE, HE>> {
55        None
56    }
57}