use oxirs_vec::{
fault::{ReplicaManager, ReplicaState, ShardReplica},
multi_tenancy::{
admission_controller::AdmissionController, priority_queue::SlaQueryDispatcher,
sla::SlaClass,
},
persistence::{restore::restore_to_timestamp, PointInTimeRestore},
vector_store::VectorStore,
wal::{WalConfig, WalEntry, WalManager},
IndexDispatcher, IndexDispatcherConfig, Vector,
};
use std::path::Path;
use std::sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
Arc,
};
use std::thread;
use std::time::{Duration, Instant};
use tempfile::TempDir;
/// Builds a reproducible pseudo-random `Vector` with `dim` components.
///
/// The `seed` is scrambled once, then one 64-bit multiply-add step is applied
/// per component. Each resulting state is divided by `u64::MAX` and remapped
/// from the unit interval onto `[-1.0, 1.0]`. The same `(seed, dim)` pair
/// always yields the same components.
fn det_vec(seed: u64, dim: usize) -> Vector {
    let mut lcg = seed.wrapping_mul(2654435769).wrapping_add(0x9E37_79B9);
    let components: Vec<f32> = (0..dim)
        .map(|_| {
            lcg = lcg.wrapping_mul(6364136223846793005).wrapping_add(1);
            let unit = (lcg as f32) / (u64::MAX as f32);
            (unit - 0.5) * 2.0
        })
        .collect();
    Vector::new(components)
}
/// Seeds a write-ahead log under `dir` with one `WalEntry::Insert` per tuple.
///
/// Each `(id, vector, timestamp)` tuple in `inserts` is appended in order with
/// no metadata. The WAL is opened with `sync_on_write` enabled and
/// `checkpoint_interval` set to `u64::MAX`, and is flushed before returning.
///
/// # Errors
/// Propagates any failure from opening the WAL, appending an entry, or
/// flushing.
fn write_wal_inserts(dir: &Path, inserts: &[(String, Vec<f32>, u64)]) -> anyhow::Result<()> {
    let wal = WalManager::new(WalConfig {
        wal_directory: dir.to_path_buf(),
        checkpoint_interval: u64::MAX,
        sync_on_write: true,
        ..WalConfig::default()
    })?;
    for record in inserts.iter() {
        let (key, components, stamp) = record;
        wal.append(WalEntry::Insert {
            id: key.clone(),
            vector: components.clone(),
            metadata: None,
            timestamp: *stamp,
        })?;
    }
    wal.flush()
}
/// Ceiling, in milliseconds, on the restore time (RTO) the tests accept.
///
/// Builds with `debug_assertions` enabled get the larger ceiling.
fn max_rto_ms() -> u128 {
    const DEBUG_CEILING_MS: u128 = 90_000;
    const RELEASE_CEILING_MS: u128 = 30_000;
    match cfg!(debug_assertions) {
        true => DEBUG_CEILING_MS,
        false => RELEASE_CEILING_MS,
    }
}
/// Ceiling, in seconds, on the recovery-point gap (RPO) the tests accept.
///
/// Builds with `debug_assertions` enabled get the larger ceiling.
fn max_rpo_secs() -> u64 {
    const DEBUG_CEILING_SECS: u64 = 5;
    const RELEASE_CEILING_SECS: u64 = 1;
    let lenient = cfg!(debug_assertions);
    if lenient {
        return DEBUG_CEILING_SECS;
    }
    RELEASE_CEILING_SECS
}
/// Smoke test for the admission + dispatch pipeline across all SLA classes.
///
/// Registers 4 tenants per class, submits 50 queries per tenant, and checks
/// that (a) at least one query is admitted, (b) every query is counted as
/// either admitted or rejected, and (c) the drained queue comes out in
/// non-increasing priority order.
#[test]
fn multi_tenant_load_smoke() {
    const TENANTS_PER_CLASS: u64 = 4;
    const QUERIES_PER_TENANT: u64 = 50;

    let admission = AdmissionController::new();
    let mut dispatcher: SlaQueryDispatcher<u64> = SlaQueryDispatcher::new();
    let tiers = [
        SlaClass::Bronze,
        SlaClass::Silver,
        SlaClass::Gold,
        SlaClass::Platinum,
    ];

    let (mut admitted, mut rejected) = (0u64, 0u64);
    let mut payload_seq: u64 = 0;
    for (class_idx, &class) in tiers.iter().enumerate() {
        for tenant_idx in 0..TENANTS_PER_CLASS {
            let tenant = format!("t_{}_{}", class_idx, tenant_idx);
            admission.register_tenant(&tenant, class);
            for _ in 0..QUERIES_PER_TENANT {
                // Payload ids advance for every attempt, admitted or not.
                payload_seq += 1;
                match admission.try_admit(&tenant) {
                    Ok(_) => {
                        admitted += 1;
                        dispatcher.enqueue(tenant.clone(), class, payload_seq);
                    }
                    Err(_) => rejected += 1,
                }
            }
        }
    }

    assert!(admitted > 0, "dispatch pipeline should admit at least one");
    let submitted = (tiers.len() as u64) * TENANTS_PER_CLASS * QUERIES_PER_TENANT;
    assert_eq!(admitted + rejected, submitted, "every query is accounted for");

    // An empty drain has no adjacent pairs, so the ordering check is vacuous.
    let drained = dispatcher.drain_ordered();
    let priorities: Vec<u8> = drained.iter().map(|q| q.priority).collect();
    assert!(
        priorities.windows(2).all(|pair| pair[0] >= pair[1]),
        "priorities must be non-increasing: {:?}",
        priorities
    );
}
/// Heavy variant of the multi-tenant load test (ignored by default).
///
/// Registers 10 tenants per SLA class with 1,000 queries each (40,000 in
/// total), enqueues every admitted query, and checks the drained queue is in
/// non-increasing priority order.
#[test]
#[ignore = "heavy: 40000 ops; run with --ignored"]
fn multi_tenant_load_heavy() {
    const TENANTS_PER_CLASS: u64 = 10;
    const QUERIES_PER_TENANT: u64 = 1_000;

    let admission = AdmissionController::new();
    let mut dispatcher: SlaQueryDispatcher<u64> = SlaQueryDispatcher::new();
    let tiers = [
        SlaClass::Bronze,
        SlaClass::Silver,
        SlaClass::Gold,
        SlaClass::Platinum,
    ];

    let mut admitted = 0u64;
    let mut payload_seq = 0u64;
    for (class_idx, &class) in tiers.iter().enumerate() {
        for tenant_idx in 0..TENANTS_PER_CLASS {
            let tenant = format!("heavy_{}_{}", class_idx, tenant_idx);
            admission.register_tenant(&tenant, class);
            for _ in 0..QUERIES_PER_TENANT {
                payload_seq += 1;
                if admission.try_admit(&tenant).is_err() {
                    continue;
                }
                admitted += 1;
                dispatcher.enqueue(tenant.clone(), class, payload_seq);
            }
        }
    }

    let total = (tiers.len() as u64) * TENANTS_PER_CLASS * QUERIES_PER_TENANT;
    assert_eq!(total, 40_000, "heavy scenario must enumerate 40k queries");
    assert!(admitted > 0);

    let drained = dispatcher.drain_ordered();
    let priorities: Vec<u8> = drained.iter().map(|q| q.priority).collect();
    for window in priorities.windows(2) {
        assert!(window[0] >= window[1]);
    }
}
/// Restores a fresh `VectorStore` to a point in time while a writer is still
/// appending to the same WAL directory.
///
/// Flow:
/// 1. A writer thread appends up to 300 inserts (about one per millisecond)
///    and publishes the timestamp of the 50th insert as the restore target.
/// 2. A reader thread polls the WAL directory and asserts that its entry
///    count never decreases.
/// 3. The main thread waits for the target timestamp, restores to it while
///    the writer is still running, and checks the restore time against
///    `max_rto_ms()`.
/// 4. After both threads are joined, the WAL is replayed up to the target and
///    the test checks the RPO gap against `max_rpo_secs()`, that at least one
///    entry was replayed, and that no replayed entry is newer than the target.
#[test]
fn snapshot_during_load_and_restore_on_fresh_node() {
    /// Returns the timestamp carried by any `WalEntry` variant.
    fn entry_timestamp(entry: &WalEntry) -> u64 {
        match entry {
            WalEntry::Insert { timestamp, .. }
            | WalEntry::Update { timestamp, .. }
            | WalEntry::Delete { timestamp, .. }
            | WalEntry::Batch { timestamp, .. }
            | WalEntry::Checkpoint { timestamp, .. }
            | WalEntry::BeginTransaction { timestamp, .. }
            | WalEntry::CommitTransaction { timestamp, .. }
            | WalEntry::AbortTransaction { timestamp, .. } => *timestamp,
        }
    }

    let tmp = TempDir::new().expect("tempdir");
    let wal_dir = tmp.path().to_path_buf();
    let stop_writer = Arc::new(AtomicBool::new(false));
    let written = Arc::new(AtomicU64::new(0));
    let snapshot_target_ts = Arc::new(AtomicU64::new(0));

    let stop_writer_t = stop_writer.clone();
    let written_t = written.clone();
    let target_t = snapshot_target_ts.clone();
    let wal_dir_t = wal_dir.clone();
    let writer = thread::spawn(move || -> anyhow::Result<()> {
        let cfg = WalConfig {
            wal_directory: wal_dir_t,
            checkpoint_interval: u64::MAX,
            sync_on_write: true,
            ..WalConfig::default()
        };
        let mgr = WalManager::new(cfg)?;
        let mut i: u64 = 0;
        let start = Instant::now();
        while !stop_writer_t.load(Ordering::Relaxed) {
            i += 1;
            // `+ 1` keeps every timestamp non-zero; zero means "target not
            // published yet" on the main thread.
            let ts = start.elapsed().as_millis() as u64 + 1;
            mgr.append(WalEntry::Insert {
                id: format!("v_{}", i),
                vector: vec![i as f32, (i * 2) as f32],
                metadata: None,
                timestamp: ts,
            })?;
            written_t.store(i, Ordering::Relaxed);
            if i == 50 {
                // Release: pairs with the Acquire loads on the main thread so
                // that seeing the target also guarantees seeing `written >= 50`.
                target_t.store(ts, Ordering::Release);
            }
            if i >= 300 {
                break;
            }
            thread::sleep(Duration::from_millis(1));
        }
        mgr.flush()?;
        Ok(())
    });

    let stop_reader = Arc::new(AtomicBool::new(false));
    let stop_reader_t = stop_reader.clone();
    let wal_dir_r = wal_dir.clone();
    let reader = thread::spawn(move || {
        let mut last_count = 0usize;
        while !stop_reader_t.load(Ordering::Relaxed) {
            if let Ok(entries) = std::fs::read_dir(&wal_dir_r) {
                let count = entries.filter_map(|e| e.ok()).count();
                assert!(
                    count >= last_count,
                    "WAL file count must be monotonic non-decreasing: {} -> {}",
                    last_count,
                    count
                );
                last_count = count;
            }
            thread::sleep(Duration::from_millis(2));
        }
    });

    // Wait (bounded) for the writer to publish the restore target.
    let deadline = Instant::now() + Duration::from_secs(15);
    while snapshot_target_ts.load(Ordering::Acquire) == 0 && Instant::now() < deadline {
        thread::sleep(Duration::from_millis(5));
    }
    let target_ts = snapshot_target_ts.load(Ordering::Acquire);
    assert!(target_ts > 0, "writer must reach snapshot point in time");
    let writes_at_restore_start = written.load(Ordering::Relaxed);
    assert!(
        writes_at_restore_start >= 50,
        "writer must have at least 50 ops before restore starts"
    );
    assert!(
        writes_at_restore_start < 300,
        "writer must still be active when restore starts (got {} writes)",
        writes_at_restore_start
    );

    // Restore onto a fresh store while the writer keeps appending.
    let restore_start = Instant::now();
    let mut fresh_store = VectorStore::new();
    let report =
        restore_to_timestamp(&mut fresh_store, target_ts, &wal_dir).expect("restore must succeed");
    let rto = restore_start.elapsed().as_millis();
    let writes_at_restore_end = written.load(Ordering::Relaxed);
    assert!(
        writes_at_restore_end >= writes_at_restore_start,
        "writer should not have rolled back: {} -> {}",
        writes_at_restore_start,
        writes_at_restore_end
    );
    assert!(
        rto < max_rto_ms(),
        "RTO {} ms exceeded ceiling {} ms",
        rto,
        max_rto_ms()
    );

    stop_writer.store(true, Ordering::Relaxed);
    writer
        .join()
        .expect("writer panicked")
        .expect("writer error");
    stop_reader.store(true, Ordering::Relaxed);
    reader.join().expect("reader panicked");

    // Replay the now-quiescent WAL up to the target and measure the RPO gap.
    let pit = PointInTimeRestore::new(target_ts, wal_dir.clone());
    let replayed = pit.replay_wal_to_timestamp(None).expect("replay");
    let last_replayed_ts = replayed.iter().map(entry_timestamp).max().unwrap_or(0);
    let rpo_ms = target_ts.saturating_sub(last_replayed_ts);
    let max_rpo_ms = max_rpo_secs() * 1_000;
    assert!(
        rpo_ms <= max_rpo_ms,
        "RPO {} ms exceeded ceiling {} ms",
        rpo_ms,
        max_rpo_ms
    );
    assert!(
        report.entries_replayed > 0,
        "restore must replay at least one entry"
    );
    let post_target_entries = replayed
        .iter()
        .filter(|e| entry_timestamp(e) > target_ts)
        .count();
    assert_eq!(
        post_target_entries, 0,
        "restore must not include entries with timestamp > target_ts"
    );
}
/// Chaos scenario: the primary ("writer") of shard 1 is marked failed.
///
/// Checks that replication status flips from healthy to unhealthy, that
/// `auto_failover` promotes `reader-B`, that the failed writer is still
/// counted afterwards, and that the shard ends up with exactly one primary.
#[test]
fn chaos_writer_kill_reads_continue() {
    let mut cluster = ReplicaManager::new(2);
    cluster
        .register_replica(ShardReplica::new(
            1,
            "writer-A",
            "node-A",
            ReplicaState::Primary,
            100,
        ))
        .expect("register primary");
    cluster
        .register_replica(ShardReplica::new(
            1,
            "reader-B",
            "node-B",
            ReplicaState::Replica,
            100,
        ))
        .expect("register reader");

    let before_kill = cluster.replication_status();
    assert!(before_kill.healthy, "starting state must be healthy");

    cluster.mark_failed(1, "writer-A");
    let after_kill = cluster.replication_status();
    assert!(!after_kill.healthy, "killed writer => unhealthy");

    let promoted = cluster
        .auto_failover(1)
        .expect("auto_failover should succeed");
    assert_eq!(promoted, "reader-B");

    let after_failover = cluster.replication_status();
    assert_eq!(
        after_failover.failed_replicas, 1,
        "writer-A still counted as failed"
    );

    let shard_replicas = cluster.get_replicas(1);
    assert!(!shard_replicas.is_empty(), "shard 1 must exist");
    let primary_count = shard_replicas
        .iter()
        .filter(|replica| replica.state.is_primary())
        .count();
    assert_eq!(primary_count, 1);
}
/// Chaos scenario: one of two read replicas of shard 2 is marked failed.
///
/// Checks that the status reports one failed replica and one
/// under-replicated shard, and that two replicas of the shard still report
/// themselves healthy.
#[test]
fn chaos_reader_kill_remaining_replicas_serve() {
    let mut cluster = ReplicaManager::new(3);

    // (replica id, node, role, message used if registration fails)
    let topology = [
        ("p", "node-1", ReplicaState::Primary, "register primary"),
        ("r1", "node-2", ReplicaState::Replica, "register replica1"),
        ("r2", "node-3", ReplicaState::Replica, "register replica2"),
    ];
    for (replica_id, node, role, failure_msg) in topology {
        cluster
            .register_replica(ShardReplica::new(2, replica_id, node, role, 50))
            .expect(failure_msg);
    }

    cluster.mark_failed(2, "r1");
    let status = cluster.replication_status();
    assert_eq!(status.failed_replicas, 1);
    assert_eq!(status.under_replicated, 1);

    let shard_replicas = cluster.get_replicas(2);
    let healthy = shard_replicas
        .iter()
        .filter(|replica| replica.state.is_healthy())
        .count();
    assert_eq!(healthy, 2, "primary + 1 replica still healthy");
}
/// Chaos scenario: every `wal-*` file gets 64 bytes of `0xFF` appended before
/// a restore is attempted.
///
/// Either outcome is accepted: a successful restore must not report more than
/// the 5 seeded entries, and a failed restore must carry a non-empty message.
#[test]
fn chaos_checkpoint_corruption_yields_typed_error() {
    let scratch = TempDir::new().expect("tempdir");
    let wal_dir = scratch.path();

    // Seed five inserts with timestamps 100, 200, ..., 500.
    let mut seeded: Vec<(String, Vec<f32>, u64)> = Vec::with_capacity(5);
    for i in 1..=5u64 {
        seeded.push((format!("v{}", i), vec![i as f32], i * 100));
    }
    write_wal_inserts(wal_dir, &seeded).expect("seed wal");

    let wal_paths: Vec<std::path::PathBuf> = std::fs::read_dir(wal_dir)
        .expect("read_dir")
        .filter_map(Result::ok)
        .filter(|entry| entry.file_name().to_string_lossy().starts_with("wal-"))
        .map(|entry| entry.path())
        .collect();
    assert!(!wal_paths.is_empty(), "WAL must produce at least one file");

    // Corrupt each segment by appending a trailing run of 0xFF bytes.
    for path in &wal_paths {
        let mut contents = std::fs::read(path).expect("read wal file");
        contents.extend_from_slice(&[0xFF; 64]);
        std::fs::write(path, contents).expect("rewrite");
    }

    let mut store = VectorStore::new();
    match restore_to_timestamp(&mut store, 1_000, wal_dir) {
        Ok(report) => assert!(
            report.entries_replayed <= 5,
            "must not over-replay; got {}",
            report.entries_replayed
        ),
        Err(err) => {
            let rendered = err.to_string();
            assert!(
                !rendered.is_empty(),
                "error must carry a message; got empty error"
            );
        }
    }
}
/// Verifies that `IndexDispatcher` statistics written by one instance are
/// picked up by a second instance configured with the same `stats_path`.
///
/// The first instance inserts 16 vectors, builds, runs 5 searches and flushes
/// its stats. The second instance is then created with the same path and must
/// report a non-zero observation count.
///
/// # Errors
/// Propagates any failure from creating the scratch directory or from the
/// dispatcher API.
#[test]
fn dispatcher_stats_persist_across_restart() -> anyhow::Result<()> {
    // The stats file lives in a `TempDir`, which is removed when `scratch` is
    // dropped. This cleans up on every exit path, including an early `?`
    // return or a failed assertion, instead of only on success.
    let scratch = TempDir::new()?;
    let stats_path = scratch.path().join("oxirs_vec_w2s7_dispatcher.json");

    // First "process lifetime": generate observations and persist them.
    {
        let mut config = IndexDispatcherConfig {
            stats_path: Some(stats_path.clone()),
            stats_save_interval: 1,
            ..Default::default()
        };
        config.ivf_config.n_clusters = 4;
        config.ivf_config.n_probes = 2;
        let mut d = IndexDispatcher::new(config)?;
        for i in 0..16u64 {
            d.insert(format!("e_{}", i), det_vec(i + 1, 8))?;
        }
        d.build()?;
        for q in 0..5u64 {
            let _ = d.search_knn(&det_vec(1000 + q, 8), 3)?;
        }
        d.flush_stats()?;
    }

    // Second "process lifetime": a new dispatcher must reload the stats.
    {
        let mut config = IndexDispatcherConfig {
            stats_path: Some(stats_path.clone()),
            ..Default::default()
        };
        config.ivf_config.n_clusters = 4;
        let d = IndexDispatcher::new(config)?;
        let n = d.observation_count()?;
        assert!(n > 0, "second run must reload stats from {:?}", stats_path);
    }
    Ok(())
}