1use parking_lot::RwLock;
11use serde::{Deserialize, Serialize};
12use std::collections::HashMap;
13use std::sync::atomic::{AtomicBool, Ordering};
14use std::sync::Arc;
15use thiserror::Error;
16use tracing::{debug, info, warn};
17
/// A `MAJOR.MINOR.PATCH` version with optional build metadata.
///
/// `Display` renders it as `MAJOR.MINOR.PATCH`, followed by `+BUILD` when `build` is set.
/// Note that the derived `PartialEq`/`Eq`/`Hash` also compare `build`, so `1.2.3` and
/// `1.2.3+abc` are *not* equal under `==`.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct Version {
    /// Major component; `is_compatible_with` only accepts versions with the same major.
    pub major: u32,
    /// Minor component.
    pub minor: u32,
    /// Patch component.
    pub patch: u32,
    /// Optional build metadata, rendered after a `+` by `Display`.
    pub build: Option<String>,
}
27
28impl Version {
29 pub fn new(major: u32, minor: u32, patch: u32) -> Self {
30 Self {
31 major,
32 minor,
33 patch,
34 build: None,
35 }
36 }
37
38 pub fn with_build(mut self, build: impl Into<String>) -> Self {
39 self.build = Some(build.into());
40 self
41 }
42
43 pub fn is_compatible_with(&self, other: &Version) -> bool {
45 self.major == other.major
48 }
49
50 pub fn is_newer_than(&self, other: &Version) -> bool {
52 if self.major != other.major {
53 return self.major > other.major;
54 }
55 if self.minor != other.minor {
56 return self.minor > other.minor;
57 }
58 self.patch > other.patch
59 }
60
61 pub fn parse(s: &str) -> Option<Self> {
63 let parts: Vec<&str> = s.split('.').collect();
64 if parts.len() < 3 {
65 return None;
66 }
67 Some(Self {
68 major: parts[0].parse().ok()?,
69 minor: parts[1].parse().ok()?,
70 patch: parts[2].parse().ok()?,
71 build: None,
72 })
73 }
74}
75
76impl std::fmt::Display for Version {
77 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
78 write!(f, "{}.{}.{}", self.major, self.minor, self.patch)?;
79 if let Some(ref build) = self.build {
80 write!(f, "+{}", build)?;
81 }
82 Ok(())
83 }
84}
85
/// Tunables for [`UpgradeManager`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UpgradeConfig {
    /// Maximum time, in milliseconds, a node may stay `Draining` before
    /// `check_drain_timeout` reports `DrainTimeout`.
    pub drain_timeout_ms: u64,
    /// Interval between health checks, in milliseconds.
    /// NOTE(review): not read by `UpgradeManager` in this file — presumably consumed by
    /// the caller that drives `record_health_check`; confirm.
    pub health_check_interval_ms: u64,
    /// Number of healthy checks a `Recovering` node needs before it is marked `Completed`.
    pub required_health_checks: u32,
    /// When `true`, plans upgrade non-leader nodes before leaders; otherwise leaders go first.
    pub replicas_first: bool,
    /// Number of nodes from the plan's order scheduled at a time.
    pub max_concurrent: u32,
    /// Controls automatic failure handling. `fail_node_upgrade` initiates a rollback when
    /// this is set, and a failed health check marks the node `Failed`.
    pub auto_rollback: bool,
    /// Minimum number of nodes in `Normal` or `Completed` state required by `can_proceed`.
    pub min_healthy_nodes: u32,
}
104
105impl Default for UpgradeConfig {
106 fn default() -> Self {
107 Self {
108 drain_timeout_ms: 60000, health_check_interval_ms: 5000, required_health_checks: 3, replicas_first: true, max_concurrent: 1, auto_rollback: true, min_healthy_nodes: 1, }
116 }
117}
118
/// Per-node position in the upgrade state machine managed by [`UpgradeManager`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum NodeUpgradeState {
    /// Not taking part in an upgrade (state after `register_node` and `complete_rollback`).
    Normal,
    /// Selected by the active plan; waiting for `start_drain`.
    Scheduled,
    /// `start_drain` was called; `started_at` holds the drain start time.
    Draining,
    /// `complete_drain` was called; the node is being upgraded.
    Upgrading,
    /// `complete_node_upgrade` reported the new version; health checks are being counted.
    Recovering,
    /// Passed `required_health_checks` consecutive healthy checks.
    Completed,
    /// Upgrade failed (`fail_node_upgrade`, or a failed health check with `auto_rollback`).
    Failed,
    /// Caught mid-upgrade by `initiate_rollback`; waiting for `complete_rollback`.
    RollingBack,
}
139
/// Upgrade bookkeeping for a single registered node.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeUpgradeInfo {
    /// Identifier the node was registered under.
    pub node_id: String,
    /// Current position in the upgrade state machine.
    pub state: NodeUpgradeState,
    /// Version the node is known to run; replaced on upgrade and on rollback.
    pub current_version: Version,
    /// Version the active plan assigned when scheduling the node; cleared on rollback.
    pub target_version: Option<Version>,
    /// Milliseconds since the Unix epoch when draining started.
    pub started_at: Option<u64>,
    /// Milliseconds since the Unix epoch when the node reached `Completed`.
    pub completed_at: Option<u64>,
    /// Failure description, set when the node fails and cleared by `complete_rollback`.
    pub error: Option<String>,
    /// Consecutive healthy checks since recovery began; reset by an unhealthy check.
    pub health_checks_passed: u32,
    /// Whether the node is a leader; leaders and replicas are ordered separately in plans.
    pub is_leader: bool,
}
153
/// Lifecycle status of an [`UpgradePlan`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum UpgradeStatus {
    /// No activity. NOTE(review): not assigned by `UpgradeManager` in the visible code.
    Idle,
    /// Plan created by `plan_upgrade` but not started yet.
    Planning,
    /// Nodes are being scheduled and upgraded.
    InProgress,
    /// Paused by `pause_upgrade`; no new nodes are scheduled until resumed.
    Paused,
    /// Every node in the plan's `node_order` has completed.
    Completed,
    /// A rollback finished; set by `complete_rollback`.
    Failed,
    /// `initiate_rollback` was called and nodes are being rolled back.
    RollingBack,
}
172
/// A rolling upgrade of the cluster to a single target version.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UpgradePlan {
    /// Identifier of the form `upgrade-<milliseconds since epoch>`.
    pub upgrade_id: String,
    /// Version every node in `node_order` is upgraded to.
    pub target_version: Version,
    /// Nodes still needing the upgrade. Leaders and replicas are each sorted by id, and
    /// replicas come first when `UpgradeConfig::replicas_first` is set.
    pub node_order: Vec<String>,
    /// Number of node completions counted so far; also the start of the scheduling window.
    pub current_index: usize,
    /// Lifecycle status of the plan.
    pub status: UpgradeStatus,
    /// Milliseconds since the Unix epoch when the plan was created.
    pub created_at: u64,
    /// Milliseconds since the Unix epoch when the plan was started.
    pub started_at: Option<u64>,
    /// Milliseconds since the Unix epoch when the plan reached `Completed` or `Failed`.
    pub completed_at: Option<u64>,
}
193
/// Snapshot of node counts by state, as computed by `UpgradeManager::get_stats`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct UpgradeStats {
    /// All registered nodes.
    pub total_nodes: u32,
    /// Nodes in `Completed`.
    pub upgraded_nodes: u32,
    /// Nodes in `Failed`.
    pub failed_nodes: u32,
    /// Nodes in `Normal` or `Scheduled`.
    pub pending_nodes: u32,
    /// Nodes in `Draining`, `Upgrading` or `Recovering`.
    pub currently_upgrading: u32,
    /// Nodes currently in `RollingBack`. This is a point-in-time count, not a
    /// history of rollbacks.
    pub rollback_count: u32,
}
204
/// Errors returned by [`UpgradeManager`] operations.
#[derive(Debug, Error)]
pub enum UpgradeError {
    /// Another plan blocks the request; carries that plan's `upgrade_id`.
    #[error("Upgrade already in progress: {0}")]
    AlreadyInProgress(String),

