use anyhow::{anyhow, Result};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use tokio::sync::RwLock;
/// One of the two interchangeable deployment slots managed by [`BlueGreenController`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum Environment {
/// The blue slot; a freshly created controller starts with blue active and serving all traffic.
Blue,
/// The green slot; a freshly created controller starts with green idle and receiving no traffic.
Green,
}
impl Environment {
pub fn opposite(&self) -> Self {
match self {
Environment::Blue => Environment::Green,
Environment::Green => Environment::Blue,
}
}
pub fn name(&self) -> &'static str {
match self {
Environment::Blue => "blue",
Environment::Green => "green",
}
}
}
impl std::fmt::Display for Environment {
    /// Writes the environment's lowercase name (see [`Environment::name`]).
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        formatter.write_str(self.name())
    }
}
/// Health of an environment, as reported through `BlueGreenController::update_health`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum HealthStatus {
/// Passing health checks; the only status that lets an environment be considered ready for traffic.
Healthy,
/// Partially working; the controller counts this as a failed health check.
Degraded,
/// Failing health checks; the controller counts this as a failed health check.
Unhealthy,
/// Not yet checked: the initial status, and the status right after a deployment.
/// Reports of this status do not touch the success/failure counters.
Unknown,
}
/// Lifecycle state of an environment.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum EnvironmentState {
/// Not the active environment: the initial state of green, the state after a deployment
/// completes, and the state of the old environment once traffic has moved away.
Idle,
/// The environment currently designated as active.
Active,
/// A deployment is in progress; the environment is not ready for traffic.
Deploying,
/// Intended for an environment winding down its traffic.
/// NOTE(review): no code path in this file leaves an environment in this state.
Draining,
/// Treated as not ready for traffic.
/// NOTE(review): nothing in this file sets this state.
Failed,
}
/// How traffic is moved from the active environment to the other one.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
pub enum DeploymentStrategy {
/// Move 100% of traffic in a single step (the default).
#[default]
Instant,
/// Shift `increment` percentage points of traffic to the new environment every
/// `interval_secs` seconds, verifying its health after each partial step.
/// `increment` should be non-zero.
Gradual { increment: u8, interval_secs: u64 },
/// Traffic switching is refused under this strategy; routing decisions are flagged `is_shadow`.
Shadow,
/// Switches are only performed when explicitly requested; they are then executed like `Instant`.
Manual,
}
/// Snapshot of everything the controller tracks about one environment.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EnvironmentInfo {
/// Which slot this information describes.
pub environment: Environment,
/// Current lifecycle state.
pub state: EnvironmentState,
/// Most recently reported health.
pub health: HealthStatus,
/// Version from the last completed deployment, if any.
pub version: Option<String>,
/// Share of traffic routed to this environment, from 0 to 100.
pub traffic_percentage: u8,
/// When the last deployment to this environment completed.
pub deployed_at: Option<SystemTime>,
/// When health was last reported for this environment.
pub last_health_check: Option<SystemTime>,
/// Free-form annotations; not populated by the controller code in this file.
pub metadata: HashMap<String, String>,
}
impl EnvironmentInfo {
    /// Creates an idle environment with unknown health, no version, no timestamps and no traffic.
    pub fn new(environment: Environment) -> Self {
        let metadata = HashMap::new();
        Self {
            environment,
            metadata,
            version: None,
            deployed_at: None,
            last_health_check: None,
            traffic_percentage: 0,
            health: HealthStatus::Unknown,
            state: EnvironmentState::Idle,
        }
    }

