1use crate::errors::EurekaClientError;
2use crate::request::RegisterRequest;
3use crate::response::{ApplicationResponse, ApplicationsResponse};
4use reqwest::header::{
5 HeaderMap, HeaderValue, ACCEPT, ACCEPT_CHARSET, ACCEPT_ENCODING, CONTENT_TYPE, USER_AGENT,
6};
7use reqwest::{Client, StatusCode, Url};
8use serde_json;
9use std::time::Duration;
10
/// HTTP client for talking to a Eureka cluster.
///
/// Bundles a reusable `reqwest` client with the settings every request is derived from.
pub struct EurekaClient {
    /// Underlying `reqwest` client used to send every request.
    client: Client,
    /// Name of this client; it is embedded in the `User-Agent` header.
    client_name: String,
    /// Base URL of the Eureka cluster; request paths are appended to it.
    eureka_cluster_url: String,
}
17
18impl EurekaClient {
22 pub fn new(client_name: &str, eureka_cluster_url: &str) -> EurekaClient {
29 debug!(
30 "Creating new Eureka Client client_name:{:?}, eureka_client:{:?}",
31 client_name, eureka_cluster_url
32 );
33 let client = Client::builder()
34 .timeout(Duration::from_secs(10))
35 .build()
36 .expect("failed to build reqwest client");
37 EurekaClient {
38 client,
39 client_name: client_name.to_owned(),
40 eureka_cluster_url: eureka_cluster_url.to_owned(),
41 }
42 }
43
44 pub async fn register(
45 &self,
46 application_id: &str,
47 register_request: &RegisterRequest,
48 ) -> Result<(), EurekaClientError> {
49 debug!(
50 "register: application_id={:?}, register_request:{:?}",
51 application_id, register_request
52 );
53 let _path = format!("/v2/apps/{}", application_id);
55
56 let inst = ®ister_request.instance;
58 let port_xml = if let Some(p) = inst.port {
59 format!("<port enabled=\"true\">{}</port>", p)
60 } else {
61 String::new()
62 };
63 let secure_port_xml = if let Some(p) = inst.secure_port {
64 format!("<securePort enabled=\"true\">{}</securePort>", p)
65 } else {
66 String::new()
67 };
68 let dci_xml = match inst.data_center_info.name {
69 crate::request::DcName::MyOwn => "<dataCenterInfo class=\"com.netflix.appinfo.InstanceInfo$DefaultDataCenterInfo\"><name>MyOwn</name></dataCenterInfo>".to_string(),
70 crate::request::DcName::Amazon => "<dataCenterInfo class=\"com.netflix.appinfo.InstanceInfo$DefaultDataCenterInfo\"><name>Amazon</name></dataCenterInfo>".to_string(),
71 };
72 let lease_xml = if let Some(li) = &inst.lease_info {
73 let eviction = li.eviction_duration_in_secs.unwrap_or(90);
74 format!(
75 "<leaseInfo><evictionDurationInSecs>{}</evictionDurationInSecs></leaseInfo>",
76 eviction
77 )
78 } else {
79 String::new()
80 };
81
82 let xml = format!(
83 "<instance>\n <hostName>{}</hostName>\n <app>{}</app>\n <ipAddr>{}</ipAddr>\n <vipAddress>{}</vipAddress>\n <secureVipAddress>{}</secureVipAddress>\n <status>{}</status>\n {}\n {}\n <countryId>1</countryId>\n {}\n {}\n <metadata class=\"java.util.Collections$EmptyMap\"/>\n <homePageUrl>{}</homePageUrl>\n <statusPageUrl>{}</statusPageUrl>\n <healthCheckUrl>{}</healthCheckUrl>\n</instance>",
84 inst.host_name,
85 inst.app,
86 inst.ip_addr,
87 inst.vip_address,
88 inst.secure_vip_address,
89 String::from(&inst.status),
90 port_xml,
91 secure_port_xml,
92 dci_xml,
93 lease_xml,
94 inst.homepage_url,
95 inst.status_page_url,
96 inst.health_check_url
97 );
98
99 let base = &self.eureka_cluster_url;
101 let candidates = vec![
103 format!("{}/eureka/v2/apps/{}", base, application_id),
104 format!("{}/eureka/apps/{}", base, application_id),
105 format!("{}/v2/apps/{}", base, application_id),
106 format!("{}/apps/{}", base, application_id),
107 ];
108
109 let mut last_err: Option<EurekaClientError> = None;
110 let mut saw_bad_request = false;
111 let mut saw_internal_server_error = false;
112
113 for uri_str in &candidates {
115 let url =
116 Url::parse(uri_str).map_err(|e| EurekaClientError::GenericError(e.to_string()))?;
117 let res = match self
118 .client
119 .post(url.clone())
120 .header("content-type", "application/xml")
121 .body(xml.clone())
122 .send()
123 .await
124 {
125 Ok(r) => r,
126 Err(e) => {
127 last_err = Some(EurekaClientError::from(e));
128 continue;
129 }
130 };
131
132 let status = res.status();
133 let body_bytes = res.bytes().await.map_err(EurekaClientError::from)?;
134 let _body_str = String::from_utf8_lossy(&body_bytes);
135
136 if status.is_success() {
137 return Ok(());
138 }
139
140 if status == StatusCode::BAD_REQUEST {
141 saw_bad_request = true;
142 } else if status == StatusCode::INTERNAL_SERVER_ERROR {
143 saw_internal_server_error = true;
144 }
145 }
146
147 let inst = ®ister_request.instance;
149 let mut inst_map = serde_json::Map::new();
150 inst_map.insert(
151 "hostName".to_string(),
152 serde_json::Value::String(inst.host_name.clone()),
153 );
154 inst_map.insert(
155 "app".to_string(),
156 serde_json::Value::String(inst.app.clone()),
157 );
158 inst_map.insert(
159 "ipAddr".to_string(),
160 serde_json::Value::String(inst.ip_addr.clone()),
161 );
162 inst_map.insert(
163 "vipAddress".to_string(),
164 serde_json::Value::String(inst.vip_address.clone()),
165 );
166 inst_map.insert(
167 "secureVipAddress".to_string(),
168 serde_json::Value::String(inst.secure_vip_address.clone()),
169 );
170 inst_map.insert(
171 "status".to_string(),
172 serde_json::Value::String(String::from(&inst.status)),
173 );
174 if let Some(p) = inst.port {
175 let mut port_obj = serde_json::Map::new();
176 port_obj.insert(
177 "$".to_string(),
178 serde_json::Value::Number(serde_json::Number::from(p)),
179 );
180 port_obj.insert(
181 "@enabled".to_string(),
182 serde_json::Value::String("true".to_string()),
183 );
184 inst_map.insert("port".to_string(), serde_json::Value::Object(port_obj));
185 }
186 if let Some(p) = inst.secure_port {
187 let mut port_obj = serde_json::Map::new();
188 port_obj.insert(
189 "$".to_string(),
190 serde_json::Value::Number(serde_json::Number::from(p)),
191 );
192 port_obj.insert(
193 "@enabled".to_string(),
194 serde_json::Value::String("false".to_string()),
195 );
196 inst_map.insert(
197 "securePort".to_string(),
198 serde_json::Value::Object(port_obj),
199 );
200 }
201 inst_map.insert(
202 "homePageUrl".to_string(),
203 serde_json::Value::String(inst.homepage_url.clone()),
204 );
205 inst_map.insert(
206 "statusPageUrl".to_string(),
207 serde_json::Value::String(inst.status_page_url.clone()),
208 );
209 inst_map.insert(
210 "healthCheckUrl".to_string(),
211 serde_json::Value::String(inst.health_check_url.clone()),
212 );
213 let mut dci = serde_json::Map::new();
215 dci.insert(
216 "@class".to_string(),
217 serde_json::Value::String(
218 "com.netflix.appinfo.InstanceInfo$DefaultDataCenterInfo".to_string(),
219 ),
220 );
221 dci.insert(
222 "name".to_string(),
223 serde_json::Value::String(match inst.data_center_info.name {
224 crate::request::DcName::MyOwn => "MyOwn".to_string(),
225 crate::request::DcName::Amazon => "Amazon".to_string(),
226 }),
227 );
228 inst_map.insert("dataCenterInfo".to_string(), serde_json::Value::Object(dci));
229
230 if let Some(li) = &inst.lease_info {
232 let mut lease = serde_json::Map::new();
233 lease.insert(
234 "evictionDurationInSecs".to_string(),
235 serde_json::Value::Number(serde_json::Number::from(
236 li.eviction_duration_in_secs.unwrap_or(90),
237 )),
238 );
239 inst_map.insert("leaseInfo".to_string(), serde_json::Value::Object(lease));
240 }
241
242 let mut meta_obj = serde_json::Map::new();
244 meta_obj.insert(
245 "@class".to_string(),
246 serde_json::Value::String("java.util.Collections$EmptyMap".to_string()),
247 );
248 inst_map.insert("metadata".to_string(), serde_json::Value::Object(meta_obj));
249 inst_map.insert(
251 "countryId".to_string(),
252 serde_json::Value::Number(serde_json::Number::from(1)),
253 );
254
255 let mut top = serde_json::Map::new();
256 top.insert("instance".to_string(), serde_json::Value::Object(inst_map));
257 let manual_json = serde_json::Value::Object(top).to_string();
258
259 for uri_str in &candidates {
260 let url =
261 Url::parse(uri_str).map_err(|e| EurekaClientError::GenericError(e.to_string()))?;
262 let req_builder = self.client.post(url.clone());
263 let req_builder = req_builder
265 .header("accept", "application/json")
266 .header("content-type", "application/json;charset=UTF-8")
267 .header("accept-charset", "utf-8")
268 .header("user-agent", format!("Rust Reqwest/{}", self.client_name));
269
270 let res = match req_builder.body(manual_json.clone()).send().await {
271 Ok(r) => r,
272 Err(e) => {
273 last_err = Some(EurekaClientError::from(e));
274 continue;
275 }
276 };
277
278 let status = res.status();
279 let body_bytes = res.bytes().await.map_err(EurekaClientError::from)?;
280 let _body_str = String::from_utf8_lossy(&body_bytes);
281
282 match status {
283 StatusCode::BAD_REQUEST => {
284 saw_bad_request = true;
285 continue;
286 }
287 StatusCode::INTERNAL_SERVER_ERROR => {
288 saw_internal_server_error = true;
289 continue;
290 }
291 StatusCode::NOT_FOUND => {
292 continue;
294 }
295 _ => return Ok(()),
296 }
297 }
298
299 if let Some(e) = last_err {
301 Err(e)
302 } else if saw_internal_server_error {
303 Err(EurekaClientError::InternalServerError)
304 } else if saw_bad_request {
305 Err(EurekaClientError::BadRequest)
306 } else {
307 Err(EurekaClientError::NotFound)
308 }
309 }
310
311 pub async fn deregister(
312 &self,
313 application_id: &str,
314 instance_id: &str,
315 ) -> Result<(), EurekaClientError> {
316 let base_path = format!("/v2/apps/{}", application_id);
318
319 let app_uris = self.build_uris(&base_path)?; let mut last_err: Option<EurekaClientError> = None;
321 let mut saw_bad_request = false;
322 let mut saw_internal_server_error = false;
323
324 for app_url in app_uris {
325 let mut delete_url = app_url.clone();
330 match delete_url.path_segments_mut() {
331 Ok(mut segments) => {
332 segments.push(instance_id);
333 }
334 Err(e) => {
335 last_err = Some(EurekaClientError::GenericError(format!(
336 "Failed to append instance id to URL {e:?}"
337 )));
338 continue;
339 }
340 }
341
342 let headers = self.headers_map();
343 let res = match self
344 .client
345 .delete(delete_url.clone())
346 .headers(headers.clone())
347 .send()
348 .await
349 {
350 Ok(r) => r,
351 Err(e) => {
352 last_err = Some(EurekaClientError::from(e));
353 continue;
354 }
355 };
356
357 let status = res.status();
358
359 debug!(
360 "deregister: server response status={:?} for url={} ",
361 status, delete_url
362 );
363
364 if status.is_success() {
365 return Ok(());
366 }
367
368 if status == StatusCode::BAD_REQUEST {
369 saw_bad_request = true;
370 continue;
371 } else if status == StatusCode::INTERNAL_SERVER_ERROR {
372 saw_internal_server_error = true;
373 continue;
374 } else if status == StatusCode::NOT_FOUND {
375 continue;
377 }
378 }
379
380 if let Some(e) = last_err {
382 Err(e)
383 } else if saw_internal_server_error {
384 Err(EurekaClientError::InternalServerError)
385 } else if saw_bad_request {
386 Err(EurekaClientError::BadRequest)
387 } else {
388 Err(EurekaClientError::NotFound)
389 }
390 }
391
392 pub async fn get_application(
393 &self,
394 application_id: &str,
395 ) -> Result<ApplicationResponse, EurekaClientError> {
396 let _path = format!("/v2/apps/{}", application_id);
397
398 let uris = self.build_uris(&_path)?; let mut last_err: Option<EurekaClientError> = None;
400
401 for url in uris {
402 let headers = self.headers_map();
403 let res = match self
404 .client
405 .get(url.clone())
406 .headers(headers.clone())
407 .header(ACCEPT_ENCODING, "gzip")
408 .send()
409 .await
410 {
411 Ok(r) => r,
412 Err(e) => {
413 last_err = Some(EurekaClientError::from(e));
414 continue;
415 }
416 };
417
418 let status = res.status();
419
420 debug!("get_application: server response status={:?}", status);
421
422 if status == StatusCode::NOT_FOUND {
423 continue;
425 }
426
427 let body_bytes = res.bytes().await.map_err(EurekaClientError::from)?;
428
429 let app: ApplicationResponse = serde_json::from_slice(&body_bytes)?;
430 return Ok(app);
431 }
432
433 if let Some(e) = last_err {
434 Err(e)
435 } else {
436 Err(EurekaClientError::NotFound)
437 }
438 }
439
440 pub async fn get_applications(&self) -> Result<ApplicationsResponse, EurekaClientError> {
441 let path = "/v2/apps";
442
443 let uris = self.build_uris(path)?;
444 let mut last_err: Option<EurekaClientError> = None;
445
446 for url in uris {
447 debug!("get_applications url:{}", url);
448
449 let headers = self.headers_map();
450 let res = match self
451 .client
452 .get(url.clone())
453 .headers(headers.clone())
454 .send()
455 .await
456 {
457 Ok(r) => r,
458 Err(e) => {
459 last_err = Some(EurekaClientError::from(e));
460 continue;
461 }
462 };
463 let status = res.status();
464
465 debug!("get_applications: server response status={:?}", status);
466
467 if status == StatusCode::NOT_FOUND {
468 debug!("received NotFound (404) from server");
469 continue;
470 }
471
472 let body_bytes = res.bytes().await.map_err(EurekaClientError::from)?;
473
474 let apps: ApplicationsResponse = serde_json::from_slice(&body_bytes).map_err(|e| {
475 warn!("serde error: {:?}", e);
476 EurekaClientError::from(e)
477 })?;
478
479 debug!("returning: {:?}", apps);
480 return Ok(apps);
481 }
482
483 if let Some(e) = last_err {
484 Err(e)
485 } else {
486 Err(EurekaClientError::NotFound)
487 }
488 }
489
490 fn build_uri(&self, path: &str) -> Result<Url, EurekaClientError> {
491 let url = format!("{}{}", self.eureka_cluster_url, path);
492 Url::parse(&url).map_err(|e| EurekaClientError::GenericError(format!("Invalid URI: {}", e)))
493 }
494
495 fn build_uris(&self, path: &str) -> Result<Vec<Url>, EurekaClientError> {
499 let mut uris = Vec::new();
500
501 let mut paths = vec![path.to_string()];
503 if path.starts_with("/v2") {
504 paths.push(path.replacen("/v2", "", 1));
505 }
506
507 let prefix = "/eureka";
508
509 for p in paths {
510 if let Ok(u) = self.build_uri(&p) {
512 if !uris.contains(&u) {
513 uris.push(u);
514 }
515 }
516 if !self.eureka_cluster_url.ends_with(prefix) {
518 let path_with_prefix = format!("{}{}", prefix, p);
519 if let Ok(u) = self.build_uri(&path_with_prefix) {
520 if !uris.contains(&u) {
521 uris.push(u);
522 }
523 }
524 }
525 }
526
527 if uris.is_empty() {
528 return Err(EurekaClientError::GenericError("No valid URIs".to_string()));
529 }
530
531 Ok(uris)
532 }
533
534 fn headers_map(&self) -> HeaderMap {
535 let mut headers = HeaderMap::new();
536 headers.insert(ACCEPT, HeaderValue::from_static("application/json"));
537 headers.insert(
538 CONTENT_TYPE,
539 HeaderValue::from_static("application/json;charset=UTF-8"),
540 );
541 headers.insert(ACCEPT_CHARSET, HeaderValue::from_static("utf-8"));
542 let ua = HeaderValue::from_str(&format!("Rust Reqwest/{}", self.client_name))
543 .unwrap_or_else(|_| HeaderValue::from_static("Rust Reqwest"));
544 headers.insert(USER_AGENT, ua);
545 headers
546 }
547}