rpc_core_net/
tcp_client.rs

1use std::cell::RefCell;
2use std::error::Error;
3use std::net::ToSocketAddrs;
4use std::rc::{Rc, Weak};
5
6use log::debug;
7use tokio::net::TcpStream;
8
9use crate::config::TcpConfig;
10use crate::detail::tcp_channel::TcpChannel;
11
12pub struct TcpClient {
13    host: RefCell<String>,
14    port: RefCell<u16>,
15    config: Rc<RefCell<TcpConfig>>,
16    on_open: RefCell<Option<Box<dyn Fn()>>>,
17    on_open_failed: RefCell<Option<Box<dyn Fn(&dyn Error)>>>,
18    on_close: RefCell<Option<Box<dyn Fn()>>>,
19    reconnect_ms: RefCell<u32>,
20    reconnect_timer_running: RefCell<bool>,
21    channel: Rc<TcpChannel>,
22    this: RefCell<Weak<Self>>,
23}
24
25// public
26impl TcpClient {
27    pub fn new(config: TcpConfig) -> Rc<Self> {
28        let config = Rc::new(RefCell::new(config));
29        let r = Rc::new(Self {
30            host: "".to_string().into(),
31            port: 0.into(),
32            config: config.clone(),
33            on_open: None.into(),
34            on_open_failed: None.into(),
35            on_close: None.into(),
36            reconnect_ms: 0.into(),
37            reconnect_timer_running: false.into(),
38            channel: TcpChannel::new(config),
39            this: Weak::new().into(),
40        });
41        let this_weak = Rc::downgrade(&r);
42        *r.this.borrow_mut() = this_weak.clone().into();
43
44        r.channel.on_close(move || {
45            if let Some(this) = this_weak.upgrade() {
46                if let Some(on_close) = this.on_close.take() {
47                    on_close();
48                }
49                tokio::task::spawn_local(async move {
50                    this.check_reconnect().await;
51                });
52            }
53        });
54        r
55    }
56
57    pub fn downgrade(&self) -> Weak<Self> {
58        self.this.borrow().clone()
59    }
60
61    pub fn open(&self, host: impl ToString, port: u16) {
62        *self.host.borrow_mut() = host.to_string();
63        *self.port.borrow_mut() = port;
64        self.do_open();
65    }
66
67    pub fn close(&self) {
68        self.cancel_reconnect();
69        self.channel.close();
70    }
71
72    pub fn set_reconnect(&self, ms: u32) {
73        *self.reconnect_ms.borrow_mut() = ms;
74    }
75
76    pub fn cancel_reconnect(&self) {
77        *self.reconnect_timer_running.borrow_mut() = false;
78    }
79
80    pub fn stop(&self) {
81        self.close();
82    }
83
84    pub fn on_open<F>(&self, callback: F)
85        where F: Fn() + 'static,
86    {
87        *self.on_open.borrow_mut() = Some(Box::new(callback));
88    }
89
90    pub fn on_open_failed<F>(&self, callback: F)
91        where F: Fn(&dyn Error) + 'static,
92    {
93        *self.on_open_failed.borrow_mut() = Some(Box::new(callback));
94    }
95
96    pub fn on_data<F>(&self, callback: F)
97        where F: Fn(Vec<u8>) + 'static,
98    {
99        self.channel.on_data(callback);
100    }
101
102    pub fn on_close<F>(&self, callback: F)
103        where F: Fn() + 'static,
104    {
105        *self.on_close.borrow_mut() = Some(Box::new(callback));
106    }
107
108    pub fn send(&self, data: Vec<u8>) {
109        self.channel.send(data);
110    }
111
112    pub fn send_str(&self, data: impl ToString) {
113        self.channel.send_str(data);
114    }
115}
116
117// private
118impl TcpClient {
119    fn do_open(&self) {
120        self.config.borrow_mut().init();
121        let host = self.host.borrow().clone();
122        let port = *self.port.borrow();
123
124        let this_weak = self.this.borrow().clone();
125        tokio::task::spawn_local(async move {
126            debug!("connect_tcp: {host} {port}");
127            let result = TcpClient::connect_tcp(host, port).await;
128            debug!("connect_tcp: {result:?}");
129            let this = this_weak.upgrade().unwrap();
130
131            match result {
132                Ok(stream) => {
133                    this.channel.do_open(stream);
134                    if let Some(on_open) = this.on_open.borrow_mut().as_ref() {
135                        on_open();
136                    }
137                }
138                Err(err) => {
139                    if let Some(on_open_failed) = this.on_open_failed.borrow().as_ref() {
140                        on_open_failed(&*err);
141                    }
142                    this.check_reconnect().await;
143                }
144            };
145        });
146    }
147
148    async fn connect_tcp(
149        host: String,
150        port: u16,
151    ) -> Result<TcpStream, Box<dyn Error + Send + Sync>> {
152        let mut host = host;
153        if host == "localhost" {
154            host = "127.0.0.1".parse().unwrap();
155        }
156        let addr = (host, port).to_socket_addrs()?.next().unwrap();
157        let stream = TcpStream::connect(addr).await?;
158        Ok(stream)
159    }
160
161    async fn check_reconnect(&self) {
162        if !self.channel.is_open() && !*self.reconnect_timer_running.borrow() && *self.reconnect_ms.borrow() > 0 {
163            *self.reconnect_timer_running.borrow_mut() = true;
164            tokio::time::sleep(tokio::time::Duration::from_millis((*self.reconnect_ms.borrow()).into())).await;
165            if *self.reconnect_timer_running.borrow() {
166                *self.reconnect_timer_running.borrow_mut() = false;
167            } else {
168                return;
169            }
170            if !self.channel.is_open() {
171                self.do_open();
172            }
173        }
174    }
175}
176