use crate::common::{CdcError, CdcEvent, Result};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, VecDeque};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::RwLock;
use tracing::{debug, info};
use uuid::Uuid;
/// How snapshot-window watermarks are written to the signal table
/// (mirrors Debezium's insert/insert vs insert/delete watermarking).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum WatermarkStrategy {
    /// Both the window-open and window-close markers are INSERTed.
    #[default]
    InsertInsert,
    /// Window-open is an INSERT; the marker row is DELETEd on close.
    InsertDelete,
}
impl WatermarkStrategy {
    /// `true` when closing a window must issue a DELETE against the
    /// signal table (see [`WatermarkSignal::to_delete_sql`]).
    pub fn requires_delete(&self) -> bool {
        matches!(self, WatermarkStrategy::InsertDelete)
    }
}
/// Tuning knobs for incremental (chunked, watermark-based) snapshots.
#[derive(Debug, Clone)]
pub struct IncrementalSnapshotConfig {
    /// Maximum rows fetched per chunk query (LIMIT in the chunk SQL).
    pub chunk_size: usize,
    /// How open/close watermarks are written to the signal table.
    pub watermark_strategy: WatermarkStrategy,
    /// Byte budget for buffered chunk events.
    /// NOTE(review): not enforced anywhere in this file — presumably the
    /// caller checks it; confirm.
    pub max_buffer_memory: usize,
    /// Wall-clock limit for a single chunk window.
    pub chunk_timeout: Duration,
    /// Optional pause between chunks to throttle load on the source.
    pub inter_chunk_delay: Option<Duration>,
    /// Maximum chunks processed concurrently (builder clamps to >= 1).
    pub max_concurrent_chunks: usize,
    /// Whether a surrogate key may replace the primary key for chunking.
    /// NOTE(review): not consulted by the coordinator in this file — confirm.
    pub allow_surrogate_key: bool,
    /// Name of the signal table that watermark rows are written to.
    pub signal_table: String,
}
impl Default for IncrementalSnapshotConfig {
fn default() -> Self {
Self {
chunk_size: 1024,
watermark_strategy: WatermarkStrategy::InsertInsert,
max_buffer_memory: 64 * 1024 * 1024, chunk_timeout: Duration::from_secs(60),
inter_chunk_delay: None,
max_concurrent_chunks: 1,
allow_surrogate_key: true,
signal_table: "debezium_signal".to_string(),
}
}
}
impl IncrementalSnapshotConfig {
pub fn builder() -> IncrementalSnapshotConfigBuilder {
IncrementalSnapshotConfigBuilder::default()
}
}
/// Fluent builder for [`IncrementalSnapshotConfig`]; starts from the
/// config's `Default` values and overrides only what the caller sets.
#[derive(Default)]
pub struct IncrementalSnapshotConfigBuilder {
    // Accumulated configuration, returned as-is by `build`.
    config: IncrementalSnapshotConfig,
}
impl IncrementalSnapshotConfigBuilder {
    /// Sets the per-chunk row limit; zero is clamped up to 1.
    pub fn chunk_size(mut self, size: usize) -> Self {
        self.config.chunk_size = if size == 0 { 1 } else { size };
        self
    }
    /// Selects the watermarking strategy.
    pub fn watermark_strategy(mut self, strategy: WatermarkStrategy) -> Self {
        self.config.watermark_strategy = strategy;
        self
    }
    /// Sets the buffered-event byte budget.
    pub fn max_buffer_memory(mut self, bytes: usize) -> Self {
        self.config.max_buffer_memory = bytes;
        self
    }
    /// Sets the wall-clock limit for one chunk window.
    pub fn chunk_timeout(mut self, timeout: Duration) -> Self {
        self.config.chunk_timeout = timeout;
        self
    }
    /// Enables a pause between consecutive chunks.
    pub fn inter_chunk_delay(mut self, delay: Duration) -> Self {
        self.config.inter_chunk_delay = Some(delay);
        self
    }
    /// Sets the concurrency ceiling; zero is clamped up to 1.
    pub fn max_concurrent_chunks(mut self, count: usize) -> Self {
        self.config.max_concurrent_chunks = std::cmp::max(count, 1);
        self
    }
    /// Allows or forbids surrogate chunking keys.
    pub fn allow_surrogate_key(mut self, allow: bool) -> Self {
        self.config.allow_surrogate_key = allow;
        self
    }
    /// Overrides the signal-table name.
    pub fn signal_table(mut self, table: impl Into<String>) -> Self {
        self.config.signal_table = table.into();
        self
    }
    /// Finalizes and returns the accumulated configuration.
    pub fn build(self) -> IncrementalSnapshotConfig {
        self.config
    }
}
/// Lifecycle of a per-table (or whole) incremental snapshot.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
pub enum IncrementalSnapshotState {
    /// Queued, not yet started.
    #[default]
    Pending,
    /// Chunks are actively being processed.
    Running,
    /// Temporarily halted; may be resumed.
    Paused,
    /// Finished successfully.
    Completed,
    /// Halted by an explicit stop request.
    Stopped,
    /// Aborted due to an error.
    Failed,
}
impl IncrementalSnapshotState {
    /// A snapshot in this state still owns resources: it is either
    /// running or merely paused.
    pub fn is_active(&self) -> bool {
        match self {
            Self::Running | Self::Paused => true,
            _ => false,
        }
    }
    /// Only a paused snapshot may be resumed.
    pub fn can_resume(&self) -> bool {
        *self == Self::Paused
    }
}
/// Per-table snapshot progress, serializable so it can be checkpointed
/// and restored (see `IncrementalSnapshotCoordinator::restore`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SnapshotTable {
    /// Fully qualified table name (e.g. "public.orders").
    pub table_name: String,
    /// Primary-key column names; the first one is the default chunk key.
    pub primary_key: Vec<String>,
    /// Optional override column used for chunk ordering/bounds.
    pub surrogate_key: Option<String>,
    /// Current lifecycle state of this table's snapshot.
    pub state: IncrementalSnapshotState,
    /// Highest key value to snapshot up to, when known.
    pub max_key: Option<String>,
    /// Last key emitted; next chunk resumes strictly after it.
    pub last_key: Option<String>,
    /// Estimated total chunk count, when known.
    pub total_chunks: Option<u64>,
    /// Chunks finished so far.
    pub completed_chunks: u64,
    /// Rows emitted so far.
    pub rows_processed: u64,
    /// Extra SQL predicate ANDed onto every chunk query, if any.
    pub additional_conditions: Option<String>,
}
impl SnapshotTable {
pub fn new(table_name: impl Into<String>, primary_key: Vec<String>) -> Self {
Self {
table_name: table_name.into(),
primary_key,
surrogate_key: None,
state: IncrementalSnapshotState::Pending,
max_key: None,
last_key: None,
total_chunks: None,
completed_chunks: 0,
rows_processed: 0,
additional_conditions: None,
}
}
pub fn with_surrogate_key(mut self, key: impl Into<String>) -> Self {
self.surrogate_key = Some(key.into());
self
}
pub fn with_conditions(mut self, conditions: impl Into<String>) -> Self {
self.additional_conditions = Some(conditions.into());
self
}
pub fn chunk_key(&self) -> &str {
self.surrogate_key
.as_deref()
.unwrap_or_else(|| self.primary_key.first().map(|s| s.as_str()).unwrap_or("id"))
}
pub fn progress_percentage(&self) -> Option<f64> {
self.total_chunks.map(|total| {
if total == 0 {
100.0
} else {
(self.completed_chunks as f64 / total as f64) * 100.0
}
})
}
}
/// One bounded, ordered slice of a table snapshot.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SnapshotChunk {
    /// Unique id: "chunk-<uuid>-<sequence>".
    pub chunk_id: String,
    /// Table this chunk reads from.
    pub table_name: String,
    /// Zero-based chunk index within the table.
    pub sequence: u64,
    /// Exclusive lower key bound (`key_column > from_key`), if any.
    pub from_key: Option<String>,
    /// Inclusive upper key bound (`key_column <= to_key`), if any.
    pub to_key: Option<String>,
    /// Column used for ordering and range bounds.
    pub key_column: String,
    /// Extra SQL predicate ANDed onto the chunk query, if any.
    pub conditions: Option<String>,
    /// Current processing state of the chunk.
    pub state: ChunkState,
}
/// Processing state of a single snapshot chunk.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
pub enum ChunkState {
    /// Created but not yet started.
    #[default]
    Pending,
    /// Watermark window is open; rows are being read.
    WindowOpen,
    /// Rows buffered, awaiting window close / dedup against streaming.
    Buffered,
    /// Chunk finished and events emitted.
    Complete,
    /// Chunk aborted due to an error.
    Failed,
}
impl SnapshotChunk {
pub fn new(
table_name: impl Into<String>,
sequence: u64,
key_column: impl Into<String>,
) -> Self {
Self {
chunk_id: format!("chunk-{}-{}", Uuid::new_v4(), sequence),
table_name: table_name.into(),
sequence,
from_key: None,
to_key: None,
key_column: key_column.into(),
conditions: None,
state: ChunkState::Pending,
}
}
pub fn with_range(mut self, from: Option<String>, to: Option<String>) -> Self {
self.from_key = from;
self.to_key = to;
self
}
pub fn with_conditions(mut self, conditions: impl Into<String>) -> Self {
self.conditions = Some(conditions.into());
self
}
pub fn to_sql(&self, chunk_size: usize) -> String {
let mut sql = format!("SELECT * FROM {} WHERE ", self.table_name);
if let Some(ref from) = self.from_key {
sql.push_str(&format!("{} > '{}' AND ", self.key_column, from));
}
if let Some(ref to) = self.to_key {
sql.push_str(&format!("{} <= '{}' ", self.key_column, to));
} else {
sql.push_str("1=1 ");
}
if let Some(ref conditions) = self.conditions {
sql.push_str(&format!("AND ({}) ", conditions));
}
sql.push_str(&format!(
"ORDER BY {} LIMIT {}",
self.key_column, chunk_size
));
sql
}
}
/// A watermark row written to the signal table to mark a chunk's
/// snapshot window boundaries in the transaction log.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WatermarkSignal {
    /// Signal row id: "<chunk_id>-open" or "<chunk_id>-close".
    pub id: String,
    /// Whether this marks a window open or close.
    pub signal_type: WatermarkType,
    /// Chunk this watermark belongs to.
    pub chunk_id: String,
    /// Table being snapshotted.
    pub table_name: String,
    /// Creation time, epoch milliseconds (UTC).
    pub timestamp: i64,
}
/// Kind of watermark; serialized in kebab-case to match the wire names
/// returned by `as_str`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum WatermarkType {
    /// Marks the start of a chunk's snapshot window.
    SnapshotWindowOpen,
    /// Marks the end of a chunk's snapshot window.
    SnapshotWindowClose,
}
impl WatermarkType {
pub fn as_str(&self) -> &str {
match self {
WatermarkType::SnapshotWindowOpen => "snapshot-window-open",
WatermarkType::SnapshotWindowClose => "snapshot-window-close",
}
}
}
impl WatermarkSignal {
pub fn open(chunk: &SnapshotChunk) -> Self {
Self {
id: format!("{}-open", chunk.chunk_id),
signal_type: WatermarkType::SnapshotWindowOpen,
chunk_id: chunk.chunk_id.clone(),
table_name: chunk.table_name.clone(),
timestamp: chrono::Utc::now().timestamp_millis(),
}
}
pub fn close(chunk: &SnapshotChunk) -> Self {
Self {
id: format!("{}-close", chunk.chunk_id),
signal_type: WatermarkType::SnapshotWindowClose,
chunk_id: chunk.chunk_id.clone(),
table_name: chunk.table_name.clone(),
timestamp: chrono::Utc::now().timestamp_millis(),
}
}
pub fn to_insert_sql(&self, signal_table: &str) -> String {
let data = serde_json::json!({
"chunk_id": self.chunk_id,
"table": self.table_name,
});
format!(
"INSERT INTO {} (id, type, data) VALUES ('{}', '{}', '{}')",
signal_table,
self.id,
self.signal_type.as_str(),
data
)
}
pub fn to_delete_sql(&self, signal_table: &str) -> String {
format!("DELETE FROM {} WHERE id = '{}'", signal_table, self.id)
}
}
/// In-memory buffer that holds one chunk's snapshot rows while the
/// watermark window is open, keyed so streaming changes can evict
/// (deduplicate) stale snapshot rows.
#[derive(Debug)]
pub struct ChunkBuffer {
    // Buffered snapshot events keyed by row key; last write wins.
    events: HashMap<String, CdcEvent>,
    // Kept for context/debugging; not read by any method in this file.
    #[allow(dead_code)]
    key_column: String,
    // Table this buffer belongs to (used to scope conflict checks).
    table_name: String,
    // Chunk id, used in log messages.
    chunk_id: String,
    // When the window was opened; None until first open.
    window_opened: Option<Instant>,
    // Whether the watermark window is currently open.
    is_open: bool,
    // Events evicted due to streaming conflicts while the window was open.
    dropped_count: u64,
}
impl ChunkBuffer {
    /// Creates a closed, empty buffer bound to `chunk`'s table and id.
    pub fn new(chunk: &SnapshotChunk) -> Self {
        Self {
            events: HashMap::new(),
            key_column: chunk.key_column.clone(),
            table_name: chunk.table_name.clone(),
            chunk_id: chunk.chunk_id.clone(),
            window_opened: None,
            is_open: false,
            dropped_count: 0,
        }
    }
    /// Opens the snapshot window and starts the window timer.
    ///
    /// Fix: `dropped_count` is now reset alongside `events`, so a buffer
    /// instance that is reopened no longer reports drops carried over
    /// from a previous window.
    pub fn open_window(&mut self) {
        self.is_open = true;
        self.window_opened = Some(Instant::now());
        self.events.clear();
        self.dropped_count = 0;
        debug!(
            chunk_id = %self.chunk_id,
            table = %self.table_name,
            "Snapshot window opened"
        );
    }
    /// Buffers a snapshot row under `key`; ignored when the window is
    /// closed. A later row with the same key overwrites the earlier one.
    pub fn add_row(&mut self, event: CdcEvent, key: String) {
        if self.is_open {
            self.events.insert(key, event);
        }
    }
    /// Called for each streaming change observed while the window is open:
    /// if a buffered snapshot row shares `key`, it is evicted (the
    /// streaming event is newer) and `true` is returned.
    pub fn check_conflict(&mut self, key: &str) -> bool {
        if !self.is_open {
            return false;
        }
        if self.events.remove(key).is_some() {
            self.dropped_count += 1;
            debug!(
                key = %key,
                chunk_id = %self.chunk_id,
                "Dropped buffered snapshot event due to streaming conflict"
            );
            true
        } else {
            false
        }
    }
    /// Closes the window and drains all surviving snapshot events.
    /// Order of the returned events is unspecified (HashMap drain).
    pub fn close_window(&mut self) -> Vec<CdcEvent> {
        self.is_open = false;
        let duration = self.window_opened.map(|t| t.elapsed());
        let events: Vec<_> = self.events.drain().map(|(_, e)| e).collect();
        debug!(
            chunk_id = %self.chunk_id,
            table = %self.table_name,
            emitted = events.len(),
            dropped = self.dropped_count,
            duration_ms = ?duration.map(|d| d.as_millis()),
            "Snapshot window closed"
        );
        events
    }
    /// Whether the watermark window is currently open.
    pub fn is_window_open(&self) -> bool {
        self.is_open
    }
    /// Number of currently buffered events.
    pub fn len(&self) -> usize {
        self.events.len()
    }
    /// `true` when no events are buffered.
    pub fn is_empty(&self) -> bool {
        self.events.is_empty()
    }
    /// Events evicted due to streaming conflicts in the current window.
    pub fn dropped_count(&self) -> u64 {
        self.dropped_count
    }
}
/// Resumable state of one incremental snapshot run; serializable so it
/// can be checkpointed and later fed to `restore`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IncrementalSnapshotContext {
    /// Unique id: "incr-snap-<uuid>".
    pub snapshot_id: String,
    /// Remaining tables; the front entry is the one in progress.
    pub tables: VecDeque<SnapshotTable>,
    /// Chunk currently being processed, if any.
    pub current_chunk: Option<SnapshotChunk>,
    /// Start time, epoch milliseconds (UTC).
    pub started_at: i64,
    /// Last progress time, epoch milliseconds (UTC).
    pub last_activity: i64,
}
impl IncrementalSnapshotContext {
pub fn new(tables: Vec<SnapshotTable>) -> Self {
let now = chrono::Utc::now().timestamp_millis();
Self {
snapshot_id: format!("incr-snap-{}", Uuid::new_v4()),
tables: tables.into_iter().collect(),
current_chunk: None,
started_at: now,
last_activity: now,
}
}
pub fn current_table(&self) -> Option<&SnapshotTable> {
self.tables.front()
}
pub fn current_table_mut(&mut self) -> Option<&mut SnapshotTable> {
self.tables.front_mut()
}
pub fn next_table(&mut self) -> Option<SnapshotTable> {
self.tables.pop_front()
}
pub fn is_complete(&self) -> bool {
self.tables.is_empty()
}
pub fn progress_percentage(&self) -> f64 {
if self.tables.is_empty() {
return 100.0;
}
let completed: u64 = self.tables.iter().map(|t| t.completed_chunks).sum();
let total: u64 = self
.tables
.iter()
.filter_map(|t| t.total_chunks)
.sum::<u64>()
.max(1);
(completed as f64 / total as f64) * 100.0
}
}
/// Lock-free counters for snapshot activity; all updates use relaxed
/// atomics (counters only, no cross-field ordering guarantees).
#[derive(Debug, Default)]
pub struct IncrementalSnapshotStats {
    // Total snapshots ever started.
    snapshots_started: AtomicU64,
    // Total snapshots that finished successfully.
    snapshots_completed: AtomicU64,
    // Total snapshots that failed.
    snapshots_failed: AtomicU64,
    // Total chunks processed across all snapshots.
    chunks_processed: AtomicU64,
    // Total rows emitted from snapshot windows.
    rows_snapshotted: AtomicU64,
    // Buffered events dropped due to streaming conflicts.
    events_dropped: AtomicU64,
    // Accumulated window-open time, milliseconds.
    window_time_ms: AtomicU64,
    // Gauge: snapshots currently in flight.
    active_snapshots: AtomicU64,
}
impl IncrementalSnapshotStats {
pub fn record_started(&self) {
self.snapshots_started.fetch_add(1, Ordering::Relaxed);
self.active_snapshots.fetch_add(1, Ordering::Relaxed);
}
pub fn record_completed(&self) {
self.snapshots_completed.fetch_add(1, Ordering::Relaxed);
self.active_snapshots.fetch_sub(1, Ordering::Relaxed);
}
pub fn record_failed(&self) {
self.snapshots_failed.fetch_add(1, Ordering::Relaxed);
self.active_snapshots.fetch_sub(1, Ordering::Relaxed);
}
pub fn record_chunk(&self, rows: u64, dropped: u64, window_time_ms: u64) {
self.chunks_processed.fetch_add(1, Ordering::Relaxed);
self.rows_snapshotted.fetch_add(rows, Ordering::Relaxed);
self.events_dropped.fetch_add(dropped, Ordering::Relaxed);
self.window_time_ms
.fetch_add(window_time_ms, Ordering::Relaxed);
}
pub fn snapshot(&self) -> IncrementalSnapshotStatsSnapshot {
IncrementalSnapshotStatsSnapshot {
snapshots_started: self.snapshots_started.load(Ordering::Relaxed),
snapshots_completed: self.snapshots_completed.load(Ordering::Relaxed),
snapshots_failed: self.snapshots_failed.load(Ordering::Relaxed),
chunks_processed: self.chunks_processed.load(Ordering::Relaxed),
rows_snapshotted: self.rows_snapshotted.load(Ordering::Relaxed),
events_dropped: self.events_dropped.load(Ordering::Relaxed),
window_time_ms: self.window_time_ms.load(Ordering::Relaxed),
active_snapshots: self.active_snapshots.load(Ordering::Relaxed),
}
}
}
/// Plain-value copy of [`IncrementalSnapshotStats`] for reporting.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IncrementalSnapshotStatsSnapshot {
    /// Total snapshots ever started.
    pub snapshots_started: u64,
    /// Total snapshots completed successfully.
    pub snapshots_completed: u64,
    /// Total snapshots that failed.
    pub snapshots_failed: u64,
    /// Total chunks processed.
    pub chunks_processed: u64,
    /// Total rows emitted from snapshot windows.
    pub rows_snapshotted: u64,
    /// Buffered events dropped due to streaming conflicts.
    pub events_dropped: u64,
    /// Accumulated window-open time, milliseconds.
    pub window_time_ms: u64,
    /// Snapshots in flight at capture time.
    pub active_snapshots: u64,
}
/// An incoming request to start an incremental snapshot, shaped like a
/// Debezium signal payload.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IncrementalSnapshotRequest {
    /// Fully qualified table names to snapshot, in order.
    pub data_collections: Vec<String>,
    /// Snapshot kind; "incremental" for requests built via `new`.
    pub snapshot_type: String,
    /// Optional per-table row filters (missing in payload => empty).
    #[serde(default)]
    pub additional_conditions: Vec<AdditionalCondition>,
    /// Optional per-table surrogate chunking keys (missing => empty).
    #[serde(default)]
    pub surrogate_keys: HashMap<String, String>,
}
/// A per-table row filter, serialized with Debezium's
/// "data-collection" field name.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AdditionalCondition {
    /// Table the filter applies to (wire name: "data-collection").
    #[serde(rename = "data-collection")]
    pub data_collection: String,
    /// SQL predicate ANDed onto that table's chunk queries.
    pub filter: String,
}
impl IncrementalSnapshotRequest {
pub fn new(tables: Vec<String>) -> Self {
Self {
data_collections: tables,
snapshot_type: "incremental".to_string(),
additional_conditions: Vec::new(),
surrogate_keys: HashMap::new(),
}
}
pub fn with_condition(mut self, table: impl Into<String>, filter: impl Into<String>) -> Self {
self.additional_conditions.push(AdditionalCondition {
data_collection: table.into(),
filter: filter.into(),
});
self
}
pub fn with_surrogate_key(mut self, table: impl Into<String>, key: impl Into<String>) -> Self {
self.surrogate_keys.insert(table.into(), key.into());
self
}
pub fn get_condition(&self, table: &str) -> Option<&str> {
self.additional_conditions
.iter()
.find(|c| c.data_collection == table)
.map(|c| c.filter.as_str())
}
pub fn get_surrogate_key(&self, table: &str) -> Option<&str> {
self.surrogate_keys.get(table).map(|s| s.as_str())
}
}
/// Drives the incremental-snapshot state machine: hands out chunks,
/// manages the per-chunk watermark window/buffer, and tracks stats.
/// Clonable handles are shared via `Arc`; at most one snapshot runs at
/// a time (guarded by `active`).
pub struct IncrementalSnapshotCoordinator {
    // Immutable configuration captured at construction.
    config: IncrementalSnapshotConfig,
    // Current snapshot context; None when no snapshot is in progress.
    context: Arc<RwLock<Option<IncrementalSnapshotContext>>>,
    // Buffer for the chunk whose window is currently open, if any.
    buffer: Arc<RwLock<Option<ChunkBuffer>>>,
    // True while a snapshot is in progress.
    active: Arc<AtomicBool>,
    // True while the in-progress snapshot is paused.
    paused: Arc<AtomicBool>,
    // Shared counters, exposed via `stats()`.
    stats: Arc<IncrementalSnapshotStats>,
}
impl IncrementalSnapshotCoordinator {
    /// Creates an idle coordinator; nothing runs until `start`.
    pub fn new(config: IncrementalSnapshotConfig) -> Self {
        Self {
            config,
            context: Arc::new(RwLock::new(None)),
            buffer: Arc::new(RwLock::new(None)),
            active: Arc::new(AtomicBool::new(false)),
            paused: Arc::new(AtomicBool::new(false)),
            stats: Arc::new(IncrementalSnapshotStats::default()),
        }
    }
    /// Starts a new incremental snapshot over the requested tables and
    /// returns the generated snapshot id.
    ///
    /// Errors with `CdcError::InvalidState` if a snapshot is already
    /// active. NOTE(review): the primary key is hard-coded to "id" here,
    /// and `config.allow_surrogate_key` is never consulted — presumably
    /// real key discovery happens upstream; confirm.
    pub async fn start(&self, request: IncrementalSnapshotRequest) -> Result<String> {
        if self.active.load(Ordering::Acquire) {
            return Err(CdcError::InvalidState(
                "Incremental snapshot already in progress".to_string(),
            ));
        }
        let tables: Vec<SnapshotTable> = request
            .data_collections
            .iter()
            .map(|table| {
                let mut st = SnapshotTable::new(table.clone(), vec!["id".to_string()]);
                if let Some(key) = request.get_surrogate_key(table) {
                    st = st.with_surrogate_key(key);
                }
                if let Some(cond) = request.get_condition(table) {
                    st = st.with_conditions(cond);
                }
                st
            })
            .collect();
        let context = IncrementalSnapshotContext::new(tables);
        let snapshot_id = context.snapshot_id.clone();
        *self.context.write().await = Some(context);
        self.active.store(true, Ordering::Release);
        self.paused.store(false, Ordering::Release);
        self.stats.record_started();
        info!(
            snapshot_id = %snapshot_id,
            tables = ?request.data_collections,
            "Incremental snapshot started"
        );
        Ok(snapshot_id)
    }
    /// Stops the current snapshot, discarding any open window's buffered
    /// events and dropping the context. Idempotent: a no-op when idle.
    /// Lock order is buffer then context, same as `close_window`.
    pub async fn stop(&self) -> Result<()> {
        if !self.active.load(Ordering::Acquire) {
            return Ok(());
        }
        self.active.store(false, Ordering::Release);
        self.paused.store(false, Ordering::Release);
        if let Some(mut buffer) = self.buffer.write().await.take() {
            if buffer.is_window_open() {
                // Buffered events are intentionally discarded, not emitted.
                let _ = buffer.close_window();
            }
        }
        if let Some(context) = self.context.write().await.take() {
            info!(
                snapshot_id = %context.snapshot_id,
                progress = context.progress_percentage(),
                "Incremental snapshot stopped"
            );
        }
        Ok(())
    }
    /// Pauses the current snapshot; `next_chunk` returns `None` while
    /// paused. Errors when no snapshot is active.
    pub async fn pause(&self) -> Result<()> {
        if !self.active.load(Ordering::Acquire) {
            return Err(CdcError::InvalidState(
                "No incremental snapshot in progress".to_string(),
            ));
        }
        self.paused.store(true, Ordering::Release);
        if let Some(ref mut context) = *self.context.write().await {
            if let Some(ref mut table) = context.current_table_mut() {
                table.state = IncrementalSnapshotState::Paused;
            }
            info!(
                snapshot_id = %context.snapshot_id,
                "Incremental snapshot paused"
            );
        }
        Ok(())
    }
    /// Resumes a paused snapshot. Errors when no snapshot is active.
    pub async fn resume(&self) -> Result<()> {
        if !self.active.load(Ordering::Acquire) {
            return Err(CdcError::InvalidState(
                "No incremental snapshot in progress".to_string(),
            ));
        }
        self.paused.store(false, Ordering::Release);
        if let Some(ref mut context) = *self.context.write().await {
            if let Some(ref mut table) = context.current_table_mut() {
                table.state = IncrementalSnapshotState::Running;
            }
            info!(
                snapshot_id = %context.snapshot_id,
                "Incremental snapshot resumed"
            );
        }
        Ok(())
    }
    /// `true` while a snapshot is in progress (running or paused).
    pub fn is_active(&self) -> bool {
        self.active.load(Ordering::Acquire)
    }
    /// `true` while the in-progress snapshot is paused.
    pub fn is_paused(&self) -> bool {
        self.paused.load(Ordering::Acquire)
    }
    /// Produces the next chunk for the current table, resuming after the
    /// table's `last_key` and bounded above by its `max_key`. Returns
    /// `Ok(None)` when idle, paused, or no table remains. Also records
    /// the chunk as `current_chunk` and marks the table Running.
    pub async fn next_chunk(&self) -> Result<Option<SnapshotChunk>> {
        if !self.active.load(Ordering::Acquire) || self.paused.load(Ordering::Acquire) {
            return Ok(None);
        }
        let mut context_guard = self.context.write().await;
        let context = match context_guard.as_mut() {
            Some(ctx) => ctx,
            None => return Ok(None),
        };
        // Copy the fields out first so the shared borrow of `context`
        // ends before we mutate it below.
        let (table_name, completed_chunks, chunk_key, last_key, max_key, conditions) = {
            let table = match context.current_table() {
                Some(t) => t,
                None => return Ok(None),
            };
            (
                table.table_name.clone(),
                table.completed_chunks,
                table.chunk_key().to_string(),
                table.last_key.clone(),
                table.max_key.clone(),
                table.additional_conditions.clone(),
            )
        };
        // Sequence number == chunks completed so far for this table.
        let chunk = SnapshotChunk::new(table_name, completed_chunks, chunk_key)
            .with_range(last_key, max_key);
        let chunk = if let Some(cond) = conditions {
            chunk.with_conditions(cond)
        } else {
            chunk
        };
        context.current_chunk = Some(chunk.clone());
        if let Some(table) = context.current_table_mut() {
            table.state = IncrementalSnapshotState::Running;
        }
        Ok(Some(chunk))
    }
    /// Builds the window-open watermark for `chunk`.
    pub fn open_window_signal(&self, chunk: &SnapshotChunk) -> WatermarkSignal {
        WatermarkSignal::open(chunk)
    }
    /// Builds the window-close watermark for `chunk`.
    pub fn close_window_signal(&self, chunk: &SnapshotChunk) -> WatermarkSignal {
        WatermarkSignal::close(chunk)
    }
    /// Opens a fresh buffer/window for `chunk`, replacing any previous
    /// buffer (and silently dropping its contents, if any).
    pub async fn open_window(&self, chunk: &SnapshotChunk) {
        let mut buffer = ChunkBuffer::new(chunk);
        buffer.open_window();
        *self.buffer.write().await = Some(buffer);
    }
    /// Buffers one snapshot row under `key`; a no-op when no window is
    /// open (ChunkBuffer ignores rows while closed).
    pub async fn buffer_row(&self, event: CdcEvent, key: String) {
        if let Some(ref mut buffer) = *self.buffer.write().await {
            buffer.add_row(event, key);
        }
    }
    /// Reports a streaming change: when it targets the buffered table and
    /// collides with a buffered snapshot row, that row is evicted and
    /// `true` is returned (streaming wins).
    pub async fn check_streaming_conflict(&self, table: &str, key: &str) -> bool {
        if let Some(ref mut buffer) = *self.buffer.write().await {
            if buffer.table_name == table {
                return buffer.check_conflict(key);
            }
        }
        false
    }
    /// Closes the current window, records chunk stats, bumps the current
    /// table's counters, and returns the surviving snapshot events.
    /// Acquires the buffer lock before the context lock — the same order
    /// as `stop` — so the two cannot deadlock against each other.
    pub async fn close_window(&self) -> Result<Vec<CdcEvent>> {
        let mut buffer_guard = self.buffer.write().await;
        let buffer = match buffer_guard.as_mut() {
            Some(b) => b,
            None => return Ok(Vec::new()),
        };
        let window_start = buffer.window_opened;
        let dropped = buffer.dropped_count();
        let events = buffer.close_window();
        let rows = events.len() as u64;
        let window_time_ms = window_start
            .map(|t| t.elapsed().as_millis() as u64)
            .unwrap_or(0);
        self.stats.record_chunk(rows, dropped, window_time_ms);
        if let Some(ref mut context) = *self.context.write().await {
            if let Some(ref mut table) = context.current_table_mut() {
                table.completed_chunks += 1;
                table.rows_processed += rows;
            }
            context.current_chunk = None;
            context.last_activity = chrono::Utc::now().timestamp_millis();
        }
        Ok(events)
    }
    /// Pops the current table (logging its totals) and returns the next
    /// table's name, or `None` when the whole snapshot just finished —
    /// in which case the coordinator is deactivated and completion is
    /// recorded in the stats.
    pub async fn complete_current_table(&self) -> Result<Option<String>> {
        let mut context_guard = self.context.write().await;
        let context = match context_guard.as_mut() {
            Some(ctx) => ctx,
            None => return Ok(None),
        };
        if let Some(completed) = context.next_table() {
            info!(
                table = %completed.table_name,
                rows = completed.rows_processed,
                chunks = completed.completed_chunks,
                "Table incremental snapshot completed"
            );
        }
        if context.is_complete() {
            // Release the write guard before re-acquiring a read lock
            // below; RwLock is not reentrant.
            drop(context_guard);
            self.active.store(false, Ordering::Release);
            self.stats.record_completed();
            if let Some(ctx) = self.context.read().await.as_ref() {
                info!(
                    snapshot_id = %ctx.snapshot_id,
                    "Incremental snapshot completed"
                );
            }
            return Ok(None);
        }
        Ok(context.current_table().map(|t| t.table_name.clone()))
    }
    /// Clones the current context (for checkpointing), if any.
    pub async fn get_context(&self) -> Option<IncrementalSnapshotContext> {
        self.context.read().await.clone()
    }
    /// Resumes from a previously checkpointed context. Errors if a
    /// snapshot is already active. Counts as a new start in the stats.
    pub async fn restore(&self, context: IncrementalSnapshotContext) -> Result<()> {
        if self.active.load(Ordering::Acquire) {
            return Err(CdcError::InvalidState(
                "Cannot restore: snapshot already in progress".to_string(),
            ));
        }
        let snapshot_id = context.snapshot_id.clone();
        *self.context.write().await = Some(context);
        self.active.store(true, Ordering::Release);
        self.paused.store(false, Ordering::Release);
        self.stats.record_started();
        info!(
            snapshot_id = %snapshot_id,
            "Incremental snapshot restored"
        );
        Ok(())
    }
    /// Read access to the configuration captured at construction.
    pub fn config(&self) -> &IncrementalSnapshotConfig {
        &self.config
    }
    /// Point-in-time copy of the coordinator's counters.
    pub fn stats(&self) -> IncrementalSnapshotStatsSnapshot {
        self.stats.snapshot()
    }
    /// `true` while a chunk's watermark window is open.
    pub async fn is_window_open(&self) -> bool {
        if let Some(ref buffer) = *self.buffer.read().await {
            return buffer.is_window_open();
        }
        false
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    // Minimal insert event used wherever a CdcEvent payload is needed.
    fn test_event() -> CdcEvent {
        CdcEvent::insert(
            "postgres",
            "testdb",
            "public",
            "orders",
            serde_json::json!({}),
            0,
        )
    }
    // Only the insert-delete strategy needs a DELETE on window close.
    #[test]
    fn test_watermark_strategy() {
        assert!(!WatermarkStrategy::InsertInsert.requires_delete());
        assert!(WatermarkStrategy::InsertDelete.requires_delete());
    }
    // Builder overrides land in the built config.
    #[test]
    fn test_config_builder() {
        let config = IncrementalSnapshotConfig::builder()
            .chunk_size(2048)
            .watermark_strategy(WatermarkStrategy::InsertDelete)
            .max_buffer_memory(128 * 1024 * 1024)
            .build();
        assert_eq!(config.chunk_size, 2048);
        assert_eq!(config.watermark_strategy, WatermarkStrategy::InsertDelete);
        assert_eq!(config.max_buffer_memory, 128 * 1024 * 1024);
    }
    // Surrogate key takes precedence over the primary key as chunk key.
    #[test]
    fn test_snapshot_table() {
        let table = SnapshotTable::new("public.orders", vec!["id".to_string()])
            .with_surrogate_key("order_id")
            .with_conditions("status = 'active'");
        assert_eq!(table.chunk_key(), "order_id");
        assert_eq!(
            table.additional_conditions,
            Some("status = 'active'".to_string())
        );
    }
    // Chunk SQL carries range bounds, extra conditions, and the LIMIT.
    #[test]
    fn test_snapshot_chunk_sql() {
        let chunk = SnapshotChunk::new("public.orders", 0, "id")
            .with_range(Some("100".to_string()), Some("200".to_string()))
            .with_conditions("status = 'active'");
        let sql = chunk.to_sql(1024);
        assert!(sql.contains("public.orders"));
        assert!(sql.contains("id > '100'"));
        assert!(sql.contains("id <= '200'"));
        assert!(sql.contains("status = 'active'"));
        assert!(sql.contains("LIMIT 1024"));
    }
    // Open/close signals derive their ids from the chunk id.
    #[test]
    fn test_watermark_signal() {
        let chunk = SnapshotChunk::new("public.orders", 0, "id");
        let open = WatermarkSignal::open(&chunk);
        let close = WatermarkSignal::close(&chunk);
        assert_eq!(open.signal_type, WatermarkType::SnapshotWindowOpen);
        assert_eq!(close.signal_type, WatermarkType::SnapshotWindowClose);
        assert!(open.id.ends_with("-open"));
        assert!(close.id.ends_with("-close"));
    }
    // Rows added before open are ignored; a streaming conflict evicts the
    // matching buffered row; close drains the survivors.
    #[test]
    fn test_chunk_buffer_deduplication() {
        let chunk = SnapshotChunk::new("public.orders", 0, "id");
        let mut buffer = ChunkBuffer::new(&chunk);
        buffer.add_row(test_event(), "key1".to_string());
        assert!(buffer.is_empty());
        buffer.open_window();
        assert!(buffer.is_window_open());
        buffer.add_row(test_event(), "key1".to_string());
        buffer.add_row(test_event(), "key2".to_string());
        buffer.add_row(test_event(), "key3".to_string());
        assert_eq!(buffer.len(), 3);
        assert!(buffer.check_conflict("key2"));
        assert!(!buffer.check_conflict("key4"));
        assert_eq!(buffer.len(), 2);
        assert_eq!(buffer.dropped_count(), 1);
        let events = buffer.close_window();
        assert_eq!(events.len(), 2);
        assert!(!buffer.is_window_open());
    }
    // Tables are consumed front-to-back; empty queue means complete.
    #[test]
    fn test_incremental_snapshot_context() {
        let tables = vec![
            SnapshotTable::new("public.orders", vec!["id".to_string()]),
            SnapshotTable::new("public.customers", vec!["id".to_string()]),
        ];
        let mut context = IncrementalSnapshotContext::new(tables);
        assert_eq!(
            context.current_table().map(|t| &t.table_name),
            Some(&"public.orders".to_string())
        );
        assert!(!context.is_complete());
        context.next_table();
        assert_eq!(
            context.current_table().map(|t| &t.table_name),
            Some(&"public.customers".to_string())
        );
        context.next_table();
        assert!(context.is_complete());
    }
    // Conditions and surrogate keys are looked up per table.
    #[test]
    fn test_snapshot_request() {
        let request = IncrementalSnapshotRequest::new(vec![
            "public.orders".to_string(),
            "public.customers".to_string(),
        ])
        .with_condition("public.orders", "status = 'active'")
        .with_surrogate_key("public.orders", "order_id");
        assert_eq!(
            request.get_condition("public.orders"),
            Some("status = 'active'")
        );
        assert_eq!(request.get_surrogate_key("public.orders"), Some("order_id"));
        assert!(request.get_condition("public.customers").is_none());
    }
    // start/pause/resume/stop transitions, plus rejection of a second
    // concurrent start.
    #[tokio::test]
    async fn test_coordinator_lifecycle() {
        let config = IncrementalSnapshotConfig::builder().chunk_size(100).build();
        let coordinator = IncrementalSnapshotCoordinator::new(config);
        let request = IncrementalSnapshotRequest::new(vec!["public.orders".to_string()]);
        let snapshot_id = coordinator.start(request).await.unwrap();
        assert!(!snapshot_id.is_empty());
        assert!(coordinator.is_active());
        let request2 = IncrementalSnapshotRequest::new(vec!["public.customers".to_string()]);
        assert!(coordinator.start(request2).await.is_err());
        coordinator.pause().await.unwrap();
        assert!(coordinator.is_paused());
        coordinator.resume().await.unwrap();
        assert!(!coordinator.is_paused());
        coordinator.stop().await.unwrap();
        assert!(!coordinator.is_active());
    }
    // End-to-end chunk flow: next_chunk, open window, buffer rows, evict
    // one via streaming conflict, close, and verify the stats.
    #[tokio::test]
    async fn test_coordinator_chunking() {
        let config = IncrementalSnapshotConfig::builder().chunk_size(100).build();
        let coordinator = IncrementalSnapshotCoordinator::new(config);
        let request = IncrementalSnapshotRequest::new(vec!["public.orders".to_string()]);
        coordinator.start(request).await.unwrap();
        let chunk = coordinator.next_chunk().await.unwrap();
        assert!(chunk.is_some());
        let chunk = chunk.unwrap();
        assert_eq!(chunk.table_name, "public.orders");
        assert_eq!(chunk.sequence, 0);
        coordinator.open_window(&chunk).await;
        assert!(coordinator.is_window_open().await);
        coordinator.buffer_row(test_event(), "1".to_string()).await;
        coordinator.buffer_row(test_event(), "2".to_string()).await;
        assert!(
            coordinator
                .check_streaming_conflict("public.orders", "1")
                .await
        );
        assert!(
            !coordinator
                .check_streaming_conflict("public.orders", "3")
                .await
        );
        let events = coordinator.close_window().await.unwrap();
        assert_eq!(events.len(), 1);
        let stats = coordinator.stats();
        assert_eq!(stats.chunks_processed, 1);
        assert_eq!(stats.rows_snapshotted, 1);
        assert_eq!(stats.events_dropped, 1);
    }
    // Running and Paused are active; only Paused can resume.
    #[test]
    fn test_snapshot_state() {
        assert!(IncrementalSnapshotState::Running.is_active());
        assert!(IncrementalSnapshotState::Paused.is_active());
        assert!(!IncrementalSnapshotState::Completed.is_active());
        assert!(IncrementalSnapshotState::Paused.can_resume());
        assert!(!IncrementalSnapshotState::Running.can_resume());
    }
}