    /// Returns `true` when the environment may take traffic: it is healthy and
    /// neither mid-deployment nor failed.
    pub fn is_ready(&self) -> bool {
        let blocked = matches!(
            self.state,
            EnvironmentState::Deploying | EnvironmentState::Failed
        );
        self.health == HealthStatus::Healthy && !blocked
    }
}
/// One entry in the controller's deployment history (a deployment or a rollback).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeploymentRecord {
/// Random UUID (v4) string identifying the record.
pub id: String,
/// Environment that was deployed to, or rolled back to.
pub environment: Environment,
/// Version deployed, or the version serving traffic after a rollback.
pub version: String,
/// When the deployment started, or when the rollback was recorded.
pub timestamp: SystemTime,
/// How long the deployment took; `None` for rollback records.
pub duration: Option<Duration>,
/// Whether the operation succeeded.
pub success: bool,
/// `true` when this record describes a rollback rather than a deployment.
pub is_rollback: bool,
/// Failure description.
/// NOTE(review): never set to `Some` by the controller code in this file.
pub error: Option<String>,
/// Extra annotations; rollback records carry a `rollback_reason` entry.
pub metadata: HashMap<String, String>,
}
/// Settings for probing environment health.
///
/// NOTE(review): none of these fields are read by `BlueGreenController` in this file;
/// presumably they are consumed by an external prober that calls `update_health`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthCheckConfig {
/// HTTP path to probe (default `/health`).
pub endpoint: String,
/// Time between probes.
pub interval: Duration,
/// Per-probe timeout.
pub timeout: Duration,
/// Presumably the consecutive failures needed to declare an environment unhealthy — TODO confirm.
pub failure_threshold: u32,
/// Presumably the consecutive successes needed to declare an environment healthy — TODO confirm.
pub success_threshold: u32,
/// Whether to run a deeper (dependency-level) check; semantics defined by the prober.
pub deep_check: bool,
}
impl Default for HealthCheckConfig {
    /// Probe `/health` every 10 s with a 5 s timeout, failure threshold 3,
    /// success threshold 2, deep checks enabled.
    fn default() -> Self {
        let seconds = Duration::from_secs;
        Self {
            deep_check: true,
            success_threshold: 2,
            failure_threshold: 3,
            timeout: seconds(5),
            interval: seconds(10),
            endpoint: String::from("/health"),
        }
    }
}
/// Thresholds that drive automatic rollback decisions.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RollbackConfig {
/// Master switch: when `false`, `should_auto_rollback` always returns `false`.
pub auto_rollback: bool,
/// Error rate strictly above which a rollback is recommended.
/// Presumably a percentage (default 5.0 meaning 5%) — TODO confirm against callers.
pub error_rate_threshold: f64,
/// p99 latency, in milliseconds, strictly above which a rollback is recommended.
pub latency_threshold_ms: u64,
/// NOTE(review): not read by the controller code in this file.
pub observation_period: Duration,
/// NOTE(review): not read by the controller code in this file.
pub rollback_timeout: Duration,
}
impl Default for RollbackConfig {
    /// Automatic rollback enabled, error-rate threshold 5.0, p99 latency threshold
    /// 5000 ms, 60 s observation period and a 5 minute rollback timeout.
    fn default() -> Self {
        const ONE_MINUTE: Duration = Duration::from_secs(60);
        Self {
            auto_rollback: true,
            error_rate_threshold: 5.0,
            latency_threshold_ms: 5_000,
            observation_period: ONE_MINUTE,
            rollback_timeout: Duration::from_secs(5 * 60),
        }
    }
}
/// Top-level configuration for a [`BlueGreenController`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeploymentConfig {
/// How `switch_traffic` moves traffic between environments.
pub strategy: DeploymentStrategy,
/// Health probing settings.
/// NOTE(review): not read by the controller code in this file.
pub health_check: HealthCheckConfig,
/// Thresholds used by `should_auto_rollback`.
pub rollback: RollbackConfig,
/// NOTE(review): not read by the controller code in this file.
pub max_concurrent_connections: usize,
/// Presumably how long to wait for in-flight requests when draining — TODO confirm.
/// NOTE(review): not read by the controller code in this file.
pub drain_timeout: Duration,
/// NOTE(review): not read by the controller code in this file; the controller does not
/// create a notifier from these settings, handlers must be registered explicitly.
pub notifications_enabled: bool,
/// Webhook URL for notifications; see the note on `notifications_enabled`.
pub notification_webhook: Option<String>,
}
impl Default for DeploymentConfig {
fn default() -> Self {
Self {
strategy: DeploymentStrategy::default(),
health_check: HealthCheckConfig::default(),
rollback: RollbackConfig::default(),
max_concurrent_connections: 10000,
drain_timeout: Duration::from_secs(30),
notifications_enabled: false,
notification_webhook: None,
}
}
}
/// Result of routing one request: which environment should serve it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RoutingDecision {
/// Environment that should serve the request.
pub target: Environment,
/// Fraction of traffic (0.0 to 1.0) currently assigned to `target`.
pub weight: f64,
/// Whether the controller is configured with the `Shadow` strategy.
pub is_shadow: bool,
/// Extra routing annotations; left empty by `route_request`.
pub metadata: HashMap<String, String>,
}
/// Notification emitted by [`BlueGreenController`]; delivered to registered
/// [`DeploymentEventHandler`]s and kept in a bounded in-memory log.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum DeploymentEvent {
/// A deployment of `version` to `environment` has begun.
DeploymentStarted {
environment: Environment,
version: String,
},
/// A deployment of `version` to `environment` finished after `duration`.
DeploymentCompleted {
environment: Environment,
version: String,
duration: Duration,
},
/// A deployment of `version` to `environment` failed with `error`.
/// NOTE(review): not emitted by the controller code in this file.
DeploymentFailed {
environment: Environment,
version: String,
error: String,
},
/// A traffic switch from `from` to `to` is starting, using `strategy`.
TrafficSwitchStarted {
from: Environment,
to: Environment,
strategy: DeploymentStrategy,
},
/// A traffic switch finished; `active` is now the active environment.
TrafficSwitchCompleted { active: Environment },
/// The reported health of `environment` changed from `old_status` to `new_status`.
HealthStatusChanged {
environment: Environment,
old_status: HealthStatus,
new_status: HealthStatus,
},
/// A rollback from `from` to `to` was started because of `reason`.
RollbackStarted {
from: Environment,
to: Environment,
reason: String,
},
/// A rollback ended; `active` is the environment serving traffic afterwards.
RollbackCompleted { active: Environment, success: bool },
}
/// Mutable state shared by a [`BlueGreenController`] behind its `RwLock`.
struct ControllerState {
/// Tracked information for the blue slot.
blue: EnvironmentInfo,
/// Tracked information for the green slot.
green: EnvironmentInfo,
/// Environment currently designated as active.
active_environment: Environment,
/// Deployment and successful-rollback records, appended in order (oldest first).
deployment_history: Vec<DeploymentRecord>,
/// Emitted events with their emission time, oldest first; trimmed by the controller to stay bounded.
events: Vec<(SystemTime, DeploymentEvent)>,
/// Consecutive Unhealthy/Degraded reports per environment; reset by a Healthy report.
health_check_failures: HashMap<Environment, u32>,
/// Consecutive Healthy reports per environment; reset by an Unhealthy/Degraded report.
health_check_successes: HashMap<Environment, u32>,
}
impl ControllerState {
fn new() -> Self {
let mut blue = EnvironmentInfo::new(Environment::Blue);
blue.state = EnvironmentState::Active;
blue.traffic_percentage = 100;
let green = EnvironmentInfo::new(Environment::Green);
Self {
blue,
green,
active_environment: Environment::Blue,
deployment_history: Vec::new(),
events: Vec::new(),
health_check_failures: HashMap::new(),
health_check_successes: HashMap::new(),
}
}
fn get_env_mut(&mut self, env: Environment) -> &mut EnvironmentInfo {
match env {
Environment::Blue => &mut self.blue,
Environment::Green => &mut self.green,
}
}
fn get_env(&self, env: Environment) -> &EnvironmentInfo {
match env {
Environment::Blue => &self.blue,
Environment::Green => &self.green,
}
}
}
/// Coordinates deployments, health tracking and traffic switching between the
/// blue and green environments.
pub struct BlueGreenController {
/// Configuration supplied at construction.
config: DeploymentConfig,
/// Shared mutable state behind an async read/write lock.
state: Arc<RwLock<ControllerState>>,
/// Handlers notified of every emitted [`DeploymentEvent`].
event_handlers: Arc<RwLock<Vec<Arc<dyn DeploymentEventHandler + Send + Sync>>>>,
}
impl BlueGreenController {
    /// Creates a controller in the initial state: blue active with 100% of traffic,
    /// green idle with none.
    pub fn new(config: DeploymentConfig) -> Self {
        Self {
            config,
            state: Arc::new(RwLock::new(ControllerState::new())),
            event_handlers: Arc::new(RwLock::new(Vec::new())),
        }
    }

