chimes_utils/utils/
actix_client.rs

1extern crate tls_rustls as rustls;
2
3use actix_web::{
4    http::header::{self, HeaderMap, HeaderName},
5    http::Method,
6    web::Bytes,
7};
8use awc::{
9    error::HeaderValue,
10    ws::{self, Message},
11    Client as HttpClient, Connector,
12};
13
14use openssl::ssl::{SslConnector, SslFiletype, SslMethod, SslVerifyMode};
15use serde::Serialize;
16
17use crate::{error, ChimesError, ChimesResult as Result};
18use actix_tls::connect::rustls::webpki_roots_cert_store;
19use encoding_rs::{Encoding, UTF_8};
20use mime::Mime;
21use rustls::ClientConfig;
22use std::borrow::Cow;
23use std::{
24    sync::{Arc, Mutex},
25    time::Duration,
26};
27
28// use rustls_pemfile::{certs, pkcs8_private_keys};
29
30pub const DEFAULT_USER_AGENT: &str = "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3534.4 Safari/537.36";
31
32mod danger {
33    // use awc::*;
34    extern crate tls_rustls as rustls;
35    use rustls::{
36        client::{ServerCertVerified, ServerCertVerifier},
37        Certificate, ServerName,
38    };
39    use std::time::SystemTime;
40    pub struct NoCertificateVerification;
41
42    impl ServerCertVerifier for NoCertificateVerification {
43        fn verify_server_cert(
44            &self,
45            _end_entity: &Certificate,
46            _intermediates: &[Certificate],
47            _server_name: &ServerName,
48            _scts: &mut dyn Iterator<Item = &[u8]>,
49            _ocsp_response: &[u8],
50            _now: SystemTime,
51        ) -> Result<ServerCertVerified, rustls::Error> {
52            Ok(ServerCertVerified::assertion())
53        }
54    }
55}
56
57/// 请求客户端
58#[derive(Clone)]
59pub struct ChimesClient {
60    pub(crate) client: HttpClient,
61    charset: String,
62    headers: header::HeaderMap,
63}
64
65// pub trait ToResult {
66//     fn result(&self) -> Result<ResponseResult>;
67// }
68
69// impl tube_error::Result<String, tube_error::Error> for ToResult
70// {
71//      fn result(&self) -> Result<ResponseResult> {
72//         match serde_json::from_str(self) {
73//             Ok(rs) => Ok(rs),
74//             Err(err) => Err(error! {
75//                 code: -1,
76//                 msg: format!("error: {}", err)
77//             }),
78//         }
79//     }
80// }
81
82/// 将结果转换为字符串
83pub(crate) fn text_with_charset(
84    headers: &header::HeaderMap,
85    default_encoding: &str,
86    bs: Bytes,
87) -> Result<String> {
88    let content_type = headers
89        .get(header::CONTENT_TYPE)
90        .and_then(|value| value.to_str().ok())
91        .and_then(|value| value.parse::<Mime>().ok());
92    let encoding_name = content_type
93        .as_ref()
94        .and_then(|mime| mime.get_param("charset").map(|charset| charset.as_str()))
95        .unwrap_or(default_encoding);
96    let encoding = Encoding::for_label(encoding_name.as_bytes()).unwrap_or(UTF_8);
97
98    let (text, _, _) = encoding.decode(&bs);
99    if let Cow::Owned(s) = text {
100        return Ok(s);
101    }
102    unsafe {
103        // decoding returned Cow::Borrowed, meaning these bytes
104        // are already valid utf8
105        Ok(String::from_utf8_unchecked(bs.to_vec()))
106    }
107}
108
109impl Default for ChimesClient {
110    fn default() -> Self {
111        Self::new()
112    }
113}
114
115impl ChimesClient {
116    pub fn new() -> Self {
117        Self::new_timeout(60u64)
118    }
119    /// 创建一个新的连接客户端
120    ///
121    pub fn new_timeout(tm: u64) -> Self {
122        // disable ssl verification
123        let mut builder = SslConnector::builder(SslMethod::tls()).unwrap();
124        builder.set_verify(SslVerifyMode::NONE);
125        let _ = builder
126            .set_alpn_protos(b"\x02h2\x08http/1.1")
127            .map_err(|e| log::info!("Can not set alpn protocol: {:?}", e));
128
129        let connector = Connector::new()
130            .timeout(Duration::from_secs(tm))
131            .handshake_timeout(Duration::from_secs(30))
132            .openssl(builder.build());
133
134        let client = HttpClient::builder()
135            .connector(connector)
136            .timeout(Duration::from_secs(tm))
137            .add_default_header((header::USER_AGENT, DEFAULT_USER_AGENT))
138            // .header(header::AUTHORIZATION, token)
139            // .header(header::REFERER, "http://localhost")
140            // .initial_window_size(100)
141            // .initial_connection_window_size(100)
142            .finish();
143
144        ChimesClient {
145            client,
146            charset: "utf-8".to_owned(),
147            headers: header::HeaderMap::new(),
148        }
149    }
150
151    /**
152     * 使用rust_tls进行HTTPS请求的客户端
153     */
154    pub fn new_tls() -> Self {
155        // disable ssl verification
156        let mut config = ClientConfig::builder()
157            .with_safe_defaults()
158            .with_root_certificates(webpki_roots_cert_store())
159            .with_no_client_auth();
160
161        let protos = vec![b"h2".to_vec(), b"http/1.1".to_vec()];
162        config.alpn_protocols = protos;
163
164        // disable TLS verification
165        config
166            .dangerous()
167            .set_certificate_verifier(Arc::new(danger::NoCertificateVerification));
168
169        let client = awc::Client::builder()
170            .connector(
171                awc::Connector::new()
172                    .rustls(Arc::new(config))
173                    .handshake_timeout(Duration::from_secs(15))
174                    .timeout(Duration::from_secs(15)),
175            )
176            .add_default_header((header::USER_AGENT, DEFAULT_USER_AGENT))
177            .timeout(Duration::from_secs(30))
178            .finish();
179
180        ChimesClient {
181            client,
182            charset: "utf-8".to_owned(),
183            headers: header::HeaderMap::new(),
184        }
185    }
186
187    /// 带证书的远程请求客户端
188    pub fn new_ssl(private_key: &str, certificate: &str) -> Self {
189        // disable ssl verification
190        let mut builder = SslConnector::builder(SslMethod::tls()).unwrap();
191        builder.set_verify(SslVerifyMode::NONE);
192        let _ = builder
193            .set_alpn_protos(b"\x02h2\x08http/1.1")
194            .map_err(|e| log::info!("Can not set alpn protocol: {:?}", e));
195
196        let _ = builder
197            .set_private_key_file(private_key, SslFiletype::PEM)
198            .map_err(|e| log::info!("apiclient_key.pem not find: {:?}", e));
199        let _ = builder
200            .set_certificate_chain_file(certificate)
201            .map_err(|e| log::info!("apiclient_cert.pem not find: {:?}", e));
202
203        let connector = Connector::new()
204            .timeout(Duration::from_secs(5))
205            .openssl(builder.build());
206
207        let client = HttpClient::builder()
208            .connector(connector)
209            .add_default_header((header::USER_AGENT, DEFAULT_USER_AGENT))
210            .finish();
211
212        ChimesClient {
213            client,
214            charset: "utf-8".to_owned(),
215            headers: header::HeaderMap::new(),
216        }
217    }
218
219    /// 设置获取数据的编码方式
220    pub fn set_charset(mut self, charset: &str) -> Self {
221        self.charset = charset.to_owned();
222        self
223    }
224
225    /// get方式获取站点内容
226    pub async fn get(self, url: &str) -> Result<String> {
227        let mut build = self.client.get(url);
228        for (head_name, head_value) in self.headers {
229            build = build.insert_header((head_name, head_value));
230        }
231        match build.send().await {
232            Ok(mut res) => {
233                // log!("{:?}", res);
234                if res.status().is_success() {
235                    // match res.json() {
236                    //     Ok(res) => {},
237                    //     Err(err) => {}
238                    // }
239                    match res.body().await {
240                        Ok(bs) => {
241                            let s = text_with_charset(res.headers(), &self.charset, bs);
242                            // println!("{:?}", s);
243                            s
244                        }
245                        Err(err) => Err(error! {
246                            code: -1,
247                            msg: format!("error: {}", err)
248                        }),
249                    }
250                } else {
251                    Err(error! {
252                        code: 500,
253                        msg: format!("status={}", res.status())
254                    })
255                }
256            }
257            Err(e) => {
258                log::info!("=== request error === {:?}", e);
259                Err(error! {
260                    code: 500,
261                    msg: format!("Send request error: {}", e)
262                })
263            }
264        }
265    }
266    /// 返回bytes
267    pub async fn get_bytes(self, url: &str) -> Result<Vec<u8>> {
268        let mut build = self.client.get(url);
269        for (head_name, head_value) in self.headers {
270            build = build.insert_header((head_name, head_value));
271        }
272        match build.send().await {
273            Ok(mut res) => {
274                if res.status().is_success() {
275                    match res.body().await {
276                        Ok(bs) => Ok(bs[..].to_vec()),
277                        Err(err) => Err(error! {
278                            code: -1,
279                            msg: format!("error: {}", err)
280                        }),
281                    }
282                } else {
283                    Err(error! {
284                        code: 500,
285                        msg: format!("status={}", res.status())
286                    })
287                }
288            }
289            Err(e) => {
290                log::info!("=== request error === {:?}", e);
291                Err(error! {
292                    code: 500,
293                    msg: format!("Send request error: {}", e)
294                })
295            }
296        }
297    }
298    /// post方式提交数据
299    /// url:
300    /// param:
301    pub async fn post<T: Serialize>(self, url: &str, params: &T) -> Result<String> {
302        self.request(Method::POST, url, params).await
303    }
304
305    /// 请求put方式
306    pub async fn put<T: Serialize>(self, url: &str, params: &T) -> Result<String> {
307        self.request(Method::PUT, url, params).await
308    }
309
310    /// 请求删除方式
311    pub async fn delete<T: Serialize>(self, url: &str, params: &T) -> Result<String> {
312        self.request(Method::DELETE, url, params).await
313    }
314
315    // /// 解析并转换为tube_value::Value
316    // pub fn parse_value(&self, text: &str) -> Result<Value> {
317
318    //     let val: serde_json::Value = match serde_json::from_str(post_str) {
319    //         Ok(val) => val,
320    //         Err(err) => Err(error! {
321    //             code: -1,
322    //             msg: format!("error: {}", err)
323    //         }),
324    //     };
325
326    //     Ok(val.to_value())
327
328    // }
329
330    /// 请求
331    pub async fn request_betyes<T: Serialize>(
332        self,
333        method_str: &str,
334        url: &str,
335        params: &T,
336    ) -> Result<Vec<u8>> {
337        // log!("params === {:?}", params);
338        let method = match Method::from_bytes(method_str.as_bytes()) {
339            Ok(s) => s,
340            Err(_e) => Method::POST,
341        };
342        let mut build = self.client.request(method, url);
343        for (head_name, head_value) in self.headers {
344            build = build.insert_header((head_name, head_value));
345        }
346        match build.send_json(params).await {
347            Ok(mut res) => {
348                // log!("response: {:?}", res);
349                if res.status().is_success() {
350                    match res.body().await {
351                        Ok(bs) => Ok(bs.to_vec()),
352                        Err(err) => Err(error! {
353                            code: -1,
354                            msg: format!("error: {}", err)
355                        }),
356                    }
357                } else {
358                    Err(error! {
359                        code: 500,
360                        msg: format!("status={}", res.status())
361                    })
362                }
363            }
364            Err(e) => Err(error! {
365                code: 500,
366                msg: format!("Send request error: {}", e)
367            }),
368        }
369    }
370
371    /// 请求
372    pub async fn request<T: Serialize>(
373        self,
374        method: Method,
375        url: &str,
376        params: &T,
377    ) -> Result<String> {
378        let mut build = self.client.request(method, url);
379        for (head_name, head_value) in self.headers {
380            build = build.insert_header((head_name, head_value));
381        }
382        match build.send_json(params).await {
383            Ok(mut res) => {
384                // log!("response: {:?}", res);
385                if res.status().is_success() {
386                    match res.body().await {
387                        Ok(bs) => match text_with_charset(res.headers(), &self.charset, bs) {
388                            Ok(s) => {
389                                // println!(" === {:?}", s);
390                                // match serde_json::from_str(&s) {
391                                //     Ok(rs) => Ok(rs),
392                                //     Err(err) => Err(error! {
393                                //         code: -1,
394                                //         msg: format!("error: {}", err)
395                                //     }),
396                                // }
397                                Ok(s)
398                            }
399                            Err(err) => Err(err),
400                        },
401                        Err(err) => Err(error! {
402                            code: -1,
403                            msg: format!("error: {}", err)
404                        }),
405                    }
406                } else {
407                    Err(error! {
408                        code: 500,
409                        msg: format!("status={}", res.status())
410                    })
411                }
412            }
413            Err(e) => Err(error! {
414                code: 500,
415                msg: format!("Send request error: {}", e)
416            }),
417        }
418    }
419
420    /// 请求
421    pub async fn request_form_with_response(
422        self,
423        method: Method,
424        url: &str,
425        params: &String,
426    ) -> Result<(String, HeaderMap)> {
427        let mut build = self.client.request(method, url);
428        for (head_name, head_value) in self.headers {
429            build = build.insert_header((head_name, head_value));
430        }
431        match build.send_body(params.to_owned()).await {
432            Ok(mut res) => {
433                // log!("response: {:?}", res);
434                if res.status().is_success() {
435                    match res.body().await {
436                        Ok(bs) => match text_with_charset(res.headers(), &self.charset, bs) {
437                            Ok(s) => {
438                                // println!(" === {:?}", s);
439                                // match serde_json::from_str(&s) {
440                                //     Ok(rs) => Ok(rs),
441                                //     Err(err) => Err(error! {
442                                //         code: -1,
443                                //         msg: format!("error: {}", err)
444                                //     }),
445                                // }
446                                Ok((s, res.headers().to_owned()))
447                            }
448                            Err(err) => Err(err),
449                        },
450                        Err(err) => Err(error! {
451                            code: -1,
452                            msg: format!("error: {}", err)
453                        }),
454                    }
455                } else {
456                    let resp = match res.body().await {
457                        Ok(bs) => match text_with_charset(res.headers(), &self.charset, bs) {
458                            Ok(s) => s,
459                            Err(err) => err.to_string(),
460                        },
461                        Err(err) => err.to_string(),
462                    };
463                    log::info!("Stop {}", resp.clone());
464                    Err(error! {
465                        code: 500,
466                        msg: format!("status={}, message={}", res.status(), resp)
467                    })
468                }
469            }
470            Err(e) => Err(error! {
471                code: 500,
472                msg: format!("Send request error: {}", e)
473            }),
474        }
475    }
476
477    /// 请求
478    pub async fn request_with_response<T: Serialize>(
479        self,
480        method: Method,
481        url: &str,
482        params: &T,
483    ) -> Result<(String, HeaderMap)> {
484        let mut build = self.client.request(method, url);
485        for (head_name, head_value) in self.headers {
486            build = build.insert_header((head_name, head_value));
487        }
488        match build.send_json(params).await {
489            Ok(mut res) => {
490                // log!("response: {:?}", res);
491                if res.status().is_success() {
492                    match res.body().await {
493                        Ok(bs) => match text_with_charset(res.headers(), &self.charset, bs) {
494                            Ok(s) => {
495                                // println!(" === {:?}", s);
496                                // match serde_json::from_str(&s) {
497                                //     Ok(rs) => Ok(rs),
498                                //     Err(err) => Err(error! {
499                                //         code: -1,
500                                //         msg: format!("error: {}", err)
501                                //     }),
502                                // }
503                                Ok((s, res.headers().to_owned()))
504                            }
505                            Err(err) => Err(err),
506                        },
507                        Err(err) => Err(error! {
508                            code: -1,
509                            msg: format!("error: {}", err)
510                        }),
511                    }
512                } else {
513                    let resp = match res.body().await {
514                        Ok(bs) => match text_with_charset(res.headers(), &self.charset, bs) {
515                            Ok(s) => s,
516                            Err(err) => err.to_string(),
517                        },
518                        Err(err) => err.to_string(),
519                    };
520                    log::info!("Stop {}", resp.clone());
521                    Err(error! {
522                        code: 500,
523                        msg: format!("status={}, message={}", res.status(), resp)
524                    })
525                }
526            }
527            Err(e) => Err(error! {
528                code: 500,
529                msg: format!("Send request error: {}", e)
530            }),
531        }
532    }
533
534    pub fn insert_header(&mut self, key: &str, value: &str) -> &mut Self {
535        let hn = match HeaderName::from_lowercase(key.to_lowercase().as_bytes()) {
536            Ok(v) => Some(v),
537            Err(_) => None,
538        };
539
540        let hv = match HeaderValue::from_str(value) {
541            Ok(v) => Some(v),
542            Err(_) => None,
543        };
544
545        if let Some(hn) = hn {
546            if let Some(hv) = hv {
547                self.headers.insert(hn, hv);
548            }
549        }
550
551        self
552    }
553
554    pub fn put_header(&mut self, key: &str, value: &str) {
555        let hn = match HeaderName::from_lowercase(key.to_lowercase().as_bytes()) {
556            Ok(v) => Some(v),
557            Err(_) => None,
558        };
559
560        let hv = match HeaderValue::from_str(value) {
561            Ok(v) => Some(v),
562            Err(_) => None,
563        };
564
565        if let Some(hn) = hn {
566            if let Some(hv) = hv {
567                self.headers.insert(hn, hv);
568            }
569        }
570    }
571    /// 发送二进制文件
572    pub async fn post_betyes(self, url: &str, body: Bytes) -> Result<String> {
573        let mut build = self.client.post(url);
574        for (head_name, head_value) in self.headers {
575            build = build.insert_header((head_name, head_value));
576        }
577        match build.send_body(body).await {
578            Ok(mut res) => {
579                if res.status().is_success() {
580                    match res.body().await {
581                        Ok(bs) => text_with_charset(res.headers(), &self.charset, bs),
582                        Err(err) => Err(error! {
583                            code: -1,
584                            msg: format!("error: {}", err)
585                        }),
586                    }
587                } else {
588                    Err(error! {
589                        code: 500,
590                        msg: format!("status={}", res.status())
591                    })
592                }
593            }
594            Err(e) => Err(error! {
595                code: 500,
596                msg: format!("Send request error: {}", e)
597            }),
598        }
599    }
600}
601
602use futures_util::{SinkExt as _, StreamExt as _};
603use std::sync::Condvar;
604
605pub struct ChimesWebSocketClient {
606    websocket_url: String,
607    send_queue: Vec<String>,
608    recv_queue: Vec<String>,
609    looping: bool,
610    mutex_send: Mutex<bool>,
611    cond_send: Condvar,
612}
613
614impl ChimesWebSocketClient {
615    pub fn new_websocket(url: &str) -> Result<Self> {
616        Ok(Self {
617            websocket_url: url.to_string(),
618            send_queue: vec![],
619            recv_queue: vec![],
620            looping: false,
621            mutex_send: Mutex::new(false),
622            cond_send: Condvar::new(),
623        })
624    }
625
626    pub fn send(&mut self, body: &str) -> Result<()> {
627        self.send_queue.push(body.to_string());
628        Ok(())
629    }
630
631    pub fn recv(&self) -> &Vec<String> {
632        &self.recv_queue
633    }
634
635    pub fn stop(mut self) {
636        self.looping = false;
637        let lock = self.mutex_send.lock().unwrap();
638        self.cond_send.notify_all();
639        drop(lock);
640    }
641
642    pub async fn start(&mut self) -> Result<()> {
643        self.looping = true;
644        while self.looping {
645            let cli = awc::Client::new().ws(self.websocket_url.as_str());
646            match cli.connect().await {
647                Ok((_cl, mut conn)) => {
648                    let resp = conn.next().await;
649                    if resp.is_none() {
650                        // Received None, it means the connection was lost.
651                        let lock = self.mutex_send.lock().expect("Can not lock");
652                        match self.cond_send.wait_timeout(lock, Duration::from_secs(1)) {
653                            Ok((_l, result)) => {
654                                if result.timed_out() {
655                                    break;
656                                }
657                            }
658                            Err(err) => {
659                                log::info!(
660                                    "Condvar cond_send wait for a timeout with error. {}",
661                                    err
662                                );
663                            }
664                        }
665
666                        let mut top_msg = self.send_queue.pop();
667                        while top_msg.is_some() {
668                            let msg = top_msg.unwrap();
669                            match conn.send(ws::Message::Text(msg.clone().into())).await {
670                                Ok(_) => {}
671                                Err(err) => {
672                                    log::info!("Send msg with error {}", err);
673                                    self.send_queue.insert(0, msg);
674                                    break;
675                                }
676                            }
677                            top_msg = self.send_queue.pop();
678                        }
679
680                        // if self.send_queue.is_empty() {}
681                        break;
682                    } else {
683                        match resp.unwrap() {
684                            Ok(frame) => {
685                                match frame {
686                                    ws::Frame::Text(text) => self
687                                        .recv_queue
688                                        .push(String::from_utf8(text.to_vec()).unwrap()),
689                                    ws::Frame::Binary(bin) => self
690                                        .recv_queue
691                                        .push(String::from_utf8(bin.to_vec()).unwrap()),
692                                    ws::Frame::Continuation(_t) => {}
693                                    ws::Frame::Ping(ping) => {
694                                        let msg = String::from_utf8(ping.to_vec()).unwrap();
695                                        log::info!("recv ping message {}", msg);
696                                        let _ =
697                                            conn.send(Message::Pong("pong".into())).await.is_ok();
698                                    }
699                                    ws::Frame::Pong(pong) => {
700                                        let msg = String::from_utf8(pong.to_vec()).unwrap();
701                                        log::info!("recv pong message {}", msg);
702                                        let _ =
703                                            conn.send(Message::Ping("ping".into())).await.is_ok();
704                                    }
705                                    ws::Frame::Close(_close) => {
706                                        // should close
707                                        break;
708                                    }
709                                }
710                            }
711                            Err(err) => {
712                                log::info!("Unsupport protocol error {}", err);
713                            }
714                        };
715                    }
716                }
717                Err(err) => {
718                    log::info!("WebSocket error {}", err);
719                }
720            }
721            let lock = self.mutex_send.lock().unwrap();
722            let (l, _r) = self
723                .cond_send
724                .wait_timeout(lock, Duration::from_secs(1))
725                .unwrap();
726            drop(l);
727        }
728        Ok(())
729    }
730}