    /// A registered node's major version differs from the requested target.
    #[error("Incompatible version: {current} cannot upgrade to {target}")]
    IncompatibleVersion { current: String, target: String },

    /// The referenced node id is not registered.
    #[error("Node not found: {0}")]
    NodeNotFound(String),

    /// The node has been draining longer than `UpgradeConfig::drain_timeout_ms`.
    #[error("Drain timeout for node: {0}")]
    DrainTimeout(String),

    /// An unhealthy check was recorded for a recovering node while `auto_rollback` is set.
    #[error("Health check failed for node: {0}")]
    HealthCheckFailed(String),

    /// Fewer `Normal`/`Completed` nodes than `UpgradeConfig::min_healthy_nodes`.
    #[error("Not enough healthy nodes: have {have}, need {need}")]
    NotEnoughHealthyNodes { have: u32, need: u32 },

    /// Generic upgrade failure.
    /// NOTE(review): not constructed by `UpgradeManager` in the visible code.
    #[error("Upgrade failed: {0}")]
    UpgradeFailed(String),

    /// Generic rollback failure.
    /// NOTE(review): not constructed by `UpgradeManager` in the visible code.
    #[error("Rollback failed: {0}")]
    RollbackFailed(String),

    /// No plan exists, or the plan is not in the state the operation requires.
    #[error("No upgrade in progress")]
    NoUpgradeInProgress,

