use crate::core::error::{Error, Result};
use crate::lock_safe;
use crate::ml::serving::{
BatchPredictionRequest, BatchPredictionResponse, DeploymentConfig, HealthCheckConfig,
HealthStatus, ModelInfo, ModelMetadata, ModelServing, MonitoringConfig, PredictionRequest,
PredictionResponse, ResourceConfig, ScalingConfig,
};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
/// Lifecycle state of a model deployment.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum DeploymentStatus {
    /// Deployment created; initial health check still in progress.
    Starting,
    /// Serving traffic normally.
    Running,
    /// Last health check reported unhealthy; prediction requests are
    /// rejected until a healthy check restores `Running`.
    Degraded,
    /// Shutdown requested.
    Stopping,
    /// All instances stopped; no traffic is served.
    Stopped,
    /// Irrecoverable failure. NOTE(review): never set within this module —
    /// confirm it is assigned elsewhere before relying on it.
    Failed,
}
/// Point-in-time operational metrics snapshot for a single deployment.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeploymentMetrics {
    /// Current lifecycle state.
    pub status: DeploymentStatus,
    /// Number of logical serving instances currently active.
    pub active_instances: usize,
    /// Estimated CPU utilization in [0.0, 1.0]. Derived from the request
    /// rate, not measured from the host.
    pub cpu_utilization: f64,
    /// Estimated memory utilization in [0.0, 1.0]. Derived from the total
    /// request count, not measured from the host.
    pub memory_utilization: f64,
    /// Requests per second, averaged over the last 60 seconds.
    pub request_rate: f64,
    /// Mean response time of recent successful requests, in milliseconds.
    pub avg_response_time_ms: f64,
    /// Fraction of all recorded requests that failed, in [0.0, 1.0].
    pub error_rate: f64,
    /// Total requests observed since the deployment started.
    pub total_requests: u64,
    /// Requests that completed successfully.
    pub successful_requests: u64,
    /// Requests that returned an error.
    pub failed_requests: u64,
    /// When the most recent health check ran.
    pub last_health_check: chrono::DateTime<chrono::Utc>,
    /// When the deployment was created.
    pub started_at: chrono::DateTime<chrono::Utc>,
    /// When any metric field was last modified.
    pub updated_at: chrono::DateTime<chrono::Utc>,
}
/// A serving model wrapped with deployment lifecycle management: metrics,
/// raw request statistics, health tracking, and scaling bookkeeping.
///
/// All mutable state lives behind `Arc<Mutex<_>>`, so every public method
/// works through `&self` and the deployment can be shared across threads.
pub struct DeployedModel {
    /// The underlying model that actually serves predictions.
    model: Box<dyn ModelServing>,
    /// Immutable deployment configuration supplied at creation time.
    config: DeploymentConfig,
    /// Derived metrics snapshot, refreshed after each request.
    metrics: Arc<Mutex<DeploymentMetrics>>,
    /// Raw per-request statistics from which the metrics are derived.
    stats: Arc<Mutex<RequestStats>>,
    /// Result of the most recent health check.
    health_status: Arc<Mutex<HealthStatus>>,
}
/// Sliding-window request statistics for a deployment.
///
/// Each request is stored as a single sample (arrival time plus an optional
/// response time), so arrival and response data can never drift out of
/// alignment. The previous parallel-vector layout pushed error arrivals into
/// `request_times` without a matching `response_times` entry, which made the
/// index-based cleanup prune the wrong response times after any error.
#[derive(Debug, Clone)]
struct RequestStats {
    /// One entry per request: arrival time, plus the response time in
    /// milliseconds for successes (`None` for errors).
    samples: Vec<(Instant, Option<u64>)>,
    /// Total failed requests since creation (never pruned).
    error_count: u64,
    /// Total successful requests since creation (never pruned).
    success_count: u64,
    /// Last time stale samples were pruned.
    last_cleanup: Instant,
}
impl RequestStats {
    /// How long samples are retained before being pruned.
    const RETENTION: Duration = Duration::from_secs(300);
    /// Window used by the request-rate calculation.
    const RATE_WINDOW: Duration = Duration::from_secs(60);
    /// Minimum interval between opportunistic pruning passes.
    const CLEANUP_INTERVAL: Duration = Duration::from_secs(60);

