use super::validation::MigrationValidator;
use crate::migration::functions::simple_checksum;
use crate::{Agent, CellError};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
/// Classification of a migration failure, used to pick a recovery path.
///
/// How each variant is treated is defined by `is_retryable` and
/// `should_change_target` on this type.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum MigrationErrorType {
/// Retryable; no target change is requested.
NetworkTransient,
/// Not retryable; no target change is requested.
NetworkPermanent,
/// Retryable; a different target is requested.
InsufficientResources,
/// Not retryable per `is_retryable`, yet `should_change_target` returns true.
/// NOTE(review): confirm this asymmetry is intended.
IncompatibleTarget,
/// Not retryable; no target change is requested.
StateCorruption,
/// Retryable; no target change is requested.
Timeout,
/// Retryable; a different target is requested.
TargetUnreachable,
/// Not retryable; no target change is requested.
Unknown,
}
impl MigrationErrorType {
    /// Reports whether a failure of this kind may be attempted again.
    ///
    /// The match is exhaustive on purpose: adding a variant forces a decision here.
    pub fn is_retryable(&self) -> bool {
        match self {
            Self::NetworkTransient
            | Self::InsufficientResources
            | Self::Timeout
            | Self::TargetUnreachable => true,
            Self::NetworkPermanent
            | Self::IncompatibleTarget
            | Self::StateCorruption
            | Self::Unknown => false,
        }
    }

    /// Reports whether a failure of this kind calls for a different target node.
    pub fn should_change_target(&self) -> bool {
        match self {
            Self::InsufficientResources | Self::IncompatibleTarget | Self::TargetUnreachable => {
                true
            }
            Self::NetworkTransient
            | Self::NetworkPermanent
            | Self::StateCorruption
            | Self::Timeout
            | Self::Unknown => false,
        }
    }
}
/// Record of a single failed migration for one agent.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MigrationError {
/// Failure classification.
pub error_type: MigrationErrorType,
/// Free-form description supplied by the caller.
pub message: String,
/// 16-byte identifier of the agent whose migration failed.
pub agent_id: [u8; 16],
/// 16-byte identifier of the target node, when one is known.
pub target_node: Option<[u8; 16]>,
/// Seconds since the Unix epoch; `MigrationError::new` sets it from the
/// system clock (0 if the clock is before the epoch).
pub timestamp: u64,
/// Number of retries so far; starts at 0 in `MigrationError::new` and is
/// bumped by `increment_retry`.
pub retry_count: u32,
}
impl MigrationError {
    /// Builds an error record stamped with the current wall-clock time.
    ///
    /// `timestamp` is set to seconds since the Unix epoch (or `0` when the
    /// system clock reports a time before the epoch) and `retry_count`
    /// starts at `0`.
    pub fn new(
        error_type: MigrationErrorType,
        message: String,
        agent_id: [u8; 16],
        target_node: Option<[u8; 16]>,
    ) -> Self {
        Self {
            error_type,
            message,
            agent_id,
            target_node,
            timestamp: Self::unix_now_secs(),
            retry_count: 0,
        }
    }

    /// Records one more retry.
    ///
    /// Saturates at `u32::MAX`: `retry_count` is a public, deserializable
    /// field, so an unchecked `+= 1` could panic (debug) or wrap to 0 (release).
    pub fn increment_retry(&mut self) {
        self.retry_count = self.retry_count.saturating_add(1);
    }

    /// Seconds elapsed since `timestamp`, clamped to `0` when `timestamp`
    /// lies in the future relative to the system clock.
    pub fn age_secs(&self) -> u64 {
        Self::unix_now_secs().saturating_sub(self.timestamp)
    }

