Skip to main content

nodedb_cluster/migration_executor/
tracker.rs

1// SPDX-License-Identifier: BUSL-1.1
2
3use std::sync::Mutex;
4use std::time::Duration;
5
6use crate::migration::MigrationState;
7
8/// Track active migrations across the cluster.
9pub struct MigrationTracker {
10    active: Mutex<Vec<MigrationState>>,
11}
12
13impl MigrationTracker {
14    pub fn new() -> Self {
15        Self {
16            active: Mutex::new(Vec::new()),
17        }
18    }
19
20    pub fn add(&self, state: MigrationState) {
21        let mut active = self.active.lock().unwrap_or_else(|p| p.into_inner());
22        active.push(state);
23    }
24
25    pub fn active_count(&self) -> usize {
26        let active = self.active.lock().unwrap_or_else(|p| p.into_inner());
27        active.iter().filter(|s| s.is_active()).count()
28    }
29
30    pub fn snapshot(&self) -> Vec<MigrationSnapshot> {
31        let active = self.active.lock().unwrap_or_else(|p| p.into_inner());
32        active
33            .iter()
34            .map(|s| MigrationSnapshot {
35                vshard_id: s.vshard_id(),
36                phase: format!("{:?}", s.phase()),
37                elapsed_ms: s.elapsed().map(|d| d.as_millis() as u64).unwrap_or(0),
38                is_active: s.is_active(),
39            })
40            .collect()
41    }
42
43    pub fn gc(&self, max_age: Duration) {
44        let mut active = self.active.lock().unwrap_or_else(|p| p.into_inner());
45        active.retain(|s| s.is_active() || s.elapsed().map(|d| d < max_age).unwrap_or(true));
46    }
47}
48
49impl Default for MigrationTracker {
50    fn default() -> Self {
51        Self::new()
52    }
53}
54
55/// Observability snapshot of a migration.
56#[derive(Debug, Clone)]
57pub struct MigrationSnapshot {
58    pub vshard_id: u32,
59    pub phase: String,
60    pub elapsed_ms: u64,
61    pub is_active: bool,
62}
63
64#[cfg(test)]
65mod tests {
66    use super::*;
67    use crate::migration::MigrationState;
68
69    #[test]
70    fn migration_tracker_lifecycle() {
71        let tracker = MigrationTracker::new();
72        assert_eq!(tracker.active_count(), 0);
73
74        let mut state = MigrationState::new(0, 0, 1, 1, 2, 500_000);
75        state.start_base_copy(100);
76        tracker.add(state);
77
78        assert_eq!(tracker.active_count(), 1);
79        assert_eq!(tracker.snapshot().len(), 1);
80        assert!(tracker.snapshot()[0].is_active);
81    }
82}