    /// Registers `handler` to receive every event emitted from now on.
    pub async fn register_event_handler(
        &self,
        handler: Arc<dyn DeploymentEventHandler + Send + Sync>,
    ) {
        self.event_handlers.write().await.push(handler);
    }

    /// Appends `event` to the bounded event log, then awaits each registered handler
    /// in registration order.
    async fn emit_event(&self, event: DeploymentEvent) {
        let now = SystemTime::now();
        {
            let mut state = self.state.write().await;
            state.events.push((now, event.clone()));
            // Keep the log bounded: once it exceeds 1000 entries, drop the oldest 100.
            if state.events.len() > 1000 {
                state.events.drain(0..100);
            }
        }
        // Snapshot the handler list so the lock is released before handlers run; a handler
        // that registers another handler (needs the write lock) would otherwise deadlock.
        let handlers: Vec<_> = self.event_handlers.read().await.iter().cloned().collect();
        for handler in &handlers {
            handler.on_event(&event).await;
        }
    }

    /// Returns a snapshot of both environments, the active one, and the latest history record.
    pub async fn get_status(&self) -> BlueGreenStatus {
        let state = self.state.read().await;
        BlueGreenStatus {
            blue: state.blue.clone(),
            green: state.green.clone(),
            active_environment: state.active_environment,
            deployment_history_count: state.deployment_history.len(),
            last_deployment: state.deployment_history.last().cloned(),
        }
    }

