xxljob_sdk_rs/client/
admin_client.rs1use crate::common::client_config::ClientConfig;
2use crate::common::http_utils::{HttpUtils, ResponseWrap};
3use crate::common::model::admin_request::{CallbackParam, RegistryParam};
4use crate::common::model::XxlApiResult;
5use crate::common::{constant, get_app_version};
6use std::collections::HashMap;
7use std::sync::Arc;
8
9#[derive(Clone)]
10pub struct AdminClient {
11 client_config: Arc<ClientConfig>,
12 client: reqwest::Client,
13 addrs: Vec<String>,
14 headers: HashMap<String, String>,
15}
16
17impl AdminClient {
18 pub fn new(client_config: Arc<ClientConfig>) -> anyhow::Result<Self> {
19 let addrs_str = client_config.server_address.as_str();
20 if addrs_str.is_empty() {
21 return Err(anyhow::anyhow!("empty admin service address"));
22 }
23 let addrs = addrs_str
24 .split(",")
25 .filter(|&v| !v.is_empty())
26 .map(|v| v.to_owned())
27 .collect();
28 let mut client_builder = reqwest::ClientBuilder::new();
29 #[cfg(feature = "ssl_mode")]
30 if client_config.ssl_danger_accept_invalid_certs {
31 client_builder = client_builder.danger_accept_invalid_certs(true);
32 }
33 client_builder = client_builder.timeout(std::time::Duration::from_millis(3000));
34 let client = client_builder.build()?;
35 let mut headers = HashMap::new();
36 if !client_config.access_token.is_empty() {
37 headers.insert(
38 "XXL-JOB-ACCESS-TOKEN".to_owned(),
39 client_config.access_token.as_ref().clone(),
40 );
41 headers.insert("Content-Type".to_owned(), "application/json".to_owned());
42 headers.insert(
43 "User-Agent".to_owned(),
44 format!("xxljob-sdk-rs/{}", get_app_version()),
45 );
46 }
47 Ok(Self {
48 client,
49 addrs,
50 client_config,
51 headers,
52 })
53 }
54
55 pub async fn registry(&self) -> anyhow::Result<()> {
56 let address = format!(
57 "http://{}:{}",
58 self.client_config.ip, self.client_config.port
59 );
60 let param = RegistryParam {
61 registry_group: constant::EXECUTOR.clone(),
62 registry_key: self.client_config.app_name.clone(),
63 registry_value: Arc::new(address),
64 };
65 let body = serde_json::to_vec(¶m)?;
66 match self.request(body, "registry").await {
67 Ok(_) => {
68 log::info!("admin_client|registry success");
69 Ok(())
70 }
71 Err(e) => {
72 log::error!("admin_client|registry error:{}", &e);
73 Err(e)
74 }
75 }
76 }
77
78 pub async fn registry_remove(&self) -> anyhow::Result<()> {
79 let address = format!(
80 "http://{}:{}",
81 self.client_config.ip, self.client_config.port
82 );
83 let param = RegistryParam {
84 registry_group: constant::EXECUTOR.clone(),
85 registry_key: self.client_config.app_name.clone(),
86 registry_value: Arc::new(address),
87 };
88 let body = serde_json::to_vec(¶m)?;
89 match self.request(body, "registryRemove").await {
90 Ok(_) => {
91 log::info!("admin_client|registryRemove success");
92 Ok(())
93 }
94 Err(e) => {
95 log::error!("admin_client|registryRemove error:{}", &e);
96 Err(e)
97 }
98 }
99 }
100
101 pub async fn callback(&self, params: &Vec<CallbackParam>) -> anyhow::Result<()> {
102 let body = serde_json::to_vec(params)?;
103 match self.request(body, "callback").await {
104 Ok(_) => {
105 log::info!("admin_client|callback success");
106 Ok(())
107 }
108 Err(e) => {
109 log::error!("admin_client|callback error:{}", &e);
110 Err(e)
111 }
112 }
113 }
114
115 async fn request(&self, body: Vec<u8>, sub_url: &str) -> anyhow::Result<()> {
116 let mut registry_success = false;
117 for addr in &self.addrs {
118 let url = format!("{}/api/{}", addr, &sub_url);
119 match HttpUtils::request(
120 &self.client,
121 "POST",
122 &url,
123 body.clone(),
124 Some(&self.headers),
125 Some(3000),
126 )
127 .await
128 {
129 Ok(resp) => {
130 if let Ok(v) = Self::convert(&resp) {
131 if v.is_success() {
132 registry_success = true;
133 break;
134 }
135 }
136 }
137 Err(err) => {
138 log::error!("call response error:{},url:{}", err, &url);
139 }
140 }
141 }
142 if !registry_success {
143 Err(anyhow::anyhow!("registry failed"))
144 } else {
145 Ok(())
146 }
147 }
148
149 pub fn convert(resp: &ResponseWrap) -> anyhow::Result<XxlApiResult<String>> {
150 let v = serde_json::from_slice(&resp.body)?;
151 Ok(v)
152 }
153}