use std::collections::HashMap;
use std::sync::Arc;
use nodedb::bridge::envelope::PhysicalPlan;
use nodedb::bridge::physical_plan::TimeseriesOp;
use nodedb_mem::{EngineId, GovernorConfig, MemoryGovernor};
use crate::helpers::*;
fn generous_governor() -> Arc<MemoryGovernor> {
let per_engine: usize = 1 << 30; let mut engine_limits = HashMap::new();
for id in EngineId::ALL {
engine_limits.insert(*id, per_engine);
}
let global_ceiling = per_engine * EngineId::ALL.len();
Arc::new(
MemoryGovernor::new(GovernorConfig {
global_ceiling,
engine_limits,
})
.expect("governor config valid"),
)
}
fn ilp_lines(collection: &str, count: usize, start_ts_ns: i64) -> String {
let mut lines = String::with_capacity(count * 96);
let qtypes = ["A", "AAAA", "MX", "CNAME"];
let rcodes = ["NOERROR", "NXDOMAIN", "SERVFAIL", "REFUSED"];
for i in 0..count {
let ts_ns = start_ts_ns + i as i64 * 1_000_000;
let qtype = qtypes[i % qtypes.len()];
let rcode = rcodes[i % rcodes.len()];
let qname = format!("host-{}.example.com", i % 50);
let client_ip = format!("10.0.{}.{}", (i / 256) % 256, i % 256);
lines.push_str(&format!(
"{collection},qtype={qtype},rcode={rcode},qname={qname},client_ip={client_ip} elapsed_ms={}.0 {ts_ns}\n",
(i % 1000) as f64
));
}
lines
}
fn ingest_ilp(ctx: &mut TestCtx, collection: &str, payload: &str) -> serde_json::Value {
let raw = send_ok(
&mut ctx.core,
&mut ctx.tx,
&mut ctx.rx,
PhysicalPlan::Timeseries(TimeseriesOp::Ingest {
collection: collection.to_string(),
payload: payload.as_bytes().to_vec(),
format: "ilp".to_string(),
wal_lsn: None,
surrogates: Vec::new(),
}),
);
let json = nodedb::data::executor::response_codec::decode_payload_to_json(&raw);
serde_json::from_str(&json).unwrap_or(serde_json::Value::Null)
}
fn run_ts_flush_workload() -> (TestCtx, Arc<MemoryGovernor>) {
let mut ctx = make_ctx();
let gov = generous_governor();
ctx.core.set_governor(Arc::clone(&gov));
let batch_size = 10_000usize;
let num_batches = 300usize;
let mut accepted: u64 = 0;
let mut rejected: u64 = 0;
for b in 0..num_batches {
let start_ns = (b * batch_size) as i64 * 1_000_000;
let payload = ilp_lines("budget_metrics", batch_size, start_ns);
let resp = ingest_ilp(&mut ctx, "budget_metrics", &payload);
accepted += resp["accepted"].as_u64().unwrap_or(0);
rejected += resp["rejected"].as_u64().unwrap_or(0);
}
assert_eq!(rejected, 0, "no rows should be rejected by the hard limit");
assert_eq!(
accepted,
(batch_size * num_batches) as u64,
"all sent rows should be accepted — otherwise the workload didn't \
exercise the flush path the way this test expects"
);
(ctx, gov)
}
#[test]
fn timeseries_flush_does_not_over_release_engine_budget() {
let (_ctx, gov) = run_ts_flush_workload();
let over_releases = gov.total_over_release_count();
assert_eq!(
over_releases, 0,
"timeseries ingest+flush produced {over_releases} over-release \
event(s): `flush_ts_collection` releases the resident memtable \
footprint from the Timeseries budget, but ingest only reserved a \
small per-batch estimate. The two sides must track the same \
quantity — charge the memtable's bytes on ingest (or release only \
what was reserved), so the flush release is balanced and the \
governor's view survives a WAL-replay restart."
);
}
#[test]
fn timeseries_governor_reflects_resident_memtable_after_flush() {
let (_ctx, gov) = run_ts_flush_workload();
let allocated = gov.total_allocated();
assert!(
allocated > 0,
"after a 3 M-row ingest (≥ 2 flush cycles, ~600 K rows still \
resident in the memtable) the governor reports {allocated} B \
allocated. The resident memtable is unaccounted: the per-batch \
reservation is released when ingest returns and each flush \
saturates the Timeseries budget to zero. Pressure detection \
cannot see memory the governor doesn't know about."
);
}