use crate::yahoo_error::YahooError;
use crate::config::{ConfigProfile, ConfigSource};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::time::{Duration, SystemTime};
use tokio::sync::{broadcast, watch};
use uuid::Uuid;
/// Record describing a single configuration change.
///
/// Events of this type are sent over the manager's broadcast channel and
/// handed to every registered change callback.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ConfigChangeEvent {
    /// Identifier of this event (the manager generates a UUID v4 string).
    pub event_id: String,
    /// Wall-clock time at which the change was applied.
    pub timestamp: SystemTime,
    /// What kind of change this event represents.
    pub change_type: ConfigChangeType,
    /// Name of the profile carried in `new_config`.
    pub profile_name: String,
    /// Configuration in effect before the change, when available.
    pub previous_config: Option<ConfigProfile>,
    /// Configuration in effect after the change.
    pub new_config: ConfigProfile,
    /// Where the new configuration came from.
    pub source: ConfigSource,
    /// Free-form key/value annotations attached to the event.
    pub metadata: HashMap<String, String>,
}
/// Classification of a [`ConfigChangeEvent`].
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum ConfigChangeType {
    /// A configuration was created.
    Created,
    /// An existing configuration was replaced with new values.
    Updated,
    /// A configuration was removed.
    Deleted,
    /// A configuration was made the active one.
    Activated,
    /// A previously recorded version was restored.
    RolledBack,
    /// A configuration was pulled in from a remote source.
    Synchronized,
}
/// One entry in the configuration history kept by [`RuntimeConfigManager`].
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ConfigVersion {
    /// Version number assigned when the configuration was applied.
    pub version: u64,
    /// Snapshot of the configuration that was applied.
    pub config: ConfigProfile,
    /// Time at which this version was recorded.
    pub created_at: SystemTime,
    /// Human-readable reason for the change.
    pub description: String,
    /// Who (or which subsystem) applied the change.
    pub author: String,
}
/// Definition of a feature flag evaluated by
/// [`RuntimeConfigManager::is_feature_enabled`].
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct FeatureFlag {
    /// Key under which the flag is stored and looked up.
    pub name: String,
    /// Human-readable explanation of what the flag controls.
    pub description: String,
    /// Master switch; a disabled flag is never reported as enabled.
    pub enabled: bool,
    /// Rollout percentage; the rollout check is skipped unless this is
    /// below `100.0`.
    pub rollout_percentage: f64,
    /// When non-empty, the evaluation context's `user_group` entry must be
    /// one of these groups.
    pub target_groups: Vec<String>,
    /// Context entries that must be present with exactly these values.
    pub conditions: HashMap<String, String>,
    /// Free-form structured annotations; not consulted during evaluation.
    pub metadata: HashMap<String, serde_json::Value>,
}
/// Boxed, thread-safe observer invoked for every [`ConfigChangeEvent`].
///
/// Returning `Err` reports a failure message to the manager, which logs it.
pub type ConfigChangeCallback =
    Box<dyn Fn(&ConfigChangeEvent) -> Result<(), String> + Send + Sync>;
