unleash_edge_http_client/
lib.rs

1use crate::tls::build_upstream_certificate;
2use ahash::HashMap;
3use axum::http::{HeaderName, StatusCode};
4use chrono::{Duration, Utc};
5use etag::EntityTag;
6use lazy_static::lazy_static;
7use prometheus::{HistogramVec, IntGaugeVec, Opts, register_histogram_vec, register_int_gauge_vec};
8use reqwest::header::HeaderMap;
9use reqwest::{Client, ClientBuilder, Identity, RequestBuilder, header};
10use serde::{Deserialize, Serialize};
11use std::fs;
12use std::fs::File;
13use std::io::{BufReader, Read};
14use std::path::PathBuf;
15use std::str::FromStr;
16use tracing::{debug, error, info, trace, warn};
17use ulid::Ulid;
18use unleash_edge_cli::ClientIdentity;
19use unleash_edge_types::enterprise::{HeartbeatResponse, LicenseState};
20use unleash_edge_types::errors::EdgeError::EdgeMetricsRequestError;
21use unleash_edge_types::errors::{CertificateError, EdgeError, FeatureError};
22use unleash_edge_types::headers::{
23    UNLEASH_APPNAME_HEADER, UNLEASH_CLIENT_SPEC_HEADER, UNLEASH_CONNECTION_ID_HEADER,
24    UNLEASH_INSTANCE_ID_HEADER, UNLEASH_INTERVAL,
25};
26use unleash_edge_types::metrics::batching::MetricsBatch;
27use unleash_edge_types::metrics::instance_data::EdgeInstanceData;
28use unleash_edge_types::tokens::EdgeToken;
29use unleash_edge_types::urls::UnleashUrls;
30use unleash_edge_types::{
31    ClientFeaturesDeltaResponse, ClientFeaturesRequest, ClientFeaturesResponse, EdgeResult,
32    EdgeTokens, TokenValidationStatus, ValidateTokensRequest, build,
33};
34use unleash_types::client_features::{ClientFeatures, ClientFeaturesDelta};
35use unleash_types::client_metrics::ClientApplication;
36use url::Url;
37
38pub mod instance_data;
39pub mod tls;
40
41lazy_static! {
42    pub static ref CLIENT_REGISTER_FAILURES: IntGaugeVec = register_int_gauge_vec!(
43        Opts::new(
44            "client_register_failures",
45            "Why we failed to register upstream"
46        ),
47        &["status_code", "app_name", "instance_id"]
48    )
49    .unwrap();
50    pub static ref CLIENT_FEATURE_FETCH: HistogramVec = register_histogram_vec!(
51        "client_feature_fetch",
52        "Timings for fetching features in milliseconds",
53        &["status_code", "app_name", "instance_id"],
54        vec![
55            1.0, 2.0, 5.0, 10.0, 20.0, 50.0, 100.0, 200.0, 500.0, 1000.0, 5000.0
56        ]
57    )
58    .unwrap();
59    pub static ref CLIENT_FEATURE_DELTA_FETCH: HistogramVec = register_histogram_vec!(
60        "client_feature_delta_fetch",
61        "Timings for fetching feature deltas in milliseconds",
62        &["status_code", "app_name", "instance_id"],
63        vec![
64            1.0, 2.0, 5.0, 10.0, 20.0, 50.0, 100.0, 200.0, 500.0, 1000.0, 5000.0
65        ]
66    )
67    .unwrap();
68    pub static ref METRICS_UPLOAD: HistogramVec = register_histogram_vec!(
69        "client_metrics_upload",
70        "Timings for uploading client metrics in milliseconds",
71        &["status_code", "app_name", "instance_id"],
72        vec![1.0, 2.0, 5.0, 10.0, 20.0, 50.0, 100.0, 200.0, 500.0, 1000.0]
73    )
74    .unwrap();
75    pub static ref INSTANCE_DATA_UPLOAD: HistogramVec = register_histogram_vec!(
76        "instance_data_upload",
77        "Timings for uploading Edge instance data in milliseconds",
78        &["status_code", "app_name", "instance_id"],
79        vec![1.0, 2.0, 5.0, 10.0, 20.0, 50.0, 100.0, 200.0, 500.0, 1000.0]
80    )
81    .unwrap();
82    pub static ref CLIENT_FEATURE_FETCH_FAILURES: IntGaugeVec = register_int_gauge_vec!(
83        Opts::new(
84            "client_feature_fetch_failures",
85            "Why we failed to fetch features"
86        ),
87        &["status_code", "app_name", "instance_id"]
88    )
89    .unwrap();
90    pub static ref TOKEN_VALIDATION_FAILURES: IntGaugeVec = register_int_gauge_vec!(
91        Opts::new(
92            "token_validation_failures",
93            "Why we failed to validate tokens"
94        ),
95        &["status_code", "app_name", "instance_id"]
96    )
97    .unwrap();
98    pub static ref UPSTREAM_VERSION: IntGaugeVec = register_int_gauge_vec!(
99        Opts::new(
100            "upstream_version",
101            "The server type (Unleash or Edge) and version of the upstream we're connected to"
102        ),
103        &["server", "version", "app_name", "instance_id"]
104    )
105    .unwrap();
106}
107
108#[cfg_attr(test, derive(Default))]
109#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
110pub struct ClientMetaInformation {
111    pub app_name: String,
112    pub instance_id: Ulid,
113    pub connection_id: Ulid,
114}
115
116#[cfg_attr(test, derive(Default))]
117#[derive(Clone, Debug)]
118pub struct HttpClientArgs {
119    pub skip_ssl_verification: bool,
120    pub client_identity: Option<ClientIdentity>,
121    pub upstream_certificate_file: Option<PathBuf>,
122    pub connect_timeout: Duration,
123    pub socket_timeout: Duration,
124    pub keep_alive_timeout: Duration,
125    pub client_meta_information: ClientMetaInformation,
126}
127
128#[derive(Clone, Debug)]
129pub struct UnleashClient {
130    pub urls: UnleashUrls,
131    backing_client: Client,
132    custom_headers: HashMap<String, String>,
133    token_header: String,
134    meta_info: ClientMetaInformation,
135}
136
137fn load_pkcs12(id: &ClientIdentity) -> EdgeResult<Identity> {
138    let p12_file = fs::read(id.pkcs12_identity_file.clone().unwrap()).map_err(|e| {
139        EdgeError::ClientCertificateError(CertificateError::Pkcs12ArchiveNotFound(format!("{e:?}")))
140    })?;
141    let p12_keystore =
142        p12_keystore::KeyStore::from_pkcs12(&p12_file, &id.pkcs12_passphrase.clone().unwrap())
143            .map_err(|e| {
144                EdgeError::ClientCertificateError(CertificateError::Pkcs12ParseError(format!(
145                    "{e:?}"
146                )))
147            })?;
148    let mut pem = vec![];
149    for (alias, entry) in p12_keystore.entries() {
150        debug!("P12 entry: {alias}");
151        match entry {
152            p12_keystore::KeyStoreEntry::Certificate(_) => {
153                info!(
154                    "Direct Certificate, skipping. We want chain because client identity needs the private key"
155                );
156            }
157            p12_keystore::KeyStoreEntry::PrivateKeyChain(chain) => {
158                let key_pem = pkix::pem::der_to_pem(chain.key(), pkix::pem::PEM_PRIVATE_KEY);
159                pem.extend(key_pem.as_bytes());
160                pem.push(0x0a); // Added new line
161                for cert in chain.chain() {
162                    let cert_pem = pkix::pem::der_to_pem(cert.as_der(), pkix::pem::PEM_CERTIFICATE);
163                    pem.extend(cert_pem.as_bytes());
164                    pem.push(0x0a); // Added new line
165                }
166            }
167            p12_keystore::KeyStoreEntry::Secret(_) => {
168                info!(
169                    "Direct secret, skipping. We want chain because client identity needs the private key"
170                )
171            }
172        }
173    }
174
175    Identity::from_pem(&pem).map_err(|e| {
176        EdgeError::ClientCertificateError(CertificateError::Pkcs12X509Error(format!("{e:?}")))
177    })
178}
179
180fn load_pkcs8_identity(id: &ClientIdentity) -> EdgeResult<Vec<u8>> {
181    let cert = File::open(id.pkcs8_client_certificate_file.clone().unwrap()).map_err(|e| {
182        EdgeError::ClientCertificateError(CertificateError::Pem8ClientCertNotFound(format!("{e:}")))
183    })?;
184    let key = File::open(id.pkcs8_client_key_file.clone().unwrap()).map_err(|e| {
185        EdgeError::ClientCertificateError(CertificateError::Pem8ClientKeyNotFound(format!("{e:?}")))
186    })?;
187    let mut cert_reader = BufReader::new(cert);
188    let mut key_reader = BufReader::new(key);
189    let mut pem = vec![];
190    let _ = key_reader.read_to_end(&mut pem);
191    pem.push(0x0a);
192    let _ = cert_reader.read_to_end(&mut pem);
193    Ok(pem)
194}
195
196fn load_pkcs8(id: &ClientIdentity) -> EdgeResult<Identity> {
197    Identity::from_pem(&load_pkcs8_identity(id)?).map_err(|e| {
198        EdgeError::ClientCertificateError(CertificateError::Pem8IdentityGeneration(format!(
199            "{e:?}"
200        )))
201    })
202}
203
204fn load_pem_identity(pem_file: PathBuf) -> EdgeResult<Vec<u8>> {
205    let mut pem = vec![];
206    let mut pem_reader = BufReader::new(File::open(pem_file).expect("No such file"));
207    let _ = pem_reader.read_to_end(&mut pem);
208    Ok(pem)
209}
210
211fn load_pem(id: &ClientIdentity) -> EdgeResult<Identity> {
212    Identity::from_pem(&load_pem_identity(id.pem_cert_file.clone().unwrap())?).map_err(|e| {
213        EdgeError::ClientCertificateError(CertificateError::Pem8IdentityGeneration(format!(
214            "{e:?}"
215        )))
216    })
217}
218
219fn build_identity(tls: Option<ClientIdentity>) -> EdgeResult<ClientBuilder> {
220    tls.map_or_else(
221        || Ok(ClientBuilder::new().use_rustls_tls()),
222        |tls| {
223            let req_identity = if tls.pkcs12_identity_file.is_some() {
224                // We're going to assume that we're using pkcs#12
225                load_pkcs12(&tls)
226            } else if tls.pkcs8_client_certificate_file.is_some() {
227                load_pkcs8(&tls)
228            } else if tls.pem_cert_file.is_some() {
229                load_pem(&tls)
230            } else {
231                Err(EdgeError::ClientCertificateError(
232                    CertificateError::NoCertificateFiles,
233                ))
234            };
235            req_identity.map(|id| ClientBuilder::new().use_rustls_tls().identity(id))
236        },
237    )
238}
239
240pub fn new_reqwest_client(args: HttpClientArgs) -> EdgeResult<Client> {
241    build_identity(args.client_identity)
242        .and_then(|builder| {
243            build_upstream_certificate(args.upstream_certificate_file).map(|cert| match cert {
244                Some(c) => builder.add_root_certificate(c),
245                None => builder,
246            })
247        })
248        .and_then(|client| {
249            let mut header_map = HeaderMap::new();
250            header_map.insert(
251                UNLEASH_APPNAME_HEADER,
252                header::HeaderValue::from_str(&args.client_meta_information.app_name)
253                    .expect("Could not add app name as a header"),
254            );
255            header_map.insert(
256                UNLEASH_INSTANCE_ID_HEADER,
257                header::HeaderValue::from_str(
258                    &args.client_meta_information.instance_id.to_string(),
259                )
260                .unwrap(),
261            );
262            header_map.insert(
263                UNLEASH_CONNECTION_ID_HEADER,
264                header::HeaderValue::from_str(
265                    &args.client_meta_information.connection_id.to_string(),
266                )
267                .unwrap(),
268            );
269            header_map.insert(
270                UNLEASH_CLIENT_SPEC_HEADER,
271                header::HeaderValue::from_static(unleash_yggdrasil::SUPPORTED_SPEC_VERSION),
272            );
273
274            client
275                .user_agent(format!("unleash-edge-{}", build::PKG_VERSION))
276                .default_headers(header_map)
277                .danger_accept_invalid_certs(args.skip_ssl_verification)
278                .timeout(args.socket_timeout.to_std().unwrap())
279                .connect_timeout(args.connect_timeout.to_std().unwrap())
280                .tcp_keepalive(args.keep_alive_timeout.to_std().unwrap())
281                .pool_idle_timeout(std::time::Duration::from_secs(60))
282                .pool_max_idle_per_host(2)
283                .build()
284                .map_err(|e| EdgeError::ClientBuildError(format!("Failed to build client {e:?}")))
285        })
286}
287
288fn redact_token_header(header_map: HeaderMap) -> HashMap<String, String> {
289    header_map
290        .iter()
291        .map(|(k, v)| {
292            if k.as_str().to_lowercase().contains("authorization") {
293                let token = EdgeToken::try_from(v.clone());
294                if let Ok(token) = token {
295                    (k.as_str().to_string(), format!("{token:?}"))
296                } else {
297                    (k.as_str().to_string(), format!("{v:?}"))
298                }
299            } else {
300                (
301                    k.as_str().to_string(),
302                    v.to_str().unwrap_or("Unknown header value").to_string(),
303                )
304            }
305        })
306        .collect::<HashMap<String, String>>()
307}
308
309impl UnleashClient {
310    pub fn from_url_with_backing_client(
311        server_url: Url,
312        token_header: String,
313        backing_client: Client,
314        client_meta_information: ClientMetaInformation,
315    ) -> Self {
316        Self {
317            urls: UnleashUrls::from_base_url(server_url),
318            backing_client,
319            custom_headers: Default::default(),
320            token_header,
321            meta_info: client_meta_information,
322        }
323    }
324
325    #[cfg(test)]
326    pub fn new_insecure(server_url: &str) -> Result<Self, EdgeError> {
327        Ok(Self {
328            urls: UnleashUrls::from_str(server_url)?,
329            backing_client: new_reqwest_client(HttpClientArgs {
330                skip_ssl_verification: true,
331                client_meta_information: ClientMetaInformation::default(),
332                ..Default::default()
333            })
334            .unwrap(),
335            custom_headers: Default::default(),
336            token_header: "Authorization".to_string(),
337            meta_info: ClientMetaInformation::default(),
338        })
339    }
340
341    fn client_features_req(&self, req: ClientFeaturesRequest) -> RequestBuilder {
342        let mut client_req = self
343            .backing_client
344            .get(self.urls.client_features_url.to_string())
345            .headers(self.header_map(Some(req.api_key)));
346
347        if let Some(tag) = req.etag {
348            client_req = client_req.header(header::IF_NONE_MATCH, tag.to_string());
349        }
350
351        if let Some(interval) = req.interval {
352            client_req = client_req.header(UNLEASH_INTERVAL, interval.to_string());
353        }
354
355        client_req
356    }
357
358    fn client_features_delta_req(&self, req: ClientFeaturesRequest) -> RequestBuilder {
359        let client_req = self
360            .backing_client
361            .get(self.urls.client_features_delta_url.to_string())
362            .headers(self.header_map(Some(req.api_key)));
363        if let Some(tag) = req.etag {
364            client_req.header(header::IF_NONE_MATCH, tag.to_string())
365        } else {
366            client_req
367        }
368    }
369
370    fn header_map(&self, api_key: Option<String>) -> HeaderMap {
371        let mut header_map = HeaderMap::new();
372        let token_header: HeaderName = HeaderName::from_str(self.token_header.as_str()).unwrap();
373        if let Some(key) = api_key {
374            header_map.insert(token_header, key.parse().unwrap());
375        }
376        for (header_name, header_value) in self.custom_headers.iter() {
377            let key = HeaderName::from_str(header_name.as_str()).unwrap();
378            header_map.insert(key, header_value.parse().unwrap());
379        }
380        header_map
381    }
382
383    pub fn with_custom_client_headers(self, custom_headers: Vec<(String, String)>) -> Self {
384        Self {
385            custom_headers: custom_headers.iter().cloned().collect(),
386            ..self
387        }
388    }
389
390    pub async fn register_as_client(
391        &self,
392        api_key: String,
393        application: ClientApplication,
394    ) -> EdgeResult<()> {
395        self.backing_client
396            .post(self.urls.client_register_app_url.to_string())
397            .headers(self.header_map(Some(api_key)))
398            .json(&application)
399            .send()
400            .await
401            .map_err(|e| {
402                warn!("Failed to register client: {e:?}");
403                EdgeError::ClientRegisterError
404            })
405            .map(|r| {
406                if !r.status().is_success() {
407                    CLIENT_REGISTER_FAILURES
408                        .with_label_values(&[
409                            r.status().as_str(),
410                            &self.meta_info.app_name,
411                            &self.meta_info.instance_id.to_string(),
412                        ])
413                        .inc();
414                    warn!(
415                        "Failed to register client upstream with status code {}",
416                        r.status()
417                    );
418                }
419            })
420    }
421
422    pub async fn get_client_features(
423        &self,
424        request: ClientFeaturesRequest,
425    ) -> EdgeResult<ClientFeaturesResponse> {
426        let start_time = Utc::now();
427        let response = self
428            .client_features_req(request.clone())
429            .send()
430            .await
431            .map_err(|e| {
432                warn!("Failed to fetch. Due to [{e:?}] - Will retry");
433                match e.status() {
434                    Some(s) => EdgeError::ClientFeaturesFetchError(FeatureError::Retriable(s)),
435                    None => EdgeError::ClientFeaturesFetchError(FeatureError::NotFound),
436                }
437            })?;
438        let stop_time = Utc::now();
439        CLIENT_FEATURE_FETCH
440            .with_label_values(&[
441                &response.status().as_u16().to_string(),
442                &self.meta_info.app_name,
443                &self.meta_info.instance_id.to_string(),
444            ])
445            .observe(
446                stop_time
447                    .signed_duration_since(start_time)
448                    .num_milliseconds() as f64,
449            );
450        if response.status() == StatusCode::NOT_MODIFIED {
451            Ok(ClientFeaturesResponse::NoUpdate(
452                request.etag.expect("Got NOT_MODIFIED without an ETag"),
453            ))
454        } else if response.status().is_success() {
455            let etag = response
456                .headers()
457                .get("ETag")
458                .or_else(|| response.headers().get("etag"))
459                .and_then(|etag| EntityTag::from_str(etag.to_str().unwrap()).ok());
460            let features = response.json::<ClientFeatures>().await.map_err(|e| {
461                warn!("Could not parse features response to internal representation");
462                EdgeError::ClientFeaturesParseError(e.to_string())
463            })?;
464            Ok(ClientFeaturesResponse::Updated(features, etag))
465        } else if response.status() == StatusCode::FORBIDDEN {
466            CLIENT_FEATURE_FETCH_FAILURES
467                .with_label_values(&[
468                    response.status().as_str(),
469                    &self.meta_info.app_name,
470                    &self.meta_info.instance_id.to_string(),
471                ])
472                .inc();
473            Err(EdgeError::ClientFeaturesFetchError(
474                FeatureError::AccessDenied,
475            ))
476        } else if response.status() == StatusCode::UNAUTHORIZED {
477            CLIENT_FEATURE_FETCH_FAILURES
478                .with_label_values(&[
479                    response.status().as_str(),
480                    &self.meta_info.app_name,
481                    &self.meta_info.instance_id.to_string(),
482                ])
483                .inc();
484            warn!(
485                "Failed to get features. Url: [{}]. Status code: [401]",
486                self.urls.client_features_url.to_string()
487            );
488            Err(EdgeError::ClientFeaturesFetchError(
489                FeatureError::AccessDenied,
490            ))
491        } else if response.status() == StatusCode::NOT_FOUND {
492            CLIENT_FEATURE_FETCH_FAILURES
493                .with_label_values(&[
494                    response.status().as_str(),
495                    &self.meta_info.app_name,
496                    &self.meta_info.instance_id.to_string(),
497                ])
498                .inc();
499            warn!(
500                "Failed to get features. Url: [{}]. Status code: [{}]",
501                self.urls.client_features_url.to_string(),
502                response.status().as_str()
503            );
504            Err(EdgeError::ClientFeaturesFetchError(FeatureError::NotFound))
505        } else {
506            CLIENT_FEATURE_FETCH_FAILURES
507                .with_label_values(&[
508                    response.status().as_str(),
509                    &self.meta_info.app_name,
510                    &self.meta_info.instance_id.to_string(),
511                ])
512                .inc();
513            Err(EdgeError::ClientFeaturesFetchError(
514                FeatureError::Retriable(response.status()),
515            ))
516        }
517    }
518
519    pub async fn get_client_features_delta(
520        &self,
521        request: ClientFeaturesRequest,
522    ) -> EdgeResult<ClientFeaturesDeltaResponse> {
523        let start_time = Utc::now();
524        let response = self
525            .client_features_delta_req(request.clone())
526            .send()
527            .await
528            .map_err(|e| {
529                warn!("Failed to fetch. Due to [{e:?}] - Will retry");
530                match e.status() {
531                    Some(s) => EdgeError::ClientFeaturesFetchError(FeatureError::Retriable(s)),
532                    None => EdgeError::ClientFeaturesFetchError(FeatureError::NotFound),
533                }
534            })?;
535        let stop_time = Utc::now();
536        CLIENT_FEATURE_DELTA_FETCH
537            .with_label_values(&[
538                &response.status().as_u16().to_string(),
539                &self.meta_info.app_name,
540                &self.meta_info.instance_id.to_string(),
541            ])
542            .observe(
543                stop_time
544                    .signed_duration_since(start_time)
545                    .num_milliseconds() as f64,
546            );
547        if response.status() == StatusCode::NOT_MODIFIED {
548            Ok(ClientFeaturesDeltaResponse::NoUpdate(
549                request.etag.expect("Got NOT_MODIFIED without an ETag"),
550            ))
551        } else if response.status().is_success() {
552            let etag = response
553                .headers()
554                .get("ETag")
555                .or_else(|| response.headers().get("etag"))
556                .and_then(|etag| EntityTag::from_str(etag.to_str().unwrap()).ok());
557            let features = response.json::<ClientFeaturesDelta>().await.map_err(|e| {
558                warn!("Could not parse features response to internal representation");
559                EdgeError::ClientFeaturesParseError(e.to_string())
560            })?;
561            Ok(ClientFeaturesDeltaResponse::Updated(features, etag))
562        } else if response.status() == StatusCode::FORBIDDEN {
563            CLIENT_FEATURE_FETCH_FAILURES
564                .with_label_values(&[
565                    response.status().as_str(),
566                    &self.meta_info.app_name,
567                    &self.meta_info.instance_id.to_string(),
568                ])
569                .inc();
570            Err(EdgeError::ClientFeaturesFetchError(
571                FeatureError::AccessDenied,
572            ))
573        } else if response.status() == StatusCode::UNAUTHORIZED {
574            CLIENT_FEATURE_FETCH_FAILURES
575                .with_label_values(&[
576                    response.status().as_str(),
577                    &self.meta_info.app_name,
578                    &self.meta_info.instance_id.to_string(),
579                ])
580                .inc();
581            warn!(
582                "Failed to get features. Url: [{}]. Status code: [401]",
583                self.urls.client_features_delta_url.to_string()
584            );
585            Err(EdgeError::ClientFeaturesFetchError(
586                FeatureError::AccessDenied,
587            ))
588        } else if response.status() == StatusCode::NOT_FOUND {
589            CLIENT_FEATURE_FETCH_FAILURES
590                .with_label_values(&[
591                    response.status().as_str(),
592                    &self.meta_info.app_name,
593                    &self.meta_info.instance_id.to_string(),
594                ])
595                .inc();
596            warn!(
597                "Failed to get features. Url: [{}]. Status code: [{}]",
598                self.urls.client_features_delta_url.to_string(),
599                response.status().as_str()
600            );
601            Err(EdgeError::ClientFeaturesFetchError(FeatureError::NotFound))
602        } else {
603            CLIENT_FEATURE_FETCH_FAILURES
604                .with_label_values(&[
605                    response.status().as_str(),
606                    &self.meta_info.app_name,
607                    &self.meta_info.instance_id.to_string(),
608                ])
609                .inc();
610            Err(EdgeError::ClientFeaturesFetchError(
611                FeatureError::Retriable(response.status()),
612            ))
613        }
614    }
615
616    pub async fn send_heartbeat(
617        &self,
618        api_key: &EdgeToken,
619        connection_id: &Ulid,
620    ) -> EdgeResult<LicenseState> {
621        let response = self
622            .backing_client
623            .post(self.urls.heartbeat_url.to_string())
624            .query(&[("connectionId", connection_id)])
625            .headers(self.header_map(Some(api_key.token.clone())))
626            .send()
627            .await
628            .map_err(|e| {
629                EdgeError::HeartbeatError(
630                    format!("{e}"),
631                    e.status().unwrap_or(StatusCode::INTERNAL_SERVER_ERROR),
632                )
633            })?;
634
635        let Ok(heartbeat_response) = response.json::<HeartbeatResponse>().await else {
636            return Err(EdgeError::HeartbeatError(
637                "Failed to parse heartbeat response".into(),
638                StatusCode::INTERNAL_SERVER_ERROR,
639            ));
640        };
641
642        Ok(heartbeat_response.edge_license_state)
643    }
644
645    pub async fn send_bulk_metrics_to_client_endpoint(
646        &self,
647        request: MetricsBatch,
648        token: &str,
649    ) -> EdgeResult<()> {
650        trace!("Sending metrics to bulk endpoint");
651        let started_at = Utc::now();
652        let headers = self.header_map(Some(token.to_string()));
653        debug!(
654            "Using headers: {headers:?}",
655            headers = redact_token_header(headers.clone())
656        );
657        let result = self
658            .backing_client
659            .post(self.urls.client_bulk_metrics_url.to_string())
660            .headers(self.header_map(Some(token.to_string())))
661            .json(&request)
662            .send()
663            .await
664            .map_err(|e| {
665                EdgeError::EdgeMetricsError(format!(
666                    "Failed to send metrics to /api/client/metrics/bulk endpoint {e:?}"
667                ))
668            })?;
669        let ended = Utc::now();
670        METRICS_UPLOAD
671            .with_label_values(&[
672                result.status().as_str(),
673                &self.meta_info.app_name,
674                &self.meta_info.instance_id.to_string(),
675            ])
676            .observe(ended.signed_duration_since(started_at).num_milliseconds() as f64);
677        if result.status().is_success() {
678            Ok(())
679        } else {
680            match result.status() {
681                StatusCode::BAD_REQUEST => Err(EdgeMetricsRequestError(
682                    result.status(),
683                    result.json().await.ok(),
684                )),
685                _ => Err(EdgeMetricsRequestError(result.status(), None)),
686            }
687        }
688    }
689
690    #[tracing::instrument(skip(self, instance_data, token))]
691    pub async fn post_edge_observability_data(
692        &self,
693        instance_data: EdgeInstanceData,
694        token: &str,
695    ) -> EdgeResult<()> {
696        let started_at = Utc::now();
697        let result = self
698            .backing_client
699            .post(self.urls.edge_instance_data_url.to_string())
700            .headers(self.header_map(Some(token.into())))
701            .timeout(Duration::seconds(3).to_std().unwrap())
702            .json(&instance_data)
703            .send()
704            .await
705            .map_err(|e| {
706                EdgeError::EdgeMetricsError(format!("Failed to send instance data: {e:?}"))
707            })?;
708        let ended_at = Utc::now();
709        INSTANCE_DATA_UPLOAD
710            .with_label_values(&[
711                result.status().as_str(),
712                &self.meta_info.app_name,
713                &self.meta_info.instance_id.to_string(),
714            ])
715            .observe(
716                ended_at
717                    .signed_duration_since(started_at)
718                    .num_milliseconds() as f64,
719            );
720        let r = if result.status().is_success() {
721            Ok(())
722        } else {
723            match result.status() {
724                StatusCode::BAD_REQUEST => Err(EdgeMetricsRequestError(
725                    result.status(),
726                    result.json().await.ok(),
727                )),
728                _ => Err(EdgeMetricsRequestError(result.status(), None)),
729            }
730        };
731        debug!("Sent instance data to upstream server");
732        r
733    }
734
735    pub async fn validate_tokens(
736        &self,
737        request: ValidateTokensRequest,
738    ) -> EdgeResult<Vec<EdgeToken>> {
739        let check_api_suffix = || {
740            let base_url = self.urls.base_url.to_string();
741            if base_url.ends_with("/api") || base_url.ends_with("/api/") {
742                error!("Try passing the instance URL without '/api'.");
743            }
744        };
745
746        let result = self
747            .backing_client
748            .post(self.urls.edge_validate_url.to_string())
749            .headers(self.header_map(None))
750            .json(&request)
751            .send()
752            .await
753            .map_err(|e| {
754                info!("Failed to validate tokens: [{e:?}]");
755                EdgeError::EdgeTokenError
756            })?;
757        match result.status() {
758            StatusCode::OK => {
759                let token_response = result.json::<EdgeTokens>().await.map_err(|e| {
760                    warn!("Failed to parse validation response with error: {e:?}");
761                    EdgeError::EdgeTokenParseError
762                })?;
763                Ok(token_response
764                    .tokens
765                    .into_iter()
766                    .map(|t| {
767                        let remaining_info =
768                            EdgeToken::try_from(t.token.clone()).unwrap_or_else(|_| t.clone());
769                        EdgeToken {
770                            token: t.token.clone(),
771                            token_type: t.token_type,
772                            environment: t.environment.or(remaining_info.environment),
773                            projects: t.projects,
774                            status: TokenValidationStatus::Validated,
775                        }
776                    })
777                    .collect())
778            }
779            s => {
780                TOKEN_VALIDATION_FAILURES
781                    .with_label_values(&[
782                        result.status().as_str(),
783                        &self.meta_info.app_name,
784                        &self.meta_info.instance_id.to_string(),
785                    ])
786                    .inc();
787                error!(
788                    "Failed to validate tokens. Requested url: [{}]. Got status: {:?}",
789                    self.urls.edge_validate_url.to_string(),
790                    s
791                );
792                check_api_suffix();
793                Err(EdgeError::TokenValidationError(
794                    StatusCode::from_u16(s.as_u16()).unwrap(),
795                ))
796            }
797        }
798    }
799}