use std::sync::Arc;
use rustcdc::source::{SnapshotProgress, SnapshotProgressTracker, SnapshotTrackerConfig};
const TABLE_COUNT: usize = 10;
const ROWS_PER_TABLE: usize = 100_000;
const CHUNK_SIZE: usize = 5_000;
const CRASH_AFTER_CHUNKS: usize = 5;
const WORKERS: usize = 8;
fn table_names() -> Vec<String> {
(0..TABLE_COUNT)
.map(|idx| format!("public.table_{idx}"))
.collect()
}
fn worker_tables(tables: &[String], worker: usize) -> Vec<String> {
tables
.iter()
.enumerate()
.filter(|(idx, _)| idx % WORKERS == worker)
.map(|(_, table)| table.clone())
.collect()
}
#[test]
fn parallel_snapshot_stress_resume_after_chunk_5_for_10_tables() {
let tables = table_names();
let total_chunks = ROWS_PER_TABLE / CHUNK_SIZE;
let state = Arc::new(SnapshotProgressTracker::new(
"parallel-stress".into(),
1,
tables.clone(),
SnapshotTrackerConfig {
chunk_size: CHUNK_SIZE,
},
));
let mut first_phase = Vec::new();
for worker in 0..WORKERS {
let state_ref = state.clone();
let assigned = worker_tables(&tables, worker);
first_phase.push(std::thread::spawn(move || {
for table in assigned {
for chunk in 0..CRASH_AFTER_CHUNKS {
let cursor = Some(format!("{table}:{chunk}").into_bytes());
state_ref
.record_chunk_progress(&table, CHUNK_SIZE, cursor)
.expect("phase 1 progress update should succeed");
}
}
}));
}
for handle in first_phase {
handle.join().expect("phase 1 thread should join");
}
let before_crash_progress = state
.get_progress()
.expect("progress should be readable before crash");
let checkpoint_bytes = before_crash_progress
.encode()
.expect("progress checkpoint encode should succeed");
let restored = SnapshotProgress::decode(&checkpoint_bytes)
.expect("progress checkpoint decode should succeed");
for table in &tables {
let progress = restored
.get_table_progress(table)
.expect("table progress should exist after restore");
assert_eq!(progress.chunk_index, CRASH_AFTER_CHUNKS as u64);
assert_eq!(progress.row_count, (CRASH_AFTER_CHUNKS * CHUNK_SIZE) as u64);
assert!(!progress.is_complete);
}
let resumed = Arc::new(SnapshotProgressTracker::new(
restored.snapshot_id.clone(),
restored.created_at,
tables.clone(),
SnapshotTrackerConfig {
chunk_size: CHUNK_SIZE,
},
));
for table in &tables {
for _ in 0..CRASH_AFTER_CHUNKS {
resumed
.record_chunk_progress(table, CHUNK_SIZE, None)
.expect("rehydration progress update should succeed");
}
}
let mut second_phase = Vec::new();
for worker in 0..WORKERS {
let resumed_ref = resumed.clone();
let assigned = worker_tables(&tables, worker);
second_phase.push(std::thread::spawn(move || {
for table in assigned {
for chunk in CRASH_AFTER_CHUNKS..total_chunks {
let cursor = Some(format!("{table}:{chunk}").into_bytes());
resumed_ref
.record_chunk_progress(&table, CHUNK_SIZE, cursor)
.expect("phase 2 progress update should succeed");
}
resumed_ref
.mark_table_complete(&table)
.expect("table completion should succeed");
}
}));
}
for handle in second_phase {
handle.join().expect("phase 2 thread should join");
}
assert!(resumed.all_complete().expect("all_complete should succeed"));
assert_eq!(
resumed.completed_tables().expect("completed_tables"),
TABLE_COUNT
);
assert_eq!(resumed.progress_percent().expect("progress_percent"), 100);
assert!(resumed
.get_pending_tables()
.expect("pending tables")
.is_empty());
assert_eq!(
resumed.total_rows_processed().expect("total rows"),
(TABLE_COUNT * ROWS_PER_TABLE) as u64
);
}