#![cfg(feature = "sqlserver")]
use std::time::{Duration, Instant};
use rustcdc::{
checkpoint::FileCheckpoint, schema_history::InMemorySchemaHistory, CdcRuntime, RuntimeConfig,
RuntimeSourceConfig,
};
#[path = "sqlserver_testkit.rs"]
mod sqlserver_testkit;
#[path = "latency_evidence_common.rs"]
mod latency_evidence_common;
use latency_evidence_common::{percentile, write_latency_artifacts, LatencySummary};
async fn sql_exec(client: &mut sqlserver_testkit::SqlClient, sql: &str) -> rustcdc::Result<()> {
client
.execute(sql, &[])
.await
.map_err(|error| rustcdc::Error::SourceError(error.to_string()))?;
Ok(())
}
#[tokio::test]
async fn sqlserver_connector_latency_evidence_stream_commit_percentiles() -> rustcdc::Result<()> {
if sqlserver_testkit::skip_docker_test("sqlserver latency evidence test") {
return Ok(());
}
let container = sqlserver_testkit::start_sqlserver_container("2019-latest").await?;
let (host, port) = sqlserver_testkit::host_and_port(&container).await?;
let mut admin =
sqlserver_testkit::connect_admin_with_retry(&host, port, 40, Duration::from_secs(2))
.await?;
sql_exec(
&mut admin,
"IF DB_ID('rustcdc_latency') IS NULL CREATE DATABASE rustcdc_latency",
)
.await?;
sql_exec(
&mut admin,
"USE rustcdc_latency; IF OBJECT_ID('dbo.latency_users', 'U') IS NULL CREATE TABLE dbo.latency_users (id INT NOT NULL PRIMARY KEY, payload NVARCHAR(255) NOT NULL)",
)
.await?;
sql_exec(
&mut admin,
"USE rustcdc_latency; DELETE FROM dbo.latency_users",
)
.await?;
sqlserver_testkit::enable_cdc(&host, port, "rustcdc_latency").await?;
sql_exec(
&mut admin,
"USE rustcdc_latency; IF NOT EXISTS (SELECT 1 FROM cdc.change_tables WHERE source_object_id = OBJECT_ID('dbo.latency_users')) EXEC sys.sp_cdc_enable_table @source_schema='dbo', @source_name='latency_users', @role_name=NULL, @supports_net_changes=0",
)
.await?;
tokio::time::sleep(Duration::from_secs(3)).await;
let source_cfg = sqlserver_testkit::source_config(host, port, "rustcdc_latency".into(), 30);
let checkpoint_dir = tempfile::tempdir().map_err(rustcdc::Error::IoError)?;
let mut runtime = CdcRuntime::new(
RuntimeConfig::new(
RuntimeSourceConfig::SqlServer(source_cfg),
FileCheckpoint::new(checkpoint_dir.path()),
InMemorySchemaHistory::default(),
)
.with_max_buffer_size(128)
.with_max_poll_wait_ms(100),
)?;
runtime.start().await?;
let rows_inserted = 4_096_u64;
for id in 1_u64..=rows_inserted {
let sql = format!(
"USE rustcdc_latency; INSERT INTO dbo.latency_users (id, payload) VALUES ({}, 'payload-{}')",
id, id
);
sql_exec(&mut admin, &sql).await?;
}
let mut poll_latencies_ms = Vec::new();
let mut commit_latencies_ms = Vec::new();
let mut batch_sizes = Vec::new();
let mut events_committed = 0_u64;
let started = Instant::now();
let cdc_scan_sql = "USE rustcdc_latency; EXEC sys.sp_cdc_scan";
let deadline = Instant::now() + Duration::from_secs(180);
while events_committed < rows_inserted {
if Instant::now() > deadline {
return Err(rustcdc::Error::TimeoutError(format!(
"timed out while collecting sqlserver latency evidence (committed={events_committed}, expected={rows_inserted})"
)));
}
let poll_start = Instant::now();
let batch = runtime.poll_event_batch().await?;
let poll_ms = poll_start.elapsed().as_secs_f64() * 1000.0;
if batch.is_empty() {
let _ = sql_exec(&mut admin, cdc_scan_sql).await;
tokio::time::sleep(Duration::from_millis(200)).await;
continue;
}
let batch_len = batch.len();
batch_sizes.push(batch_len as f64);
poll_latencies_ms.push(poll_ms);
let commit_start = Instant::now();
runtime.commit_ack(batch.ack_mode()).await?;
let commit_ms = commit_start.elapsed().as_secs_f64() * 1000.0;
commit_latencies_ms.push(commit_ms);
events_committed = events_committed.saturating_add(batch_len as u64);
}
let summary = LatencySummary {
profile: "sqlserver_stream_commit",
rows_inserted,
events_committed,
batches: poll_latencies_ms.len(),
poll_latency_ms_p50: percentile(&poll_latencies_ms, 50.0),
poll_latency_ms_p95: percentile(&poll_latencies_ms, 95.0),
poll_latency_ms_p99: percentile(&poll_latencies_ms, 99.0),
commit_latency_ms_p50: percentile(&commit_latencies_ms, 50.0),
commit_latency_ms_p95: percentile(&commit_latencies_ms, 95.0),
commit_latency_ms_p99: percentile(&commit_latencies_ms, 99.0),
batch_size_p50: percentile(&batch_sizes, 50.0),
batch_size_p95: percentile(&batch_sizes, 95.0),
batch_size_p99: percentile(&batch_sizes, 99.0),
end_to_end_ms: started.elapsed().as_millis(),
};
assert!(summary.events_committed >= rows_inserted);
assert!(
summary.batches >= 16,
"expected sustained multi-batch evidence"
);
write_latency_artifacts("sqlserver", &summary)?;
println!(
"sqlserver latency evidence recorded: batches={} poll_p95_ms={:.3} commit_p95_ms={:.3}",
summary.batches, summary.poll_latency_ms_p95, summary.commit_latency_ms_p95
);
Ok(())
}