/// Shared, cloneable handle for applying configuration changes at runtime.
///
/// All mutable state lives behind `Arc<RwLock<..>>`, so clones of the manager
/// operate on the same history, flags, callbacks and remote sources.
pub struct RuntimeConfigManager {
    /// Recorded configuration versions, oldest first.
    config_history: Arc<RwLock<Vec<ConfigVersion>>>,
    /// Feature flags keyed by flag name.
    feature_flags: Arc<RwLock<HashMap<String, FeatureFlag>>>,
    /// Sending half of the broadcast channel that carries change events.
    change_broadcaster: broadcast::Sender<ConfigChangeEvent>,
    /// Sending half of the watch channel, present once a watcher was requested.
    config_watcher: Arc<RwLock<Option<watch::Sender<ConfigProfile>>>>,
    /// Callbacks invoked for every change event.
    callbacks: Arc<RwLock<Vec<ConfigChangeCallback>>>,
    /// Remote configuration sources keyed by source name.
    remote_sources: Arc<RwLock<HashMap<String, RemoteConfigSource>>>,
    /// Most recently assigned configuration version number.
    current_version: Arc<RwLock<u64>>,
    /// Period between automatic synchronization rounds.
    sync_interval: Duration,
}
/// Description and bookkeeping for one remote configuration endpoint.
#[derive(Clone, Debug)]
pub struct RemoteConfigSource {
    /// Key under which the source is registered with the manager.
    pub name: String,
    /// Location the configuration is fetched from.
    pub url: String,
    /// Desired synchronization period for this source.
    pub sync_interval: Duration,
    /// Headers to send when authenticating against the source.
    pub auth_headers: HashMap<String, String>,
    /// Time of the last successful synchronization, if there was one.
    pub last_sync: Option<SystemTime>,
    /// Current state of the source.
    pub status: RemoteSourceStatus,
}
/// State of a [`RemoteConfigSource`].
#[derive(Clone, Debug, PartialEq)]
pub enum RemoteSourceStatus {
    /// The source takes part in synchronization.
    Active,
    /// The last synchronization attempt failed with the given message.
    Error(String),
    /// The source is switched off.
    Disabled,
    /// A synchronization is in progress.
    Syncing,
}
impl RuntimeConfigManager {
pub fn new() -> Self {
let (change_tx, _) = broadcast::channel(100);
Self {
config_history: Arc::new(RwLock::new(Vec::new())),
feature_flags: Arc::new(RwLock::new(HashMap::new())),
change_broadcaster: change_tx,
config_watcher: Arc::new(RwLock::new(None)),
callbacks: Arc::new(RwLock::new(Vec::new())),
remote_sources: Arc::new(RwLock::new(HashMap::new())),
current_version: Arc::new(RwLock::new(1)),
sync_interval: Duration::from_secs(60), }
}
pub async fn apply_config_change(
&self,
new_config: ConfigProfile,
source: ConfigSource,
description: String,
author: String,
) -> Result<ConfigChangeEvent, YahooError> {
let event_id = Uuid::new_v4().to_string();
let timestamp = SystemTime::now();
let version = {
let mut version_guard = self.current_version.write().unwrap();
*version_guard += 1;
*version_guard
};
let config_version = ConfigVersion {
version,
config: new_config.clone(),
created_at: timestamp,
description,
author,
};
{
let mut history = self.config_history.write().unwrap();
history.push(config_version);
if history.len() > 100 {
history.remove(0);
}
}
let change_event = ConfigChangeEvent {
event_id,
timestamp,
change_type: ConfigChangeType::Updated,
profile_name: new_config.name.clone(),
previous_config: None, new_config: new_config.clone(),
source,
metadata: HashMap::new(),
};
let _ = self.change_broadcaster.send(change_event.clone());
self.notify_callbacks(&change_event).await?;
if let Some(watcher) = self.config_watcher.read().unwrap().as_ref() {
let _ = watcher.send(new_config);
}
Ok(change_event)
}
pub async fn rollback_to_version(&self, version: u64) -> Result<ConfigChangeEvent, YahooError> {
let history = self.config_history.read().unwrap();
let config_version = history
.iter()
.find(|v| v.version == version)
.ok_or_else(|| YahooError::InvalidStatusCode(format!("Version {} not found", version)))?;
let rollback_config = config_version.config.clone();
drop(history);
self.apply_config_change(
rollback_config,
ConfigSource::Memory,
format!("Rollback to version {}", version),
"system".to_string(),
).await
}
pub fn get_config_history(&self) -> Vec<ConfigVersion> {
self.config_history.read().unwrap().clone()
}
pub async fn add_feature_flag(&self, flag: FeatureFlag) -> Result<(), YahooError> {
let mut flags = self.feature_flags.write().unwrap();
flags.insert(flag.name.clone(), flag);
Ok(())
}
pub fn is_feature_enabled(&self, feature_name: &str, context: &HashMap<String, String>) -> bool {
let flags = self.feature_flags.read().unwrap();
if let Some(flag) = flags.get(feature_name) {
if !flag.enabled {
return false;
}
if flag.rollout_percentage < 100.0 {
let hash = feature_name.len() % 100;
if hash as f64 > flag.rollout_percentage {
return false;
}
}
if !flag.target_groups.is_empty() {
if let Some(user_group) = context.get("user_group") {
if !flag.target_groups.contains(user_group) {
return false;
}
} else {
return false;
}
}
for (key, expected_value) in &flag.conditions {
if let Some(actual_value) = context.get(key) {
if actual_value != expected_value {
return false;
}
} else {
return false;
}
}
true
} else {
false
}
}
pub fn subscribe_to_changes(&self) -> broadcast::Receiver<ConfigChangeEvent> {
self.change_broadcaster.subscribe()
}
pub fn watch_config_changes(&self) -> watch::Receiver<ConfigProfile> {
let (tx, rx) = watch::channel(ConfigProfile::default());
*self.config_watcher.write().unwrap() = Some(tx);
rx
}
pub fn add_change_callback(&self, callback: ConfigChangeCallback) {
self.callbacks.write().unwrap().push(callback);
}
pub async fn add_remote_source(&self, source: RemoteConfigSource) -> Result<(), YahooError> {
let mut sources = self.remote_sources.write().unwrap();
sources.insert(source.name.clone(), source);
Ok(())
}
pub async fn sync_remote_configs(&self) -> Result<Vec<ConfigChangeEvent>, YahooError> {
let mut changes = Vec::new();
let sources: Vec<RemoteConfigSource> = {
let sources_guard = self.remote_sources.read().unwrap();
sources_guard.values().cloned().collect()
};
for mut source in sources {
if source.status == RemoteSourceStatus::Active {
match self.fetch_remote_config(&source).await {
Ok(config) => {
let change = self.apply_config_change(
config,
ConfigSource::Remote(source.url.clone()),
format!("Sync from remote source: {}", source.name),
"remote-sync".to_string(),
).await?;
changes.push(change);
source.last_sync = Some(SystemTime::now());
let mut sources_guard = self.remote_sources.write().unwrap();
sources_guard.insert(source.name.clone(), source);
}
Err(e) => {
eprintln!("Failed to sync from remote source {}: {}", source.name, e);
source.status = RemoteSourceStatus::Error(e.to_string());
let mut sources_guard = self.remote_sources.write().unwrap();
sources_guard.insert(source.name.clone(), source);
}
}
}
}
Ok(changes)
}
pub async fn start_auto_sync(&self) -> Result<(), YahooError> {
let manager = Arc::new(self.clone());
tokio::spawn(async move {
let mut interval = tokio::time::interval(manager.sync_interval);
loop {
interval.tick().await;
if let Err(e) = manager.sync_remote_configs().await {
eprintln!("Auto-sync failed: {}", e);
}
}
});
Ok(())
}
async fn fetch_remote_config(&self, _source: &RemoteConfigSource) -> Result<ConfigProfile, YahooError> {
Ok(ConfigProfile::default())
}
async fn notify_callbacks(&self, event: &ConfigChangeEvent) -> Result<(), YahooError> {
let callbacks = self.callbacks.read().unwrap();
for callback in callbacks.iter() {
if let Err(e) = callback(event) {
eprintln!("Configuration change callback failed: {}", e);
}
}
Ok(())
}
}
impl Clone for RuntimeConfigManager {
    /// Produces another handle onto the same shared state.
    ///
    /// Only reference counts (and the broadcast sender) are cloned; the
    /// history, flags, callbacks and sources themselves are not copied.
    fn clone(&self) -> Self {
        // Destructure so that adding a field without handling it here fails
        // to compile.
        let Self {
            config_history,
            feature_flags,
            change_broadcaster,
            config_watcher,
            callbacks,
            remote_sources,
            current_version,
            sync_interval,
        } = self;

        Self {
            config_history: Arc::clone(config_history),
            feature_flags: Arc::clone(feature_flags),
            change_broadcaster: change_broadcaster.clone(),
            config_watcher: Arc::clone(config_watcher),
            callbacks: Arc::clone(callbacks),
            remote_sources: Arc::clone(remote_sources),
            current_version: Arc::clone(current_version),
            sync_interval: *sync_interval,
        }
    }
}
impl Default for RuntimeConfigManager {
fn default() -> Self {
Self::new()
}
}
impl Default for FeatureFlag {
fn default() -> Self {
Self {
name: String::new(),
description: String::new(),
enabled: false,
rollout_percentage: 100.0,
target_groups: Vec::new(),
conditions: HashMap::new(),
metadata: HashMap::new(),
}
}
}
/// Registry of A/B tests together with the per-user variant assignments
/// made so far.
#[derive(Debug)]
pub struct ABTestManager {
    /// Registered tests keyed by test name.
    tests: Arc<RwLock<HashMap<String, ABTest>>>,
    /// Assigned variant names keyed by `"<test name>:<user id>"`.
    assignments: Arc<RwLock<HashMap<String, String>>>,
}
/// Definition of a single A/B test.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ABTest {
    /// Key under which the test is registered.
    pub name: String,
    /// Human-readable explanation of the experiment.
    pub description: String,
    /// Variants users can be assigned to; their shares must sum to 100.
    pub variants: Vec<ABTestVariant>,
    /// Share of traffic intended for the test as a whole.
    pub traffic_percentage: f64,
    /// Lifecycle state; only `Active` tests hand out configurations.
    pub status: ABTestStatus,
    /// When the test starts.
    pub start_time: SystemTime,
    /// When the test ends, if an end is scheduled.
    pub end_time: Option<SystemTime>,
}
/// One arm of an [`ABTest`].
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ABTestVariant {
    /// Name of the variant, unique within its test.
    pub name: String,
    /// Share of the test's users, in percent, assigned to this variant.
    pub traffic_percentage: f64,
    /// Configuration served to users assigned to this variant.
    pub config: ConfigProfile,
}
/// Lifecycle state of an [`ABTest`].
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum ABTestStatus {
    /// The test is running and assigns users to variants.
    Active,
    /// The test is temporarily halted.
    Paused,
    /// The test has finished.
    Completed,
    /// The test is defined but has not been started.
    Draft,
}
impl ABTestManager {
    /// Creates a manager with no tests and no assignments.
    pub fn new() -> Self {
        Self {
            tests: Arc::new(RwLock::new(HashMap::new())),
            assignments: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    /// Registers `test`, replacing any existing test with the same name.
    ///
    /// # Errors
    ///
    /// Returns `YahooError::InvalidStatusCode` when a variant share is
    /// negative, NaN or infinite, or when the shares do not sum to 100
    /// (within a tolerance of 0.001).
    pub fn add_test(&self, test: ABTest) -> Result<(), YahooError> {
        // NaN compares false against everything, so it would slip through the
        // sum check below; reject it (and negative shares) explicitly.
        let shares_valid = test
            .variants
            .iter()
            .all(|v| v.traffic_percentage.is_finite() && v.traffic_percentage >= 0.0);
        if !shares_valid {
            return Err(YahooError::InvalidStatusCode(
                "Variant traffic percentages must be finite and non-negative".into(),
            ));
        }

        let total_traffic: f64 = test.variants.iter().map(|v| v.traffic_percentage).sum();
        if (total_traffic - 100.0).abs() > 0.001 {
            return Err(YahooError::InvalidStatusCode(
                "Variant traffic percentages must sum to 100%".into(),
            ));
        }

        self.tests.write().unwrap().insert(test.name.clone(), test);
        Ok(())
    }

    /// Returns the configuration of the variant `user_id` belongs to in the
    /// test named `test_name`.
    ///
    /// Returns `None` when the test is unknown or not `Active`. A user who
    /// already has a remembered assignment keeps it; otherwise the user is
    /// placed in a bucket (0..=99) and assigned to the variant whose
    /// cumulative share covers that bucket, and the assignment is remembered.
    ///
    /// The test-level `traffic_percentage`, `start_time` and `end_time` are
    /// not consulted here.
    pub fn get_config_for_user(&self, test_name: &str, user_id: &str) -> Option<ConfigProfile> {
        let tests = self.tests.read().unwrap();
        let test = tests.get(test_name)?;
        if test.status != ABTestStatus::Active {
            return None;
        }

        let assignment_key = format!("{}:{}", test_name, user_id);

        // The read guard is a temporary of this statement, so it is released
        // before the write lock further down is taken.
        let remembered = self.assignments.read().unwrap().get(&assignment_key).cloned();
        if let Some(variant_name) = remembered {
            if let Some(variant) = test.variants.iter().find(|v| v.name == variant_name) {
                return Some(variant.config.clone());
            }
            // The remembered variant no longer exists; fall through and
            // assign the user again.
        }

        let bucket = self.hash_user_for_test(user_id, test_name);
        let mut cumulative = 0.0;
        for variant in &test.variants {
            cumulative += variant.traffic_percentage;
            // Strict `<`: buckets are 0..=99, so a 50% share covers exactly
            // buckets 0..=49.
            if bucket < cumulative {
                self.assignments
                    .write()
                    .unwrap()
                    .insert(assignment_key, variant.name.clone());
                return Some(variant.config.clone());
            }
        }
        None
    }

    /// Maps a user and test name to a bucket in `0.0..=99.0`.
    ///
    /// Uses `DefaultHasher::new()`, which produces the same value for the
    /// same input within one build of the program.
    fn hash_user_for_test(&self, user_id: &str, test_name: &str) -> f64 {
        use std::collections::hash_map::DefaultHasher;
        use std::hash::{Hash, Hasher};

        let mut hasher = DefaultHasher::new();
        user_id.hash(&mut hasher);
        test_name.hash(&mut hasher);
        (hasher.finish() % 100) as f64
    }
}
impl Default for ABTestManager {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a variant with the given name, share and configuration.
    fn variant(name: &str, traffic_percentage: f64, config: ConfigProfile) -> ABTestVariant {
        ABTestVariant {
            name: name.to_string(),
            traffic_percentage,
            config,
        }
    }

    /// Builds an active test named `name` with the given variants.
    fn active_test(name: &str, variants: Vec<ABTestVariant>) -> ABTest {
        ABTest {
            name: name.to_string(),
            description: "Test experiment".to_string(),
            variants,
            traffic_percentage: 100.0,
            status: ABTestStatus::Active,
            start_time: SystemTime::now(),
            end_time: None,
        }
    }

    #[tokio::test]
    async fn test_runtime_config_manager() {
        let manager = RuntimeConfigManager::new();
        let config = ConfigProfile::development();
        let change = manager
            .apply_config_change(
                config,
                ConfigSource::Memory,
                "Test change".to_string(),
                "test".to_string(),
            )
            .await
            .unwrap();
        assert_eq!(change.change_type, ConfigChangeType::Updated);
        // Nothing was applied before, so there is no previous configuration.
        assert!(change.previous_config.is_none());

        let history = manager.get_config_history();
        assert_eq!(history.len(), 1);
        assert_eq!(history[0].description, "Test change");
        assert_eq!(history[0].author, "test");
    }

    #[tokio::test]
    async fn test_feature_flags() {
        let manager = RuntimeConfigManager::new();
        let flag = FeatureFlag {
            name: "test_feature".to_string(),
            description: "Test feature".to_string(),
            enabled: true,
            rollout_percentage: 50.0,
            target_groups: vec!["beta".to_string()],
            conditions: HashMap::new(),
            metadata: HashMap::new(),
        };
        manager.add_feature_flag(flag).await.unwrap();

        let mut context = HashMap::new();
        context.insert("user_group".to_string(), "beta".to_string());

        // Whether the caller falls inside the 50% rollout depends on the
        // bucketing, but the answer must not change between calls.
        let first = manager.is_feature_enabled("test_feature", &context);
        let second = manager.is_feature_enabled("test_feature", &context);
        assert_eq!(first, second);

        // A user outside the target groups never gets the feature.
        context.insert("user_group".to_string(), "general".to_string());
        assert!(!manager.is_feature_enabled("test_feature", &context));

        // Neither does a caller that supplies no group at all.
        assert!(!manager.is_feature_enabled("test_feature", &HashMap::new()));

        // Unknown flags are reported as disabled.
        assert!(!manager.is_feature_enabled("missing_feature", &context));
    }

    #[test]
    fn test_ab_testing() {
        let manager = ABTestManager::new();
        let test = active_test(
            "test_experiment",
            vec![
                variant("control", 50.0, ConfigProfile::default()),
                variant("treatment", 50.0, ConfigProfile::development()),
            ],
        );
        manager.add_test(test).unwrap();

        let config1 = manager.get_config_for_user("test_experiment", "user1");
        let config2 = manager.get_config_for_user("test_experiment", "user1");
        // The shares cover all traffic, so every user gets a variant.
        assert!(config1.is_some());
        assert_eq!(config1.is_some(), config2.is_some());
        if let (Some(c1), Some(c2)) = (config1, config2) {
            assert_eq!(c1.name, c2.name);
        }

        // Unknown tests yield no configuration.
        assert!(manager.get_config_for_user("unknown_experiment", "user1").is_none());

        // Variant shares that do not add up to 100% are rejected.
        let unbalanced = active_test(
            "unbalanced_experiment",
            vec![variant("only", 60.0, ConfigProfile::default())],
        );
        assert!(manager.add_test(unbalanced).is_err());
    }
}