    /// Returns a copy of the information tracked for `env`.
    pub async fn get_environment(&self, env: Environment) -> EnvironmentInfo {
        let state = self.state.read().await;
        state.get_env(env).clone()
    }

    /// Records a deployment of `version` to `env`.
    ///
    /// The environment passes through `Deploying` and ends `Idle` with unknown health;
    /// it must be reported healthy before traffic can be switched to it.
    ///
    /// # Errors
    /// Fails if `env` is the active environment, currently receives any traffic
    /// (e.g. mid gradual switch), or already has a deployment in progress.
    pub async fn deploy_to_environment(
        &self,
        env: Environment,
        version: &str,
    ) -> Result<DeploymentRecord> {
        let start_time = SystemTime::now();
        // Monotonic clock for the duration; SystemTime can jump backwards.
        let started = std::time::Instant::now();
        let deployment_id = uuid::Uuid::new_v4().to_string();
        {
            // Check and claim the environment under one write lock so two concurrent
            // deployments cannot both pass the guard.
            let mut state = self.state.write().await;
            let env_info = state.get_env_mut(env);
            if env_info.state == EnvironmentState::Active || env_info.traffic_percentage > 0 {
                return Err(anyhow!(
                    "Cannot deploy to {} environment while it's receiving traffic",
                    env
                ));
            }
            if env_info.state == EnvironmentState::Deploying {
                return Err(anyhow!(
                    "Cannot deploy to {} environment: a deployment is already in progress",
                    env
                ));
            }
            env_info.state = EnvironmentState::Deploying;
            env_info.health = HealthStatus::Unknown;
        }
        self.emit_event(DeploymentEvent::DeploymentStarted {
            environment: env,
            version: version.to_string(),
        })
        .await;
        let duration = started.elapsed();
        let record = DeploymentRecord {
            id: deployment_id,
            environment: env,
            version: version.to_string(),
            timestamp: start_time,
            duration: Some(duration),
            success: true,
            is_rollback: false,
            error: None,
            metadata: HashMap::new(),
        };
        {
            let mut state = self.state.write().await;
            let env_info = state.get_env_mut(env);
            env_info.state = EnvironmentState::Idle;
            env_info.health = HealthStatus::Unknown;
            env_info.version = Some(version.to_string());
            env_info.deployed_at = Some(SystemTime::now());
            state.deployment_history.push(record.clone());
        }
        self.emit_event(DeploymentEvent::DeploymentCompleted {
            environment: env,
            version: version.to_string(),
            duration,
        })
        .await;
        Ok(record)
    }

    /// Deploys `version` to the currently *inactive* environment. Despite the name this
    /// is blue when green is active.
    pub async fn deploy_to_green(&self, version: &str) -> Result<DeploymentRecord> {
        // The read guard is a temporary dropped at the end of this statement, before the
        // deployment takes the write lock.
        let inactive = self.state.read().await.active_environment.opposite();
        self.deploy_to_environment(inactive, version).await
    }

    /// Records a health report for `env` and updates the consecutive success/failure
    /// counters. Emits `HealthStatusChanged` when the status differs from the previous one.
    pub async fn update_health(&self, env: Environment, status: HealthStatus) {
        // Read the previous status under the same lock as the update, so concurrent
        // reports cannot produce a stale `old_status` in the emitted event.
        let old_status = {
            let mut state = self.state.write().await;
            let env_info = state.get_env_mut(env);
            let previous = env_info.health;
            env_info.health = status;
            env_info.last_health_check = Some(SystemTime::now());
            match status {
                HealthStatus::Healthy => {
                    state.health_check_failures.insert(env, 0);
                    *state.health_check_successes.entry(env).or_insert(0) += 1;
                }
                HealthStatus::Unhealthy | HealthStatus::Degraded => {
                    state.health_check_successes.insert(env, 0);
                    *state.health_check_failures.entry(env).or_insert(0) += 1;
                }
                HealthStatus::Unknown => {}
            }
            previous
        };
        if old_status != status {
            self.emit_event(DeploymentEvent::HealthStatusChanged {
                environment: env,
                old_status,
                new_status: status,
            })
            .await;
        }
    }

