use super::network::{MigrationBatch, NetworkError, ShardClient};
use super::rebalance::{MigrationPlan, MigrationState};
use super::types::ShardId;
use crate::core::id::NodeId;
use std::collections::{HashMap, HashSet};
use std::fmt;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};
/// Snapshot of where writes for a label should be routed, plus a version
/// marker for detecting that the routing table changed since issuance.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RoutingToken {
    /// Label this token was generated for.
    pub label: String,
    /// Shard the caller believes owns the label.
    pub primary_shard: ShardId,
    /// Shards every write must be applied to: the primary alone, or
    /// `[source, target]` while the label is being migrated off the primary.
    pub targets: Vec<ShardId>,
    /// Count of active migrations at generation time.
    /// NOTE(review): `verify_routing_token` compares `targets`, not this
    /// field — two different tables with the same size share a version.
    pub migration_version: u64,
}
/// Errors that can occur while planning or executing a shard migration.
#[derive(Debug, Clone)]
#[allow(missing_docs)]
pub enum MigrationError {
    /// Transport-level failure talking to a shard.
    NetworkError(NetworkError),
    /// A transferred batch's checksum did not match the expected value.
    ChecksumMismatch {
        batch_number: u64,
        expected: u64,
        actual: u64,
    },
    /// The target shard refused a batch, with its stated reason.
    BatchRejected { batch_number: u64, reason: String },
    /// The migration with this id was cancelled.
    Cancelled(u64),
    /// A migration phase ran longer than allowed.
    /// NOTE(review): not constructed in this module — confirm the producer.
    Timeout {
        migration_id: u64,
        phase: MigrationState,
        elapsed: Duration,
    },
    /// No client is registered for the plan's source shard.
    SourceUnavailable(ShardId),
    /// No client is registered for the plan's target shard.
    TargetUnavailable(ShardId),
    /// The migration was in an unexpected state for the requested operation.
    /// NOTE(review): not constructed in this module — confirm the producer.
    InvalidState {
        migration_id: u64,
        expected: MigrationState,
        actual: MigrationState,
    },
    /// Post-copy verification detected an inconsistency.
    VerificationFailed { migration_id: u64, reason: String },
}
impl fmt::Display for MigrationError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
MigrationError::NetworkError(e) => write!(f, "Network error: {}", e),
MigrationError::ChecksumMismatch {
batch_number,
expected,
actual,
} => {
write!(
f,
"Checksum mismatch in batch {}: expected {}, got {}",
batch_number, expected, actual
)
}
MigrationError::BatchRejected {
batch_number,
reason,
} => {
write!(f, "Batch {} rejected: {}", batch_number, reason)
}
MigrationError::Cancelled(id) => {
write!(f, "Migration {} was cancelled", id)
}
MigrationError::Timeout {
migration_id,
phase,
elapsed,
} => {
write!(
f,
"Migration {} timed out in {:?} phase after {:?}",
migration_id, phase, elapsed
)
}
MigrationError::SourceUnavailable(shard_id) => {
write!(f, "Source shard {} unavailable", shard_id)
}
MigrationError::TargetUnavailable(shard_id) => {
write!(f, "Target shard {} unavailable", shard_id)
}
MigrationError::InvalidState {
migration_id,
expected,
actual,
} => {
write!(
f,
"Migration {} in invalid state: expected {:?}, got {:?}",
migration_id, expected, actual
)
}
MigrationError::VerificationFailed {
migration_id,
reason,
} => {
write!(
f,
"Migration {} verification failed: {}",
migration_id, reason
)
}
}
}
}
// Marker impl: the Debug derive and Display impl satisfy `Error`'s bounds.
impl std::error::Error for MigrationError {}
impl From<NetworkError> for MigrationError {
fn from(e: NetworkError) -> Self {
MigrationError::NetworkError(e)
}
}
/// Convenience alias for results of migration operations.
pub type MigrationResult<T> = Result<T, MigrationError>;
/// Tuning knobs for migration execution.
#[derive(Debug, Clone)]
pub struct MigrationConfig {
    /// Maximum number of records requested per extraction batch; also the
    /// stride by which the copy offset advances.
    pub batch_size: usize,
    /// Upper bound on batches in flight at once.
    /// NOTE(review): not consulted by `MigrationExecutor` in this file —
    /// batches are transferred sequentially here.
    pub max_concurrent_batches: usize,
    /// Time budget for a single batch transfer.
    /// NOTE(review): not enforced in this file — confirm the consumer.
    pub batch_timeout: Duration,
    /// Time budget for the whole migration.
    /// NOTE(review): not enforced in this file — confirm the consumer.
    pub migration_timeout: Duration,
    /// How many times a failed batch send is retried before giving up.
    pub batch_retries: usize,
    /// Fraction of records to sample during verification.
    /// NOTE(review): not consulted by `verify_migration` in this file.
    pub verification_sample_rate: f64,
    /// Whether to pause rather than abort when an error occurs.
    /// NOTE(review): unused in this file — confirm the consumer.
    pub pause_on_error: bool,
}
impl Default for MigrationConfig {
fn default() -> Self {
Self {
batch_size: 1000,
max_concurrent_batches: 4,
batch_timeout: Duration::from_secs(60),
migration_timeout: Duration::from_secs(3600),
batch_retries: 3,
verification_sample_rate: 0.01, pause_on_error: true,
}
}
}
/// Routes writes to both source and target shards while labels migrate.
///
/// All accessors treat a poisoned lock as "no migrations" and fall back to
/// single-shard routing instead of panicking.
#[derive(Debug)]
pub struct DualWriteRouter {
    /// Label -> (source shard, target shard) for in-flight migrations.
    active_migrations: RwLock<HashMap<String, (ShardId, ShardId)>>,
    /// Individual nodes currently being migrated.
    migrating_nodes: RwLock<HashSet<NodeId>>,
}
impl DualWriteRouter {
    /// Creates a router with no active migrations and no tracked nodes.
    pub fn new() -> Self {
        Self {
            active_migrations: RwLock::new(HashMap::new()),
            migrating_nodes: RwLock::new(HashSet::new()),
        }
    }

