1extern crate tls_rustls as rustls;
2
3use actix_web::{
4 http::header::{self, HeaderMap, HeaderName},
5 http::Method,
6 web::Bytes,
7};
8use awc::{
9 error::HeaderValue,
10 ws::{self, Message},
11 Client as HttpClient, Connector,
12};
13
14use openssl::ssl::{SslConnector, SslFiletype, SslMethod, SslVerifyMode};
15use serde::Serialize;
16
17use crate::{error, ChimesError, ChimesResult as Result};
18use actix_tls::connect::rustls::webpki_roots_cert_store;
19use encoding_rs::{Encoding, UTF_8};
20use mime::Mime;
21use rustls::ClientConfig;
22use std::borrow::Cow;
23use std::{
24 sync::{Arc, Mutex},
25 time::Duration,
26};
27
28pub const DEFAULT_USER_AGENT: &str = "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3534.4 Safari/537.36";
31
32mod danger {
33 extern crate tls_rustls as rustls;
35 use rustls::{
36 client::{ServerCertVerified, ServerCertVerifier},
37 Certificate, ServerName,
38 };
39 use std::time::SystemTime;
40 pub struct NoCertificateVerification;
41
42 impl ServerCertVerifier for NoCertificateVerification {
43 fn verify_server_cert(
44 &self,
45 _end_entity: &Certificate,
46 _intermediates: &[Certificate],
47 _server_name: &ServerName,
48 _scts: &mut dyn Iterator<Item = &[u8]>,
49 _ocsp_response: &[u8],
50 _now: SystemTime,
51 ) -> Result<ServerCertVerified, rustls::Error> {
52 Ok(ServerCertVerified::assertion())
53 }
54 }
55}
56
57#[derive(Clone)]
59pub struct ChimesClient {
60 pub(crate) client: HttpClient,
61 charset: String,
62 headers: header::HeaderMap,
63}
64
65pub(crate) fn text_with_charset(
84 headers: &header::HeaderMap,
85 default_encoding: &str,
86 bs: Bytes,
87) -> Result<String> {
88 let content_type = headers
89 .get(header::CONTENT_TYPE)
90 .and_then(|value| value.to_str().ok())
91 .and_then(|value| value.parse::<Mime>().ok());
92 let encoding_name = content_type
93 .as_ref()
94 .and_then(|mime| mime.get_param("charset").map(|charset| charset.as_str()))
95 .unwrap_or(default_encoding);
96 let encoding = Encoding::for_label(encoding_name.as_bytes()).unwrap_or(UTF_8);
97
98 let (text, _, _) = encoding.decode(&bs);
99 if let Cow::Owned(s) = text {
100 return Ok(s);
101 }
102 unsafe {
103 Ok(String::from_utf8_unchecked(bs.to_vec()))
106 }
107}
108
109impl Default for ChimesClient {
110 fn default() -> Self {
111 Self::new()
112 }
113}
114
115impl ChimesClient {
116 pub fn new() -> Self {
117 Self::new_timeout(60u64)
118 }
119 pub fn new_timeout(tm: u64) -> Self {
122 let mut builder = SslConnector::builder(SslMethod::tls()).unwrap();
124 builder.set_verify(SslVerifyMode::NONE);
125 let _ = builder
126 .set_alpn_protos(b"\x02h2\x08http/1.1")
127 .map_err(|e| log::info!("Can not set alpn protocol: {:?}", e));
128
129 let connector = Connector::new()
130 .timeout(Duration::from_secs(tm))
131 .handshake_timeout(Duration::from_secs(30))
132 .openssl(builder.build());
133
134 let client = HttpClient::builder()
135 .connector(connector)
136 .timeout(Duration::from_secs(tm))
137 .add_default_header((header::USER_AGENT, DEFAULT_USER_AGENT))
138 .finish();
143
144 ChimesClient {
145 client,
146 charset: "utf-8".to_owned(),
147 headers: header::HeaderMap::new(),
148 }
149 }
150
151 pub fn new_tls() -> Self {
155 let mut config = ClientConfig::builder()
157 .with_safe_defaults()
158 .with_root_certificates(webpki_roots_cert_store())
159 .with_no_client_auth();
160
161 let protos = vec![b"h2".to_vec(), b"http/1.1".to_vec()];
162 config.alpn_protocols = protos;
163
164 config
166 .dangerous()
167 .set_certificate_verifier(Arc::new(danger::NoCertificateVerification));
168
169 let client = awc::Client::builder()
170 .connector(
171 awc::Connector::new()
172 .rustls(Arc::new(config))
173 .handshake_timeout(Duration::from_secs(15))
174 .timeout(Duration::from_secs(15)),
175 )
176 .add_default_header((header::USER_AGENT, DEFAULT_USER_AGENT))
177 .timeout(Duration::from_secs(30))
178 .finish();
179
180 ChimesClient {
181 client,
182 charset: "utf-8".to_owned(),
183 headers: header::HeaderMap::new(),
184 }
185 }
186
187 pub fn new_ssl(private_key: &str, certificate: &str) -> Self {
189 let mut builder = SslConnector::builder(SslMethod::tls()).unwrap();
191 builder.set_verify(SslVerifyMode::NONE);
192 let _ = builder
193 .set_alpn_protos(b"\x02h2\x08http/1.1")
194 .map_err(|e| log::info!("Can not set alpn protocol: {:?}", e));
195
196 let _ = builder
197 .set_private_key_file(private_key, SslFiletype::PEM)
198 .map_err(|e| log::info!("apiclient_key.pem not find: {:?}", e));
199 let _ = builder
200 .set_certificate_chain_file(certificate)
201 .map_err(|e| log::info!("apiclient_cert.pem not find: {:?}", e));
202
203 let connector = Connector::new()
204 .timeout(Duration::from_secs(5))
205 .openssl(builder.build());
206
207 let client = HttpClient::builder()
208 .connector(connector)
209 .add_default_header((header::USER_AGENT, DEFAULT_USER_AGENT))
210 .finish();
211
212 ChimesClient {
213 client,
214 charset: "utf-8".to_owned(),
215 headers: header::HeaderMap::new(),
216 }
217 }
218
219 pub fn set_charset(mut self, charset: &str) -> Self {
221 self.charset = charset.to_owned();
222 self
223 }
224
225 pub async fn get(self, url: &str) -> Result<String> {
227 let mut build = self.client.get(url);
228 for (head_name, head_value) in self.headers {
229 build = build.insert_header((head_name, head_value));
230 }
231 match build.send().await {
232 Ok(mut res) => {
233 if res.status().is_success() {
235 match res.body().await {
240 Ok(bs) => {
241 let s = text_with_charset(res.headers(), &self.charset, bs);
242 s
244 }
245 Err(err) => Err(error! {
246 code: -1,
247 msg: format!("error: {}", err)
248 }),
249 }
250 } else {
251 Err(error! {
252 code: 500,
253 msg: format!("status={}", res.status())
254 })
255 }
256 }
257 Err(e) => {
258 log::info!("=== request error === {:?}", e);
259 Err(error! {
260 code: 500,
261 msg: format!("Send request error: {}", e)
262 })
263 }
264 }
265 }
266 pub async fn get_bytes(self, url: &str) -> Result<Vec<u8>> {
268 let mut build = self.client.get(url);
269 for (head_name, head_value) in self.headers {
270 build = build.insert_header((head_name, head_value));
271 }
272 match build.send().await {
273 Ok(mut res) => {
274 if res.status().is_success() {
275 match res.body().await {
276 Ok(bs) => Ok(bs[..].to_vec()),
277 Err(err) => Err(error! {
278 code: -1,
279 msg: format!("error: {}", err)
280 }),
281 }
282 } else {
283 Err(error! {
284 code: 500,
285 msg: format!("status={}", res.status())
286 })
287 }
288 }
289 Err(e) => {
290 log::info!("=== request error === {:?}", e);
291 Err(error! {
292 code: 500,
293 msg: format!("Send request error: {}", e)
294 })
295 }
296 }
297 }
298 pub async fn post<T: Serialize>(self, url: &str, params: &T) -> Result<String> {
302 self.request(Method::POST, url, params).await
303 }
304
305 pub async fn put<T: Serialize>(self, url: &str, params: &T) -> Result<String> {
307 self.request(Method::PUT, url, params).await
308 }
309
310 pub async fn delete<T: Serialize>(self, url: &str, params: &T) -> Result<String> {
312 self.request(Method::DELETE, url, params).await
313 }
314
315 pub async fn request_betyes<T: Serialize>(
332 self,
333 method_str: &str,
334 url: &str,
335 params: &T,
336 ) -> Result<Vec<u8>> {
337 let method = match Method::from_bytes(method_str.as_bytes()) {
339 Ok(s) => s,
340 Err(_e) => Method::POST,
341 };
342 let mut build = self.client.request(method, url);
343 for (head_name, head_value) in self.headers {
344 build = build.insert_header((head_name, head_value));
345 }
346 match build.send_json(params).await {
347 Ok(mut res) => {
348 if res.status().is_success() {
350 match res.body().await {
351 Ok(bs) => Ok(bs.to_vec()),
352 Err(err) => Err(error! {
353 code: -1,
354 msg: format!("error: {}", err)
355 }),
356 }
357 } else {
358 Err(error! {
359 code: 500,
360 msg: format!("status={}", res.status())
361 })
362 }
363 }
364 Err(e) => Err(error! {
365 code: 500,
366 msg: format!("Send request error: {}", e)
367 }),
368 }
369 }
370
371 pub async fn request<T: Serialize>(
373 self,
374 method: Method,
375 url: &str,
376 params: &T,
377 ) -> Result<String> {
378 let mut build = self.client.request(method, url);
379 for (head_name, head_value) in self.headers {
380 build = build.insert_header((head_name, head_value));
381 }
382 match build.send_json(params).await {
383 Ok(mut res) => {
384 if res.status().is_success() {
386 match res.body().await {
387 Ok(bs) => match text_with_charset(res.headers(), &self.charset, bs) {
388 Ok(s) => {
389 Ok(s)
398 }
399 Err(err) => Err(err),
400 },
401 Err(err) => Err(error! {
402 code: -1,
403 msg: format!("error: {}", err)
404 }),
405 }
406 } else {
407 Err(error! {
408 code: 500,
409 msg: format!("status={}", res.status())
410 })
411 }
412 }
413 Err(e) => Err(error! {
414 code: 500,
415 msg: format!("Send request error: {}", e)
416 }),
417 }
418 }
419
420 pub async fn request_form_with_response(
422 self,
423 method: Method,
424 url: &str,
425 params: &String,
426 ) -> Result<(String, HeaderMap)> {
427 let mut build = self.client.request(method, url);
428 for (head_name, head_value) in self.headers {
429 build = build.insert_header((head_name, head_value));
430 }
431 match build.send_body(params.to_owned()).await {
432 Ok(mut res) => {
433 if res.status().is_success() {
435 match res.body().await {
436 Ok(bs) => match text_with_charset(res.headers(), &self.charset, bs) {
437 Ok(s) => {
438 Ok((s, res.headers().to_owned()))
447 }
448 Err(err) => Err(err),
449 },
450 Err(err) => Err(error! {
451 code: -1,
452 msg: format!("error: {}", err)
453 }),
454 }
455 } else {
456 let resp = match res.body().await {
457 Ok(bs) => match text_with_charset(res.headers(), &self.charset, bs) {
458 Ok(s) => s,
459 Err(err) => err.to_string(),
460 },
461 Err(err) => err.to_string(),
462 };
463 log::info!("Stop {}", resp.clone());
464 Err(error! {
465 code: 500,
466 msg: format!("status={}, message={}", res.status(), resp)
467 })
468 }
469 }
470 Err(e) => Err(error! {
471 code: 500,
472 msg: format!("Send request error: {}", e)
473 }),
474 }
475 }
476
477 pub async fn request_with_response<T: Serialize>(
479 self,
480 method: Method,
481 url: &str,
482 params: &T,
483 ) -> Result<(String, HeaderMap)> {
484 let mut build = self.client.request(method, url);
485 for (head_name, head_value) in self.headers {
486 build = build.insert_header((head_name, head_value));
487 }
488 match build.send_json(params).await {
489 Ok(mut res) => {
490 if res.status().is_success() {
492 match res.body().await {
493 Ok(bs) => match text_with_charset(res.headers(), &self.charset, bs) {
494 Ok(s) => {
495 Ok((s, res.headers().to_owned()))
504 }
505 Err(err) => Err(err),
506 },
507 Err(err) => Err(error! {
508 code: -1,
509 msg: format!("error: {}", err)
510 }),
511 }
512 } else {
513 let resp = match res.body().await {
514 Ok(bs) => match text_with_charset(res.headers(), &self.charset, bs) {
515 Ok(s) => s,
516 Err(err) => err.to_string(),
517 },
518 Err(err) => err.to_string(),
519 };
520 log::info!("Stop {}", resp.clone());
521 Err(error! {
522 code: 500,
523 msg: format!("status={}, message={}", res.status(), resp)
524 })
525 }
526 }
527 Err(e) => Err(error! {
528 code: 500,
529 msg: format!("Send request error: {}", e)
530 }),
531 }
532 }
533
534 pub fn insert_header(&mut self, key: &str, value: &str) -> &mut Self {
535 let hn = match HeaderName::from_lowercase(key.to_lowercase().as_bytes()) {
536 Ok(v) => Some(v),
537 Err(_) => None,
538 };
539
540 let hv = match HeaderValue::from_str(value) {
541 Ok(v) => Some(v),
542 Err(_) => None,
543 };
544
545 if let Some(hn) = hn {
546 if let Some(hv) = hv {
547 self.headers.insert(hn, hv);
548 }
549 }
550
551 self
552 }
553
554 pub fn put_header(&mut self, key: &str, value: &str) {
555 let hn = match HeaderName::from_lowercase(key.to_lowercase().as_bytes()) {
556 Ok(v) => Some(v),
557 Err(_) => None,
558 };
559
560 let hv = match HeaderValue::from_str(value) {
561 Ok(v) => Some(v),
562 Err(_) => None,
563 };
564
565 if let Some(hn) = hn {
566 if let Some(hv) = hv {
567 self.headers.insert(hn, hv);
568 }
569 }
570 }
571 pub async fn post_betyes(self, url: &str, body: Bytes) -> Result<String> {
573 let mut build = self.client.post(url);
574 for (head_name, head_value) in self.headers {
575 build = build.insert_header((head_name, head_value));
576 }
577 match build.send_body(body).await {
578 Ok(mut res) => {
579 if res.status().is_success() {
580 match res.body().await {
581 Ok(bs) => text_with_charset(res.headers(), &self.charset, bs),
582 Err(err) => Err(error! {
583 code: -1,
584 msg: format!("error: {}", err)
585 }),
586 }
587 } else {
588 Err(error! {
589 code: 500,
590 msg: format!("status={}", res.status())
591 })
592 }
593 }
594 Err(e) => Err(error! {
595 code: 500,
596 msg: format!("Send request error: {}", e)
597 }),
598 }
599 }
600}
601
602use futures_util::{SinkExt as _, StreamExt as _};
603use std::sync::Condvar;
604
605pub struct ChimesWebSocketClient {
606 websocket_url: String,
607 send_queue: Vec<String>,
608 recv_queue: Vec<String>,
609 looping: bool,
610 mutex_send: Mutex<bool>,
611 cond_send: Condvar,
612}
613
614impl ChimesWebSocketClient {
615 pub fn new_websocket(url: &str) -> Result<Self> {
616 Ok(Self {
617 websocket_url: url.to_string(),
618 send_queue: vec![],
619 recv_queue: vec![],
620 looping: false,
621 mutex_send: Mutex::new(false),
622 cond_send: Condvar::new(),
623 })
624 }
625
626 pub fn send(&mut self, body: &str) -> Result<()> {
627 self.send_queue.push(body.to_string());
628 Ok(())
629 }
630
631 pub fn recv(&self) -> &Vec<String> {
632 &self.recv_queue
633 }
634
635 pub fn stop(mut self) {
636 self.looping = false;
637 let lock = self.mutex_send.lock().unwrap();
638 self.cond_send.notify_all();
639 drop(lock);
640 }
641
642 pub async fn start(&mut self) -> Result<()> {
643 self.looping = true;
644 while self.looping {
645 let cli = awc::Client::new().ws(self.websocket_url.as_str());
646 match cli.connect().await {
647 Ok((_cl, mut conn)) => {
648 let resp = conn.next().await;
649 if resp.is_none() {
650 let lock = self.mutex_send.lock().expect("Can not lock");
652 match self.cond_send.wait_timeout(lock, Duration::from_secs(1)) {
653 Ok((_l, result)) => {
654 if result.timed_out() {
655 break;
656 }
657 }
658 Err(err) => {
659 log::info!(
660 "Condvar cond_send wait for a timeout with error. {}",
661 err
662 );
663 }
664 }
665
666 let mut top_msg = self.send_queue.pop();
667 while top_msg.is_some() {
668 let msg = top_msg.unwrap();
669 match conn.send(ws::Message::Text(msg.clone().into())).await {
670 Ok(_) => {}
671 Err(err) => {
672 log::info!("Send msg with error {}", err);
673 self.send_queue.insert(0, msg);
674 break;
675 }
676 }
677 top_msg = self.send_queue.pop();
678 }
679
680 break;
682 } else {
683 match resp.unwrap() {
684 Ok(frame) => {
685 match frame {
686 ws::Frame::Text(text) => self
687 .recv_queue
688 .push(String::from_utf8(text.to_vec()).unwrap()),
689 ws::Frame::Binary(bin) => self
690 .recv_queue
691 .push(String::from_utf8(bin.to_vec()).unwrap()),
692 ws::Frame::Continuation(_t) => {}
693 ws::Frame::Ping(ping) => {
694 let msg = String::from_utf8(ping.to_vec()).unwrap();
695 log::info!("recv ping message {}", msg);
696 let _ =
697 conn.send(Message::Pong("pong".into())).await.is_ok();
698 }
699 ws::Frame::Pong(pong) => {
700 let msg = String::from_utf8(pong.to_vec()).unwrap();
701 log::info!("recv pong message {}", msg);
702 let _ =
703 conn.send(Message::Ping("ping".into())).await.is_ok();
704 }
705 ws::Frame::Close(_close) => {
706 break;
708 }
709 }
710 }
711 Err(err) => {
712 log::info!("Unsupport protocol error {}", err);
713 }
714 };
715 }
716 }
717 Err(err) => {
718 log::info!("WebSocket error {}", err);
719 }
720 }
721 let lock = self.mutex_send.lock().unwrap();
722 let (l, _r) = self
723 .cond_send
724 .wait_timeout(lock, Duration::from_secs(1))
725 .unwrap();
726 drop(l);
727 }
728 Ok(())
729 }
730}