rpc_core_net/
rpc_server.rs1use std::cell::RefCell;
2use std::rc::{Rc, Weak};
3
4use log::{debug, trace};
5
6use rpc_core::rpc::{Rpc, RpcProto};
7
8use crate::config::RpcConfig;
9use crate::detail::tcp_channel::TcpChannel;
10use crate::tcp_server::TcpServer;
11
12pub struct RpcSession {
13 pub rpc: RefCell<Rc<Rpc>>,
14 on_close: RefCell<Option<Box<dyn Fn()>>>,
15 channel: Weak<TcpChannel>,
16}
17
18impl RpcSession {
19 pub fn new(rpc: Rc<Rpc>, channel: Weak<TcpChannel>) -> Rc<Self> {
20 Rc::new(Self {
21 rpc: rpc.into(),
22 on_close: None.into(),
23 channel,
24 })
25 }
26
27 pub fn on_close<F>(&self, callback: F)
28 where F: Fn() + 'static,
29 {
30 *self.on_close.borrow_mut() = Some(Box::new(callback));
31 }
32}
33
34impl Drop for RpcSession {
35 fn drop(&mut self) {
36 trace!("~RpcSession");
37 }
38}
39
40pub struct RpcServer {
41 config: Rc<RefCell<RpcConfig>>,
42 server: Rc<TcpServer>,
43 on_session: RefCell<Option<Box<dyn Fn(Weak<RpcSession>)>>>,
44 this: RefCell<Weak<Self>>,
45}
46
47impl RpcServer {
48 pub fn new(port: u16, config: RpcConfig) -> Rc<Self> {
49 let tcp_config = config.to_tcp_config();
50 let config = Rc::new(RefCell::new(config));
51 let r = Rc::new(Self {
52 config,
53 server: TcpServer::new(port, tcp_config),
54 on_session: None.into(),
55 this: Weak::new().into(),
56 });
57 let this_weak = Rc::downgrade(&r);
58 *r.this.borrow_mut() = this_weak.clone().into();
59 r.server.on_session(move |session| {
60 let this = this_weak.upgrade().unwrap();
61 let tcp_channel = session.upgrade().unwrap();
62 let rpc = if let Some(rpc) = this.config.borrow().rpc.clone() {
63 if rpc.is_ready() {
64 debug!("rpc already connected");
65 tcp_channel.do_close();
66 return;
67 }
68 rpc
69 } else {
70 Rpc::new(None)
71 };
72
73 let rpc_session = RpcSession::new(rpc.clone(), Rc::downgrade(&tcp_channel));
74 let rs_weak = Rc::downgrade(&rpc_session);
75
76 {
77 let rs_weak = rs_weak.clone();
78 rpc.get_connection().borrow_mut().set_send_package_impl(Box::new(move |package: Vec<u8>| {
79 if let Some(rs) = rs_weak.upgrade() {
80 rs.channel.upgrade().unwrap().send(package);
81 }
82 }));
83 }
84 {
85 let rs_weak = rs_weak.clone();
86 tcp_channel.on_data(move |package| {
87 if let Some(rs) = rs_weak.upgrade() {
88 rs.rpc.borrow().get_connection().borrow().on_recv_package(package);
89 }
90 });
91 }
92
93 rpc.set_timer(|ms: u32, handle: Box<dyn Fn()>| {
94 tokio::task::spawn_local(async move {
95 tokio::time::sleep(tokio::time::Duration::from_millis(ms as u64)).await;
96 handle();
97 });
98 });
99 {
100 let rs = rpc_session.clone();
102 tcp_channel.on_close(move || {
104 rs.rpc.borrow_mut().set_ready(false);
105 });
107 }
108 rpc_session.rpc.borrow_mut().set_ready(true);
109
110 {
111 let on_session = this.on_session.borrow();
112 if let Some(on_session) = on_session.as_ref() {
113 on_session(rs_weak);
114 }
115 }
116 });
117 r
118 }
119
120 pub fn downgrade(&self) -> Weak<Self> {
121 self.this.borrow().clone()
122 }
123
124 pub fn start(&self) {
125 self.server.start();
126 }
127
128 pub fn stop(&self) {
129 self.server.stop();
130 }
131
132 pub fn on_session<F>(&self, callback: F)
133 where F: Fn(Weak<RpcSession>) + 'static,
134 {
135 *self.on_session.borrow_mut() = Some(Box::new(callback));
136 }
137}
138