cyfs_util/gateway/
gateway_query.rs

1use async_std::net::TcpStream;
2use cyfs_base::{BuckyError, BuckyErrorCode, ObjectId};
3use http_types::{Method, Request, StatusCode, Url};
4use serde::{Deserialize, Serialize};
5use std::str::FromStr;
6
7pub struct GatewayQuery {
8    host: Url,
9}
10
11#[derive(Debug, Serialize, Deserialize)]
12pub struct GatewayPeerAssocQueryResult {
13    pub code: String,
14    pub msg: String,
15    pub peer_id: Option<String>,
16}
17
18impl GatewayQuery {
19    pub fn new() -> Self {
20        Self {
21            host: Url::parse(super::GATEWAY_CONTROL_URL).unwrap(),
22        }
23    }
24
25    pub async fn query_assoc_peerid(
26        &self,
27        protocol: &str,
28        port: u16,
29    ) -> Result<ObjectId, BuckyError> {
30        let url = self.host.join("peer_assoc").unwrap();
31
32        let body = format!(r#"{{ "protocol": "{}", "port": "{}" }}"#, protocol, port,);
33
34        let req = Request::new(Method::Get, url);
35
36        match self.post(req, body).await {
37            Ok(ret) => {
38                if ret.code == "0" {
39                    assert!(ret.peer_id.is_some());
40
41                    debug!(
42                        "query peer assoc success! {} -> {}",
43                        port,
44                        ret.peer_id.as_ref().unwrap()
45                    );
46                    Ok(ObjectId::from_str(&ret.peer_id.as_ref().unwrap()).unwrap())
47                } else {
48                    let msg = format!("query peer assoc error! ret={:?}", ret);
49                    error!("{}", msg);
50
51                    Err(BuckyError::from(msg))
52                }
53            }
54            Err(e) => Err(e),
55        }
56    }
57
58    async fn post(
59        &self,
60        mut req: Request,
61        body: String,
62    ) -> Result<GatewayPeerAssocQueryResult, BuckyError> {
63        let host = self.host.host_str().unwrap();
64        let port = self.host.port().unwrap();
65        let addr = format!("{}:{}", host, port);
66
67        let stream = TcpStream::connect(addr).await.map_err(|e| {
68            error!(
69                "tcp connect to gateway control interface failed! host={}, err={}",
70                self.host, e
71            );
72            BuckyError::from(e)
73        })?;
74
75        req.set_body(body);
76
77        let mut resp = ::async_h1::connect(stream, req).await.map_err(|e| {
78            error!(
79                "http connect to gateway control interface error! host={}, err={}",
80                self.host, e
81            );
82            BuckyError::from(e)
83        })?;
84
85        match resp.status() {
86            StatusCode::Ok => resp.body_json().await.map_err(|e| {
87                let msg = format!("parse gateway register resp body error! err={}", e);
88                error!("{}", msg);
89
90                BuckyError::from(msg)
91            }),
92            StatusCode::NotFound => {
93                warn!("query assoc peerid but not found!");
94                Err(BuckyError::from(BuckyErrorCode::NotFound))
95            }
96            StatusCode::BadRequest => {
97                error!("query assoc peerid with invalid format!");
98                Err(BuckyError::from(BuckyErrorCode::InvalidFormat))
99            }
100            v @ _ => {
101                let msg = format!("query assoc peerid error! status={}", v);
102                error!("{}", msg);
103                Err(BuckyError::from(msg))
104            }
105        }
106    }
107}