    /// Current time as seconds since the Unix epoch; `0` if the clock is
    /// set before the epoch.
    fn unix_now_secs() -> u64 {
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map_or(0, |elapsed| elapsed.as_secs())
    }
}
/// Action chosen by `MigrationRecoveryManager::determine_strategy` for a
/// failed migration.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum RecoveryStrategy {
/// Chosen for `NetworkTransient` and `Timeout` errors while retries remain.
RetryOnSameTarget,
/// Chosen for `InsufficientResources`, `IncompatibleTarget` and
/// `TargetUnreachable` errors while retries remain.
RetryOnDifferentTarget,
/// Chosen when retries are exhausted or the error type is not retried,
/// and `RecoveryConfig::auto_rollback` is true.
Rollback,
/// Chosen in the same situations as `Rollback` when
/// `RecoveryConfig::auto_rollback` is false.
GiveUp,
}
/// One entry in an agent's recovery history, as appended by
/// `MigrationRecoveryManager::execute_recovery`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RecoveryAttempt {
/// 1-based attempt index (the pending error's `retry_count` plus one at
/// the time the attempt is recorded).
pub attempt_number: u32,
/// Strategy selected for this attempt.
pub strategy: RecoveryStrategy,
/// Seconds since the Unix epoch when the attempt was recorded.
pub timestamp: u64,
/// Target node copied from the pending error, if any.
pub target_node: Option<[u8; 16]>,
/// Starts as `false`; set by `mark_recovery_success` / `mark_recovery_failure`.
pub success: bool,
/// Failure message stored by `mark_recovery_failure`; `None` otherwise.
pub error: Option<String>,
}
/// Tunables for migration recovery; see `validate` for the accepted ranges.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RecoveryConfig {
/// Once an error's `retry_count` reaches this value, `determine_strategy`
/// stops retrying.
pub max_retries: u32,
/// Delay for the first retry, in seconds; must be > 0.
pub initial_retry_delay_secs: u32,
/// Upper bound applied to the exponential delay, in seconds; must be
/// >= `initial_retry_delay_secs`.
pub max_retry_delay_secs: u32,
/// When false, every retry uses `initial_retry_delay_secs`.
pub use_exponential_backoff: bool,
/// Must be > 0. Not read elsewhere in the visible code — presumably an
/// overall recovery deadline in seconds; TODO confirm against callers.
pub recovery_timeout_secs: u32,
/// Selects `Rollback` (true) or `GiveUp` (false) when retrying is not an option.
pub auto_rollback: bool,
}
impl Default for RecoveryConfig {
fn default() -> Self {
Self {
max_retries: 3,
initial_retry_delay_secs: 10,
max_retry_delay_secs: 300,
use_exponential_backoff: true,
recovery_timeout_secs: 600,
auto_rollback: true,
}
}
}
impl RecoveryConfig {
    /// Returns the delay, in seconds, to wait before retry number `attempt`.
    ///
    /// Without exponential backoff this is always `initial_retry_delay_secs`.
    /// With it, the delay is `initial_retry_delay_secs * 2^(attempt - 1)`
    /// (attempts `0` and `1` both use the initial delay), capped at
    /// `max_retry_delay_secs`.
    ///
    /// The arithmetic saturates instead of overflowing, so arbitrarily large
    /// `attempt` values yield the cap rather than a panic or a wrapped value.
    pub fn calculate_retry_delay(&self, attempt: u32) -> u32 {
        if !self.use_exponential_backoff {
            return self.initial_retry_delay_secs;
        }
        let exponent = attempt.saturating_sub(1);
        // 2^exponent does not fit in u32 once exponent >= 32.
        let factor = 2_u32.checked_pow(exponent).unwrap_or(u32::MAX);
        self.initial_retry_delay_secs
            .saturating_mul(factor)
            .min(self.max_retry_delay_secs)
    }

