1use super::requestor::*;
2use crate::base::*;
3use crate::ws::*;
4use cyfs_base::*;
5
6use async_std::io::ReadExt;
7use http_types::Url;
8use http_types::{Request, Response};
9use std::sync::Arc;
10
11#[derive(Clone)]
12struct WSHttpRequestorHandler {}
13impl WSHttpRequestorHandler {
14 pub fn new() -> Self {
15 Self {}
16 }
17}
18
19#[async_trait::async_trait]
20impl WebSocketRequestHandler for WSHttpRequestorHandler {
21 async fn on_request(
22 &self,
23 _requestor: Arc<WebSocketRequestManager>,
24 _cmd: u16,
25 _content: Vec<u8>,
26 ) -> BuckyResult<Option<Vec<u8>>> {
27 unreachable!();
28 }
29
30 async fn on_session_begin(&self, _session: &Arc<WebSocketSession>) {}
31
32 async fn on_session_end(&self, _session: &Arc<WebSocketSession>) {}
33
34 fn clone_handler(&self) -> Box<dyn WebSocketRequestHandler> {
35 Box::new(self.clone())
36 }
37}
38
39#[derive(Clone)]
40pub struct WSHttpRequestor {
41 client: WebSocketClient,
42}
43
44impl WSHttpRequestor {
45 pub fn new(service_url: Url) -> Self {
46 let handler = Box::new(WSHttpRequestorHandler::new());
47 let client = WebSocketClient::new(service_url, handler);
48 client.start();
49
50 Self { client }
51 }
52}
53
54#[async_trait::async_trait]
55impl HttpRequestor for WSHttpRequestor {
56 async fn request_ext(
57 &self,
58 req: &mut Option<Request>,
59 conn_info: Option<&mut HttpRequestConnectionInfo>,
60 ) -> BuckyResult<Response> {
61 let begin = std::time::Instant::now();
62
63 let mut session = self.client.select_session();
65 if session.is_none() {
66 warn!("local ws not yet established or disconnected! now will retry once");
67 self.client.retry();
68 async_std::task::sleep(std::time::Duration::from_secs(2)).await;
69
70 session = self.client.select_session();
71 if session.is_none() {
72 let msg = format!("local ws not yet established or disconnected! now will end with error");
73 error!("{}", msg);
74 return Err(BuckyError::new(BuckyErrorCode::ConnectFailed, msg));
75 }
76 }
77 let session = session.unwrap();
78
79 debug!(
80 "will http-ws request via sid={}, url={}",
81 session.sid(),
82 req.as_ref().unwrap().url()
83 );
84
85 if let Some(conn_info) = conn_info {
86 *conn_info = HttpRequestConnectionInfo::Tcp(session.conn_info().to_owned());
87 }
88
89 let req = req.take().unwrap();
91 let req = self.add_default_headers(req);
92 let mut encoder = async_h1::client::Encoder::new(req);
93 let mut buf = vec![];
94 encoder.read_to_end(&mut buf).await.map_err(|e| {
95 let msg = format!(
96 "encode http request to buffer error! sid={}, during={}ms, {}",
97 session.sid(),
98 begin.elapsed().as_millis(),
99 e
100 );
101 error!("{}", msg);
102
103 BuckyError::from(msg)
104 })?;
105
106 let resp_buffer = session
108 .requestor()
109 .post_bytes_req(HTTP_CMD_REQUEST, buf)
110 .await?;
111 let resp_reader = async_std::io::Cursor::new(resp_buffer);
112 let resp = async_h1::client::decode(resp_reader).await.map_err(|e| {
113 let msg = format!(
114 "decode http response from buffer error! sid={}, during={}ms, {}",
115 session.sid(),
116 begin.elapsed().as_millis(),
117 e
118 );
119 error!("{}", msg);
120
121 BuckyError::from(msg)
122 })?;
123
124 info!(
125 "http-ws request to {} via sid={} success! during={}ms",
126 self.remote_addr(),
127 session.sid(),
128 begin.elapsed().as_millis()
129 );
130
131 Ok(resp)
132 }
133
134 fn remote_addr(&self) -> String {
135 self.client.service_addr().to_string()
136 }
137
138 fn remote_device(&self) -> Option<DeviceId> {
139 None
140 }
141
142 fn clone_requestor(&self) -> Box<dyn HttpRequestor> {
143 Box::new(self.clone())
144 }
145
146 async fn stop(&self) {
147 self.client.stop().await
148 }
149}