#![ allow( clippy::missing_inline_in_public_items ) ]
mod private
{
use std::
{
collections ::HashMap,
time ::SystemTime,
};
use core::time::Duration;
use serde::{ Deserialize, Serialize };
use tokio::sync::mpsc;
/// Lifecycle state of a `ModelDeployment`.
///
/// `ModelDeployment::new` starts every deployment in `Preparing`, and
/// `ModelDeployment::update_status` logs each transition it performs. Nothing in this
/// module enforces a particular ordering between states.
#[ derive( Debug, Clone, PartialEq, Serialize, Deserialize ) ]
pub enum DeploymentStatus
{
/// Initial state assigned by `ModelDeployment::new`; tallied in `DeploymentStats::preparing`.
Preparing,
/// Tallied in `DeploymentStats::deploying`.
Deploying,
/// The only state in which `ModelDeployment::is_healthy` / `is_ready` can return `true`;
/// tallied in `DeploymentStats::active`.
Active,
/// Tallied in `DeploymentStats::scaling`.
Scaling,
/// Not tallied individually by `DeploymentManager::deployment_stats`.
Updating,
/// Not tallied individually by `DeploymentManager::deployment_stats`.
RollingBack,
/// Not tallied individually by `DeploymentManager::deployment_stats`.
Paused,
/// Tallied in `DeploymentStats::failed`. The payload is presumably a human-readable
/// failure reason — confirm with the code that produces it.
Failed( String ),
/// Not tallied individually by `DeploymentManager::deployment_stats`.
Terminating,
/// Not tallied individually by `DeploymentManager::deployment_stats`.
Terminated,
}
/// Rollout strategy selected for a deployment.
///
/// The strategy is checked by `ModelDeploymentUtils::validate_config` and feeds the
/// heuristic in `ModelDeploymentUtils::estimate_deployment_time`.
#[ derive( Debug, Clone, PartialEq, Serialize, Deserialize ) ]
pub enum DeploymentStrategy
{
/// Rolling update. `ModelDeploymentUtils::create_rollback_plan` always selects this
/// variant with `max_surge : 1` and `max_unavailable : 0`.
Rolling
{
/// Presumably the number of extra replicas allowed during the rollout — not read in this file.
max_surge : u32,
/// Presumably the number of replicas that may be unavailable during the rollout — not read in this file.
max_unavailable : u32,
},
/// Blue/green switch-over.
BlueGreen
{
/// Traffic percentage; `validate_config` rejects values above 100.
traffic_split : u8,
},
/// Canary rollout.
Canary
{
/// Starting traffic percentage; must be at most 100 and strictly below `final_traffic`
/// to pass `validate_config`.
initial_traffic : u8,
/// Final traffic percentage; must be at most 100 to pass `validate_config`.
final_traffic : u8,
/// Evaluation window; `estimate_deployment_time` divides it by 60000, i.e. treats it
/// as milliseconds converted to minutes.
evaluation_duration_ms : u64,
},
/// Recreate strategy; carries no parameters.
Recreate,
}
/// Full description of a requested deployment.
///
/// `ModelDeploymentUtils::validate_config` checks the constraints mentioned on the
/// individual fields; nothing is validated at construction time.
#[ derive( Debug, Clone, Serialize, Deserialize ) ]
pub struct DeploymentConfig
{
/// Deployment name. `DeploymentManager::add_deployment` uses it as the unique key.
/// Must be non-empty to pass validation.
pub name : String,
/// Identifier of the model to deploy. Must be non-empty to pass validation.
pub model_id : String,
/// Model version. Must be non-empty to pass validation; replaced by
/// `ModelDeploymentUtils::create_rollback_plan`.
pub model_version : String,
/// Target environment label — not interpreted anywhere in this file.
pub environment : String,
/// Desired replica count. Must be greater than 0 to pass validation; compared against
/// the running/healthy counters by `ModelDeployment::is_ready` / `is_healthy`.
pub replicas : u32,
/// Resources per replica (`calculate_total_resources` multiplies them by `replicas`).
pub resources : ResourceRequirements,
/// Optional auto-scaling bounds and targets; validated only when present.
pub auto_scaling : Option< AutoScalingConfig >,
/// Rollout strategy.
pub strategy : DeploymentStrategy,
/// Health-check settings — not read anywhere in this file.
pub health_check : HealthCheckConfig,
/// String key/value pairs, presumably environment variables for the deployed model —
/// not read anywhere in this file.
pub env_vars : HashMap< String, String >,
/// Timeout, presumably in milliseconds per the `_ms` suffix. Must be greater than 0 to
/// pass validation.
pub timeout_ms : u64,
}
/// Resource amounts, used both per replica (in `DeploymentConfig::resources`) and as the
/// aggregate returned by `ModelDeploymentUtils::calculate_total_resources`.
#[ derive( Debug, Clone, Serialize, Deserialize ) ]
pub struct ResourceRequirements
{
/// CPU amount (presumably cores — unit not established in this file). Must be greater
/// than 0 to pass `validate_config`.
pub cpu : f64,
/// Memory, presumably in megabytes per the `_mb` suffix. Must be greater than 0 to pass
/// `validate_config`.
pub memory_mb : u64,
/// Optional GPU count; `None` means no GPU requirement is stated.
pub gpu : Option< u32 >,
/// Storage, presumably in gigabytes per the `_gb` suffix.
pub storage_gb : u64,
}
impl Default for ResourceRequirements
{
/// Returns the baseline requirements: `cpu` 1.0, `memory_mb` 2048, no GPU and
/// `storage_gb` 10.
fn default() -> Self
{
Self
{
cpu : 1.0,
memory_mb : 2048,
gpu : None,
storage_gb : 10,
}
}
}
/// Bounds and utilization targets consumed by `ModelDeploymentUtils::should_scale`.
#[ derive( Debug, Clone, Serialize, Deserialize ) ]
pub struct AutoScalingConfig
{
/// Lower bound applied when scaling down. Must not exceed `max_replicas` to pass
/// `validate_config`.
pub min_replicas : u32,
/// Upper bound applied when scaling up.
pub max_replicas : u32,
/// CPU utilization target in percent; must be in `1..=100` to pass `validate_config`.
pub target_cpu_percent : u8,
/// Memory utilization target in percent; must be in `1..=100` to pass `validate_config`.
pub target_memory_percent : u8,
/// Scale-up cooldown, presumably in seconds per the `_s` suffix — not read in this file.
pub scale_up_cooldown_s : u64,
/// Scale-down cooldown, presumably in seconds per the `_s` suffix — not read in this file.
pub scale_down_cooldown_s : u64,
}
impl Default for AutoScalingConfig
{
/// Returns the default policy: 1 to 10 replicas, 70 % CPU target, 80 % memory target,
/// and cooldown values of 300 (scale up) and 600 (scale down).
fn default() -> Self
{
Self
{
min_replicas : 1,
max_replicas : 10,
target_cpu_percent : 70,
target_memory_percent : 80,
scale_up_cooldown_s : 300,
scale_down_cooldown_s : 600,
}
}
}
/// Health-check settings attached to a `DeploymentConfig`.
///
/// None of these fields is interpreted in this file; the descriptions below follow the
/// field names and should be confirmed against the code that performs the checks.
#[ derive( Debug, Clone, Serialize, Deserialize ) ]
pub struct HealthCheckConfig
{
/// Path to probe (defaults to `"/health"`).
pub path : String,
/// Port to probe (defaults to 8080).
pub port : u16,
/// Interval between checks, presumably in seconds per the `_s` suffix.
pub interval_s : u64,
/// Timeout of a single check, presumably in seconds per the `_s` suffix.
pub timeout_s : u64,
/// Presumably the number of consecutive failures before a replica counts as unhealthy.
pub failure_threshold : u32,
/// Presumably the number of consecutive successes before a replica counts as healthy.
pub success_threshold : u32,
}
impl Default for HealthCheckConfig
{
  /// Returns the default probe settings: path `/health` on port 8080, interval 30,
  /// timeout 5, three failures to trip and one success to recover.
  fn default() -> Self
  {
    let path = String::from( "/health" );
    Self
    {
      path,
      port : 8080,
      interval_s : 30,
      timeout_s : 5,
      failure_threshold : 3,
      success_threshold : 1,
    }
  }
}
/// Runtime record of one deployment: its configuration, current status, replica counters
/// and an append-only event history.
#[ derive( Debug, Clone ) ]
pub struct ModelDeployment
{
/// Configuration the deployment was created from.
pub config : DeploymentConfig,
/// Current lifecycle state; starts as `DeploymentStatus::Preparing`.
pub status : DeploymentStatus,
/// Wall-clock time at which the record was constructed.
pub created_at : SystemTime,
/// Wall-clock time of the most recent status, replica or event update.
pub updated_at : SystemTime,
/// Last reported number of running replicas.
pub running_replicas : u32,
/// Last reported number of healthy replicas.
pub healthy_replicas : u32,
// Oldest-first event log; exposed read-only through `history()` and trimmed by
// `DeploymentManager::cleanup_history`.
deployment_history : Vec< DeploymentEvent >,
}
impl ModelDeployment
{
  /// Creates a deployment record for `config` in the `DeploymentStatus::Preparing` state.
  ///
  /// Replica counters start at zero, the history is empty, and `created_at` and
  /// `updated_at` are set to the very same instant.
  #[ must_use ]
  pub fn new( config : DeploymentConfig ) -> Self
  {
    // Read the clock once so a fresh record never reports `updated_at != created_at`.
    let now = SystemTime::now();
    Self
    {
      config,
      status : DeploymentStatus::Preparing,
      created_at : now,
      updated_at : now,
      running_replicas : 0,
      healthy_replicas : 0,
      deployment_history : Vec::new(),
    }
  }

