Skip to main content

rs_pkg/network/websocket/client/
client.rs

1use super::{
2    super::{ErrorHandlerType, MessageHandlerType},
3    WebSocketClientConfig,
4};
5use crate::{
6    async_fn::wrap_fn,
7    cron::{Cron, CronConfig},
8    monitor::Monitor,
9    network::websocket::BytesGenerator,
10    worker::Worker,
11};
12use bytes::Bytes;
13use futures_util::{SinkExt, StreamExt};
14use std::sync::Arc;
15use std::{error::Error, time::Duration};
16use tokio::{
17    select,
18    sync::{
19        Mutex,
20        mpsc::{self, Receiver, Sender, error::SendError},
21    },
22};
23use tokio_tungstenite::{connect_async, tungstenite::Message};
24use tracing::{debug, error, warn};
25
26#[derive(Clone)]
27pub struct Client {
28    name: String,
29    addr: String,
30    message_handler: Arc<MessageHandlerType<Message>>,
31    error_handler: Arc<ErrorHandlerType>,
32    ping_payload: Arc<BytesGenerator>,
33    // worker: Arc<Mutex<Worker<()>>>,
34    ping_interval: String,
35
36    client_close: Arc<Sender<()>>,
37    client_done: Arc<Mutex<Receiver<()>>>,
38
39    reconnect: bool,
40    reconnect_sender: Arc<Sender<()>>,
41    reconnect_receiver: Arc<Mutex<Receiver<()>>>,
42
43    message_sender: Arc<Sender<Message>>,
44    message_receiver: Arc<Mutex<Receiver<Message>>>,
45}
46
47impl Client {
48    pub fn new(name: &str, cfg: &WebSocketClientConfig) -> Self {
49        let (sender, receiver) = tokio::sync::mpsc::channel(1000);
50        let (client_close, client_done) = tokio::sync::mpsc::channel(1);
51        let (reconnect_sender, reconnect_receiver) = tokio::sync::mpsc::channel(1);
52        Self {
53            name: name.to_string(),
54            addr: cfg.addr.clone(),
55            message_handler: wrap_fn(|msg| async {
56                match msg {
57                    Message::Text(t) => debug!("Received text: {}", t),
58                    Message::Binary(b) => debug!("Received binary: {:?}", b),
59                    Message::Ping(p) => debug!("Received ping: {:?}", p),
60                    Message::Pong(p) => debug!("Received pong: {:?}", p),
61                    Message::Close(c) => debug!("Received close: {:?}", c),
62                    Message::Frame(f) => debug!("Received frame: {:?}", f),
63                }
64                None
65            }),
66            error_handler: wrap_fn(|e| async move { error!("Received error: {}", e) }),
67            ping_payload: wrap_fn(|_| async {
68                let ts = chrono::Utc::now().timestamp().to_string();
69                Bytes::from(ts)
70            }),
71
72            // worker: Arc::new(Mutex::new(Worker::new(name, 1))),
73            ping_interval: cfg.ping_interval.clone(),
74
75            reconnect: cfg.reconnect,
76            reconnect_sender: Arc::new(reconnect_sender),
77            reconnect_receiver: Arc::new(Mutex::new(reconnect_receiver)),
78
79            client_close: Arc::new(client_close),
80            client_done: Arc::new(Mutex::new(client_done)),
81
82            message_sender: Arc::new(sender),
83            message_receiver: Arc::new(Mutex::new(receiver)),
84        }
85    }
86
87    pub async fn stop(&self) -> Result<(), SendError<()>> {
88        self.client_close.send(()).await
89    }
90
91    pub fn with_message_handler<F, Fut>(mut self, h: F) -> Self
92    where
93        F: Fn(Message) -> Fut + Send + Sync + 'static,
94        Fut: Future<Output = Option<Message>> + Send + Sync + 'static,
95    {
96        self.message_handler = wrap_fn(h);
97        self
98    }
99
100    pub fn with_error_handler<F, Fut>(mut self, h: F) -> Self
101    where
102        F: Fn(Box<dyn Error + Send + Sync + 'static>) -> Fut + Send + Sync + 'static,
103        Fut: Future<Output = ()> + Send + Sync + 'static,
104    {
105        self.error_handler = wrap_fn(h);
106        self
107    }
108
109    pub fn with_ping_payload<F, Fut>(mut self, h: F) -> Self
110    where
111        F: Fn() -> Fut + Send + Sync + 'static,
112        Fut: Future<Output = Bytes> + Send + Sync + 'static,
113    {
114        let h = Arc::new(h);
115        self.ping_payload = wrap_fn(move |_| {
116            let h = h.clone();
117            async move { h().await }
118        });
119        self
120    }
121
122    async fn connect(&mut self, done: Arc<Mutex<Receiver<()>>>) {
123        let reconnect_sender = self.reconnect_sender.clone();
124
125        if let Ok((stream, _)) = connect_async(&self.addr)
126            .await
127            .inspect_err(|e| error!("[{}] connect to {} failed: {}", self.name, self.addr, e))
128        {
129            let (sink, stream) = stream.split();
130            let sink = Arc::new(Mutex::new(sink));
131            let stream = Arc::new(Mutex::new(stream));
132            let msg_handler = self.message_handler.clone();
133            let msg_receiver = self.message_receiver.clone();
134            let err_handler_ping = self.error_handler.clone();
135            let err_handler_main = self.error_handler.clone();
136
137            let mut cron_cfg = CronConfig::default();
138            cron_cfg.interval = self.ping_interval.clone();
139            cron_cfg.run_after_start = self.ping_interval.clone();
140            cron_cfg.interval_after_finish = false;
141
142            let cron = Cron::new("PING", &cron_cfg);
143            let ping_msg_sender = self.message_sender.clone();
144            cron.run(move || {
145                let msg_sender = ping_msg_sender.clone();
146                let err_handler_ping = err_handler_ping.clone();
147                let now = chrono::Utc::now().timestamp_millis().to_string();
148                let ping = Message::Ping(Bytes::from(now));
149                async move {
150                    if let Err(err) = msg_sender.send(ping).await {
151                        _ = err_handler_ping(Box::new(err)).await;
152                    }
153                }
154            })
155            .await;
156
157            // receiver
158            let receiver_msg_sender = self.message_sender.clone();
159            let receiver_reconnect_sender = reconnect_sender.clone();
160            let receiver_err_handler_main = err_handler_main.clone();
161            let (sender_close, mut sender_done) = tokio::sync::mpsc::channel(1);
162            tokio::spawn(async move {
163                let mut guard = stream.lock().await;
164                let mut done = done.lock().await;
165                let msg_sender = receiver_msg_sender.clone();
166                let reconnect_sender = receiver_reconnect_sender.clone();
167                let err_handler_main = receiver_err_handler_main.clone();
168                loop {
169                    select! {
170                        _ = done.recv() => {
171                            done.close();
172                            _ = sender_close.send(()).await;
173                            warn!("Conn Exit with done");
174                            return
175                        },
176
177                        t = guard.next() => {
178                            debug!("stream receive: {:?}", t);
179                            match t {
180                                Some(Ok(msg)) => {
181                                    if let Some(msg) = msg_handler(msg).await {
182                                        if let Err(e) = msg_sender.send(msg).await {
183                                            err_handler_main(Box::new(e)).await;
184                                            _ = reconnect_sender.send(()).await;
185                                            return
186                                        };
187                                    }
188                                },
189                                Some(Err(err)) => {
190                                    err_handler_main(Box::new(err)).await;
191                                    _ = reconnect_sender.send(()).await;
192                                    return
193                                },
194                                None => {
195                                    _ = reconnect_sender.send(()).await;
196                                    return
197                                }
198                            }
199                        }
200
201                    }
202                }
203            });
204
205            // sender
206            tokio::spawn(async move {
207                let mut msg_receiver = msg_receiver.lock().await;
208                let mut sink = sink.lock().await;
209                loop {
210                    select! {
211                        _ = sender_done.recv() => {
212                            sender_done.close();
213                            warn!("Conn Exit with done");
214                            return
215                        },
216
217                        msg = msg_receiver.recv() => {
218                            // debug!("msg_receiver receive: {:?}", msg);
219                            match msg {
220                                Some(msg) => {
221                                    if let Err(e) = sink.send(msg).await {
222                                        err_handler_main(Box::new(e)).await;
223                                        _ = reconnect_sender.send(()).await;
224                                        return
225                                    };
226                                },
227
228                                None => {
229                                    _ = reconnect_sender.send(()).await;
230                                    return
231                                },
232                            }
233                        }
234                    }
235                }
236            });
237            return;
238        }
239
240        _ = reconnect_sender.send(()).await;
241    }
242
243    pub async fn send_message(&self, msg: Message) {
244        _ = self.message_sender.send(msg).await;
245    }
246
247    pub async fn run(&self) {
248        let s = self.clone();
249        s.clone().connect(self.client_done.clone()).await;
250
251        if self.reconnect {
252            tokio::spawn(async move {
253                let s = s.clone();
254                let mut reconnect_guard = s.reconnect_receiver.lock().await;
255                let done = s.client_done.clone();
256                loop {
257                    _ = reconnect_guard.recv().await;
258                    tokio::time::sleep(Duration::from_secs(1)).await;
259                    s.clone().connect(done.clone()).await;
260                }
261            });
262        }
263    }
264}