    /// Returns whether `env` is currently reported `Healthy`. Never fails in practice.
    pub async fn verify_health(&self, env: Environment) -> Result<bool> {
        let state = self.state.read().await;
        Ok(state.get_env(env).health == HealthStatus::Healthy)
    }

    /// Moves all traffic to `target` using the configured strategy.
    ///
    /// Switching to the environment that is already active is a no-op.
    ///
    /// # Errors
    /// Fails if the target is not ready (see [`EnvironmentInfo::is_ready`]), the strategy
    /// is `Shadow`, a `Gradual` strategy has a zero increment, or a gradual switch fails its
    /// health check. In the last case traffic is returned to the previous environment.
    pub async fn switch_traffic(&self, target: Environment) -> Result<()> {
        let current = {
            let state = self.state.read().await;
            let current = state.active_environment;
            if current == target {
                return Ok(());
            }
            let target_info = state.get_env(target);
            if !target_info.is_ready() {
                return Err(anyhow!(
                    "Target environment {} is not ready for traffic (state: {:?}, health: {:?})",
                    target,
                    target_info.state,
                    target_info.health
                ));
            }
            current
        };
        // Validate the strategy before announcing the switch, so a TrafficSwitchStarted
        // event is never emitted for a switch that cannot begin.
        let gradual_steps = match self.config.strategy {
            DeploymentStrategy::Instant | DeploymentStrategy::Manual => None,
            DeploymentStrategy::Gradual { increment: 0, .. } => {
                return Err(anyhow!(
                    "Gradual strategy increment must be greater than zero"
                ));
            }
            DeploymentStrategy::Gradual {
                increment,
                interval_secs,
            } => Some((increment, interval_secs)),
            DeploymentStrategy::Shadow => {
                return Err(anyhow!("Shadow mode does not support traffic switching"));
            }
        };
        self.emit_event(DeploymentEvent::TrafficSwitchStarted {
            from: current,
            to: target,
            strategy: self.config.strategy,
        })
        .await;
        match gradual_steps {
            Some((increment, interval_secs)) => {
                self.gradual_switch(target, increment, interval_secs).await?
            }
            // Manual switches are executed like Instant ones once requested.
            None => self.instant_switch(target).await?,
        }
        self.emit_event(DeploymentEvent::TrafficSwitchCompleted { active: target })
            .await;
        Ok(())
    }

    /// Moves all traffic to `target` in one step under a single write lock.
    async fn instant_switch(&self, target: Environment) -> Result<()> {
        let mut state = self.state.write().await;
        let old = state.active_environment;
        // The lock is held for the whole switch, so an intermediate Draining state would
        // never be observable; the old environment goes straight to Idle.
        {
            let old_env = state.get_env_mut(old);
            old_env.state = EnvironmentState::Idle;
            old_env.traffic_percentage = 0;
        }
        {
            let new_env = state.get_env_mut(target);
            new_env.state = EnvironmentState::Active;
            new_env.traffic_percentage = 100;
        }
        state.active_environment = target;
        Ok(())
    }

    /// Assigns `to_percentage`% of traffic to `to` and the remainder to `from`.
    async fn set_traffic_split(&self, from: Environment, to: Environment, to_percentage: u8) {
        let to_percentage = to_percentage.min(100);
        let mut state = self.state.write().await;
        state.get_env_mut(to).traffic_percentage = to_percentage;
        state.get_env_mut(from).traffic_percentage = 100 - to_percentage;
    }

    /// Shifts traffic to `target` in `increment`-point steps, sleeping `interval_secs`
    /// and re-checking target health between steps.
    ///
    /// # Errors
    /// Fails for a zero `increment`, or when a health check fails. In that case all
    /// traffic is restored to the previously active environment.
    async fn gradual_switch(
        &self,
        target: Environment,
        increment: u8,
        interval_secs: u64,
    ) -> Result<()> {
        // A zero step would never reach 100% and loop forever.
        if increment == 0 {
            return Err(anyhow!(
                "Gradual strategy increment must be greater than zero"
            ));
        }
        let current = self.state.read().await.active_environment;
        let mut target_percentage = 0u8;
        while target_percentage < 100 {
            target_percentage = target_percentage.saturating_add(increment).min(100);
            self.set_traffic_split(current, target, target_percentage)
                .await;
            if target_percentage < 100 {
                tokio::time::sleep(Duration::from_secs(interval_secs)).await;
                match self.verify_health(target).await {
                    Ok(true) => {}
                    Ok(false) => {
                        // Don't leave requests split across a failing environment.
                        self.set_traffic_split(current, target, 0).await;
                        return Err(anyhow!(
                            "Health check failed during gradual switch at {}%",
                            target_percentage
                        ));
                    }
                    Err(err) => {
                        self.set_traffic_split(current, target, 0).await;
                        return Err(err);
                    }
                }
            }
        }
        {
            let mut state = self.state.write().await;
            state.get_env_mut(current).state = EnvironmentState::Idle;
            state.get_env_mut(target).state = EnvironmentState::Active;
            state.active_environment = target;
        }
        Ok(())
    }