  /// Switches the deployment to `status` and appends a `StatusChanged` event describing
  /// the transition (old state, new state, and a human-readable message).
  pub fn update_status( &mut self, status : DeploymentStatus )
  {
    // Build the message before the old state is replaced.
    let message = format!( "Status changed from {:?} to {:?}", self.status, status );
    let previous = core::mem::replace( &mut self.status, status.clone() );
    self.add_event( DeploymentEventType::StatusChanged { from : previous, to : status }, message );
  }

  /// Stores the latest replica counters and appends a `ReplicasChanged` event.
  pub fn update_replicas( &mut self, running : u32, healthy : u32 )
  {
    self.running_replicas = running;
    self.healthy_replicas = healthy;
    self.add_event
    (
      DeploymentEventType::ReplicasChanged { running, healthy },
      format!( "Replicas updated : {running} running, {healthy} healthy" ),
    );
  }

  /// Returns `true` when the deployment is `Active` and at least `config.replicas`
  /// replicas are reported healthy.
  #[ must_use ]
  pub fn is_healthy( &self ) -> bool
  {
    matches!( self.status, DeploymentStatus::Active ) &&
    self.healthy_replicas >= self.config.replicas
  }

  /// Returns `true` when the deployment is `Active` and at least `config.replicas`
  /// replicas are reported running.
  #[ must_use ]
  pub fn is_ready( &self ) -> bool
  {
    matches!( self.status, DeploymentStatus::Active ) &&
    self.running_replicas >= self.config.replicas
  }

