xxljob_sdk_rs/client/
admin_client.rs

1use crate::common::client_config::ClientConfig;
2use crate::common::http_utils::{HttpUtils, ResponseWrap};
3use crate::common::model::admin_request::{CallbackParam, RegistryParam};
4use crate::common::model::XxlApiResult;
5use crate::common::{constant, get_app_version};
6use std::collections::HashMap;
7use std::sync::Arc;
8
9#[derive(Clone)]
10pub struct AdminClient {
11    client_config: Arc<ClientConfig>,
12    client: reqwest::Client,
13    addrs: Vec<String>,
14    headers: HashMap<String, String>,
15}
16
17impl AdminClient {
18    pub fn new(client_config: Arc<ClientConfig>) -> anyhow::Result<Self> {
19        let addrs_str = client_config.server_address.as_str();
20        if addrs_str.is_empty() {
21            return Err(anyhow::anyhow!("empty admin service address"));
22        }
23        let addrs = addrs_str
24            .split(",")
25            .filter(|&v| !v.is_empty())
26            .map(|v| v.to_owned())
27            .collect();
28        let mut client_builder = reqwest::ClientBuilder::new();
29        #[cfg(feature = "ssl_mode")]
30        if client_config.ssl_danger_accept_invalid_certs {
31            client_builder = client_builder.danger_accept_invalid_certs(true);
32        }
33        client_builder = client_builder.timeout(std::time::Duration::from_millis(3000));
34        let client = client_builder.build()?;
35        let mut headers = HashMap::new();
36        if !client_config.access_token.is_empty() {
37            headers.insert(
38                "XXL-JOB-ACCESS-TOKEN".to_owned(),
39                client_config.access_token.as_ref().clone(),
40            );
41            headers.insert("Content-Type".to_owned(), "application/json".to_owned());
42            headers.insert(
43                "User-Agent".to_owned(),
44                format!("xxljob-sdk-rs/{}", get_app_version()),
45            );
46        }
47        Ok(Self {
48            client,
49            addrs,
50            client_config,
51            headers,
52        })
53    }
54
55    pub async fn registry(&self) -> anyhow::Result<()> {
56        let address = format!(
57            "http://{}:{}",
58            self.client_config.ip, self.client_config.port
59        );
60        let param = RegistryParam {
61            registry_group: constant::EXECUTOR.clone(),
62            registry_key: self.client_config.app_name.clone(),
63            registry_value: Arc::new(address),
64        };
65        let body = serde_json::to_vec(&param)?;
66        match self.request(body, "registry").await {
67            Ok(_) => {
68                log::info!("admin_client|registry success");
69                Ok(())
70            }
71            Err(e) => {
72                log::error!("admin_client|registry error:{}", &e);
73                Err(e)
74            }
75        }
76    }
77
78    pub async fn registry_remove(&self) -> anyhow::Result<()> {
79        let address = format!(
80            "http://{}:{}",
81            self.client_config.ip, self.client_config.port
82        );
83        let param = RegistryParam {
84            registry_group: constant::EXECUTOR.clone(),
85            registry_key: self.client_config.app_name.clone(),
86            registry_value: Arc::new(address),
87        };
88        let body = serde_json::to_vec(&param)?;
89        match self.request(body, "registryRemove").await {
90            Ok(_) => {
91                log::info!("admin_client|registryRemove success");
92                Ok(())
93            }
94            Err(e) => {
95                log::error!("admin_client|registryRemove error:{}", &e);
96                Err(e)
97            }
98        }
99    }
100
101    pub async fn callback(&self, params: &Vec<CallbackParam>) -> anyhow::Result<()> {
102        let body = serde_json::to_vec(params)?;
103        match self.request(body, "callback").await {
104            Ok(_) => {
105                log::info!("admin_client|callback success");
106                Ok(())
107            }
108            Err(e) => {
109                log::error!("admin_client|callback error:{}", &e);
110                Err(e)
111            }
112        }
113    }
114
115    async fn request(&self, body: Vec<u8>, sub_url: &str) -> anyhow::Result<()> {
116        let mut registry_success = false;
117        for addr in &self.addrs {
118            let url = format!("{}/api/{}", addr, &sub_url);
119            match HttpUtils::request(
120                &self.client,
121                "POST",
122                &url,
123                body.clone(),
124                Some(&self.headers),
125                Some(3000),
126            )
127            .await
128            {
129                Ok(resp) => {
130                    if let Ok(v) = Self::convert(&resp) {
131                        if v.is_success() {
132                            registry_success = true;
133                            break;
134                        }
135                    }
136                }
137                Err(err) => {
138                    log::error!("call response error:{},url:{}", err, &url);
139                }
140            }
141        }
142        if !registry_success {
143            Err(anyhow::anyhow!("registry failed"))
144        } else {
145            Ok(())
146        }
147    }
148
149    pub fn convert(resp: &ResponseWrap) -> anyhow::Result<XxlApiResult<String>> {
150        let v = serde_json::from_slice(&resp.body)?;
151        Ok(v)
152    }
153}