    /// Creates an empty statistics accumulator.
    fn new() -> Self {
        Self {
            samples: Vec::new(),
            error_count: 0,
            success_count: 0,
            last_cleanup: Instant::now(),
        }
    }

    /// Records a successful request and its response time in milliseconds.
    fn record_success(&mut self, response_time_ms: u64) {
        self.success_count += 1;
        self.push_sample(Some(response_time_ms));
    }

    /// Records a failed request.
    fn record_error(&mut self) {
        self.error_count += 1;
        self.push_sample(None);
    }

    /// Appends one sample and opportunistically prunes stale entries at most
    /// once per `CLEANUP_INTERVAL`.
    fn push_sample(&mut self, response_time_ms: Option<u64>) {
        let now = Instant::now();
        self.samples.push((now, response_time_ms));
        if now.duration_since(self.last_cleanup) > Self::CLEANUP_INTERVAL {
            self.cleanup_old_entries();
            self.last_cleanup = now;
        }
    }

    /// Drops samples older than the retention window.
    fn cleanup_old_entries(&mut self) {
        // `checked_sub` avoids the panic `Instant - Duration` raises when the
        // monotonic clock is younger than the retention window (e.g. shortly
        // after boot on some platforms). If no cutoff exists, nothing can be
        // stale yet.
        if let Some(cutoff) = Instant::now().checked_sub(Self::RETENTION) {
            self.samples.retain(|&(arrived, _)| arrived > cutoff);
        }
    }

    /// Requests per second over the last `RATE_WINDOW`.
    fn calculate_request_rate(&self) -> f64 {
        let recent = match Instant::now().checked_sub(Self::RATE_WINDOW) {
            Some(cutoff) => self
                .samples
                .iter()
                .filter(|&&(arrived, _)| arrived > cutoff)
                .count(),
            // The clock is younger than the window: every sample is recent.
            None => self.samples.len(),
        };
        recent as f64 / Self::RATE_WINDOW.as_secs_f64()
    }

    /// Mean response time in milliseconds over retained successful samples,
    /// or 0.0 when there are none. Errors carry no response time and are
    /// excluded from the average.
    fn calculate_avg_response_time(&self) -> f64 {
        let mut total_ms = 0u64;
        let mut successes = 0u64;
        for &(_, response_time) in &self.samples {
            if let Some(ms) = response_time {
                total_ms += ms;
                successes += 1;
            }
        }
        if successes == 0 {
            0.0
        } else {
            total_ms as f64 / successes as f64
        }
    }

    /// Fraction of all recorded requests that failed, or 0.0 when no
    /// requests have been recorded.
    fn calculate_error_rate(&self) -> f64 {
        let total = self.success_count + self.error_count;
        if total == 0 {
            0.0
        } else {
            self.error_count as f64 / total as f64
        }
    }
}
impl DeployedModel {
    /// Wraps a serving model with deployment bookkeeping and runs an initial
    /// health check before (conditionally) marking the deployment running.
    ///
    /// Fix: the status is promoted to `Running` only if the initial health
    /// check did not already mark the deployment `Degraded`; previously the
    /// unconditional promotion masked a failed first check.
    ///
    /// # Errors
    /// Returns an error if one of the shared locks cannot be acquired.
    pub fn new(model: Box<dyn ModelServing>, config: DeploymentConfig) -> Result<Self> {
        let metrics = Arc::new(Mutex::new(DeploymentMetrics {
            status: DeploymentStatus::Starting,
            active_instances: 1,
            cpu_utilization: 0.0,
            memory_utilization: 0.0,
            request_rate: 0.0,
            avg_response_time_ms: 0.0,
            error_rate: 0.0,
            total_requests: 0,
            successful_requests: 0,
            failed_requests: 0,
            last_health_check: chrono::Utc::now(),
            started_at: chrono::Utc::now(),
            updated_at: chrono::Utc::now(),
        }));
        let stats = Arc::new(Mutex::new(RequestStats::new()));
        let health_status = Arc::new(Mutex::new(HealthStatus {
            status: "starting".to_string(),
            details: HashMap::new(),
            timestamp: chrono::Utc::now(),
        }));
        let deployed_model = Self {
            model,
            config,
            metrics,
            stats,
            health_status,
        };
        deployed_model.update_health_status()?;
        {
            let mut metrics = lock_safe!(deployed_model.metrics, "deployment metrics lock")?;
            // A failed initial check left the status Degraded; keep it so a
            // later healthy check can recover it instead of masking it here.
            if metrics.status == DeploymentStatus::Starting {
                metrics.status = DeploymentStatus::Running;
            }
            metrics.updated_at = chrono::Utc::now();
        }
        Ok(deployed_model)
    }

