nodedb_cluster/migration_executor/
executor.rs1use std::sync::{Arc, Mutex, RwLock};
4use std::time::{Duration, SystemTime, UNIX_EPOCH};
5
6use tracing::info;
7use uuid::Uuid;
8
9use crate::catalog::ClusterCatalog;
10use crate::decommission::MetadataProposer;
11use crate::error::{ClusterError, Result};
12use crate::ghost::GhostTable;
13use crate::metadata_group::MetadataEntry as Entry;
14use crate::metadata_group::migration_state::{
15 MigrationCheckpointPayload, MigrationId, MigrationPhaseTag, SharedMigrationStateTable,
16};
17use crate::migration::{MigrationPhase, MigrationState};
18use crate::multi_raft::MultiRaft;
19use crate::routing::RoutingTable;
20use crate::topology::ClusterTopology;
21use crate::transport::NexarTransport;
22
/// Parameters describing a single vShard migration from one node to another.
#[derive(Debug, Clone)]
pub struct MigrationRequest {
    pub vshard_id: u32,
    pub source_node: u64,
    pub target_node: u64,
    /// Budget for pausing writes during cutover; the `_us` suffix suggests
    /// microseconds (so the default below would be 500 ms) — confirm with the
    /// cutover phase implementation.
    pub write_pause_budget_us: u64,
}