    /// Shorthand for `switch_traffic(Environment::Green)`.
    pub async fn switch_to_green(&self) -> Result<()> {
        self.switch_traffic(Environment::Green).await
    }

    /// Shorthand for `switch_traffic(Environment::Blue)`.
    pub async fn switch_to_blue(&self) -> Result<()> {
        self.switch_traffic(Environment::Blue).await
    }

    /// Switches traffic back to the inactive environment and records a rollback entry
    /// (with `reason` in its metadata) when the switch succeeds.
    ///
    /// # Errors
    /// Fails if the inactive environment has never been deployed to, or if the
    /// underlying traffic switch fails.
    pub async fn rollback(&self, reason: &str) -> Result<()> {
        let (current, target) = {
            let state = self.state.read().await;
            let current = state.active_environment;
            let target = current.opposite();
            if state.get_env(target).version.is_none() {
                return Err(anyhow!(
                    "Cannot rollback: {} environment has no previous deployment",
                    target
                ));
            }
            (current, target)
        };
        self.emit_event(DeploymentEvent::RollbackStarted {
            from: current,
            to: target,
            reason: reason.to_string(),
        })
        .await;
        // The target's health is forced to Healthy so the readiness gate in
        // `switch_traffic` passes. NOTE(review): no actual probe is performed here.
        self.update_health(target, HealthStatus::Healthy).await;
        let result = self.switch_traffic(target).await;
        let success = result.is_ok();
        self.emit_event(DeploymentEvent::RollbackCompleted {
            active: if success { target } else { current },
            success,
        })
        .await;
        if success {
            let mut state = self.state.write().await;
            let version = state.get_env(target).version.clone().unwrap_or_default();
            state.deployment_history.push(DeploymentRecord {
                id: uuid::Uuid::new_v4().to_string(),
                environment: target,
                version,
                timestamp: SystemTime::now(),
                duration: None,
                success: true,
                is_rollback: true,
                error: None,
                metadata: HashMap::from([("rollback_reason".to_string(), reason.to_string())]),
            });
        }
        result
    }

    /// Picks an environment for `request_id` according to the current traffic split.
    ///
    /// The choice is deterministic per request id. NOTE(review): the bucket is the byte
    /// sum of the id modulo 100, a weak hash (anagrams collide); consider a stronger
    /// hash if the distribution matters.
    pub async fn route_request(&self, request_id: &str) -> RoutingDecision {
        let state = self.state.read().await;
        let blue_weight = state.blue.traffic_percentage as f64 / 100.0;
        let green_weight = state.green.traffic_percentage as f64 / 100.0;
        let hash = request_id
            .bytes()
            .fold(0u64, |acc, b| acc.wrapping_add(b as u64));
        let normalized = (hash % 100) as f64 / 100.0;
        let (target, weight) = if normalized < blue_weight {
            (Environment::Blue, blue_weight)
        } else {
            (Environment::Green, green_weight)
        };
        RoutingDecision {
            target,
            weight,
            is_shadow: matches!(self.config.strategy, DeploymentStrategy::Shadow),
            metadata: HashMap::new(),
        }
    }

    /// Returns deployment history records.
    ///
    /// With `Some(n)` the `n` most recent records are returned newest first; with `None`
    /// the whole history is returned oldest first.
    /// NOTE(review): the ordering differs between the two cases; kept for compatibility.
    pub async fn get_deployment_history(&self, limit: Option<usize>) -> Vec<DeploymentRecord> {
        let state = self.state.read().await;
        let history = &state.deployment_history;
        match limit {
            Some(n) => history.iter().rev().take(n).cloned().collect(),
            None => history.clone(),
        }
    }

    /// Returns up to `limit` logged events, newest first.
    pub async fn get_recent_events(&self, limit: usize) -> Vec<(SystemTime, DeploymentEvent)> {
        let state = self.state.read().await;
        state.events.iter().rev().take(limit).cloned().collect()
    }

