rpc_core_net/
tcp_client.rs1use std::cell::RefCell;
2use std::error::Error;
3use std::net::ToSocketAddrs;
4use std::rc::{Rc, Weak};
5
6use log::debug;
7use tokio::net::TcpStream;
8
9use crate::config::TcpConfig;
10use crate::detail::tcp_channel::TcpChannel;
11
12pub struct TcpClient {
13 host: RefCell<String>,
14 port: RefCell<u16>,
15 config: Rc<RefCell<TcpConfig>>,
16 on_open: RefCell<Option<Box<dyn Fn()>>>,
17 on_open_failed: RefCell<Option<Box<dyn Fn(&dyn Error)>>>,
18 on_close: RefCell<Option<Box<dyn Fn()>>>,
19 reconnect_ms: RefCell<u32>,
20 reconnect_timer_running: RefCell<bool>,
21 channel: Rc<TcpChannel>,
22 this: RefCell<Weak<Self>>,
23}
24
25impl TcpClient {
27 pub fn new(config: TcpConfig) -> Rc<Self> {
28 let config = Rc::new(RefCell::new(config));
29 let r = Rc::new(Self {
30 host: "".to_string().into(),
31 port: 0.into(),
32 config: config.clone(),
33 on_open: None.into(),
34 on_open_failed: None.into(),
35 on_close: None.into(),
36 reconnect_ms: 0.into(),
37 reconnect_timer_running: false.into(),
38 channel: TcpChannel::new(config),
39 this: Weak::new().into(),
40 });
41 let this_weak = Rc::downgrade(&r);
42 *r.this.borrow_mut() = this_weak.clone().into();
43
44 r.channel.on_close(move || {
45 if let Some(this) = this_weak.upgrade() {
46 if let Some(on_close) = this.on_close.take() {
47 on_close();
48 }
49 tokio::task::spawn_local(async move {
50 this.check_reconnect().await;
51 });
52 }
53 });
54 r
55 }
56
57 pub fn downgrade(&self) -> Weak<Self> {
58 self.this.borrow().clone()
59 }
60
61 pub fn open(&self, host: impl ToString, port: u16) {
62 *self.host.borrow_mut() = host.to_string();
63 *self.port.borrow_mut() = port;
64 self.do_open();
65 }
66
67 pub fn close(&self) {
68 self.cancel_reconnect();
69 self.channel.close();
70 }
71
72 pub fn set_reconnect(&self, ms: u32) {
73 *self.reconnect_ms.borrow_mut() = ms;
74 }
75
76 pub fn cancel_reconnect(&self) {
77 *self.reconnect_timer_running.borrow_mut() = false;
78 }
79
80 pub fn stop(&self) {
81 self.close();
82 }
83
84 pub fn on_open<F>(&self, callback: F)
85 where F: Fn() + 'static,
86 {
87 *self.on_open.borrow_mut() = Some(Box::new(callback));
88 }
89
90 pub fn on_open_failed<F>(&self, callback: F)
91 where F: Fn(&dyn Error) + 'static,
92 {
93 *self.on_open_failed.borrow_mut() = Some(Box::new(callback));
94 }
95
96 pub fn on_data<F>(&self, callback: F)
97 where F: Fn(Vec<u8>) + 'static,
98 {
99 self.channel.on_data(callback);
100 }
101
102 pub fn on_close<F>(&self, callback: F)
103 where F: Fn() + 'static,
104 {
105 *self.on_close.borrow_mut() = Some(Box::new(callback));
106 }
107
108 pub fn send(&self, data: Vec<u8>) {
109 self.channel.send(data);
110 }
111
112 pub fn send_str(&self, data: impl ToString) {
113 self.channel.send_str(data);
114 }
115}
116
117impl TcpClient {
119 fn do_open(&self) {
120 self.config.borrow_mut().init();
121 let host = self.host.borrow().clone();
122 let port = *self.port.borrow();
123
124 let this_weak = self.this.borrow().clone();
125 tokio::task::spawn_local(async move {
126 debug!("connect_tcp: {host} {port}");
127 let result = TcpClient::connect_tcp(host, port).await;
128 debug!("connect_tcp: {result:?}");
129 let this = this_weak.upgrade().unwrap();
130
131 match result {
132 Ok(stream) => {
133 this.channel.do_open(stream);
134 if let Some(on_open) = this.on_open.borrow_mut().as_ref() {
135 on_open();
136 }
137 }
138 Err(err) => {
139 if let Some(on_open_failed) = this.on_open_failed.borrow().as_ref() {
140 on_open_failed(&*err);
141 }
142 this.check_reconnect().await;
143 }
144 };
145 });
146 }
147
148 async fn connect_tcp(
149 host: String,
150 port: u16,
151 ) -> Result<TcpStream, Box<dyn Error + Send + Sync>> {
152 let mut host = host;
153 if host == "localhost" {
154 host = "127.0.0.1".parse().unwrap();
155 }
156 let addr = (host, port).to_socket_addrs()?.next().unwrap();
157 let stream = TcpStream::connect(addr).await?;
158 Ok(stream)
159 }
160
161 async fn check_reconnect(&self) {
162 if !self.channel.is_open() && !*self.reconnect_timer_running.borrow() && *self.reconnect_ms.borrow() > 0 {
163 *self.reconnect_timer_running.borrow_mut() = true;
164 tokio::time::sleep(tokio::time::Duration::from_millis((*self.reconnect_ms.borrow()).into())).await;
165 if *self.reconnect_timer_running.borrow() {
166 *self.reconnect_timer_running.borrow_mut() = false;
167 } else {
168 return;
169 }
170 if !self.channel.is_open() {
171 self.do_open();
172 }
173 }
174 }
175}
176