// rpc_core_net/rpc_server.rs

1use std::cell::RefCell;
2use std::rc::{Rc, Weak};
3
4use log::{debug, trace};
5
6use rpc_core::rpc::{Rpc, RpcProto};
7
8use crate::config::RpcConfig;
9use crate::detail::tcp_channel::TcpChannel;
10use crate::tcp_server::TcpServer;
11
12pub struct RpcSession {
13    pub rpc: RefCell<Rc<Rpc>>,
14    on_close: RefCell<Option<Box<dyn Fn()>>>,
15    channel: Weak<TcpChannel>,
16}
17
18impl RpcSession {
19    pub fn new(rpc: Rc<Rpc>, channel: Weak<TcpChannel>) -> Rc<Self> {
20        Rc::new(Self {
21            rpc: rpc.into(),
22            on_close: None.into(),
23            channel,
24        })
25    }
26
27    pub fn on_close<F>(&self, callback: F)
28        where F: Fn() + 'static,
29    {
30        *self.on_close.borrow_mut() = Some(Box::new(callback));
31    }
32}
33
34impl Drop for RpcSession {
35    fn drop(&mut self) {
36        trace!("~RpcSession");
37    }
38}
39
40pub struct RpcServer {
41    config: Rc<RefCell<RpcConfig>>,
42    server: Rc<TcpServer>,
43    on_session: RefCell<Option<Box<dyn Fn(Weak<RpcSession>)>>>,
44    this: RefCell<Weak<Self>>,
45}
46
47impl RpcServer {
48    pub fn new(port: u16, config: RpcConfig) -> Rc<Self> {
49        let tcp_config = config.to_tcp_config();
50        let config = Rc::new(RefCell::new(config));
51        let r = Rc::new(Self {
52            config,
53            server: TcpServer::new(port, tcp_config),
54            on_session: None.into(),
55            this: Weak::new().into(),
56        });
57        let this_weak = Rc::downgrade(&r);
58        *r.this.borrow_mut() = this_weak.clone().into();
59        r.server.on_session(move |session| {
60            let this = this_weak.upgrade().unwrap();
61            let tcp_channel = session.upgrade().unwrap();
62            let rpc = if let Some(rpc) = this.config.borrow().rpc.clone() {
63                if rpc.is_ready() {
64                    debug!("rpc already connected");
65                    tcp_channel.do_close();
66                    return;
67                }
68                rpc
69            } else {
70                Rpc::new(None)
71            };
72
73            let rpc_session = RpcSession::new(rpc.clone(), Rc::downgrade(&tcp_channel));
74            let rs_weak = Rc::downgrade(&rpc_session);
75
76            {
77                let rs_weak = rs_weak.clone();
78                rpc.get_connection().borrow_mut().set_send_package_impl(Box::new(move |package: Vec<u8>| {
79                    if let Some(rs) = rs_weak.upgrade() {
80                        rs.channel.upgrade().unwrap().send(package);
81                    }
82                }));
83            }
84            {
85                let rs_weak = rs_weak.clone();
86                tcp_channel.on_data(move |package| {
87                    if let Some(rs) = rs_weak.upgrade() {
88                        rs.rpc.borrow().get_connection().borrow().on_recv_package(package);
89                    }
90                });
91            }
92
93            rpc.set_timer(|ms: u32, handle: Box<dyn Fn()>| {
94                tokio::task::spawn_local(async move {
95                    tokio::time::sleep(tokio::time::Duration::from_millis(ms as u64)).await;
96                    handle();
97                });
98            });
99            {
100                // bind rpc_session lifecycle to tcp_session and end with on_close
101                let rs = rpc_session.clone();
102                // let tc_weak = Rc::downgrade(&tcp_channel);
103                tcp_channel.on_close(move || {
104                    rs.rpc.borrow_mut().set_ready(false);
105                    // *tc_weak.upgrade().unwrap().on_close.borrow_mut() = None;
106                });
107            }
108            rpc_session.rpc.borrow_mut().set_ready(true);
109
110            {
111                let on_session = this.on_session.borrow();
112                if let Some(on_session) = on_session.as_ref() {
113                    on_session(rs_weak);
114                }
115            }
116        });
117        r
118    }
119
120    pub fn downgrade(&self) -> Weak<Self> {
121        self.this.borrow().clone()
122    }
123
124    pub fn start(&self) {
125        self.server.start();
126    }
127
128    pub fn stop(&self) {
129        self.server.stop();
130    }
131
132    pub fn on_session<F>(&self, callback: F)
133        where F: Fn(Weak<RpcSession>) + 'static,
134    {
135        *self.on_session.borrow_mut() = Some(Box::new(callback));
136    }
137}
138