cyfs_util/gateway/
gateway_register.rs1use async_std::net::TcpStream;
2use async_std::prelude::*;
3use cyfs_base::BuckyError;
4use http_types::{Method, Request, StatusCode, Url};
5use serde::{Deserialize, Serialize};
6use std::time::Duration;
7
8pub struct GatewayRegister {
9 pub id: String,
10 pub server_type: String,
11 pub value: String,
12
13 pub host: Url,
14}
15
16#[derive(Debug, Serialize, Deserialize)]
17pub struct GatewayRegisterResult {
18 pub code: String,
19 pub msg: String,
20}
21
22impl GatewayRegister {
23 pub fn new(id: String, server_type: String) -> GatewayRegister {
24 GatewayRegister {
25 id,
26 server_type,
27 value: "".to_owned(),
28 host: Url::parse(super::GATEWAY_CONTROL_URL).unwrap(),
29 }
30 }
31
32 pub fn register(id: String, server_type: String, value: String) -> Result<(), BuckyError> {
33 let _: serde_json::Value = serde_json::from_str(&value).map_err(|e| {
34 let msg = format!("invalid register value format! {}, err={}", value, e);
35 error!("{}", msg);
36
37 BuckyError::from(msg)
38 })?;
39
40 assert!(server_type == "http" || server_type == "stream");
41 assert!(id.len() > 0);
42
43 let mut register = GatewayRegister::new(id, server_type);
44 register.value = value;
45
46 async_std::task::spawn(async move {
48 register.run_register().await;
49 });
50
51 Ok(())
52 }
53
54 pub async fn unregister(id: String, server_type: String) -> Result<(), BuckyError> {
55 assert!(server_type == "http" || server_type == "stream");
56 assert!(id.len() > 0);
57
58 let register = GatewayRegister::new(id, server_type);
59
60 register.unregister_once().await
61 }
62
63 pub async fn run_register(self) {
64 let _r = self.register_once().await;
65
66 let mut interval = async_std::stream::interval(Duration::from_secs(45));
68 while let Some(_) = interval.next().await {
69 let _r = self.register_once().await;
70 }
71 }
72
73 async fn register_once(&self) -> Result<(), BuckyError> {
74 let url = self.host.join("register").unwrap();
75
76 let body = format!(
77 r#"{{ "id": "{}", "type": "{}", "value": {} }} "#,
78 self.id, self.server_type, self.value
79 );
80
81 let req = Request::new(Method::Post, url);
82
83 match self.post(req, body).await {
84 Ok(ret) => {
85 if ret.code == "0" {
86 debug!("{} register to gateway success!", self.id);
87 Ok(())
88 } else {
89 let msg = format!("register to gateway error! ret={:?}", ret);
90 error!("{}", msg);
91
92 Err(BuckyError::from(msg))
93 }
94 }
95 Err(e) => Err(e),
96 }
97 }
98
99 async fn unregister_once(&self) -> Result<(), BuckyError> {
100 let url = self.host.join("unregister").unwrap();
101
102 let body = format!(
103 r#"{{ "id": "{}", "type": "{}" }} "#,
104 self.id, self.server_type
105 );
106
107 let req = Request::new(Method::Post, url);
108
109 match self.post(req, body).await {
110 Ok(ret) => {
111 if ret.code == "0" {
112 debug!("register to gateway success! id={}", self.id);
113 Ok(())
114 } else {
115 let msg = format!("register to gateway error! ret={:?}", ret);
116 error!("{}", msg);
117
118 Err(BuckyError::from(msg))
119 }
120 }
121 Err(e) => Err(e),
122 }
123 }
124
125 async fn post(
126 &self,
127 mut req: Request,
128 body: String,
129 ) -> Result<GatewayRegisterResult, BuckyError> {
130 let host = self.host.host_str().unwrap();
131 let port = self.host.port().unwrap();
132 let addr = format!("{}:{}", host, port);
133
134 let stream = TcpStream::connect(addr).await.map_err(|e| {
135 error!(
136 "tcp connect to gateway control interface failed! host={}, err={}",
137 self.host, e
138 );
139 BuckyError::from(e)
140 })?;
141
142 req.set_body(body);
143
144 let mut resp = ::async_h1::connect(stream, req).await.map_err(|e| {
145 error!(
146 "http connect to gateway control interface error! host={}, err={}",
147 self.host, e
148 );
149 BuckyError::from(e)
150 })?;
151
152 if resp.status() != StatusCode::Ok {
153 error!("gateway register resp status: {}", resp.status());
154 }
155
156 resp.body_json().await.map_err(|e| {
157 let msg = format!("parse gateway register resp body error! err={}", e);
158 error!("{}", msg);
159
160 BuckyError::from(msg)
161 })
162 }
163}