rpc_core/net/
rpc_server.rs

1use std::cell::RefCell;
2use std::rc::{Rc, Weak};
3
4use log::{debug, trace};
5
6use crate::net::config::RpcConfig;
7use crate::net::detail::tcp_channel::TcpChannel;
8use crate::net::tcp_server::TcpServer;
9use crate::rpc::{Rpc, RpcProto};
10
11pub struct RpcSession {
12    pub rpc: RefCell<Rc<Rpc>>,
13    on_close: RefCell<Option<Box<dyn Fn()>>>,
14    channel: Weak<TcpChannel>,
15}
16
17impl RpcSession {
18    pub fn new(rpc: Rc<Rpc>, channel: Weak<TcpChannel>) -> Rc<Self> {
19        Rc::new(Self {
20            rpc: rpc.into(),
21            on_close: None.into(),
22            channel,
23        })
24    }
25
26    pub fn on_close<F>(&self, callback: F)
27    where
28        F: Fn() + 'static,
29    {
30        *self.on_close.borrow_mut() = Some(Box::new(callback));
31    }
32}
33
34impl Drop for RpcSession {
35    fn drop(&mut self) {
36        trace!("~RpcSession");
37    }
38}
39
40pub struct RpcServer {
41    config: Rc<RefCell<RpcConfig>>,
42    server: Rc<TcpServer>,
43    on_session: RefCell<Option<Box<dyn Fn(Weak<RpcSession>)>>>,
44    this: RefCell<Weak<Self>>,
45}
46
47impl RpcServer {
48    pub fn new(port: u16, config: RpcConfig) -> Rc<Self> {
49        let tcp_config = config.to_tcp_config();
50        let config = Rc::new(RefCell::new(config));
51        let r = Rc::new(Self {
52            config,
53            server: TcpServer::new(port, tcp_config),
54            on_session: None.into(),
55            this: Weak::new().into(),
56        });
57        let this_weak = Rc::downgrade(&r);
58        *r.this.borrow_mut() = this_weak.clone().into();
59        r.server.on_session(move |session| {
60            let this = this_weak.upgrade().unwrap();
61            let tcp_channel = session.upgrade().unwrap();
62            let rpc = if let Some(rpc) = this.config.borrow().rpc.clone() {
63                if rpc.is_ready() {
64                    debug!("rpc already connected");
65                    tcp_channel.close();
66                    return;
67                }
68                rpc
69            } else {
70                Rpc::new(None)
71            };
72
73            let rpc_session = RpcSession::new(rpc.clone(), Rc::downgrade(&tcp_channel));
74            let rs_weak = Rc::downgrade(&rpc_session);
75
76            {
77                let rs_weak = rs_weak.clone();
78                rpc.get_connection()
79                    .borrow_mut()
80                    .set_send_package_impl(Box::new(move |package: Vec<u8>| {
81                        if let Some(rs) = rs_weak.upgrade() {
82                            rs.channel.upgrade().unwrap().send(package);
83                        }
84                    }));
85            }
86            {
87                let rs_weak = rs_weak.clone();
88                tcp_channel.on_data(move |package| {
89                    if let Some(rs) = rs_weak.upgrade() {
90                        rs.rpc
91                            .borrow()
92                            .get_connection()
93                            .borrow()
94                            .on_recv_package(package);
95                    }
96                });
97            }
98
99            rpc.set_timer(|ms: u32, handle: Box<dyn Fn()>| {
100                tokio::task::spawn_local(async move {
101                    tokio::time::sleep(tokio::time::Duration::from_millis(ms as u64)).await;
102                    handle();
103                });
104            });
105            {
106                // bind rpc_session lifecycle to tcp_session and end with on_close
107                let rs = rpc_session.clone();
108                // let tc_weak = Rc::downgrade(&tcp_channel);
109                tcp_channel.on_close(move || {
110                    rs.rpc.borrow_mut().set_ready(false);
111                    // *tc_weak.upgrade().unwrap().on_close.borrow_mut() = None;
112                });
113            }
114            rpc_session.rpc.borrow_mut().set_ready(true);
115
116            {
117                let on_session = this.on_session.borrow();
118                if let Some(on_session) = on_session.as_ref() {
119                    on_session(rs_weak);
120                }
121            }
122        });
123        r
124    }
125
126    pub fn downgrade(&self) -> Weak<Self> {
127        self.this.borrow().clone()
128    }
129
130    pub fn start(&self) {
131        self.server.start();
132    }
133
134    pub fn stop(&self) {
135        self.server.stop();
136    }
137
138    pub fn on_session<F>(&self, callback: F)
139    where
140        F: Fn(Weak<RpcSession>) + 'static,
141    {
142        *self.on_session.borrow_mut() = Some(Box::new(callback));
143    }
144}