    /// Computes the write fan-out for `label` under a given migration table:
    /// `[source, target]` while the label is migrating off `primary_shard`,
    /// otherwise just `[primary_shard]`.
    ///
    /// Single source of truth shared by `route_write`, `with_route_write`,
    /// `generate_routing_token`, and `verify_routing_token`, which previously
    /// each duplicated this logic and could drift apart.
    fn targets_for(
        migrations: &HashMap<String, (ShardId, ShardId)>,
        label: &str,
        primary_shard: ShardId,
    ) -> Vec<ShardId> {
        if let Some((source, target)) = migrations.get(label)
            && primary_shard == *source
        {
            vec![*source, *target]
        } else {
            vec![primary_shard]
        }
    }

    /// Marks `labels` as migrating from `source` to `target`.
    pub fn register_migration(&self, labels: &[String], source: ShardId, target: ShardId) {
        if let Ok(mut migrations) = self.active_migrations.write() {
            for label in labels {
                migrations.insert(label.clone(), (source, target));
            }
        }
    }

    /// Removes `labels` from the active-migration table.
    pub fn unregister_migration(&self, labels: &[String]) {
        if let Ok(mut migrations) = self.active_migrations.write() {
            for label in labels {
                migrations.remove(label);
            }
        }
    }

    /// Marks individual nodes as migrating.
    pub fn register_nodes(&self, nodes: &[NodeId]) {
        if let Ok(mut migrating) = self.migrating_nodes.write() {
            for node_id in nodes {
                migrating.insert(*node_id);
            }
        }
    }

    /// Clears the migrating mark from individual nodes.
    pub fn unregister_nodes(&self, nodes: &[NodeId]) {
        if let Ok(mut migrating) = self.migrating_nodes.write() {
            for node_id in nodes {
                migrating.remove(node_id);
            }
        }
    }

    /// Returns `true` if `label` currently has an active migration.
    pub fn is_label_migrating(&self, label: &str) -> bool {
        self.active_migrations
            .read()
            .map(|m| m.contains_key(label))
            .unwrap_or(false)
    }

    /// Returns `true` if `node_id` is currently being migrated.
    pub fn is_node_migrating(&self, node_id: NodeId) -> bool {
        self.migrating_nodes
            .read()
            .map(|m| m.contains(&node_id))
            .unwrap_or(false)
    }

    /// Returns the shards a write for `label` must go to. Falls back to the
    /// primary alone if the lock is poisoned.
    pub fn route_write(&self, label: &str, primary_shard: ShardId) -> Vec<ShardId> {
        if let Ok(migrations) = self.active_migrations.read() {
            return Self::targets_for(&migrations, label, primary_shard);
        }
        vec![primary_shard]
    }

