use crate::core::error::{Error, Result};
use crate::core::resources::ResourceSpec;
use crate::deployment::ssh::SshDeploymentClient;
use crate::infra::traits::{BlueprintDeploymentResult, CloudProviderAdapter};
use blueprint_core::{debug, error, info, warn};
use blueprint_std::collections::{HashMap, VecDeque};
use blueprint_std::time::{Duration, SystemTime};
use serde::{Deserialize, Serialize};
use tokio::time::{sleep, timeout};
/// Maximum number of deployment versions retained in history; the oldest
/// entry is evicted (FIFO) when the cap is reached.
const MAX_VERSION_HISTORY: usize = 10;
/// Common parameters shared by every update strategy.
#[derive(Debug, Clone)]
pub struct UpdateParams {
    /// Version label for the new deployment (e.g. "v1712345678").
    pub version: String,
    /// Container image to deploy.
    pub new_image: String,
    /// Compute resources requested for the new deployment.
    pub resource_spec: ResourceSpec,
    /// Environment variables injected into the new container.
    pub env_vars: HashMap<String, String>,
}
/// Parameters for a rolling update.
#[derive(Debug, Clone)]
pub struct RollingUpdateParams {
    /// Common update parameters (version, image, resources, env).
    pub base: UpdateParams,
    // NOTE(review): not currently consulted by `rolling_update`, which
    // manages a single replica — confirm whether multi-replica semantics
    // are planned.
    pub max_unavailable: u32,
    // NOTE(review): not currently consulted by `rolling_update` — confirm.
    pub max_surge: u32,
}
/// Parameters for a canary update.
#[derive(Debug, Clone)]
pub struct CanaryUpdateParams {
    /// Common update parameters (version, image, resources, env).
    pub base: UpdateParams,
    /// Starting traffic percentage for the canary (0-100).
    pub initial_percentage: u8,
    /// Percentage added at each ramp step.
    pub increment: u8,
    /// Delay between ramp steps.
    pub interval: Duration,
}
/// Strategy used when replacing a running blueprint deployment.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum UpdateStrategy {
    /// Replace instances gradually, bounding unavailable/surging replicas.
    RollingUpdate {
        max_unavailable: u32,
        max_surge: u32,
    },
    /// Deploy the new ("green") version alongside the old ("blue") one and
    /// switch traffic once green is healthy.
    BlueGreen {
        /// Time budget for the traffic switch.
        switch_timeout: Duration,
        /// How long to health-check green before switching traffic.
        health_check_duration: Duration,
    },
    /// Ramp traffic to the new version in steps, health-checking each step.
    Canary {
        /// Starting traffic percentage (0-100).
        initial_percentage: u8,
        /// Percentage added per step.
        increment: u8,
        /// Delay between steps.
        interval: Duration,
    },
    /// Tear down the old deployment, then create the new one (incurs
    /// downtime between the two).
    Recreate,
}
impl Default for UpdateStrategy {
fn default() -> Self {
Self::BlueGreen {
switch_timeout: Duration::from_secs(300),
health_check_duration: Duration::from_secs(60),
}
}
}
/// Record of one blueprint deployment, kept in the manager's history.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeploymentVersion {
    /// Version label (e.g. "v1712345678").
    pub version: String,
    /// Container image this version was deployed from.
    pub blueprint_image: String,
    /// Resources the deployment was provisioned with.
    pub resource_spec: ResourceSpec,
    /// Environment variables the container was started with.
    pub env_vars: HashMap<String, String>,
    /// Wall-clock time the deployment was created.
    pub deployment_time: SystemTime,
    /// Lifecycle state of this version.
    pub status: VersionStatus,
    /// Provider-specific metadata captured from the deployment result.
    pub metadata: HashMap<String, String>,
    /// Container/blueprint id, when known; used for later cleanup.
    pub container_id: Option<String>,
}
/// Lifecycle state of a recorded deployment version.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum VersionStatus {
    /// Currently serving traffic.
    Active,
    /// Replaced by a newer version but retained in history.
    Inactive,
    /// Deployment or health check failed.
    Failed,
    /// Was active, then replaced by a rollback to another version.
    RolledBack,
    /// Deployed but not yet promoted (e.g. green side of blue-green).
    Staging,
}
/// Tracks deployment versions and applies updates/rollbacks using a
/// configurable [`UpdateStrategy`].
pub struct UpdateManager {
    /// Version history, oldest at the front; capped at MAX_VERSION_HISTORY.
    versions: VecDeque<DeploymentVersion>,
    /// Label of the version currently serving traffic, if any.
    active_version: Option<String>,
    /// Strategy applied by `update_blueprint` / `update_via_ssh`.
    strategy: UpdateStrategy,
}
impl UpdateManager {
pub fn new(strategy: UpdateStrategy) -> Self {
Self {
versions: VecDeque::new(),
active_version: None,
strategy,
}
}
/// Records a deployment version, evicting the oldest entries once the
/// history cap is reached.
pub fn add_version(&mut self, version: DeploymentVersion) {
    info!("Adding deployment version: {}", version.version);
    while self.versions.len() >= MAX_VERSION_HISTORY {
        self.versions.pop_front();
    }
    self.versions.push_back(version);
}
/// Returns the record of the currently active version, if one is tracked.
pub fn active_version(&self) -> Option<&DeploymentVersion> {
    match &self.active_version {
        Some(name) => self.versions.iter().find(|entry| entry.version == *name),
        None => None,
    }
}
/// Looks up a recorded version by its exact label.
pub fn get_version(&self, version: &str) -> Option<&DeploymentVersion> {
    for entry in &self.versions {
        if entry.version == version {
            return Some(entry);
        }
    }
    None
}
/// Returns references to all recorded versions, oldest first.
pub fn list_versions(&self) -> Vec<&DeploymentVersion> {
    Vec::from_iter(self.versions.iter())
}
/// Deploys `new_image` using the configured [`UpdateStrategy`],
/// dispatching to the matching strategy implementation.
///
/// A fresh version label is generated for the rollout; on success the new
/// deployment becomes the active version.
///
/// # Errors
/// Propagates any deployment, health-check, or cleanup failure from the
/// underlying strategy.
///
/// Fix applied: the original text contained mis-encoded `&` references
/// (`¶ms`, an HTML-entity garbling of `&params`) which did not compile;
/// restored here.
pub async fn update_blueprint(
    &mut self,
    adapter: &dyn CloudProviderAdapter,
    new_image: &str,
    resource_spec: &ResourceSpec,
    env_vars: HashMap<String, String>,
    current_deployment: &BlueprintDeploymentResult,
) -> Result<BlueprintDeploymentResult> {
    let new_version = self.generate_version();
    info!("Starting blueprint update to version {}", new_version);
    // Clone the strategy up front so the match does not hold a borrow of
    // `self.strategy` across the `&mut self` strategy calls below.
    let strategy = self.strategy.clone();
    // Parameters shared by all strategies.
    let base = UpdateParams {
        version: new_version.clone(),
        new_image: new_image.to_string(),
        resource_spec: resource_spec.clone(),
        env_vars,
    };
    match strategy {
        UpdateStrategy::BlueGreen {
            switch_timeout,
            health_check_duration,
        } => {
            self.blue_green_update(
                adapter,
                &base,
                current_deployment,
                switch_timeout,
                health_check_duration,
            )
            .await
        }
        UpdateStrategy::RollingUpdate {
            max_unavailable,
            max_surge,
        } => {
            let params = RollingUpdateParams {
                base,
                max_unavailable,
                max_surge,
            };
            self.rolling_update(adapter, &params, current_deployment)
                .await
        }
        UpdateStrategy::Canary {
            initial_percentage,
            increment,
            interval,
        } => {
            let params = CanaryUpdateParams {
                base,
                initial_percentage,
                increment,
                interval,
            };
            self.canary_update(adapter, &params, current_deployment)
                .await
        }
        UpdateStrategy::Recreate => {
            self.recreate_update(
                adapter,
                &new_version,
                new_image,
                resource_spec,
                base.env_vars,
                current_deployment,
            )
            .await
        }
    }
}
/// Blue-green rollout: deploy the new ("green") version alongside the
/// current ("blue") one, health-check it, switch traffic, then retire blue.
///
/// On health-check failure or timeout the green deployment is torn down and
/// its version record is marked `Failed`.
///
/// Fix applied: restored mis-encoded `&` references (`¤t_deployment`,
/// `¶ms` — HTML-entity garbling of `&current_deployment` / `&params`)
/// that did not compile.
async fn blue_green_update(
    &mut self,
    adapter: &dyn CloudProviderAdapter,
    params: &UpdateParams,
    current_deployment: &BlueprintDeploymentResult,
    // NOTE(review): currently unused — the traffic switch is not bounded by
    // this timeout; confirm whether it should wrap `switch_traffic`.
    _switch_timeout: Duration,
    health_check_duration: Duration,
) -> Result<BlueprintDeploymentResult> {
    info!(
        "Starting blue-green deployment for version {}",
        params.version
    );
    // Tag the green deployment so both sides are distinguishable at runtime.
    let mut green_env = params.env_vars.clone();
    green_env.insert("DEPLOYMENT_VERSION".to_string(), params.version.clone());
    green_env.insert("DEPLOYMENT_COLOR".to_string(), "green".to_string());
    let green_deployment = adapter
        .deploy_blueprint(
            &current_deployment.instance,
            &params.new_image,
            &params.resource_spec,
            green_env.clone(),
        )
        .await
        .map_err(|e| {
            error!("Failed to deploy green version: {}", e);
            e
        })?;
    // Record green as Staging until it passes health checks.
    self.add_version(DeploymentVersion {
        version: params.version.clone(),
        blueprint_image: params.new_image.clone(),
        resource_spec: params.resource_spec.clone(),
        env_vars: green_env,
        deployment_time: SystemTime::now(),
        status: VersionStatus::Staging,
        metadata: green_deployment.metadata.clone(),
        container_id: Some(green_deployment.blueprint_id.clone()),
    });
    info!("Performing health checks on green deployment");
    // Bound the polling loop by the configured health-check window.
    let health_check_result = timeout(
        health_check_duration,
        self.wait_for_healthy(&green_deployment, adapter),
    )
    .await;
    match health_check_result {
        Ok(Ok(true)) => {
            info!("Green deployment is healthy, switching traffic");
            if let Err(e) = self
                .switch_traffic(&green_deployment, current_deployment)
                .await
            {
                warn!("Failed to switch traffic: {}, rolling back", e);
                adapter.cleanup_blueprint(&green_deployment).await?;
                return Err(e);
            }
            // Promote green to Active and demote the previous version.
            if let Some(v) = self
                .versions
                .iter_mut()
                .find(|v| v.version == params.version)
            {
                v.status = VersionStatus::Active;
            }
            if let Some(old_version) = &self.active_version {
                if let Some(v) = self.versions.iter_mut().find(|v| v.version == *old_version) {
                    v.status = VersionStatus::Inactive;
                }
            }
            self.active_version = Some(params.version.clone());
            // Grace period so in-flight requests drain before blue goes away.
            sleep(Duration::from_secs(30)).await;
            if let Err(e) = adapter.cleanup_blueprint(current_deployment).await {
                warn!("Failed to cleanup old deployment: {}", e);
            }
            Ok(green_deployment)
        }
        // Timeout, check error, or unhealthy result: tear green down.
        _ => {
            error!("Green deployment health check failed, cleaning up");
            if let Some(v) = self
                .versions
                .iter_mut()
                .find(|v| v.version == params.version)
            {
                v.status = VersionStatus::Failed;
            }
            adapter.cleanup_blueprint(&green_deployment).await?;
            Err(Error::Other("Green deployment health check failed".into()))
        }
    }
}
/// Rolling update, degenerating to replace-after-verify for a single
/// instance: deploy the new version, wait for health, then retire the old
/// deployment.
///
/// NOTE(review): `params.max_unavailable` / `params.max_surge` are not
/// consulted — this flow manages exactly one replica; confirm whether
/// multi-replica rolling semantics are intended.
///
/// Fix applied: restored mis-encoded `&` references (`¤t_deployment`,
/// `¶ms` garbling) that did not compile.
async fn rolling_update(
    &mut self,
    adapter: &dyn CloudProviderAdapter,
    params: &RollingUpdateParams,
    current_deployment: &BlueprintDeploymentResult,
) -> Result<BlueprintDeploymentResult> {
    info!("Starting rolling update to version {}", params.base.version);
    let mut new_env = params.base.env_vars.clone();
    new_env.insert(
        "DEPLOYMENT_VERSION".to_string(),
        params.base.version.clone(),
    );
    let new_deployment = adapter
        .deploy_blueprint(
            &current_deployment.instance,
            &params.base.new_image,
            &params.base.resource_spec,
            new_env.clone(),
        )
        .await?;
    // Keep the old deployment if the new one never becomes healthy.
    if !self.wait_for_healthy(&new_deployment, adapter).await? {
        adapter.cleanup_blueprint(&new_deployment).await?;
        return Err(Error::Other("New deployment failed health check".into()));
    }
    // Old deployment is removed only after the new one is verified healthy.
    adapter.cleanup_blueprint(current_deployment).await?;
    self.add_version(DeploymentVersion {
        version: params.base.version.clone(),
        blueprint_image: params.base.new_image.clone(),
        resource_spec: params.base.resource_spec.clone(),
        env_vars: new_env,
        deployment_time: SystemTime::now(),
        status: VersionStatus::Active,
        metadata: new_deployment.metadata.clone(),
        container_id: Some(new_deployment.blueprint_id.clone()),
    });
    self.active_version = Some(params.base.version.clone());
    Ok(new_deployment)
}
async fn canary_update(
&mut self,
adapter: &dyn CloudProviderAdapter,
params: &CanaryUpdateParams,
current_deployment: &BlueprintDeploymentResult,
) -> Result<BlueprintDeploymentResult> {
info!(
"Starting canary deployment for version {}",
params.base.version
);
let mut canary_env = params.base.env_vars.clone();
canary_env.insert(
"DEPLOYMENT_VERSION".to_string(),
params.base.version.clone(),
);
canary_env.insert("DEPLOYMENT_TYPE".to_string(), "canary".to_string());
let canary_deployment = adapter
.deploy_blueprint(
¤t_deployment.instance,
¶ms.base.new_image,
¶ms.base.resource_spec,
canary_env.clone(),
)
.await?;
let mut current_percentage = params.initial_percentage;
while current_percentage < 100 {
info!("Canary at {}% traffic", current_percentage);
if !adapter.health_check_blueprint(&canary_deployment).await? {
warn!(
"Canary health check failed at {}%, rolling back",
current_percentage
);
adapter.cleanup_blueprint(&canary_deployment).await?;
return Err(Error::Other(format!(
"Canary failed at {current_percentage}%"
)));
}
sleep(params.interval).await;
current_percentage = (current_percentage + params.increment).min(100);
}
info!("Canary deployment successful, completing rollout");
adapter.cleanup_blueprint(current_deployment).await?;
self.add_version(DeploymentVersion {
version: params.base.version.clone(),
blueprint_image: params.base.new_image.clone(),
resource_spec: params.base.resource_spec.clone(),
env_vars: canary_env,
deployment_time: SystemTime::now(),
status: VersionStatus::Active,
metadata: canary_deployment.metadata.clone(),
container_id: Some(canary_deployment.blueprint_id.clone()),
});
self.active_version = Some(params.base.version.clone());
Ok(canary_deployment)
}
/// Recreate rollout: tear down the current deployment first, then deploy
/// the new version on the same instance.
///
/// NOTE(review): this strategy has a downtime window between cleanup and
/// the new deployment becoming ready, and (unlike the other strategies)
/// does not health-check the new deployment before marking it Active.
///
/// Fix applied: restored the mis-encoded `&current_deployment` reference
/// (`¤t_deployment` garbling) that did not compile.
async fn recreate_update(
    &mut self,
    adapter: &dyn CloudProviderAdapter,
    version: &str,
    new_image: &str,
    resource_spec: &ResourceSpec,
    env_vars: HashMap<String, String>,
    current_deployment: &BlueprintDeploymentResult,
) -> Result<BlueprintDeploymentResult> {
    info!("Starting recreate deployment for version {}", version);
    adapter.cleanup_blueprint(current_deployment).await?;
    let mut new_env = env_vars.clone();
    new_env.insert("DEPLOYMENT_VERSION".to_string(), version.to_string());
    // The old instance record is reused as the placement target.
    let new_deployment = adapter
        .deploy_blueprint(
            &current_deployment.instance,
            new_image,
            resource_spec,
            new_env.clone(),
        )
        .await?;
    self.add_version(DeploymentVersion {
        version: version.to_string(),
        blueprint_image: new_image.to_string(),
        resource_spec: resource_spec.clone(),
        env_vars: new_env,
        deployment_time: SystemTime::now(),
        status: VersionStatus::Active,
        metadata: new_deployment.metadata.clone(),
        container_id: Some(new_deployment.blueprint_id.clone()),
    });
    self.active_version = Some(version.to_string());
    Ok(new_deployment)
}
/// Rolls back to a previously recorded version by redeploying its image
/// and environment, verifying health before retiring the current
/// deployment.
///
/// # Errors
/// Fails if `target_version` is unknown, marked `Failed`, or the rollback
/// deployment never becomes healthy (in which case it is cleaned up and
/// the current deployment is left running).
///
/// Fix applied: restored the mis-encoded `&current_deployment` reference
/// (`¤t_deployment` garbling) that did not compile.
pub async fn rollback(
    &mut self,
    adapter: &dyn CloudProviderAdapter,
    target_version: &str,
    current_deployment: &BlueprintDeploymentResult,
) -> Result<BlueprintDeploymentResult> {
    info!("Rolling back to version {}", target_version);
    let version = self
        .get_version(target_version)
        .ok_or_else(|| Error::Other(format!("Version {target_version} not found")))?
        .clone();
    if version.status == VersionStatus::Failed {
        return Err(Error::Other("Cannot rollback to a failed version".into()));
    }
    let rollback_deployment = adapter
        .deploy_blueprint(
            &current_deployment.instance,
            &version.blueprint_image,
            &version.resource_spec,
            version.env_vars.clone(),
        )
        .await?;
    // Keep the current deployment until the rollback target is verified.
    if !self.wait_for_healthy(&rollback_deployment, adapter).await? {
        error!("Rollback deployment failed health check");
        adapter.cleanup_blueprint(&rollback_deployment).await?;
        return Err(Error::Other("Rollback failed health check".into()));
    }
    adapter.cleanup_blueprint(current_deployment).await?;
    // Mark the replaced version RolledBack and the target Active.
    if let Some(current) = &self.active_version {
        if let Some(v) = self.versions.iter_mut().find(|v| v.version == *current) {
            v.status = VersionStatus::RolledBack;
        }
    }
    if let Some(v) = self
        .versions
        .iter_mut()
        .find(|v| v.version == target_version)
    {
        v.status = VersionStatus::Active;
    }
    self.active_version = Some(target_version.to_string());
    Ok(rollback_deployment)
}
/// Polls the adapter's health check up to 30 times, 10 seconds apart,
/// until the deployment reports healthy. Individual check errors are
/// logged and retried; returns `Ok(false)` if the deployment never
/// becomes healthy within the attempt budget.
async fn wait_for_healthy(
    &self,
    deployment: &BlueprintDeploymentResult,
    adapter: &dyn CloudProviderAdapter,
) -> Result<bool> {
    const MAX_ATTEMPTS: u32 = 30;
    const CHECK_INTERVAL: Duration = Duration::from_secs(10);
    for attempt in 1..=MAX_ATTEMPTS {
        debug!("Health check attempt {}/{}", attempt, MAX_ATTEMPTS);
        // Treat a check error the same as "not healthy yet" and retry.
        let healthy = match adapter.health_check_blueprint(deployment).await {
            Ok(result) => result,
            Err(e) => {
                warn!("Health check error: {}", e);
                false
            }
        };
        if healthy {
            info!("Deployment is healthy");
            return Ok(true);
        }
        // No sleep after the final attempt.
        if attempt < MAX_ATTEMPTS {
            sleep(CHECK_INTERVAL).await;
        }
    }
    Ok(false)
}
/// Switches live traffic from the old deployment to the new one.
///
/// NOTE(review): placeholder implementation — it only logs and sleeps; no
/// load-balancer or DNS update is performed here. Confirm where the real
/// traffic switch is expected to happen.
async fn switch_traffic(
    &self,
    new_deployment: &BlueprintDeploymentResult,
    old_deployment: &BlueprintDeploymentResult,
) -> Result<()> {
    info!(
        "Switching traffic from {} to {}",
        old_deployment.blueprint_id, new_deployment.blueprint_id
    );
    // Simulated propagation delay.
    sleep(Duration::from_secs(5)).await;
    Ok(())
}
/// Generates a wall-clock-derived version label of the form `v<millis>`.
///
/// Fix applied: the original used second resolution, so two updates
/// started within the same second received identical labels, breaking
/// exact-match version lookup (`get_version`). Millisecond precision makes
/// rapid successive labels distinct in practice. The `unwrap` is replaced
/// with an `expect` stating the invariant (clock not before the epoch).
fn generate_version(&self) -> String {
    let timestamp = SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .expect("system clock is set before the UNIX epoch")
        .as_millis();
    format!("v{timestamp}")
}
/// Returns up to `limit` most recent versions, newest first.
pub fn get_history(&self, limit: usize) -> Vec<DeploymentVersion> {
    let mut history = Vec::with_capacity(limit.min(self.versions.len()));
    for entry in self.versions.iter().rev().take(limit) {
        history.push(entry.clone());
    }
    history
}
/// Removes inactive versions beyond the `keep_count` most recent ones,
/// tearing down their containers via the adapter.
///
/// Per-version cleanup failures are logged but do not abort the sweep; the
/// version record is dropped from history either way.
///
/// Fix applied: the original iterated oldest-first, so `.skip(keep_count)`
/// retained the `keep_count` *oldest* inactive versions and deleted the
/// newest — the opposite of the intended retention. Iterating newest-first
/// (`.rev()`) keeps the most recent `keep_count` inactive versions.
pub async fn cleanup_old_versions(
    &mut self,
    adapter: &dyn CloudProviderAdapter,
    keep_count: usize,
) -> Result<()> {
    let stale_versions: Vec<_> = self
        .versions
        .iter()
        .rev() // newest first, so `skip` preserves the most recent entries
        .filter(|v| v.status == VersionStatus::Inactive)
        .skip(keep_count)
        .cloned()
        .collect();
    for version in stale_versions {
        info!("Cleaning up old version: {}", version.version);
        if let Some(container_id) = version.container_id {
            // Synthesize a minimal deployment record for cleanup; presumably
            // only the container id and metadata matter to
            // `cleanup_blueprint` here — TODO confirm against the adapter.
            let deployment = BlueprintDeploymentResult {
                instance: crate::infra::types::ProvisionedInstance {
                    id: format!("update-cleanup-{}", uuid::Uuid::new_v4()),
                    public_ip: None,
                    private_ip: None,
                    status: crate::infra::types::InstanceStatus::Unknown,
                    provider: crate::core::remote::CloudProvider::Generic,
                    region: "unknown".to_string(),
                    instance_type: "unknown".to_string(),
                },
                blueprint_id: container_id,
                port_mappings: HashMap::new(),
                metadata: version.metadata.clone(),
            };
            if let Err(e) = adapter.cleanup_blueprint(&deployment).await {
                warn!("Failed to cleanup version {}: {}", version.version, e);
            }
        }
        self.versions.retain(|v| v.version != version.version);
    }
    Ok(())
}
}
impl UpdateManager {
/// Applies an update over SSH according to the configured strategy.
///
/// Blue-green gets a dedicated flow (deploy new container, health-check,
/// switch traffic, stop the old container); every other strategy falls
/// back to an in-place container update.
///
/// Returns the new container's id on success.
pub async fn update_via_ssh(
    &mut self,
    ssh_client: &SshDeploymentClient,
    new_image: &str,
    resource_spec: &ResourceSpec,
    env_vars: HashMap<String, String>,
) -> Result<String> {
    let version = self.generate_version();
    info!("Starting SSH update to version {}", version);
    match &self.strategy {
        UpdateStrategy::BlueGreen { .. } => {
            // Containers are named after the version so the previous one can
            // be found and stopped by name after the switch.
            let new_container_name = format!("blueprint-{version}");
            let new_container_id = ssh_client
                .deploy_container_with_resources(
                    new_image,
                    &new_container_name,
                    env_vars.clone(),
                    Some(resource_spec),
                )
                .await?;
            if ssh_client.health_check_container(&new_container_id).await? {
                ssh_client.switch_traffic_to(&new_container_name).await?;
                if let Some(old_version) = &self.active_version {
                    let old_container_name = format!("blueprint-{old_version}");
                    ssh_client.stop_container(&old_container_name).await?;
                }
                self.active_version = Some(version.clone());
                Ok(new_container_id)
            } else {
                // Unhealthy: remove the new container, keep the old active.
                ssh_client.remove_container(&new_container_id).await?;
                Err(Error::Other("New container health check failed".into()))
            }
        }
        _ => {
            // NOTE(review): unlike the adapter-based flows, neither SSH
            // branch records a DeploymentVersion in `self.versions`, so
            // `rollback_via_ssh` can only target versions recorded by other
            // paths — confirm intended.
            let new_container_id = ssh_client
                .update_container_with_resources(new_image, env_vars, Some(resource_spec))
                .await?;
            self.active_version = Some(version.clone());
            Ok(new_container_id)
        }
    }
}
pub async fn rollback_via_ssh(
&mut self,
ssh_client: &SshDeploymentClient,
target_version: &str,
) -> Result<()> {
let version = self
.get_version(target_version)
.ok_or_else(|| Error::Other(format!("Version {target_version} not found")))?
.clone();
ssh_client
.deploy_container(&version.blueprint_image, version.env_vars)
.await?;
self.active_version = Some(target_version.to_string());
Ok(())
}
}
#[cfg(test)]
mod tests {
    use super::*;
    /// A version that is added and marked active is retrievable through
    /// both `active_version()` and `list_versions()`.
    #[test]
    fn test_version_management() {
        let mut manager = UpdateManager::new(UpdateStrategy::default());
        let version1 = DeploymentVersion {
            version: "v1".to_string(),
            blueprint_image: "image:v1".to_string(),
            resource_spec: ResourceSpec::basic(),
            env_vars: HashMap::new(),
            deployment_time: SystemTime::now(),
            status: VersionStatus::Active,
            metadata: HashMap::new(),
            container_id: Some("container1".to_string()),
        };
        manager.add_version(version1.clone());
        manager.active_version = Some("v1".to_string());
        assert_eq!(manager.active_version().unwrap().version, "v1");
        assert_eq!(manager.list_versions().len(), 1);
    }
    /// Adding more than MAX_VERSION_HISTORY entries evicts the oldest so
    /// the history never exceeds the cap.
    #[test]
    fn test_version_history_limit() {
        let mut manager = UpdateManager::new(UpdateStrategy::default());
        for i in 0..15 {
            let version = DeploymentVersion {
                version: format!("v{i}"),
                blueprint_image: format!("image:v{i}"),
                resource_spec: ResourceSpec::basic(),
                env_vars: HashMap::new(),
                deployment_time: SystemTime::now(),
                status: VersionStatus::Inactive,
                metadata: HashMap::new(),
                container_id: Some(format!("container{i}")),
            };
            manager.add_version(version);
        }
        assert!(manager.list_versions().len() <= MAX_VERSION_HISTORY);
    }
    /// Version labels are timestamp-derived: distinct across time and
    /// prefixed with "v". The 1-second sleep guarantees the timestamps
    /// differ.
    #[tokio::test]
    async fn test_generate_version() {
        let manager = UpdateManager::new(UpdateStrategy::default());
        let version1 = manager.generate_version();
        sleep(Duration::from_secs(1)).await;
        let version2 = manager.generate_version();
        assert_ne!(version1, version2);
        assert!(version1.starts_with("v"));
        assert!(version2.starts_with("v"));
    }
}