rpc_core/net/
rpc_client.rs

1use std::cell::RefCell;
2use std::error::Error;
3use std::rc::Rc;
4
5use crate::connection::Connection;
6use crate::net::config::RpcConfig;
7use crate::net::tcp_client::TcpClient;
8use crate::rpc::Rpc;
9
10pub struct RpcClientImpl {
11    tcp_client: Rc<TcpClient>,
12    config: RpcConfig,
13    on_open: Option<Box<dyn Fn(Rc<Rpc>)>>,
14    on_open_failed: Option<Box<dyn Fn(&dyn Error)>>,
15    on_close: Option<Box<dyn Fn()>>,
16    connection: Rc<RefCell<dyn Connection>>,
17    rpc: Option<Rc<Rpc>>,
18}
19
20pub struct RpcClient {
21    inner: RefCell<RpcClientImpl>,
22}
23
24impl RpcClient {
25    pub fn new(config: RpcConfig) -> Rc<Self> {
26        let r = Rc::new(Self {
27            inner: RefCell::new(RpcClientImpl {
28                tcp_client: TcpClient::new(config.to_tcp_config()),
29                config,
30                on_open: None,
31                on_open_failed: None,
32                on_close: None,
33                connection: crate::connection::DefaultConnection::new(),
34                rpc: None,
35            }),
36        });
37
38        let this_weak = Rc::downgrade(&r);
39        r.inner.borrow_mut().tcp_client.on_open(move || {
40            let this = this_weak.upgrade().unwrap();
41            {
42                let mut this = this.inner.borrow_mut();
43                if let Some(rpc) = this.config.rpc.clone() {
44                    this.connection = rpc.get_connection();
45                    this.rpc = Some(rpc);
46                } else {
47                    this.rpc = Some(Rpc::new(Some(this.connection.clone())));
48                }
49            }
50
51            {
52                let this_weak = this_weak.clone();
53                this.inner
54                    .borrow()
55                    .connection
56                    .borrow_mut()
57                    .set_send_package_impl(Box::new(move |package: Vec<u8>| {
58                        if let Some(this) = this_weak.upgrade() {
59                            this.inner.borrow().tcp_client.send(package);
60                        }
61                    }));
62            }
63            {
64                let this_weak = this_weak.clone();
65                this.inner.borrow().tcp_client.on_data(move |package| {
66                    if let Some(this) = this_weak.upgrade() {
67                        this.inner
68                            .borrow()
69                            .connection
70                            .borrow_mut()
71                            .on_recv_package(package);
72                    }
73                });
74            }
75
76            this.inner.borrow_mut().rpc.as_ref().unwrap().set_timer(
77                |ms: u32, handle: Box<dyn Fn()>| {
78                    tokio::task::spawn_local(async move {
79                        tokio::time::sleep(tokio::time::Duration::from_millis(ms as u64)).await;
80                        handle();
81                    });
82                },
83            );
84            {
85                let this_weak = this_weak.clone();
86                this.inner.borrow().tcp_client.on_close(move || {
87                    let this = this_weak.upgrade().unwrap();
88                    this.inner.borrow().rpc.as_ref().unwrap().set_ready(false);
89                    {
90                        let inner = this.inner.borrow();
91                        if let Some(on_close) = inner.on_close.as_ref() {
92                            on_close();
93                        }
94                    }
95                });
96            }
97            this.inner
98                .borrow_mut()
99                .rpc
100                .as_ref()
101                .unwrap()
102                .set_ready(true);
103
104            {
105                let this = this.inner.borrow();
106                if let Some(on_open) = &this.on_open {
107                    on_open(this.rpc.clone().unwrap());
108                }
109            }
110        });
111
112        let this_weak = Rc::downgrade(&r);
113        r.inner.borrow_mut().tcp_client.on_open_failed(move |e| {
114            let this = this_weak.upgrade().unwrap();
115            {
116                let inner = this.inner.borrow();
117                if let Some(on_open_failed) = inner.on_open_failed.as_ref() {
118                    on_open_failed(e);
119                }
120            }
121        });
122
123        r
124    }
125
126    pub fn open(&self, host: impl ToString, port: u16) {
127        self.inner.borrow_mut().tcp_client.open(host, port);
128    }
129
130    pub fn close(&self) {
131        self.inner.borrow().tcp_client.close();
132    }
133
134    pub fn set_reconnect(&self, ms: u32) {
135        self.inner.borrow_mut().tcp_client.set_reconnect(ms);
136    }
137
138    pub fn cancel_reconnect(&self) {
139        self.inner.borrow_mut().tcp_client.cancel_reconnect();
140    }
141
142    pub fn stop(&mut self) {
143        self.close();
144    }
145
146    pub fn on_open<F>(&self, callback: F)
147    where
148        F: Fn(Rc<Rpc>) + 'static,
149    {
150        self.inner.borrow_mut().on_open = Some(Box::new(callback));
151    }
152
153    pub fn on_open_failed<F>(&self, callback: F)
154    where
155        F: Fn(&dyn Error) + 'static,
156    {
157        self.inner.borrow_mut().on_open_failed = Some(Box::new(callback));
158    }
159
160    pub fn on_close<F>(&self, callback: F)
161    where
162        F: Fn() + 'static,
163    {
164        self.inner.borrow_mut().on_close = Some(Box::new(callback));
165    }
166}