rpc_core/net/
tcp_client.rs1use std::cell::RefCell;
2use std::error::Error;
3use std::net::ToSocketAddrs;
4use std::rc::{Rc, Weak};
5
6use log::debug;
7use tokio::net::TcpStream;
8
9use crate::net::config::TcpConfig;
10use crate::net::detail::tcp_channel::TcpChannel;
11
12pub struct TcpClient {
13 host: RefCell<String>,
14 port: RefCell<u16>,
15 config: Rc<RefCell<TcpConfig>>,
16 on_open: RefCell<Option<Box<dyn Fn()>>>,
17 on_open_failed: RefCell<Option<Box<dyn Fn(&dyn Error)>>>,
18 on_close: RefCell<Option<Box<dyn Fn()>>>,
19 reconnect_ms: RefCell<u32>,
20 reconnect_timer_running: RefCell<bool>,
21 channel: Rc<TcpChannel>,
22 this: RefCell<Weak<Self>>,
23}
24
25impl TcpClient {
27 pub fn new(config: TcpConfig) -> Rc<Self> {
28 let config = Rc::new(RefCell::new(config));
29 let r = Rc::new(Self {
30 host: "".to_string().into(),
31 port: 0.into(),
32 config: config.clone(),
33 on_open: None.into(),
34 on_open_failed: None.into(),
35 on_close: None.into(),
36 reconnect_ms: 0.into(),
37 reconnect_timer_running: false.into(),
38 channel: TcpChannel::new(config),
39 this: Weak::new().into(),
40 });
41 let this_weak = Rc::downgrade(&r);
42 *r.this.borrow_mut() = this_weak.clone().into();
43
44 r.channel.on_close(move || {
45 if let Some(this) = this_weak.upgrade() {
46 if let Some(on_close) = this.on_close.borrow().as_ref() {
47 on_close();
48 }
49 tokio::task::spawn_local(async move {
50 this.check_reconnect().await;
51 });
52 }
53 });
54 r
55 }
56
57 pub fn downgrade(&self) -> Weak<Self> {
58 self.this.borrow().clone()
59 }
60
61 pub fn open(&self, host: impl ToString, port: u16) {
62 *self.host.borrow_mut() = host.to_string();
63 *self.port.borrow_mut() = port;
64 self.do_open();
65 }
66
67 pub fn close(&self) {
68 self.cancel_reconnect();
69 self.channel.close();
70 }
71
72 pub fn set_reconnect(&self, ms: u32) {
73 *self.reconnect_ms.borrow_mut() = ms;
74 }
75
76 pub fn cancel_reconnect(&self) {
77 *self.reconnect_timer_running.borrow_mut() = false;
78 }
79
80 pub fn stop(&self) {
81 self.close();
82 }
83
84 pub fn on_open<F>(&self, callback: F)
85 where
86 F: Fn() + 'static,
87 {
88 *self.on_open.borrow_mut() = Some(Box::new(callback));
89 }
90
91 pub fn on_open_failed<F>(&self, callback: F)
92 where
93 F: Fn(&dyn Error) + 'static,
94 {
95 *self.on_open_failed.borrow_mut() = Some(Box::new(callback));
96 }
97
98 pub fn on_data<F>(&self, callback: F)
99 where
100 F: Fn(Vec<u8>) + 'static,
101 {
102 self.channel.on_data(callback);
103 }
104
105 pub fn on_close<F>(&self, callback: F)
106 where
107 F: Fn() + 'static,
108 {
109 *self.on_close.borrow_mut() = Some(Box::new(callback));
110 }
111
112 pub fn send(&self, data: Vec<u8>) {
113 self.channel.send(data);
114 }
115
116 pub fn send_str(&self, data: impl ToString) {
117 self.channel.send_str(data);
118 }
119}
120
121impl TcpClient {
123 fn do_open(&self) {
124 self.config.borrow_mut().init();
125 let host = self.host.borrow().clone();
126 let port = *self.port.borrow();
127
128 let this_weak = self.this.borrow().clone();
129 tokio::task::spawn_local(async move {
130 let this = this_weak.upgrade().unwrap();
131 if this.channel.is_open() {
132 this.channel.close();
133 this.channel.wait_close_finish().await;
134 }
135 debug!("connect_tcp: {host} {port}");
136 let result = TcpClient::connect_tcp(host, port).await;
137 debug!("connect_tcp: {result:?}");
138
139 match result {
140 Ok(stream) => {
141 this.channel.do_open(stream);
142 if let Some(on_open) = this.on_open.borrow_mut().as_ref() {
143 on_open();
144 }
145 }
146 Err(err) => {
147 if let Some(on_open_failed) = this.on_open_failed.borrow().as_ref() {
148 on_open_failed(&*err);
149 }
150 this.check_reconnect().await;
151 }
152 };
153 });
154 }
155
156 async fn connect_tcp(
157 host: String,
158 port: u16,
159 ) -> Result<TcpStream, Box<dyn Error + Send + Sync>> {
160 let mut host = host;
161 if host == "localhost" {
162 host = "127.0.0.1".parse().unwrap();
163 }
164 let addr = (host, port).to_socket_addrs()?.next().unwrap();
165 let stream = TcpStream::connect(addr).await?;
166 Ok(stream)
167 }
168
169 async fn check_reconnect(&self) {
170 if !self.channel.is_open()
171 && !*self.reconnect_timer_running.borrow()
172 && *self.reconnect_ms.borrow() > 0
173 {
174 *self.reconnect_timer_running.borrow_mut() = true;
175 tokio::time::sleep(tokio::time::Duration::from_millis(
176 (*self.reconnect_ms.borrow()).into(),
177 ))
178 .await;
179 if *self.reconnect_timer_running.borrow() {
180 *self.reconnect_timer_running.borrow_mut() = false;
181 } else {
182 return;
183 }
184 if !self.channel.is_open() {
185 self.do_open();
186 }
187 }
188 }
189}