rpc_core_net/
rpc_client.rs

1use std::cell::RefCell;
2use std::error::Error;
3use std::rc::Rc;
4
5use rpc_core::connection::Connection;
6use rpc_core::rpc::Rpc;
7
8use crate::config::RpcConfig;
9use crate::tcp_client::TcpClient;
10
11pub struct RpcClientImpl {
12    tcp_client: Rc<TcpClient>,
13    config: RpcConfig,
14    on_open: Option<Box<dyn Fn(Rc<Rpc>)>>,
15    on_open_failed: Option<Box<dyn Fn(&dyn Error)>>,
16    on_close: Option<Box<dyn Fn()>>,
17    connection: Rc<RefCell<dyn Connection>>,
18    rpc: Option<Rc<Rpc>>,
19}
20
21pub struct RpcClient {
22    inner: RefCell<RpcClientImpl>,
23}
24
25impl RpcClient {
26    pub fn new(config: RpcConfig) -> Rc<Self> {
27        let r = Rc::new(Self {
28            inner: RefCell::new(RpcClientImpl {
29                tcp_client: TcpClient::new(config.to_tcp_config()),
30                config,
31                on_open: None,
32                on_open_failed: None,
33                on_close: None,
34                connection: rpc_core::connection::DefaultConnection::new(),
35                rpc: None,
36            })
37        });
38        let this_weak = Rc::downgrade(&r);
39        r.inner.borrow_mut().tcp_client.on_open(move || {
40            let this = this_weak.upgrade().unwrap();
41            {
42                let mut this = this.inner.borrow_mut();
43                if let Some(rpc) = this.config.rpc.clone() {
44                    this.connection = rpc.get_connection();
45                    this.rpc = Some(rpc);
46                } else {
47                    this.rpc = Some(Rpc::new(Some(this.connection.clone())));
48                }
49            }
50
51            {
52                let this_weak = this_weak.clone();
53                this.inner.borrow().connection.borrow_mut().set_send_package_impl(Box::new(move |package: Vec<u8>| {
54                    if let Some(this) = this_weak.upgrade() {
55                        this.inner.borrow().tcp_client.send(package);
56                    }
57                }));
58            }
59            {
60                let this_weak = this_weak.clone();
61                this.inner.borrow().tcp_client.on_data(move |package| {
62                    if let Some(this) = this_weak.upgrade() {
63                        this.inner.borrow().connection.borrow_mut().on_recv_package(package);
64                    }
65                });
66            }
67
68            this.inner.borrow_mut().rpc.as_ref().unwrap().set_timer(|ms: u32, handle: Box<dyn Fn()>| {
69                tokio::task::spawn_local(async move {
70                    tokio::time::sleep(tokio::time::Duration::from_millis(ms as u64)).await;
71                    handle();
72                });
73            });
74            {
75                let this_weak = this_weak.clone();
76                this.inner.borrow().tcp_client.on_close(move || {
77                    let this = this_weak.upgrade().unwrap();
78                    this.inner.borrow().rpc.as_ref().unwrap().set_ready(false);
79                });
80            }
81            this.inner.borrow_mut().rpc.as_ref().unwrap().set_ready(true);
82
83            {
84                let this = this.inner.borrow();
85                if let Some(on_open) = &this.on_open {
86                    on_open(this.rpc.clone().unwrap());
87                }
88            }
89        });
90        r
91    }
92
93    pub fn open(&self, host: impl ToString, port: u16) {
94        self.inner.borrow_mut().tcp_client.open(host, port);
95    }
96
97    pub fn close(&self) {
98        self.inner.borrow().tcp_client.close();
99    }
100
101    pub fn set_reconnect(&self, ms: u32) {
102        self.inner.borrow_mut().tcp_client.set_reconnect(ms);
103    }
104
105    pub fn cancel_reconnect(&self) {
106        self.inner.borrow_mut().tcp_client.cancel_reconnect();
107    }
108
109    pub fn stop(&mut self) {
110        self.close();
111    }
112
113    pub fn on_open<F>(&self, callback: F)
114        where F: Fn(Rc<Rpc>) + 'static,
115    {
116        self.inner.borrow_mut().on_open = Some(Box::new(callback));
117    }
118
119    pub fn on_open_failed<F>(&self, callback: F)
120        where F: Fn(&dyn Error) + 'static,
121    {
122        self.inner.borrow_mut().on_open_failed = Some(Box::new(callback));
123    }
124
125    pub fn on_close<F>(&self, callback: F)
126        where F: Fn() + 'static,
127    {
128        self.inner.borrow_mut().on_close = Some(Box::new(callback));
129    }
130}
131