use chrono::Duration;
use dashmap::DashMap;
use reqwest::Url;
use std::fs::File;
use std::sync::Arc;
use std::{io::BufReader, str::FromStr};
use crate::persistence::file::FilePersister;
use crate::persistence::redis::RedisPersister;
use crate::persistence::EdgePersistence;
use crate::{
auth::token_validator::TokenValidator,
cli::{CliArgs, EdgeArgs, EdgeMode, OfflineArgs},
error::EdgeError,
http::{feature_refresher::FeatureRefresher, unleash_client::UnleashClient},
types::{EdgeResult, EdgeToken, TokenType},
};
use unleash_types::client_features::ClientFeatures;
use unleash_yggdrasil::EngineState;
type CacheContainer = (
Arc<DashMap<String, EdgeToken>>,
Arc<DashMap<String, ClientFeatures>>,
Arc<DashMap<String, EngineState>>,
);
type EdgeInfo = (
CacheContainer,
Option<Arc<TokenValidator>>,
Option<Arc<FeatureRefresher>>,
Option<Arc<dyn EdgePersistence>>,
);
fn build_caches() -> CacheContainer {
let token_cache: DashMap<String, EdgeToken> = DashMap::default();
let features_cache: DashMap<String, ClientFeatures> = DashMap::default();
let engine_cache: DashMap<String, EngineState> = DashMap::default();
(
Arc::new(token_cache),
Arc::new(features_cache),
Arc::new(engine_cache),
)
}
async fn hydrate_from_persistent_storage(
cache: CacheContainer,
feature_refresher: Arc<FeatureRefresher>,
storage: Arc<dyn EdgePersistence>,
) {
let (token_cache, features_cache, engine_cache) = cache;
let tokens = storage.load_tokens().await.unwrap_or_default();
let features = storage.load_features().await.unwrap_or_default();
let refresh_targets = storage.load_refresh_targets().await.unwrap_or_default();
for token in tokens {
tracing::debug!("Hydrating tokens {token:?}");
token_cache.insert(token.token.clone(), token);
}
for (key, features) in features {
tracing::debug!("Hydrating features for {key:?}");
features_cache.insert(key.clone(), features.clone());
let mut engine_state = EngineState::default();
engine_state.take_state(features);
engine_cache.insert(key, engine_state);
}
for target in refresh_targets {
tracing::debug!("Hydrating refresh target for {target:?}");
feature_refresher
.tokens_to_refresh
.insert(target.token.token.clone(), target);
}
}
pub(crate) fn build_offline_mode(
client_features: ClientFeatures,
tokens: Vec<String>,
) -> EdgeResult<CacheContainer> {
let (token_cache, features_cache, engine_cache) = build_caches();
let edge_tokens: Vec<EdgeToken> = tokens
.iter()
.map(|token| EdgeToken::from_str(token).unwrap_or_else(|_| EdgeToken::offline_token(token)))
.collect();
for edge_token in edge_tokens {
token_cache.insert(edge_token.token.clone(), edge_token.clone());
features_cache.insert(
crate::tokens::cache_key(&edge_token),
client_features.clone(),
);
let mut engine_state = EngineState::default();
engine_state.take_state(client_features.clone());
engine_cache.insert(crate::tokens::cache_key(&edge_token), engine_state);
}
Ok((token_cache, features_cache, engine_cache))
}
fn build_offline(offline_args: OfflineArgs) -> EdgeResult<CacheContainer> {
if let Some(bootstrap) = offline_args.bootstrap_file {
let file = File::open(bootstrap.clone()).map_err(|_| EdgeError::NoFeaturesFile)?;
let reader = BufReader::new(file);
let client_features: ClientFeatures = serde_json::from_reader(reader).map_err(|e| {
let path = format!("{}", bootstrap.clone().display());
EdgeError::InvalidBackupFile(path, e.to_string())
})?;
build_offline_mode(client_features, offline_args.tokens)
} else {
Err(EdgeError::NoFeaturesFile)
}
}
async fn get_data_source(args: &EdgeArgs) -> Option<Arc<dyn EdgePersistence>> {
if let Some(redis_url) = args.redis_url.clone() {
let redis_client = RedisPersister::new(&redis_url).expect("Failed to connect to Redis");
return Some(Arc::new(redis_client));
}
if let Some(backup_folder) = args.backup_folder.clone() {
let backup_client = FilePersister::new(&backup_folder);
return Some(Arc::new(backup_client));
}
None
}
async fn build_edge(args: &EdgeArgs) -> EdgeResult<EdgeInfo> {
let (token_cache, feature_cache, engine_cache) = build_caches();
let persistence = get_data_source(args).await;
let unleash_client = Url::parse(&args.upstream_url.clone())
.map(UnleashClient::from_url)
.map(Arc::new)
.map_err(|_| EdgeError::InvalidServerUrl(args.upstream_url.clone()))?;
let token_validator = Arc::new(TokenValidator {
token_cache: token_cache.clone(),
unleash_client: unleash_client.clone(),
persistence: persistence.clone(),
});
let feature_refresher = Arc::new(FeatureRefresher::new(
unleash_client,
feature_cache.clone(),
engine_cache.clone(),
Duration::seconds(args.features_refresh_interval_seconds),
persistence.clone(),
));
let _ = token_validator.register_tokens(args.tokens.clone()).await;
if let Some(persistence) = persistence.clone() {
hydrate_from_persistent_storage(
(
token_cache.clone(),
feature_cache.clone(),
engine_cache.clone(),
),
feature_refresher.clone(),
persistence,
)
.await;
}
for validated_token in token_cache
.iter()
.filter(|candidate| candidate.value().token_type == Some(TokenType::Client))
{
let _ = feature_refresher
.register_token_for_refresh(validated_token.clone())
.await;
}
Ok((
(token_cache, feature_cache, engine_cache),
Some(token_validator),
Some(feature_refresher),
persistence,
))
}
pub async fn build_caches_and_refreshers(args: CliArgs) -> EdgeResult<EdgeInfo> {
match args.mode {
EdgeMode::Offline(offline_args) => {
build_offline(offline_args).map(|cache| (cache, None, None, None))
}
EdgeMode::Edge(edge_args) => build_edge(&edge_args).await,
}
}