  /// Time elapsed since `created_at`.
  ///
  /// Returns a zero duration when the system clock reports a time earlier than
  /// `created_at` (i.e. when `SystemTime::elapsed` fails).
  #[ must_use ]
  pub fn age( &self ) -> Duration
  {
    self.created_at.elapsed().unwrap_or_default()
  }

  /// Read-only view of the recorded events, oldest first.
  #[ must_use ]
  pub fn history( &self ) -> &Vec< DeploymentEvent >
  {
    &self.deployment_history
  }

  /// Appends an event stamped with the current time and bumps `updated_at` to that same
  /// timestamp.
  pub fn add_event( &mut self, event_type : DeploymentEventType, message : String )
  {
    // One clock read keeps the event timestamp and `updated_at` identical.
    let now = SystemTime::now();
    self.deployment_history.push( DeploymentEvent { timestamp : now, event_type, message } );
    self.updated_at = now;
  }
}
/// One entry of a deployment's history log.
#[ derive( Debug, Clone ) ]
pub struct DeploymentEvent
{
/// Wall-clock time at which the event was recorded.
pub timestamp : SystemTime,
/// Structured description of what happened.
pub event_type : DeploymentEventType,
/// Human-readable description of the event.
pub message : String,
}
/// Structured payload of a `DeploymentEvent`.
///
/// Only `StatusChanged` and `ReplicasChanged` are produced inside this file (by
/// `ModelDeployment::update_status` and `update_replicas`); the remaining variants can be
/// recorded by callers through `ModelDeployment::add_event`.
#[ derive( Debug, Clone ) ]
pub enum DeploymentEventType
{
/// The deployment moved from one status to another.
StatusChanged
{
/// Status before the change.
from : DeploymentStatus,
/// Status after the change.
to : DeploymentStatus,
},
/// New replica counters were reported.
ReplicasChanged
{
/// Reported running replicas.
running : u32,
/// Reported healthy replicas.
healthy : u32,
},
/// A scaling action was triggered.
ScalingTriggered
{
/// Replica count before scaling.
from : u32,
/// Replica count requested by the scaling action.
to : u32,
/// Free-form reason text.
reason : String,
},
/// A rollback was triggered.
RollbackTriggered
{
/// Version being rolled back to.
target_version : String,
/// Free-form reason text.
reason : String,
},
/// A health check failed.
HealthCheckFailed
{
/// Failure count; whether it is consecutive or cumulative is not established in this file.
failures : u32,
},
/// The configuration was updated.
ConfigurationUpdated
{
/// Presumably the names of the fields that changed — confirm with callers.
fields : Vec< String >,
},
}
/// In-memory registry of deployments keyed by deployment name.
#[ derive( Debug ) ]
pub struct DeploymentManager
{
// Keyed by `DeploymentConfig::name`; `add_deployment` rejects duplicate names.
deployments : HashMap< String, ModelDeployment >,
// Limits applied by `add_deployment` and `cleanup_history`.
config : DeploymentManagerConfig,
}
/// Limits and defaults for a `DeploymentManager`.
#[ derive( Debug, Clone, Serialize, Deserialize ) ]
pub struct DeploymentManagerConfig
{
/// Maximum number of deployments `DeploymentManager::add_deployment` will accept.
pub max_deployments : usize,
/// Default timeout, presumably in milliseconds per the `_ms` suffix — not read in this file.
pub default_timeout_ms : u64,
/// Health-check retry value, presumably in seconds per the `_s` suffix — not read in this file.
pub health_check_retry_s : u64,
/// Maximum number of history events kept per deployment by
/// `DeploymentManager::cleanup_history`.
pub max_history_size : usize,
}
impl Default for DeploymentManagerConfig
{
  /// Returns the default limits: 100 deployments, a default timeout of 600 000, a
  /// health-check retry value of 30, and 1000 history events per deployment.
  fn default() -> Self
  {
    Self
    {
      max_deployments : 100,
      default_timeout_ms : 600_000,
      health_check_retry_s : 30,
      max_history_size : 1000,
    }
  }
}
impl DeploymentManager
{
  /// Creates an empty manager governed by `config`.
  #[ must_use ]
  pub fn new( config : DeploymentManagerConfig ) -> Self
  {
    Self
    {
      deployments : HashMap::new(),
      config,
    }
  }

