use std::path::PathBuf;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tempfile::TempDir;
use nodedb::control::security::catalog::SystemCatalog;
use nodedb::control::security::catalog::database_types::{DatabaseDescriptor, DatabaseStatus};
pub use nodedb_test_support::pgwire_harness::TestServer;
use nodedb_types::{DatabaseId, Lsn, MirrorLagRecord, MirrorMode, MirrorOrigin, MirrorStatus};
pub const TEST_SOURCE_CLUSTER: &str = "prod-us-test";
pub fn open_tmp_catalog(dir: &TempDir) -> SystemCatalog {
let path: PathBuf = dir.path().join("system.redb");
SystemCatalog::open(&path).expect("open catalog")
}
pub fn make_mirror_descriptor(
id: u64,
name: &str,
status: MirrorStatus,
last_applied_lsn: u64,
) -> DatabaseDescriptor {
DatabaseDescriptor {
id: DatabaseId::new(id),
name: name.to_string(),
status: match &status {
MirrorStatus::Promoted => DatabaseStatus::Active,
MirrorStatus::Bootstrapping { .. }
| MirrorStatus::Following
| MirrorStatus::Degraded { .. }
| MirrorStatus::Disconnected => DatabaseStatus::Mirroring,
},
created_at_lsn: 0,
quota_ref: 0,
parent_clone: None,
mirror_origin: Some(MirrorOrigin {
source_cluster: TEST_SOURCE_CLUSTER.to_string(),
source_database: DatabaseId::new(0),
mode: MirrorMode::Async,
last_applied: Lsn::new(last_applied_lsn),
status,
}),
audit_dml: nodedb_types::AuditDmlMode::None,
idle_session_timeout_secs: 0,
}
}
pub fn inject_mirror(catalog: &SystemCatalog, db_id: DatabaseId, name: &str, status: MirrorStatus) {
let descriptor = make_mirror_descriptor(db_id.as_u64(), name, status, 0);
catalog
.put_database(&descriptor)
.expect("inject mirror descriptor");
}
pub fn inject_lag_record_for_id(
catalog: &SystemCatalog,
db_id: DatabaseId,
lag_offset_ms: u64,
lsn: u64,
) {
let record = MirrorLagRecord {
last_applied_lsn: Lsn::new(lsn),
last_apply_ms: now_ms().saturating_sub(lag_offset_ms),
};
catalog
.put_mirror_lag(db_id, &record)
.expect("inject lag record");
}
pub fn inject_mirror_descriptor(
server: &TestServer,
name: &str,
status: MirrorStatus,
last_applied_lsn: u64,
) {
let catalog = server
.shared
.credentials
.catalog()
.as_ref()
.expect("catalog unavailable");
let db_id = server.shared.database_registry.alloc_one();
let descriptor = make_mirror_descriptor(db_id.as_u64(), name, status, last_applied_lsn);
catalog
.put_database(&descriptor)
.expect("inject mirror descriptor");
}
pub fn inject_lag_record(server: &TestServer, db_name: &str, lag_offset_ms: u64, lsn: u64) {
let catalog = server
.shared
.credentials
.catalog()
.as_ref()
.expect("catalog unavailable");
let db_id = catalog
.get_database_id_by_name(db_name)
.expect("catalog lookup")
.expect("db not found");
inject_lag_record_for_id(catalog, db_id, lag_offset_ms, lsn);
}
pub async fn assert_sqlstate(client: &tokio_postgres::Client, query: &str, expected_state: &str) {
let err = client.simple_query(query).await.expect_err(&format!(
"expected error with SQLSTATE {expected_state} but query succeeded: {query}"
));
let pg_err = err
.as_db_error()
.unwrap_or_else(|| panic!("expected DB error but got: {err:?}"));
assert_eq!(
pg_err.code().code(),
expected_state,
"wrong SQLSTATE for query {query:?}; error: {pg_err:?}"
);
}
pub fn now_ms() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or(Duration::ZERO)
.as_millis() as u64
}