    /// Returns the configuration this model was deployed with.
    pub fn get_config(&self) -> &DeploymentConfig {
        &self.config
    }

    /// Returns a snapshot of the current deployment metrics.
    pub fn get_metrics(&self) -> Result<DeploymentMetrics> {
        Ok(lock_safe!(self.metrics, "deployment metrics lock for get")?.clone())
    }

    /// Recomputes derived metrics from the raw request statistics.
    ///
    /// Locks `stats` before `metrics`; no other code path holds both locks
    /// simultaneously, so this ordering cannot deadlock.
    fn update_metrics(&self) -> Result<()> {
        let stats = lock_safe!(self.stats, "deployment stats lock")?;
        let mut metrics = lock_safe!(self.metrics, "deployment metrics lock for update")?;
        metrics.request_rate = stats.calculate_request_rate();
        metrics.avg_response_time_ms = stats.calculate_avg_response_time();
        metrics.error_rate = stats.calculate_error_rate();
        metrics.total_requests = stats.success_count + stats.error_count;
        metrics.successful_requests = stats.success_count;
        metrics.failed_requests = stats.error_count;
        metrics.updated_at = chrono::Utc::now();
        // Synthetic utilization figures derived from traffic volume; a real
        // deployment would source these from the host/container runtime.
        metrics.cpu_utilization = (metrics.request_rate / 100.0).min(1.0);
        metrics.memory_utilization = (metrics.total_requests as f64 / 10000.0).min(1.0);
        Ok(())
    }

    /// Runs the wrapped model's health check and folds the result into the
    /// shared health status and deployment status.
    ///
    /// An "unhealthy" report marks the deployment `Degraded`; a later
    /// "healthy" report restores `Running`.
    fn update_health_status(&self) -> Result<()> {
        let health_result = self.model.health_check();
        let mut health_status = lock_safe!(self.health_status, "deployment health status lock")?;
        match health_result {
            Ok(status) => {
                *health_status = status;
            }
            Err(e) => {
                // Keep only the current failure reason; stale details from a
                // previous check would be misleading.
                health_status.status = "unhealthy".to_string();
                health_status.details.clear();
                health_status
                    .details
                    .insert("error".to_string(), e.to_string());
                health_status.timestamp = chrono::Utc::now();
            }
        }
        {
            let mut metrics = lock_safe!(self.metrics, "deployment metrics lock")?;
            metrics.last_health_check = chrono::Utc::now();
            if health_status.status == "healthy" {
                if metrics.status == DeploymentStatus::Degraded {
                    metrics.status = DeploymentStatus::Running;
                }
            } else if health_status.status == "unhealthy" {
                metrics.status = DeploymentStatus::Degraded;
            }
        }
        Ok(())
    }

    /// Returns true when either synthetic utilization figure exceeds the
    /// configured scale-up threshold.
    pub fn should_scale_up(&self) -> Result<bool> {
        let metrics = lock_safe!(self.metrics, "deployment metrics lock")?;
        let config = &self.config.scaling;
        Ok(metrics.cpu_utilization > config.scale_up_threshold
            || metrics.memory_utilization > config.scale_up_threshold)
    }