  /// Registers `deployment` under its `config.name`.
  ///
  /// # Errors
  ///
  /// Returns an error message when the manager already holds `max_deployments` entries
  /// (checked first), or when a deployment with the same name is already registered.
  pub fn add_deployment( &mut self, deployment : ModelDeployment ) -> Result< (), String >
  {
    if self.deployments.len() >= self.config.max_deployments
    {
      return Err( "Maximum number of deployments reached".to_string() );
    }
    // The entry API performs the duplicate check and the insert with a single lookup.
    match self.deployments.entry( deployment.config.name.clone() )
    {
      std::collections::hash_map::Entry::Occupied( slot ) =>
      {
        Err( format!( "Deployment '{}' already exists", slot.key() ) )
      }
      std::collections::hash_map::Entry::Vacant( slot ) =>
      {
        slot.insert( deployment );
        Ok( () )
      }
    }
  }

  /// Looks up a deployment by name.
  #[ must_use ]
  pub fn get_deployment( &self, name : &str ) -> Option< &ModelDeployment >
  {
    self.deployments.get( name )
  }

  /// Looks up a deployment by name for in-place modification.
  pub fn get_deployment_mut( &mut self, name : &str ) -> Option< &mut ModelDeployment >
  {
    self.deployments.get_mut( name )
  }

