1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
use async_std::net::TcpStream;
use cyfs_base::{BuckyError, BuckyErrorCode, ObjectId};
use http_types::{Method, Request, StatusCode, Url};
use serde::{Deserialize, Serialize};
use std::str::FromStr;

pub struct GatewayQuery {
    host: Url,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct GatewayPeerAssocQueryResult {
    pub code: String,
    pub msg: String,
    pub peer_id: Option<String>,
}

impl GatewayQuery {
    pub fn new() -> Self {
        Self {
            host: Url::parse(super::GATEWAY_CONTROL_URL).unwrap(),
        }
    }

    pub async fn query_assoc_peerid(
        &self,
        protocol: &str,
        port: u16,
    ) -> Result<ObjectId, BuckyError> {
        let url = self.host.join("peer_assoc").unwrap();

        let body = format!(r#"{{ "protocol": "{}", "port": "{}" }}"#, protocol, port,);

        let req = Request::new(Method::Get, url);

        match self.post(req, body).await {
            Ok(ret) => {
                if ret.code == "0" {
                    assert!(ret.peer_id.is_some());

                    debug!(
                        "query peer assoc success! {} -> {}",
                        port,
                        ret.peer_id.as_ref().unwrap()
                    );
                    Ok(ObjectId::from_str(&ret.peer_id.as_ref().unwrap()).unwrap())
                } else {
                    let msg = format!("query peer assoc error! ret={:?}", ret);
                    error!("{}", msg);

                    Err(BuckyError::from(msg))
                }
            }
            Err(e) => Err(e),
        }
    }

    async fn post(
        &self,
        mut req: Request,
        body: String,
    ) -> Result<GatewayPeerAssocQueryResult, BuckyError> {
        let host = self.host.host_str().unwrap();
        let port = self.host.port().unwrap();
        let addr = format!("{}:{}", host, port);

        let stream = TcpStream::connect(addr).await.map_err(|e| {
            error!(
                "tcp connect to gateway control interface failed! host={}, err={}",
                self.host, e
            );
            BuckyError::from(e)
        })?;

        req.set_body(body);

        let mut resp = ::async_h1::connect(stream, req).await.map_err(|e| {
            error!(
                "http connect to gateway control interface error! host={}, err={}",
                self.host, e
            );
            BuckyError::from(e)
        })?;

        match resp.status() {
            StatusCode::Ok => resp.body_json().await.map_err(|e| {
                let msg = format!("parse gateway register resp body error! err={}", e);
                error!("{}", msg);

                BuckyError::from(msg)
            }),
            StatusCode::NotFound => {
                warn!("query assoc peerid but not found!");
                Err(BuckyError::from(BuckyErrorCode::NotFound))
            }
            StatusCode::BadRequest => {
                error!("query assoc peerid with invalid format!");
                Err(BuckyError::from(BuckyErrorCode::InvalidFormat))
            }
            v @ _ => {
                let msg = format!("query assoc peerid error! status={}", v);
                error!("{}", msg);
                Err(BuckyError::from(msg))
            }
        }
    }
}