zbus_lib/wsocket/
component.rs1use std::collections::HashMap;
2use std::sync::Arc;
3use std::sync::mpsc::{channel, Receiver, Sender};
4use std::thread;
5use std::time::{Duration, SystemTime};
6
7use log::{debug, error, info};
8use ws::{CloseCode, Handler, Sender as WsCmd};
9
10use crate::err::{OkResult, ZbusErr};
11use crate::message::{Message, Request, Response};
12use crate::rpc::IOHandlers;
13use crate::wsocket::{Instruct, WsClientHandler};
14
15pub struct WsClient {
17 tx: Sender<Instruct>,
18 url: &'static str,
19}
20
21impl WsClient {
22 pub fn connect(url: &'static str, process: Option<IOHandlers>) -> Result<Self, ZbusErr> {
24 let (tx, rx) = channel();
25 handle_instruct(url, rx, tx.clone(), process);
26 tx.send(Instruct::Connect);
27 Ok(Self {
28 tx: tx.clone(),
29 url,
30 })
31 }
32 pub fn reconnect(&self) -> OkResult {
34 if self.is_close() {
35 self.tx.send(Instruct::Connect);
36 Ok(())
37 } else {
38 Err(ZbusErr::err("reconnect is err"))
39 }
40 }
41
42 pub fn is_close(&self) -> bool {
43 let (tx, rx) = channel();
44 self.tx.send(Instruct::IsClose(Some(tx)));
45 if let Ok(result) = rx.recv_timeout(Duration::from_secs(20)) {
46 if let Ok(()) = result {
47 return true;
48 };
49 };
50 false
51 }
52 pub fn handler(&self) -> WsClientHandler {
53 WsClientHandler {
54 tx: self.tx.clone(),
55 }
56 }
57}
58
59
60impl Drop for WsClient {
61 fn drop(&mut self) {
62self.tx.send(Instruct::Close(None));
64 self.tx.send(Instruct::Exit);
65 error!("client is over");
66 thread::sleep(Duration::from_secs(3));
67 }
68}
69
70
71fn handle_instruct(url: &'static str, rx: Receiver<Instruct>, tx: Sender<Instruct>, procoss: Option<IOHandlers>) {
72 thread::spawn(move || {
73 let mut queue: HashMap<String, (SystemTime, Response)> = HashMap::new();
74 let reconnect_num = 10;
75 let mut ws_cmd: Option<WsCmd> = None;
76 'a: loop {
77 let tx = tx.clone();
78 if let Ok(instruct) = rx.recv_timeout(Duration::from_secs(60)) {
79 match instruct {
80 Instruct::Delivery(msg, sender) => {let result: OkResult = ws_cmd.as_ref().map_or_else(|| Err(ZbusErr::closed()), |cmd| {
82 let msg = match msg {
83 Message::Request(ref req) => serde_json::to_string(req),
84 Message::Response(ref resp) => serde_json::to_string(resp),
85 };
86 msg.map_or_else(|_| Err(ZbusErr::validate("msg data err")), |json| {
87 debug!("deliver msg is {}", json);
88 cmd.send(json.clone()).map_err(|e| {
89 error!("deliver msg {} {} fail", json, e.to_string());
90 if let ws::ErrorKind::Io(_) = e.kind {
91 tx.send(Instruct::Closed);
92 ZbusErr::closed()
93 } else {
94 ZbusErr::err(e.to_string())
95 }
96 })
97 })
98 });
99 sender.map(|sender| sender.send(result));
100 }
101 Instruct::Receive(msg) => {match msg {
103 Message::Request(req) => {
104 debug!("handle rpc request");
105 match req.id() {
106 None => debug!("bad request ,no request id {}", serde_json::to_string(&req).unwrap()),
107 Some(_) => { procoss.as_ref().map(|io_handler| io_handler.handler_request(req, tx.clone())); }
108 };
109 }Message::Response(resp) => {
111 if let Some(id) = resp.id() {
112 debug!("put resp {}", id);
113 queue.insert(String::from(id), (SystemTime::now(), resp));
114 } else {
115 debug!("error resp {}", serde_json::to_string(&resp).unwrap());
116 }
117 }
118 };
119 }
120 Instruct::Connect => {for i in 0..reconnect_num {
124 let (sender, rx) = channel();
125 let tx = tx.clone();
126 thread::spawn(move || {
127 let result = ws::connect(url, |out| MessageHandle {
128 ws_client: out,
129 sender: sender.clone(),
130 tx: tx.clone(),
131 });
132 if let Err(e) = result {
133 error!("ws client is close exit reason {}", e);
134 }
135 error!("ws client is close");
136 });
137 if let Ok(ws_client) = rx.recv_timeout(Duration::from_secs(30)) {
138 ws_cmd = Some(ws_client);
139 continue 'a;
140 } else {
141 thread::sleep(Duration::from_secs(10));
142 error!("{} secs {} nums reconnect", 10, i);
143 }
144 }
145 error!("重新连接失败");
146 }
147 Instruct::Response(msg_id, sender) => {let result = queue.remove(&msg_id).map_or(Err(ZbusErr::err("未发现")), |(_, resp)| Ok(resp));
149 sender.map(|sender| sender.send(result));
150 }
151 Instruct::Close(sender) => {ws_cmd.as_ref().map(|cmd| {
153 cmd.close(CloseCode::Normal);
154 tx.send(Instruct::Closed);});
156 sender.map(|sender| sender.send(Ok(())));
157 }
158 Instruct::Closed => {ws_cmd = None;
160 }
161 Instruct::IsClose(sender) => {let result = ws_cmd.as_ref().map_or_else(|| Err(ZbusErr::closed()), |cmd| {
163 cmd.ping(vec![]).map_or_else(|e| {
164 if let ws::ErrorKind::Io(_) = e.kind {
165 tx.send(Instruct::Closed);
166 Err(ZbusErr::closed())} else {
168 Ok(())
169 }
170 }, |_| Ok(()))
171 });
172 sender.map(|sender| sender.send(result));
173 }
174 Instruct::Exit => {
175 break;
176 }
177 _ => {}
178 };
179 } else {
180 tx.send(Instruct::IsClose(None));
181 for (id, (timeout, resp)) in &queue {
182 timeout.elapsed().map(|elapsed| if elapsed.as_secs() > 30 {
183 tx.send(Instruct::Response(id.into(), None));
184 });
185 }
186 }
187 }
188 error!(" handle Instruct over,resp nums {} not get", queue.len());
189 });
190}
191
192struct MessageHandle {
193 ws_client: WsCmd,
194 tx: Sender<Instruct>,
195 sender: Sender<WsCmd>,
196}
197
198impl Handler for MessageHandle {
199 fn on_open(&mut self, shake: ws::Handshake) -> ws::Result<()> {
200 if let Some(addr) = shake.remote_addr()? {
201 debug!("Connection with {} now open", addr);
202 }
203 dbg!("ws connect");
204 self.sender.send(self.ws_client.clone());
206 Ok(())
207 }
208 fn on_message(&mut self, msg: ws::Message) -> ws::Result<()> {
209 let json = msg.to_string();
210 let msg = if json.contains(r#""status":null"#) {
211 debug!("request");
212 serde_json::from_str::<Request>(&json).map(|msg| Message::Request(msg))
213 } else {
214 debug!("response");
215 serde_json::from_str::<Response>(&json).map(|msg| Message::Response(msg))
216 };
217 msg.map_or_else(|e| debug!(" recv message err is {} [{}]", e, json), |msg| { self.tx.send(Instruct::Receive(msg)); });
219
220 Ok(())
221 }
222 fn on_close(&mut self, code: CloseCode, reason: &str) {
223 dbg!("ws client is closed");
224 self.tx.send(Instruct::Closed);
225 }
227}