#![cfg(any(feature = "postgres", feature = "mysql"))]
use std::collections::HashMap;
use rustcdc::source::SnapshotProgress;
const TABLE_COUNT: usize = 5;
const ROWS_PER_TABLE: usize = 100_000;
const CHUNK_SIZE: usize = 5_000;
const CHUNKS_PER_TABLE: usize = ROWS_PER_TABLE / CHUNK_SIZE;
fn table_names() -> Vec<String> {
(0..TABLE_COUNT)
.map(|index| format!("public.table_{index}"))
.collect()
}
#[test]
fn snapshot_progress_resumes_from_table2_chunk5_and_skips_completed_work() {
let tables = table_names();
let mut phase1 = SnapshotProgress::new("snapshot-resumable-5x100k".into(), 1_700_000_000);
for table in &tables {
phase1.add_table(table.clone());
}
for table in &tables[0..2] {
for chunk in 0..CHUNKS_PER_TABLE {
phase1
.record_table_chunk(
table,
CHUNK_SIZE,
Some(format!("{table}:cursor:{chunk}").into_bytes()),
)
.expect("phase1 chunk record for completed tables should succeed");
}
phase1
.mark_table_complete(table)
.expect("phase1 completion marker should succeed");
}
let table2 = &tables[2];
for chunk in 0..5 {
phase1
.record_table_chunk(
table2,
CHUNK_SIZE,
Some(format!("{table2}:cursor:{chunk}").into_bytes()),
)
.expect("phase1 chunk record for table2 should succeed");
}
let checkpoint_bytes = phase1.encode().expect("checkpoint encoding should succeed");
let mut resumed =
SnapshotProgress::decode(&checkpoint_bytes).expect("checkpoint decoding should succeed");
for table in &tables[0..2] {
let progress = resumed
.get_table_progress(table)
.expect("restored progress should include completed tables");
assert!(progress.is_complete);
assert_eq!(progress.chunk_index, CHUNKS_PER_TABLE as u64);
assert_eq!(progress.row_count, ROWS_PER_TABLE as u64);
}
let restored_table2 = resumed
.get_table_progress(table2)
.expect("restored progress should include table2");
assert!(!restored_table2.is_complete);
assert_eq!(restored_table2.chunk_index, 5);
assert_eq!(restored_table2.row_count, 25_000);
for table in &tables[3..5] {
let progress = resumed
.get_table_progress(table)
.expect("restored progress should include untouched tables");
assert!(!progress.is_complete);
assert_eq!(progress.chunk_index, 0);
assert_eq!(progress.row_count, 0);
}
let mut resumed_chunks: HashMap<String, usize> = HashMap::new();
for chunk in 5..CHUNKS_PER_TABLE {
resumed
.record_table_chunk(
table2,
CHUNK_SIZE,
Some(format!("{table2}:cursor:{chunk}").into_bytes()),
)
.expect("resumed chunk record for table2 should succeed");
*resumed_chunks.entry(table2.clone()).or_insert(0) += 1;
}
resumed
.mark_table_complete(table2)
.expect("resumed completion marker for table2 should succeed");
for table in &tables[3..5] {
for chunk in 0..CHUNKS_PER_TABLE {
resumed
.record_table_chunk(
table,
CHUNK_SIZE,
Some(format!("{table}:cursor:{chunk}").into_bytes()),
)
.expect("resumed chunk record for remaining tables should succeed");
*resumed_chunks.entry(table.clone()).or_insert(0) += 1;
}
resumed
.mark_table_complete(table)
.expect("resumed completion marker for remaining tables should succeed");
}
assert_eq!(resumed_chunks.get(&tables[0]).copied().unwrap_or(0), 0);
assert_eq!(resumed_chunks.get(&tables[1]).copied().unwrap_or(0), 0);
assert_eq!(resumed_chunks.get(table2).copied().unwrap_or(0), 15);
assert_eq!(resumed_chunks.get(&tables[3]).copied().unwrap_or(0), 20);
assert_eq!(resumed_chunks.get(&tables[4]).copied().unwrap_or(0), 20);
assert!(resumed.is_all_complete());
assert_eq!(resumed.completed_tables(), TABLE_COUNT);
assert!(resumed.get_pending_tables().is_empty());
assert_eq!(
resumed.total_rows_processed(),
(TABLE_COUNT * ROWS_PER_TABLE) as u64
);
for table in &tables {
let progress = resumed
.get_table_progress(table)
.expect("final progress should include every table");
assert!(progress.is_complete);
assert_eq!(progress.chunk_index, CHUNKS_PER_TABLE as u64);
assert_eq!(progress.row_count, ROWS_PER_TABLE as u64);
}
}