rpc_core_net/
rpc_client.rs1use std::cell::RefCell;
2use std::error::Error;
3use std::rc::Rc;
4
5use rpc_core::connection::Connection;
6use rpc_core::rpc::Rpc;
7
8use crate::config::RpcConfig;
9use crate::tcp_client::TcpClient;
10
11pub struct RpcClientImpl {
12 tcp_client: Rc<TcpClient>,
13 config: RpcConfig,
14 on_open: Option<Box<dyn Fn(Rc<Rpc>)>>,
15 on_open_failed: Option<Box<dyn Fn(&dyn Error)>>,
16 on_close: Option<Box<dyn Fn()>>,
17 connection: Rc<RefCell<dyn Connection>>,
18 rpc: Option<Rc<Rpc>>,
19}
20
21pub struct RpcClient {
22 inner: RefCell<RpcClientImpl>,
23}
24
25impl RpcClient {
26 pub fn new(config: RpcConfig) -> Rc<Self> {
27 let r = Rc::new(Self {
28 inner: RefCell::new(RpcClientImpl {
29 tcp_client: TcpClient::new(config.to_tcp_config()),
30 config,
31 on_open: None,
32 on_open_failed: None,
33 on_close: None,
34 connection: rpc_core::connection::DefaultConnection::new(),
35 rpc: None,
36 })
37 });
38 let this_weak = Rc::downgrade(&r);
39 r.inner.borrow_mut().tcp_client.on_open(move || {
40 let this = this_weak.upgrade().unwrap();
41 {
42 let mut this = this.inner.borrow_mut();
43 if let Some(rpc) = this.config.rpc.clone() {
44 this.connection = rpc.get_connection();
45 this.rpc = Some(rpc);
46 } else {
47 this.rpc = Some(Rpc::new(Some(this.connection.clone())));
48 }
49 }
50
51 {
52 let this_weak = this_weak.clone();
53 this.inner.borrow().connection.borrow_mut().set_send_package_impl(Box::new(move |package: Vec<u8>| {
54 if let Some(this) = this_weak.upgrade() {
55 this.inner.borrow().tcp_client.send(package);
56 }
57 }));
58 }
59 {
60 let this_weak = this_weak.clone();
61 this.inner.borrow().tcp_client.on_data(move |package| {
62 if let Some(this) = this_weak.upgrade() {
63 this.inner.borrow().connection.borrow_mut().on_recv_package(package);
64 }
65 });
66 }
67
68 this.inner.borrow_mut().rpc.as_ref().unwrap().set_timer(|ms: u32, handle: Box<dyn Fn()>| {
69 tokio::task::spawn_local(async move {
70 tokio::time::sleep(tokio::time::Duration::from_millis(ms as u64)).await;
71 handle();
72 });
73 });
74 {
75 let this_weak = this_weak.clone();
76 this.inner.borrow().tcp_client.on_close(move || {
77 let this = this_weak.upgrade().unwrap();
78 this.inner.borrow().rpc.as_ref().unwrap().set_ready(false);
79 });
80 }
81 this.inner.borrow_mut().rpc.as_ref().unwrap().set_ready(true);
82
83 {
84 let this = this.inner.borrow();
85 if let Some(on_open) = &this.on_open {
86 on_open(this.rpc.clone().unwrap());
87 }
88 }
89 });
90 r
91 }
92
93 pub fn open(&self, host: impl ToString, port: u16) {
94 self.inner.borrow_mut().tcp_client.open(host, port);
95 }
96
97 pub fn close(&self) {
98 self.inner.borrow().tcp_client.close();
99 }
100
101 pub fn set_reconnect(&self, ms: u32) {
102 self.inner.borrow_mut().tcp_client.set_reconnect(ms);
103 }
104
105 pub fn cancel_reconnect(&self) {
106 self.inner.borrow_mut().tcp_client.cancel_reconnect();
107 }
108
109 pub fn stop(&mut self) {
110 self.close();
111 }
112
113 pub fn on_open<F>(&self, callback: F)
114 where F: Fn(Rc<Rpc>) + 'static,
115 {
116 self.inner.borrow_mut().on_open = Some(Box::new(callback));
117 }
118
119 pub fn on_open_failed<F>(&self, callback: F)
120 where F: Fn(&dyn Error) + 'static,
121 {
122 self.inner.borrow_mut().on_open_failed = Some(Box::new(callback));
123 }
124
125 pub fn on_close<F>(&self, callback: F)
126 where F: Fn() + 'static,
127 {
128 self.inner.borrow_mut().on_close = Some(Box::new(callback));
129 }
130}
131