    /// Checks that the configuration is internally consistent.
    ///
    /// # Errors
    /// Returns a message naming the offending field when
    /// `initial_retry_delay_secs` is zero, `max_retry_delay_secs` is below
    /// `initial_retry_delay_secs`, or `recovery_timeout_secs` is zero.
    pub fn validate(&self) -> Result<(), String> {
        if self.initial_retry_delay_secs == 0 {
            return Err("initial_retry_delay_secs must be > 0".to_string());
        }
        if self.max_retry_delay_secs < self.initial_retry_delay_secs {
            return Err("max_retry_delay_secs must be >= initial_retry_delay_secs".to_string());
        }
        if self.recovery_timeout_secs == 0 {
            return Err("recovery_timeout_secs must be > 0".to_string());
        }
        Ok(())
    }
}
/// Snapshot taken by `RollbackInfo::capture` so an agent can be restored
/// after a failed migration.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RollbackInfo {
/// 16-byte identifier of the captured agent.
pub agent_id: [u8; 16],
/// Copy of the state bytes passed to `capture`.
pub original_state: Vec<u8>,
/// `simple_checksum` of `original_state` at capture time.
pub original_checksum: u32,
/// The agent's policy, encoded with `oxicode::encode_to_vec`.
pub original_policy_data: Vec<u8>,
/// Seconds since the Unix epoch when the snapshot was taken.
pub created_at: u64,
/// `capture` leaves this as `None`; presumably filled in by the caller
/// with the originating node — TODO confirm.
pub source_node: Option<[u8; 16]>,
}
impl RollbackInfo {
pub fn capture(agent: &Agent, state: &[u8]) -> Result<Self, CellError> {
let timestamp = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map_err(|_| CellError::InvalidState("Time error".to_string()))?
.as_secs();
let policy_data = oxicode::encode_to_vec(&oxicode::serde::Compat(agent.policy()))
.map_err(|e| CellError::InvalidState(format!("Policy serialization failed: {}", e)))?;
Ok(Self {
agent_id: *agent.id().as_bytes(),
original_state: state.to_vec(),
original_checksum: simple_checksum(state),
original_policy_data: policy_data,
created_at: timestamp,
source_node: None,
})
}
pub fn age_secs(&self) -> u64 {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0);
now.saturating_sub(self.created_at)
}
pub fn is_valid(&self, max_age_secs: u64) -> bool {
self.age_secs() < max_age_secs
}
}
/// Tracks failed migrations awaiting recovery and the attempts made for each agent.
#[derive(Debug)]
pub struct MigrationRecoveryManager {
// Retry/rollback policy; validated by `new` and `set_config`.
config: RecoveryConfig,
// Latest recorded failure per agent id; removed on success or `clear_history`.
pending_recoveries: HashMap<[u8; 16], MigrationError>,
// Recovery attempts per agent id, in the order they were recorded.
recovery_history: HashMap<[u8; 16], Vec<RecoveryAttempt>>,
// Validator exposed through `validator` / `validator_mut`.
validator: MigrationValidator,
}
impl MigrationRecoveryManager {
    /// Creates a manager that uses `config`.
    ///
    /// # Errors
    /// Returns the message from `RecoveryConfig::validate` when `config` is invalid.
    pub fn new(config: RecoveryConfig) -> Result<Self, String> {
        config.validate()?;
        Ok(Self::with_config_unchecked(config))
    }

    /// Creates a manager that uses `RecoveryConfig::default()` (not re-validated).
    pub fn with_defaults() -> Self {
        Self::with_config_unchecked(RecoveryConfig::default())
    }

    /// Registers `error` as the pending failure for its agent, replacing any
    /// earlier pending failure, and makes sure a history entry exists.
    pub fn record_failure(&mut self, error: MigrationError) {
        // The id is `Copy`, so read it first and move the error into the map
        // instead of cloning the whole record.
        let agent_id = error.agent_id;
        self.pending_recoveries.insert(agent_id, error);
        self.recovery_history.entry(agent_id).or_default();
    }

    /// Picks the recovery action for `error` under the current configuration.
    ///
    /// Once `error.retry_count` has reached `max_retries`, or for error types
    /// that are never retried (`StateCorruption`, `NetworkPermanent`,
    /// `Unknown`), the result is `Rollback` when `auto_rollback` is set and
    /// `GiveUp` otherwise.
    pub fn determine_strategy(&self, error: &MigrationError) -> RecoveryStrategy {
        let no_retry = if self.config.auto_rollback {
            RecoveryStrategy::Rollback
        } else {
            RecoveryStrategy::GiveUp
        };
        if error.retry_count >= self.config.max_retries {
            return no_retry;
        }
        match error.error_type {
            MigrationErrorType::NetworkTransient | MigrationErrorType::Timeout => {
                RecoveryStrategy::RetryOnSameTarget
            }
            MigrationErrorType::InsufficientResources
            | MigrationErrorType::IncompatibleTarget
            | MigrationErrorType::TargetUnreachable => RecoveryStrategy::RetryOnDifferentTarget,
            MigrationErrorType::StateCorruption
            | MigrationErrorType::NetworkPermanent
            | MigrationErrorType::Unknown => no_retry,
        }
    }

    /// Starts a recovery attempt for `agent_id`.
    ///
    /// Appends a not-yet-successful `RecoveryAttempt` to the agent's history,
    /// bumps the pending error's retry count, and returns the chosen strategy.
    ///
    /// # Errors
    /// Returns `"No pending recovery for agent"` when no failure is pending
    /// for `agent_id`.
    pub fn execute_recovery(&mut self, agent_id: &[u8; 16]) -> Result<RecoveryStrategy, String> {
        let pending = self
            .pending_recoveries
            .get(agent_id)
            .ok_or_else(|| "No pending recovery for agent".to_string())?;
        let strategy = self.determine_strategy(pending);
        let attempt = RecoveryAttempt {
            // Saturate: `retry_count` is a public field and may already be `u32::MAX`.
            attempt_number: pending.retry_count.saturating_add(1),
            strategy,
            timestamp: Self::unix_now_secs(),
            target_node: pending.target_node,
            success: false,
            error: None,
        };
        self.recovery_history
            .entry(*agent_id)
            .or_default()
            .push(attempt);
        if let Some(pending) = self.pending_recoveries.get_mut(agent_id) {
            pending.increment_retry();
        }
        Ok(strategy)
    }

