rpc_core/net/
rpc_client.rs1use std::cell::RefCell;
2use std::error::Error;
3use std::rc::Rc;
4
5use crate::connection::Connection;
6use crate::net::config::RpcConfig;
7use crate::net::tcp_client::TcpClient;
8use crate::rpc::Rpc;
9
/// Mutable state behind [`RpcClient`].
///
/// `RpcClient` keeps this inside a `RefCell` so that `&self` methods and the
/// transport event closures can read and update it through a shared handle.
pub struct RpcClientImpl {
    /// TCP transport: outgoing packages are sent through it, and its
    /// open / open-failed / data / close events drive this client.
    tcp_client: Rc<TcpClient>,
    /// Configuration supplied at construction. When its `rpc` is set, that
    /// instance is reused each time the transport opens.
    config: RpcConfig,
    /// User callback run after the transport opens; receives the `Rpc`.
    on_open: Option<Box<dyn Fn(Rc<Rpc>)>>,
    /// User callback run when the transport reports that opening failed.
    on_open_failed: Option<Box<dyn Fn(&dyn Error)>>,
    /// User callback run when the transport closes.
    on_close: Option<Box<dyn Fn()>>,
    /// Connection bridging the `Rpc` and the transport. Starts as a
    /// `DefaultConnection` and is replaced by the configured `Rpc`'s own
    /// connection on open when `config.rpc` is set.
    connection: Rc<RefCell<dyn Connection>>,
    /// The active `Rpc`; `None` until the transport has opened once.
    rpc: Option<Rc<Rpc>>,
}
19
/// RPC client that runs an `Rpc` over a `TcpClient` transport.
///
/// Created with `RpcClient::new`, which hands back an `Rc<RpcClient>`. The
/// type is built on `Rc`/`RefCell`, so a client is confined to one thread.
pub struct RpcClient {
    // Interior-mutable state shared between the public `&self` methods and
    // the event closures registered on the transport.
    inner: RefCell<RpcClientImpl>,
}
23
24impl RpcClient {
25 pub fn new(config: RpcConfig) -> Rc<Self> {
26 let r = Rc::new(Self {
27 inner: RefCell::new(RpcClientImpl {
28 tcp_client: TcpClient::new(config.to_tcp_config()),
29 config,
30 on_open: None,
31 on_open_failed: None,
32 on_close: None,
33 connection: crate::connection::DefaultConnection::new(),
34 rpc: None,
35 }),
36 });
37
38 let this_weak = Rc::downgrade(&r);
39 r.inner.borrow_mut().tcp_client.on_open(move || {
40 let this = this_weak.upgrade().unwrap();
41 {
42 let mut this = this.inner.borrow_mut();
43 if let Some(rpc) = this.config.rpc.clone() {
44 this.connection = rpc.get_connection();
45 this.rpc = Some(rpc);
46 } else {
47 this.rpc = Some(Rpc::new(Some(this.connection.clone())));
48 }
49 }
50
51 {
52 let this_weak = this_weak.clone();
53 this.inner
54 .borrow()
55 .connection
56 .borrow_mut()
57 .set_send_package_impl(Box::new(move |package: Vec<u8>| {
58 if let Some(this) = this_weak.upgrade() {
59 this.inner.borrow().tcp_client.send(package);
60 }
61 }));
62 }
63 {
64 let this_weak = this_weak.clone();
65 this.inner.borrow().tcp_client.on_data(move |package| {
66 if let Some(this) = this_weak.upgrade() {
67 this.inner
68 .borrow()
69 .connection
70 .borrow_mut()
71 .on_recv_package(package);
72 }
73 });
74 }
75
76 this.inner.borrow_mut().rpc.as_ref().unwrap().set_timer(
77 |ms: u32, handle: Box<dyn Fn()>| {
78 tokio::task::spawn_local(async move {
79 tokio::time::sleep(tokio::time::Duration::from_millis(ms as u64)).await;
80 handle();
81 });
82 },
83 );
84 {
85 let this_weak = this_weak.clone();
86 this.inner.borrow().tcp_client.on_close(move || {
87 let this = this_weak.upgrade().unwrap();
88 this.inner.borrow().rpc.as_ref().unwrap().set_ready(false);
89 {
90 let inner = this.inner.borrow();
91 if let Some(on_close) = inner.on_close.as_ref() {
92 on_close();
93 }
94 }
95 });
96 }
97 this.inner
98 .borrow_mut()
99 .rpc
100 .as_ref()
101 .unwrap()
102 .set_ready(true);
103
104 {
105 let this = this.inner.borrow();
106 if let Some(on_open) = &this.on_open {
107 on_open(this.rpc.clone().unwrap());
108 }
109 }
110 });
111
112 let this_weak = Rc::downgrade(&r);
113 r.inner.borrow_mut().tcp_client.on_open_failed(move |e| {
114 let this = this_weak.upgrade().unwrap();
115 {
116 let inner = this.inner.borrow();
117 if let Some(on_open_failed) = inner.on_open_failed.as_ref() {
118 on_open_failed(e);
119 }
120 }
121 });
122
123 r
124 }
125
126 pub fn open(&self, host: impl ToString, port: u16) {
127 self.inner.borrow_mut().tcp_client.open(host, port);
128 }
129
130 pub fn close(&self) {
131 self.inner.borrow().tcp_client.close();
132 }
133
134 pub fn set_reconnect(&self, ms: u32) {
135 self.inner.borrow_mut().tcp_client.set_reconnect(ms);
136 }
137
138 pub fn cancel_reconnect(&self) {
139 self.inner.borrow_mut().tcp_client.cancel_reconnect();
140 }
141
142 pub fn stop(&mut self) {
143 self.close();
144 }
145
146 pub fn on_open<F>(&self, callback: F)
147 where
148 F: Fn(Rc<Rpc>) + 'static,
149 {
150 self.inner.borrow_mut().on_open = Some(Box::new(callback));
151 }
152
153 pub fn on_open_failed<F>(&self, callback: F)
154 where
155 F: Fn(&dyn Error) + 'static,
156 {
157 self.inner.borrow_mut().on_open_failed = Some(Box::new(callback));
158 }
159
160 pub fn on_close<F>(&self, callback: F)
161 where
162 F: Fn() + 'static,
163 {
164 self.inner.borrow_mut().on_close = Some(Box::new(callback));
165 }
166}