nodedb_cluster/migration_executor/
tracker.rs1use std::sync::Mutex;
4use std::time::Duration;
5
6use crate::migration::MigrationState;
7
/// Mutex-guarded collection of [`MigrationState`] entries.
///
/// All methods take `&self`; the interior `Mutex` provides the mutability.
pub struct MigrationTracker {
    /// Every state handed to `add` that `gc` has not yet removed.
    ///
    /// Despite the name, this may contain entries whose `is_active()` is
    /// `false`; they remain here until `gc` drops them.
    active: Mutex<Vec<MigrationState>>,
}
12
13impl MigrationTracker {
14 pub fn new() -> Self {
15 Self {
16 active: Mutex::new(Vec::new()),
17 }
18 }
19
20 pub fn add(&self, state: MigrationState) {
21 let mut active = self.active.lock().unwrap_or_else(|p| p.into_inner());
22 active.push(state);
23 }
24
25 pub fn active_count(&self) -> usize {
26 let active = self.active.lock().unwrap_or_else(|p| p.into_inner());
27 active.iter().filter(|s| s.is_active()).count()
28 }
29
30 pub fn snapshot(&self) -> Vec<MigrationSnapshot> {
31 let active = self.active.lock().unwrap_or_else(|p| p.into_inner());
32 active
33 .iter()
34 .map(|s| MigrationSnapshot {
35 vshard_id: s.vshard_id(),
36 phase: format!("{:?}", s.phase()),
37 elapsed_ms: s.elapsed().map(|d| d.as_millis() as u64).unwrap_or(0),
38 is_active: s.is_active(),
39 })
40 .collect()
41 }
42
43 pub fn gc(&self, max_age: Duration) {
44 let mut active = self.active.lock().unwrap_or_else(|p| p.into_inner());
45 active.retain(|s| s.is_active() || s.elapsed().map(|d| d < max_age).unwrap_or(true));
46 }
47}
48
49impl Default for MigrationTracker {
50 fn default() -> Self {
51 Self::new()
52 }
53}
54
/// Owned, point-in-time view of one tracked migration, as produced by
/// `MigrationTracker::snapshot`.
#[derive(Debug, Clone)]
pub struct MigrationSnapshot {
    /// Value returned by the migration state's `vshard_id()`.
    pub vshard_id: u32,
    /// `Debug` rendering (`{:?}`) of the migration's phase.
    pub phase: String,
    /// Elapsed time in milliseconds; `0` when no elapsed time was available.
    pub elapsed_ms: u64,
    /// Whether the migration reported `is_active()` when the snapshot was taken.
    pub is_active: bool,
}
63
#[cfg(test)]
mod tests {
    use super::*;
    use crate::migration::MigrationState;

    /// A fresh tracker is empty; a migration that has begun its base copy is
    /// counted as active and shows up in the snapshot.
    #[test]
    fn migration_tracker_lifecycle() {
        let tracker = MigrationTracker::new();
        assert_eq!(tracker.active_count(), 0);

        // Move the migration out of its initial state before tracking it.
        let mut migration = MigrationState::new(0, 0, 1, 1, 2, 500_000);
        migration.start_base_copy(100);
        tracker.add(migration);

        assert_eq!(tracker.active_count(), 1);
        assert_eq!(tracker.snapshot().len(), 1);
        assert!(tracker.snapshot()[0].is_active);
    }
}