    /// Drops the pending failure for `agent_id` and flags its most recent
    /// history entry (if any) as successful.
    pub fn mark_recovery_success(&mut self, agent_id: &[u8; 16]) {
        self.pending_recoveries.remove(agent_id);
        if let Some(last_attempt) = self.last_attempt_mut(agent_id) {
            last_attempt.success = true;
        }
    }

    /// Flags the most recent history entry for `agent_id` (if any) as failed
    /// and stores `error_msg` on it. The pending failure is kept.
    pub fn mark_recovery_failure(&mut self, agent_id: &[u8; 16], error_msg: String) {
        if let Some(last_attempt) = self.last_attempt_mut(agent_id) {
            last_attempt.success = false;
            last_attempt.error = Some(error_msg);
        }
    }

    /// Number of agents with a pending failure.
    pub fn pending_count(&self) -> usize {
        self.pending_recoveries.len()
    }

    /// Recovery attempts recorded for `agent_id`, oldest first, or `None`
    /// when the agent has no history entry.
    pub fn get_history(&self, agent_id: &[u8; 16]) -> Option<&Vec<RecoveryAttempt>> {
        self.recovery_history.get(agent_id)
    }

    /// All pending failures, in unspecified (hash map) order.
    pub fn get_pending(&self) -> Vec<&MigrationError> {
        self.pending_recoveries.values().collect()
    }

    /// Earliest time (seconds since the Unix epoch) at which `agent_id` may
    /// be retried: the pending error's timestamp plus the configured delay
    /// for its current retry count. `None` when nothing is pending.
    ///
    /// The sum saturates at `u64::MAX` rather than overflowing on extreme
    /// (e.g. deserialized) timestamps.
    pub fn next_retry_time(&self, agent_id: &[u8; 16]) -> Option<u64> {
        let pending = self.pending_recoveries.get(agent_id)?;
        let delay = self.config.calculate_retry_delay(pending.retry_count);
        Some(pending.timestamp.saturating_add(u64::from(delay)))
    }

    /// True when a failure is pending for `agent_id` and its retry time has arrived.
    pub fn is_ready_for_retry(&self, agent_id: &[u8; 16]) -> bool {
        match self.next_retry_time(agent_id) {
            Some(next_time) => Self::unix_now_secs() >= next_time,
            None => false,
        }
    }

    /// Ids of all agents whose pending failure is ready to be retried.
    pub fn get_ready_for_retry(&self) -> Vec<[u8; 16]> {
        self.pending_recoveries
            .keys()
            .filter(|&id| self.is_ready_for_retry(id))
            .copied()
            .collect()
    }

    /// Forgets both the history and any pending failure for `agent_id`.
    pub fn clear_history(&mut self, agent_id: &[u8; 16]) {
        self.recovery_history.remove(agent_id);
        self.pending_recoveries.remove(agent_id);
    }

    /// Current recovery configuration.
    pub fn config(&self) -> &RecoveryConfig {
        &self.config
    }

    /// Replaces the configuration after validating it.
    ///
    /// # Errors
    /// Returns the validation message and keeps the old configuration when
    /// `config` is invalid.
    pub fn set_config(&mut self, config: RecoveryConfig) -> Result<(), String> {
        config.validate()?;
        self.config = config;
        Ok(())
    }

    /// Shared access to the migration validator.
    pub fn validator(&self) -> &MigrationValidator {
        &self.validator
    }

    /// Exclusive access to the migration validator.
    pub fn validator_mut(&mut self) -> &mut MigrationValidator {
        &mut self.validator
    }

    /// Builds an empty manager around `config` without validating it.
    fn with_config_unchecked(config: RecoveryConfig) -> Self {
        Self {
            config,
            pending_recoveries: HashMap::new(),
            recovery_history: HashMap::new(),
            validator: MigrationValidator::new(),
        }
    }

    /// Most recent recorded attempt for `agent_id`, if any.
    fn last_attempt_mut(&mut self, agent_id: &[u8; 16]) -> Option<&mut RecoveryAttempt> {
        self.recovery_history.get_mut(agent_id)?.last_mut()
    }

    /// Current time as seconds since the Unix epoch; `0` if the clock is
    /// set before the epoch.
    fn unix_now_secs() -> u64 {
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map_or(0, |elapsed| elapsed.as_secs())
    }
}