use reddb_server::{RedDBOptions, RedDBRuntime};
use std::collections::BTreeMap;
use std::sync::Arc;
use std::thread;
use std::time::Duration;
fn runtime() -> RedDBRuntime {
RedDBRuntime::with_options(RedDBOptions::in_memory()).expect("runtime boots")
}
fn exec(rt: &RedDBRuntime, sql: &str) {
rt.execute_query(sql)
.unwrap_or_else(|err| panic!("{sql}: {err:?}"));
}
fn count_for(samples: &[((String, String), u64)], queue: &str) -> u64 {
samples
.iter()
.filter(|((_, q), _)| q == queue)
.map(|(_, n)| *n)
.sum()
}
#[test]
fn timeout_path_increments_started_and_timed_out_with_histogram_sample() {
let rt = runtime();
exec(&rt, "CREATE QUEUE qwt_timeout");
exec(&rt, "QUEUE GROUP CREATE qwt_timeout workers");
let read = rt
.execute_query("QUEUE READ qwt_timeout GROUP workers CONSUMER c1 COUNT 1 WAIT 120ms")
.expect("read");
assert!(
read.result.records.is_empty(),
"timeout should return empty projection"
);
let snap = rt.queue_telemetry_snapshot();
assert_eq!(
count_for(&snap.wait_started, "qwt_timeout"),
1,
"wait_started should fire exactly once for the one WAIT call"
);
assert_eq!(
count_for(&snap.wait_timed_out, "qwt_timeout"),
1,
"Timeout outcome should bump wait_timed_out"
);
assert_eq!(
count_for(&snap.wait_woken, "qwt_timeout"),
0,
"no wake happened; wait_woken should stay zero"
);
assert_eq!(
count_for(&snap.wait_cancelled, "qwt_timeout"),
0,
"no cancel; wait_cancelled should stay zero"
);
let hist: BTreeMap<_, _> = snap
.wait_duration
.iter()
.map(|((s, q), h)| ((s.clone(), q.clone()), h.clone()))
.collect();
let h = hist
.iter()
.find(|((_, q), _)| q == "qwt_timeout")
.map(|(_, h)| h)
.expect("histogram bucket present for qwt_timeout");
assert_eq!(h.count, 1, "exactly one histogram observation");
assert_eq!(
h.bucket_counts[0], 0,
"120ms timeout must not fall in the <=10ms bucket, got {:?}",
h.bucket_counts
);
assert_eq!(
h.bucket_counts[3], 1,
"120ms timeout should fall in the <=500ms bucket, got {:?}",
h.bucket_counts
);
assert!(
h.sum_ms >= 100 && h.sum_ms < 5_000,
"sum_ms should reflect ~120ms wait, got {}",
h.sum_ms
);
}
#[test]
fn wake_path_increments_woken_counter_and_histogram() {
let rt = Arc::new(runtime());
exec(&rt, "CREATE QUEUE qwt_wake");
exec(&rt, "QUEUE GROUP CREATE qwt_wake workers");
let producer_rt = rt.clone();
let producer = thread::spawn(move || {
thread::sleep(Duration::from_millis(80));
exec(&producer_rt, "QUEUE PUSH qwt_wake 'hi'");
});
let read = rt
.execute_query("QUEUE READ qwt_wake GROUP workers CONSUMER c1 COUNT 1 WAIT 5s")
.expect("read");
producer.join().unwrap();
assert_eq!(
read.result.records.len(),
1,
"wake should deliver the pushed message"
);
let snap = rt.queue_telemetry_snapshot();
assert_eq!(count_for(&snap.wait_started, "qwt_wake"), 1);
assert_eq!(count_for(&snap.wait_woken, "qwt_wake"), 1);
assert_eq!(count_for(&snap.wait_timed_out, "qwt_wake"), 0);
assert_eq!(count_for(&snap.wait_cancelled, "qwt_wake"), 0);
let h = snap
.wait_duration
.iter()
.find(|((_, q), _)| q == "qwt_wake")
.map(|(_, h)| h.clone())
.expect("histogram bucket present");
assert_eq!(h.count, 1, "exactly one observation for the one WAIT call");
}
#[test]
fn cancellation_path_increments_cancelled_counter() {
let rt = Arc::new(runtime());
exec(&rt, "CREATE QUEUE qwt_cancel");
exec(&rt, "QUEUE GROUP CREATE qwt_cancel workers");
let canceler_rt = rt.clone();
let registry = canceler_rt.queue_wait_registry();
let canceler = thread::spawn(move || {
thread::sleep(Duration::from_millis(80));
registry.cancel_all();
});
let err = rt
.execute_query("QUEUE READ qwt_cancel GROUP workers CONSUMER c1 COUNT 1 WAIT 5s")
.expect_err("cancellation surfaces as Err");
canceler.join().unwrap();
assert!(format!("{err}").to_lowercase().contains("wait"));
let snap = rt.queue_telemetry_snapshot();
assert_eq!(count_for(&snap.wait_started, "qwt_cancel"), 1);
assert_eq!(count_for(&snap.wait_cancelled, "qwt_cancel"), 1);
assert_eq!(count_for(&snap.wait_woken, "qwt_cancel"), 0);
assert_eq!(count_for(&snap.wait_timed_out, "qwt_cancel"), 0);
let h = snap
.wait_duration
.iter()
.find(|((_, q), _)| q == "qwt_cancel")
.map(|(_, h)| h.clone())
.expect("cancellation also records a histogram observation");
assert_eq!(h.count, 1);
rt.queue_wait_registry().reset_cancelled();
}
#[test]
fn immediate_read_does_not_increment_wait_counters() {
let rt = runtime();
exec(&rt, "CREATE QUEUE qwt_imm");
exec(&rt, "QUEUE GROUP CREATE qwt_imm workers");
exec(&rt, "QUEUE PUSH qwt_imm 'ready'");
let read = rt
.execute_query("QUEUE READ qwt_imm GROUP workers CONSUMER c1 COUNT 1 WAIT 5s")
.expect("read");
assert_eq!(read.result.records.len(), 1);
let snap = rt.queue_telemetry_snapshot();
assert_eq!(
count_for(&snap.wait_started, "qwt_imm"),
0,
"immediate read must not start a park lifecycle"
);
assert_eq!(count_for(&snap.wait_woken, "qwt_imm"), 0);
assert_eq!(count_for(&snap.wait_timed_out, "qwt_imm"), 0);
assert_eq!(count_for(&snap.wait_cancelled, "qwt_imm"), 0);
}
#[test]
fn normal_wait_outcomes_do_not_emit_operator_events() {
let wait_fn_source = read_wait_function_source();
assert!(
!wait_fn_source.contains("OperatorEvent"),
"group_read_with_optional_wait must not reference OperatorEvent on any branch"
);
assert!(
!wait_fn_source.contains("emit_global"),
"group_read_with_optional_wait must not call emit_global on any branch"
);
assert!(
!wait_fn_source.contains("AuditValue"),
"group_read_with_optional_wait must not construct audit payloads on any branch"
);
let rt = Arc::new(runtime());
exec(&rt, "CREATE QUEUE qwt_audit");
exec(&rt, "QUEUE GROUP CREATE qwt_audit workers");
let _ = rt
.execute_query("QUEUE READ qwt_audit GROUP workers CONSUMER c1 COUNT 1 WAIT 80ms")
.expect("timeout read");
let producer_rt = rt.clone();
let producer = thread::spawn(move || {
thread::sleep(Duration::from_millis(40));
exec(&producer_rt, "QUEUE PUSH qwt_audit 'go'");
});
let _ = rt
.execute_query("QUEUE READ qwt_audit GROUP workers CONSUMER c1 COUNT 1 WAIT 5s")
.expect("wake read");
producer.join().unwrap();
exec(&rt, "CREATE QUEUE qwt_audit_cancel");
exec(&rt, "QUEUE GROUP CREATE qwt_audit_cancel workers");
let canceler_rt = rt.clone();
let canceler = thread::spawn(move || {
thread::sleep(Duration::from_millis(40));
canceler_rt.queue_wait_registry().cancel_all();
});
let _ = rt
.execute_query("QUEUE READ qwt_audit_cancel GROUP workers CONSUMER c1 COUNT 1 WAIT 5s")
.expect_err("cancellation surfaces as Err");
canceler.join().unwrap();
rt.queue_wait_registry().reset_cancelled();
}
fn read_wait_function_source() -> String {
let manifest = std::env::var("CARGO_MANIFEST_DIR").expect("CARGO_MANIFEST_DIR set under cargo");
let path = std::path::Path::new(&manifest).join("src/runtime/impl_queue.rs");
let source =
std::fs::read_to_string(&path).unwrap_or_else(|e| panic!("read {}: {}", path.display(), e));
let needle = "fn group_read_with_optional_wait";
let start = source
.find(needle)
.unwrap_or_else(|| panic!("could not find {needle} in impl_queue.rs"));
let after_sig = &source[start..];
let open = after_sig.find('{').expect("open brace after fn signature");
let mut depth = 0i32;
let mut end = open;
for (i, ch) in after_sig[open..].char_indices() {
match ch {
'{' => depth += 1,
'}' => {
depth -= 1;
if depth == 0 {
end = open + i + 1;
break;
}
}
_ => {}
}
}
after_sig[..end].to_string()
}