unleash-edge-http-client 20.1.1

Unleash edge is a proxy for Unleash. It can return both evaluated feature toggles as well as the raw data from Unleash's client API
Documentation
use reqwest::{StatusCode, Url};
use std::sync::Arc;
use tokio::sync::RwLock;

use crate::{ClientMetaInformation, UnleashClient};
use tracing::{debug, warn};
use unleash_edge_cli::{CliArgs, EdgeMode};
use unleash_edge_types::BackgroundTask;
use unleash_edge_types::errors::EdgeError;
use unleash_edge_types::metrics::instance_data::EdgeInstanceData;

#[derive(Debug, Clone)]
pub struct InstanceDataSender {
    pub unleash_client: Arc<UnleashClient>,
    pub token: String,
    pub base_path: String,
}

#[derive(Debug, Clone)]
pub enum InstanceDataSending {
    SendNothing,
    SendInstanceData(InstanceDataSender),
}

impl InstanceDataSending {
    pub fn from_args(
        args: CliArgs,
        client_meta_information: &ClientMetaInformation,
        http_client: reqwest::Client,
    ) -> Result<Self, EdgeError> {
        match args.mode {
            EdgeMode::Edge(edge_args) => edge_args
                .tokens
                .first()
                .map(|token| {
                    let unleash_client = Url::parse(&edge_args.upstream_url.clone())
                        .map(|url| {
                            UnleashClient::from_url_with_backing_client(
                                url,
                                args.auth_headers
                                    .upstream_auth_header
                                    .clone()
                                    .unwrap_or("Authorization".to_string()),
                                http_client,
                                client_meta_information.clone(),
                            )
                        })
                        .map(|c| {
                            c.with_custom_client_headers(edge_args.custom_client_headers.clone())
                        })
                        .map(Arc::new)
                        .map_err(|_| EdgeError::InvalidServerUrl(edge_args.upstream_url.clone()))
                        .expect("Could not construct UnleashClient");
                    let instance_data_sender = InstanceDataSender {
                        unleash_client,
                        token: token.clone(),
                        base_path: args.http.base_path.clone(),
                    };
                    InstanceDataSending::SendInstanceData(instance_data_sender)
                })
                .map(Ok)
                .unwrap_or(Ok(InstanceDataSending::SendNothing)),
            _ => Ok(InstanceDataSending::SendNothing),
        }
    }
}

#[derive(Debug)]
enum InstanceDataSendError {
    Backoff(String),
    Unexpected(String),
}

async fn send_instance_data(
    instance_data_sender: &Arc<InstanceDataSending>,
    our_instance_data: &Arc<EdgeInstanceData>,
    downstream_instance_data: &Arc<RwLock<Vec<EdgeInstanceData>>>,
) -> Result<(), InstanceDataSendError> {
    match instance_data_sender.as_ref() {
        InstanceDataSending::SendNothing => {
            debug!("No instance data sender found. Doing nothing.");
            Ok(())
        }
        InstanceDataSending::SendInstanceData(instance_data_sender) => {
            let observed_data = our_instance_data.observe(
                downstream_instance_data.read().await.clone(),
                &instance_data_sender.base_path,
            );
            let status = instance_data_sender
                .unleash_client
                .post_edge_observability_data(observed_data, &instance_data_sender.token)
                .await;

            if let Err(e) = status {
                match e {
                    EdgeError::EdgeMetricsRequestError(status, _) => {
                        match status {
                            StatusCode::NOT_FOUND => {
                                downstream_instance_data.write().await.clear();
                                our_instance_data.clear_time_windowed_metrics();
                                Err(InstanceDataSendError::Backoff("Our upstream is not running a version that supports edge metrics.".into()))
                            }
                            StatusCode::FORBIDDEN => {
                                downstream_instance_data.write().await.clear();
                                our_instance_data.clear_time_windowed_metrics();
                                Err(InstanceDataSendError::Backoff("Upstream edge metrics said our token wasn't allowed to post data".into()))
                            }
                            _ => Err(InstanceDataSendError::Unexpected(format!(
                                "Failed to post instance data due to unknown error {e:?}"
                            ))),
                        }
                    }
                    _ => Err(InstanceDataSendError::Unexpected(format!(
                        "Failed to post instance data due to unknown error {e:?}"
                    ))),
                }
            } else {
                downstream_instance_data.write().await.clear();
                our_instance_data.clear_time_windowed_metrics();
                Ok(())
            }
        }
    }
}

pub fn create_once_off_send_instance_data(
    instance_data_sender: Arc<InstanceDataSending>,
    our_instance_data: Arc<EdgeInstanceData>,
    downstream_instance_data: Arc<RwLock<Vec<EdgeInstanceData>>>,
) -> BackgroundTask {
    let instance_data_sender = instance_data_sender.clone();
    let our_instance_data = our_instance_data.clone();
    let downstream_instance_data = downstream_instance_data.clone();

    Box::pin(async move {
        let result = send_instance_data(
            &instance_data_sender,
            &our_instance_data,
            &downstream_instance_data,
        )
        .await;

        if let Err(err) = result {
            warn!("Failed to send last set of instance data during graceful exit: {err:?}");
        }
    })
}

pub fn create_send_instance_data_task(
    instance_data_sender: Arc<InstanceDataSending>,
    our_instance_data: Arc<EdgeInstanceData>,
    downstream_instance_data: Arc<RwLock<Vec<EdgeInstanceData>>>,
) -> BackgroundTask {
    let mut errors = 0;
    let delay = std::time::Duration::from_secs(60);
    Box::pin(async move {
        loop {
            tokio::time::sleep(
                std::time::Duration::from_secs(60) + delay * std::cmp::min(errors, 10),
            )
            .await;

            let result = send_instance_data(
                &instance_data_sender,
                &our_instance_data,
                &downstream_instance_data,
            )
            .await;
            match result {
                Ok(_) => {
                    debug!("Successfully posted observability metrics.");
                    errors = 0;
                    downstream_instance_data.write().await.clear();
                    our_instance_data.clear_time_windowed_metrics();
                }
                Err(err) => match err {
                    InstanceDataSendError::Backoff(message) => {
                        warn!(message);
                        errors += 1;
                    }
                    InstanceDataSendError::Unexpected(message) => {
                        warn!(message);
                    }
                },
            }
        }
    })
}