    /// Returns true when utilization is below the scale-down threshold and
    /// more than the minimum number of instances is active.
    pub fn should_scale_down(&self) -> Result<bool> {
        let metrics = lock_safe!(self.metrics, "deployment metrics lock")?;
        let config = &self.config.scaling;
        Ok(metrics.active_instances > config.min_instances
            && metrics.cpu_utilization < config.scale_down_threshold
            && metrics.memory_utilization < config.scale_down_threshold)
    }

    /// Adds one instance, silently capped at the configured maximum.
    pub fn scale_up(&self) -> Result<()> {
        let mut metrics = lock_safe!(self.metrics, "deployment metrics lock")?;
        let config = &self.config.scaling;
        if metrics.active_instances < config.max_instances {
            metrics.active_instances += 1;
            metrics.updated_at = chrono::Utc::now();
            log::info!(
                "Scaled up deployment to {} instances",
                metrics.active_instances
            );
        }
        Ok(())
    }

    /// Removes one instance, never going below the configured minimum.
    pub fn scale_down(&self) -> Result<()> {
        let mut metrics = lock_safe!(self.metrics, "deployment metrics lock")?;
        let config = &self.config.scaling;
        if metrics.active_instances > config.min_instances {
            metrics.active_instances -= 1;
            metrics.updated_at = chrono::Utc::now();
            log::info!(
                "Scaled down deployment to {} instances",
                metrics.active_instances
            );
        }
        Ok(())
    }

    /// Stops the deployment and zeroes the active instance count.
    pub fn stop(&self) -> Result<()> {
        let mut metrics = lock_safe!(self.metrics, "deployment metrics lock")?;
        // Teardown here is synchronous and happens under the lock, so an
        // intermediate Stopping state would never be observable; go straight
        // to Stopped.
        metrics.status = DeploymentStatus::Stopped;
        metrics.active_instances = 0;
        metrics.updated_at = chrono::Utc::now();
        Ok(())
    }

