use std::time::Duration;
use nodedb_types::{DatabaseId, Lsn, MirrorMode, MirrorOrigin, MirrorStatus};
use tempfile::TempDir;
use nodedb::control::server::pgwire::ddl::database::{
MirrorReadOutcome, check_mirror_read_consistency,
};
use nodedb::types::ReadConsistency;
use super::helpers::{TEST_SOURCE_CLUSTER, inject_lag_record_for_id, now_ms, open_tmp_catalog};
#[test]
fn mirror_simulated_cross_region_latency() {
let dir = TempDir::new().unwrap();
let catalog = open_tmp_catalog(&dir);
let db_id = DatabaseId::new(5001);
let simulated_lag_ms = 300u64;
inject_lag_record_for_id(&catalog, db_id, simulated_lag_ms, 1);
let origin = MirrorOrigin {
source_cluster: TEST_SOURCE_CLUSTER.to_string(),
source_database: DatabaseId::new(0),
mode: MirrorMode::Async,
last_applied: Lsn::new(1),
status: MirrorStatus::Following,
};
match check_mirror_read_consistency(
&catalog,
db_id,
&origin,
ReadConsistency::BoundedStaleness(Duration::from_millis(1_000)),
) {
MirrorReadOutcome::ServeLocally => {}
MirrorReadOutcome::Reject { message, .. } => {
panic!("BoundedStaleness(1000ms) must pass with 300ms lag: {message}");
}
}
match check_mirror_read_consistency(
&catalog,
db_id,
&origin,
ReadConsistency::BoundedStaleness(Duration::from_millis(100)),
) {
MirrorReadOutcome::Reject { sqlstate_code, .. } => {
assert_eq!(
sqlstate_code,
nodedb_types::error::sqlstate::STALE_READ_NOT_LEADER
);
}
MirrorReadOutcome::ServeLocally => {
panic!("BoundedStaleness(100ms) must reject with 300ms lag");
}
}
use nodedb::control::mirror::{LAG_DEGRADED_MS, LagTransition, compute_lag_transition};
let t = compute_lag_transition(&MirrorStatus::Following, simulated_lag_ms, now_ms(), false);
assert_eq!(
t,
LagTransition::Unchanged,
"300ms lag must not trigger Degraded (threshold is {}ms)",
LAG_DEGRADED_MS
);
}
#[test]
fn mirror_simulated_latency_at_threshold_boundary() {
let dir = TempDir::new().unwrap();
let catalog = open_tmp_catalog(&dir);
let db_id = DatabaseId::new(5002);
inject_lag_record_for_id(&catalog, db_id, 199, 1);
let origin = MirrorOrigin {
source_cluster: TEST_SOURCE_CLUSTER.to_string(),
source_database: DatabaseId::new(0),
mode: MirrorMode::Async,
last_applied: Lsn::new(1),
status: MirrorStatus::Following,
};
match check_mirror_read_consistency(
&catalog,
db_id,
&origin,
ReadConsistency::BoundedStaleness(Duration::from_millis(200)),
) {
MirrorReadOutcome::ServeLocally => {}
MirrorReadOutcome::Reject { message, .. } => {
panic!("199ms lag should pass 200ms bound: {message}");
}
}
}