    /// Runs `write_fn` with the current write targets for `label`, holding
    /// the routing read lock so the target set cannot change mid-write.
    ///
    /// Returns `None` only if the lock is poisoned.
    pub fn with_route_write<T, F>(
        &self,
        label: &str,
        primary_shard: ShardId,
        write_fn: F,
    ) -> Option<T>
    where
        F: FnOnce(&[ShardId]) -> T,
    {
        let migrations = self.active_migrations.read().ok()?;
        let targets = Self::targets_for(&migrations, label, primary_shard);
        Some(write_fn(&targets))
    }

    /// Issues a token capturing the current write targets for `label`.
    ///
    /// On a poisoned lock this degrades to a single-target token with
    /// version 0.
    pub fn generate_routing_token(&self, label: &str, primary_shard: ShardId) -> RoutingToken {
        let (targets, migration_version) = self
            .active_migrations
            .read()
            .map(|migrations| {
                // The "version" is just the table size; it only signals that
                // the set of migrations may have changed since issuance.
                (
                    Self::targets_for(&migrations, label, primary_shard),
                    migrations.len() as u64,
                )
            })
            .unwrap_or_else(|_| (vec![primary_shard], 0));
        RoutingToken {
            label: label.to_string(),
            primary_shard,
            targets,
            migration_version,
        }
    }

    /// Returns `true` if `token`'s targets still match current routing.
    /// A poisoned lock conservatively fails verification.
    pub fn verify_routing_token(&self, token: &RoutingToken) -> bool {
        self.active_migrations
            .read()
            .map(|migrations| {
                Self::targets_for(&migrations, &token.label, token.primary_shard) == token.targets
            })
            .unwrap_or(false)
    }

    /// Number of labels with an active migration (0 on a poisoned lock).
    pub fn active_migration_count(&self) -> usize {
        self.active_migrations.read().map(|m| m.len()).unwrap_or(0)
    }
}
impl Default for DualWriteRouter {
fn default() -> Self {
Self::new()
}
}
/// Orchestrates data migration between shards through registered clients.
#[derive(Debug)]
pub struct MigrationExecutor<C: ShardClient> {
    /// Tuning parameters (batch size, retries, ...).
    config: MigrationConfig,
    /// Client per shard; both the plan's source and target must be
    /// registered before `execute` can run it.
    clients: HashMap<ShardId, Arc<C>>,
    /// Router used to dual-write during the copy phase.
    dual_write_router: Arc<DualWriteRouter>,
    /// Per-migration cancellation flags, keyed by migration id.
    cancellation_flags: RwLock<HashMap<u64, Arc<AtomicBool>>>,
    /// Lifetime count of batches successfully transferred.
    batches_transferred: AtomicU64,
    /// Lifetime count of (estimated) bytes transferred.
    bytes_transferred: AtomicU64,
}
impl<C: ShardClient> MigrationExecutor<C> {
    /// Creates an executor with no registered shard clients.
    pub fn new(config: MigrationConfig, dual_write_router: Arc<DualWriteRouter>) -> Self {
        Self {
            config,
            clients: HashMap::new(),
            dual_write_router,
            cancellation_flags: RwLock::new(HashMap::new()),
            batches_transferred: AtomicU64::new(0),
            bytes_transferred: AtomicU64::new(0),
        }
    }

    /// Registers the client used to reach `shard_id`.
    pub fn register_client(&mut self, shard_id: ShardId, client: Arc<C>) {
        self.clients.insert(shard_id, client);
    }

    /// Runs `plan` to completion: dual-write, copy, verify, cutover, cleanup.
    ///
    /// # Errors
    /// Fails if either shard client is missing, a batch is rejected or keeps
    /// failing after retries, verification fails, or the migration is
    /// cancelled via [`Self::cancel`].
    pub fn execute(&self, plan: &mut MigrationPlan) -> MigrationResult<MigrationStats> {
        let migration_id = plan.id;
        let cancel_flag = Arc::new(AtomicBool::new(false));
        if let Ok(mut flags) = self.cancellation_flags.write() {
            flags.insert(migration_id, Arc::clone(&cancel_flag));
        }
        let result = self.run_migration(plan, &cancel_flag);
        // BUGFIX: drop the flag on *every* exit path. Previously it was only
        // removed on success, so any failure left a phantom entry behind and
        // `stats().active_migrations` over-reported forever.
        if let Ok(mut flags) = self.cancellation_flags.write() {
            flags.remove(&migration_id);
        }
        result
    }

