// nodedb_cluster/migration_executor/executor.rs
1// SPDX-License-Identifier: BUSL-1.1
2
3use std::sync::{Arc, Mutex, RwLock};
4use std::time::{Duration, SystemTime, UNIX_EPOCH};
5
6use tracing::info;
7use uuid::Uuid;
8
9use crate::catalog::ClusterCatalog;
10use crate::decommission::MetadataProposer;
11use crate::error::{ClusterError, Result};
12use crate::ghost::GhostTable;
13use crate::metadata_group::MetadataEntry as Entry;
14use crate::metadata_group::migration_state::{
15    MigrationCheckpointPayload, MigrationId, MigrationPhaseTag, SharedMigrationStateTable,
16};
17use crate::migration::{MigrationPhase, MigrationState};
18use crate::multi_raft::MultiRaft;
19use crate::routing::RoutingTable;
20use crate::topology::ClusterTopology;
21use crate::transport::NexarTransport;
22
/// Parameters describing a single vShard migration.
#[derive(Debug, Clone)]
pub struct MigrationRequest {
    /// Identifier of the vShard to move.
    pub vshard_id: u32,
    /// Node currently hosting the vShard.
    pub source_node: u64,
    /// Node that will receive the vShard.
    pub target_node: u64,
    /// Maximum allowed write pause during Phase 3 (microseconds).
    pub write_pause_budget_us: u64,
}