  /// Removes a deployment by name and returns it, or `None` when the name is unknown.
  pub fn remove_deployment( &mut self, name : &str ) -> Option< ModelDeployment >
  {
    self.deployments.remove( name )
  }

  /// Names of all registered deployments, in unspecified (hash map) order.
  #[ must_use ]
  pub fn deployment_names( &self ) -> Vec< String >
  {
    self.deployments.keys().cloned().collect()
  }

  /// Aggregates per-status counts and replica totals over all registered deployments.
  ///
  /// Replica totals saturate at `u32::MAX` instead of overflowing.
  #[ must_use ]
  pub fn deployment_stats( &self ) -> DeploymentStats
  {
    let mut stats = DeploymentStats
    {
      total : self.deployments.len(),
      active : 0,
      failed : 0,
      preparing : 0,
      deploying : 0,
      scaling : 0,
      total_replicas : 0,
      healthy_replicas : 0,
    };
    for deployment in self.deployments.values()
    {
      // Exhaustive on purpose: a new status variant must be classified here explicitly.
      match &deployment.status
      {
        DeploymentStatus::Active => stats.active += 1,
        DeploymentStatus::Failed( _ ) => stats.failed += 1,
        DeploymentStatus::Preparing => stats.preparing += 1,
        DeploymentStatus::Deploying => stats.deploying += 1,
        DeploymentStatus::Scaling => stats.scaling += 1,
        DeploymentStatus::Updating
        | DeploymentStatus::RollingBack
        | DeploymentStatus::Paused
        | DeploymentStatus::Terminating
        | DeploymentStatus::Terminated => {}
      }
      // Counters are caller-supplied `u32`s; a plain `+=` could overflow.
      stats.total_replicas = stats.total_replicas.saturating_add( deployment.running_replicas );
      stats.healthy_replicas = stats.healthy_replicas.saturating_add( deployment.healthy_replicas );
    }
    stats
  }

