unleash-edge 0.0.2

Unleash edge is a proxy for Unleash. It can return both evaluated feature toggles as well as the raw data from Unleash's client API
Documentation
use std::sync::Arc;

use reqwest::Url;
use tokio::sync::mpsc;

use tokio::sync::mpsc::Sender;
use tokio::sync::RwLock;

use crate::{
    auth::token_validator::TokenValidator,
    cli::{CliArgs, EdgeArg, EdgeMode, OfflineArgs},
    http::unleash_client::UnleashClient,
    types::{EdgeResult, EdgeSink, EdgeSource, EdgeToken},
};

use super::{
    memory_provider::MemoryProvider, offline_provider::OfflineProvider,
    redis_provider::RedisProvider,
};

pub type DataProviderPair = (Arc<RwLock<dyn EdgeSource>>, Arc<RwLock<dyn EdgeSink>>);

pub struct RepositoryInfo {
    pub source: Arc<RwLock<dyn EdgeSource>>,
    pub sink_info: Option<SinkInfo>,
}

pub struct SinkInfo {
    pub sink: Arc<RwLock<dyn EdgeSink>>,
    pub validated_send: mpsc::Sender<EdgeToken>,
    pub validated_receive: mpsc::Receiver<EdgeToken>,
    pub unvalidated_receive: mpsc::Receiver<EdgeToken>,
    pub unleash_client: UnleashClient,
    pub token_validator: Arc<RwLock<TokenValidator>>,
    pub metrics_interval_seconds: u64,
}

fn build_offline(offline_args: OfflineArgs) -> EdgeResult<Arc<RwLock<dyn EdgeSource>>> {
    let provider =
        OfflineProvider::instantiate_provider(offline_args.bootstrap_file, offline_args.tokens)?;
    let provider = Arc::new(RwLock::new(provider));
    Ok(provider)
}

fn build_memory(features_refresh_interval_seconds: i64) -> EdgeResult<DataProviderPair> {
    let data_source = Arc::new(RwLock::new(MemoryProvider::new(
        features_refresh_interval_seconds,
    )));
    Ok((data_source.clone(), data_source))
}

fn build_redis(redis_url: String, _sender: Sender<EdgeToken>) -> EdgeResult<DataProviderPair> {
    let data_source = Arc::new(RwLock::new(RedisProvider::new(&redis_url)?));
    Ok((data_source.clone(), data_source))
}

pub async fn build_source_and_sink(args: CliArgs) -> EdgeResult<RepositoryInfo> {
    match args.mode {
        EdgeMode::Offline(offline_args) => {
            let source = build_offline(offline_args)?;
            Ok(RepositoryInfo {
                source,
                sink_info: None,
            })
        }
        EdgeMode::Edge(edge_args) => {
            let arg: EdgeArg = edge_args.clone().into();
            let unleash_client = UnleashClient::from_url(
                Url::parse(edge_args.upstream_url.as_str()).expect("Cannot parse Upstream URL"),
            );
            let (unvalidated_sender, unvalidated_receiver) = mpsc::channel::<EdgeToken>(32);
            let (validated_sender, validated_receiver) = mpsc::channel::<EdgeToken>(32);
            let (source, sink) = match arg {
                EdgeArg::Redis(redis_url) => build_redis(redis_url, unvalidated_sender),
                EdgeArg::InMemory => build_memory(edge_args.features_refresh_interval_seconds),
            }?;
            let mut token_validator = TokenValidator {
                unleash_client: Arc::new(unleash_client.clone()),
                edge_source: source.clone(),
                edge_sink: sink.clone(),
            };
            if !edge_args.tokens.is_empty() {
                let _ = token_validator.register_tokens(edge_args.tokens).await;
            }
            Ok(RepositoryInfo {
                source,
                sink_info: Some(SinkInfo {
                    sink,
                    validated_send: validated_sender,
                    validated_receive: validated_receiver,
                    unvalidated_receive: unvalidated_receiver,
                    unleash_client,
                    token_validator: Arc::new(RwLock::new(token_validator)),
                    metrics_interval_seconds: edge_args.metrics_interval_seconds,
                }),
            })
        }
    }
}