rs_pkg/network/websocket/client/
client.rs1use super::{
2 super::{ErrorHandlerType, MessageHandlerType},
3 WebSocketClientConfig,
4};
5use crate::{
6 async_fn::wrap_fn,
7 cron::{Cron, CronConfig},
8 monitor::Monitor,
9 network::websocket::BytesGenerator,
10 worker::Worker,
11};
12use bytes::Bytes;
13use futures_util::{SinkExt, StreamExt};
14use std::sync::Arc;
15use std::{error::Error, time::Duration};
16use tokio::{
17 select,
18 sync::{
19 Mutex,
20 mpsc::{self, Receiver, Sender, error::SendError},
21 },
22};
23use tokio_tungstenite::{connect_async, tungstenite::Message};
24use tracing::{debug, error, warn};
25
26#[derive(Clone)]
27pub struct Client {
28 name: String,
29 addr: String,
30 message_handler: Arc<MessageHandlerType<Message>>,
31 error_handler: Arc<ErrorHandlerType>,
32 ping_payload: Arc<BytesGenerator>,
33 ping_interval: String,
35
36 client_close: Arc<Sender<()>>,
37 client_done: Arc<Mutex<Receiver<()>>>,
38
39 reconnect: bool,
40 reconnect_sender: Arc<Sender<()>>,
41 reconnect_receiver: Arc<Mutex<Receiver<()>>>,
42
43 message_sender: Arc<Sender<Message>>,
44 message_receiver: Arc<Mutex<Receiver<Message>>>,
45}
46
47impl Client {
48 pub fn new(name: &str, cfg: &WebSocketClientConfig) -> Self {
49 let (sender, receiver) = tokio::sync::mpsc::channel(1000);
50 let (client_close, client_done) = tokio::sync::mpsc::channel(1);
51 let (reconnect_sender, reconnect_receiver) = tokio::sync::mpsc::channel(1);
52 Self {
53 name: name.to_string(),
54 addr: cfg.addr.clone(),
55 message_handler: wrap_fn(|msg| async {
56 match msg {
57 Message::Text(t) => debug!("Received text: {}", t),
58 Message::Binary(b) => debug!("Received binary: {:?}", b),
59 Message::Ping(p) => debug!("Received ping: {:?}", p),
60 Message::Pong(p) => debug!("Received pong: {:?}", p),
61 Message::Close(c) => debug!("Received close: {:?}", c),
62 Message::Frame(f) => debug!("Received frame: {:?}", f),
63 }
64 None
65 }),
66 error_handler: wrap_fn(|e| async move { error!("Received error: {}", e) }),
67 ping_payload: wrap_fn(|_| async {
68 let ts = chrono::Utc::now().timestamp().to_string();
69 Bytes::from(ts)
70 }),
71
72 ping_interval: cfg.ping_interval.clone(),
74
75 reconnect: cfg.reconnect,
76 reconnect_sender: Arc::new(reconnect_sender),
77 reconnect_receiver: Arc::new(Mutex::new(reconnect_receiver)),
78
79 client_close: Arc::new(client_close),
80 client_done: Arc::new(Mutex::new(client_done)),
81
82 message_sender: Arc::new(sender),
83 message_receiver: Arc::new(Mutex::new(receiver)),
84 }
85 }
86
87 pub async fn stop(&self) -> Result<(), SendError<()>> {
88 self.client_close.send(()).await
89 }
90
91 pub fn with_message_handler<F, Fut>(mut self, h: F) -> Self
92 where
93 F: Fn(Message) -> Fut + Send + Sync + 'static,
94 Fut: Future<Output = Option<Message>> + Send + Sync + 'static,
95 {
96 self.message_handler = wrap_fn(h);
97 self
98 }
99
100 pub fn with_error_handler<F, Fut>(mut self, h: F) -> Self
101 where
102 F: Fn(Box<dyn Error + Send + Sync + 'static>) -> Fut + Send + Sync + 'static,
103 Fut: Future<Output = ()> + Send + Sync + 'static,
104 {
105 self.error_handler = wrap_fn(h);
106 self
107 }
108
109 pub fn with_ping_payload<F, Fut>(mut self, h: F) -> Self
110 where
111 F: Fn() -> Fut + Send + Sync + 'static,
112 Fut: Future<Output = Bytes> + Send + Sync + 'static,
113 {
114 let h = Arc::new(h);
115 self.ping_payload = wrap_fn(move |_| {
116 let h = h.clone();
117 async move { h().await }
118 });
119 self
120 }
121
122 async fn connect(&mut self, done: Arc<Mutex<Receiver<()>>>) {
123 let reconnect_sender = self.reconnect_sender.clone();
124
125 if let Ok((stream, _)) = connect_async(&self.addr)
126 .await
127 .inspect_err(|e| error!("[{}] connect to {} failed: {}", self.name, self.addr, e))
128 {
129 let (sink, stream) = stream.split();
130 let sink = Arc::new(Mutex::new(sink));
131 let stream = Arc::new(Mutex::new(stream));
132 let msg_handler = self.message_handler.clone();
133 let msg_receiver = self.message_receiver.clone();
134 let err_handler_ping = self.error_handler.clone();
135 let err_handler_main = self.error_handler.clone();
136
137 let mut cron_cfg = CronConfig::default();
138 cron_cfg.interval = self.ping_interval.clone();
139 cron_cfg.run_after_start = self.ping_interval.clone();
140 cron_cfg.interval_after_finish = false;
141
142 let cron = Cron::new("PING", &cron_cfg);
143 let ping_msg_sender = self.message_sender.clone();
144 cron.run(move || {
145 let msg_sender = ping_msg_sender.clone();
146 let err_handler_ping = err_handler_ping.clone();
147 let now = chrono::Utc::now().timestamp_millis().to_string();
148 let ping = Message::Ping(Bytes::from(now));
149 async move {
150 if let Err(err) = msg_sender.send(ping).await {
151 _ = err_handler_ping(Box::new(err)).await;
152 }
153 }
154 })
155 .await;
156
157 let receiver_msg_sender = self.message_sender.clone();
159 let receiver_reconnect_sender = reconnect_sender.clone();
160 let receiver_err_handler_main = err_handler_main.clone();
161 let (sender_close, mut sender_done) = tokio::sync::mpsc::channel(1);
162 tokio::spawn(async move {
163 let mut guard = stream.lock().await;
164 let mut done = done.lock().await;
165 let msg_sender = receiver_msg_sender.clone();
166 let reconnect_sender = receiver_reconnect_sender.clone();
167 let err_handler_main = receiver_err_handler_main.clone();
168 loop {
169 select! {
170 _ = done.recv() => {
171 done.close();
172 _ = sender_close.send(()).await;
173 warn!("Conn Exit with done");
174 return
175 },
176
177 t = guard.next() => {
178 debug!("stream receive: {:?}", t);
179 match t {
180 Some(Ok(msg)) => {
181 if let Some(msg) = msg_handler(msg).await {
182 if let Err(e) = msg_sender.send(msg).await {
183 err_handler_main(Box::new(e)).await;
184 _ = reconnect_sender.send(()).await;
185 return
186 };
187 }
188 },
189 Some(Err(err)) => {
190 err_handler_main(Box::new(err)).await;
191 _ = reconnect_sender.send(()).await;
192 return
193 },
194 None => {
195 _ = reconnect_sender.send(()).await;
196 return
197 }
198 }
199 }
200
201 }
202 }
203 });
204
205 tokio::spawn(async move {
207 let mut msg_receiver = msg_receiver.lock().await;
208 let mut sink = sink.lock().await;
209 loop {
210 select! {
211 _ = sender_done.recv() => {
212 sender_done.close();
213 warn!("Conn Exit with done");
214 return
215 },
216
217 msg = msg_receiver.recv() => {
218 match msg {
220 Some(msg) => {
221 if let Err(e) = sink.send(msg).await {
222 err_handler_main(Box::new(e)).await;
223 _ = reconnect_sender.send(()).await;
224 return
225 };
226 },
227
228 None => {
229 _ = reconnect_sender.send(()).await;
230 return
231 },
232 }
233 }
234 }
235 }
236 });
237 return;
238 }
239
240 _ = reconnect_sender.send(()).await;
241 }
242
243 pub async fn send_message(&self, msg: Message) {
244 _ = self.message_sender.send(msg).await;
245 }
246
247 pub async fn run(&self) {
248 let s = self.clone();
249 s.clone().connect(self.client_done.clone()).await;
250
251 if self.reconnect {
252 tokio::spawn(async move {
253 let s = s.clone();
254 let mut reconnect_guard = s.reconnect_receiver.lock().await;
255 let done = s.client_done.clone();
256 loop {
257 _ = reconnect_guard.recv().await;
258 tokio::time::sleep(Duration::from_secs(1)).await;
259 s.clone().connect(done.clone()).await;
260 }
261 });
262 }
263 }
264}