  /// Trims every deployment's history to at most `max_history_size` events, discarding
  /// the oldest entries first.
  pub fn cleanup_history( &mut self )
  {
    let limit = self.config.max_history_size;
    for deployment in self.deployments.values_mut()
    {
      let surplus = deployment.deployment_history.len().saturating_sub( limit );
      // Draining an empty range is a no-op, so no separate length check is needed.
      deployment.deployment_history.drain( ..surplus );
    }
  }
}
/// Snapshot produced by `DeploymentManager::deployment_stats`.
///
/// The per-status counters cover only the five statuses listed below, so they need not
/// add up to `total`.
#[ derive( Debug, Clone ) ]
pub struct DeploymentStats
{
/// Number of registered deployments, regardless of status.
pub total : usize,
/// Deployments whose status is `Active`.
pub active : usize,
/// Deployments whose status is `Failed`.
pub failed : usize,
/// Deployments whose status is `Preparing`.
pub preparing : usize,
/// Deployments whose status is `Deploying`.
pub deploying : usize,
/// Deployments whose status is `Scaling`.
pub scaling : usize,
/// Sum of `running_replicas` over all deployments.
pub total_replicas : u32,
/// Sum of `healthy_replicas` over all deployments.
pub healthy_replicas : u32,
}
/// Namespace type for stateless deployment helpers; it has no fields and all of its
/// functions are associated functions.
#[ derive( Debug ) ]
pub struct ModelDeploymentUtils;
impl ModelDeploymentUtils
{
/// Creates a connected sender/receiver pair for `DeploymentNotification` messages,
/// backed by an unbounded `mpsc` channel.
#[ must_use ]
pub fn create_event_notifier() -> ( DeploymentEventSender, DeploymentEventReceiver )
{
  let ( sender, receiver ) = mpsc::unbounded_channel();
  let sending_half = DeploymentEventSender { sender };
  let receiving_half = DeploymentEventReceiver { receiver };
  ( sending_half, receiving_half )
}
/// Checks `config` against the module's consistency rules and collects every violation.
///
/// Rules checked, in order: non-empty name, model id and model version; `replicas > 0`;
/// CPU greater than 0 and finite; `memory_mb > 0`; `timeout_ms > 0`; when auto-scaling is
/// present, `min_replicas <= max_replicas` and both targets within `1..=100`; for
/// blue/green, `traffic_split <= 100`; for canary, both percentages `<= 100` and
/// `initial_traffic < final_traffic`.
///
/// # Errors
///
/// Returns the list of all violated rules as human-readable messages, in the order above.
#[ must_use = "validation errors should be handled" ]
pub fn validate_config( config : &DeploymentConfig ) -> Result< (), Vec< String > >
{
  let mut errors : Vec< String > = Vec::new();

  if config.name.is_empty()
  {
    errors.push( "Deployment name cannot be empty".to_string() );
  }
  if config.model_id.is_empty()
  {
    errors.push( "Model ID cannot be empty".to_string() );
  }
  if config.model_version.is_empty()
  {
    errors.push( "Model version cannot be empty".to_string() );
  }
  if config.replicas == 0
  {
    errors.push( "Replicas must be greater than 0".to_string() );
  }

  let cpu = config.resources.cpu;
  if cpu <= 0.0
  {
    errors.push( "CPU requirements must be greater than 0".to_string() );
  }
  else if !cpu.is_finite()
  {
    // NaN fails every comparison and +inf is "greater than 0", so both would slip
    // through the check above without this branch.
    errors.push( "CPU requirements must be a finite number".to_string() );
  }

  if config.resources.memory_mb == 0
  {
    errors.push( "Memory requirements must be greater than 0".to_string() );
  }
  if config.timeout_ms == 0
  {
    errors.push( "Timeout must be greater than 0".to_string() );
  }

  if let Some( auto_scaling ) = &config.auto_scaling
  {
    if auto_scaling.min_replicas > auto_scaling.max_replicas
    {
      errors.push( "Min replicas cannot be greater than max replicas".to_string() );
    }
    if !( 1..=100 ).contains( &auto_scaling.target_cpu_percent )
    {
      errors.push( "Target CPU percent must be between 1 and 100".to_string() );
    }
    if !( 1..=100 ).contains( &auto_scaling.target_memory_percent )
    {
      errors.push( "Target memory percent must be between 1 and 100".to_string() );
    }
  }

  match &config.strategy
  {
    DeploymentStrategy::BlueGreen { traffic_split } =>
    {
      if *traffic_split > 100
      {
        errors.push( "Traffic split cannot be greater than 100%".to_string() );
      }
    }
    DeploymentStrategy::Canary { initial_traffic, final_traffic, .. } =>
    {
      if *initial_traffic > 100 || *final_traffic > 100
      {
        errors.push( "Traffic percentages cannot be greater than 100%".to_string() );
      }
      if initial_traffic >= final_traffic
      {
        errors.push( "Initial traffic must be less than final traffic for canary deployment".to_string() );
      }
    }
    // These strategies carry no parameters that are validated here.
    DeploymentStrategy::Rolling { .. } | DeploymentStrategy::Recreate => {}
  }

  if errors.is_empty()
  {
    Ok( () )
  }
  else
  {
    Err( errors )
  }
}
/// Returns the aggregate resources for the whole deployment: each per-replica amount in
/// `config.resources` multiplied by `config.replicas`.
///
/// Integer amounts saturate at their type's maximum instead of overflowing; a missing GPU
/// requirement stays `None`.
#[ must_use ]
pub fn calculate_total_resources( config : &DeploymentConfig ) -> ResourceRequirements
{
  let replicas = config.replicas;
  let per_replica = &config.resources;
  ResourceRequirements
  {
    cpu : per_replica.cpu * f64::from( replicas ),
    memory_mb : per_replica.memory_mb.saturating_mul( u64::from( replicas ) ),
    gpu : per_replica.gpu.map( | gpu | gpu.saturating_mul( replicas ) ),
    storage_gb : per_replica.storage_gb.saturating_mul( u64::from( replicas ) ),
  }
}
/// Heuristic estimate of how long rolling out `config` takes.
///
/// The estimate is 60 seconds per replica scaled by a strategy factor: 0.5 for recreate,
/// 1.5 for rolling, 2.0 for blue/green, and 2.0 plus the evaluation window in minutes for
/// canary. The result is truncated to whole seconds.
#[ must_use ]
pub fn estimate_deployment_time( config : &DeploymentConfig ) -> Duration
{
  let per_replica = Duration::from_secs( 60 );
  let factor : f64 = match &config.strategy
  {
    DeploymentStrategy::Recreate => 0.5,
    DeploymentStrategy::Rolling { .. } => 1.5,
    DeploymentStrategy::BlueGreen { .. } => 2.0,
    DeploymentStrategy::Canary { evaluation_duration_ms, .. } =>
    {
      // Evaluation window converted from milliseconds to minutes.
      let evaluation_minutes = ( *evaluation_duration_ms as f64 ) / 60000.0;
      2.0 + evaluation_minutes
    }
  };

  let estimate = ( per_replica.as_secs() as f64 ) * f64::from( config.replicas ) * factor;
  let ceiling = u64::MAX as f64;
  let bounded = estimate.max( 0.0 ).min( ceiling );

  // Anything that is not a representable, non-negative number falls back to zero.
  if !bounded.is_finite() || bounded < 0.0
  {
    return Duration::from_secs( 0 );
  }

  #[ allow( clippy::cast_possible_truncation, clippy::cast_sign_loss ) ]
  let whole_seconds = bounded as u64;
  Duration::from_secs( whole_seconds )
}
#[ must_use ]
pub fn create_rollback_plan( current_config : &DeploymentConfig, target_version : String ) -> DeploymentConfig
{
let mut rollback_config = current_config.clone();
rollback_config.model_version = target_version;
rollback_config.strategy = DeploymentStrategy::Rolling
{
max_surge : 1,
max_unavailable : 0,
};
rollback_config
}
/// Decides whether the replica count should change, one replica at a time.
///
/// * If CPU **or** memory utilization exceeds its target, returns
///   `Some( min( current_replicas + 1, max_replicas ) )`.
/// * Otherwise, if CPU **and** memory utilization are both below half of their targets,
///   returns `Some( max( current_replicas - 1, min_replicas ) )`.
/// * Otherwise returns `None`.
///
/// The returned count may equal `current_replicas` when the deployment already sits on
/// the relevant bound. The increment and decrement saturate, so extreme replica counts
/// never overflow.
#[ must_use ]
pub fn should_scale(
  current_replicas : u32,
  auto_scaling : &AutoScalingConfig,
  cpu_utilization : u8,
  memory_utilization : u8
) -> Option< u32 >
{
  let over_target = cpu_utilization > auto_scaling.target_cpu_percent
    || memory_utilization > auto_scaling.target_memory_percent;
  if over_target
  {
    // `saturating_add` avoids the overflow `current_replicas + 1` hits at `u32::MAX`.
    return Some( current_replicas.saturating_add( 1 ).min( auto_scaling.max_replicas ) );
  }

  let well_below_target = cpu_utilization < auto_scaling.target_cpu_percent / 2
    && memory_utilization < auto_scaling.target_memory_percent / 2;
  well_below_target.then( || current_replicas.saturating_sub( 1 ).max( auto_scaling.min_replicas ) )
}
}
/// Sending half of the notification channel created by
/// `ModelDeploymentUtils::create_event_notifier`. Cloning it clones the wrapped sender.
#[ derive( Debug, Clone ) ]
pub struct DeploymentEventSender
{
// Unbounded channel sender carrying `DeploymentNotification` messages.
sender : mpsc::UnboundedSender< DeploymentNotification >,
}
impl DeploymentEventSender
{
pub fn send_event( &self, event : DeploymentNotification ) -> Result< (), &'static str >
{
self.sender.send( event ).map_err( | _ | "Failed to send deployment event" )
}
pub fn send_deployment_started( &self, name : String, config : DeploymentConfig ) -> Result< (), &'static str >
{
self.send_event( DeploymentNotification::DeploymentStarted { name, config } )
}
pub fn send_deployment_completed( &self, name : String ) -> Result< (), &'static str >
{
self.send_event( DeploymentNotification::DeploymentCompleted { name } )
}
pub fn send_deployment_failed( &self, name : String, error : String ) -> Result< (), &'static str >
{
self.send_event( DeploymentNotification::DeploymentFailed { name, error } )
}
pub fn send_scaling_triggered( &self, name : String, from : u32, to : u32 ) -> Result< (), &'static str >
{
self.send_event( DeploymentNotification::ScalingTriggered { name, from, to } )
}
}
/// Receiving half of the notification channel created by
/// `ModelDeploymentUtils::create_event_notifier`.
#[ derive( Debug ) ]
pub struct DeploymentEventReceiver
{
// Unbounded channel receiver yielding `DeploymentNotification` messages.
receiver : mpsc::UnboundedReceiver< DeploymentNotification >,
}
impl DeploymentEventReceiver
{
  /// Returns the next notification without waiting, or `None` when the underlying
  /// `try_recv` reports any error. The error cause is discarded, so callers cannot tell
  /// the different failure reasons apart.
  pub fn try_recv( &mut self ) -> Option< DeploymentNotification >
  {
    match self.receiver.try_recv()
    {
      Ok( notification ) => Some( notification ),
      Err( _ ) => None,
    }
  }

