mod config_tests {
use super::super::config::ObserverManagementConfig;
#[test]
fn test_default_config() {
let config = ObserverManagementConfig::default();
assert!(config.enabled);
assert_eq!(config.base_path, "/api/observers");
assert_eq!(config.max_page_size, 100);
assert!(!config.log_payloads);
assert_eq!(config.log_retention_days, 30);
assert!(config.require_auth);
}
}
mod repository_tests {
#![allow(clippy::unwrap_used)]
use super::super::RetryConfig;
#[test]
fn test_retry_config_default() {
let config = RetryConfig::default();
assert_eq!(config.max_attempts, 3);
assert_eq!(config.backoff, "exponential");
assert_eq!(config.initial_delay_ms, 1000);
assert_eq!(config.max_delay_ms, 60000);
}
#[test]
fn test_list_entity_type_not_inlined() {
let malicious = "' OR '1'='1";
let mut qb: sqlx::QueryBuilder<sqlx::Postgres> =
sqlx::QueryBuilder::new("SELECT COUNT(*) AS count FROM tb_observer WHERE 1=1");
qb.push(" AND entity_type = ").push_bind(malicious);
let sql = qb.sql();
assert!(!sql.contains(malicious), "user input must not appear in SQL string");
assert!(sql.contains("$1"), "placeholder must be present");
}
#[test]
fn test_list_event_type_not_inlined() {
let malicious = "'; DROP TABLE tb_observer; --";
let mut qb: sqlx::QueryBuilder<sqlx::Postgres> =
sqlx::QueryBuilder::new("SELECT COUNT(*) AS count FROM tb_observer WHERE 1=1");
qb.push(" AND event_type = ").push_bind(malicious);
let sql = qb.sql();
assert!(!sql.contains(malicious));
assert!(sql.contains("$1"));
}
#[test]
fn test_list_logs_status_not_inlined() {
let malicious = "' UNION SELECT * FROM secrets --";
let mut qb: sqlx::QueryBuilder<sqlx::Postgres> =
sqlx::QueryBuilder::new("SELECT COUNT(*) FROM tb_observer_log WHERE 1=1");
qb.push(" AND status = ").push_bind(malicious);
let sql = qb.sql();
assert!(!sql.contains(malicious));
assert!(sql.contains("$1"));
}
#[test]
fn test_list_logs_trace_id_not_inlined() {
let malicious = "x' OR fk_customer_org IS NOT NULL--";
let mut qb: sqlx::QueryBuilder<sqlx::Postgres> =
sqlx::QueryBuilder::new("SELECT COUNT(*) FROM tb_observer_log WHERE 1=1");
qb.push(" AND trace_id = ").push_bind(malicious);
let sql = qb.sql();
assert!(!sql.contains(malicious));
assert!(sql.contains("$1"));
}
#[test]
fn test_list_no_filters_produces_minimal_sql() {
let qb: sqlx::QueryBuilder<sqlx::Postgres> =
sqlx::QueryBuilder::new("SELECT COUNT(*) AS count FROM tb_observer WHERE 1=1");
let sql = qb.sql();
assert!(!sql.contains("entity_type"));
assert!(!sql.contains("event_type"));
assert!(!sql.contains("enabled"));
assert!(!sql.contains("fk_customer_org"));
assert!(!sql.contains("deleted_at"));
}
#[test]
fn test_list_exclude_deleted_adds_condition() {
let mut qb: sqlx::QueryBuilder<sqlx::Postgres> =
sqlx::QueryBuilder::new("SELECT COUNT(*) AS count FROM tb_observer WHERE 1=1");
qb.push(" AND deleted_at IS NULL");
let sql = qb.sql();
assert!(sql.contains("deleted_at IS NULL"));
}
#[test]
fn test_list_logs_observer_id_uses_placeholder() {
let observer_id = uuid::Uuid::new_v4();
let mut qb: sqlx::QueryBuilder<sqlx::Postgres> =
sqlx::QueryBuilder::new("SELECT COUNT(*) FROM tb_observer_log WHERE 1=1");
qb.push(" AND fk_observer = (SELECT pk_observer FROM tb_observer WHERE id = ")
.push_bind(observer_id)
.push(")");
let sql = qb.sql();
assert!(!sql.contains(&observer_id.to_string()), "UUID must not be inlined in SQL");
assert!(sql.contains("$1"));
}
#[test]
fn test_multiple_filters_use_sequential_placeholders() {
let mut qb: sqlx::QueryBuilder<sqlx::Postgres> =
sqlx::QueryBuilder::new("SELECT COUNT(*) AS count FROM tb_observer WHERE 1=1");
qb.push(" AND entity_type = ").push_bind("Order");
qb.push(" AND event_type = ").push_bind("INSERT");
qb.push(" AND enabled = ").push_bind(true);
let sql = qb.sql();
assert!(sql.contains("$1"));
assert!(sql.contains("$2"));
assert!(sql.contains("$3"));
assert!(!sql.contains("Order"));
assert!(!sql.contains("INSERT"));
}
}
mod routes_tests {
#[test]
fn test_routes_compile() {
}
}
mod runtime_tests {
use super::super::runtime::RuntimeHealth;
#[test]
fn test_runtime_config_defaults() {
}
#[test]
fn test_runtime_health_default() {
let health = RuntimeHealth {
running: false,
observer_count: 0,
last_checkpoint: None,
events_processed: 0,
errors: 0,
};
assert!(!health.running);
assert_eq!(health.observer_count, 0);
}
}
mod runtime_index_atomicity_tests {
use std::{collections::HashMap, sync::Arc, thread, time::Duration};
use arc_swap::ArcSwap;
type EntityTypeIndex = Arc<ArcSwap<HashMap<(String, String), Vec<i64>>>>;
fn gen_a() -> HashMap<(String, String), Vec<i64>> {
let mut m = HashMap::new();
m.insert(("Order".to_string(), "INSERT".to_string()), vec![1, 2, 3]);
m.insert(("Order".to_string(), "UPDATE".to_string()), vec![4]);
m.insert(("User".to_string(), "INSERT".to_string()), vec![5, 6]);
m
}
fn gen_b() -> HashMap<(String, String), Vec<i64>> {
let mut m = HashMap::new();
m.insert(("Order".to_string(), "INSERT".to_string()), vec![10, 20, 30]);
m.insert(("Order".to_string(), "UPDATE".to_string()), vec![40]);
m.insert(("User".to_string(), "INSERT".to_string()), vec![50, 60]);
m
}
#[test]
fn entity_type_index_swap_is_snapshot_atomic() {
let index: EntityTypeIndex = Arc::new(ArcSwap::from_pointee(gen_a()));
let a = gen_a();
let b = gen_b();
let stop = Arc::new(std::sync::atomic::AtomicBool::new(false));
let writer_index = Arc::clone(&index);
let writer_stop = Arc::clone(&stop);
let writer = thread::spawn(move || {
let mut flip = false;
while !writer_stop.load(std::sync::atomic::Ordering::Relaxed) {
let next = if flip { gen_b() } else { gen_a() };
writer_index.store(Arc::new(next));
flip = !flip;
}
});
let mut readers = Vec::new();
for _ in 0..8 {
let reader_index = Arc::clone(&index);
let expected_a = a.clone();
let expected_b = b.clone();
readers.push(thread::spawn(move || {
let keys: Vec<(String, String)> = expected_a.keys().cloned().collect();
for _ in 0..12_500 {
let snapshot = reader_index.load();
for key in &keys {
let observed = snapshot
.get(key)
.cloned()
.expect("key must be present in every generation");
let from_a = expected_a.get(key) == Some(&observed);
let from_b = expected_b.get(key) == Some(&observed);
assert!(
from_a || from_b,
"observed value {:?} for key {:?} is from neither Gen_A nor Gen_B",
observed,
key,
);
}
}
}));
}
for r in readers {
r.join().expect("reader thread panicked");
}
stop.store(true, std::sync::atomic::Ordering::Relaxed);
writer.join().expect("writer thread panicked");
}
#[test]
fn entity_type_index_swap_visibility_is_prompt() {
let index: EntityTypeIndex = Arc::new(ArcSwap::from_pointee(gen_a()));
{
let pre = index.load();
for (k, v) in &gen_a() {
assert_eq!(pre.get(k), Some(v), "pre-swap value mismatch for {:?}", k);
}
}
index.store(Arc::new(gen_b()));
let mut handles = Vec::new();
for _ in 0..4 {
let reader = Arc::clone(&index);
handles.push(thread::spawn(move || {
thread::sleep(Duration::from_millis(1));
let snap = reader.load();
for (k, v) in &gen_b() {
assert_eq!(snap.get(k), Some(v), "post-swap value mismatch for {:?}", k);
}
}));
}
for h in handles {
h.join().expect("reader thread panicked");
}
}
}