rpc_core/net/
tcp_client.rs

1use std::cell::RefCell;
2use std::error::Error;
3use std::net::ToSocketAddrs;
4use std::rc::{Rc, Weak};
5
6use log::debug;
7use tokio::net::TcpStream;
8
9use crate::net::config::TcpConfig;
10use crate::net::detail::tcp_channel::TcpChannel;
11
12pub struct TcpClient {
13    host: RefCell<String>,
14    port: RefCell<u16>,
15    config: Rc<RefCell<TcpConfig>>,
16    on_open: RefCell<Option<Box<dyn Fn()>>>,
17    on_open_failed: RefCell<Option<Box<dyn Fn(&dyn Error)>>>,
18    on_close: RefCell<Option<Box<dyn Fn()>>>,
19    reconnect_ms: RefCell<u32>,
20    reconnect_timer_running: RefCell<bool>,
21    channel: Rc<TcpChannel>,
22    this: RefCell<Weak<Self>>,
23}
24
25// public
26impl TcpClient {
27    pub fn new(config: TcpConfig) -> Rc<Self> {
28        let config = Rc::new(RefCell::new(config));
29        let r = Rc::new(Self {
30            host: "".to_string().into(),
31            port: 0.into(),
32            config: config.clone(),
33            on_open: None.into(),
34            on_open_failed: None.into(),
35            on_close: None.into(),
36            reconnect_ms: 0.into(),
37            reconnect_timer_running: false.into(),
38            channel: TcpChannel::new(config),
39            this: Weak::new().into(),
40        });
41        let this_weak = Rc::downgrade(&r);
42        *r.this.borrow_mut() = this_weak.clone().into();
43
44        r.channel.on_close(move || {
45            if let Some(this) = this_weak.upgrade() {
46                if let Some(on_close) = this.on_close.borrow().as_ref() {
47                    on_close();
48                }
49                tokio::task::spawn_local(async move {
50                    this.check_reconnect().await;
51                });
52            }
53        });
54        r
55    }
56
57    pub fn downgrade(&self) -> Weak<Self> {
58        self.this.borrow().clone()
59    }
60
61    pub fn open(&self, host: impl ToString, port: u16) {
62        *self.host.borrow_mut() = host.to_string();
63        *self.port.borrow_mut() = port;
64        self.do_open();
65    }
66
67    pub fn close(&self) {
68        self.cancel_reconnect();
69        self.channel.close();
70    }
71
72    pub fn set_reconnect(&self, ms: u32) {
73        *self.reconnect_ms.borrow_mut() = ms;
74    }
75
76    pub fn cancel_reconnect(&self) {
77        *self.reconnect_timer_running.borrow_mut() = false;
78    }
79
80    pub fn stop(&self) {
81        self.close();
82    }
83
84    pub fn on_open<F>(&self, callback: F)
85    where
86        F: Fn() + 'static,
87    {
88        *self.on_open.borrow_mut() = Some(Box::new(callback));
89    }
90
91    pub fn on_open_failed<F>(&self, callback: F)
92    where
93        F: Fn(&dyn Error) + 'static,
94    {
95        *self.on_open_failed.borrow_mut() = Some(Box::new(callback));
96    }
97
98    pub fn on_data<F>(&self, callback: F)
99    where
100        F: Fn(Vec<u8>) + 'static,
101    {
102        self.channel.on_data(callback);
103    }
104
105    pub fn on_close<F>(&self, callback: F)
106    where
107        F: Fn() + 'static,
108    {
109        *self.on_close.borrow_mut() = Some(Box::new(callback));
110    }
111
112    pub fn send(&self, data: Vec<u8>) {
113        self.channel.send(data);
114    }
115
116    pub fn send_str(&self, data: impl ToString) {
117        self.channel.send_str(data);
118    }
119}
120
121// private
122impl TcpClient {
123    fn do_open(&self) {
124        self.config.borrow_mut().init();
125        let host = self.host.borrow().clone();
126        let port = *self.port.borrow();
127
128        let this_weak = self.this.borrow().clone();
129        tokio::task::spawn_local(async move {
130            let this = this_weak.upgrade().unwrap();
131            if this.channel.is_open() {
132                this.channel.close();
133                this.channel.wait_close_finish().await;
134            }
135            debug!("connect_tcp: {host} {port}");
136            let result = TcpClient::connect_tcp(host, port).await;
137            debug!("connect_tcp: {result:?}");
138
139            match result {
140                Ok(stream) => {
141                    this.channel.do_open(stream);
142                    if let Some(on_open) = this.on_open.borrow_mut().as_ref() {
143                        on_open();
144                    }
145                }
146                Err(err) => {
147                    if let Some(on_open_failed) = this.on_open_failed.borrow().as_ref() {
148                        on_open_failed(&*err);
149                    }
150                    this.check_reconnect().await;
151                }
152            };
153        });
154    }
155
156    async fn connect_tcp(
157        host: String,
158        port: u16,
159    ) -> Result<TcpStream, Box<dyn Error + Send + Sync>> {
160        let mut host = host;
161        if host == "localhost" {
162            host = "127.0.0.1".parse().unwrap();
163        }
164        let addr = (host, port).to_socket_addrs()?.next().unwrap();
165        let stream = TcpStream::connect(addr).await?;
166        Ok(stream)
167    }
168
169    async fn check_reconnect(&self) {
170        if !self.channel.is_open()
171            && !*self.reconnect_timer_running.borrow()
172            && *self.reconnect_ms.borrow() > 0
173        {
174            *self.reconnect_timer_running.borrow_mut() = true;
175            tokio::time::sleep(tokio::time::Duration::from_millis(
176                (*self.reconnect_ms.borrow()).into(),
177            ))
178            .await;
179            if *self.reconnect_timer_running.borrow() {
180                *self.reconnect_timer_running.borrow_mut() = false;
181            } else {
182                return;
183            }
184            if !self.channel.is_open() {
185                self.do_open();
186            }
187        }
188    }
189}