    /// Drives the migration phases. Assumes the cancellation flag is already
    /// registered; its removal is `execute`'s responsibility.
    fn run_migration(
        &self,
        plan: &mut MigrationPlan,
        cancel_flag: &Arc<AtomicBool>,
    ) -> MigrationResult<MigrationStats> {
        let start = Instant::now();
        let migration_id = plan.id;
        let source_client = self
            .clients
            .get(&plan.source_shard)
            .ok_or(MigrationError::SourceUnavailable(plan.source_shard))?;
        let target_client = self
            .clients
            .get(&plan.target_shard)
            .ok_or(MigrationError::TargetUnavailable(plan.target_shard))?;
        self.start_dual_write(plan)?;
        let copy_stats = match self.copy_data(plan, source_client, target_client, cancel_flag) {
            Ok(stats) => stats,
            // BUGFIX: a cancellation raised *inside* the copy loop used to
            // propagate straight out via `?`, skipping `cleanup_cancelled`
            // and leaking the dual-write registration.
            Err(MigrationError::Cancelled(id)) => {
                self.cleanup_cancelled(plan)?;
                return Err(MigrationError::Cancelled(id));
            }
            Err(e) => return Err(e),
        };
        if cancel_flag.load(Ordering::SeqCst) {
            self.cleanup_cancelled(plan)?;
            return Err(MigrationError::Cancelled(migration_id));
        }
        self.verify_migration(plan, source_client, target_client)?;
        self.cutover(plan)?;
        self.cleanup(plan)?;
        Ok(MigrationStats {
            migration_id,
            total_time: start.elapsed(),
            batches_transferred: copy_stats.batches,
            nodes_transferred: copy_stats.nodes,
            edges_transferred: copy_stats.edges,
            bytes_transferred: copy_stats.bytes,
            verification_passed: true,
        })
    }

    /// Requests cancellation of a running migration. Returns `true` if the
    /// migration was found (the flag is checked once per batch).
    pub fn cancel(&self, migration_id: u64) -> bool {
        if let Ok(flags) = self.cancellation_flags.read()
            && let Some(flag) = flags.get(&migration_id)
        {
            flag.store(true, Ordering::SeqCst);
            return true;
        }
        false
    }

    /// Lifetime transfer counters plus the number of in-flight migrations.
    pub fn stats(&self) -> ExecutorStats {
        ExecutorStats {
            batches_transferred: self.batches_transferred.load(Ordering::Relaxed),
            bytes_transferred: self.bytes_transferred.load(Ordering::Relaxed),
            active_migrations: self.cancellation_flags.read().map(|f| f.len()).unwrap_or(0),
        }
    }

    /// Phase 1: register dual-write so new writes reach both shards.
    fn start_dual_write(&self, plan: &mut MigrationPlan) -> MigrationResult<()> {
        self.dual_write_router.register_migration(
            &plan.labels_to_migrate,
            plan.source_shard,
            plan.target_shard,
        );
        plan.state = MigrationState::DualWrite;
        Ok(())
    }

    /// Phase 2: stream batches from source to target until the source marks
    /// a batch as last, retrying each failed send up to `batch_retries`
    /// times and honoring cancellation between batches.
    fn copy_data(
        &self,
        plan: &mut MigrationPlan,
        source_client: &Arc<C>,
        target_client: &Arc<C>,
        cancel_flag: &Arc<AtomicBool>,
    ) -> MigrationResult<CopyStats> {
        plan.state = MigrationState::Copying;
        let mut stats = CopyStats::default();
        let mut offset = 0u64;
        let mut batch_number = 0u64;
        loop {
            if cancel_flag.load(Ordering::SeqCst) {
                return Err(MigrationError::Cancelled(plan.id));
            }
            let batch = source_client.extract_migration_batch(
                plan.id,
                &plan.labels_to_migrate,
                self.config.batch_size,
                offset,
            )?;
            let is_last = batch.is_last;
            let node_count = batch.nodes.len() as u64;
            let edge_count = batch.edges.len() as u64;
            if node_count == 0 && edge_count == 0 {
                if is_last {
                    break;
                }
                // BUGFIX: advance the cursor past an empty non-final window.
                // Previously this `continue`d without moving `offset`, so the
                // same empty batch was requested forever.
                offset += self.config.batch_size as u64;
                batch_number += 1;
                continue;
            }
            // Estimate once per batch instead of three times.
            let batch_bytes = estimate_batch_size(&batch);
            let mut retries = self.config.batch_retries;
            loop {
                match target_client.receive_migration_batch(batch.clone()) {
                    Ok(response) => {
                        if !response.accepted {
                            return Err(MigrationError::BatchRejected {
                                batch_number,
                                reason: response.error.unwrap_or_else(|| "Unknown".into()),
                            });
                        }
                        stats.batches += 1;
                        stats.nodes += response.nodes_written;
                        stats.edges += response.edges_written;
                        stats.bytes += batch_bytes;
                        self.batches_transferred.fetch_add(1, Ordering::Relaxed);
                        self.bytes_transferred.fetch_add(batch_bytes, Ordering::Relaxed);
                        plan.progress.update(
                            response.nodes_written,
                            response.edges_written,
                            batch_bytes,
                        );
                        break;
                    }
                    Err(e) => {
                        if retries == 0 {
                            return Err(e.into());
                        }
                        retries -= 1;
                    }
                }
            }
            if is_last {
                break;
            }
            offset += self.config.batch_size as u64;
            batch_number += 1;
        }
        Ok(stats)
    }