impl Default for MigrationRequest {
    /// Zeroed endpoints with a write-pause budget of 500_000.
    fn default() -> Self {
        MigrationRequest {
            write_pause_budget_us: 500_000,
            vshard_id: 0,
            source_node: 0,
            target_node: 0,
        }
    }
}
43
/// Summary returned by [`MigrationExecutor::execute`] after a migration run.
#[derive(Debug)]
pub struct MigrationResult {
    /// The vShard that was migrated.
    pub vshard_id: u32,
    /// Node the vShard was moved from (copied from the request).
    pub source_node: u64,
    /// Node the vShard was moved to (copied from the request).
    pub target_node: u64,
    /// Final phase of the migration state machine when the executor returned.
    pub phase: MigrationPhase,
    /// Wall-clock duration reported by the migration state, when tracked.
    pub elapsed: Option<Duration>,
    /// Unique id (UUIDv4) assigned to this migration run.
    pub migration_id: MigrationId,
}
54
/// Drives vShard migrations through their three phases (base copy, WAL
/// catch-up, cutover).
///
/// Fields are `pub(super)` so the phase helpers in the sibling `phases`
/// module can access them directly.
pub struct MigrationExecutor {
    /// Multi-group Raft handle used by the migration phases.
    pub(super) multi_raft: Arc<Mutex<MultiRaft>>,
    /// vShard-to-group routing table; consulted to resolve the source group.
    pub(super) routing: Arc<RwLock<RoutingTable>>,
    /// Cluster topology shared with the phase helpers.
    pub(super) topology: Arc<RwLock<ClusterTopology>>,
    /// Transport used to talk to other nodes.
    pub(super) transport: Arc<NexarTransport>,
    /// Ghost table owned by this executor; created empty in [`Self::new`].
    pub(super) ghost_table: Arc<Mutex<GhostTable>>,
    /// Optional cluster catalog, attached via `with_catalog`.
    pub(super) catalog: Option<Arc<ClusterCatalog>>,
    /// Optional proposer used to persist migration checkpoints; when unset,
    /// `propose_checkpoint` is a no-op.
    pub(super) metadata_proposer: Option<Arc<dyn MetadataProposer>>,
    /// Optional shared checkpoint table consulted before starting a new
    /// migration to reject duplicates.
    pub(super) migration_state: Option<SharedMigrationStateTable>,
}
66
67impl MigrationExecutor {
68 pub fn new(
69 multi_raft: Arc<Mutex<MultiRaft>>,
70 routing: Arc<RwLock<RoutingTable>>,
71 topology: Arc<RwLock<ClusterTopology>>,
72 transport: Arc<NexarTransport>,
73 ) -> Self {
74 Self {
75 multi_raft,
76 routing,
77 topology,
78 transport,
79 ghost_table: Arc::new(Mutex::new(GhostTable::new())),
80 catalog: None,
81 metadata_proposer: None,
82 migration_state: None,
83 }
84 }
85
86 pub fn with_metadata_proposer(mut self, proposer: Arc<dyn MetadataProposer>) -> Self {
87 self.metadata_proposer = Some(proposer);
88 self
89 }
90
91 pub fn with_catalog(mut self, catalog: Arc<ClusterCatalog>) -> Self {
92 self.catalog = Some(catalog);
93 self
94 }
95
96 pub fn with_migration_state(mut self, state: SharedMigrationStateTable) -> Self {
97 self.migration_state = Some(state);
98 self
99 }
100
101 pub fn ghost_table(&self) -> &Arc<Mutex<GhostTable>> {
102 &self.ghost_table
103 }
104
105 pub async fn execute(&self, req: MigrationRequest) -> Result<MigrationResult> {
106 let source_group = {
107 let routing = self.routing.read().unwrap_or_else(|p| p.into_inner());
108 routing.group_for_vshard(req.vshard_id)?
109 };
110
111 if let Some(state_table) = &self.migration_state {
112 let guard = state_table.lock().unwrap_or_else(|p| p.into_inner());
113 for row in guard.all_checkpoints() {
114 if let Some(_id) = row.migration_uuid() {
115 let vshard_matches = match &row.payload {
116 MigrationCheckpointPayload::AddLearner { vshard_id, .. } => {
117 *vshard_id == req.vshard_id
118 }
119 MigrationCheckpointPayload::CatchUp { vshard_id, .. } => {
120 *vshard_id == req.vshard_id
121 }
122 MigrationCheckpointPayload::PromoteLearner { vshard_id, .. } => {
123 *vshard_id == req.vshard_id
124 }
125 MigrationCheckpointPayload::LeadershipTransfer { vshard_id, .. } => {
126 *vshard_id == req.vshard_id
127 }
128 MigrationCheckpointPayload::Cutover { vshard_id, .. } => {
129 *vshard_id == req.vshard_id
130 }
131 MigrationCheckpointPayload::Complete { vshard_id, .. } => {
132 *vshard_id == req.vshard_id
133 }
134 };
135 if vshard_matches && row.payload.phase_tag() != MigrationPhaseTag::Complete {
136 return Err(ClusterError::MigrationInProgress {
137 vshard_id: req.vshard_id,
138 });
139 }
140 }
141 }
142 }
143
144 let migration_id = Uuid::new_v4();
145
146 let mut state = MigrationState::new(
147 req.vshard_id,
148 source_group,
149 source_group,
150 req.source_node,
151 req.target_node,
152 req.write_pause_budget_us,
153 );
154
155 info!(
156 vshard = req.vshard_id,
157 source = req.source_node,
158 target = req.target_node,
159 group = source_group,
160 migration_id = %migration_id,
161 "starting vShard migration"
162 );
163
164 super::phases::phase1_base_copy(self, &mut state, source_group, &req, migration_id).await?;
165 super::phases::phase2_wal_catchup(self, &mut state, source_group, &req, migration_id)
166 .await?;
167 super::phases::phase3_cutover(self, &mut state, source_group, &req, migration_id).await?;
168
169 let elapsed = state.elapsed();
170 let phase = state.phase().clone();
171
172 info!(
173 vshard = req.vshard_id,
174 migration_id = %migration_id,
175 elapsed_ms = elapsed.map(|d| d.as_millis() as u64).unwrap_or(0),
176 "vShard migration completed"
177 );
178
179 Ok(MigrationResult {
180 vshard_id: req.vshard_id,
181 source_node: req.source_node,
182 target_node: req.target_node,
183 phase,
184 elapsed,
185 migration_id,
186 })
187 }
188
189 pub(super) async fn propose_checkpoint(
190 &self,
191 migration_id: MigrationId,
192 attempt: u32,
193 payload: MigrationCheckpointPayload,
194 ) -> Result<()> {
195 let Some(proposer) = &self.metadata_proposer else {
196 return Ok(());
197 };
198
199 let ts_ms = SystemTime::now()
200 .duration_since(UNIX_EPOCH)
201 .unwrap_or_default()
202 .as_millis() as u64;
203
204 let crc32c = payload.crc32c()?;
205 let phase = payload.phase_tag();
206
207 let entry = Entry::MigrationCheckpoint {
208 migration_id: migration_id.hyphenated().to_string(),
209 phase,
210 attempt,
211 payload,
212 crc32c,
213 ts_ms,
214 };
215
216 proposer.propose_and_wait(entry).await?;
217 Ok(())
218 }
219}
220
#[cfg(test)]
mod tests {
    use super::*;

    /// `Default` must zero all endpoints and keep the 500_000
    /// write-pause budget (the original test checked only the budget).
    #[test]
    fn migration_request_default() {
        let req = MigrationRequest::default();
        assert_eq!(req.vshard_id, 0);
        assert_eq!(req.source_node, 0);
        assert_eq!(req.target_node, 0);
        assert_eq!(req.write_pause_budget_us, 500_000);
    }
}