  /// Awaits the next notification, forwarding whatever the underlying receiver's
  /// `recv` yields.
  pub async fn recv( &mut self ) -> Option< DeploymentNotification >
  {
    let pending = self.receiver.recv();
    pending.await
  }
}
/// Message carried over the `DeploymentEventSender` / `DeploymentEventReceiver` channel.
///
/// Every variant except `RollbackTriggered` has a dedicated `send_*` helper on
/// `DeploymentEventSender`; `RollbackTriggered` can be sent through `send_event`.
#[ derive( Debug, Clone ) ]
pub enum DeploymentNotification
{
/// Sent by `DeploymentEventSender::send_deployment_started`.
DeploymentStarted
{
/// Deployment name.
name : String,
/// Configuration of the deployment being started.
config : DeploymentConfig,
},
/// Sent by `DeploymentEventSender::send_deployment_completed`.
DeploymentCompleted
{
/// Deployment name.
name : String,
},
/// Sent by `DeploymentEventSender::send_deployment_failed`.
DeploymentFailed
{
/// Deployment name.
name : String,
/// Error description supplied by the sender.
error : String,
},
/// Sent by `DeploymentEventSender::send_scaling_triggered`.
ScalingTriggered
{
/// Deployment name.
name : String,
/// Replica count before scaling.
from : u32,
/// Replica count after scaling.
to : u32,
},
/// A rollback was triggered; no dedicated sender helper exists in this file.
RollbackTriggered
{
/// Deployment name.
name : String,
/// Version being rolled back to.
target_version : String,
},
}
}
// Publishes the public items of `private` through the crate's `mod_interface!` macro.
// NOTE(review): the macro is defined outside this file; `exposed use` presumably selects
// the namespace layer each item is re-exported into — confirm against the macro's docs.
crate ::mod_interface!
{
exposed use private::DeploymentStatus;
exposed use private::DeploymentStrategy;
exposed use private::DeploymentConfig;
exposed use private::ResourceRequirements;
exposed use private::AutoScalingConfig;
exposed use private::HealthCheckConfig;
exposed use private::ModelDeployment;
exposed use private::DeploymentEvent;
exposed use private::DeploymentEventType;
exposed use private::DeploymentManager;
exposed use private::DeploymentManagerConfig;
exposed use private::DeploymentStats;
exposed use private::ModelDeploymentUtils;
exposed use private::DeploymentEventSender;
exposed use private::DeploymentEventReceiver;
exposed use private::DeploymentNotification;
}