    /// Phase 3: sanity-check the target shard after the copy. Currently only
    /// verifies the target is non-empty when the plan reported nodes to move.
    fn verify_migration(
        &self,
        plan: &mut MigrationPlan,
        source_client: &Arc<C>,
        target_client: &Arc<C>,
    ) -> MigrationResult<()> {
        plan.state = MigrationState::Verifying;
        let _source_state = source_client.get_state()?;
        let target_state = target_client.get_state()?;
        if target_state.node_count == 0 && plan.progress.total_nodes > 0 {
            return Err(MigrationError::VerificationFailed {
                migration_id: plan.id,
                reason: "Target shard has no nodes after migration".into(),
            });
        }
        Ok(())
    }

    /// Phase 4: stop dual-writing; subsequent writes go to the target only.
    fn cutover(&self, plan: &mut MigrationPlan) -> MigrationResult<()> {
        plan.state = MigrationState::Cutover;
        self.dual_write_router
            .unregister_migration(&plan.labels_to_migrate);
        Ok(())
    }

    /// Phase 5: final state transition. No source-side deletion happens here.
    fn cleanup(&self, plan: &mut MigrationPlan) -> MigrationResult<()> {
        plan.state = MigrationState::Cleanup;
        plan.state = MigrationState::Completed;
        Ok(())
    }