    /// `can_proceed` was called while the upgrade is paused.
    #[error("Upgrade paused")]
    UpgradePaused,
}
238
/// Result alias used by every fallible [`UpgradeManager`] operation.
pub type Result<T> = std::result::Result<T, UpgradeError>;
240
/// Coordinates a rolling upgrade: tracks registered nodes and drives at most one
/// [`UpgradePlan`] at a time through schedule → drain → upgrade → recover.
pub struct UpgradeManager {
    /// Tunables fixed at construction.
    config: UpgradeConfig,
    /// The current (or most recent) plan, if any.
    current_plan: Arc<RwLock<Option<UpgradePlan>>>,
    /// Per-node upgrade state, keyed by node id.
    node_states: Arc<RwLock<HashMap<String, NodeUpgradeInfo>>>,
    /// Set by `pause_upgrade`, cleared on start/resume; read without locking by
    /// `is_paused` and `can_proceed`.
    paused: AtomicBool,
}
248
249impl UpgradeManager {
250 pub fn new(config: UpgradeConfig) -> Self {
252 Self {
253 config,
254 current_plan: Arc::new(RwLock::new(None)),
255 node_states: Arc::new(RwLock::new(HashMap::new())),
256 paused: AtomicBool::new(false),
257 }
258 }
259
260 pub fn register_node(&self, node_id: &str, version: Version, is_leader: bool) {
262 let mut states = self.node_states.write();
263 states.insert(
264 node_id.to_string(),
265 NodeUpgradeInfo {
266 node_id: node_id.to_string(),
267 state: NodeUpgradeState::Normal,
268 current_version: version,
269 target_version: None,
270 started_at: None,
271 completed_at: None,
272 error: None,
273 health_checks_passed: 0,
274 is_leader,
275 },
276 );
277 }
278
279 pub fn unregister_node(&self, node_id: &str) {
281 let mut states = self.node_states.write();
282 states.remove(node_id);
283 }
284
285 pub fn update_leader_status(&self, node_id: &str, is_leader: bool) {
287 let mut states = self.node_states.write();
288 if let Some(info) = states.get_mut(node_id) {
289 info.is_leader = is_leader;
290 }
291 }
292
293 pub fn plan_upgrade(&self, target_version: Version) -> Result<UpgradePlan> {
295 {
297 let plan = self.current_plan.read();
298 if let Some(ref p) = *plan {
299 if p.status == UpgradeStatus::InProgress {
300 return Err(UpgradeError::AlreadyInProgress(p.upgrade_id.clone()));
301 }
302 }
303 }
304
305 let states = self.node_states.read();
306
307 for (_node_id, info) in states.iter() {
309 if !info.current_version.is_compatible_with(&target_version) {
310 return Err(UpgradeError::IncompatibleVersion {
311 current: info.current_version.to_string(),
312 target: target_version.to_string(),
313 });
314 }
315 }
316
317 let mut replicas: Vec<String> = Vec::new();
319 let mut leaders: Vec<String> = Vec::new();
320
321 for (node_id, info) in states.iter() {
322 if info.current_version == target_version {
324 continue;
325 }
326
327 if info.is_leader {
328 leaders.push(node_id.clone());
329 } else {
330 replicas.push(node_id.clone());
331 }
332 }
333
334 replicas.sort();
336 leaders.sort();
337
338 let node_order = if self.config.replicas_first {
339 replicas.into_iter().chain(leaders).collect()
340 } else {
341 leaders.into_iter().chain(replicas).collect()
342 };
343
344 let plan = UpgradePlan {
345 upgrade_id: generate_upgrade_id(),
346 target_version,
347 node_order,
348 current_index: 0,
349 status: UpgradeStatus::Planning,
350 created_at: current_time_ms(),
351 started_at: None,
352 completed_at: None,
353 };
354
355 {
357 let mut current = self.current_plan.write();
358 *current = Some(plan.clone());
359 }
360
361 info!("Created upgrade plan: {}", plan.upgrade_id);
362 Ok(plan)
363 }
364
365 pub fn start_upgrade(&self) -> Result<()> {
367 let mut plan = self.current_plan.write();
368 let p = plan.as_mut().ok_or(UpgradeError::NoUpgradeInProgress)?;
369
370 if p.status != UpgradeStatus::Planning && p.status != UpgradeStatus::Paused {
371 return Err(UpgradeError::AlreadyInProgress(p.upgrade_id.clone()));
372 }
373
374 p.status = UpgradeStatus::InProgress;
375 p.started_at = Some(current_time_ms());
376 self.paused.store(false, Ordering::SeqCst);
377
378 let mut states = self.node_states.write();
380 let batch_size = self.config.max_concurrent as usize;
381 for node_id in p.node_order.iter().take(batch_size) {
382 if let Some(info) = states.get_mut(node_id) {
383 info.state = NodeUpgradeState::Scheduled;
384 info.target_version = Some(p.target_version.clone());
385 }
386 }
387
388 info!("Started upgrade: {}", p.upgrade_id);
389 Ok(())
390 }
391
392 pub fn pause_upgrade(&self) -> Result<()> {
394 let mut plan = self.current_plan.write();
395 let p = plan.as_mut().ok_or(UpgradeError::NoUpgradeInProgress)?;
396
397 if p.status != UpgradeStatus::InProgress {
398 return Err(UpgradeError::NoUpgradeInProgress);
399 }
400
401 p.status = UpgradeStatus::Paused;
402 self.paused.store(true, Ordering::SeqCst);
403 info!("Paused upgrade: {}", p.upgrade_id);
404 Ok(())
405 }
406
407 pub fn resume_upgrade(&self) -> Result<()> {
409 let mut plan = self.current_plan.write();
410 let p = plan.as_mut().ok_or(UpgradeError::NoUpgradeInProgress)?;
411
412 if p.status != UpgradeStatus::Paused {
413 return Err(UpgradeError::NoUpgradeInProgress);
414 }
415
416 p.status = UpgradeStatus::InProgress;
417 self.paused.store(false, Ordering::SeqCst);
418 info!("Resumed upgrade: {}", p.upgrade_id);
419 Ok(())
420 }
421
422 pub fn is_paused(&self) -> bool {
424 self.paused.load(Ordering::SeqCst)
425 }
426
427 pub fn start_drain(&self, node_id: &str) -> Result<()> {
429 let mut states = self.node_states.write();
430 let info = states
431 .get_mut(node_id)
432 .ok_or_else(|| UpgradeError::NodeNotFound(node_id.to_string()))?;
433
434 if info.state != NodeUpgradeState::Scheduled {
435 return Ok(()); }
437
438 info.state = NodeUpgradeState::Draining;
439 info.started_at = Some(current_time_ms());
440 debug!("Started draining node: {}", node_id);
441 Ok(())
442 }
443
444 pub fn complete_drain(&self, node_id: &str) -> Result<()> {
446 let mut states = self.node_states.write();
447 let info = states
448 .get_mut(node_id)
449 .ok_or_else(|| UpgradeError::NodeNotFound(node_id.to_string()))?;
450
451 if info.state != NodeUpgradeState::Draining {
452 return Ok(()); }
454
455 info.state = NodeUpgradeState::Upgrading;
456 debug!("Node {} drain complete, starting upgrade", node_id);
457 Ok(())
458 }
459
460 pub fn complete_node_upgrade(&self, node_id: &str, new_version: Version) -> Result<()> {
462 let mut states = self.node_states.write();
463 let info = states
464 .get_mut(node_id)
465 .ok_or_else(|| UpgradeError::NodeNotFound(node_id.to_string()))?;
466
467 info.state = NodeUpgradeState::Recovering;
468 info.current_version = new_version;
469 info.health_checks_passed = 0;
470 debug!("Node {} upgrade complete, starting recovery", node_id);
471 Ok(())
472 }
473
474 pub fn record_health_check(&self, node_id: &str, healthy: bool) -> Result<bool> {
476 let mut states = self.node_states.write();
477 let info = states
478 .get_mut(node_id)
479 .ok_or_else(|| UpgradeError::NodeNotFound(node_id.to_string()))?;
480
481 if info.state != NodeUpgradeState::Recovering {
482 return Ok(false);
483 }
484
485 if healthy {
486 info.health_checks_passed += 1;
487 if info.health_checks_passed >= self.config.required_health_checks {
488 info.state = NodeUpgradeState::Completed;
489 info.completed_at = Some(current_time_ms());
490 debug!("Node {} recovery complete", node_id);
491
492 drop(states);
494 self.advance_upgrade()?;
495 return Ok(true);
496 }
497 } else {
498 info.health_checks_passed = 0;
499 if self.config.auto_rollback {
500 info.state = NodeUpgradeState::Failed;
501 info.error = Some("Health check failed".to_string());
502 return Err(UpgradeError::HealthCheckFailed(node_id.to_string()));
503 }
504 }
505
506 Ok(false)
507 }
508
509 fn advance_upgrade(&self) -> Result<()> {
511 let mut plan = self.current_plan.write();
512 let p = plan.as_mut().ok_or(UpgradeError::NoUpgradeInProgress)?;
513
514 if p.status != UpgradeStatus::InProgress {
515 return Ok(());
516 }
517
518 p.current_index += 1;
519
520 if p.current_index >= p.node_order.len() {
522 p.status = UpgradeStatus::Completed;
523 p.completed_at = Some(current_time_ms());
524 info!("Upgrade {} completed successfully", p.upgrade_id);
525 return Ok(());
526 }
527
528 let mut states = self.node_states.write();
530 let batch_end =
531 (p.current_index + self.config.max_concurrent as usize).min(p.node_order.len());
532 for node_id in p.node_order[p.current_index..batch_end].iter() {
533 if let Some(info) = states.get_mut(node_id) {
534 info.state = NodeUpgradeState::Scheduled;
535 info.target_version = Some(p.target_version.clone());
536 }
537 }
538
539 Ok(())
540 }
541
542 pub fn fail_node_upgrade(&self, node_id: &str, error: &str) -> Result<()> {
544 let mut states = self.node_states.write();
545 let info = states
546 .get_mut(node_id)
547 .ok_or_else(|| UpgradeError::NodeNotFound(node_id.to_string()))?;
548
549 info.state = NodeUpgradeState::Failed;
550 info.error = Some(error.to_string());
551 warn!("Node {} upgrade failed: {}", node_id, error);
552
553 if self.config.auto_rollback {
554 drop(states);
555 self.initiate_rollback()?;
556 }
557
558 Ok(())
559 }
560
561 pub fn initiate_rollback(&self) -> Result<()> {
563 let mut plan = self.current_plan.write();
564 let p = plan.as_mut().ok_or(UpgradeError::NoUpgradeInProgress)?;
565
566 p.status = UpgradeStatus::RollingBack;
567 warn!("Initiating rollback for upgrade: {}", p.upgrade_id);
568
569 let mut states = self.node_states.write();
571 for (_, info) in states.iter_mut() {
572 if matches!(
573 info.state,
574 NodeUpgradeState::Scheduled
575 | NodeUpgradeState::Draining
576 | NodeUpgradeState::Upgrading
577 | NodeUpgradeState::Recovering
578 ) {
579 info.state = NodeUpgradeState::RollingBack;
580 }
581 }
582
583 Ok(())
584 }
585
586 pub fn complete_rollback(&self, node_id: &str, previous_version: Version) -> Result<()> {
588 let mut states = self.node_states.write();
589 let info = states
590 .get_mut(node_id)
591 .ok_or_else(|| UpgradeError::NodeNotFound(node_id.to_string()))?;
592
593 info.state = NodeUpgradeState::Normal;
594 info.current_version = previous_version;
595 info.target_version = None;
596 info.error = None;
597 debug!("Node {} rollback complete", node_id);
598
599 let all_normal = states.values().all(|i| {
601 matches!(
602 i.state,
603 NodeUpgradeState::Normal | NodeUpgradeState::Completed | NodeUpgradeState::Failed
604 )
605 });
606
607 if all_normal {
608 drop(states);
609 let mut plan = self.current_plan.write();
610 if let Some(ref mut p) = *plan {
611 if p.status == UpgradeStatus::RollingBack {
612 p.status = UpgradeStatus::Failed;
613 p.completed_at = Some(current_time_ms());
614 info!("Rollback completed for upgrade: {}", p.upgrade_id);
615 }
616 }
617 }
618
619 Ok(())
620 }
621
622 pub fn get_plan(&self) -> Option<UpgradePlan> {
624 self.current_plan.read().clone()
625 }
626
627 pub fn get_node_status(&self, node_id: &str) -> Option<NodeUpgradeInfo> {
629 self.node_states.read().get(node_id).cloned()
630 }
631
632 pub fn get_stats(&self) -> UpgradeStats {
634 let states = self.node_states.read();
635 let mut upgraded_nodes = 0u32;
636 let mut failed_nodes = 0u32;
637 let mut pending_nodes = 0u32;
638 let mut currently_upgrading = 0u32;
639 let mut rollback_count = 0u32;
640
641 for info in states.values() {
642 match info.state {
643 NodeUpgradeState::Completed => upgraded_nodes += 1,
644 NodeUpgradeState::Failed => failed_nodes += 1,
645 NodeUpgradeState::Scheduled | NodeUpgradeState::Normal => pending_nodes += 1,
646 NodeUpgradeState::Draining
647 | NodeUpgradeState::Upgrading
648 | NodeUpgradeState::Recovering => {
649 currently_upgrading += 1;
650 }
651 NodeUpgradeState::RollingBack => rollback_count += 1,
652 }
653 }
654
655 UpgradeStats {
656 total_nodes: states.len() as u32,
657 upgraded_nodes,
658 failed_nodes,
659 pending_nodes,
660 currently_upgrading,
661 rollback_count,
662 }
663 }
664
665 pub fn can_proceed(&self) -> Result<bool> {
667 if self.paused.load(Ordering::SeqCst) {
668 return Err(UpgradeError::UpgradePaused);
669 }
670
671 let states = self.node_states.read();
672 let healthy_count = states
673 .values()
674 .filter(|i| {
675 matches!(
676 i.state,
677 NodeUpgradeState::Normal | NodeUpgradeState::Completed
678 )
679 })
680 .count() as u32;
681
682 if healthy_count < self.config.min_healthy_nodes {
683 return Err(UpgradeError::NotEnoughHealthyNodes {
684 have: healthy_count,
685 need: self.config.min_healthy_nodes,
686 });
687 }
688
689 Ok(true)
690 }
691
692 pub fn get_nodes_to_drain(&self) -> Vec<String> {
694 self.node_states
695 .read()
696 .iter()
697 .filter(|(_, i)| i.state == NodeUpgradeState::Scheduled)
698 .map(|(id, _)| id.clone())
699 .collect()
700 }
701
702 pub fn get_draining_nodes(&self) -> Vec<String> {
704 self.node_states
705 .read()
706 .iter()
707 .filter(|(_, i)| i.state == NodeUpgradeState::Draining)
708 .map(|(id, _)| id.clone())
709 .collect()
710 }
711
712 pub fn check_drain_timeout(&self, node_id: &str) -> Result<bool> {
714 let states = self.node_states.read();
715 let info = states
716 .get(node_id)
717 .ok_or_else(|| UpgradeError::NodeNotFound(node_id.to_string()))?;
718
719 if info.state != NodeUpgradeState::Draining {
720 return Ok(false);
721 }
722
723 if let Some(started) = info.started_at {
724 let elapsed = current_time_ms() - started;
725 if elapsed > self.config.drain_timeout_ms {
726 return Err(UpgradeError::DrainTimeout(node_id.to_string()));
727 }
728 }
729
730 Ok(false)
731 }
732}
733
/// Wall-clock milliseconds since the Unix epoch.
///
/// If the system clock reports a time before the epoch, this returns 0 instead of
/// failing.
fn current_time_ms() -> u64 {
    match std::time::UNIX_EPOCH.elapsed() {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
741
/// Generates a unique plan id of the form `upgrade-<milliseconds since epoch>`.
///
/// The numeric part is strictly increasing within the process. If two ids are requested
/// in the same millisecond, or the clock steps backwards, the next id uses the previous
/// value plus one. This prevents duplicate ids without changing the id format.
fn generate_upgrade_id() -> String {
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    static LAST_ID_MS: AtomicU64 = AtomicU64::new(0);

    let now_ms = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_millis() as u64;

    // The closure always returns `Some`, so this is always `Ok(previous)`.
    let previous = LAST_ID_MS
        .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |last| {
            Some(now_ms.max(last.saturating_add(1)))
        })
        .unwrap_or_else(|last| last);
    let id_ms = now_ms.max(previous.saturating_add(1));

    format!("upgrade-{}", id_ms)
}
751
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_version_parsing() {
        let v = Version::parse("1.2.3").unwrap();
        assert_eq!(v.major, 1);
        assert_eq!(v.minor, 2);
        assert_eq!(v.patch, 3);
        assert_eq!(v.to_string(), "1.2.3");
    }

    #[test]
    fn test_version_parse_rejects_malformed_input() {
        // Too few components, or non-numeric components, cannot be parsed.
        assert!(Version::parse("1.2").is_none());
        assert!(Version::parse("").is_none());
        assert!(Version::parse("a.b.c").is_none());
        assert!(Version::parse("1.-2.3").is_none());
    }

    #[test]
    fn test_version_with_build() {
        let v = Version::new(1, 2, 3).with_build("abc123");
        assert_eq!(v.to_string(), "1.2.3+abc123");
    }

    #[test]
    fn test_version_compatibility() {
        let v1 = Version::new(1, 0, 0);
        let v2 = Version::new(1, 1, 0);
        let v3 = Version::new(2, 0, 0);

        assert!(v1.is_compatible_with(&v2));
        assert!(v2.is_compatible_with(&v1));
        assert!(!v1.is_compatible_with(&v3));
    }

    #[test]
    fn test_version_comparison() {
        let v1 = Version::new(1, 0, 0);
        let v2 = Version::new(1, 1, 0);
        let v3 = Version::new(1, 1, 1);
        let v4 = Version::new(2, 0, 0);

        assert!(v2.is_newer_than(&v1));
        assert!(v3.is_newer_than(&v2));
        assert!(v4.is_newer_than(&v3));
        assert!(!v1.is_newer_than(&v2));
        // A version is never newer than itself.
        assert!(!v1.is_newer_than(&v1));
    }

    #[test]
    fn test_upgrade_config_defaults() {
        let config = UpgradeConfig::default();
        assert_eq!(config.drain_timeout_ms, 60000);
        assert_eq!(config.health_check_interval_ms, 5000);
        assert_eq!(config.required_health_checks, 3);
        assert!(config.replicas_first);
        assert_eq!(config.max_concurrent, 1);
        assert!(config.auto_rollback);
        assert_eq!(config.min_healthy_nodes, 1);
    }

    #[test]
    fn test_register_and_unregister_node() {
        let manager = UpgradeManager::new(UpgradeConfig::default());

        manager.register_node("node1", Version::new(1, 0, 0), false);
        manager.register_node("node2", Version::new(1, 0, 0), true);

        assert!(manager.get_node_status("node1").is_some());
        assert!(manager.get_node_status("node2").is_some());

        manager.unregister_node("node1");
        assert!(manager.get_node_status("node1").is_none());
        assert!(manager.get_node_status("node2").is_some());
    }

    #[test]
    fn test_plan_upgrade() {
        let manager = UpgradeManager::new(UpgradeConfig::default());

        manager.register_node("replica1", Version::new(1, 0, 0), false);
        manager.register_node("replica2", Version::new(1, 0, 0), false);
        manager.register_node("leader1", Version::new(1, 0, 0), true);

        let plan = manager.plan_upgrade(Version::new(1, 1, 0)).unwrap();

        assert_eq!(plan.target_version, Version::new(1, 1, 0));
        assert_eq!(plan.node_order.len(), 3);
        // Default config upgrades replicas before leaders.
        assert!(!plan.node_order[0].contains("leader"));
        assert!(!plan.node_order[1].contains("leader"));
        assert!(plan.node_order[2].contains("leader"));
    }

    #[test]
    fn test_plan_upgrade_leaders_first() {
        let config = UpgradeConfig {
            replicas_first: false,
            ..Default::default()
        };
        let manager = UpgradeManager::new(config);
        manager.register_node("replica1", Version::new(1, 0, 0), false);
        manager.register_node("leader1", Version::new(1, 0, 0), true);

        let plan = manager.plan_upgrade(Version::new(1, 1, 0)).unwrap();
        assert_eq!(
            plan.node_order,
            vec!["leader1".to_string(), "replica1".to_string()]
        );
    }

    #[test]
    fn test_plan_rejected_while_in_progress() {
        let manager = UpgradeManager::new(UpgradeConfig::default());
        manager.register_node("node1", Version::new(1, 0, 0), false);
        manager.plan_upgrade(Version::new(1, 1, 0)).unwrap();
        manager.start_upgrade().unwrap();

        assert!(matches!(
            manager.plan_upgrade(Version::new(1, 2, 0)),
            Err(UpgradeError::AlreadyInProgress(_))
        ));
    }

    #[test]
    fn test_incompatible_version_rejected() {
        let manager = UpgradeManager::new(UpgradeConfig::default());
        manager.register_node("node1", Version::new(1, 0, 0), false);

        let result = manager.plan_upgrade(Version::new(2, 0, 0));
        assert!(matches!(
            result,
            Err(UpgradeError::IncompatibleVersion { .. })
        ));
    }

    #[test]
    fn test_operations_without_plan_fail() {
        let manager = UpgradeManager::new(UpgradeConfig::default());
        assert!(matches!(
            manager.start_upgrade(),
            Err(UpgradeError::NoUpgradeInProgress)
        ));
        assert!(matches!(
            manager.pause_upgrade(),
            Err(UpgradeError::NoUpgradeInProgress)
        ));
        assert!(matches!(
            manager.resume_upgrade(),
            Err(UpgradeError::NoUpgradeInProgress)
        ));
    }

    #[test]
    fn test_unknown_node_is_reported() {
        let manager = UpgradeManager::new(UpgradeConfig::default());
        assert!(matches!(
            manager.start_drain("ghost"),
            Err(UpgradeError::NodeNotFound(_))
        ));
        assert!(matches!(
            manager.record_health_check("ghost", true),
            Err(UpgradeError::NodeNotFound(_))
        ));
        assert!(matches!(
            manager.check_drain_timeout("ghost"),
            Err(UpgradeError::NodeNotFound(_))
        ));
    }

    #[test]
    fn test_upgrade_lifecycle() {
        let manager = UpgradeManager::new(UpgradeConfig::default());

        manager.register_node("node1", Version::new(1, 0, 0), false);

        let plan = manager.plan_upgrade(Version::new(1, 1, 0)).unwrap();
        assert_eq!(plan.status, UpgradeStatus::Planning);

        manager.start_upgrade().unwrap();
        let plan = manager.get_plan().unwrap();
        assert_eq!(plan.status, UpgradeStatus::InProgress);

        // The first window of nodes is scheduled on start.
        let info = manager.get_node_status("node1").unwrap();
        assert_eq!(info.state, NodeUpgradeState::Scheduled);
    }

    #[test]
    fn test_drain_queries_and_timeout() {
        let manager = UpgradeManager::new(UpgradeConfig::default());
        manager.register_node("node1", Version::new(1, 0, 0), false);
        manager.plan_upgrade(Version::new(1, 1, 0)).unwrap();
        manager.start_upgrade().unwrap();

        assert_eq!(manager.get_nodes_to_drain(), vec!["node1".to_string()]);
        assert!(manager.get_draining_nodes().is_empty());

        manager.start_drain("node1").unwrap();
        assert!(manager.get_nodes_to_drain().is_empty());
        assert_eq!(manager.get_draining_nodes(), vec!["node1".to_string()]);

        // A drain that has just started is well within the default 60s timeout.
        assert!(!manager.check_drain_timeout("node1").unwrap());
    }

    #[test]
    fn test_drain_complete_upgrade_cycle() {
        let config = UpgradeConfig {
            required_health_checks: 1,
            ..Default::default()
        };
        let manager = UpgradeManager::new(config);

        manager.register_node("node1", Version::new(1, 0, 0), false);
        manager.plan_upgrade(Version::new(1, 1, 0)).unwrap();
        manager.start_upgrade().unwrap();

        manager.start_drain("node1").unwrap();
        assert_eq!(
            manager.get_node_status("node1").unwrap().state,
            NodeUpgradeState::Draining
        );

        manager.complete_drain("node1").unwrap();
        assert_eq!(
            manager.get_node_status("node1").unwrap().state,
            NodeUpgradeState::Upgrading
        );

        manager
            .complete_node_upgrade("node1", Version::new(1, 1, 0))
            .unwrap();
        assert_eq!(
            manager.get_node_status("node1").unwrap().state,
            NodeUpgradeState::Recovering
        );

        // One healthy check is enough with `required_health_checks: 1`.
        manager.record_health_check("node1", true).unwrap();
        assert_eq!(
            manager.get_node_status("node1").unwrap().state,
            NodeUpgradeState::Completed
        );

        // The only node finished, so the whole plan is complete.
        let plan = manager.get_plan().unwrap();
        assert_eq!(plan.status, UpgradeStatus::Completed);
    }

    #[test]
    fn test_pause_and_resume() {
        let manager = UpgradeManager::new(UpgradeConfig::default());
        manager.register_node("node1", Version::new(1, 0, 0), false);
        manager.plan_upgrade(Version::new(1, 1, 0)).unwrap();
        manager.start_upgrade().unwrap();

        manager.pause_upgrade().unwrap();
        assert!(manager.is_paused());
        assert_eq!(manager.get_plan().unwrap().status, UpgradeStatus::Paused);

        manager.resume_upgrade().unwrap();
        assert!(!manager.is_paused());
        assert_eq!(
            manager.get_plan().unwrap().status,
            UpgradeStatus::InProgress
        );
    }

    #[test]
    fn test_can_proceed_requires_healthy_nodes() {
        let manager = UpgradeManager::new(UpgradeConfig::default());
        assert!(matches!(
            manager.can_proceed(),
            Err(UpgradeError::NotEnoughHealthyNodes { have: 0, need: 1 })
        ));

        manager.register_node("node1", Version::new(1, 0, 0), false);
        assert!(manager.can_proceed().unwrap());
    }

    #[test]
    fn test_can_proceed_reports_pause() {
        let manager = UpgradeManager::new(UpgradeConfig::default());
        manager.register_node("node1", Version::new(1, 0, 0), false);
        manager.plan_upgrade(Version::new(1, 1, 0)).unwrap();
        manager.start_upgrade().unwrap();
        manager.pause_upgrade().unwrap();

        assert!(matches!(
            manager.can_proceed(),
            Err(UpgradeError::UpgradePaused)
        ));
    }

    #[test]
    fn test_upgrade_stats() {
        let manager = UpgradeManager::new(UpgradeConfig::default());

        manager.register_node("node1", Version::new(1, 0, 0), false);
        manager.register_node("node2", Version::new(1, 0, 0), false);
        manager.register_node("node3", Version::new(1, 0, 0), true);

        let stats = manager.get_stats();
        assert_eq!(stats.total_nodes, 3);
        assert_eq!(stats.pending_nodes, 3);
        assert_eq!(stats.upgraded_nodes, 0);
    }

    #[test]
    fn test_rollback() {
        let manager = UpgradeManager::new(UpgradeConfig::default());
        manager.register_node("node1", Version::new(1, 0, 0), false);
        manager.plan_upgrade(Version::new(1, 1, 0)).unwrap();
        manager.start_upgrade().unwrap();
        manager.start_drain("node1").unwrap();

        // An in-flight (draining) node is moved into RollingBack.
        manager.initiate_rollback().unwrap();

        let info = manager.get_node_status("node1").unwrap();
        assert_eq!(info.state, NodeUpgradeState::RollingBack);

        manager
            .complete_rollback("node1", Version::new(1, 0, 0))
            .unwrap();

        let info = manager.get_node_status("node1").unwrap();
        assert_eq!(info.state, NodeUpgradeState::Normal);
        assert_eq!(info.current_version, Version::new(1, 0, 0));
    }

    #[test]
    fn test_fail_node_triggers_auto_rollback() {
        let manager = UpgradeManager::new(UpgradeConfig::default());
        manager.register_node("node1", Version::new(1, 0, 0), false);
        manager.plan_upgrade(Version::new(1, 1, 0)).unwrap();
        manager.start_upgrade().unwrap();

        manager.fail_node_upgrade("node1", "boom").unwrap();

        let info = manager.get_node_status("node1").unwrap();
        assert_eq!(info.state, NodeUpgradeState::Failed);
        assert_eq!(info.error.as_deref(), Some("boom"));
        assert_eq!(
            manager.get_plan().unwrap().status,
            UpgradeStatus::RollingBack
        );
    }

    #[test]
    fn test_skip_already_upgraded_nodes() {
        let manager = UpgradeManager::new(UpgradeConfig::default());

        // node1 already runs the target version and must not be planned.
        manager.register_node("node1", Version::new(1, 1, 0), false);
        manager.register_node("node2", Version::new(1, 0, 0), false);

        let plan = manager.plan_upgrade(Version::new(1, 1, 0)).unwrap();

        assert_eq!(plan.node_order.len(), 1);
        assert_eq!(plan.node_order[0], "node2");
    }

    #[test]
    fn test_update_leader_status() {
        let manager = UpgradeManager::new(UpgradeConfig::default());
        manager.register_node("node1", Version::new(1, 0, 0), false);

        assert!(!manager.get_node_status("node1").unwrap().is_leader);

        manager.update_leader_status("node1", true);
        assert!(manager.get_node_status("node1").unwrap().is_leader);
    }
}