use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use async_trait::async_trait;
use aws_smithy_types::Document;
use open_feature::{
provider::FeatureProvider,
provider::{ProviderMetadata, ProviderStatus, ResolutionDetails},
EvaluationContext, EvaluationResult, StructValue,
};
use serde_json::{Map, Value};
use superposition_sdk::{Client, Config as SdkConfig};
use tokio::sync::RwLock;
use crate::traits::{AllFeatureProvider, FeatureExperimentMeta};
use crate::{conversions, types::*};
struct CacheEntry {
value: Map<String, Value>,
created_at: Instant,
}
struct ResponseCache {
entries: HashMap<String, CacheEntry>,
max_entries: usize,
ttl: Duration,
}
impl ResponseCache {
fn new(max_entries: usize, ttl: Duration) -> Self {
Self {
entries: HashMap::new(),
max_entries,
ttl,
}
}
fn get(&self, key: &str) -> Option<&Map<String, Value>> {
self.entries.get(key).and_then(|entry| {
if entry.created_at.elapsed() < self.ttl {
Some(&entry.value)
} else {
None
}
})
}
fn put(&mut self, key: String, value: Map<String, Value>) {
if self.entries.len() >= self.max_entries {
let now = Instant::now();
self.entries
.retain(|_, entry| now.duration_since(entry.created_at) < self.ttl);
}
if self.entries.len() >= self.max_entries {
if let Some(oldest_key) = self
.entries
.iter()
.min_by_key(|(_, entry)| entry.created_at)
.map(|(k, _)| k.clone())
{
self.entries.remove(&oldest_key);
}
}
self.entries.insert(
key,
CacheEntry {
value,
created_at: Instant::now(),
},
);
}
fn cache_key(context: &EvaluationContext) -> String {
let mut parts: Vec<String> = Vec::new();
if let Some(ref tk) = context.targeting_key {
parts.push(format!("tk={}", tk));
}
let mut field_keys: Vec<_> = context.custom_fields.keys().cloned().collect();
field_keys.sort();
for k in field_keys {
if let Some(v) = context.custom_fields.get(&k) {
let serde_val = conversions::evaluation_context_to_value(v.clone());
parts.push(format!("{}={}", k, serde_val));
}
}
parts.join("|")
}
}
pub struct SuperpositionAPIProvider {
options: SuperpositionOptions,
cache: Option<Arc<RwLock<ResponseCache>>>,
global_context: RwLock<EvaluationContext>,
metadata: ProviderMetadata,
status: RwLock<ProviderStatus>,
client: Client,
}
fn create_client(options: &SuperpositionOptions) -> Client {
let sdk_config = SdkConfig::builder()
.endpoint_url(&options.endpoint)
.bearer_token(options.token.clone().into())
.behavior_version_latest()
.build();
Client::from_conf(sdk_config)
}
impl SuperpositionAPIProvider {
pub fn new(options: SuperpositionOptions) -> Self {
Self {
client: create_client(&options),
options,
cache: None,
global_context: RwLock::new(EvaluationContext::default()),
metadata: ProviderMetadata {
name: "SuperpositionAPIProvider".to_string(),
},
status: RwLock::new(ProviderStatus::NotReady),
}
}
pub fn with_cache(
options: SuperpositionOptions,
cache_options: CacheOptions,
) -> Self {
let max_entries = cache_options.size.unwrap_or(1000);
let ttl = Duration::from_secs(cache_options.ttl.unwrap_or(300));
let cache = ResponseCache::new(max_entries, ttl);
Self {
client: create_client(&options),
options,
cache: Some(Arc::new(RwLock::new(cache))),
global_context: RwLock::new(EvaluationContext::default()),
metadata: ProviderMetadata {
name: "SuperpositionAPIProvider".to_string(),
},
status: RwLock::new(ProviderStatus::NotReady),
}
}
async fn resolve_remote(
&self,
mut context: EvaluationContext,
prefix_filter: Option<&[String]>,
) -> Result<Map<String, Value>> {
let cache_key = ResponseCache::cache_key(&context);
if let Some(ref cache_arc) = self.cache {
let cache = cache_arc.read().await;
if let Some(cached_value) = cache.get(&cache_key) {
log::debug!("SuperpositionAPIProvider: cache hit for key");
let result = if let Some(prefixes) = prefix_filter {
filter_by_prefix(cached_value, prefixes)
} else {
cached_value.clone()
};
return Ok(result);
}
}
let client = &self.client;
let global_context = self.global_context.read().await;
context.merge_missing(&global_context);
let (query_data, _targeting_key) =
conversions::evaluation_context_to_query(context);
let mut builder = client
.get_resolved_config()
.workspace_id(&self.options.workspace_id)
.org_id(&self.options.org_id);
let sdk_context: HashMap<String, Document> = query_data
.into_iter()
.map(|(k, v)| (k, conversions::value_to_document(v)))
.collect();
builder = builder.set_context(Some(sdk_context));
let response = builder.send().await.map_err(|e| {
SuperpositionError::NetworkError(format!(
"Failed to get resolved config: {}",
e
))
})?;
let config_value = conversions::document_to_value(response.config);
let full_result = match config_value {
Value::Object(map) => map,
other => {
log::warn!(
"SuperpositionAPIProvider: resolved config is not an object, wrapping: {:?}",
other
);
let mut map = Map::new();
map.insert("_value".to_string(), other);
map
}
};
if let Some(ref cache_arc) = self.cache {
let mut cache = cache_arc.write().await;
cache.put(cache_key, full_result.clone());
}
let result = if let Some(prefixes) = prefix_filter {
filter_by_prefix(&full_result, prefixes)
} else {
full_result
};
Ok(result)
}
}
fn filter_by_prefix(
config: &Map<String, Value>,
prefixes: &[String],
) -> Map<String, Value> {
if prefixes.is_empty() {
return config.clone();
}
config
.iter()
.filter(|(key, _)| prefixes.iter().any(|prefix| key.starts_with(prefix)))
.map(|(k, v)| (k.clone(), v.clone()))
.collect()
}
#[async_trait]
impl AllFeatureProvider for SuperpositionAPIProvider {
async fn resolve_all_features(
&self,
context: EvaluationContext,
) -> Result<Map<String, Value>> {
self.resolve_remote(context, None).await
}
async fn resolve_all_features_with_filter(
&self,
context: EvaluationContext,
prefix_filter: Option<&[String]>,
) -> Result<Map<String, Value>> {
self.resolve_remote(context, prefix_filter).await
}
}
#[async_trait]
impl FeatureExperimentMeta for SuperpositionAPIProvider {
async fn get_applicable_variants(
&self,
_context: EvaluationContext,
) -> Result<Vec<String>> {
Ok(vec![])
}
}
#[async_trait]
impl FeatureProvider for SuperpositionAPIProvider {
async fn initialize(&mut self, _context: &EvaluationContext) {
log::info!("Initializing SuperpositionAPIProvider...");
{
let mut status = self.status.write().await;
*status = ProviderStatus::Ready;
}
log::info!("SuperpositionAPIProvider initialized successfully");
}
async fn resolve_bool_value(
&self,
flag_key: &str,
evaluation_context: &EvaluationContext,
) -> EvaluationResult<ResolutionDetails<bool>> {
self.resolve_bool(flag_key, evaluation_context.clone())
.await
}
async fn resolve_string_value(
&self,
flag_key: &str,
evaluation_context: &EvaluationContext,
) -> EvaluationResult<ResolutionDetails<String>> {
self.resolve_string(flag_key, evaluation_context.clone())
.await
}
async fn resolve_int_value(
&self,
flag_key: &str,
evaluation_context: &EvaluationContext,
) -> EvaluationResult<ResolutionDetails<i64>> {
self.resolve_int(flag_key, evaluation_context.clone()).await
}
async fn resolve_float_value(
&self,
flag_key: &str,
evaluation_context: &EvaluationContext,
) -> EvaluationResult<ResolutionDetails<f64>> {
self.resolve_float(flag_key, evaluation_context.clone())
.await
}
async fn resolve_struct_value(
&self,
flag_key: &str,
evaluation_context: &EvaluationContext,
) -> EvaluationResult<ResolutionDetails<StructValue>> {
self.resolve_struct(flag_key, evaluation_context.clone())
.await
}
fn metadata(&self) -> &ProviderMetadata {
&self.metadata
}
fn status(&self) -> ProviderStatus {
match self.status.try_read() {
Ok(status) => match *status {
ProviderStatus::Ready => ProviderStatus::Ready,
ProviderStatus::Error => ProviderStatus::Error,
ProviderStatus::NotReady => ProviderStatus::NotReady,
ProviderStatus::STALE => ProviderStatus::STALE,
},
Err(_) => ProviderStatus::NotReady,
}
}
}