1use crate::tls::build_upstream_certificate;
2use ahash::HashMap;
3use axum::http::{HeaderName, StatusCode};
4use chrono::{Duration, Utc};
5use etag::EntityTag;
6use lazy_static::lazy_static;
7use prometheus::{HistogramVec, IntGaugeVec, Opts, register_histogram_vec, register_int_gauge_vec};
8use reqwest::header::HeaderMap;
9use reqwest::{Client, ClientBuilder, Identity, RequestBuilder, header};
10use serde::{Deserialize, Serialize};
11use std::fs;
12use std::fs::File;
13use std::io::{BufReader, Read};
14use std::path::PathBuf;
15use std::str::FromStr;
16use tracing::{debug, error, info, trace, warn};
17use ulid::Ulid;
18use unleash_edge_cli::ClientIdentity;
19use unleash_edge_types::enterprise::{HeartbeatResponse, LicenseState};
20use unleash_edge_types::errors::EdgeError::EdgeMetricsRequestError;
21use unleash_edge_types::errors::{CertificateError, EdgeError, FeatureError};
22use unleash_edge_types::headers::{
23 UNLEASH_APPNAME_HEADER, UNLEASH_CLIENT_SPEC_HEADER, UNLEASH_CONNECTION_ID_HEADER,
24 UNLEASH_INSTANCE_ID_HEADER, UNLEASH_INTERVAL,
25};
26use unleash_edge_types::metrics::batching::MetricsBatch;
27use unleash_edge_types::metrics::instance_data::EdgeInstanceData;
28use unleash_edge_types::tokens::EdgeToken;
29use unleash_edge_types::urls::UnleashUrls;
30use unleash_edge_types::{
31 ClientFeaturesDeltaResponse, ClientFeaturesRequest, ClientFeaturesResponse, EdgeResult,
32 EdgeTokens, TokenValidationStatus, ValidateTokensRequest, build,
33};
34use unleash_types::client_features::{ClientFeatures, ClientFeaturesDelta};
35use unleash_types::client_metrics::ClientApplication;
36use url::Url;
37
38pub mod instance_data;
39pub mod tls;
40
41lazy_static! {
42 pub static ref CLIENT_REGISTER_FAILURES: IntGaugeVec = register_int_gauge_vec!(
43 Opts::new(
44 "client_register_failures",
45 "Why we failed to register upstream"
46 ),
47 &["status_code", "app_name", "instance_id"]
48 )
49 .unwrap();
50 pub static ref CLIENT_FEATURE_FETCH: HistogramVec = register_histogram_vec!(
51 "client_feature_fetch",
52 "Timings for fetching features in milliseconds",
53 &["status_code", "app_name", "instance_id"],
54 vec![
55 1.0, 2.0, 5.0, 10.0, 20.0, 50.0, 100.0, 200.0, 500.0, 1000.0, 5000.0
56 ]
57 )
58 .unwrap();
59 pub static ref CLIENT_FEATURE_DELTA_FETCH: HistogramVec = register_histogram_vec!(
60 "client_feature_delta_fetch",
61 "Timings for fetching feature deltas in milliseconds",
62 &["status_code", "app_name", "instance_id"],
63 vec![
64 1.0, 2.0, 5.0, 10.0, 20.0, 50.0, 100.0, 200.0, 500.0, 1000.0, 5000.0
65 ]
66 )
67 .unwrap();
68 pub static ref METRICS_UPLOAD: HistogramVec = register_histogram_vec!(
69 "client_metrics_upload",
70 "Timings for uploading client metrics in milliseconds",
71 &["status_code", "app_name", "instance_id"],
72 vec![1.0, 2.0, 5.0, 10.0, 20.0, 50.0, 100.0, 200.0, 500.0, 1000.0]
73 )
74 .unwrap();
75 pub static ref INSTANCE_DATA_UPLOAD: HistogramVec = register_histogram_vec!(
76 "instance_data_upload",
77 "Timings for uploading Edge instance data in milliseconds",
78 &["status_code", "app_name", "instance_id"],
79 vec![1.0, 2.0, 5.0, 10.0, 20.0, 50.0, 100.0, 200.0, 500.0, 1000.0]
80 )
81 .unwrap();
82 pub static ref CLIENT_FEATURE_FETCH_FAILURES: IntGaugeVec = register_int_gauge_vec!(
83 Opts::new(
84 "client_feature_fetch_failures",
85 "Why we failed to fetch features"
86 ),
87 &["status_code", "app_name", "instance_id"]
88 )
89 .unwrap();
90 pub static ref TOKEN_VALIDATION_FAILURES: IntGaugeVec = register_int_gauge_vec!(
91 Opts::new(
92 "token_validation_failures",
93 "Why we failed to validate tokens"
94 ),
95 &["status_code", "app_name", "instance_id"]
96 )
97 .unwrap();
98 pub static ref UPSTREAM_VERSION: IntGaugeVec = register_int_gauge_vec!(
99 Opts::new(
100 "upstream_version",
101 "The server type (Unleash or Edge) and version of the upstream we're connected to"
102 ),
103 &["server", "version", "app_name", "instance_id"]
104 )
105 .unwrap();
106}
107
108#[cfg_attr(test, derive(Default))]
109#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
110pub struct ClientMetaInformation {
111 pub app_name: String,
112 pub instance_id: Ulid,
113 pub connection_id: Ulid,
114}
115
116#[cfg_attr(test, derive(Default))]
117#[derive(Clone, Debug)]
118pub struct HttpClientArgs {
119 pub skip_ssl_verification: bool,
120 pub client_identity: Option<ClientIdentity>,
121 pub upstream_certificate_file: Option<PathBuf>,
122 pub connect_timeout: Duration,
123 pub socket_timeout: Duration,
124 pub keep_alive_timeout: Duration,
125 pub client_meta_information: ClientMetaInformation,
126}
127
128#[derive(Clone, Debug)]
129pub struct UnleashClient {
130 pub urls: UnleashUrls,
131 backing_client: Client,
132 custom_headers: HashMap<String, String>,
133 token_header: String,
134 meta_info: ClientMetaInformation,
135}
136
137fn load_pkcs12(id: &ClientIdentity) -> EdgeResult<Identity> {
138 let p12_file = fs::read(id.pkcs12_identity_file.clone().unwrap()).map_err(|e| {
139 EdgeError::ClientCertificateError(CertificateError::Pkcs12ArchiveNotFound(format!("{e:?}")))
140 })?;
141 let p12_keystore =
142 p12_keystore::KeyStore::from_pkcs12(&p12_file, &id.pkcs12_passphrase.clone().unwrap())
143 .map_err(|e| {
144 EdgeError::ClientCertificateError(CertificateError::Pkcs12ParseError(format!(
145 "{e:?}"
146 )))
147 })?;
148 let mut pem = vec![];
149 for (alias, entry) in p12_keystore.entries() {
150 debug!("P12 entry: {alias}");
151 match entry {
152 p12_keystore::KeyStoreEntry::Certificate(_) => {
153 info!(
154 "Direct Certificate, skipping. We want chain because client identity needs the private key"
155 );
156 }
157 p12_keystore::KeyStoreEntry::PrivateKeyChain(chain) => {
158 let key_pem = pkix::pem::der_to_pem(chain.key(), pkix::pem::PEM_PRIVATE_KEY);
159 pem.extend(key_pem.as_bytes());
160 pem.push(0x0a); for cert in chain.chain() {
162 let cert_pem = pkix::pem::der_to_pem(cert.as_der(), pkix::pem::PEM_CERTIFICATE);
163 pem.extend(cert_pem.as_bytes());
164 pem.push(0x0a); }
166 }
167 p12_keystore::KeyStoreEntry::Secret(_) => {
168 info!(
169 "Direct secret, skipping. We want chain because client identity needs the private key"
170 )
171 }
172 }
173 }
174
175 Identity::from_pem(&pem).map_err(|e| {
176 EdgeError::ClientCertificateError(CertificateError::Pkcs12X509Error(format!("{e:?}")))
177 })
178}
179
180fn load_pkcs8_identity(id: &ClientIdentity) -> EdgeResult<Vec<u8>> {
181 let cert = File::open(id.pkcs8_client_certificate_file.clone().unwrap()).map_err(|e| {
182 EdgeError::ClientCertificateError(CertificateError::Pem8ClientCertNotFound(format!("{e:}")))
183 })?;
184 let key = File::open(id.pkcs8_client_key_file.clone().unwrap()).map_err(|e| {
185 EdgeError::ClientCertificateError(CertificateError::Pem8ClientKeyNotFound(format!("{e:?}")))
186 })?;
187 let mut cert_reader = BufReader::new(cert);
188 let mut key_reader = BufReader::new(key);
189 let mut pem = vec![];
190 let _ = key_reader.read_to_end(&mut pem);
191 pem.push(0x0a);
192 let _ = cert_reader.read_to_end(&mut pem);
193 Ok(pem)
194}
195
196fn load_pkcs8(id: &ClientIdentity) -> EdgeResult<Identity> {
197 Identity::from_pem(&load_pkcs8_identity(id)?).map_err(|e| {
198 EdgeError::ClientCertificateError(CertificateError::Pem8IdentityGeneration(format!(
199 "{e:?}"
200 )))
201 })
202}
203
204fn load_pem_identity(pem_file: PathBuf) -> EdgeResult<Vec<u8>> {
205 let mut pem = vec![];
206 let mut pem_reader = BufReader::new(File::open(pem_file).expect("No such file"));
207 let _ = pem_reader.read_to_end(&mut pem);
208 Ok(pem)
209}
210
211fn load_pem(id: &ClientIdentity) -> EdgeResult<Identity> {
212 Identity::from_pem(&load_pem_identity(id.pem_cert_file.clone().unwrap())?).map_err(|e| {
213 EdgeError::ClientCertificateError(CertificateError::Pem8IdentityGeneration(format!(
214 "{e:?}"
215 )))
216 })
217}
218
219fn build_identity(tls: Option<ClientIdentity>) -> EdgeResult<ClientBuilder> {
220 tls.map_or_else(
221 || Ok(ClientBuilder::new().use_rustls_tls()),
222 |tls| {
223 let req_identity = if tls.pkcs12_identity_file.is_some() {
224 load_pkcs12(&tls)
226 } else if tls.pkcs8_client_certificate_file.is_some() {
227 load_pkcs8(&tls)
228 } else if tls.pem_cert_file.is_some() {
229 load_pem(&tls)
230 } else {
231 Err(EdgeError::ClientCertificateError(
232 CertificateError::NoCertificateFiles,
233 ))
234 };
235 req_identity.map(|id| ClientBuilder::new().use_rustls_tls().identity(id))
236 },
237 )
238}
239
240pub fn new_reqwest_client(args: HttpClientArgs) -> EdgeResult<Client> {
241 build_identity(args.client_identity)
242 .and_then(|builder| {
243 build_upstream_certificate(args.upstream_certificate_file).map(|cert| match cert {
244 Some(c) => builder.add_root_certificate(c),
245 None => builder,
246 })
247 })
248 .and_then(|client| {
249 let mut header_map = HeaderMap::new();
250 header_map.insert(
251 UNLEASH_APPNAME_HEADER,
252 header::HeaderValue::from_str(&args.client_meta_information.app_name)
253 .expect("Could not add app name as a header"),
254 );
255 header_map.insert(
256 UNLEASH_INSTANCE_ID_HEADER,
257 header::HeaderValue::from_str(
258 &args.client_meta_information.instance_id.to_string(),
259 )
260 .unwrap(),
261 );
262 header_map.insert(
263 UNLEASH_CONNECTION_ID_HEADER,
264 header::HeaderValue::from_str(
265 &args.client_meta_information.connection_id.to_string(),
266 )
267 .unwrap(),
268 );
269 header_map.insert(
270 UNLEASH_CLIENT_SPEC_HEADER,
271 header::HeaderValue::from_static(unleash_yggdrasil::SUPPORTED_SPEC_VERSION),
272 );
273
274 client
275 .user_agent(format!("unleash-edge-{}", build::PKG_VERSION))
276 .default_headers(header_map)
277 .danger_accept_invalid_certs(args.skip_ssl_verification)
278 .timeout(args.socket_timeout.to_std().unwrap())
279 .connect_timeout(args.connect_timeout.to_std().unwrap())
280 .tcp_keepalive(args.keep_alive_timeout.to_std().unwrap())
281 .pool_idle_timeout(std::time::Duration::from_secs(60))
282 .pool_max_idle_per_host(2)
283 .build()
284 .map_err(|e| EdgeError::ClientBuildError(format!("Failed to build client {e:?}")))
285 })
286}
287
288fn redact_token_header(header_map: HeaderMap) -> HashMap<String, String> {
289 header_map
290 .iter()
291 .map(|(k, v)| {
292 if k.as_str().to_lowercase().contains("authorization") {
293 let token = EdgeToken::try_from(v.clone());
294 if let Ok(token) = token {
295 (k.as_str().to_string(), format!("{token:?}"))
296 } else {
297 (k.as_str().to_string(), format!("{v:?}"))
298 }
299 } else {
300 (
301 k.as_str().to_string(),
302 v.to_str().unwrap_or("Unknown header value").to_string(),
303 )
304 }
305 })
306 .collect::<HashMap<String, String>>()
307}
308
309impl UnleashClient {
310 pub fn from_url_with_backing_client(
311 server_url: Url,
312 token_header: String,
313 backing_client: Client,
314 client_meta_information: ClientMetaInformation,
315 ) -> Self {
316 Self {
317 urls: UnleashUrls::from_base_url(server_url),
318 backing_client,
319 custom_headers: Default::default(),
320 token_header,
321 meta_info: client_meta_information,
322 }
323 }
324
325 #[cfg(test)]
326 pub fn new_insecure(server_url: &str) -> Result<Self, EdgeError> {
327 Ok(Self {
328 urls: UnleashUrls::from_str(server_url)?,
329 backing_client: new_reqwest_client(HttpClientArgs {
330 skip_ssl_verification: true,
331 client_meta_information: ClientMetaInformation::default(),
332 ..Default::default()
333 })
334 .unwrap(),
335 custom_headers: Default::default(),
336 token_header: "Authorization".to_string(),
337 meta_info: ClientMetaInformation::default(),
338 })
339 }
340
341 fn client_features_req(&self, req: ClientFeaturesRequest) -> RequestBuilder {
342 let mut client_req = self
343 .backing_client
344 .get(self.urls.client_features_url.to_string())
345 .headers(self.header_map(Some(req.api_key)));
346
347 if let Some(tag) = req.etag {
348 client_req = client_req.header(header::IF_NONE_MATCH, tag.to_string());
349 }
350
351 if let Some(interval) = req.interval {
352 client_req = client_req.header(UNLEASH_INTERVAL, interval.to_string());
353 }
354
355 client_req
356 }
357
358 fn client_features_delta_req(&self, req: ClientFeaturesRequest) -> RequestBuilder {
359 let client_req = self
360 .backing_client
361 .get(self.urls.client_features_delta_url.to_string())
362 .headers(self.header_map(Some(req.api_key)));
363 if let Some(tag) = req.etag {
364 client_req.header(header::IF_NONE_MATCH, tag.to_string())
365 } else {
366 client_req
367 }
368 }
369
370 fn header_map(&self, api_key: Option<String>) -> HeaderMap {
371 let mut header_map = HeaderMap::new();
372 let token_header: HeaderName = HeaderName::from_str(self.token_header.as_str()).unwrap();
373 if let Some(key) = api_key {
374 header_map.insert(token_header, key.parse().unwrap());
375 }
376 for (header_name, header_value) in self.custom_headers.iter() {
377 let key = HeaderName::from_str(header_name.as_str()).unwrap();
378 header_map.insert(key, header_value.parse().unwrap());
379 }
380 header_map
381 }
382
383 pub fn with_custom_client_headers(self, custom_headers: Vec<(String, String)>) -> Self {
384 Self {
385 custom_headers: custom_headers.iter().cloned().collect(),
386 ..self
387 }
388 }
389
390 pub async fn register_as_client(
391 &self,
392 api_key: String,
393 application: ClientApplication,
394 ) -> EdgeResult<()> {
395 self.backing_client
396 .post(self.urls.client_register_app_url.to_string())
397 .headers(self.header_map(Some(api_key)))
398 .json(&application)
399 .send()
400 .await
401 .map_err(|e| {
402 warn!("Failed to register client: {e:?}");
403 EdgeError::ClientRegisterError
404 })
405 .map(|r| {
406 if !r.status().is_success() {
407 CLIENT_REGISTER_FAILURES
408 .with_label_values(&[
409 r.status().as_str(),
410 &self.meta_info.app_name,
411 &self.meta_info.instance_id.to_string(),
412 ])
413 .inc();
414 warn!(
415 "Failed to register client upstream with status code {}",
416 r.status()
417 );
418 }
419 })
420 }
421
422 pub async fn get_client_features(
423 &self,
424 request: ClientFeaturesRequest,
425 ) -> EdgeResult<ClientFeaturesResponse> {
426 let start_time = Utc::now();
427 let response = self
428 .client_features_req(request.clone())
429 .send()
430 .await
431 .map_err(|e| {
432 warn!("Failed to fetch. Due to [{e:?}] - Will retry");
433 match e.status() {
434 Some(s) => EdgeError::ClientFeaturesFetchError(FeatureError::Retriable(s)),
435 None => EdgeError::ClientFeaturesFetchError(FeatureError::NotFound),
436 }
437 })?;
438 let stop_time = Utc::now();
439 CLIENT_FEATURE_FETCH
440 .with_label_values(&[
441 &response.status().as_u16().to_string(),
442 &self.meta_info.app_name,
443 &self.meta_info.instance_id.to_string(),
444 ])
445 .observe(
446 stop_time
447 .signed_duration_since(start_time)
448 .num_milliseconds() as f64,
449 );
450 if response.status() == StatusCode::NOT_MODIFIED {
451 Ok(ClientFeaturesResponse::NoUpdate(
452 request.etag.expect("Got NOT_MODIFIED without an ETag"),
453 ))
454 } else if response.status().is_success() {
455 let etag = response
456 .headers()
457 .get("ETag")
458 .or_else(|| response.headers().get("etag"))
459 .and_then(|etag| EntityTag::from_str(etag.to_str().unwrap()).ok());
460 let features = response.json::<ClientFeatures>().await.map_err(|e| {
461 warn!("Could not parse features response to internal representation");
462 EdgeError::ClientFeaturesParseError(e.to_string())
463 })?;
464 Ok(ClientFeaturesResponse::Updated(features, etag))
465 } else if response.status() == StatusCode::FORBIDDEN {
466 CLIENT_FEATURE_FETCH_FAILURES
467 .with_label_values(&[
468 response.status().as_str(),
469 &self.meta_info.app_name,
470 &self.meta_info.instance_id.to_string(),
471 ])
472 .inc();
473 Err(EdgeError::ClientFeaturesFetchError(
474 FeatureError::AccessDenied,
475 ))
476 } else if response.status() == StatusCode::UNAUTHORIZED {
477 CLIENT_FEATURE_FETCH_FAILURES
478 .with_label_values(&[
479 response.status().as_str(),
480 &self.meta_info.app_name,
481 &self.meta_info.instance_id.to_string(),
482 ])
483 .inc();
484 warn!(
485 "Failed to get features. Url: [{}]. Status code: [401]",
486 self.urls.client_features_url.to_string()
487 );
488 Err(EdgeError::ClientFeaturesFetchError(
489 FeatureError::AccessDenied,
490 ))
491 } else if response.status() == StatusCode::NOT_FOUND {
492 CLIENT_FEATURE_FETCH_FAILURES
493 .with_label_values(&[
494 response.status().as_str(),
495 &self.meta_info.app_name,
496 &self.meta_info.instance_id.to_string(),
497 ])
498 .inc();
499 warn!(
500 "Failed to get features. Url: [{}]. Status code: [{}]",
501 self.urls.client_features_url.to_string(),
502 response.status().as_str()
503 );
504 Err(EdgeError::ClientFeaturesFetchError(FeatureError::NotFound))
505 } else {
506 CLIENT_FEATURE_FETCH_FAILURES
507 .with_label_values(&[
508 response.status().as_str(),
509 &self.meta_info.app_name,
510 &self.meta_info.instance_id.to_string(),
511 ])
512 .inc();
513 Err(EdgeError::ClientFeaturesFetchError(
514 FeatureError::Retriable(response.status()),
515 ))
516 }
517 }
518
519 pub async fn get_client_features_delta(
520 &self,
521 request: ClientFeaturesRequest,
522 ) -> EdgeResult<ClientFeaturesDeltaResponse> {
523 let start_time = Utc::now();
524 let response = self
525 .client_features_delta_req(request.clone())
526 .send()
527 .await
528 .map_err(|e| {
529 warn!("Failed to fetch. Due to [{e:?}] - Will retry");
530 match e.status() {
531 Some(s) => EdgeError::ClientFeaturesFetchError(FeatureError::Retriable(s)),
532 None => EdgeError::ClientFeaturesFetchError(FeatureError::NotFound),
533 }
534 })?;
535 let stop_time = Utc::now();
536 CLIENT_FEATURE_DELTA_FETCH
537 .with_label_values(&[
538 &response.status().as_u16().to_string(),
539 &self.meta_info.app_name,
540 &self.meta_info.instance_id.to_string(),
541 ])
542 .observe(
543 stop_time
544 .signed_duration_since(start_time)
545 .num_milliseconds() as f64,
546 );
547 if response.status() == StatusCode::NOT_MODIFIED {
548 Ok(ClientFeaturesDeltaResponse::NoUpdate(
549 request.etag.expect("Got NOT_MODIFIED without an ETag"),
550 ))
551 } else if response.status().is_success() {
552 let etag = response
553 .headers()
554 .get("ETag")
555 .or_else(|| response.headers().get("etag"))
556 .and_then(|etag| EntityTag::from_str(etag.to_str().unwrap()).ok());
557 let features = response.json::<ClientFeaturesDelta>().await.map_err(|e| {
558 warn!("Could not parse features response to internal representation");
559 EdgeError::ClientFeaturesParseError(e.to_string())
560 })?;
561 Ok(ClientFeaturesDeltaResponse::Updated(features, etag))
562 } else if response.status() == StatusCode::FORBIDDEN {
563 CLIENT_FEATURE_FETCH_FAILURES
564 .with_label_values(&[
565 response.status().as_str(),
566 &self.meta_info.app_name,
567 &self.meta_info.instance_id.to_string(),
568 ])
569 .inc();
570 Err(EdgeError::ClientFeaturesFetchError(
571 FeatureError::AccessDenied,
572 ))
573 } else if response.status() == StatusCode::UNAUTHORIZED {
574 CLIENT_FEATURE_FETCH_FAILURES
575 .with_label_values(&[
576 response.status().as_str(),
577 &self.meta_info.app_name,
578 &self.meta_info.instance_id.to_string(),
579 ])
580 .inc();
581 warn!(
582 "Failed to get features. Url: [{}]. Status code: [401]",
583 self.urls.client_features_delta_url.to_string()
584 );
585 Err(EdgeError::ClientFeaturesFetchError(
586 FeatureError::AccessDenied,
587 ))
588 } else if response.status() == StatusCode::NOT_FOUND {
589 CLIENT_FEATURE_FETCH_FAILURES
590 .with_label_values(&[
591 response.status().as_str(),
592 &self.meta_info.app_name,
593 &self.meta_info.instance_id.to_string(),
594 ])
595 .inc();
596 warn!(
597 "Failed to get features. Url: [{}]. Status code: [{}]",
598 self.urls.client_features_delta_url.to_string(),
599 response.status().as_str()
600 );
601 Err(EdgeError::ClientFeaturesFetchError(FeatureError::NotFound))
602 } else {
603 CLIENT_FEATURE_FETCH_FAILURES
604 .with_label_values(&[
605 response.status().as_str(),
606 &self.meta_info.app_name,
607 &self.meta_info.instance_id.to_string(),
608 ])
609 .inc();
610 Err(EdgeError::ClientFeaturesFetchError(
611 FeatureError::Retriable(response.status()),
612 ))
613 }
614 }
615
616 pub async fn send_heartbeat(
617 &self,
618 api_key: &EdgeToken,
619 connection_id: &Ulid,
620 ) -> EdgeResult<LicenseState> {
621 let response = self
622 .backing_client
623 .post(self.urls.heartbeat_url.to_string())
624 .query(&[("connectionId", connection_id)])
625 .headers(self.header_map(Some(api_key.token.clone())))
626 .send()
627 .await
628 .map_err(|e| {
629 EdgeError::HeartbeatError(
630 format!("{e}"),
631 e.status().unwrap_or(StatusCode::INTERNAL_SERVER_ERROR),
632 )
633 })?;
634
635 let Ok(heartbeat_response) = response.json::<HeartbeatResponse>().await else {
636 return Err(EdgeError::HeartbeatError(
637 "Failed to parse heartbeat response".into(),
638 StatusCode::INTERNAL_SERVER_ERROR,
639 ));
640 };
641
642 Ok(heartbeat_response.edge_license_state)
643 }
644
645 pub async fn send_bulk_metrics_to_client_endpoint(
646 &self,
647 request: MetricsBatch,
648 token: &str,
649 ) -> EdgeResult<()> {
650 trace!("Sending metrics to bulk endpoint");
651 let started_at = Utc::now();
652 let headers = self.header_map(Some(token.to_string()));
653 debug!(
654 "Using headers: {headers:?}",
655 headers = redact_token_header(headers.clone())
656 );
657 let result = self
658 .backing_client
659 .post(self.urls.client_bulk_metrics_url.to_string())
660 .headers(self.header_map(Some(token.to_string())))
661 .json(&request)
662 .send()
663 .await
664 .map_err(|e| {
665 EdgeError::EdgeMetricsError(format!(
666 "Failed to send metrics to /api/client/metrics/bulk endpoint {e:?}"
667 ))
668 })?;
669 let ended = Utc::now();
670 METRICS_UPLOAD
671 .with_label_values(&[
672 result.status().as_str(),
673 &self.meta_info.app_name,
674 &self.meta_info.instance_id.to_string(),
675 ])
676 .observe(ended.signed_duration_since(started_at).num_milliseconds() as f64);
677 if result.status().is_success() {
678 Ok(())
679 } else {
680 match result.status() {
681 StatusCode::BAD_REQUEST => Err(EdgeMetricsRequestError(
682 result.status(),
683 result.json().await.ok(),
684 )),
685 _ => Err(EdgeMetricsRequestError(result.status(), None)),
686 }
687 }
688 }
689
690 #[tracing::instrument(skip(self, instance_data, token))]
691 pub async fn post_edge_observability_data(
692 &self,
693 instance_data: EdgeInstanceData,
694 token: &str,
695 ) -> EdgeResult<()> {
696 let started_at = Utc::now();
697 let result = self
698 .backing_client
699 .post(self.urls.edge_instance_data_url.to_string())
700 .headers(self.header_map(Some(token.into())))
701 .timeout(Duration::seconds(3).to_std().unwrap())
702 .json(&instance_data)
703 .send()
704 .await
705 .map_err(|e| {
706 EdgeError::EdgeMetricsError(format!("Failed to send instance data: {e:?}"))
707 })?;
708 let ended_at = Utc::now();
709 INSTANCE_DATA_UPLOAD
710 .with_label_values(&[
711 result.status().as_str(),
712 &self.meta_info.app_name,
713 &self.meta_info.instance_id.to_string(),
714 ])
715 .observe(
716 ended_at
717 .signed_duration_since(started_at)
718 .num_milliseconds() as f64,
719 );
720 let r = if result.status().is_success() {
721 Ok(())
722 } else {
723 match result.status() {
724 StatusCode::BAD_REQUEST => Err(EdgeMetricsRequestError(
725 result.status(),
726 result.json().await.ok(),
727 )),
728 _ => Err(EdgeMetricsRequestError(result.status(), None)),
729 }
730 };
731 debug!("Sent instance data to upstream server");
732 r
733 }
734
735 pub async fn validate_tokens(
736 &self,
737 request: ValidateTokensRequest,
738 ) -> EdgeResult<Vec<EdgeToken>> {
739 let check_api_suffix = || {
740 let base_url = self.urls.base_url.to_string();
741 if base_url.ends_with("/api") || base_url.ends_with("/api/") {
742 error!("Try passing the instance URL without '/api'.");
743 }
744 };
745
746 let result = self
747 .backing_client
748 .post(self.urls.edge_validate_url.to_string())
749 .headers(self.header_map(None))
750 .json(&request)
751 .send()
752 .await
753 .map_err(|e| {
754 info!("Failed to validate tokens: [{e:?}]");
755 EdgeError::EdgeTokenError
756 })?;
757 match result.status() {
758 StatusCode::OK => {
759 let token_response = result.json::<EdgeTokens>().await.map_err(|e| {
760 warn!("Failed to parse validation response with error: {e:?}");
761 EdgeError::EdgeTokenParseError
762 })?;
763 Ok(token_response
764 .tokens
765 .into_iter()
766 .map(|t| {
767 let remaining_info =
768 EdgeToken::try_from(t.token.clone()).unwrap_or_else(|_| t.clone());
769 EdgeToken {
770 token: t.token.clone(),
771 token_type: t.token_type,
772 environment: t.environment.or(remaining_info.environment),
773 projects: t.projects,
774 status: TokenValidationStatus::Validated,
775 }
776 })
777 .collect())
778 }
779 s => {
780 TOKEN_VALIDATION_FAILURES
781 .with_label_values(&[
782 result.status().as_str(),
783 &self.meta_info.app_name,
784 &self.meta_info.instance_id.to_string(),
785 ])
786 .inc();
787 error!(
788 "Failed to validate tokens. Requested url: [{}]. Got status: {:?}",
789 self.urls.edge_validate_url.to_string(),
790 s
791 );
792 check_api_suffix();
793 Err(EdgeError::TokenValidationError(
794 StatusCode::from_u16(s.as_u16()).unwrap(),
795 ))
796 }
797 }
798 }
799}