    /// Tears down dual-write state for a cancelled migration.
    fn cleanup_cancelled(&self, plan: &mut MigrationPlan) -> MigrationResult<()> {
        self.dual_write_router
            .unregister_migration(&plan.labels_to_migrate);
        plan.state = MigrationState::Cancelled;
        Ok(())
    }
}
/// Running totals accumulated during the copy phase.
#[derive(Debug, Clone, Default)]
struct CopyStats {
    /// Batches accepted by the target.
    batches: u64,
    /// Nodes the target reported as written.
    nodes: u64,
    /// Edges the target reported as written.
    edges: u64,
    /// Estimated bytes transferred (see `estimate_batch_size`).
    bytes: u64,
}
/// Summary returned by a successful `MigrationExecutor::execute` run.
#[derive(Debug, Clone)]
pub struct MigrationStats {
    /// Id of the completed migration plan.
    pub migration_id: u64,
    /// Wall-clock duration of the whole migration.
    pub total_time: Duration,
    /// Batches accepted by the target shard.
    pub batches_transferred: u64,
    /// Nodes the target reported as written.
    pub nodes_transferred: u64,
    /// Edges the target reported as written.
    pub edges_transferred: u64,
    /// Estimated bytes transferred.
    pub bytes_transferred: u64,
    /// Always `true` on the success path; failures return an error instead.
    pub verification_passed: bool,
}
/// Point-in-time snapshot of an executor's lifetime counters.
#[derive(Debug, Clone)]
pub struct ExecutorStats {
    /// Total batches transferred across all migrations.
    pub batches_transferred: u64,
    /// Total estimated bytes transferred across all migrations.
    pub bytes_transferred: u64,
    /// Number of migrations currently in flight.
    pub active_migrations: usize,
}
/// Rough size estimate for a migration batch, in bytes.
///
/// Each record contributes its label and property payload lengths plus fixed
/// constants (8 per node and 24 per edge — presumably id fields — plus a
/// 16-byte per-record overhead), with a 32-byte batch envelope on top. This
/// is a throughput-accounting heuristic, not an exact serialized size.
fn estimate_batch_size(batch: &MigrationBatch) -> u64 {
    const RECORD_OVERHEAD: u64 = 16;
    const BATCH_ENVELOPE: u64 = 32;
    let node_bytes = batch.nodes.iter().fold(0u64, |acc, n| {
        acc + 8 + n.label.len() as u64 + n.properties.len() as u64 + RECORD_OVERHEAD
    });
    let edge_bytes = batch.edges.iter().fold(0u64, |acc, e| {
        acc + 24 + e.label.len() as u64 + e.properties.len() as u64 + RECORD_OVERHEAD
    });
    node_bytes + edge_bytes + BATCH_ENVELOPE
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::sharding::network::{MockShardClient, NodeData};
    use crate::storage::sharding::rebalance::{MigrationPlan, MigrationReason};

    // Test helper: build a ShardId, panicking on invalid input (fine here).
    fn make_shard_id(id: u16) -> ShardId {
        ShardId::new(id).unwrap()
    }

    // Test helper: a minimal plan moving "Person" from shard 0 to shard 1.
    fn make_mock_plan() -> MigrationPlan {
        MigrationPlan::new(
            1,
            make_shard_id(0),
            make_shard_id(1),
            vec!["Person".to_string()],
            100,
            200,
            MigrationReason::Imbalance,
        )
    }

    // --- MigrationError ----------------------------------------------------

    #[test]
    fn test_migration_error_display() {
        let err = MigrationError::ChecksumMismatch {
            batch_number: 1,
            expected: 100,
            actual: 200,
        };
        assert!(format!("{}", err).contains("Checksum"));
        let err = MigrationError::Cancelled(42);
        assert!(format!("{}", err).contains("cancelled"));
        let err = MigrationError::SourceUnavailable(make_shard_id(0));
        assert!(format!("{}", err).contains("Source"));
    }

    #[test]
    fn test_migration_error_from_network() {
        let net_err = NetworkError::ShardUnavailable(make_shard_id(0));
        let mig_err: MigrationError = net_err.into();
        assert!(matches!(mig_err, MigrationError::NetworkError(_)));
    }

    #[test]
    fn test_migration_config_default() {
        let config = MigrationConfig::default();
        assert_eq!(config.batch_size, 1000);
        assert_eq!(config.batch_retries, 3);
        assert!(config.verification_sample_rate > 0.0);
    }

    // --- DualWriteRouter ---------------------------------------------------

    #[test]
    fn test_dual_write_router_creation() {
        let router = DualWriteRouter::new();
        assert_eq!(router.active_migration_count(), 0);
    }

    #[test]
    fn test_dual_write_router_register() {
        let router = DualWriteRouter::new();
        router.register_migration(&["Person".to_string()], make_shard_id(0), make_shard_id(1));
        assert!(router.is_label_migrating("Person"));
        assert!(!router.is_label_migrating("Place"));
        assert_eq!(router.active_migration_count(), 1);
    }

    #[test]
    fn test_dual_write_router_unregister() {
        let router = DualWriteRouter::new();
        router.register_migration(&["Person".to_string()], make_shard_id(0), make_shard_id(1));
        router.unregister_migration(&["Person".to_string()]);
        assert!(!router.is_label_migrating("Person"));
        assert_eq!(router.active_migration_count(), 0);
    }

    #[test]
    fn test_dual_write_router_route() {
        let router = DualWriteRouter::new();
        let targets = router.route_write("Person", make_shard_id(0));
        assert_eq!(targets, vec![make_shard_id(0)]);
        router.register_migration(&["Person".to_string()], make_shard_id(0), make_shard_id(1));
        let targets = router.route_write("Person", make_shard_id(0));
        assert_eq!(targets.len(), 2);
        assert!(targets.contains(&make_shard_id(0)));
        assert!(targets.contains(&make_shard_id(1)));
    }

    #[test]
    fn test_dual_write_router_node_tracking() {
        let router = DualWriteRouter::new();
        let node_id = NodeId::new(1).unwrap();
        assert!(!router.is_node_migrating(node_id));
        router.register_nodes(&[node_id]);
        assert!(router.is_node_migrating(node_id));
        router.unregister_nodes(&[node_id]);
        assert!(!router.is_node_migrating(node_id));
    }

    // --- MigrationExecutor -------------------------------------------------

    #[test]
    fn test_executor_creation() {
        let router = Arc::new(DualWriteRouter::new());
        let executor: MigrationExecutor<MockShardClient> =
            MigrationExecutor::new(MigrationConfig::default(), router);
        let stats = executor.stats();
        assert_eq!(stats.batches_transferred, 0);
        assert_eq!(stats.active_migrations, 0);
    }

    #[test]
    fn test_executor_register_client() {
        let router = Arc::new(DualWriteRouter::new());
        let mut executor: MigrationExecutor<MockShardClient> =
            MigrationExecutor::new(MigrationConfig::default(), router);
        let client = Arc::new(MockShardClient::new(make_shard_id(0)));
        executor.register_client(make_shard_id(0), client);
    }

    #[test]
    fn test_executor_source_unavailable() {
        let router = Arc::new(DualWriteRouter::new());
        let executor: MigrationExecutor<MockShardClient> =
            MigrationExecutor::new(MigrationConfig::default(), router);
        let mut plan = make_mock_plan();
        let result = executor.execute(&mut plan);
        assert!(matches!(result, Err(MigrationError::SourceUnavailable(_))));
    }

    #[test]
    fn test_executor_target_unavailable() {
        let router = Arc::new(DualWriteRouter::new());
        let mut executor: MigrationExecutor<MockShardClient> =
            MigrationExecutor::new(MigrationConfig::default(), router);
        let source_client = Arc::new(MockShardClient::new(make_shard_id(0)));
        executor.register_client(make_shard_id(0), source_client);
        let mut plan = make_mock_plan();
        let result = executor.execute(&mut plan);
        assert!(matches!(result, Err(MigrationError::TargetUnavailable(_))));
    }

    #[test]
    fn test_executor_successful_migration() {
        let router = Arc::new(DualWriteRouter::new());
        let mut executor: MigrationExecutor<MockShardClient> =
            MigrationExecutor::new(MigrationConfig::default(), router.clone());
        let source_client = Arc::new(MockShardClient::new(make_shard_id(0)));
        let target_client = Arc::new(MockShardClient::new(make_shard_id(1)));
        // Pre-seed the target state so verification's non-empty check passes.
        let mut target_state = crate::storage::sharding::types::ShardState::new(make_shard_id(1));
        target_state.node_count = 100;
        target_client.set_state(target_state);
        executor.register_client(make_shard_id(0), source_client);
        executor.register_client(make_shard_id(1), target_client);
        let mut plan = make_mock_plan();
        let result = executor.execute(&mut plan);
        assert!(result.is_ok());
        assert_eq!(plan.state, MigrationState::Completed);
    }

    #[test]
    fn test_executor_cancel() {
        let router = Arc::new(DualWriteRouter::new());
        let executor: MigrationExecutor<MockShardClient> =
            MigrationExecutor::new(MigrationConfig::default(), router);
        assert!(!executor.cancel(999));
    }

    #[test]
    fn test_migration_stats() {
        let stats = MigrationStats {
            migration_id: 1,
            total_time: Duration::from_secs(10),
            batches_transferred: 100,
            nodes_transferred: 10000,
            edges_transferred: 50000,
            bytes_transferred: 1024 * 1024,
            verification_passed: true,
        };
        assert_eq!(stats.migration_id, 1);
        assert!(stats.verification_passed);
    }

    #[test]
    fn test_estimate_batch_size() {
        let batch = MigrationBatch {
            migration_id: 1,
            batch_number: 0,
            is_last: true,
            nodes: vec![NodeData {
                id: NodeId::new(1).unwrap(),
                label: "Person".to_string(),
                properties: vec![0; 100],
                valid_from: 0,
                valid_to: None,
            }],
            edges: vec![],
            checksum: 0,
        };
        let size = estimate_batch_size(&batch);
        assert!(size > 100);
    }

    // --- RoutingToken ------------------------------------------------------

    #[test]
    fn test_routing_token_generation() {
        let router = DualWriteRouter::new();
        let token = router.generate_routing_token("Person", make_shard_id(0));
        assert_eq!(token.label, "Person");
        assert_eq!(token.primary_shard, make_shard_id(0));
        assert_eq!(token.targets, vec![make_shard_id(0)]);
        assert_eq!(token.migration_version, 0);
    }

    #[test]
    fn test_routing_token_with_migration() {
        let router = DualWriteRouter::new();
        router.register_migration(&["Person".to_string()], make_shard_id(0), make_shard_id(1));
        let token = router.generate_routing_token("Person", make_shard_id(0));
        assert_eq!(token.targets.len(), 2);
        assert!(token.targets.contains(&make_shard_id(0)));
        assert!(token.targets.contains(&make_shard_id(1)));
        assert_eq!(token.migration_version, 1);
    }

    #[test]
    fn test_routing_token_verification() {
        let router = DualWriteRouter::new();
        let token = router.generate_routing_token("Person", make_shard_id(0));
        assert!(router.verify_routing_token(&token));
        // An unrelated label's migration must not invalidate the token.
        router.register_migration(&["Place".to_string()], make_shard_id(1), make_shard_id(2));
        assert!(router.verify_routing_token(&token));
        router.register_migration(&["Person".to_string()], make_shard_id(0), make_shard_id(1));
        assert!(!router.verify_routing_token(&token));
    }

    #[test]
    fn test_routing_token_debug() {
        let token = RoutingToken {
            label: "Person".to_string(),
            primary_shard: make_shard_id(0),
            targets: vec![make_shard_id(0)],
            migration_version: 0,
        };
        let debug = format!("{:?}", token);
        assert!(debug.contains("label"));
        assert!(debug.contains("Person"));
    }

    #[test]
    fn test_with_route_write_no_migration() {
        let router = DualWriteRouter::new();
        let result = router.with_route_write("Person", make_shard_id(0), |targets| {
            assert_eq!(targets.len(), 1);
            assert_eq!(targets[0], make_shard_id(0));
            42
        });
        assert_eq!(result, Some(42));
    }

    #[test]
    fn test_with_route_write_during_migration() {
        let router = DualWriteRouter::new();
        router.register_migration(&["Person".to_string()], make_shard_id(0), make_shard_id(1));
        let result = router.with_route_write("Person", make_shard_id(0), |targets| {
            assert_eq!(targets.len(), 2);
            assert!(targets.contains(&make_shard_id(0)));
            assert!(targets.contains(&make_shard_id(1)));
            "success"
        });
        assert_eq!(result, Some("success"));
    }

    #[test]
    fn test_with_route_write_different_primary() {
        let router = DualWriteRouter::new();
        router.register_migration(&["Person".to_string()], make_shard_id(0), make_shard_id(1));
        let result = router.with_route_write("Person", make_shard_id(2), |targets| {
            assert_eq!(targets.len(), 1);
            assert_eq!(targets[0], make_shard_id(2));
            true
        });
        assert_eq!(result, Some(true));
    }

    #[test]
    fn test_dual_write_router_default() {
        let router = DualWriteRouter::default();
        assert_eq!(router.active_migration_count(), 0);
    }

    #[test]
    fn test_migration_config_debug() {
        let config = MigrationConfig::default();
        let debug = format!("{:?}", config);
        assert!(debug.contains("batch_size"));
        assert!(debug.contains("batch_retries"));
    }

    #[test]
    fn test_executor_stats_default() {
        let stats = ExecutorStats {
            batches_transferred: 0,
            bytes_transferred: 0,
            active_migrations: 0,
        };
        assert_eq!(stats.batches_transferred, 0);
        assert_eq!(stats.active_migrations, 0);
    }

    // --- Error message rendering -------------------------------------------

    #[test]
    fn test_migration_error_target_unavailable() {
        let err = MigrationError::TargetUnavailable(make_shard_id(1));
        let msg = format!("{}", err);
        assert!(msg.contains("Target"));
    }

    #[test]
    fn test_migration_error_verification_failed() {
        let err = MigrationError::VerificationFailed {
            migration_id: 42,
            reason: "test reason".to_string(),
        };
        let msg = format!("{}", err);
        assert!(msg.contains("verification failed"));
    }

    #[test]
    fn test_migration_error_batch_rejected() {
        let err = MigrationError::BatchRejected {
            batch_number: 5,
            reason: "network error".to_string(),
        };
        let msg = format!("{}", err);
        assert!(msg.contains("Batch"));
        assert!(msg.contains("5"));
    }

    #[test]
    fn test_migration_error_timeout() {
        let err = MigrationError::Timeout {
            migration_id: 1,
            phase: MigrationState::Copying,
            elapsed: Duration::from_secs(30),
        };
        let msg = format!("{}", err);
        assert!(msg.contains("timed out"));
    }

    #[test]
    fn test_migration_error_invalid_state() {
        let err = MigrationError::InvalidState {
            migration_id: 1,
            expected: MigrationState::DualWrite,
            actual: MigrationState::Completed,
        };
        let msg = format!("{}", err);
        assert!(msg.contains("invalid state"));
    }
}