    /// Stops the deployment and brings it back up at the configured minimum
    /// instance count, re-running the health check before serving resumes.
    ///
    /// Fix: a `Degraded` status reported by the post-restart health check is
    /// no longer overwritten with `Running`, and `updated_at` is refreshed
    /// on the final transition.
    pub fn restart(&self) -> Result<()> {
        self.stop()?;
        {
            let mut metrics = lock_safe!(self.metrics, "deployment metrics lock")?;
            metrics.status = DeploymentStatus::Starting;
            metrics.active_instances = self.config.scaling.min_instances;
            metrics.updated_at = chrono::Utc::now();
        }
        self.update_health_status()?;
        let mut metrics = lock_safe!(self.metrics, "deployment metrics lock")?;
        if metrics.status == DeploymentStatus::Starting {
            metrics.status = DeploymentStatus::Running;
        }
        metrics.updated_at = chrono::Utc::now();
        Ok(())
    }
}
impl ModelServing for DeployedModel {
    /// Serves a single prediction, rejecting the request unless the
    /// deployment status is `Running`, and records latency and outcome
    /// statistics for the call.
    fn predict(&self, request: &PredictionRequest) -> Result<PredictionResponse> {
        let start_time = Instant::now();
        {
            // Gate on status inside a short-lived lock scope so the metrics
            // lock is not held across the (potentially slow) model call.
            let metrics = lock_safe!(self.metrics, "deployment metrics lock")?;
            if metrics.status != DeploymentStatus::Running {
                return Err(Error::InvalidOperation(format!(
                    "Deployment is not running (status: {:?})",
                    metrics.status
                )));
            }
        }
        let result = self.model.predict(request);
        let processing_time = start_time.elapsed().as_millis() as u64;
        {
            let mut stats = lock_safe!(self.stats, "deployment stats lock")?;
            match &result {
                Ok(_) => stats.record_success(processing_time),
                Err(_) => stats.record_error(),
            }
        }
        // NOTE(review): if the metrics refresh fails (e.g. poisoned lock),
        // the prediction result — even a successful one — is discarded in
        // favor of the metrics error. Confirm this priority is intended.
        self.update_metrics()?;
        result
    }
    /// Serves a batch prediction with the same status gating as `predict`.
    /// Each per-item outcome is recorded with the batch latency averaged
    /// over the number of input items.
    fn predict_batch(&self, request: &BatchPredictionRequest) -> Result<BatchPredictionResponse> {
        let start_time = Instant::now();
        {
            let metrics = lock_safe!(self.metrics, "deployment metrics lock")?;
            if metrics.status != DeploymentStatus::Running {
                return Err(Error::InvalidOperation(format!(
                    "Deployment is not running (status: {:?})",
                    metrics.status
                )));
            }
        }
        let result = self.model.predict_batch(request);
        let processing_time = start_time.elapsed().as_millis() as u64;
        {
            let mut stats = lock_safe!(self.stats, "deployment stats lock")?;
            match &result {
                Ok(response) => {
                    // Approximate per-item latency; `.max(1)` guards against
                    // division by zero on an empty batch.
                    let avg_time = processing_time / request.data.len().max(1) as u64;
                    for _ in 0..response.summary.successful_predictions {
                        stats.record_success(avg_time);
                    }
                    for _ in 0..response.summary.failed_predictions {
                        stats.record_error();
                    }
                }
                // A wholesale batch failure is counted as a single error.
                Err(_) => stats.record_error(),
            }
        }
        self.update_metrics()?;
        result
    }
    /// Delegates to the wrapped model's metadata.
    fn get_metadata(&self) -> &ModelMetadata {
        self.model.get_metadata()
    }
    /// Re-runs the health check and returns the refreshed status snapshot.
    fn health_check(&self) -> Result<HealthStatus> {
        self.update_health_status()?;
        Ok(lock_safe!(self.health_status, "deployment health status lock")?.clone())
    }
    /// Returns the wrapped model's info augmented with the deployment
    /// configuration and a metrics snapshot, serialized to JSON (`null` on
    /// serialization or lock failure).
    fn info(&self) -> ModelInfo {
        let mut info = self.model.info();
        info.configuration.insert(
            "deployment_config".to_string(),
            serde_json::to_value(&self.config).unwrap_or(serde_json::Value::Null),
        );
        info.configuration.insert(
            "deployment_metrics".to_string(),
            self.get_metrics()
                .ok()
                .and_then(|m| serde_json::to_value(&m).ok())
                .unwrap_or(serde_json::Value::Null),
        );
        info
    }
}
/// Registry that owns all deployed models and their configurations, keyed
/// by deployment name.
pub struct DeploymentManager {
    /// Active deployments by name.
    deployments: HashMap<String, DeployedModel>,
    /// The configuration each deployment was created with, retained for
    /// lookup after deployment.
    configs: HashMap<String, DeploymentConfig>,
}
impl DeploymentManager {
pub fn new() -> Self {
Self {
deployments: HashMap::new(),
configs: HashMap::new(),
}
}
pub fn deploy(
&mut self,
deployment_name: String,
model: Box<dyn ModelServing>,
config: DeploymentConfig,
) -> Result<()> {
if self.deployments.contains_key(&deployment_name) {
return Err(Error::InvalidOperation(format!(
"Deployment '{}' already exists",
deployment_name
)));
}
let deployed_model = DeployedModel::new(model, config.clone())?;
self.deployments
.insert(deployment_name.clone(), deployed_model);
self.configs.insert(deployment_name, config);
Ok(())
}
pub fn undeploy(&mut self, deployment_name: &str) -> Result<()> {
if let Some(deployment) = self.deployments.get(deployment_name) {
deployment.stop()?;
}
self.deployments.remove(deployment_name);
self.configs.remove(deployment_name);
Ok(())
}
pub fn get_deployment(&self, deployment_name: &str) -> Option<&DeployedModel> {
self.deployments.get(deployment_name)
}
pub fn list_deployments(&self) -> Vec<String> {
self.deployments.keys().cloned().collect()
}
pub fn get_deployment_metrics(&self, deployment_name: &str) -> Option<DeploymentMetrics> {
self.deployments
.get(deployment_name)
.and_then(|deployment| deployment.get_metrics().ok())
}
pub fn scale_deployment(&self, deployment_name: &str, target_instances: usize) -> Result<()> {
let deployment = self.deployments.get(deployment_name).ok_or_else(|| {
Error::KeyNotFound(format!("Deployment '{}' not found", deployment_name))
})?;
let current_instances = deployment.get_metrics()?.active_instances;
if target_instances > current_instances {
for _ in current_instances..target_instances {
deployment.scale_up()?;
}
} else if target_instances < current_instances {
for _ in target_instances..current_instances {
deployment.scale_down()?;
}
}
Ok(())
}
pub fn auto_scale_all(&self) -> Result<()> {
for deployment in self.deployments.values() {
if deployment.should_scale_up()? {
deployment.scale_up()?;
} else if deployment.should_scale_down()? {
deployment.scale_down()?;
}
}
Ok(())
}
pub fn health_check_all(&self) -> HashMap<String, HealthStatus> {
let mut results = HashMap::new();
for (name, deployment) in &self.deployments {
match deployment.health_check() {
Ok(status) => {
results.insert(name.clone(), status);
}
Err(e) => {
results.insert(
name.clone(),
HealthStatus {
status: "error".to_string(),
details: {
let mut details = HashMap::new();
details.insert("error".to_string(), e.to_string());
details
},
timestamp: chrono::Utc::now(),
},
);
}
}
}
results
}
}
impl Default for DeploymentManager {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    // Fix: dropped the unused `GenericServingModel` and `ModelMetadata`
    // imports (neither was referenced; `ModelMetadata` is already in scope
    // via the top-level use anyway), and tightened assertions to exact
    // expected values.
    use super::*;

