unleash-edge 19.2.1

Unleash edge is a proxy for Unleash. It can return both evaluated feature toggles as well as the raw data from Unleash's client API
use std::cmp::max;
use std::sync::Arc;

use actix_web::http::StatusCode;
use chrono::Duration;
use dashmap::DashMap;
use lazy_static::lazy_static;
use prometheus::{register_int_gauge, register_int_gauge_vec, IntGauge, IntGaugeVec, Opts};
use tracing::{error, info, trace, warn};

use crate::types::TokenRefresh;
use crate::{
    error::EdgeError,
    metrics::client_metrics::{size_of_batch, MetricsCache},
};

use super::feature_refresher::FeatureRefresher;

lazy_static! {
    pub static ref METRICS_UPSTREAM_HTTP_ERRORS: IntGaugeVec = register_int_gauge_vec!(
        Opts::new(
            "metrics_upstream_http_errors",
            "Failing requests against upstream metrics endpoint"
        ),
        &["status_code"]
    )
    .unwrap();
    pub static ref METRICS_UNEXPECTED_ERRORS: IntGauge =
        register_int_gauge!(Opts::new("metrics_send_error", "Failures to send metrics")).unwrap();
    pub static ref METRICS_UPSTREAM_OUTDATED: IntGaugeVec = register_int_gauge_vec!(
        Opts::new(
            "metrics_upstream_outdated",
            "Number of times we have tried to send metrics to an outdated endpoint"
        ),
        &["environment"]
    )
    .unwrap();
    pub static ref METRICS_UPSTREAM_CLIENT_BULK: IntGaugeVec = register_int_gauge_vec!(
        Opts::new(
            "metrics_upstream_client_bulk",
            "Number of times we have tried to send metrics to the client bulk endpoint"
        ),
        &["environment"]
    )
    .unwrap();
    pub static ref METRICS_INTERVAL_BETWEEN_SEND: IntGauge = register_int_gauge!(Opts::new(
        "metrics_interval_between_send",
        "Interval between sending metrics"
    ))
    .unwrap();
}

fn decide_where_to_post(
    environment: &String,
    known_tokens: Arc<DashMap<String, TokenRefresh>>,
) -> (bool, String) {
    if let Some(token_refresh) = known_tokens
        .iter()
        .find(|t| t.token.environment == Some(environment.to_string()))
    {
        METRICS_UPSTREAM_CLIENT_BULK
            .with_label_values(&[environment])
            .inc();
        (true, token_refresh.token.token.clone())
    } else {
        (false, "".into())
    }
}

pub async fn send_metrics_task(
    metrics_cache: Arc<MetricsCache>,
    feature_refresher: Arc<FeatureRefresher>,
    send_interval: i64,
) {
    let mut failures = 0;
    let mut interval = Duration::seconds(send_interval);
    loop {
        trace!("Looping metrics");
        let envs = metrics_cache.get_metrics_by_environment();
        for (env, batch) in envs.iter() {
            let (use_new_endpoint, token) =
                decide_where_to_post(env, feature_refresher.tokens_to_refresh.clone());
            let batches = metrics_cache.get_appropriately_sized_env_batches(batch);
            trace!("Posting {} batches for {env}", batches.len());
            for batch in batches {
                if !batch.applications.is_empty() || !batch.metrics.is_empty() {
                    let result = if use_new_endpoint {
                        feature_refresher
                            .unleash_client
                            .send_bulk_metrics_to_client_endpoint(batch.clone(), &token)
                            .await
                    } else {
                        feature_refresher
                            .unleash_client
                            .send_batch_metrics(batch.clone())
                            .await
                    };
                    if let Err(edge_error) = result {
                        match edge_error {
                            EdgeError::EdgeMetricsRequestError(status_code, message) => {
                                METRICS_UPSTREAM_HTTP_ERRORS
                                    .with_label_values(&[status_code.as_str()])
                                    .inc();
                                match status_code {
                                    StatusCode::PAYLOAD_TOO_LARGE => error!(
                                        "Metrics were too large. They were {}",
                                        size_of_batch(&batch)
                                    ),
                                    StatusCode::BAD_REQUEST => {
                                        error!("Unleash said [{message:?}]. Dropping this metric bucket to avoid consuming too much memory");
                                    }
                                    StatusCode::NOT_FOUND => {
                                        failures = 10;
                                        interval = new_interval(send_interval, failures);
                                        error!("Upstream said we are trying to post to an endpoint that doesn't exist. backing off to {} seconds", interval.num_seconds());
                                    }
                                    StatusCode::FORBIDDEN | StatusCode::UNAUTHORIZED => {
                                        failures = 10;
                                        interval = new_interval(send_interval, failures);
                                        error!("Upstream said we were not allowed to post metrics, backing off to {} seconds", interval.num_seconds());
                                    }
                                    StatusCode::TOO_MANY_REQUESTS => {
                                        failures = max(10, failures + 1);
                                        interval = new_interval(send_interval, failures);
                                        info!(
                                            "Upstream said it was too busy, backing off to {} seconds",
                                            interval.num_seconds()
                                        );
                                        metrics_cache.reinsert_batch(batch);
                                    }
                                    StatusCode::INTERNAL_SERVER_ERROR
                                    | StatusCode::BAD_GATEWAY
                                    | StatusCode::SERVICE_UNAVAILABLE
                                    | StatusCode::GATEWAY_TIMEOUT => {
                                        failures = max(10, failures + 1);
                                        interval = new_interval(send_interval, failures);
                                        info!("Upstream said it is struggling. It returned Http status {}. Backing off to {} seconds", status_code, interval.num_seconds());
                                        metrics_cache.reinsert_batch(batch);
                                    }
                                    _ => {
                                        warn!("Failed to send metrics. Status code was {status_code}. Will reinsert metrics for next attempt");
                                        metrics_cache.reinsert_batch(batch);
                                    }
                                }
                            }
                            _ => {
                                warn!("Failed to send metrics: {edge_error:?}");
                                METRICS_UNEXPECTED_ERRORS.inc();
                            }
                        }
                    } else {
                        failures = max(0, failures - 1);
                        interval = new_interval(send_interval, failures);
                    }
                }
            }
        }
        trace!(
            "Done posting traces. Sleeping for {} seconds and then going again",
            interval.num_seconds()
        );
        METRICS_INTERVAL_BETWEEN_SEND.set(interval.num_seconds());
        tokio::time::sleep(std::time::Duration::from_secs(interval.num_seconds() as u64)).await;
    }
}

fn new_interval(send_interval: i64, failures: i64) -> Duration {
    let added_interval_from_failure = send_interval * failures;
    Duration::seconds(send_interval + added_interval_from_failure)
}

#[cfg(test)]
mod tests {
    use crate::http::background_send_metrics::new_interval;

    #[tokio::test]
    pub async fn new_interval_does_not_overflow() {
        let metrics = new_interval(300, 10);
        assert!(metrics.num_seconds() < 3305);
    }
}