cyfs_lib/requestor/
ws.rs

1use super::requestor::*;
2use crate::base::*;
3use crate::ws::*;
4use cyfs_base::*;
5
6use async_std::io::ReadExt;
7use http_types::Url;
8use http_types::{Request, Response};
9use std::sync::Arc;
10
/// Stateless websocket request handler registered by [`WSHttpRequestor`].
///
/// It carries no data; cloning it yields another empty handler.
#[derive(Clone)]
struct WSHttpRequestorHandler {}

impl WSHttpRequestorHandler {
    /// Creates a new, empty handler.
    pub fn new() -> Self {
        WSHttpRequestorHandler {}
    }
}
18
19#[async_trait::async_trait]
20impl WebSocketRequestHandler for WSHttpRequestorHandler {
21    async fn on_request(
22        &self,
23        _requestor: Arc<WebSocketRequestManager>,
24        _cmd: u16,
25        _content: Vec<u8>,
26    ) -> BuckyResult<Option<Vec<u8>>> {
27        unreachable!();
28    }
29
30    async fn on_session_begin(&self, _session: &Arc<WebSocketSession>) {}
31
32    async fn on_session_end(&self, _session: &Arc<WebSocketSession>) {}
33
34    fn clone_handler(&self) -> Box<dyn WebSocketRequestHandler> {
35        Box::new(self.clone())
36    }
37}
38
39#[derive(Clone)]
40pub struct WSHttpRequestor {
41    client: WebSocketClient,
42}
43
44impl WSHttpRequestor {
45    pub fn new(service_url: Url) -> Self {
46        let handler = Box::new(WSHttpRequestorHandler::new());
47        let client = WebSocketClient::new(service_url, handler);
48        client.start();
49
50        Self { client }
51    }
52}
53
54#[async_trait::async_trait]
55impl HttpRequestor for WSHttpRequestor {
56    async fn request_ext(
57        &self,
58        req: &mut Option<Request>,
59        conn_info: Option<&mut HttpRequestConnectionInfo>,
60    ) -> BuckyResult<Response> {
61        let begin = std::time::Instant::now();
62
63        // 选择一个ws session
64        let mut session = self.client.select_session();
65        if session.is_none() {
66            warn!("local ws not yet established or disconnected! now will retry once");
67            self.client.retry();
68            async_std::task::sleep(std::time::Duration::from_secs(2)).await;
69
70            session = self.client.select_session();
71            if session.is_none() {
72                let msg = format!("local ws not yet established or disconnected! now will end with error");
73                error!("{}", msg);
74                return Err(BuckyError::new(BuckyErrorCode::ConnectFailed, msg));
75            }
76        }
77        let session = session.unwrap();
78
79        debug!(
80            "will http-ws request via sid={}, url={}",
81            session.sid(),
82            req.as_ref().unwrap().url()
83        );
84
85        if let Some(conn_info) = conn_info {
86            *conn_info = HttpRequestConnectionInfo::Tcp(session.conn_info().to_owned());
87        }
88
89        // request编码到buffer
90        let req = req.take().unwrap();
91        let req  = self.add_default_headers(req);
92        let mut encoder = async_h1::client::Encoder::new(req);
93        let mut buf = vec![];
94        encoder.read_to_end(&mut buf).await.map_err(|e| {
95            let msg = format!(
96                "encode http request to buffer error! sid={}, during={}ms, {}",
97                session.sid(),
98                begin.elapsed().as_millis(),
99                e
100            );
101            error!("{}", msg);
102
103            BuckyError::from(msg)
104        })?;
105
106        // 发起请求并等待应答
107        let resp_buffer = session
108            .requestor()
109            .post_bytes_req(HTTP_CMD_REQUEST, buf)
110            .await?;
111        let resp_reader = async_std::io::Cursor::new(resp_buffer);
112        let resp = async_h1::client::decode(resp_reader).await.map_err(|e| {
113            let msg = format!(
114                "decode http response from buffer error! sid={}, during={}ms, {}",
115                session.sid(),
116                begin.elapsed().as_millis(),
117                e
118            );
119            error!("{}", msg);
120
121            BuckyError::from(msg)
122        })?;
123
124        info!(
125            "http-ws request to {} via sid={} success! during={}ms",
126            self.remote_addr(),
127            session.sid(),
128            begin.elapsed().as_millis()
129        );
130
131        Ok(resp)
132    }
133
134    fn remote_addr(&self) -> String {
135        self.client.service_addr().to_string()
136    }
137
138    fn remote_device(&self) -> Option<DeviceId> {
139        None
140    }
141
142    fn clone_requestor(&self) -> Box<dyn HttpRequestor> {
143        Box::new(self.clone())
144    }
145
146    async fn stop(&self) {
147        self.client.stop().await
148    }
149}