zbus_lib/wsocket/
component.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3use std::sync::mpsc::{channel, Receiver, Sender};
4use std::thread;
5use std::time::{Duration, SystemTime};
6
7use log::{debug, error, info};
8use ws::{CloseCode, Handler, Sender as WsCmd};
9
10use crate::err::{OkResult, ZbusErr};
11use crate::message::{Message, Request, Response};
12use crate::rpc::IOHandlers;
13use crate::wsocket::{Instruct, WsClientHandler};
14
15//wsClient-> wsconnect
16pub struct WsClient {
17    tx: Sender<Instruct>,
18    url: &'static str,
19}
20
21impl WsClient {
22    /// rpcServer 会用到处理request
23    pub fn connect(url: &'static str, process: Option<IOHandlers>) -> Result<Self, ZbusErr> {
24        let (tx, rx) = channel();
25        handle_instruct(url, rx, tx.clone(), process);
26        tx.send(Instruct::Connect);
27        Ok(Self {
28            tx: tx.clone(),
29            url,
30        })
31    }
32    //TODO  后面还可以再改进成,发指令再连接
33    pub fn reconnect(&self) -> OkResult {
34        if self.is_close() {
35            self.tx.send(Instruct::Connect);
36            Ok(())
37        } else {
38            Err(ZbusErr::err("reconnect is err"))
39        }
40    }
41
42    pub fn is_close(&self) -> bool {
43        let (tx, rx) = channel();
44        self.tx.send(Instruct::IsClose(Some(tx)));
45        if let Ok(result) = rx.recv_timeout(Duration::from_secs(20)) {
46            if let Ok(()) = result {
47                return true;
48            };
49        };
50        false
51    }
52    pub fn handler(&self) -> WsClientHandler {
53        WsClientHandler {
54            tx: self.tx.clone(),
55        }
56    }
57}
58
59
60impl Drop for WsClient {
61    fn drop(&mut self) {
62//在这里关闭连接
63        self.tx.send(Instruct::Close(None));
64        self.tx.send(Instruct::Exit);
65        error!("client is over");
66        thread::sleep(Duration::from_secs(3));
67    }
68}
69
70
71fn handle_instruct(url: &'static str, rx: Receiver<Instruct>, tx: Sender<Instruct>, procoss: Option<IOHandlers>) {
72    thread::spawn(move || {
73        let mut queue: HashMap<String, (SystemTime, Response)> = HashMap::new();
74        let reconnect_num = 10;
75        let mut ws_cmd: Option<WsCmd> = None;
76        'a: loop {
77            let tx = tx.clone();
78            if let Ok(instruct) = rx.recv_timeout(Duration::from_secs(60)) {
79                match instruct {
80                    Instruct::Delivery(msg, sender) => {//把response放入队列中
81                        let result: OkResult = ws_cmd.as_ref().map_or_else(|| Err(ZbusErr::closed()), |cmd| {
82                            let msg = match msg {
83                                Message::Request(ref req) => serde_json::to_string(req),
84                                Message::Response(ref resp) => serde_json::to_string(resp),
85                            };
86                            msg.map_or_else(|_| Err(ZbusErr::validate("msg data err")), |json| {
87                                debug!("deliver msg is {}", json);
88                                cmd.send(json.clone()).map_err(|e| {
89                                    error!("deliver msg  {} {} fail", json, e.to_string());
90                                    if let ws::ErrorKind::Io(_) = e.kind {
91                                        tx.send(Instruct::Closed);
92                                        ZbusErr::closed()
93                                    } else {
94                                        ZbusErr::err(e.to_string())
95                                    }
96                                })
97                            })
98                        });
99                        sender.map(|sender| sender.send(result));
100                    }
101                    Instruct::Receive(msg) => {//把response放入队列中
102                        match msg {
103                            Message::Request(req) => {
104                                debug!("handle rpc request");
105                                match req.id() {
106                                    None => debug!("bad request ,no request id {}", serde_json::to_string(&req).unwrap()),
107                                    Some(_) => { procoss.as_ref().map(|io_handler| io_handler.handler_request(req, tx.clone())); }
108                                };
109                            }// 启动rpcServer
110                            Message::Response(resp) => {
111                                if let Some(id) = resp.id() {
112                                    debug!("put resp {}", id);
113                                    queue.insert(String::from(id), (SystemTime::now(), resp));
114                                } else {
115                                    debug!("error resp {}", serde_json::to_string(&resp).unwrap());
116                                }
117                            }
118                        };
119                    }
120                    Instruct::Connect => {//连接服务器
121                        //连接成功/失败处理
122                        //循环处理次数,可10秒重连一次
123                        for i in 0..reconnect_num {
124                            let (sender, rx) = channel();
125                            let tx = tx.clone();
126                            thread::spawn(move || {
127                                let result = ws::connect(url, |out| MessageHandle {
128                                    ws_client: out,
129                                    sender: sender.clone(),
130                                    tx: tx.clone(),
131                                });
132                                if let Err(e) = result {
133                                    error!("ws client is close exit reason {}", e);
134                                }
135                                error!("ws client is close");
136                            });
137                            if let Ok(ws_client) = rx.recv_timeout(Duration::from_secs(30)) {
138                                 ws_cmd = Some(ws_client);
139                                continue 'a;
140                            } else {
141                                thread::sleep(Duration::from_secs(10));
142                                error!("{} secs {} nums reconnect", 10, i);
143                            }
144                        }
145                        error!("重新连接失败");
146                    }
147                    Instruct::Response(msg_id, sender) => {//查询response
148                        let result = queue.remove(&msg_id).map_or(Err(ZbusErr::err("未发现")), |(_, resp)| Ok(resp));
149                        sender.map(|sender| sender.send(result));
150                    }
151                    Instruct::Close(sender) => {//关闭连接
152                        ws_cmd.as_ref().map(|cmd| {
153                            cmd.close(CloseCode::Normal);
154                            tx.send(Instruct::Closed);//关闭成功失败都认为关闭成功
155                        });
156                        sender.map(|sender| sender.send(Ok(())));
157                    }
158                    Instruct::Closed => {//连接已经关闭
159                        ws_cmd = None;
160                    }
161                    Instruct::IsClose(sender) => {//查询连接是否关闭
162                        let result = ws_cmd.as_ref().map_or_else(|| Err(ZbusErr::closed()), |cmd| {
163                            cmd.ping(vec![]).map_or_else(|e| {
164                                if let ws::ErrorKind::Io(_) = e.kind {
165                                    tx.send(Instruct::Closed);
166                                    Err(ZbusErr::closed())//关闭
167                                } else {
168                                    Ok(())
169                                }
170                            }, |_| Ok(()))
171                        });
172                        sender.map(|sender| sender.send(result));
173                    }
174                    Instruct::Exit => {
175                        break;
176                    }
177                    _ => {}
178                };
179            } else {
180                tx.send(Instruct::IsClose(None));
181                for (id, (timeout, resp)) in &queue {
182                    timeout.elapsed().map(|elapsed| if elapsed.as_secs() > 30 {
183                        tx.send(Instruct::Response(id.into(), None));
184                    });
185                }
186            }
187        }
188        error!(" handle Instruct over,resp nums {} not get", queue.len());
189    });
190}
191
192struct MessageHandle {
193    ws_client: WsCmd,
194    tx: Sender<Instruct>,
195    sender: Sender<WsCmd>,
196}
197
198impl Handler for MessageHandle {
199    fn on_open(&mut self, shake: ws::Handshake) -> ws::Result<()> {
200        if let Some(addr) = shake.remote_addr()? {
201            debug!("Connection with {} now open", addr);
202        }
203        dbg!("ws connect");
204        // self.tx.send(Instruct::Connected(self.ws_client.clone()));
205        self.sender.send(self.ws_client.clone());
206        Ok(())
207    }
208    fn on_message(&mut self, msg: ws::Message) -> ws::Result<()> {
209        let json = msg.to_string();
210        let msg = if json.contains(r#""status":null"#) {
211            debug!("request");
212            serde_json::from_str::<Request>(&json).map(|msg| Message::Request(msg))
213        } else {
214            debug!("response");
215            serde_json::from_str::<Response>(&json).map(|msg| Message::Response(msg))
216        };
217        // debug!("{}", json);
218        msg.map_or_else(|e| debug!(" recv message err is {} [{}]", e, json), |msg| { self.tx.send(Instruct::Receive(msg)); });
219
220        Ok(())
221    }
222    fn on_close(&mut self, code: CloseCode, reason: &str) {
223        dbg!("ws client is closed");
224        self.tx.send(Instruct::Closed);
225        //send close
226    }
227}