    /// Builds a small but complete deployment configuration for tests.
    fn create_test_config() -> DeploymentConfig {
        DeploymentConfig {
            model_name: "test_model".to_string(),
            model_version: "1.0.0".to_string(),
            environment: "test".to_string(),
            resources: ResourceConfig {
                cpu_cores: 1.0,
                memory_mb: 1024,
                gpu_memory_mb: None,
                max_concurrent_requests: 10,
            },
            scaling: ScalingConfig {
                min_instances: 1,
                max_instances: 5,
                target_cpu_utilization: 0.7,
                target_memory_utilization: 0.8,
                scale_up_threshold: 0.8,
                scale_down_threshold: 0.3,
            },
            health_check: HealthCheckConfig {
                path: "/health".to_string(),
                interval_seconds: 30,
                timeout_seconds: 5,
                failure_threshold: 3,
                success_threshold: 2,
            },
            monitoring: MonitoringConfig {
                enable_metrics: true,
                enable_logging: true,
                enable_tracing: false,
                metrics_interval_seconds: 60,
                log_level: "info".to_string(),
            },
        }
    }

    #[test]
    fn test_deployment_status() {
        let status = DeploymentStatus::Running;
        assert_eq!(status, DeploymentStatus::Running);
        assert_ne!(status, DeploymentStatus::Stopped);
    }

    #[test]
    fn test_request_stats() {
        let mut stats = RequestStats::new();
        stats.record_success(100);
        stats.record_success(150);
        stats.record_error();
        assert_eq!(stats.success_count, 2);
        assert_eq!(stats.error_count, 1);
        // (100 + 150) / 2 successful responses.
        assert!((stats.calculate_avg_response_time() - 125.0).abs() < 1e-9);
        // 1 error out of 3 total requests.
        assert!((stats.calculate_error_rate() - 1.0 / 3.0).abs() < 1e-9);
    }

    #[test]
    fn test_deployment_manager() {
        let manager = DeploymentManager::new();
        assert!(manager.list_deployments().is_empty());
        assert!(manager.get_deployment("missing").is_none());
        assert!(manager.get_deployment_metrics("missing").is_none());
        let config = create_test_config();
        assert_eq!(config.model_name, "test_model");
        assert_eq!(config.scaling.min_instances, 1);
        assert_eq!(config.scaling.max_instances, 5);
    }
}