1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
use super::requestor::*;
use crate::base::*;
use crate::ws::*;
use cyfs_base::*;

use async_std::io::ReadExt;
use http_types::Url;
use http_types::{Request, Response};
use std::sync::Arc;

#[derive(Clone)]
struct WSHttpRequestorHandler {}
impl WSHttpRequestorHandler {
    pub fn new() -> Self {
        Self {}
    }
}

#[async_trait::async_trait]
impl WebSocketRequestHandler for WSHttpRequestorHandler {
    async fn on_request(
        &self,
        _requestor: Arc<WebSocketRequestManager>,
        _cmd: u16,
        _content: Vec<u8>,
    ) -> BuckyResult<Option<Vec<u8>>> {
        unreachable!();
    }

    async fn on_session_begin(&self, _session: &Arc<WebSocketSession>) {}

    async fn on_session_end(&self, _session: &Arc<WebSocketSession>) {}

    fn clone_handler(&self) -> Box<dyn WebSocketRequestHandler> {
        Box::new(self.clone())
    }
}

#[derive(Clone)]
pub struct WSHttpRequestor {
    client: WebSocketClient,
}

impl WSHttpRequestor {
    pub fn new(service_url: Url) -> Self {
        let handler = Box::new(WSHttpRequestorHandler::new());
        let client = WebSocketClient::new(service_url, handler);
        client.start();

        Self { client }
    }
}

#[async_trait::async_trait]
impl HttpRequestor for WSHttpRequestor {
    async fn request_ext(
        &self,
        req: &mut Option<Request>,
        conn_info: Option<&mut HttpRequestConnectionInfo>,
    ) -> BuckyResult<Response> {
        let begin = std::time::Instant::now();

        // 选择一个ws session
        let mut session = self.client.select_session();
        if session.is_none() {
            warn!("local ws not yet established or disconnected! now will retry once");
            self.client.retry();
            async_std::task::sleep(std::time::Duration::from_secs(2)).await;

            session = self.client.select_session();
            if session.is_none() {
                let msg = format!("local ws not yet established or disconnected! now will end with error");
                error!("{}", msg);
                return Err(BuckyError::new(BuckyErrorCode::ConnectFailed, msg));
            }
        }
        let session = session.unwrap();

        debug!(
            "will http-ws request via sid={}, url={}",
            session.sid(),
            req.as_ref().unwrap().url()
        );

        if let Some(conn_info) = conn_info {
            *conn_info = HttpRequestConnectionInfo::Tcp(session.conn_info().to_owned());
        }

        // request编码到buffer
        let req = req.take().unwrap();
        let mut encoder = async_h1::client::Encoder::new(req);
        let mut buf = vec![];
        encoder.read_to_end(&mut buf).await.map_err(|e| {
            let msg = format!(
                "encode http request to buffer error! sid={}, during={}ms, {}",
                session.sid(),
                begin.elapsed().as_millis(),
                e
            );
            error!("{}", msg);

            BuckyError::from(msg)
        })?;

        // 发起请求并等待应答
        let resp_buffer = session
            .requestor()
            .post_bytes_req(HTTP_CMD_REQUEST, buf)
            .await?;
        let resp_reader = async_std::io::Cursor::new(resp_buffer);
        let resp = async_h1::client::decode(resp_reader).await.map_err(|e| {
            let msg = format!(
                "decode http response from buffer error! sid={}, during={}ms, {}",
                session.sid(),
                begin.elapsed().as_millis(),
                e
            );
            error!("{}", msg);

            BuckyError::from(msg)
        })?;

        info!(
            "http-ws request to {} via sid={} success! during={}ms",
            self.remote_addr(),
            session.sid(),
            begin.elapsed().as_millis()
        );

        Ok(resp)
    }

    fn remote_addr(&self) -> String {
        self.client.service_addr().to_string()
    }

    fn remote_device(&self) -> Option<DeviceId> {
        None
    }

    fn clone_requestor(&self) -> Box<dyn HttpRequestor> {
        Box::new(self.clone())
    }

    async fn stop(&self) {
        self.client.stop().await
    }
}