impl Default for MigrationRequest {
    /// Zeroed endpoints with a 500 ms write-pause budget.
    fn default() -> Self {
        // 500 ms expressed in microseconds.
        const DEFAULT_PAUSE_US: u64 = 500_000;
        MigrationRequest {
            vshard_id: 0,
            source_node: 0,
            target_node: 0,
            write_pause_budget_us: DEFAULT_PAUSE_US,
        }
    }
}
43
/// Result of a completed migration.
#[derive(Debug)]
pub struct MigrationResult {
    /// The vShard that was migrated (copied from the request).
    pub vshard_id: u32,
    /// Node the vShard was moved from (copied from the request).
    pub source_node: u64,
    /// Node the vShard was moved to (copied from the request).
    pub target_node: u64,
    /// Final phase recorded by the migration state machine.
    pub phase: MigrationPhase,
    /// Wall-clock duration of the migration, if the state tracked one.
    pub elapsed: Option<Duration>,
    /// Unique id (UUID) generated for this migration run.
    pub migration_id: MigrationId,
}
54
/// Executes a vShard migration through the 3-phase protocol.
pub struct MigrationExecutor {
    /// Raft groups; shared with phase helpers in the sibling `phases` module.
    pub(super) multi_raft: Arc<Mutex<MultiRaft>>,
    /// vShard -> group routing table (read-only here).
    pub(super) routing: Arc<RwLock<RoutingTable>>,
    /// Cluster membership/topology view.
    pub(super) topology: Arc<RwLock<ClusterTopology>>,
    /// Transport used to talk to other nodes.
    pub(super) transport: Arc<NexarTransport>,
    /// Created fresh in `new()`; exposed via `ghost_table()`.
    pub(super) ghost_table: Arc<Mutex<GhostTable>>,
    /// Optional collaborators, attached via the `with_*` builders below.
    pub(super) catalog: Option<Arc<ClusterCatalog>>,
    /// When set, per-phase checkpoints are proposed through it.
    pub(super) metadata_proposer: Option<Arc<dyn MetadataProposer>>,
    /// When set, used to reject concurrent migrations of the same vShard.
    pub(super) migration_state: Option<SharedMigrationStateTable>,
}
66
67impl MigrationExecutor {
68    pub fn new(
69        multi_raft: Arc<Mutex<MultiRaft>>,
70        routing: Arc<RwLock<RoutingTable>>,
71        topology: Arc<RwLock<ClusterTopology>>,
72        transport: Arc<NexarTransport>,
73    ) -> Self {
74        Self {
75            multi_raft,
76            routing,
77            topology,
78            transport,
79            ghost_table: Arc::new(Mutex::new(GhostTable::new())),
80            catalog: None,
81            metadata_proposer: None,
82            migration_state: None,
83        }
84    }
85
86    pub fn with_metadata_proposer(mut self, proposer: Arc<dyn MetadataProposer>) -> Self {
87        self.metadata_proposer = Some(proposer);
88        self
89    }
90
91    pub fn with_catalog(mut self, catalog: Arc<ClusterCatalog>) -> Self {
92        self.catalog = Some(catalog);
93        self
94    }
95
96    pub fn with_migration_state(mut self, state: SharedMigrationStateTable) -> Self {
97        self.migration_state = Some(state);
98        self
99    }
100
101    pub fn ghost_table(&self) -> &Arc<Mutex<GhostTable>> {
102        &self.ghost_table
103    }
104
105    pub async fn execute(&self, req: MigrationRequest) -> Result<MigrationResult> {
106        let source_group = {
107            let routing = self.routing.read().unwrap_or_else(|p| p.into_inner());
108            routing.group_for_vshard(req.vshard_id)?
109        };
110
111        if let Some(state_table) = &self.migration_state {
112            let guard = state_table.lock().unwrap_or_else(|p| p.into_inner());
113            for row in guard.all_checkpoints() {
114                if let Some(_id) = row.migration_uuid() {
115                    let vshard_matches = match &row.payload {
116                        MigrationCheckpointPayload::AddLearner { vshard_id, .. } => {
117                            *vshard_id == req.vshard_id
118                        }
119                        MigrationCheckpointPayload::CatchUp { vshard_id, .. } => {
120                            *vshard_id == req.vshard_id
121                        }
122                        MigrationCheckpointPayload::PromoteLearner { vshard_id, .. } => {
123                            *vshard_id == req.vshard_id
124                        }
125                        MigrationCheckpointPayload::LeadershipTransfer { vshard_id, .. } => {
126                            *vshard_id == req.vshard_id
127                        }
128                        MigrationCheckpointPayload::Cutover { vshard_id, .. } => {
129                            *vshard_id == req.vshard_id
130                        }
131                        MigrationCheckpointPayload::Complete { vshard_id, .. } => {
132                            *vshard_id == req.vshard_id
133                        }
134                    };
135                    if vshard_matches && row.payload.phase_tag() != MigrationPhaseTag::Complete {
136                        return Err(ClusterError::MigrationInProgress {
137                            vshard_id: req.vshard_id,
138                        });
139                    }
140                }
141            }
142        }
143
144        let migration_id = Uuid::new_v4();
145
146        let mut state = MigrationState::new(
147            req.vshard_id,
148            source_group,
149            source_group,
150            req.source_node,
151            req.target_node,
152            req.write_pause_budget_us,
153        );
154
155        info!(
156            vshard = req.vshard_id,
157            source = req.source_node,
158            target = req.target_node,
159            group = source_group,
160            migration_id = %migration_id,
161            "starting vShard migration"
162        );
163
164        super::phases::phase1_base_copy(self, &mut state, source_group, &req, migration_id).await?;
165        super::phases::phase2_wal_catchup(self, &mut state, source_group, &req, migration_id)
166            .await?;
167        super::phases::phase3_cutover(self, &mut state, source_group, &req, migration_id).await?;
168
169        let elapsed = state.elapsed();
170        let phase = state.phase().clone();
171
172        info!(
173            vshard = req.vshard_id,
174            migration_id = %migration_id,
175            elapsed_ms = elapsed.map(|d| d.as_millis() as u64).unwrap_or(0),
176            "vShard migration completed"
177        );
178
179        Ok(MigrationResult {
180            vshard_id: req.vshard_id,
181            source_node: req.source_node,
182            target_node: req.target_node,
183            phase,
184            elapsed,
185            migration_id,
186        })
187    }
188
189    pub(super) async fn propose_checkpoint(
190        &self,
191        migration_id: MigrationId,
192        attempt: u32,
193        payload: MigrationCheckpointPayload,
194    ) -> Result<()> {
195        let Some(proposer) = &self.metadata_proposer else {
196            return Ok(());
197        };
198
199        let ts_ms = SystemTime::now()
200            .duration_since(UNIX_EPOCH)
201            .unwrap_or_default()
202            .as_millis() as u64;
203
204        let crc32c = payload.crc32c()?;
205        let phase = payload.phase_tag();
206
207        let entry = Entry::MigrationCheckpoint {
208            migration_id: migration_id.hyphenated().to_string(),
209            phase,
210            attempt,
211            payload,
212            crc32c,
213            ts_ms,
214        };
215
216        proposer.propose_and_wait(entry).await?;
217        Ok(())
218    }
219}
220
#[cfg(test)]
mod tests {
    use super::*;

    /// Pin the full `Default` contract: zeroed endpoints and a 500 ms
    /// (500_000 µs) write-pause budget. The original test only checked the
    /// budget field, leaving the other defaults unguarded.
    #[test]
    fn migration_request_default() {
        let req = MigrationRequest::default();
        assert_eq!(req.vshard_id, 0);
        assert_eq!(req.source_node, 0);
        assert_eq!(req.target_node, 0);
        assert_eq!(req.write_pause_budget_us, 500_000);
    }
}