    /// Returns `true` when auto-rollback is enabled and either `error_rate` or
    /// `latency_p99_ms` strictly exceeds its configured threshold.
    pub async fn should_auto_rollback(&self, error_rate: f64, latency_p99_ms: u64) -> bool {
        let rollback = &self.config.rollback;
        rollback.auto_rollback
            && (error_rate > rollback.error_rate_threshold
                || latency_p99_ms > rollback.latency_threshold_ms)
    }
}
/// Point-in-time snapshot of the controller, as returned by `BlueGreenController::get_status`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BlueGreenStatus {
/// Copy of the blue environment's tracked information.
pub blue: EnvironmentInfo,
/// Copy of the green environment's tracked information.
pub green: EnvironmentInfo,
/// Environment designated as active at snapshot time.
pub active_environment: Environment,
/// Number of records in the deployment history.
pub deployment_history_count: usize,
/// Most recently appended history record, if any.
pub last_deployment: Option<DeploymentRecord>,
}
/// Receiver for [`DeploymentEvent`]s emitted by [`BlueGreenController`].
///
/// The controller awaits each registered handler in turn before the emitting operation
/// continues, so implementations should return promptly.
#[async_trait::async_trait]
pub trait DeploymentEventHandler {
/// Handles one event. The return type is `()`, so failures cannot be reported back to the controller.
async fn on_event(&self, event: &DeploymentEvent);
}
/// [`DeploymentEventHandler`] that POSTs each event as JSON to a webhook URL.
pub struct WebhookNotifier {
/// Destination URL for the POST requests.
webhook_url: String,
/// HTTP client reused for every request.
client: reqwest::Client,
}
impl WebhookNotifier {
    /// Creates a notifier that posts to `webhook_url` using a newly built default HTTP client.
    pub fn new(webhook_url: String) -> Self {
        let client = reqwest::Client::new();
        Self {
            client,
            webhook_url,
        }
    }
}
#[async_trait::async_trait]
impl DeploymentEventHandler for WebhookNotifier {
async fn on_event(&self, event: &DeploymentEvent) {
let payload = serde_json::json!({
"event": event,
"timestamp": SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap_or_default()
.as_secs(),
});
let _ = self
.client
.post(&self.webhook_url)
.json(&payload)
.send()
.await;
}
}
/// [`DeploymentEventHandler`] that writes every event to the `tracing` log.
pub struct LoggingEventHandler;
#[async_trait::async_trait]
impl DeploymentEventHandler for LoggingEventHandler {
async fn on_event(&self, event: &DeploymentEvent) {
match event {
DeploymentEvent::DeploymentStarted {
environment,
version,
} => {
tracing::info!("Deployment started: {} -> {}", environment, version);
}
DeploymentEvent::DeploymentCompleted {
environment,
version,
duration,
} => {
tracing::info!(
"Deployment completed: {} -> {} ({:?})",
environment,
version,
duration
);
}
DeploymentEvent::DeploymentFailed {
environment,
version,
error,
} => {
tracing::error!(
"Deployment failed: {} -> {} - {}",
environment,
version,
error
);
}
DeploymentEvent::TrafficSwitchStarted { from, to, strategy } => {
tracing::info!(
"Traffic switch started: {} -> {} ({:?})",
from,
to,
strategy
);
}
DeploymentEvent::TrafficSwitchCompleted { active } => {
tracing::info!("Traffic switch completed: active = {}", active);
}
DeploymentEvent::HealthStatusChanged {
environment,
old_status,
new_status,
} => {
tracing::info!(
"Health status changed: {} {:?} -> {:?}",
environment,
old_status,
new_status
);
}
DeploymentEvent::RollbackStarted { from, to, reason } => {
tracing::warn!("Rollback started: {} -> {} ({})", from, to, reason);
}
DeploymentEvent::RollbackCompleted { active, success } => {
if *success {
tracing::info!("Rollback completed successfully: active = {}", active);
} else {
tracing::error!("Rollback failed: active = {}", active);
}
}
}
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Controller built from the default configuration (instant switching).
    fn default_controller() -> BlueGreenController {
        BlueGreenController::new(DeploymentConfig::default())
    }

    /// Deploys `version` to the inactive slot (green for a fresh controller) and then
    /// reports green healthy so it becomes eligible for traffic.
    async fn deploy_healthy_green(ctl: &BlueGreenController, version: &str) {
        ctl.deploy_to_green(version).await.expect("should succeed");
        ctl.update_health(Environment::Green, HealthStatus::Healthy)
            .await;
    }

    #[tokio::test]
    async fn test_controller_creation() {
        let ctl = default_controller();
        let snapshot = ctl.get_status().await;
        assert_eq!(snapshot.active_environment, Environment::Blue);
        assert_eq!(snapshot.blue.traffic_percentage, 100);
        assert_eq!(snapshot.green.traffic_percentage, 0);
    }

    #[tokio::test]
    async fn test_deploy_to_green() {
        let ctl = default_controller();
        let rec = ctl.deploy_to_green("v2.0.0").await.expect("should succeed");
        assert_eq!(rec.environment, Environment::Green);
        assert_eq!(rec.version, "v2.0.0");
        assert!(rec.success);
        assert!(!rec.is_rollback);
    }

    #[tokio::test]
    async fn test_switch_traffic() {
        let ctl = default_controller();
        deploy_healthy_green(&ctl, "v2.0.0").await;
        ctl.switch_to_green().await.expect("should succeed");
        let snapshot = ctl.get_status().await;
        assert_eq!(snapshot.active_environment, Environment::Green);
        assert_eq!(snapshot.green.traffic_percentage, 100);
        assert_eq!(snapshot.blue.traffic_percentage, 0);
    }

    #[tokio::test]
    async fn test_rollback() {
        let ctl = default_controller();
        // Give blue a prior version so it is a valid rollback target.
        ctl.state.write().await.blue.version = Some("v1.0.0".to_string());
        deploy_healthy_green(&ctl, "v2.0.0").await;
        ctl.switch_to_green().await.expect("should succeed");
        ctl.rollback("Test rollback").await.expect("should succeed");
        let snapshot = ctl.get_status().await;
        assert_eq!(snapshot.active_environment, Environment::Blue);
    }

    #[tokio::test]
    async fn test_routing_decision() {
        let ctl = default_controller();
        let decision = ctl.route_request("test-request-1").await;
        assert_eq!(decision.target, Environment::Blue);
        assert_eq!(decision.weight, 1.0);
    }

    #[tokio::test]
    async fn test_health_update() {
        let ctl = default_controller();
        ctl.update_health(Environment::Green, HealthStatus::Healthy)
            .await;
        let green = ctl.get_environment(Environment::Green).await;
        assert_eq!(green.health, HealthStatus::Healthy);
        assert!(green.last_health_check.is_some());
    }

    #[tokio::test]
    async fn test_deployment_history() {
        let ctl = default_controller();
        for version in ["v2.0.0", "v2.1.0"] {
            ctl.deploy_to_green(version).await.expect("should succeed");
        }
        let everything = ctl.get_deployment_history(None).await;
        assert_eq!(everything.len(), 2);
        let newest = ctl.get_deployment_history(Some(1)).await;
        assert_eq!(newest.len(), 1);
        assert_eq!(newest[0].version, "v2.1.0");
    }

    #[test]
    fn test_environment_opposite() {
        assert_eq!(Environment::Blue.opposite(), Environment::Green);
        assert_eq!(Environment::Green.opposite(), Environment::Blue);
    }

    #[tokio::test]
    async fn test_cannot_deploy_to_active() {
        let ctl = default_controller();
        let outcome = ctl
            .deploy_to_environment(Environment::Blue, "v2.0.0")
            .await;
        assert!(outcome.is_err());
    }

    #[tokio::test]
    async fn test_cannot_switch_to_unhealthy() {
        let ctl = default_controller();
        ctl.deploy_to_green("v2.0.0").await.expect("should succeed");
        let outcome = ctl.switch_to_green().await;
        assert!(outcome.is_err());
    }

    #[tokio::test]
    async fn test_auto_rollback_detection() {
        let ctl = default_controller();
        assert!(!ctl.should_auto_rollback(1.0, 1000).await);
        assert!(ctl.should_auto_rollback(10.0, 1000).await);
        assert!(ctl.should_auto_rollback(1.0, 10000).await);
    }

    #[tokio::test]
    async fn test_gradual_switch_strategy() {
        let ctl = BlueGreenController::new(DeploymentConfig {
            // Zero interval keeps the test fast while still stepping 25 -> 100.
            strategy: DeploymentStrategy::Gradual {
                increment: 25,
                interval_secs: 0,
            },
            ..Default::default()
        });
        deploy_healthy_green(&ctl, "v2.0.0").await;
        ctl.switch_to_green().await.expect("should succeed");
        let snapshot = ctl.get_status().await;
        assert_eq!(snapshot.active_environment, Environment::Green);
        assert_eq!(snapshot.green.traffic_percentage, 100);
    }

    #[tokio::test]
    async fn test_event_logging() {
        let ctl = default_controller();
        ctl.deploy_to_green("v2.0.0").await.expect("should succeed");
        let logged = ctl.get_recent_events(10).await;
        assert!(!logged.is_empty());
    }
}