rpc_core/net/
rpc_server.rs1use std::cell::RefCell;
2use std::rc::{Rc, Weak};
3
4use log::{debug, trace};
5
6use crate::net::config::RpcConfig;
7use crate::net::detail::tcp_channel::TcpChannel;
8use crate::net::tcp_server::TcpServer;
9use crate::rpc::{Rpc, RpcProto};
10
11pub struct RpcSession {
12 pub rpc: RefCell<Rc<Rpc>>,
13 on_close: RefCell<Option<Box<dyn Fn()>>>,
14 channel: Weak<TcpChannel>,
15}
16
17impl RpcSession {
18 pub fn new(rpc: Rc<Rpc>, channel: Weak<TcpChannel>) -> Rc<Self> {
19 Rc::new(Self {
20 rpc: rpc.into(),
21 on_close: None.into(),
22 channel,
23 })
24 }
25
26 pub fn on_close<F>(&self, callback: F)
27 where
28 F: Fn() + 'static,
29 {
30 *self.on_close.borrow_mut() = Some(Box::new(callback));
31 }
32}
33
34impl Drop for RpcSession {
35 fn drop(&mut self) {
36 trace!("~RpcSession");
37 }
38}
39
40pub struct RpcServer {
41 config: Rc<RefCell<RpcConfig>>,
42 server: Rc<TcpServer>,
43 on_session: RefCell<Option<Box<dyn Fn(Weak<RpcSession>)>>>,
44 this: RefCell<Weak<Self>>,
45}
46
47impl RpcServer {
48 pub fn new(port: u16, config: RpcConfig) -> Rc<Self> {
49 let tcp_config = config.to_tcp_config();
50 let config = Rc::new(RefCell::new(config));
51 let r = Rc::new(Self {
52 config,
53 server: TcpServer::new(port, tcp_config),
54 on_session: None.into(),
55 this: Weak::new().into(),
56 });
57 let this_weak = Rc::downgrade(&r);
58 *r.this.borrow_mut() = this_weak.clone().into();
59 r.server.on_session(move |session| {
60 let this = this_weak.upgrade().unwrap();
61 let tcp_channel = session.upgrade().unwrap();
62 let rpc = if let Some(rpc) = this.config.borrow().rpc.clone() {
63 if rpc.is_ready() {
64 debug!("rpc already connected");
65 tcp_channel.close();
66 return;
67 }
68 rpc
69 } else {
70 Rpc::new(None)
71 };
72
73 let rpc_session = RpcSession::new(rpc.clone(), Rc::downgrade(&tcp_channel));
74 let rs_weak = Rc::downgrade(&rpc_session);
75
76 {
77 let rs_weak = rs_weak.clone();
78 rpc.get_connection()
79 .borrow_mut()
80 .set_send_package_impl(Box::new(move |package: Vec<u8>| {
81 if let Some(rs) = rs_weak.upgrade() {
82 rs.channel.upgrade().unwrap().send(package);
83 }
84 }));
85 }
86 {
87 let rs_weak = rs_weak.clone();
88 tcp_channel.on_data(move |package| {
89 if let Some(rs) = rs_weak.upgrade() {
90 rs.rpc
91 .borrow()
92 .get_connection()
93 .borrow()
94 .on_recv_package(package);
95 }
96 });
97 }
98
99 rpc.set_timer(|ms: u32, handle: Box<dyn Fn()>| {
100 tokio::task::spawn_local(async move {
101 tokio::time::sleep(tokio::time::Duration::from_millis(ms as u64)).await;
102 handle();
103 });
104 });
105 {
106 let rs = rpc_session.clone();
108 tcp_channel.on_close(move || {
110 rs.rpc.borrow_mut().set_ready(false);
111 });
113 }
114 rpc_session.rpc.borrow_mut().set_ready(true);
115
116 {
117 let on_session = this.on_session.borrow();
118 if let Some(on_session) = on_session.as_ref() {
119 on_session(rs_weak);
120 }
121 }
122 });
123 r
124 }
125
126 pub fn downgrade(&self) -> Weak<Self> {
127 self.this.borrow().clone()
128 }
129
130 pub fn start(&self) {
131 self.server.start();
132 }
133
134 pub fn stop(&self) {
135 self.server.stop();
136 }
137
138 pub fn on_session<F>(&self, callback: F)
139 where
140 F: Fn(Weak<RpcSession>) + 'static,
141 {
142 *self.on_session.borrow_mut() = Some(Box::new(callback));
143 }
144}