1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
use async_std::net::TcpStream;
use cyfs_base::{BuckyError, BuckyErrorCode, ObjectId};
use http_types::{Method, Request, StatusCode, Url};
use serde::{Deserialize, Serialize};
use std::str::FromStr;
/// Client for querying the gateway control interface over HTTP.
///
/// `new` builds the instance from `super::GATEWAY_CONTROL_URL`.
pub struct GatewayQuery {
// Base URL of the gateway control interface: request paths are joined onto it,
// and its host/port are used as the TCP connect address.
host: Url,
}
/// JSON payload returned by the gateway for a peer-association query.
///
/// Deserialized from the HTTP response body when the gateway answers with
/// status `Ok`.
#[derive(Debug, Serialize, Deserialize)]
pub struct GatewayPeerAssocQueryResult {
/// Result code as a string; the caller treats `"0"` as success.
pub code: String,
/// Message accompanying the result (presumably a human-readable
/// description; confirm against the gateway implementation).
pub msg: String,
/// String form of the associated peer's `ObjectId`; expected to be present
/// when `code` is `"0"`.
pub peer_id: Option<String>,
}
impl GatewayQuery {
    /// Creates a client bound to the gateway control interface at
    /// `super::GATEWAY_CONTROL_URL`.
    ///
    /// # Panics
    ///
    /// Panics if `super::GATEWAY_CONTROL_URL` is not a valid URL; that is a
    /// build-time constant, so a failure here is a programming error.
    pub fn new() -> Self {
        Self {
            host: Url::parse(super::GATEWAY_CONTROL_URL)
                .expect("GATEWAY_CONTROL_URL must be a valid url"),
        }
    }

    /// Asks the gateway which peer is associated with `protocol`/`port`.
    ///
    /// Sends a request to the `peer_assoc` path of the control interface and
    /// parses the returned `peer_id` into an [`ObjectId`].
    ///
    /// # Errors
    ///
    /// Returns an error when the request cannot be built or sent, when the
    /// gateway answers with a non-success HTTP status or a result `code`
    /// other than `"0"`, or when the response carries no `peer_id` or one
    /// that cannot be parsed as an `ObjectId`.
    pub async fn query_assoc_peerid(
        &self,
        protocol: &str,
        port: u16,
    ) -> Result<ObjectId, BuckyError> {
        let url = self.host.join("peer_assoc").map_err(|e| {
            let msg = format!(
                "build peer assoc url failed! host={}, err={}",
                self.host, e
            );
            error!("{}", msg);
            BuckyError::from(msg)
        })?;

        // The port is intentionally sent as a JSON string to keep the wire
        // format unchanged.
        // NOTE(review): `protocol` is interpolated without JSON escaping; this
        // is only safe while callers pass plain tokens -- confirm.
        let body = format!(r#"{{ "protocol": "{}", "port": "{}" }}"#, protocol, port);

        // NOTE(review): the request carries a body but is issued as GET and
        // sent through a helper named `post`; left as-is because the method
        // the gateway expects is not visible here -- confirm.
        let req = Request::new(Method::Get, url);

        let ret = self.post(req, body).await?;
        if ret.code != "0" {
            let msg = format!("query peer assoc error! ret={:?}", ret);
            error!("{}", msg);
            return Err(BuckyError::from(msg));
        }

        // A success code without a peer id is a malformed response from the
        // gateway; report it instead of panicking.
        let peer_id = match ret.peer_id.as_deref() {
            Some(peer_id) => peer_id,
            None => {
                let msg = format!(
                    "query peer assoc success but peer_id is missing! ret={:?}",
                    ret
                );
                error!("{}", msg);
                return Err(BuckyError::from(msg));
            }
        };

        debug!("query peer assoc success! {} -> {}", port, peer_id);

        ObjectId::from_str(peer_id).map_err(|e| {
            let msg = format!(
                "query peer assoc got invalid peer_id! peer_id={}, err={:?}",
                peer_id, e
            );
            error!("{}", msg);
            BuckyError::from(msg)
        })
    }

    /// Sends `req` with `body` to the gateway control interface over a fresh
    /// TCP connection and decodes the JSON response.
    ///
    /// # Errors
    ///
    /// Returns an error when the control URL has no host or resolvable port,
    /// when the TCP connection or the HTTP exchange fails, when the response
    /// body is not valid JSON for [`GatewayPeerAssocQueryResult`], or when
    /// the status is not `Ok` (`NotFound` and `BadRequest` map to
    /// `BuckyErrorCode::NotFound` and `BuckyErrorCode::InvalidFormat`).
    async fn post(
        &self,
        mut req: Request,
        body: String,
    ) -> Result<GatewayPeerAssocQueryResult, BuckyError> {
        let host = self.host.host_str().ok_or_else(|| {
            let msg = format!("gateway control url has no host! url={}", self.host);
            error!("{}", msg);
            BuckyError::from(msg)
        })?;

        // `Url::port()` is `None` when the URL uses the default port of its
        // scheme, so fall back to the scheme's well-known port.
        let port = self.host.port_or_known_default().ok_or_else(|| {
            let msg = format!("gateway control url has no port! url={}", self.host);
            error!("{}", msg);
            BuckyError::from(msg)
        })?;

        let addr = format!("{}:{}", host, port);
        let stream = TcpStream::connect(addr).await.map_err(|e| {
            error!(
                "tcp connect to gateway control interface failed! host={}, err={}",
                self.host, e
            );
            BuckyError::from(e)
        })?;

        req.set_body(body);

        let mut resp = ::async_h1::connect(stream, req).await.map_err(|e| {
            error!(
                "http connect to gateway control interface error! host={}, err={}",
                self.host, e
            );
            BuckyError::from(e)
        })?;

        match resp.status() {
            StatusCode::Ok => resp.body_json().await.map_err(|e| {
                let msg = format!("parse gateway register resp body error! err={}", e);
                error!("{}", msg);
                BuckyError::from(msg)
            }),
            StatusCode::NotFound => {
                warn!("query assoc peerid but not found!");
                Err(BuckyError::from(BuckyErrorCode::NotFound))
            }
            StatusCode::BadRequest => {
                error!("query assoc peerid with invalid format!");
                Err(BuckyError::from(BuckyErrorCode::InvalidFormat))
            }
            v => {
                let msg = format!("query assoc peerid error! status={}", v);
                error!("{}", msg);
                Err(BuckyError::from(msg))
            }
        }
    }
}