engine/distributed/upgrades.rs

//! Rolling Upgrades Support for Distributed Dakera
//!
//! Provides coordinated cluster upgrades with:
//! - Version compatibility checking
//! - Graceful node draining
//! - Coordinated upgrade ordering (replicas before leaders)
//! - Health checks and rollback support
//! - State persistence during upgrades
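//!
//! # Example
//!
//! A minimal sketch of driving a single upgrade to completion (node IDs and
//! versions are illustrative; an external orchestrator is assumed to perform
//! the actual binary swap between `complete_drain` and
//! `complete_node_upgrade`):
//!
//! ```rust,ignore
//! let manager = UpgradeManager::new(UpgradeConfig::default());
//! manager.register_node("replica-1", Version::new(1, 0, 0), false);
//! manager.register_node("leader-1", Version::new(1, 0, 0), true);
//!
//! let plan = manager.plan_upgrade(Version::new(1, 1, 0))?;
//! manager.start_upgrade()?;
//!
//! while manager.get_plan().map_or(false, |p| p.status == UpgradeStatus::InProgress) {
//!     for node_id in manager.get_nodes_to_drain() {
//!         manager.start_drain(&node_id)?;
//!         // ... wait for in-flight work to finish, swap the binary ...
//!         manager.complete_drain(&node_id)?;
//!         manager.complete_node_upgrade(&node_id, plan.target_version.clone())?;
//!         while !manager.record_health_check(&node_id, true)? {}
//!     }
//! }
//! ```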

use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use thiserror::Error;
use tracing::{debug, info, warn};

/// Version information for a node
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct Version {
    pub major: u32,
    pub minor: u32,
    pub patch: u32,
    /// Build metadata (e.g., commit hash)
    pub build: Option<String>,
}

impl Version {
    pub fn new(major: u32, minor: u32, patch: u32) -> Self {
        Self {
            major,
            minor,
            patch,
            build: None,
        }
    }

    pub fn with_build(mut self, build: impl Into<String>) -> Self {
        self.build = Some(build.into());
        self
    }

    /// Check if this version is compatible with another version
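    ///
    /// Two versions are compatible iff they share a major version, so e.g.
    /// `1.0.0` is compatible with `1.9.3` but not with `2.0.0`:
    ///
    /// ```rust,ignore
    /// assert!(Version::new(1, 0, 0).is_compatible_with(&Version::new(1, 9, 3)));
    /// assert!(!Version::new(1, 0, 0).is_compatible_with(&Version::new(2, 0, 0)));
    /// ```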
    pub fn is_compatible_with(&self, other: &Version) -> bool {
        // Same major version is required for compatibility
        // Minor version differences are allowed (backward compatible)
        self.major == other.major
    }

    /// Check if this version is newer than another
    pub fn is_newer_than(&self, other: &Version) -> bool {
        if self.major != other.major {
            return self.major > other.major;
        }
        if self.minor != other.minor {
            return self.minor > other.minor;
        }
        self.patch > other.patch
    }

    /// Parse version from string (e.g., "1.2.3")
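    ///
    /// Requires exactly three dot-separated numeric components; a sketch of
    /// the expected behavior:
    ///
    /// ```rust,ignore
    /// assert_eq!(Version::parse("1.2.3"), Some(Version::new(1, 2, 3)));
    /// assert_eq!(Version::parse("1.2"), None);
    /// ```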
    pub fn parse(s: &str) -> Option<Self> {
        let parts: Vec<&str> = s.split('.').collect();
        // Require exactly major.minor.patch; reject trailing components.
        if parts.len() != 3 {
            return None;
        }
        Some(Self {
            major: parts[0].parse().ok()?,
            minor: parts[1].parse().ok()?,
            patch: parts[2].parse().ok()?,
            build: None,
        })
    }
}

impl std::fmt::Display for Version {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}.{}.{}", self.major, self.minor, self.patch)?;
        if let Some(ref build) = self.build {
            write!(f, "+{}", build)?;
        }
        Ok(())
    }
}

/// Configuration for rolling upgrades
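///
/// Individual fields can be overridden on top of the defaults, e.g.:
///
/// ```rust,ignore
/// let config = UpgradeConfig {
///     max_concurrent: 2,
///     ..Default::default()
/// };
/// ```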
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UpgradeConfig {
    /// Maximum time to wait for a node to drain (ms)
    pub drain_timeout_ms: u64,
    /// Health check interval during upgrade (ms)
    pub health_check_interval_ms: u64,
    /// Number of health checks required before marking node ready
    pub required_health_checks: u32,
    /// Whether to upgrade replicas before leaders
    pub replicas_first: bool,
    /// Maximum concurrent upgrades
    pub max_concurrent: u32,
    /// Automatic rollback on failure
    pub auto_rollback: bool,
    /// Minimum healthy nodes required during upgrade
    pub min_healthy_nodes: u32,
}

impl Default for UpgradeConfig {
    fn default() -> Self {
        Self {
            drain_timeout_ms: 60000,        // 1 minute drain timeout
            health_check_interval_ms: 5000, // 5 second health checks
            required_health_checks: 3,      // 3 successful checks
            replicas_first: true,           // Upgrade replicas before leaders
            max_concurrent: 1,              // One node at a time by default
            auto_rollback: true,            // Rollback on failure
            min_healthy_nodes: 1,           // At least 1 healthy node
        }
    }
}

/// State of a node during upgrade
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum NodeUpgradeState {
    /// Node is running normally
    Normal,
    /// Node is scheduled for upgrade
    Scheduled,
    /// Node is draining (no new requests)
    Draining,
    /// Node is being upgraded
    Upgrading,
    /// Node is recovering after upgrade
    Recovering,
    /// Node upgrade completed successfully
    Completed,
    /// Node upgrade failed
    Failed,
    /// Node is rolling back
    RollingBack,
}

/// Information about a node's upgrade status
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeUpgradeInfo {
    pub node_id: String,
    pub state: NodeUpgradeState,
    pub current_version: Version,
    pub target_version: Option<Version>,
    pub started_at: Option<u64>,
    pub completed_at: Option<u64>,
    pub error: Option<String>,
    pub health_checks_passed: u32,
    pub is_leader: bool,
}

/// Overall upgrade status
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum UpgradeStatus {
    /// No upgrade in progress
    Idle,
    /// Upgrade is being planned
    Planning,
    /// Upgrade is in progress
    InProgress,
    /// Upgrade is paused
    Paused,
    /// Upgrade completed successfully
    Completed,
    /// Upgrade failed
    Failed,
    /// Rollback in progress
    RollingBack,
}

/// Upgrade plan for the cluster
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UpgradePlan {
    /// Unique ID for this upgrade
    pub upgrade_id: String,
    /// Target version for the upgrade
    pub target_version: Version,
    /// Ordered list of nodes to upgrade
    pub node_order: Vec<String>,
    /// Current position in the upgrade order
    pub current_index: usize,
    /// Overall status
    pub status: UpgradeStatus,
    /// When the upgrade was initiated
    pub created_at: u64,
    /// When the upgrade started execution
    pub started_at: Option<u64>,
    /// When the upgrade completed
    pub completed_at: Option<u64>,
}

/// Statistics about the upgrade process
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct UpgradeStats {
    pub total_nodes: u32,
    pub upgraded_nodes: u32,
    pub failed_nodes: u32,
    pub pending_nodes: u32,
    pub currently_upgrading: u32,
    pub rollback_count: u32,
}

/// Errors during upgrade operations
#[derive(Debug, Error)]
pub enum UpgradeError {
    #[error("Upgrade already in progress: {0}")]
    AlreadyInProgress(String),

    #[error("Incompatible version: {current} cannot upgrade to {target}")]
    IncompatibleVersion { current: String, target: String },

    #[error("Node not found: {0}")]
    NodeNotFound(String),

    #[error("Drain timeout for node: {0}")]
    DrainTimeout(String),

    #[error("Health check failed for node: {0}")]
    HealthCheckFailed(String),

    #[error("Not enough healthy nodes: have {have}, need {need}")]
    NotEnoughHealthyNodes { have: u32, need: u32 },

    #[error("Upgrade failed: {0}")]
    UpgradeFailed(String),

    #[error("Rollback failed: {0}")]
    RollbackFailed(String),

    #[error("No upgrade in progress")]
    NoUpgradeInProgress,

    #[error("Upgrade paused")]
    UpgradePaused,
}

pub type Result<T> = std::result::Result<T, UpgradeError>;

/// Manager for rolling upgrades
pub struct UpgradeManager {
    config: UpgradeConfig,
    current_plan: Arc<RwLock<Option<UpgradePlan>>>,
    node_states: Arc<RwLock<HashMap<String, NodeUpgradeInfo>>>,
    paused: AtomicBool,
}

impl UpgradeManager {
    /// Create a new upgrade manager
    pub fn new(config: UpgradeConfig) -> Self {
        Self {
            config,
            current_plan: Arc::new(RwLock::new(None)),
            node_states: Arc::new(RwLock::new(HashMap::new())),
            paused: AtomicBool::new(false),
        }
    }

    /// Register a node with its current version
    pub fn register_node(&self, node_id: &str, version: Version, is_leader: bool) {
        let mut states = self.node_states.write();
        states.insert(
            node_id.to_string(),
            NodeUpgradeInfo {
                node_id: node_id.to_string(),
                state: NodeUpgradeState::Normal,
                current_version: version,
                target_version: None,
                started_at: None,
                completed_at: None,
                error: None,
                health_checks_passed: 0,
                is_leader,
            },
        );
    }

    /// Remove a node from tracking
    pub fn unregister_node(&self, node_id: &str) {
        let mut states = self.node_states.write();
        states.remove(node_id);
    }

    /// Update a node's leader status
    pub fn update_leader_status(&self, node_id: &str, is_leader: bool) {
        let mut states = self.node_states.write();
        if let Some(info) = states.get_mut(node_id) {
            info.is_leader = is_leader;
        }
    }

    /// Plan an upgrade to a target version
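    ///
    /// Nodes already at the target version are skipped, and with the default
    /// `replicas_first = true` replicas are ordered ahead of leaders. A
    /// sketch (node names illustrative):
    ///
    /// ```rust,ignore
    /// manager.register_node("replica1", Version::new(1, 0, 0), false);
    /// manager.register_node("leader1", Version::new(1, 0, 0), true);
    /// let plan = manager.plan_upgrade(Version::new(1, 1, 0))?;
    /// assert_eq!(plan.node_order, vec!["replica1", "leader1"]);
    /// ```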
    pub fn plan_upgrade(&self, target_version: Version) -> Result<UpgradePlan> {
        // Refuse to replace a plan that is still active (in progress, paused,
        // or rolling back); only finished or never-started plans may be superseded.
        {
            let plan = self.current_plan.read();
            if let Some(ref p) = *plan {
                if matches!(
                    p.status,
                    UpgradeStatus::InProgress | UpgradeStatus::Paused | UpgradeStatus::RollingBack
                ) {
                    return Err(UpgradeError::AlreadyInProgress(p.upgrade_id.clone()));
                }
            }
        }

        let states = self.node_states.read();

        // Check version compatibility for all nodes
        for info in states.values() {
            if !info.current_version.is_compatible_with(&target_version) {
                return Err(UpgradeError::IncompatibleVersion {
                    current: info.current_version.to_string(),
                    target: target_version.to_string(),
                });
            }
        }

        // Build upgrade order: replicas first, then leaders
        let mut replicas: Vec<String> = Vec::new();
        let mut leaders: Vec<String> = Vec::new();

        for (node_id, info) in states.iter() {
            // Skip nodes already at target version
            if info.current_version == target_version {
                continue;
            }

            if info.is_leader {
                leaders.push(node_id.clone());
            } else {
                replicas.push(node_id.clone());
            }
        }

        // Sort for deterministic ordering
        replicas.sort();
        leaders.sort();

        let node_order = if self.config.replicas_first {
            replicas.into_iter().chain(leaders).collect()
        } else {
            leaders.into_iter().chain(replicas).collect()
        };

        let plan = UpgradePlan {
            upgrade_id: generate_upgrade_id(),
            target_version,
            node_order,
            current_index: 0,
            status: UpgradeStatus::Planning,
            created_at: current_time_ms(),
            started_at: None,
            completed_at: None,
        };

        // Store the plan
        {
            let mut current = self.current_plan.write();
            *current = Some(plan.clone());
        }

        info!("Created upgrade plan: {}", plan.upgrade_id);
        Ok(plan)
    }

    /// Start executing the upgrade plan
    pub fn start_upgrade(&self) -> Result<()> {
        let mut plan = self.current_plan.write();
        let p = plan.as_mut().ok_or(UpgradeError::NoUpgradeInProgress)?;

        if p.status != UpgradeStatus::Planning && p.status != UpgradeStatus::Paused {
            return Err(UpgradeError::AlreadyInProgress(p.upgrade_id.clone()));
        }

        p.status = UpgradeStatus::InProgress;
        p.started_at = Some(current_time_ms());
        self.paused.store(false, Ordering::SeqCst);

        // Schedule first batch of nodes
        let mut states = self.node_states.write();
        let batch_size = self.config.max_concurrent as usize;
        for node_id in p.node_order.iter().take(batch_size) {
            if let Some(info) = states.get_mut(node_id) {
                info.state = NodeUpgradeState::Scheduled;
                info.target_version = Some(p.target_version.clone());
            }
        }

        info!("Started upgrade: {}", p.upgrade_id);
        Ok(())
    }

    /// Pause the upgrade
    pub fn pause_upgrade(&self) -> Result<()> {
        let mut plan = self.current_plan.write();
        let p = plan.as_mut().ok_or(UpgradeError::NoUpgradeInProgress)?;

        if p.status != UpgradeStatus::InProgress {
            return Err(UpgradeError::NoUpgradeInProgress);
        }

        p.status = UpgradeStatus::Paused;
        self.paused.store(true, Ordering::SeqCst);
        info!("Paused upgrade: {}", p.upgrade_id);
        Ok(())
    }

    /// Resume a paused upgrade
    pub fn resume_upgrade(&self) -> Result<()> {
        let mut plan = self.current_plan.write();
        let p = plan.as_mut().ok_or(UpgradeError::NoUpgradeInProgress)?;

        if p.status != UpgradeStatus::Paused {
            return Err(UpgradeError::NoUpgradeInProgress);
        }

        p.status = UpgradeStatus::InProgress;
        self.paused.store(false, Ordering::SeqCst);
        info!("Resumed upgrade: {}", p.upgrade_id);
        Ok(())
    }

    /// Check if upgrade is paused
    pub fn is_paused(&self) -> bool {
        self.paused.load(Ordering::SeqCst)
    }

    /// Start draining a node (no new requests)
    pub fn start_drain(&self, node_id: &str) -> Result<()> {
        let mut states = self.node_states.write();
        let info = states
            .get_mut(node_id)
            .ok_or_else(|| UpgradeError::NodeNotFound(node_id.to_string()))?;

        if info.state != NodeUpgradeState::Scheduled {
            return Ok(()); // Already past this state
        }

        info.state = NodeUpgradeState::Draining;
        info.started_at = Some(current_time_ms());
        debug!("Started draining node: {}", node_id);
        Ok(())
    }

    /// Mark node drain complete, start upgrade
    pub fn complete_drain(&self, node_id: &str) -> Result<()> {
        let mut states = self.node_states.write();
        let info = states
            .get_mut(node_id)
            .ok_or_else(|| UpgradeError::NodeNotFound(node_id.to_string()))?;

        if info.state != NodeUpgradeState::Draining {
            return Ok(()); // Not draining
        }

        info.state = NodeUpgradeState::Upgrading;
        debug!("Node {} drain complete, starting upgrade", node_id);
        Ok(())
    }

    /// Mark node upgrade as complete
    pub fn complete_node_upgrade(&self, node_id: &str, new_version: Version) -> Result<()> {
        let mut states = self.node_states.write();
        let info = states
            .get_mut(node_id)
            .ok_or_else(|| UpgradeError::NodeNotFound(node_id.to_string()))?;

        info.state = NodeUpgradeState::Recovering;
        info.current_version = new_version;
        info.health_checks_passed = 0;
        debug!("Node {} upgrade complete, starting recovery", node_id);
        Ok(())
    }

    /// Record a health check result for a node
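    ///
    /// Returns `Ok(true)` once the node has passed `required_health_checks`
    /// consecutive checks and the plan has advanced. A sketch of a polling
    /// loop (`probe_node` is a hypothetical health probe supplied by the
    /// caller):
    ///
    /// ```rust,ignore
    /// loop {
    ///     let healthy = probe_node(node_id);
    ///     if manager.record_health_check(node_id, healthy)? {
    ///         break; // recovery complete, plan advanced
    ///     }
    ///     // sleep for health_check_interval_ms between probes
    /// }
    /// ```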
    pub fn record_health_check(&self, node_id: &str, healthy: bool) -> Result<bool> {
        let mut states = self.node_states.write();
        let info = states
            .get_mut(node_id)
            .ok_or_else(|| UpgradeError::NodeNotFound(node_id.to_string()))?;

        if info.state != NodeUpgradeState::Recovering {
            return Ok(false);
        }

        if healthy {
            info.health_checks_passed += 1;
            if info.health_checks_passed >= self.config.required_health_checks {
                info.state = NodeUpgradeState::Completed;
                info.completed_at = Some(current_time_ms());
                debug!("Node {} recovery complete", node_id);

                // Advance the upgrade plan
                drop(states);
                self.advance_upgrade()?;
                return Ok(true);
            }
        } else {
            info.health_checks_passed = 0;
            if self.config.auto_rollback {
                info.state = NodeUpgradeState::Failed;
                info.error = Some("Health check failed".to_string());
                // Honor auto_rollback: roll the cluster back before surfacing the error.
                drop(states);
                self.initiate_rollback()?;
                return Err(UpgradeError::HealthCheckFailed(node_id.to_string()));
            }
        }

        Ok(false)
    }

    /// Advance to the next node in the upgrade
    fn advance_upgrade(&self) -> Result<()> {
        let mut plan = self.current_plan.write();
        let p = plan.as_mut().ok_or(UpgradeError::NoUpgradeInProgress)?;

        if p.status != UpgradeStatus::InProgress {
            return Ok(());
        }

        p.current_index += 1;

        // Check if all nodes are done
        if p.current_index >= p.node_order.len() {
            p.status = UpgradeStatus::Completed;
            p.completed_at = Some(current_time_ms());
            info!("Upgrade {} completed successfully", p.upgrade_id);
            return Ok(());
        }

        // Schedule the next batch. Only touch nodes still in the Normal state:
        // with max_concurrent > 1 the window can overlap nodes that are already
        // in flight or finished, and those must not be rescheduled.
        let mut states = self.node_states.write();
        let batch_end =
            (p.current_index + self.config.max_concurrent as usize).min(p.node_order.len());
        for node_id in p.node_order[p.current_index..batch_end].iter() {
            if let Some(info) = states.get_mut(node_id) {
                if info.state == NodeUpgradeState::Normal {
                    info.state = NodeUpgradeState::Scheduled;
                    info.target_version = Some(p.target_version.clone());
                }
            }
        }

        Ok(())
    }

    /// Mark a node upgrade as failed
    pub fn fail_node_upgrade(&self, node_id: &str, error: &str) -> Result<()> {
        let mut states = self.node_states.write();
        let info = states
            .get_mut(node_id)
            .ok_or_else(|| UpgradeError::NodeNotFound(node_id.to_string()))?;

        info.state = NodeUpgradeState::Failed;
        info.error = Some(error.to_string());
        warn!("Node {} upgrade failed: {}", node_id, error);

        if self.config.auto_rollback {
            drop(states);
            self.initiate_rollback()?;
        }

        Ok(())
    }

    /// Initiate rollback of the upgrade
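    ///
    /// Every node still mid-upgrade (scheduled, draining, upgrading, or
    /// recovering) is marked `RollingBack`; the orchestrator then reports
    /// each node back at its previous version, e.g.:
    ///
    /// ```rust,ignore
    /// manager.initiate_rollback()?;
    /// manager.complete_rollback("node1", Version::new(1, 0, 0))?;
    /// ```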
    pub fn initiate_rollback(&self) -> Result<()> {
        let mut plan = self.current_plan.write();
        let p = plan.as_mut().ok_or(UpgradeError::NoUpgradeInProgress)?;

        p.status = UpgradeStatus::RollingBack;
        warn!("Initiating rollback for upgrade: {}", p.upgrade_id);

        // Mark all upgrading/scheduled nodes for rollback
        let mut states = self.node_states.write();
        for info in states.values_mut() {
            if matches!(
                info.state,
                NodeUpgradeState::Scheduled
                    | NodeUpgradeState::Draining
                    | NodeUpgradeState::Upgrading
                    | NodeUpgradeState::Recovering
            ) {
                info.state = NodeUpgradeState::RollingBack;
            }
        }

        Ok(())
    }

    /// Mark rollback complete for a node
    pub fn complete_rollback(&self, node_id: &str, previous_version: Version) -> Result<()> {
        let mut states = self.node_states.write();
        let info = states
            .get_mut(node_id)
            .ok_or_else(|| UpgradeError::NodeNotFound(node_id.to_string()))?;

        info.state = NodeUpgradeState::Normal;
        info.current_version = previous_version;
        info.target_version = None;
        info.error = None;
        debug!("Node {} rollback complete", node_id);

        // Check if all rollbacks are complete
        let all_normal = states.values().all(|i| {
            matches!(
                i.state,
                NodeUpgradeState::Normal | NodeUpgradeState::Completed | NodeUpgradeState::Failed
            )
        });

        if all_normal {
            drop(states);
            let mut plan = self.current_plan.write();
            if let Some(ref mut p) = *plan {
                if p.status == UpgradeStatus::RollingBack {
                    p.status = UpgradeStatus::Failed;
                    p.completed_at = Some(current_time_ms());
                    info!("Rollback completed for upgrade: {}", p.upgrade_id);
                }
            }
        }

        Ok(())
    }

    /// Get the current upgrade plan
    pub fn get_plan(&self) -> Option<UpgradePlan> {
        self.current_plan.read().clone()
    }

    /// Get upgrade status for a specific node
    pub fn get_node_status(&self, node_id: &str) -> Option<NodeUpgradeInfo> {
        self.node_states.read().get(node_id).cloned()
    }

    /// Get overall upgrade statistics
    pub fn get_stats(&self) -> UpgradeStats {
        let states = self.node_states.read();
        let mut upgraded_nodes = 0u32;
        let mut failed_nodes = 0u32;
        let mut pending_nodes = 0u32;
        let mut currently_upgrading = 0u32;
        let mut rollback_count = 0u32;

        for info in states.values() {
            match info.state {
                NodeUpgradeState::Completed => upgraded_nodes += 1,
                NodeUpgradeState::Failed => failed_nodes += 1,
                NodeUpgradeState::Scheduled | NodeUpgradeState::Normal => pending_nodes += 1,
                NodeUpgradeState::Draining
                | NodeUpgradeState::Upgrading
                | NodeUpgradeState::Recovering => {
                    currently_upgrading += 1;
                }
                NodeUpgradeState::RollingBack => rollback_count += 1,
            }
        }

        UpgradeStats {
            total_nodes: states.len() as u32,
            upgraded_nodes,
            failed_nodes,
            pending_nodes,
            currently_upgrading,
            rollback_count,
        }
    }

    /// Check if it's safe to proceed with upgrade
    pub fn can_proceed(&self) -> Result<bool> {
        if self.paused.load(Ordering::SeqCst) {
            return Err(UpgradeError::UpgradePaused);
        }

        let states = self.node_states.read();
        let healthy_count = states
            .values()
            .filter(|i| {
                matches!(
                    i.state,
                    NodeUpgradeState::Normal | NodeUpgradeState::Completed
                )
            })
            .count() as u32;

        if healthy_count < self.config.min_healthy_nodes {
            return Err(UpgradeError::NotEnoughHealthyNodes {
                have: healthy_count,
                need: self.config.min_healthy_nodes,
            });
        }

        Ok(true)
    }

    /// Get nodes that need to start draining
    pub fn get_nodes_to_drain(&self) -> Vec<String> {
        self.node_states
            .read()
            .iter()
            .filter(|(_, i)| i.state == NodeUpgradeState::Scheduled)
            .map(|(id, _)| id.clone())
            .collect()
    }

    /// Get nodes currently draining
    pub fn get_draining_nodes(&self) -> Vec<String> {
        self.node_states
            .read()
            .iter()
            .filter(|(_, i)| i.state == NodeUpgradeState::Draining)
            .map(|(id, _)| id.clone())
            .collect()
    }

    /// Check drain timeout for a node
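    ///
    /// Intended to be polled while nodes drain; returns `Err(DrainTimeout)`
    /// once a node has been draining longer than `drain_timeout_ms`. A sketch
    /// of the polling side:
    ///
    /// ```rust,ignore
    /// for node_id in manager.get_draining_nodes() {
    ///     if let Err(UpgradeError::DrainTimeout(id)) = manager.check_drain_timeout(&node_id) {
    ///         manager.fail_node_upgrade(&id, "drain timeout")?;
    ///     }
    /// }
    /// ```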
    pub fn check_drain_timeout(&self, node_id: &str) -> Result<bool> {
        let states = self.node_states.read();
        let info = states
            .get(node_id)
            .ok_or_else(|| UpgradeError::NodeNotFound(node_id.to_string()))?;

        if info.state != NodeUpgradeState::Draining {
            return Ok(false);
        }

        if let Some(started) = info.started_at {
            // Saturate so a clock step backwards cannot underflow u64.
            let elapsed = current_time_ms().saturating_sub(started);
            if elapsed > self.config.drain_timeout_ms {
                return Err(UpgradeError::DrainTimeout(node_id.to_string()));
            }
        }

        Ok(false)
    }
}

/// Get current time in milliseconds
fn current_time_ms() -> u64 {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default()
        .as_millis() as u64
}

/// Generate a unique upgrade ID
fn generate_upgrade_id() -> String {
    use std::time::{SystemTime, UNIX_EPOCH};
    let timestamp = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_millis();
    format!("upgrade-{}", timestamp)
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_version_parsing() {
        let v = Version::parse("1.2.3").unwrap();
        assert_eq!(v.major, 1);
        assert_eq!(v.minor, 2);
        assert_eq!(v.patch, 3);
        assert_eq!(v.to_string(), "1.2.3");
    }

    #[test]
    fn test_version_with_build() {
        let v = Version::new(1, 2, 3).with_build("abc123");
        assert_eq!(v.to_string(), "1.2.3+abc123");
    }

    #[test]
    fn test_version_compatibility() {
        let v1 = Version::new(1, 0, 0);
        let v2 = Version::new(1, 1, 0);
        let v3 = Version::new(2, 0, 0);

        assert!(v1.is_compatible_with(&v2));
        assert!(v2.is_compatible_with(&v1));
        assert!(!v1.is_compatible_with(&v3));
    }

    #[test]
    fn test_version_comparison() {
        let v1 = Version::new(1, 0, 0);
        let v2 = Version::new(1, 1, 0);
        let v3 = Version::new(1, 1, 1);
        let v4 = Version::new(2, 0, 0);

        assert!(v2.is_newer_than(&v1));
        assert!(v3.is_newer_than(&v2));
        assert!(v4.is_newer_than(&v3));
        assert!(!v1.is_newer_than(&v2));
    }

    #[test]
    fn test_upgrade_config_defaults() {
        let config = UpgradeConfig::default();
        assert_eq!(config.drain_timeout_ms, 60000);
        assert_eq!(config.health_check_interval_ms, 5000);
        assert_eq!(config.required_health_checks, 3);
        assert!(config.replicas_first);
        assert_eq!(config.max_concurrent, 1);
        assert!(config.auto_rollback);
    }

    #[test]
    fn test_register_and_unregister_node() {
        let manager = UpgradeManager::new(UpgradeConfig::default());

        manager.register_node("node1", Version::new(1, 0, 0), false);
        manager.register_node("node2", Version::new(1, 0, 0), true);

        assert!(manager.get_node_status("node1").is_some());
        assert!(manager.get_node_status("node2").is_some());

        manager.unregister_node("node1");
        assert!(manager.get_node_status("node1").is_none());
        assert!(manager.get_node_status("node2").is_some());
    }

    #[test]
    fn test_plan_upgrade() {
        let manager = UpgradeManager::new(UpgradeConfig::default());

        manager.register_node("replica1", Version::new(1, 0, 0), false);
        manager.register_node("replica2", Version::new(1, 0, 0), false);
        manager.register_node("leader1", Version::new(1, 0, 0), true);

        let plan = manager.plan_upgrade(Version::new(1, 1, 0)).unwrap();

        assert_eq!(plan.target_version, Version::new(1, 1, 0));
        assert_eq!(plan.node_order.len(), 3);
        // Replicas should come before leaders
        assert!(!plan.node_order[0].contains("leader"));
        assert!(!plan.node_order[1].contains("leader"));
        assert!(plan.node_order[2].contains("leader"));
    }

    #[test]
    fn test_incompatible_version_rejected() {
        let manager = UpgradeManager::new(UpgradeConfig::default());
        manager.register_node("node1", Version::new(1, 0, 0), false);

        let result = manager.plan_upgrade(Version::new(2, 0, 0));
        assert!(matches!(
            result,
            Err(UpgradeError::IncompatibleVersion { .. })
        ));
    }

    #[test]
    fn test_upgrade_lifecycle() {
        let manager = UpgradeManager::new(UpgradeConfig::default());

        manager.register_node("node1", Version::new(1, 0, 0), false);

        // Plan
        let plan = manager.plan_upgrade(Version::new(1, 1, 0)).unwrap();
        assert_eq!(plan.status, UpgradeStatus::Planning);

        // Start
        manager.start_upgrade().unwrap();
        let plan = manager.get_plan().unwrap();
        assert_eq!(plan.status, UpgradeStatus::InProgress);

        // Node should be scheduled
        let info = manager.get_node_status("node1").unwrap();
        assert_eq!(info.state, NodeUpgradeState::Scheduled);
    }

    #[test]
    fn test_drain_complete_upgrade_cycle() {
        let config = UpgradeConfig {
            required_health_checks: 1,
            ..Default::default()
        };
        let manager = UpgradeManager::new(config);

        manager.register_node("node1", Version::new(1, 0, 0), false);
        manager.plan_upgrade(Version::new(1, 1, 0)).unwrap();
        manager.start_upgrade().unwrap();

        // Drain
        manager.start_drain("node1").unwrap();
        assert_eq!(
            manager.get_node_status("node1").unwrap().state,
            NodeUpgradeState::Draining
        );

        // Complete drain
        manager.complete_drain("node1").unwrap();
        assert_eq!(
            manager.get_node_status("node1").unwrap().state,
            NodeUpgradeState::Upgrading
        );

        // Complete upgrade
        manager
            .complete_node_upgrade("node1", Version::new(1, 1, 0))
            .unwrap();
        assert_eq!(
            manager.get_node_status("node1").unwrap().state,
            NodeUpgradeState::Recovering
        );

        // Health check
        manager.record_health_check("node1", true).unwrap();
        assert_eq!(
            manager.get_node_status("node1").unwrap().state,
            NodeUpgradeState::Completed
        );

        // Plan should be completed
        let plan = manager.get_plan().unwrap();
        assert_eq!(plan.status, UpgradeStatus::Completed);
    }

    #[test]
    fn test_pause_and_resume() {
        let manager = UpgradeManager::new(UpgradeConfig::default());
        manager.register_node("node1", Version::new(1, 0, 0), false);
        manager.plan_upgrade(Version::new(1, 1, 0)).unwrap();
        manager.start_upgrade().unwrap();

        // Pause
        manager.pause_upgrade().unwrap();
        assert!(manager.is_paused());
        assert_eq!(manager.get_plan().unwrap().status, UpgradeStatus::Paused);

        // Resume
        manager.resume_upgrade().unwrap();
        assert!(!manager.is_paused());
        assert_eq!(
            manager.get_plan().unwrap().status,
            UpgradeStatus::InProgress
        );
    }

    #[test]
    fn test_upgrade_stats() {
        let manager = UpgradeManager::new(UpgradeConfig::default());

        manager.register_node("node1", Version::new(1, 0, 0), false);
        manager.register_node("node2", Version::new(1, 0, 0), false);
        manager.register_node("node3", Version::new(1, 0, 0), true);

        let stats = manager.get_stats();
        assert_eq!(stats.total_nodes, 3);
        assert_eq!(stats.pending_nodes, 3);
        assert_eq!(stats.upgraded_nodes, 0);
    }

    #[test]
    fn test_rollback() {
        let manager = UpgradeManager::new(UpgradeConfig::default());
        manager.register_node("node1", Version::new(1, 0, 0), false);
        manager.plan_upgrade(Version::new(1, 1, 0)).unwrap();
        manager.start_upgrade().unwrap();
        manager.start_drain("node1").unwrap();

        // Initiate rollback
        manager.initiate_rollback().unwrap();

        let info = manager.get_node_status("node1").unwrap();
        assert_eq!(info.state, NodeUpgradeState::RollingBack);

        // Complete rollback
        manager
            .complete_rollback("node1", Version::new(1, 0, 0))
            .unwrap();

        let info = manager.get_node_status("node1").unwrap();
        assert_eq!(info.state, NodeUpgradeState::Normal);
        assert_eq!(info.current_version, Version::new(1, 0, 0));
    }

    #[test]
    fn test_skip_already_upgraded_nodes() {
        let manager = UpgradeManager::new(UpgradeConfig::default());

        // One node already at target version
        manager.register_node("node1", Version::new(1, 1, 0), false);
        manager.register_node("node2", Version::new(1, 0, 0), false);

        let plan = manager.plan_upgrade(Version::new(1, 1, 0)).unwrap();

        // Only node2 should be in the upgrade plan
        assert_eq!(plan.node_order.len(), 1);
        assert_eq!(plan.node_order[0], "node2");
    }

    #[test]
    fn test_update_leader_status() {
        let manager = UpgradeManager::new(UpgradeConfig::default());
        manager.register_node("node1", Version::new(1, 0, 0), false);

        assert!(!manager.get_node_status("node1").unwrap().is_leader);

        manager.update_leader_status("node1", true);
        assert!(manager.get_node_status("node1").unwrap().is_leader);
    }
}