use super::*;
use crate::runtime::lifecycle::Phase;
use std::path::{Path, PathBuf};
/// Returns the path of the runtime-state sidecar file that belongs to `data_path`.
///
/// The sidecar is always named `.runtime-state.json` and is placed in the
/// directory returned by `data_path.parent()`. When `data_path` has no parent
/// (for example a bare root or an empty path) the directory `.` is used.
pub(crate) fn runtime_state_path(data_path: &Path) -> PathBuf {
    match data_path.parent() {
        Some(dir) => dir.join(".runtime-state.json"),
        None => Path::new(".").join(".runtime-state.json"),
    }
}
/// Writes `{"read_only": <enabled>}` as pretty-printed JSON to `state_path`.
///
/// The document is written and synced to a staging file (the target path with
/// its extension replaced by `json.tmp`) and then renamed onto `state_path`.
/// A non-empty parent directory is created first if necessary.
///
/// # Errors
/// Propagates any I/O error from directory creation, file creation, write,
/// sync, or rename. A serialization failure is wrapped with
/// `std::io::Error::other`.
pub(crate) fn persist_runtime_readonly(state_path: &Path, enabled: bool) -> std::io::Result<()> {
    use std::io::Write;

    let mut state = crate::json::Map::new();
    state.insert("read_only".to_string(), crate::json::Value::Bool(enabled));
    let document = crate::json::Value::Object(state);
    let serialized = crate::serde_json::to_string_pretty(&document)
        .map_err(|err| std::io::Error::other(err.to_string()))?;

    // A bare file name has an empty parent; there is nothing to create then.
    match state_path.parent() {
        Some(dir) if !dir.as_os_str().is_empty() => std::fs::create_dir_all(dir)?,
        _ => {}
    }

    let staging = state_path.with_extension("json.tmp");
    let mut file = std::fs::File::create(&staging)?;
    file.write_all(serialized.as_bytes())?;
    file.sync_all()?;
    // Close the handle before the rename, as the original scoped block did.
    drop(file);

    std::fs::rename(&staging, state_path)
}
/// Reads the persisted `read_only` flag from the runtime-state file that
/// `runtime_state_path` derives from `data_path`.
///
/// Returns `None` when the file cannot be read, cannot be parsed by
/// `crate::json::from_slice`, or does not carry a boolean `read_only` field.
pub fn load_runtime_readonly(data_path: &Path) -> Option<bool> {
    let raw = std::fs::read(runtime_state_path(data_path)).ok()?;
    let document: crate::json::Value = crate::json::from_slice(&raw).ok()?;
    let flag = document.get("read_only")?;
    flag.as_bool()
}
/// Builds the default lease-holder identifier.
///
/// An explicit value from `env_with_file_fallback("RED_LEASE_HOLDER_ID")` wins.
/// Otherwise the id is `<host>-<pid>`, where `<host>` comes from the `HOSTNAME`
/// environment variable, then `HOST`, then the literal `unknown-host`.
fn default_holder_id() -> String {
    crate::utils::env_with_file_fallback("RED_LEASE_HOLDER_ID").unwrap_or_else(|| {
        let host = match std::env::var("HOSTNAME") {
            Ok(name) => name,
            Err(_) => std::env::var("HOST").unwrap_or_else(|_| "unknown-host".to_string()),
        };
        format!("{host}-{}", std::process::id())
    })
}
/// Escapes a string for use inside a double-quoted metric label value.
///
/// `"` becomes `\"`, `\` becomes `\\`, a line feed becomes the two characters
/// `\n`, and a carriage return becomes `\r`. Every other character is copied
/// unchanged.
fn sanitize_label(value: &str) -> String {
    value
        .chars()
        .fold(String::with_capacity(value.len()), |mut escaped, ch| {
            match ch {
                '"' => escaped.push_str("\\\""),
                '\\' => escaped.push_str("\\\\"),
                '\n' => escaped.push_str("\\n"),
                '\r' => escaped.push_str("\\r"),
                other => escaped.push(other),
            }
            escaped
        })
}
/// Decodes standard-alphabet (`A-Z a-z 0-9 + /`) base64 text into bytes.
///
/// Trailing `=` padding is optional: it is stripped before decoding, so both
/// padded and unpadded input are accepted.
///
/// # Errors
/// Returns `Err` with a human-readable message when
/// * a character outside the standard alphabet is encountered, or
/// * the input (after removing padding) ends in a single dangling character.
///   One base64 character carries only 6 bits and can never encode a whole
///   byte, so such input is truncated or corrupt. It is rejected rather than
///   silently dropped.
fn b64_decode(input: &str) -> Result<Vec<u8>, String> {
    /// Maps one base64 alphabet character to its 6-bit value.
    fn sextet(c: u8) -> Result<u32, String> {
        match c {
            b'A'..=b'Z' => Ok(u32::from(c - b'A')),
            b'a'..=b'z' => Ok(u32::from(c - b'a') + 26),
            b'0'..=b'9' => Ok(u32::from(c - b'0') + 52),
            b'+' => Ok(62),
            b'/' => Ok(63),
            other => Err(format!("invalid base64 character: {}", other as char)),
        }
    }

    let symbols = input.trim_end_matches('=').as_bytes();
    let mut decoded = Vec::with_capacity(symbols.len() * 3 / 4 + 1);

    for group in symbols.chunks(4) {
        // Pack up to four sextets into the top 24 bits of `acc`, most
        // significant first; missing trailing sextets stay zero.
        let mut acc = 0u32;
        for (slot, &symbol) in group.iter().enumerate() {
            acc |= sextet(symbol)? << (18 - 6 * slot as u32);
        }
        match group.len() {
            4 => decoded.extend_from_slice(&[(acc >> 16) as u8, (acc >> 8) as u8, acc as u8]),
            3 => decoded.extend_from_slice(&[(acc >> 16) as u8, (acc >> 8) as u8]),
            2 => decoded.push((acc >> 16) as u8),
            // `chunks` never yields an empty slice, so this is a lone character.
            _ => {
                return Err(
                    "truncated base64 input: dangling character encodes no full byte".to_string(),
                )
            }
        }
    }
    Ok(decoded)
}
/// Rejects `value` if it contains a NUL, carriage-return, or line-feed byte.
///
/// Returns `Some` 400 error response describing the first offending byte and
/// its index, naming `field` in the message, or `None` when `value` is clean.
fn reject_smuggling_bytes(field: &str, value: &str) -> Option<HttpResponse> {
    let (idx, offending) = value
        .bytes()
        .enumerate()
        .find(|&(_, byte)| matches!(byte, b'\0' | b'\r' | b'\n'))?;
    let message = if offending == b'\0' {
        format!("field `{field}` contains forbidden NUL byte at index {idx}")
    } else {
        format!("field `{field}` contains forbidden CR/LF byte at index {idx}")
    };
    Some(json_error(400, message))
}
impl RedDBServer {
/// Handles the admin shutdown request by running `graceful_shutdown` on the runtime.
///
/// Whether a backup is requested during shutdown comes from the
/// `RED_BACKUP_ON_SHUTDOWN` environment variable: `1`, `true`, `yes`, or `on`
/// (case-insensitive) enable it, any other value disables it, and an unset
/// variable defaults to enabled.
///
/// On success an `admin/shutdown` audit record is written and a 200 response
/// reports the current lifecycle phase plus the shutdown report fields.
/// On failure a 500 response carries the error text.
pub(crate) fn handle_admin_shutdown(&self) -> HttpResponse {
    let backup_on_shutdown = match std::env::var("RED_BACKUP_ON_SHUTDOWN") {
        Ok(raw) => matches!(
            raw.to_ascii_lowercase().as_str(),
            "1" | "true" | "yes" | "on"
        ),
        Err(_) => true,
    };

    let report = match self.runtime.graceful_shutdown(backup_on_shutdown) {
        Ok(report) => report,
        Err(err) => return json_error(500, err.to_string()),
    };

    let mut audit = Map::new();
    audit.insert(
        "backup_uploaded".to_string(),
        JsonValue::Bool(report.backup_uploaded),
    );
    audit.insert(
        "duration_ms".to_string(),
        JsonValue::Number(report.duration_ms as f64),
    );
    self.runtime.audit_log().record(
        "admin/shutdown",
        "operator",
        "instance",
        "ok",
        JsonValue::Object(audit),
    );

    // The phase is sampled after the audit record, as in the original ordering.
    let phase = self.runtime.lifecycle().phase().as_str().to_string();
    let fields = [
        ("ok", JsonValue::Bool(true)),
        ("phase", JsonValue::String(phase)),
        ("flushed_wal", JsonValue::Bool(report.flushed_wal)),
        ("final_checkpoint", JsonValue::Bool(report.final_checkpoint)),
        ("backup_uploaded", JsonValue::Bool(report.backup_uploaded)),
        ("duration_ms", JsonValue::Number(report.duration_ms as f64)),
    ];
    let mut response = Map::new();
    for (name, value) in fields {
        response.insert(name.to_string(), value);
    }
    json_response(200, JsonValue::Object(response))
}
/// Handles `POST /admin/restore`: runs a point-in-time recovery from the
/// configured remote backend into the local database path.
///
/// The optional JSON body may carry `to_timestamp_ms` (used as-is) or
/// `to_timestamp` (multiplied by 1000, saturating). If neither is present as
/// an unsigned integer, or the body is empty, the target passed to
/// `restore_to` is `0`.
///
/// Responses:
/// * 409 when the write gate is not read-only
/// * 412 when no remote backend is configured or the database has no path
/// * 400 when the body is not valid JSON
/// * 500 when the restore fails (an `admin/restore` audit record with the
///   error is written first)
/// * 200 with the recovery report on success (also audited)
pub(crate) fn handle_admin_restore(&self, body: Vec<u8>) -> HttpResponse {
    if !self.runtime.write_gate().is_read_only() {
        return json_error(
            409,
            "POST /admin/restore requires the runtime to be read_only or replica-role; \
             toggle via RED_READONLY=true or POST /admin/readonly first",
        );
    }

    let db = self.runtime.db();
    let backend = match db.options().remote_backend.clone() {
        Some(backend) => backend,
        None => return json_error(412, "no remote backend configured (RED_BACKEND=none)"),
    };
    let local_path = match db.path() {
        Some(path) => path.to_path_buf(),
        None => return json_error(412, "in-memory runtime cannot be restored from remote"),
    };
    let snapshot_prefix = db.options().default_snapshot_prefix();
    let wal_prefix = db.options().default_wal_archive_prefix();

    let target_time_ms: u64 = if body.is_empty() {
        0
    } else {
        let request = match crate::serde_json::from_slice::<crate::serde_json::Value>(&body) {
            Ok(value) => value,
            Err(err) => return json_error(400, format!("invalid JSON body: {err}")),
        };
        // Milliseconds take precedence; seconds are only consulted as a fallback.
        match request.get("to_timestamp_ms").and_then(|n| n.as_u64()) {
            Some(millis) => millis,
            None => request
                .get("to_timestamp")
                .and_then(|n| n.as_u64())
                .map_or(0, |secs| secs.saturating_mul(1000)),
        }
    };

    let recovery =
        crate::storage::wal::PointInTimeRecovery::new(backend, snapshot_prefix, wal_prefix);
    let report = match recovery.restore_to(target_time_ms, &local_path) {
        Ok(report) => report,
        Err(err) => {
            self.runtime.audit_log().record(
                "admin/restore",
                "operator",
                "instance",
                &format!("err: {err}"),
                JsonValue::Null,
            );
            return json_error(500, err.to_string());
        }
    };

    let figures = [
        ("snapshot_used", report.snapshot_used as f64),
        ("wal_segments_replayed", report.wal_segments_replayed as f64),
        ("records_applied", report.records_applied as f64),
        ("recovered_to_lsn", report.recovered_to_lsn as f64),
        ("recovered_to_time", report.recovered_to_time as f64),
    ];
    let mut details = Map::new();
    for (name, value) in figures {
        details.insert(name.to_string(), JsonValue::Number(value));
    }
    self.runtime.audit_log().record(
        "admin/restore",
        "operator",
        "instance",
        "ok",
        JsonValue::Object(details.clone()),
    );

    // The response is the audit payload plus a leading `ok: true`.
    let mut response = Map::new();
    response.insert("ok".to_string(), JsonValue::Bool(true));
    for (name, value) in details {
        response.insert(name, value);
    }
    json_response(200, JsonValue::Object(response))
}
/// Handles the admin backup request by calling `trigger_backup` on the runtime.
///
/// `_query` is accepted but not consulted. Both outcomes write an
/// `admin/backup` audit record. Success yields a 200 response with
/// `snapshot_id`, `uploaded`, and `duration_ms`; failure yields a 500 response
/// carrying the error text.
pub(crate) fn handle_admin_backup(
    &self,
    _query: &std::collections::BTreeMap<String, String>,
) -> HttpResponse {
    let result = match self.runtime.trigger_backup() {
        Ok(result) => result,
        Err(err) => {
            self.runtime.audit_log().record(
                "admin/backup",
                "operator",
                "instance",
                &format!("err: {err}"),
                JsonValue::Null,
            );
            return json_error(500, err.to_string());
        }
    };

    let summary = [
        ("snapshot_id", JsonValue::Number(result.snapshot_id as f64)),
        ("uploaded", JsonValue::Bool(result.uploaded)),
        ("duration_ms", JsonValue::Number(result.duration_ms as f64)),
    ];
    let mut details = Map::new();
    for (name, value) in summary {
        details.insert(name.to_string(), value);
    }
    self.runtime.audit_log().record(
        "admin/backup",
        "operator",
        "instance",
        "ok",
        JsonValue::Object(details.clone()),
    );

    // The response is the audit payload plus a leading `ok: true`.
    let mut response = Map::new();
    response.insert("ok".to_string(), JsonValue::Bool(true));
    for (name, value) in details {
        response.insert(name, value);
    }
    json_response(200, JsonValue::Object(response))
}
/// Handles the blob-cache sweep request: evicts expired entries from the
/// result blob cache via `BlobCacheSweeper::sweep_expired`.
///
/// The optional JSON body may set `limit_entries` and/or `limit_millis`
/// (unsigned integers, clamped to `usize::MAX` / `u32::MAX`). One limit alone
/// selects `SweepLimit::Entries` or `SweepLimit::Millis`; both or neither
/// select `SweepLimit::Either`, with missing values set to the type maximum.
///
/// Returns 400 for an unparseable body, otherwise 200 with the sweep report.
/// An `admin/blob_cache/sweep` audit record is written for every sweep.
pub(crate) fn handle_admin_blob_cache_sweep(&self, body: Vec<u8>) -> HttpResponse {
    use crate::storage::cache::sweeper::{BlobCacheSweeper, SweepLimit};

    let (limit_entries, limit_millis) = if body.is_empty() {
        (None, None)
    } else {
        let request = match crate::serde_json::from_slice::<crate::serde_json::Value>(&body) {
            Ok(value) => value,
            Err(err) => return json_error(400, format!("invalid JSON body: {err}")),
        };
        let entries = request
            .get("limit_entries")
            .and_then(|n| n.as_u64())
            .map(|n| usize::try_from(n).unwrap_or(usize::MAX));
        let millis = request
            .get("limit_millis")
            .and_then(|n| n.as_u64())
            .map(|n| u32::try_from(n).unwrap_or(u32::MAX));
        (entries, millis)
    };

    let limit = match (limit_entries, limit_millis) {
        (Some(entries), None) => SweepLimit::Entries(entries),
        (None, Some(millis)) => SweepLimit::Millis(millis),
        // Both given, or neither: combine, defaulting absent bounds to "unbounded".
        (entries, millis) => SweepLimit::Either {
            entries: entries.unwrap_or(usize::MAX),
            millis: millis.unwrap_or(u32::MAX),
        },
    };

    let report = BlobCacheSweeper::sweep_expired(self.runtime.result_blob_cache(), limit);

    let mut response = Map::new();
    response.insert("ok".to_string(), JsonValue::Bool(true));
    let counters = [
        ("entries_scanned", report.entries_scanned as f64),
        ("entries_evicted", report.entries_evicted as f64),
        ("bytes_reclaimed", report.bytes_reclaimed as f64),
        ("elapsed_ms", report.elapsed_ms as f64),
    ];
    for (name, value) in counters {
        response.insert(name.to_string(), JsonValue::Number(value));
    }
    response.insert(
        "truncated_due_to_limit".to_string(),
        JsonValue::Bool(report.truncated_due_to_limit),
    );

    let audited = [
        ("entries_evicted", report.entries_evicted as f64),
        ("bytes_reclaimed", report.bytes_reclaimed as f64),
        ("elapsed_ms", report.elapsed_ms as f64),
    ];
    let mut details = Map::new();
    for (name, value) in audited {
        details.insert(name.to_string(), JsonValue::Number(value));
    }
    self.runtime.audit_log().record(
        "admin/blob_cache/sweep",
        "operator",
        "instance",
        "ok",
        JsonValue::Object(details),
    );

    json_response(200, JsonValue::Object(response))
}
pub(crate) fn handle_admin_blob_cache_flush_namespace(&self, body: Vec<u8>) -> HttpResponse {
use crate::storage::cache::sweeper::BlobCacheSweeper;
if body.is_empty() {
return json_error(400, "missing JSON body with required `namespace` field");
}
let parsed: crate::serde_json::Value = match crate::serde_json::from_slice(&body) {
Ok(v) => v,
Err(err) => return json_error(400, format!("invalid JSON body: {err}")),
};
let namespace = match parsed.get("namespace").and_then(|v| v.as_str()) {
Some(n) => n.to_string(),
None => return json_error(400, "field `namespace` is required and must be a string"),
};
if namespace.is_empty() {
return json_error(400, "field `namespace` must not be empty");
}
for (idx, byte) in namespace.as_bytes().iter().enumerate() {
match *byte {
b'\0' => {
return json_error(
400,
format!("field `namespace` contains forbidden NUL byte at index {idx}"),
);
}
b'\r' | b'\n' => {
return json_error(
400,
format!("field `namespace` contains forbidden CR/LF byte at index {idx}"),
);
}
_ => {}
}
}
let report =
BlobCacheSweeper::flush_namespace(self.runtime.result_blob_cache(), &namespace);
let mut object = Map::new();
object.insert("ok".to_string(), JsonValue::Bool(true));
object.insert(
"namespace".to_string(),
crate::json_field::SerializedJsonField::tainted(&report.namespace),
);
object.insert(
"generation_before".to_string(),
JsonValue::Number(report.generation_before as f64),
);
object.insert(
"generation_after".to_string(),
JsonValue::Number(report.generation_after as f64),
);
object.insert(
"elapsed_micros".to_string(),
JsonValue::Number(report.elapsed_micros as f64),
);
let mut details = Map::new();
details.insert(
"namespace".to_string(),
crate::json_field::SerializedJsonField::tainted(&report.namespace),
);
details.insert(
"elapsed_micros".to_string(),
JsonValue::Number(report.elapsed_micros as f64),
);
self.runtime.audit_log().record(
"admin/blob_cache/flush_namespace",
"operator",
"instance",
"ok",
JsonValue::Object(details),
);
json_response(200, JsonValue::Object(object))
}
/// Handles the blob-cache compare-and-set request: stores a base64-encoded
/// value under `namespace`/`key` with version `new_version`.
///
/// Required JSON fields: `namespace` and `key` (non-empty strings without
/// NUL/CR/LF bytes), `new_value_b64` (string), and `new_version` (unsigned
/// integer). Optional: `expected_version` (must be an unsigned integer when
/// present) and `ttl_ms` (unsigned integer).
///
/// Responses:
/// * 400 for a missing/invalid body or field, or undecodable base64
/// * 200 `{committed: true, current_version}` when the put succeeds (audited
///   as `admin/cache/compare_and_set`)
/// * 409 `{committed: false, current_version, reason}` on
///   `CacheError::VersionMismatch`
/// * 500 for any other cache error
pub(crate) fn handle_admin_blob_cache_compare_and_set(&self, body: Vec<u8>) -> HttpResponse {
    use crate::storage::cache::blob::{BlobCachePolicy, BlobCachePut, CacheError};

    if body.is_empty() {
        return json_error(400, "missing JSON body");
    }
    let parsed: crate::serde_json::Value = match crate::serde_json::from_slice(&body) {
        Ok(value) => value,
        Err(err) => return json_error(400, format!("invalid JSON body: {err}")),
    };

    let namespace = match parsed.get("namespace").and_then(|v| v.as_str()) {
        None => return json_error(400, "field `namespace` is required and must be a string"),
        Some("") => return json_error(400, "field `namespace` must not be empty"),
        Some(text) => text.to_string(),
    };
    let key = match parsed.get("key").and_then(|v| v.as_str()) {
        None => return json_error(400, "field `key` is required and must be a string"),
        Some("") => return json_error(400, "field `key` must not be empty"),
        Some(text) => text.to_string(),
    };
    let Some(new_value_b64) = parsed
        .get("new_value_b64")
        .and_then(|v| v.as_str())
        .map(str::to_string)
    else {
        return json_error(
            400,
            "field `new_value_b64` is required and must be a string",
        );
    };
    let Some(new_version) = parsed.get("new_version").and_then(|v| v.as_u64()) else {
        return json_error(
            400,
            "field `new_version` is required and must be a non-negative integer",
        );
    };
    // NOTE(review): `expected_version` is only type-checked here; its value is
    // not read anywhere else in this handler. Confirm whether the comparison
    // is meant to happen solely inside the cache's versioned `put`.
    if matches!(parsed.get("expected_version"), Some(ev) if ev.as_u64().is_none()) {
        return json_error(
            400,
            "field `expected_version` must be a non-negative integer",
        );
    }
    let ttl_ms = parsed.get("ttl_ms").and_then(|v| v.as_u64());

    for (field, value) in [("namespace", namespace.as_str()), ("key", key.as_str())] {
        if let Some(rejection) = reject_smuggling_bytes(field, value) {
            return rejection;
        }
    }

    let bytes = match b64_decode(&new_value_b64) {
        Ok(decoded) => decoded,
        Err(e) => return json_error(400, format!("invalid base64 in `new_value_b64`: {e}")),
    };

    let versioned = BlobCachePolicy::default().version(new_version);
    let policy = match ttl_ms {
        Some(ttl) => versioned.ttl_ms(ttl),
        None => versioned,
    };
    let put = BlobCachePut::new(bytes).with_policy(policy);

    match self.runtime.result_blob_cache().put(&namespace, &key, put) {
        Ok(()) => {
            let mut details = Map::new();
            details.insert(
                "namespace".to_string(),
                crate::json_field::SerializedJsonField::tainted(&namespace),
            );
            details.insert(
                "key".to_string(),
                crate::json_field::SerializedJsonField::tainted(&key),
            );
            details.insert(
                "new_version".to_string(),
                JsonValue::Number(new_version as f64),
            );
            self.runtime.audit_log().record(
                "admin/cache/compare_and_set",
                "operator",
                "instance",
                "ok",
                JsonValue::Object(details),
            );

            let mut committed = Map::new();
            committed.insert("committed".to_string(), JsonValue::Bool(true));
            committed.insert(
                "current_version".to_string(),
                JsonValue::Number(new_version as f64),
            );
            json_response(200, JsonValue::Object(committed))
        }
        Err(CacheError::VersionMismatch { existing, .. }) => {
            let conflict = [
                ("committed", JsonValue::Bool(false)),
                ("current_version", JsonValue::Number(existing as f64)),
                ("reason", JsonValue::String("VersionMismatch".to_string())),
            ];
            let mut rejected = Map::new();
            for (name, value) in conflict {
                rejected.insert(name.to_string(), value);
            }
            json_response(409, JsonValue::Object(rejected))
        }
        Err(err) => json_error(500, format!("cache put failed: {err:?}")),
    }
}
/// Handles the blob-cache stats request: returns the counters exposed by
/// `result_blob_cache().stats()` as a flat JSON object with `ok: true`.
///
/// `_query` is accepted but not consulted. Every counter is emitted as a JSON
/// number; `l2_compression_ratio_observed` is passed through as returned,
/// while all other values are cast to `f64`.
pub(crate) fn handle_admin_blob_cache_stats(
    &self,
    _query: &std::collections::BTreeMap<String, String>,
) -> HttpResponse {
    let s = self.runtime.result_blob_cache().stats();

    // Field name -> value, in the order the fields are inserted into the response.
    let counters = [
        ("hits", s.hits() as f64),
        ("misses", s.misses() as f64),
        ("insertions", s.insertions() as f64),
        ("evictions", s.evictions() as f64),
        ("expirations", s.expirations() as f64),
        ("invalidations", s.invalidations() as f64),
        ("namespace_flushes", s.namespace_flushes() as f64),
        ("version_mismatches", s.version_mismatches() as f64),
        ("entries", s.entries() as f64),
        ("bytes_in_use", s.bytes_in_use() as f64),
        ("l1_bytes_max", s.l1_bytes_max() as f64),
        ("l2_bytes_in_use", s.l2_bytes_in_use() as f64),
        ("l2_bytes_max", s.l2_bytes_max() as f64),
        ("l2_full_rejections", s.l2_full_rejections() as f64),
        ("l2_metadata_reads", s.l2_metadata_reads() as f64),
        ("l2_negative_skips", s.l2_negative_skips() as f64),
        ("synopsis_metadata_reads", s.synopsis_metadata_reads() as f64),
        ("synopsis_bytes", s.synopsis_bytes() as f64),
        ("namespaces", s.namespaces() as f64),
        ("max_namespaces", s.max_namespaces() as f64),
        ("promotion_queued", s.promotion_queued() as f64),
        ("promotion_dropped", s.promotion_dropped() as f64),
        ("promotion_completed", s.promotion_completed() as f64),
        ("promotion_queue_depth", s.promotion_queue_depth() as f64),
        ("l2_compression_ratio_observed", s.l2_compression_ratio_observed()),
        ("l2_compression_skipped_total", s.l2_compression_skipped_total() as f64),
        ("l2_bytes_saved_total", s.l2_bytes_saved_total() as f64),
    ];

    let mut response = Map::new();
    response.insert("ok".to_string(), JsonValue::Bool(true));
    for (name, value) in counters {
        response.insert(name.to_string(), JsonValue::Number(value));
    }
    json_response(200, JsonValue::Object(response))
}
/// Handles the admin read-only toggle: sets the write gate's read-only flag
/// and persists it to the runtime-state file.
///
/// The optional JSON body may carry a boolean `enabled`; an empty body or a
/// missing / non-boolean field means `true`. When the database has a path,
/// the new flag is persisted next to it. If persisting fails, the gate is
/// reset to its previous value and a 500 response is returned.
///
/// Returns 400 for an unparseable body, otherwise 200 with the new and
/// previous flag values. Success writes an `admin/readonly` audit record.
pub(crate) fn handle_admin_readonly(&self, body: Vec<u8>) -> HttpResponse {
    let enabled = match body.as_slice() {
        [] => true,
        raw => match crate::serde_json::from_slice::<crate::serde_json::Value>(raw) {
            Ok(request) => request
                .get("enabled")
                .and_then(|flag| flag.as_bool())
                .unwrap_or(true),
            Err(err) => return json_error(400, format!("invalid JSON body: {err}")),
        },
    };

    let previous = self.runtime.write_gate().set_read_only(enabled);

    let state_path = self
        .runtime
        .db()
        .path()
        .map(|data_path| runtime_state_path(data_path));
    if let Some(state_path) = state_path {
        if let Err(err) = persist_runtime_readonly(&state_path, enabled) {
            // Undo the in-memory toggle so it does not diverge from disk.
            self.runtime.write_gate().set_read_only(previous);
            return json_error(
                500,
                format!("read_only persisted to {state_path:?} failed: {err}"),
            );
        }
    }

    let mut details = Map::new();
    details.insert("enabled".to_string(), JsonValue::Bool(enabled));
    details.insert("previous".to_string(), JsonValue::Bool(previous));
    self.runtime.audit_log().record(
        "admin/readonly",
        "operator",
        "instance",
        "ok",
        JsonValue::Object(details),
    );

    let outcome = [("ok", true), ("read_only", enabled), ("previous", previous)];
    let mut response = Map::new();
    for (name, flag) in outcome {
        response.insert(name.to_string(), JsonValue::Bool(flag));
    }
    json_response(200, JsonValue::Object(response))
}
pub(crate) fn handle_metrics(&self) -> HttpResponse {
use std::fmt::Write;
let lifecycle = self.runtime.lifecycle();
let phase = lifecycle.phase();
let now_ms = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0);
let uptime_secs = (now_ms.saturating_sub(lifecycle.started_at_ms()) as f64) / 1000.0;
let cold_start_secs = lifecycle
.ready_at_ms()
.map(|ready_ms| (ready_ms.saturating_sub(lifecycle.started_at_ms()) as f64) / 1000.0);
let health_status: u8 = match phase {
Phase::Stopped => 0,
Phase::Starting | Phase::ShuttingDown => 0,
Phase::Draining => 1,
Phase::Ready => 2,
};
let read_only = self.runtime.write_gate().is_read_only();
let role = match self.runtime.write_gate().role() {
crate::replication::ReplicationRole::Standalone => "standalone",
crate::replication::ReplicationRole::Primary => "primary",
crate::replication::ReplicationRole::Replica { .. } => "replica",
};
let db_size_bytes = self
.runtime
.db()
.path()
.and_then(|p| std::fs::metadata(p).ok())
.map(|m| m.len())
.unwrap_or(0);
let runtime_stats = self.runtime.stats();
let result_blob_stats = runtime_stats.result_blob_cache;
let kv_stats = runtime_stats.kv;
let metrics_ingest = runtime_stats.metrics_ingest;
let mut body = String::with_capacity(1024);
let _ = writeln!(
body,
"# HELP reddb_uptime_seconds Seconds since the runtime was constructed."
);
let _ = writeln!(body, "# TYPE reddb_uptime_seconds gauge");
let _ = writeln!(body, "reddb_uptime_seconds {}", uptime_secs);
let _ = writeln!(
body,
"# HELP reddb_health_status 0=down/starting, 1=degraded/draining, 2=ready."
);
let _ = writeln!(body, "# TYPE reddb_health_status gauge");
let _ = writeln!(body, "reddb_health_status {}", health_status);
let _ = writeln!(
body,
"# HELP reddb_phase Lifecycle phase as a labeled gauge (always 1; phase in label)."
);
let _ = writeln!(body, "# TYPE reddb_phase gauge");
let _ = writeln!(body, "reddb_phase{{phase=\"{}\"}} 1", phase.as_str());
let _ = writeln!(
body,
"# HELP reddb_read_only 1 when public mutations are gated, 0 otherwise."
);
let _ = writeln!(body, "# TYPE reddb_read_only gauge");
let _ = writeln!(body, "reddb_read_only {}", if read_only { 1 } else { 0 });
let _ = writeln!(
body,
"# HELP reddb_replication_role Replication role of this instance."
);
let _ = writeln!(body, "# TYPE reddb_replication_role gauge");
let _ = writeln!(body, "reddb_replication_role{{role=\"{}\"}} 1", role);
let lease_state = self.runtime.write_gate().lease_state();
let _ = writeln!(
body,
"# HELP reddb_writer_lease_state Serverless writer-lease gate state (label)."
);
let _ = writeln!(body, "# TYPE reddb_writer_lease_state gauge");
let _ = writeln!(
body,
"reddb_writer_lease_state{{state=\"{}\"}} 1",
lease_state.label()
);
let backup_status = self.runtime.backup_status();
if let Some(last) = backup_status.last_backup.as_ref() {
let last_ts_secs = (last.timestamp as f64) / 1000.0;
let _ = writeln!(
body,
"# HELP reddb_backup_last_success_timestamp_seconds Unix ts (s) of the most recent successful backup."
);
let _ = writeln!(
body,
"# TYPE reddb_backup_last_success_timestamp_seconds gauge"
);
let _ = writeln!(
body,
"reddb_backup_last_success_timestamp_seconds {}",
last_ts_secs
);
let age_secs = ((now_ms.saturating_sub(last.timestamp)) as f64) / 1000.0;
let _ = writeln!(
body,
"# HELP reddb_backup_age_seconds Seconds since last successful backup."
);
let _ = writeln!(body, "# TYPE reddb_backup_age_seconds gauge");
let _ = writeln!(body, "reddb_backup_age_seconds {}", age_secs);
let _ = writeln!(
body,
"# HELP reddb_backup_last_duration_seconds Wall-clock duration of the most recent backup."
);
let _ = writeln!(body, "# TYPE reddb_backup_last_duration_seconds gauge");
let _ = writeln!(
body,
"reddb_backup_last_duration_seconds {}",
(last.duration_ms as f64) / 1000.0
);
}
let _ = writeln!(
body,
"# HELP reddb_backup_failures_total Total backup failures since process start."
);
let _ = writeln!(body, "# TYPE reddb_backup_failures_total counter");
let _ = writeln!(
body,
"reddb_backup_failures_total {}",
backup_status.total_failures
);
let _ = writeln!(
body,
"# HELP reddb_backup_total_total Total successful backups since process start."
);
let _ = writeln!(body, "# TYPE reddb_backup_total_total counter");
let _ = writeln!(
body,
"reddb_backup_total_total {}",
backup_status.total_backups
);
let (current_lsn, last_archived_lsn) = self.runtime.wal_archive_progress();
let lag = current_lsn.saturating_sub(last_archived_lsn);
let _ = writeln!(
body,
"# HELP reddb_wal_current_lsn Current local LSN (most recent record visible to writers)."
);
let _ = writeln!(body, "# TYPE reddb_wal_current_lsn gauge");
let _ = writeln!(body, "reddb_wal_current_lsn {}", current_lsn);
let _ = writeln!(
body,
"# HELP reddb_wal_last_archived_lsn LSN of the most recently archived WAL segment."
);
let _ = writeln!(body, "# TYPE reddb_wal_last_archived_lsn gauge");
let _ = writeln!(body, "reddb_wal_last_archived_lsn {}", last_archived_lsn);
let _ = writeln!(
body,
"# HELP reddb_wal_archive_lag_records Records between current LSN and last archived LSN."
);
let _ = writeln!(body, "# TYPE reddb_wal_archive_lag_records gauge");
let _ = writeln!(body, "reddb_wal_archive_lag_records {}", lag);
let _ = writeln!(
body,
"# HELP reddb_metrics_remote_write_samples_accepted_total Metrics remote-write samples accepted since process start."
);
let _ = writeln!(
body,
"# TYPE reddb_metrics_remote_write_samples_accepted_total counter"
);
let _ = writeln!(
body,
"reddb_metrics_remote_write_samples_accepted_total {}",
metrics_ingest.samples_accepted
);
let _ = writeln!(
body,
"# HELP reddb_metrics_remote_write_series_accepted_total Metrics remote-write series accepted since process start."
);
let _ = writeln!(
body,
"# TYPE reddb_metrics_remote_write_series_accepted_total counter"
);
let _ = writeln!(
body,
"reddb_metrics_remote_write_series_accepted_total {}",
metrics_ingest.series_accepted
);
let _ = writeln!(
body,
"# HELP reddb_metrics_remote_write_samples_rejected_total Metrics remote-write samples rejected since process start."
);
let _ = writeln!(
body,
"# TYPE reddb_metrics_remote_write_samples_rejected_total counter"
);
let _ = writeln!(
body,
"reddb_metrics_remote_write_samples_rejected_total {}",
metrics_ingest.samples_rejected
);
let _ = writeln!(
body,
"# HELP reddb_metrics_remote_write_series_rejected_total Metrics remote-write series rejected since process start."
);
let _ = writeln!(
body,
"# TYPE reddb_metrics_remote_write_series_rejected_total counter"
);
let _ = writeln!(
body,
"reddb_metrics_remote_write_series_rejected_total {}",
metrics_ingest.series_rejected
);
let _ = writeln!(
body,
"# HELP reddb_metrics_remote_write_series_rejected_by_reason_total Metrics remote-write series rejected since process start by reason."
);
let _ = writeln!(
body,
"# TYPE reddb_metrics_remote_write_series_rejected_by_reason_total counter"
);
let _ = writeln!(
body,
"reddb_metrics_remote_write_series_rejected_by_reason_total{{reason=\"cardinality_budget\"}} {}",
metrics_ingest.series_rejected_cardinality_budget
);
let _ = writeln!(
body,
"# HELP reddb_metrics_tenant_activity_total Metrics adapter requests by tenant, namespace, and operation since process start."
);
let _ = writeln!(body, "# TYPE reddb_metrics_tenant_activity_total counter");
for activity in self.runtime.metrics_tenant_activity_snapshot() {
let _ = writeln!(
body,
"reddb_metrics_tenant_activity_total{{tenant=\"{}\",namespace=\"{}\",operation=\"{}\"}} {}",
sanitize_label(&activity.tenant),
sanitize_label(&activity.namespace),
sanitize_label(&activity.operation),
activity.count
);
}
let replicas = self.runtime.primary_replica_snapshots();
let _ = writeln!(
body,
"# HELP reddb_replica_count Currently registered replicas."
);
let _ = writeln!(body, "# TYPE reddb_replica_count gauge");
let _ = writeln!(body, "reddb_replica_count {}", replicas.len());
if !replicas.is_empty() {
let replica_lag_budget_secs = std::env::var("RED_SLO_REPLICA_LAG_BUDGET_SECONDS")
.ok()
.and_then(|value| value.parse::<f64>().ok())
.filter(|value| value.is_finite() && *value >= 0.0)
.unwrap_or(60.0);
let _ = writeln!(
body,
"# HELP reddb_replica_ack_lsn Most recent LSN acked by each replica."
);
let _ = writeln!(body, "# TYPE reddb_replica_ack_lsn gauge");
for r in &replicas {
let _ = writeln!(
body,
"reddb_replica_ack_lsn{{replica_id=\"{}\"}} {}",
sanitize_label(&r.id),
r.last_acked_lsn
);
}
let _ = writeln!(
body,
"# HELP reddb_replica_applied_lsn Most recent LSN applied by each replica."
);
let _ = writeln!(body, "# TYPE reddb_replica_applied_lsn gauge");
for r in &replicas {
let _ = writeln!(
body,
"reddb_replica_applied_lsn{{replica_id=\"{}\"}} {}",
sanitize_label(&r.id),
r.last_acked_lsn
);
}
let _ = writeln!(
body,
"# HELP reddb_replica_durable_lsn Most recent LSN durably persisted by each replica."
);
let _ = writeln!(body, "# TYPE reddb_replica_durable_lsn gauge");
for r in &replicas {
let _ = writeln!(
body,
"reddb_replica_durable_lsn{{replica_id=\"{}\"}} {}",
sanitize_label(&r.id),
r.last_durable_lsn
);
}
let _ = writeln!(
body,
"# HELP reddb_replica_lag_records Real LSN distance from last sent LSN to applied LSN."
);
let _ = writeln!(body, "# TYPE reddb_replica_lag_records gauge");
for r in &replicas {
let _ = writeln!(
body,
"reddb_replica_lag_records{{replica_id=\"{}\"}} {}",
sanitize_label(&r.id),
r.last_sent_lsn.saturating_sub(r.last_acked_lsn)
);
}
let _ = writeln!(
body,
"# HELP reddb_replica_apply_errors_total Replica-reported WAL apply errors since process start."
);
let _ = writeln!(body, "# TYPE reddb_replica_apply_errors_total counter");
for r in &replicas {
let _ = writeln!(
body,
"reddb_replica_apply_errors_total{{replica_id=\"{}\"}} {}",
sanitize_label(&r.id),
r.apply_error_count
);
}
let _ = writeln!(
body,
"# HELP reddb_replica_divergence_total Replica-reported WAL divergence errors since process start."
);
let _ = writeln!(body, "# TYPE reddb_replica_divergence_total counter");
for r in &replicas {
let _ = writeln!(
body,
"reddb_replica_divergence_total{{replica_id=\"{}\"}} {}",
sanitize_label(&r.id),
r.divergence_count
);
}
let _ = writeln!(
body,
"# HELP reddb_replica_lag_seconds Wall-clock seconds since the replica was last seen."
);
let _ = writeln!(body, "# TYPE reddb_replica_lag_seconds gauge");
let _ = writeln!(
body,
"# HELP reddb_slo_lag_budget_remaining_seconds Remaining per-replica lag budget; negative means SLO breach."
);
let _ = writeln!(body, "# TYPE reddb_slo_lag_budget_remaining_seconds gauge");
for r in &replicas {
let lag_ms = (now_ms as u128).saturating_sub(r.last_seen_at_unix_ms);
let lag_secs = (lag_ms as f64) / 1000.0;
let _ = writeln!(
body,
"reddb_replica_lag_seconds{{replica_id=\"{}\"}} {}",
sanitize_label(&r.id),
lag_secs
);
let _ = writeln!(
body,
"reddb_slo_lag_budget_remaining_seconds{{replica_id=\"{}\"}} {}",
sanitize_label(&r.id),
replica_lag_budget_secs - lag_secs
);
}
}
self.runtime.refresh_replication_flow_control();
let flow = self.runtime.write_gate().flow_control();
let _ = writeln!(
body,
"# HELP reddb_replication_flow_control_throttled 1 when write admission is throttled by in-quorum replica lag."
);
let _ = writeln!(
body,
"# TYPE reddb_replication_flow_control_throttled gauge"
);
let _ = writeln!(
body,
"reddb_replication_flow_control_throttled {}",
u8::from(flow.is_throttled())
);
let _ = writeln!(
body,
"# HELP reddb_replication_flow_control_soft_target_lsn Soft target lag (LSN records) above which writes throttle; 0 disables."
);
let _ = writeln!(
body,
"# TYPE reddb_replication_flow_control_soft_target_lsn gauge"
);
let _ = writeln!(
body,
"reddb_replication_flow_control_soft_target_lsn {}",
flow.soft_target_lsn()
);
let _ = writeln!(
body,
"# HELP reddb_replication_flow_control_in_quorum_lag_lsn Most recent max lag (LSN records) across in-quorum replicas; excludes async read-replicas."
);
let _ = writeln!(
body,
"# TYPE reddb_replication_flow_control_in_quorum_lag_lsn gauge"
);
let _ = writeln!(
body,
"reddb_replication_flow_control_in_quorum_lag_lsn {}",
flow.observed_lag_lsn()
);
let _ = writeln!(
body,
"# HELP reddb_replica_apply_errors_total Replica WAL apply errors since process start, by kind."
);
let _ = writeln!(body, "# TYPE reddb_replica_apply_errors_total counter");
for (kind, count) in self.runtime.replica_apply_error_counts() {
let _ = writeln!(
body,
"reddb_replica_apply_errors_total{{kind=\"{}\"}} {}",
kind.label(),
count
);
}
if let Some(health) = self.runtime.replica_apply_health() {
let _ = writeln!(
body,
"# HELP reddb_replica_apply_health Replica apply state (label, value=1)."
);
let _ = writeln!(body, "# TYPE reddb_replica_apply_health gauge");
let _ = writeln!(
body,
"reddb_replica_apply_health{{state=\"{}\"}} 1",
sanitize_label(&health)
);
}
self.runtime.quota_bucket().evict_idle();
let rejections = self.runtime.quota_bucket().rejection_snapshot();
if !rejections.is_empty() {
let _ = writeln!(
body,
"# HELP reddb_quota_rejected_total Requests rejected by per-caller QPS quota."
);
let _ = writeln!(body, "# TYPE reddb_quota_rejected_total counter");
for (principal, count) in &rejections {
let _ = writeln!(
body,
"reddb_quota_rejected_total{{principal=\"{}\"}} {}",
sanitize_label(principal),
count
);
}
}
let (reached, timed_out, not_required, last_micros) =
self.runtime.commit_waiter_metrics_snapshot();
let _ = writeln!(
body,
"# HELP reddb_commit_wait_total Commit-wait outcomes by kind."
);
let _ = writeln!(body, "# TYPE reddb_commit_wait_total counter");
let _ = writeln!(
body,
"reddb_commit_wait_total{{outcome=\"reached\"}} {}",
reached
);
let _ = writeln!(
body,
"reddb_commit_wait_total{{outcome=\"timed_out\"}} {}",
timed_out
);
let _ = writeln!(
body,
"reddb_commit_wait_total{{outcome=\"not_required\"}} {}",
not_required
);
let _ = writeln!(
body,
"# HELP reddb_commit_wait_last_seconds Wall-clock seconds of the most recent commit wait."
);
let _ = writeln!(body, "# TYPE reddb_commit_wait_last_seconds gauge");
let _ = writeln!(
body,
"reddb_commit_wait_last_seconds {}",
(last_micros as f64) / 1_000_000.0
);
let _ = writeln!(
body,
"# HELP reddb_commit_watermark_lsn Highest LSN durable on the active synchronous commit quorum."
);
let _ = writeln!(body, "# TYPE reddb_commit_watermark_lsn gauge");
let _ = writeln!(
body,
"reddb_commit_watermark_lsn {}",
self.runtime.commit_watermark()
);
let policy = self.runtime.commit_policy();
let _ = writeln!(
body,
"# HELP reddb_primary_commit_policy Active commit policy on the primary."
);
let _ = writeln!(body, "# TYPE reddb_primary_commit_policy gauge");
let _ = writeln!(
body,
"reddb_primary_commit_policy{{policy=\"{}\"}} 1",
policy.label()
);
let blob_ns = "runtime.result_cache";
let _ = writeln!(
body,
"# HELP reddb_cache_blob_get_total Blob Cache get outcomes by namespace."
);
let _ = writeln!(body, "# TYPE reddb_cache_blob_get_total counter");
let _ = writeln!(
body,
"reddb_cache_blob_get_total{{namespace=\"{}\",result=\"hit_l1\"}} {}",
blob_ns,
result_blob_stats.hits()
);
let _ = writeln!(
body,
"reddb_cache_blob_get_total{{namespace=\"{}\",result=\"hit_l2\"}} 0",
blob_ns
);
let _ = writeln!(
body,
"reddb_cache_blob_get_total{{namespace=\"{}\",result=\"miss\"}} {}",
blob_ns,
result_blob_stats.misses()
);
let _ = writeln!(
body,
"# HELP reddb_cache_blob_put_total Blob Cache put outcomes by namespace."
);
let _ = writeln!(body, "# TYPE reddb_cache_blob_put_total counter");
let _ = writeln!(
body,
"reddb_cache_blob_put_total{{namespace=\"{}\",outcome=\"ok\"}} {}",
blob_ns,
result_blob_stats.insertions()
);
let _ = writeln!(
body,
"reddb_cache_blob_put_total{{namespace=\"{}\",outcome=\"version_mismatch\"}} {}",
blob_ns,
result_blob_stats.version_mismatches()
);
let _ = writeln!(
body,
"reddb_cache_blob_put_total{{namespace=\"{}\",outcome=\"too_large\"}} 0",
blob_ns
);
let _ = writeln!(
body,
"reddb_cache_blob_put_total{{namespace=\"{}\",outcome=\"metadata_too_large\"}} 0",
blob_ns
);
let _ = writeln!(
body,
"# HELP reddb_cache_blob_invalidate_total Blob Cache invalidations by namespace and kind."
);
let _ = writeln!(body, "# TYPE reddb_cache_blob_invalidate_total counter");
for (kind, count) in [
("key", 0),
("prefix", 0),
("tag", 0),
("dependency", result_blob_stats.invalidations()),
("namespace", result_blob_stats.namespace_flushes()),
] {
let _ = writeln!(
body,
"reddb_cache_blob_invalidate_total{{namespace=\"{}\",kind=\"{}\"}} {}",
blob_ns, kind, count
);
}
let _ = writeln!(
body,
"# HELP reddb_cache_blob_evict_total Blob Cache evictions by namespace and reason."
);
let _ = writeln!(body, "# TYPE reddb_cache_blob_evict_total counter");
for (reason, count) in [
("capacity", result_blob_stats.evictions()),
("expiry", result_blob_stats.expirations()),
("policy", 0),
] {
let _ = writeln!(
body,
"reddb_cache_blob_evict_total{{namespace=\"{}\",reason=\"{}\"}} {}",
blob_ns, reason, count
);
}
let _ = writeln!(
body,
"# HELP reddb_cache_blob_l1_bytes_in_use L1 bytes currently used by Blob Cache namespace."
);
let _ = writeln!(body, "# TYPE reddb_cache_blob_l1_bytes_in_use gauge");
let _ = writeln!(
body,
"reddb_cache_blob_l1_bytes_in_use{{namespace=\"{}\"}} {}",
blob_ns,
result_blob_stats.bytes_in_use()
);
let _ = writeln!(
body,
"# HELP reddb_cache_blob_l1_entries L1 entries currently held by Blob Cache namespace."
);
let _ = writeln!(body, "# TYPE reddb_cache_blob_l1_entries gauge");
let _ = writeln!(
body,
"reddb_cache_blob_l1_entries{{namespace=\"{}\"}} {}",
blob_ns,
result_blob_stats.entries()
);
let _ = writeln!(
body,
"# HELP reddb_cache_blob_l2_bytes_in_use L2 bytes currently used by Blob Cache namespace."
);
let _ = writeln!(body, "# TYPE reddb_cache_blob_l2_bytes_in_use gauge");
let _ = writeln!(
body,
"reddb_cache_blob_l2_bytes_in_use{{namespace=\"{}\"}} {}",
blob_ns,
result_blob_stats.l2_bytes_in_use()
);
let _ = writeln!(
body,
"# HELP reddb_cache_blob_l2_full_rejections_total Blob Cache puts rejected because L2 is full."
);
let _ = writeln!(
body,
"# TYPE reddb_cache_blob_l2_full_rejections_total counter"
);
let _ = writeln!(
body,
"reddb_cache_blob_l2_full_rejections_total{{namespace=\"{}\"}} {}",
blob_ns,
result_blob_stats.l2_full_rejections()
);
let _ = writeln!(
body,
"# HELP reddb_cache_blob_version_mismatch_total Blob Cache CAS version mismatches by namespace."
);
let _ = writeln!(
body,
"# TYPE reddb_cache_blob_version_mismatch_total counter"
);
let _ = writeln!(
body,
"reddb_cache_blob_version_mismatch_total{{namespace=\"{}\"}} {}",
blob_ns,
result_blob_stats.version_mismatches()
);
let _ = writeln!(
body,
"# HELP reddb_kv_ops_total Normal-KV operations since process start."
);
let _ = writeln!(body, "# TYPE reddb_kv_ops_total counter");
for (verb, count) in [
("put", kv_stats.puts),
("get", kv_stats.gets),
("delete", kv_stats.deletes),
("incr", kv_stats.incrs),
] {
let _ = writeln!(body, "reddb_kv_ops_total{{verb=\"{}\"}} {}", verb, count);
}
let _ = writeln!(
body,
"# HELP reddb_kv_cas_total Normal-KV CAS outcomes since process start."
);
let _ = writeln!(body, "# TYPE reddb_kv_cas_total counter");
let _ = writeln!(
body,
"reddb_kv_cas_total{{outcome=\"success\"}} {}",
kv_stats.cas_success
);
let _ = writeln!(
body,
"reddb_kv_cas_total{{outcome=\"conflict\"}} {}",
kv_stats.cas_conflict
);
let _ = writeln!(
body,
"# HELP reddb_kv_watch_streams_active Active normal-KV WATCH streams."
);
let _ = writeln!(body, "# TYPE reddb_kv_watch_streams_active gauge");
let _ = writeln!(
body,
"reddb_kv_watch_streams_active {}",
kv_stats.watch_streams_active
);
let _ = writeln!(
body,
"# HELP reddb_kv_watch_events_emitted_total Normal-KV WATCH events emitted since process start."
);
let _ = writeln!(body, "# TYPE reddb_kv_watch_events_emitted_total counter");
let _ = writeln!(
body,
"reddb_kv_watch_events_emitted_total {}",
kv_stats.watch_events_emitted
);
let _ = writeln!(
body,
"# HELP reddb_kv_watch_drops_total Normal-KV WATCH events dropped by bounded subscriber buffers."
);
let _ = writeln!(body, "# TYPE reddb_kv_watch_drops_total counter");
let _ = writeln!(body, "reddb_kv_watch_drops_total {}", kv_stats.watch_drops);
let _ = writeln!(
body,
"# HELP reddb_db_size_bytes On-disk size of the primary database file."
);
let _ = writeln!(body, "# TYPE reddb_db_size_bytes gauge");
let _ = writeln!(body, "reddb_db_size_bytes {}", db_size_bytes);
if let Some(secs) = cold_start_secs {
let _ = writeln!(
body,
"# HELP reddb_cold_start_duration_seconds Seconds from process start to /health/ready 200."
);
let _ = writeln!(body, "# TYPE reddb_cold_start_duration_seconds gauge");
let _ = writeln!(body, "reddb_cold_start_duration_seconds {}", secs);
}
let phases = lifecycle.cold_start_phases().durations_ms();
if !phases.is_empty() {
let _ = writeln!(
body,
"# HELP reddb_cold_start_phase_seconds Per-phase cold-start duration."
);
let _ = writeln!(body, "# TYPE reddb_cold_start_phase_seconds gauge");
for (name, dur_ms) in phases {
let _ = writeln!(
body,
"reddb_cold_start_phase_seconds{{phase=\"{}\"}} {}",
name,
(dur_ms as f64) / 1000.0
);
}
}
let limits = self.runtime.resource_limits();
if let Some(v) = limits.max_db_size_bytes {
let _ = writeln!(
body,
"# HELP reddb_limit_db_size_bytes Operator-pinned cap on the primary DB file size."
);
let _ = writeln!(body, "# TYPE reddb_limit_db_size_bytes gauge");
let _ = writeln!(body, "reddb_limit_db_size_bytes {}", v);
}
if let Some(v) = limits.max_connections {
let _ = writeln!(body, "# TYPE reddb_limit_connections gauge");
let _ = writeln!(body, "reddb_limit_connections {}", v);
}
if let Some(v) = limits.max_qps {
let _ = writeln!(body, "# TYPE reddb_limit_qps gauge");
let _ = writeln!(body, "reddb_limit_qps {}", v);
}
if let Some(v) = limits.max_batch_size {
let _ = writeln!(body, "# TYPE reddb_limit_batch_size gauge");
let _ = writeln!(body, "reddb_limit_batch_size {}", v);
}
if let Some(v) = limits.max_memory_bytes {
let _ = writeln!(body, "# TYPE reddb_limit_memory_bytes gauge");
let _ = writeln!(body, "reddb_limit_memory_bytes {}", v);
}
{
let queue_telemetry = self.runtime.queue_telemetry_snapshot();
let _ = writeln!(
body,
"# HELP queue_delivered_total Messages handed to a consumer (per queue/group/mode)."
);
let _ = writeln!(body, "# TYPE queue_delivered_total counter");
for ((queue, group, mode), n) in &queue_telemetry.delivered {
let _ = writeln!(
body,
"queue_delivered_total{{queue=\"{}\",group=\"{}\",mode=\"{}\"}} {}",
sanitize_label(queue),
sanitize_label(group),
sanitize_label(mode),
n
);
}
let _ = writeln!(
body,
"# HELP queue_acked_total Messages acknowledged (per queue/group/mode)."
);
let _ = writeln!(body, "# TYPE queue_acked_total counter");
for ((queue, group, mode), n) in &queue_telemetry.acked {
let _ = writeln!(
body,
"queue_acked_total{{queue=\"{}\",group=\"{}\",mode=\"{}\"}} {}",
sanitize_label(queue),
sanitize_label(group),
sanitize_label(mode),
n
);
}
let _ = writeln!(
body,
"# HELP queue_nacked_total Messages negatively-acknowledged (per queue/group/mode/outcome)."
);
let _ = writeln!(body, "# TYPE queue_nacked_total counter");
for ((queue, group, mode, outcome), n) in &queue_telemetry.nacked {
let _ = writeln!(
body,
"queue_nacked_total{{queue=\"{}\",group=\"{}\",mode=\"{}\",outcome=\"{}\"}} {}",
sanitize_label(queue),
sanitize_label(group),
sanitize_label(mode),
outcome,
n
);
}
let pending = self.runtime.queue_pending_counts();
let _ = writeln!(
body,
"# HELP queue_pending_gauge In-flight (delivered, not yet acked) messages per queue/group."
);
let _ = writeln!(body, "# TYPE queue_pending_gauge gauge");
for ((queue, group), n) in &pending {
let _ = writeln!(
body,
"queue_pending_gauge{{queue=\"{}\",group=\"{}\"}} {}",
sanitize_label(queue),
sanitize_label(group),
n
);
}
let render_wait_counter =
|body: &mut String, name: &str, help: &str, samples: &[((String, String), u64)]| {
let _ = writeln!(body, "# HELP {} {}", name, help);
let _ = writeln!(body, "# TYPE {} counter", name);
for ((scope, queue), n) in samples {
let _ = writeln!(
body,
"{}{{queue=\"{}\",scope=\"{}\"}} {}",
name,
sanitize_label(queue),
sanitize_label(scope),
n
);
}
};
render_wait_counter(
&mut body,
"queue_wait_started_total",
"QUEUE READ ... WAIT lifecycles that entered the park loop.",
&queue_telemetry.wait_started,
);
render_wait_counter(
&mut body,
"queue_wait_woken_total",
"QUEUE READ ... WAIT lifecycles that resolved by wake + delivery.",
&queue_telemetry.wait_woken,
);
render_wait_counter(
&mut body,
"queue_wait_timed_out_total",
"QUEUE READ ... WAIT lifecycles that resolved by WAIT budget expiry.",
&queue_telemetry.wait_timed_out,
);
render_wait_counter(
&mut body,
"queue_wait_cancelled_total",
"QUEUE READ ... WAIT lifecycles that resolved by registry cancellation.",
&queue_telemetry.wait_cancelled,
);
let _ = writeln!(
body,
"# HELP queue_wait_duration_ms Wall-clock duration of QUEUE READ ... WAIT park lifecycles, milliseconds."
);
let _ = writeln!(body, "# TYPE queue_wait_duration_ms histogram");
for ((scope, queue), hist) in &queue_telemetry.wait_duration {
for (i, upper) in crate::runtime::queue_telemetry::WAIT_DURATION_BUCKETS_MS
.iter()
.enumerate()
{
let count = hist.bucket_counts.get(i).copied().unwrap_or(0);
let _ = writeln!(
body,
"queue_wait_duration_ms_bucket{{queue=\"{}\",scope=\"{}\",le=\"{}\"}} {}",
sanitize_label(queue),
sanitize_label(scope),
upper,
count
);
}
let _ = writeln!(
body,
"queue_wait_duration_ms_bucket{{queue=\"{}\",scope=\"{}\",le=\"+Inf\"}} {}",
sanitize_label(queue),
sanitize_label(scope),
hist.count
);
let _ = writeln!(
body,
"queue_wait_duration_ms_sum{{queue=\"{}\",scope=\"{}\"}} {}",
sanitize_label(queue),
sanitize_label(scope),
hist.sum_ms
);
let _ = writeln!(
body,
"queue_wait_duration_ms_count{{queue=\"{}\",scope=\"{}\"}} {}",
sanitize_label(queue),
sanitize_label(scope),
hist.count
);
}
}
{
use crate::runtime::impl_queue::{
EVENTS_DLQ_TOTAL, EVENTS_DRAIN_RETRIES_TOTAL, EVENTS_ENQUEUED_TOTAL,
};
let enqueued = EVENTS_ENQUEUED_TOTAL.load(std::sync::atomic::Ordering::Relaxed);
let retries = EVENTS_DRAIN_RETRIES_TOTAL.load(std::sync::atomic::Ordering::Relaxed);
let dlq_total = EVENTS_DLQ_TOTAL.load(std::sync::atomic::Ordering::Relaxed);
let _ = writeln!(
body,
"# HELP reddb_events_enqueued_total Total events successfully pushed to target queues."
);
let _ = writeln!(body, "# TYPE reddb_events_enqueued_total counter");
let _ = writeln!(body, "reddb_events_enqueued_total {enqueued}");
let _ = writeln!(
body,
"# HELP reddb_events_drain_retries_total Total event push failures that triggered DLQ routing."
);
let _ = writeln!(body, "# TYPE reddb_events_drain_retries_total counter");
let _ = writeln!(
body,
"reddb_events_drain_retries_total{{reason=\"queue_full\"}} {retries}"
);
let _ = writeln!(
body,
"# HELP reddb_events_dlq_total Total events routed to dead-letter queues."
);
let _ = writeln!(body, "# TYPE reddb_events_dlq_total counter");
let _ = writeln!(body, "reddb_events_dlq_total {dlq_total}");
}
crate::runtime::ai::metrics::render_ai_metrics(&mut body);
{
let (hits, misses, evicts) = self.runtime.result_cache_metrics();
let _ = writeln!(
body,
"# HELP reddb_result_cache_hit_total Result cache hits (incl. graph-analytics TVFs)."
);
let _ = writeln!(body, "# TYPE reddb_result_cache_hit_total counter");
let _ = writeln!(body, "reddb_result_cache_hit_total {hits}");
let _ = writeln!(
body,
"# HELP reddb_result_cache_miss_total Result cache misses (cold computes)."
);
let _ = writeln!(body, "# TYPE reddb_result_cache_miss_total counter");
let _ = writeln!(body, "reddb_result_cache_miss_total {misses}");
let _ = writeln!(
body,
"# HELP reddb_result_cache_evict_total Result cache entries evicted by capacity."
);
let _ = writeln!(body, "# TYPE reddb_result_cache_evict_total counter");
let _ = writeln!(body, "reddb_result_cache_evict_total {evicts}");
}
self.http_metrics().render(&mut body, self.http_limiter());
HttpResponse {
status: 200,
content_type: "text/plain; version=0.0.4",
body: body.into_bytes(),
extra_headers: Vec::new(),
}
}
pub(crate) fn handle_admin_status(&self) -> HttpResponse {
let lifecycle = self.runtime.lifecycle();
let phase = lifecycle.phase();
let now_ms = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0);
let uptime_secs = (now_ms.saturating_sub(lifecycle.started_at_ms()) as f64) / 1000.0;
let read_only = self.runtime.write_gate().is_read_only();
let role = match self.runtime.write_gate().role() {
crate::replication::ReplicationRole::Standalone => "standalone",
crate::replication::ReplicationRole::Primary => "primary",
crate::replication::ReplicationRole::Replica { .. } => "replica",
};
let db = self.runtime.db();
let db_size_bytes = db
.path()
.and_then(|p| std::fs::metadata(p).ok())
.map(|m| m.len())
.unwrap_or(0);
let backend_kind = db
.options()
.remote_backend
.as_ref()
.map(|b| b.name().to_string());
let mut object = Map::new();
object.insert(
"version".to_string(),
JsonValue::String(env!("CARGO_PKG_VERSION").to_string()),
);
object.insert(
"phase".to_string(),
JsonValue::String(phase.as_str().to_string()),
);
object.insert(
"uptime_secs".to_string(),
JsonValue::Number((uptime_secs * 1000.0).round() / 1000.0),
);
object.insert(
"started_at_unix_ms".to_string(),
JsonValue::Number(lifecycle.started_at_ms() as f64),
);
if let Some(ready_at) = lifecycle.ready_at_ms() {
object.insert(
"ready_at_unix_ms".to_string(),
JsonValue::Number(ready_at as f64),
);
}
object.insert(
"db_size_bytes".to_string(),
JsonValue::Number(db_size_bytes as f64),
);
object.insert("read_only".to_string(), JsonValue::Bool(read_only));
object.insert(
"replication_role".to_string(),
JsonValue::String(role.to_string()),
);
object.insert(
"writer_lease".to_string(),
JsonValue::String(self.runtime.write_gate().lease_state().label().to_string()),
);
let (enc_state, enc_error) = self.runtime.encryption_at_rest_status();
let mut enc_obj = Map::new();
enc_obj.insert(
"state".to_string(),
JsonValue::String(enc_state.to_string()),
);
if let Some(err) = enc_error {
enc_obj.insert("error".to_string(), JsonValue::String(err));
}
object.insert("encryption_at_rest".to_string(), JsonValue::Object(enc_obj));
let backup = self.runtime.backup_status();
let mut backup_obj = Map::new();
if let Some(last) = backup.last_backup.as_ref() {
backup_obj.insert(
"last_success_unix_ms".to_string(),
JsonValue::Number(last.timestamp as f64),
);
backup_obj.insert(
"last_duration_ms".to_string(),
JsonValue::Number(last.duration_ms as f64),
);
backup_obj.insert(
"age_seconds".to_string(),
JsonValue::Number(((now_ms.saturating_sub(last.timestamp)) as f64) / 1000.0),
);
}
backup_obj.insert(
"total_successes".to_string(),
JsonValue::Number(backup.total_backups as f64),
);
backup_obj.insert(
"total_failures".to_string(),
JsonValue::Number(backup.total_failures as f64),
);
backup_obj.insert(
"interval_secs".to_string(),
JsonValue::Number(backup.interval_secs as f64),
);
object.insert("backup".to_string(), JsonValue::Object(backup_obj));
let (current_lsn, last_archived_lsn) = self.runtime.wal_archive_progress();
let mut wal_obj = Map::new();
wal_obj.insert(
"current_lsn".to_string(),
JsonValue::Number(current_lsn as f64),
);
wal_obj.insert(
"last_archived_lsn".to_string(),
JsonValue::Number(last_archived_lsn as f64),
);
wal_obj.insert(
"archive_lag_records".to_string(),
JsonValue::Number(current_lsn.saturating_sub(last_archived_lsn) as f64),
);
object.insert("wal".to_string(), JsonValue::Object(wal_obj));
let mut replica_obj = Map::new();
if let Some(health) = self.runtime.replica_apply_health() {
replica_obj.insert("apply_health".to_string(), JsonValue::String(health));
}
let mut errors_obj = Map::new();
for (kind, count) in self.runtime.replica_apply_error_counts() {
errors_obj.insert(kind.label().to_string(), JsonValue::Number(count as f64));
}
replica_obj.insert("apply_errors".to_string(), JsonValue::Object(errors_obj));
let snaps = self.runtime.primary_replica_snapshots();
if !snaps.is_empty() {
let arr: Vec<JsonValue> = snaps
.iter()
.map(|r| {
let mut o = Map::new();
o.insert("id".to_string(), JsonValue::String(r.id.clone()));
o.insert(
"last_acked_lsn".to_string(),
JsonValue::Number(r.last_acked_lsn as f64),
);
o.insert(
"last_sent_lsn".to_string(),
JsonValue::Number(r.last_sent_lsn as f64),
);
o.insert(
"last_durable_lsn".to_string(),
JsonValue::Number(r.last_durable_lsn as f64),
);
o.insert(
"last_seen_at_unix_ms".to_string(),
JsonValue::Number(r.last_seen_at_unix_ms as f64),
);
o.insert(
"lag_records".to_string(),
JsonValue::Number(current_lsn.saturating_sub(r.last_acked_lsn) as f64),
);
if let Some(region) = &r.region {
o.insert("region".to_string(), JsonValue::String(region.clone()));
}
JsonValue::Object(o)
})
.collect();
replica_obj.insert("primary_view".to_string(), JsonValue::Array(arr));
}
replica_obj.insert(
"commit_policy".to_string(),
JsonValue::String(self.runtime.commit_policy().label().to_string()),
);
let durable = self.runtime.commit_waiter_snapshot();
if !durable.is_empty() {
let arr: Vec<JsonValue> = durable
.into_iter()
.map(|(id, lsn)| {
let mut o = Map::new();
o.insert("replica_id".to_string(), JsonValue::String(id));
o.insert("durable_lsn".to_string(), JsonValue::Number(lsn as f64));
JsonValue::Object(o)
})
.collect();
replica_obj.insert("durable_view".to_string(), JsonValue::Array(arr));
}
object.insert("replica".to_string(), JsonValue::Object(replica_obj));
if let Some(backend) = backend_kind {
object.insert("remote_backend".to_string(), JsonValue::String(backend));
}
let limits = self.runtime.resource_limits();
let mut limits_obj = Map::new();
if let Some(v) = limits.max_db_size_bytes {
limits_obj.insert("max_db_size_bytes".to_string(), JsonValue::Number(v as f64));
}
if let Some(v) = limits.max_connections {
limits_obj.insert("max_connections".to_string(), JsonValue::Number(v as f64));
}
if let Some(v) = limits.max_qps {
limits_obj.insert("max_qps".to_string(), JsonValue::Number(v as f64));
}
if let Some(v) = limits.max_batch_size {
limits_obj.insert("max_batch_size".to_string(), JsonValue::Number(v as f64));
}
if let Some(v) = limits.max_memory_bytes {
limits_obj.insert("max_memory_bytes".to_string(), JsonValue::Number(v as f64));
}
if let Some(d) = limits.max_query_duration {
limits_obj.insert(
"max_query_duration_ms".to_string(),
JsonValue::Number(d.as_millis() as f64),
);
}
if let Some(v) = limits.max_result_bytes {
limits_obj.insert("max_result_bytes".to_string(), JsonValue::Number(v as f64));
}
object.insert("limits".to_string(), JsonValue::Object(limits_obj));
if let Some(report) = lifecycle.shutdown_report() {
let mut shutdown_obj = Map::new();
shutdown_obj.insert(
"duration_ms".to_string(),
JsonValue::Number(report.duration_ms as f64),
);
shutdown_obj.insert(
"flushed_wal".to_string(),
JsonValue::Bool(report.flushed_wal),
);
shutdown_obj.insert(
"backup_uploaded".to_string(),
JsonValue::Bool(report.backup_uploaded),
);
object.insert("shutdown".to_string(), JsonValue::Object(shutdown_obj));
}
json_response(200, JsonValue::Object(object))
}
/// Collects a point-in-time snapshot of this process (lifecycle, transports,
/// connections, storage, WAL, host system and replication) into
/// `ClusterStatusInputs` and returns the rendered document as a 200 JSON
/// response.
pub(crate) fn handle_cluster_status(&self) -> HttpResponse {
    use crate::presentation::cluster_status_json::{
        cluster_status_json, ClusterStatusInputs, ConnectionSnapshot, DeploymentShapeView,
        ProcessRoleView, ReplicaView, ReplicationSnapshot, StorageSnapshot, SystemSnapshot,
        TransportListenerView, TransportSnapshot, WalSnapshot,
    };

    let lifecycle = self.runtime.lifecycle();
    let phase = lifecycle.phase();
    // A system clock set before the Unix epoch degrades to 0 instead of failing.
    let now_ms = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map_or(0, |elapsed| elapsed.as_millis() as u64);
    let uptime_secs = (now_ms.saturating_sub(lifecycle.started_at_ms()) as f64) / 1000.0;

    let process_role = match self.runtime.write_gate().role() {
        crate::replication::ReplicationRole::Standalone => ProcessRoleView::Standalone,
        crate::replication::ReplicationRole::Primary => ProcessRoleView::Primary,
        crate::replication::ReplicationRole::Replica { .. } => ProcessRoleView::Replica,
    };

    let db = self.runtime.db();
    // Stays `None` when the database has no path or its metadata cannot be read.
    let db_size_bytes = db
        .path()
        .and_then(|p| std::fs::metadata(p).ok())
        .map(|meta| meta.len());
    let remote_backend = db
        .options()
        .remote_backend
        .as_ref()
        .map(|backend| backend.name().to_string());
    let (enc_state, enc_error) = self.runtime.encryption_at_rest_status();

    // Listeners that came up carry no failure reason; failed ones carry theirs.
    let readiness = &self.options.transport_readiness;
    let active = readiness
        .active
        .iter()
        .map(|listener| TransportListenerView {
            transport: listener.transport.clone(),
            bind_addr: listener.bind_addr.clone(),
            explicit: listener.explicit,
            reason: None,
        })
        .collect();
    let failed = readiness
        .failed
        .iter()
        .map(|listener| TransportListenerView {
            transport: listener.transport.clone(),
            bind_addr: listener.bind_addr.clone(),
            explicit: listener.explicit,
            reason: Some(listener.reason.clone()),
        })
        .collect();

    let runtime_stats = self.runtime.stats();
    let limits = self.runtime.resource_limits();
    let (current_lsn, last_archived_lsn) = self.runtime.wal_archive_progress();

    let host = &runtime_stats.system;
    let system = SystemSnapshot {
        pid: host.pid,
        cpu_cores: host.cpu_cores,
        os: host.os.clone(),
        arch: host.arch.clone(),
        hostname: host.hostname.clone(),
        // A reading of 0 is surfaced as "absent" rather than as a literal zero.
        total_memory_bytes: (host.total_memory_bytes != 0).then_some(host.total_memory_bytes),
        available_memory_bytes: (host.available_memory_bytes != 0)
            .then_some(host.available_memory_bytes),
    };

    let replicas = self
        .runtime
        .primary_replica_snapshots()
        .into_iter()
        .map(|snap| ReplicaView {
            id: snap.id,
            last_acked_lsn: snap.last_acked_lsn,
            last_sent_lsn: snap.last_sent_lsn,
            last_durable_lsn: snap.last_durable_lsn,
            last_seen_at_unix_ms: snap.last_seen_at_unix_ms,
            region: snap.region,
        })
        .collect();
    let apply_errors = self
        .runtime
        .replica_apply_error_counts()
        .iter()
        .map(|(kind, count)| (kind.label().to_string(), *count))
        .collect();

    // Remaining runtime reads, taken in the order the final document lists them.
    let read_only = self.runtime.write_gate().is_read_only();
    let commit_policy = self.runtime.commit_policy().label().to_string();
    let apply_health = self.runtime.replica_apply_health();

    let inputs = ClusterStatusInputs {
        snapshot_at_unix_ms: now_ms,
        version: env!("CARGO_PKG_VERSION").to_string(),
        phase: phase.as_str().to_string(),
        uptime_secs,
        started_at_unix_ms: lifecycle.started_at_ms(),
        ready_at_unix_ms: lifecycle.ready_at_ms(),
        read_only,
        deployment_shape: DeploymentShapeView::Server,
        process_role,
        transport: TransportSnapshot { active, failed },
        connections: ConnectionSnapshot {
            active: runtime_stats.active_connections as u64,
            idle: runtime_stats.idle_connections as u64,
            total_checkouts: runtime_stats.total_checkouts,
            max: limits.max_connections,
        },
        storage: StorageSnapshot {
            db_size_bytes,
            remote_backend,
            encryption_state: enc_state.to_string(),
            encryption_error: enc_error,
            paged_mode: runtime_stats.paged_mode,
        },
        wal: WalSnapshot {
            current_lsn,
            last_archived_lsn,
        },
        system,
        replication: ReplicationSnapshot {
            role: process_role,
            commit_policy,
            replicas,
            apply_health,
            apply_errors,
        },
    };
    json_response(200, cluster_status_json(&inputs))
}
pub(crate) fn handle_capabilities(&self) -> HttpResponse {
use crate::api::Capability;
use crate::presentation::capabilities_json::{
system_capabilities_json, CapabilityState, SystemCapabilitiesInputs,
};
let now_ms = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0);
let db = self.runtime.db();
let opts = db.options();
let build = [
Capability::Table,
Capability::Graph,
Capability::Vector,
Capability::FullText,
Capability::Security,
Capability::Encryption,
]
.into_iter()
.map(|c| (c.as_str().to_string(), opts.has_capability(c)))
.collect();
let simd_level = Some(
match crate::storage::engine::simd_distance::simd_level() {
crate::storage::engine::simd_distance::SimdLevel::Scalar => "scalar",
crate::storage::engine::simd_distance::SimdLevel::Sse => "sse",
crate::storage::engine::simd_distance::SimdLevel::Avx => "avx",
crate::storage::engine::simd_distance::SimdLevel::AvxFma => "avx_fma",
}
.to_string(),
);
let ai_providers = [
crate::ai::AiProvider::OpenAi,
crate::ai::AiProvider::Anthropic,
crate::ai::AiProvider::Groq,
crate::ai::AiProvider::OpenRouter,
crate::ai::AiProvider::Together,
crate::ai::AiProvider::Venice,
crate::ai::AiProvider::Ollama,
crate::ai::AiProvider::DeepSeek,
crate::ai::AiProvider::HuggingFace,
crate::ai::AiProvider::Local,
]
.iter()
.map(|p| p.token().to_string())
.collect();
let mut ai_model_names: Vec<String> = self
.collect_ai_model_entries()
.into_iter()
.map(|(name, _)| name)
.collect();
ai_model_names.sort();
let auth_enabled = self
.auth_store
.as_ref()
.map(|s| s.is_enabled())
.unwrap_or(false);
let on = |b: bool| {
if b {
CapabilityState::Supported
} else {
CapabilityState::Disabled
}
};
let oauth_configured = self.runtime.oauth_validator().is_some()
|| self
.auth_store
.as_ref()
.map(|s| s.config().oauth.enabled)
.unwrap_or(false);
let mtls_configured = self
.auth_store
.as_ref()
.map(|s| s.config().cert.enabled)
.unwrap_or(false);
let admin_token_configured = super::routing::read_admin_token().is_some();
let auth_methods = vec![
("password".to_string(), on(auth_enabled)),
("bearer".to_string(), on(auth_enabled)),
("oauth_jwt".to_string(), on(oauth_configured)),
("mtls".to_string(), on(mtls_configured)),
("admin_token".to_string(), on(admin_token_configured)),
];
let replication_role = match self.runtime.write_gate().role() {
crate::replication::ReplicationRole::Standalone => "standalone",
crate::replication::ReplicationRole::Primary => "primary",
crate::replication::ReplicationRole::Replica { .. } => "replica",
}
.to_string();
let commit_policy = self.runtime.commit_policy().label().to_string();
let read_only = self.runtime.write_gate().is_read_only();
let transports_active = self
.options
.transport_readiness
.active
.iter()
.map(|l| l.transport.clone())
.collect();
let transports_failed = self
.options
.transport_readiness
.failed
.iter()
.map(|l| (l.transport.clone(), l.reason.clone()))
.collect();
let preview_features = Vec::new();
let inputs = SystemCapabilitiesInputs {
snapshot_at_unix_ms: now_ms,
version: env!("CARGO_PKG_VERSION").to_string(),
build,
simd_level,
ai_model_names,
ai_providers,
auth_enabled,
auth_methods,
replication_role,
commit_policy,
read_only,
transports_active,
transports_failed,
preview_features,
};
json_response(200, system_capabilities_json(&inputs))
}
/// Reports what the caller identified by `headers` may do, as a 200 JSON
/// response.
///
/// When auth is enabled and an `authorization: Bearer <token>` header is
/// present, the token is checked first with `validate_token_full` and then
/// with `resolve_bearer_role`. With auth disabled every permission is granted;
/// with auth enabled and no resolved role, only read access is granted, and
/// only when the store's config does not require auth.
pub(crate) fn handle_auth_capabilities(
    &self,
    headers: &std::collections::BTreeMap<String, String>,
) -> HttpResponse {
    use crate::presentation::capabilities_json::{
        principal_capabilities_json, PrincipalCapabilitiesInputs,
    };

    // A system clock set before the Unix epoch degrades to 0 instead of failing.
    let now_ms = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map_or(0, |elapsed| elapsed.as_millis() as u64);
    let auth_enabled = self
        .auth_store
        .as_ref()
        .is_some_and(|store| store.is_enabled());
    let read_only = self.runtime.write_gate().is_read_only();

    // Token following the `Bearer ` prefix, trimmed; an empty token counts as absent.
    let bearer = headers
        .get("authorization")
        .and_then(|value| value.strip_prefix("Bearer "))
        .map(str::trim)
        .filter(|token| !token.is_empty());

    let mut authenticated = false;
    let mut tenant: Option<String> = None;
    let mut role: Option<crate::auth::Role> = None;
    let mut principal = super::routing::principal_for(headers);

    if let (true, Some(token), Some(store)) = (auth_enabled, bearer, self.auth_store.as_ref()) {
        if let Some((uid, granted)) = store.validate_token_full(token) {
            // Full validation also yields the user identity and its tenant.
            authenticated = true;
            tenant = uid.tenant.clone();
            role = Some(granted);
            principal = uid.to_string();
        } else if let super::routing::BearerOutcome::Valid(granted) =
            super::routing::resolve_bearer_role(token, &self.runtime, store)
        {
            // Only a role is resolved here; principal and tenant keep their defaults.
            authenticated = true;
            role = Some(granted);
        }
    }

    let (can_read, can_write, can_admin) = match (auth_enabled, role) {
        (false, _) => (true, true, true),
        (true, Some(granted)) => (granted.can_read(), granted.can_write(), granted.can_admin()),
        (true, None) => {
            let require_auth = self
                .auth_store
                .as_ref()
                .map_or(true, |store| store.config().require_auth);
            (!require_auth, false, false)
        }
    };

    let inputs = PrincipalCapabilitiesInputs {
        snapshot_at_unix_ms: now_ms,
        auth_enabled,
        authenticated,
        principal,
        tenant,
        role: role.map(|granted| granted.as_str().to_string()),
        read_only,
        can_read,
        can_write,
        can_admin,
    };
    json_response(200, principal_capabilities_json(&inputs))
}
pub(crate) fn handle_admin_failover_promote(&self, body: Vec<u8>) -> HttpResponse {
if !matches!(
self.runtime.write_gate().role(),
crate::replication::ReplicationRole::Replica { .. }
) {
let reason = "promotion only allowed on a replica (current role is not Replica)";
if let Err(err) = self.runtime.emit_control_event(
crate::runtime::control_events::EventKind::FailoverPromotion,
crate::runtime::control_events::Outcome::Denied,
"failover_promote",
Some("replication:role".to_string()),
Some(reason.to_string()),
Vec::new(),
) {
return json_error(500, err.to_string());
}
return json_error(409, reason);
}
let Some(backend) = self.runtime.db().options().remote_backend_atomic.clone() else {
let reason = "promotion requires a CAS-capable remote backend (use s3, fs, or http with RED_HTTP_CONDITIONAL_WRITES=true)";
if let Err(err) = self.runtime.emit_control_event(
crate::runtime::control_events::EventKind::FailoverPromotion,
crate::runtime::control_events::Outcome::Denied,
"failover_promote",
Some("replication:backend".to_string()),
Some(reason.to_string()),
Vec::new(),
) {
return json_error(500, err.to_string());
}
return json_error(412, reason);
};
let health = self.runtime.replica_apply_health().unwrap_or_default();
if matches!(
health.as_str(),
"stalled_gap" | "divergence" | "apply_error"
) {
let reason = format!(
"promotion refused — replica apply state is `{health}`; resolve before promoting"
);
if let Err(err) = self.runtime.emit_control_event(
crate::runtime::control_events::EventKind::ReplicationSafety,
crate::runtime::control_events::Outcome::Denied,
"promotion_refused",
Some("replication:apply_health".to_string()),
Some(reason.clone()),
vec![(
"apply_health".to_string(),
crate::runtime::control_events::Sensitivity::raw(health),
)],
) {
return json_error(500, err.to_string());
}
return json_error(409, reason);
}
let (holder_id, ttl_ms) = if body.is_empty() {
(default_holder_id(), 60_000u64)
} else {
match crate::serde_json::from_slice::<crate::serde_json::Value>(&body) {
Ok(v) => {
let holder = v
.get("holder_id")
.and_then(|n| n.as_str())
.map(|s| s.to_string())
.unwrap_or_else(default_holder_id);
let ttl = v
.get("ttl_ms")
.and_then(|n| n.as_u64())
.filter(|t| *t > 0)
.unwrap_or(60_000);
(holder, ttl)
}
Err(err) => {
let reason = format!("invalid JSON body: {err}");
if let Err(emit_err) = self.runtime.emit_control_event(
crate::runtime::control_events::EventKind::FailoverPromotion,
crate::runtime::control_events::Outcome::Error,
"failover_promote",
Some("replication:request".to_string()),
Some(reason.clone()),
Vec::new(),
) {
return json_error(500, emit_err.to_string());
}
return json_error(400, reason);
}
}
};
let database_key = self
.runtime
.db()
.options()
.remote_key
.clone()
.unwrap_or_else(|| "main".to_string());
let store = crate::replication::LeaseStore::new(backend);
match crate::runtime::lease_lifecycle::admin_promote_lease(
&store,
self.runtime.audit_log(),
&database_key,
&holder_id,
ttl_ms,
) {
Ok(lease) => {
if let Err(err) = self.runtime.emit_control_event(
crate::runtime::control_events::EventKind::FailoverPromotion,
crate::runtime::control_events::Outcome::Allowed,
"failover_promote",
Some(format!("replication:database:{database_key}")),
None,
vec![
(
"holder_id".to_string(),
crate::runtime::control_events::Sensitivity::raw(&lease.holder_id),
),
(
"generation".to_string(),
crate::runtime::control_events::Sensitivity::raw(
lease.generation.to_string(),
),
),
(
"ttl_ms".to_string(),
crate::runtime::control_events::Sensitivity::raw(ttl_ms.to_string()),
),
],
) {
return json_error(500, err.to_string());
}
let mut object = Map::new();
object.insert("ok".to_string(), JsonValue::Bool(true));
object.insert("holder_id".to_string(), JsonValue::String(lease.holder_id));
object.insert(
"generation".to_string(),
JsonValue::Number(lease.generation as f64),
);
object.insert(
"acquired_at_ms".to_string(),
JsonValue::Number(lease.acquired_at_ms as f64),
);
object.insert(
"expires_at_ms".to_string(),
JsonValue::Number(lease.expires_at_ms as f64),
);
object.insert(
"next_step".to_string(),
JsonValue::String(
"restart with RED_REPLICATION_MODE=primary to start accepting writes"
.to_string(),
),
);
json_response(200, JsonValue::Object(object))
}
Err(err) => {
let reason = format!("promotion refused: {err}");
if let Err(emit_err) = self.runtime.emit_control_event(
crate::runtime::control_events::EventKind::FailoverPromotion,
crate::runtime::control_events::Outcome::Denied,
"failover_promote",
Some(format!("replication:database:{database_key}")),
Some(reason.clone()),
vec![
(
"holder_id".to_string(),
crate::runtime::control_events::Sensitivity::raw(holder_id),
),
(
"ttl_ms".to_string(),
crate::runtime::control_events::Sensitivity::raw(ttl_ms.to_string()),
),
],
) {
return json_error(500, emit_err.to_string());
}
json_error(409, reason)
}
}
}
/// Admin endpoint: query the audit log with filters taken from the URL
/// query string.
///
/// Recognised keys: `since`, `until` (parsed by `parse_time_arg`),
/// `principal`, `tenant`, `action` (prefix match field), `outcome`,
/// `limit` (must be > 0, capped at 1000, defaults to 100) and `format`.
/// Empty `principal` / `tenant` / `action` values are ignored.
///
/// Returns `400` for an unparsable `since`, `until`, `outcome` or `limit`.
/// On success answers `200` with a JSON array, or newline-delimited JSON
/// when `format` is `jsonl` / `ndjson` (case-insensitive).
pub(crate) fn handle_admin_audit_query(
    &self,
    query: &std::collections::BTreeMap<String, String>,
) -> HttpResponse {
    use crate::runtime::audit_log::Outcome;
    use crate::runtime::audit_query::{
        events_to_json_array, parse_time_arg, run_query, AuditQuery,
    };

    let mut q = AuditQuery::new();

    // Time window bounds.
    if let Some(raw) = query.get("since") {
        match parse_time_arg(raw) {
            Some(ms) => q.since_ms = Some(ms),
            None => return json_error(400, format!("invalid 'since' value: {raw}")),
        }
    }
    if let Some(raw) = query.get("until") {
        match parse_time_arg(raw) {
            Some(ms) => q.until_ms = Some(ms),
            None => return json_error(400, format!("invalid 'until' value: {raw}")),
        }
    }

    // String filters: only applied when present and non-empty.
    if let Some(principal) = query.get("principal").filter(|p| !p.is_empty()) {
        q.principal = Some(principal.clone());
    }
    if let Some(tenant) = query.get("tenant").filter(|t| !t.is_empty()) {
        q.tenant = Some(tenant.clone());
    }
    if let Some(action) = query.get("action").filter(|a| !a.is_empty()) {
        q.action_prefix = Some(action.clone());
    }

    if let Some(raw) = query.get("outcome") {
        match Outcome::parse(raw) {
            Some(parsed) => q.outcome = Some(parsed),
            None => {
                return json_error(
                    400,
                    format!("invalid 'outcome' value: {raw} (expected success|denied|error)"),
                );
            }
        }
    }

    q.limit = match query.get("limit") {
        None => 100,
        Some(raw) => match raw.parse::<usize>() {
            Ok(n) if n > 0 => n.min(1000),
            _ => return json_error(400, format!("invalid 'limit' value: {raw}")),
        },
    };

    let wants_ndjson = match query.get("format") {
        Some(f) => {
            let lowered = f.to_ascii_lowercase();
            lowered == "jsonl" || lowered == "ndjson"
        }
        None => false,
    };

    let log_path = self.runtime.audit_log().path().to_path_buf();
    let events = run_query(&log_path, &q);

    if !wants_ndjson {
        return json_response(200, events_to_json_array(&events));
    }

    // One JSON document per line, each terminated by '\n'.
    let mut lines = String::new();
    for ev in &events {
        lines.push_str(&ev.to_json_line(None));
        lines.push('\n');
    }
    HttpResponse {
        status: 200,
        content_type: "application/x-ndjson",
        body: lines.into_bytes(),
        extra_headers: Vec::new(),
    }
}
/// Admin endpoint: mark the instance as draining, record an `admin/drain`
/// audit entry, and report the lifecycle phase observed afterwards.
///
/// Always answers `200` with `{"ok": true, "phase": <phase>}`.
pub(crate) fn handle_admin_drain(&self) -> HttpResponse {
    self.runtime.lifecycle().mark_draining();
    self.runtime.audit_log().record(
        "admin/drain",
        "operator",
        "instance",
        "ok",
        JsonValue::Null,
    );

    // Phase is read after the transition so the response reflects it.
    let phase_label = self.runtime.lifecycle().phase().as_str().to_string();

    let mut reply = Map::new();
    reply.insert("ok".to_string(), JsonValue::Bool(true));
    reply.insert("phase".to_string(), JsonValue::String(phase_label));
    json_response(200, JsonValue::Object(reply))
}
/// Liveness probe: `200` / `"alive"` for every lifecycle phase except
/// `Phase::Stopped`, which yields `503` / `"stopped"`. The current phase
/// name is always included under `phase`.
pub(crate) fn handle_health_live(&self) -> HttpResponse {
    let phase = self.runtime.lifecycle().phase();
    let (status, label) = if matches!(phase, Phase::Stopped) {
        (503, "stopped")
    } else {
        (200, "alive")
    };

    let mut report = Map::new();
    report.insert("status".to_string(), JsonValue::String(label.to_string()));
    report.insert(
        "phase".to_string(),
        JsonValue::String(phase.as_str().to_string()),
    );
    json_response(status, JsonValue::Object(report))
}
/// Readiness probe: delegates to `health_ready_response`, labelling the
/// response with probe name `"ready"`.
pub(crate) fn handle_health_ready(&self) -> HttpResponse {
self.health_ready_response("ready")
}
/// Startup probe: same check as the readiness probe (both delegate to
/// `health_ready_response`), but labelled with probe name `"startup"`.
pub(crate) fn handle_health_startup(&self) -> HttpResponse {
self.health_ready_response("startup")
}
/// Shared body of the readiness and startup probes.
///
/// The response always contains `probe`, `transport_listeners`, `phase`
/// and `since_secs` (time since `started_at_ms`, rounded to milliseconds),
/// plus `ready_at_unix_ms` when the lifecycle has recorded one.
///
/// * Phase accepts queries → `200` with `status: "ready"`.
/// * Otherwise → `503` with `status` set to the phase name and a `reason`
///   taken from `not_ready_reason()`, falling back to a fixed per-phase
///   label.
fn health_ready_response(&self, probe: &str) -> HttpResponse {
    let lifecycle = self.runtime.lifecycle();
    let phase = lifecycle.phase();

    // A clock before the Unix epoch degrades to 0 rather than failing.
    let now_ms = match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    };
    let uptime_ms = now_ms.saturating_sub(lifecycle.started_at_ms());
    let since_secs = (uptime_ms as f64) / 1000.0;

    let mut report = Map::new();
    report.insert("probe".to_string(), JsonValue::String(probe.to_string()));
    report.insert(
        "transport_listeners".to_string(),
        self.transport_readiness_json(),
    );
    report.insert(
        "phase".to_string(),
        JsonValue::String(phase.as_str().to_string()),
    );
    report.insert(
        "since_secs".to_string(),
        JsonValue::Number((since_secs * 1000.0).round() / 1000.0),
    );
    if let Some(ready_at) = lifecycle.ready_at_ms() {
        report.insert(
            "ready_at_unix_ms".to_string(),
            JsonValue::Number(ready_at as f64),
        );
    }

    if phase.accepts_queries() {
        report.insert("status".to_string(), JsonValue::String("ready".to_string()));
        return json_response(200, JsonValue::Object(report));
    }

    report.insert(
        "status".to_string(),
        JsonValue::String(phase.as_str().to_string()),
    );
    let reason = match lifecycle.not_ready_reason() {
        Some(explicit) => explicit,
        None => {
            let fallback = match phase {
                Phase::Starting => "starting",
                Phase::ShuttingDown => "shutting_down",
                Phase::Stopped => "stopped",
                Phase::Draining => "draining",
                Phase::Ready => "ready",
            };
            fallback.to_string()
        }
    };
    report.insert("reason".to_string(), JsonValue::String(reason));
    json_response(503, JsonValue::Object(report))
}
/// Records an IAM audit entry for `action` on `target` with the given
/// `outcome`. The actor is always recorded as the literal `"operator"` and
/// no detail payload is attached (`JsonValue::Null`).
fn iam_audit(&self, action: &str, target: &str, outcome: &str) {
self.runtime
.audit_log()
.record(action, "operator", target, outcome, JsonValue::Null);
}
/// Creates or replaces the policy stored under `id` from a JSON body.
///
/// * `503` — no auth store configured.
/// * `400` — body is not UTF-8, fails `Policy::from_json_str`, or the
///   store rejects the policy.
/// * `200` — stored; answers `{"ok": true, "id": <id>}`.
///
/// The `id` argument wins over any id embedded in the document. On success
/// the result cache is invalidated and an `iam/policy.put` audit entry is
/// written.
pub(crate) fn handle_iam_policy_put(&self, id: &str, body: Vec<u8>) -> HttpResponse {
    let store = match self.auth_store.as_ref() {
        Some(store) => store,
        None => return json_error(503, "auth store not configured"),
    };
    let text = match std::str::from_utf8(&body) {
        Ok(text) => text,
        Err(_) => return json_error(400, "body must be utf-8 JSON"),
    };
    let parsed = crate::auth::policies::Policy::from_json_str(text);
    let mut policy = match parsed {
        Ok(policy) => policy,
        Err(e) => return json_error(400, format!("policy parse: {e}")),
    };

    // Force the stored id to match the one the caller addressed.
    if policy.id != id {
        policy.id = id.to_string();
    }

    if let Err(e) = store.put_policy(policy) {
        return json_error(400, e.to_string());
    }
    self.runtime.invalidate_result_cache();
    self.iam_audit("iam/policy.put", id, "ok");

    let mut reply = Map::new();
    reply.insert("ok".to_string(), JsonValue::Bool(true));
    reply.insert("id".to_string(), JsonValue::String(id.to_string()));
    json_response(200, JsonValue::Object(reply))
}
/// Returns the policy stored under `id`, serialised with
/// `to_json_string()` and sent verbatim as the response body.
///
/// * `503` — no auth store configured.
/// * `404` — no policy with that id.
pub(crate) fn handle_iam_policy_get(&self, id: &str) -> HttpResponse {
    let store = match self.auth_store.as_ref() {
        Some(store) => store,
        None => return json_error(503, "auth store not configured"),
    };
    match store.get_policy(id) {
        Some(policy) => HttpResponse {
            status: 200,
            content_type: "application/json",
            body: policy.to_json_string().into_bytes(),
            extra_headers: Vec::new(),
        },
        None => json_error(404, format!("policy `{id}` not found")),
    }
}
/// Lists every policy in the auth store as a summary row
/// (`id`, `version`, statement count, optional `tenant`), wrapped in a
/// `{"count": n, "items": [...]}` envelope.
///
/// Answers `503` when no auth store is configured.
pub(crate) fn handle_iam_policy_list(&self) -> HttpResponse {
    let store = match self.auth_store.as_ref() {
        Some(store) => store,
        None => return json_error(503, "auth store not configured"),
    };

    let policies = store.list_policies();
    let mut rows: Vec<JsonValue> = Vec::new();
    for policy in policies.iter() {
        let tenant = match policy.tenant.as_deref() {
            Some(t) => JsonValue::String(t.to_string()),
            None => JsonValue::Null,
        };

        let mut row = Map::new();
        row.insert("id".to_string(), JsonValue::String(policy.id.clone()));
        row.insert(
            "version".to_string(),
            JsonValue::Number(policy.version as f64),
        );
        row.insert(
            "statements".to_string(),
            JsonValue::Number(policy.statements.len() as f64),
        );
        row.insert("tenant".to_string(), tenant);
        rows.push(JsonValue::Object(row));
    }

    let mut envelope = Map::new();
    envelope.insert("count".to_string(), JsonValue::Number(rows.len() as f64));
    envelope.insert("items".to_string(), JsonValue::Array(rows));
    json_response(200, JsonValue::Object(envelope))
}
/// Dumps the static action catalog (`ACTIONS`) as JSON.
///
/// Each item carries `name`, `category`, `lifecycle_state`
/// (`active` / `deprecated` / `removed`), `replacement`, `since_version`
/// and `gates_description`. `replacement` and `since_version` are `null`
/// unless the action is deprecated. Does not need an auth store.
pub(crate) fn handle_iam_policy_actions(&self) -> HttpResponse {
    use crate::auth::action_catalog::{LifecycleState, ACTIONS};

    let mut rows: Vec<JsonValue> = Vec::new();
    for entry in ACTIONS.iter() {
        // Flatten the lifecycle enum into the three JSON fields.
        let (state, replacement, since_version) = match &entry.lifecycle_state {
            LifecycleState::Active => ("active", JsonValue::Null, JsonValue::Null),
            LifecycleState::Removed => ("removed", JsonValue::Null, JsonValue::Null),
            LifecycleState::Deprecated {
                replacement,
                since_version,
            } => (
                "deprecated",
                replacement
                    .map(|r| JsonValue::String(r.to_string()))
                    .unwrap_or(JsonValue::Null),
                JsonValue::String(since_version.to_string()),
            ),
        };

        let mut row = Map::new();
        row.insert(
            "name".to_string(),
            JsonValue::String(entry.name.to_string()),
        );
        row.insert(
            "category".to_string(),
            JsonValue::String(entry.category.as_str().to_string()),
        );
        row.insert(
            "lifecycle_state".to_string(),
            JsonValue::String(state.to_string()),
        );
        row.insert("replacement".to_string(), replacement);
        row.insert("since_version".to_string(), since_version);
        row.insert(
            "gates_description".to_string(),
            JsonValue::String(entry.gates_description.to_string()),
        );
        rows.push(JsonValue::Object(row));
    }

    let mut envelope = Map::new();
    envelope.insert("count".to_string(), JsonValue::Number(rows.len() as f64));
    envelope.insert("items".to_string(), JsonValue::Array(rows));
    json_response(200, JsonValue::Object(envelope))
}
/// Runs the policy linter over the request body and returns its
/// diagnostics as `{"count": n, "diagnostics": [...]}`.
///
/// Answers `400` when the body is not valid UTF-8; otherwise always `200`,
/// with whatever diagnostics the linter produced.
pub(crate) fn handle_iam_policy_lint(&self, body: Vec<u8>) -> HttpResponse {
    let text = match std::str::from_utf8(&body) {
        Ok(text) => text,
        Err(_) => return json_error(400, "body must be utf-8 JSON"),
    };

    let diagnostics = crate::auth::policy_linter::lint(text);
    let mut rendered: Vec<JsonValue> = Vec::new();
    for diag in diagnostics.iter() {
        rendered.push(diag.to_json_value());
    }

    let mut envelope = Map::new();
    envelope.insert(
        "count".to_string(),
        JsonValue::Number(rendered.len() as f64),
    );
    envelope.insert("diagnostics".to_string(), JsonValue::Array(rendered));
    json_response(200, JsonValue::Object(envelope))
}
/// Migrates the auth store to `policy_only` enforcement, or previews what
/// that migration would change.
///
/// Request body: a JSON object with a required string `target` (must parse
/// to `PolicyEnforcementMode::PolicyOnly`) and an optional boolean
/// `dry_run` (defaults to `false`).
///
/// The handler simulates the migration against every collection in the
/// catalog (as `table` resources) and reports the resulting deltas:
///
/// * `dry_run` → `200`, outcome `"dry_run"`, nothing is changed.
/// * no deltas → `200`, outcome `"applied"`, enforcement mode is switched.
/// * deltas present → `409`, outcome `"refused"`, nothing is changed.
///
/// Errors: `503` without an auth store; `400` for a non-UTF-8 body,
/// invalid JSON, a non-object body, a missing `target`, or an unsupported
/// target. An audit entry `iam/policy.migrate_mode` is written with the
/// outcome string once validation has passed.
pub(crate) fn handle_iam_policy_migrate_mode(&self, body: Vec<u8>) -> HttpResponse {
    use crate::auth::enforcement_mode::PolicyEnforcementMode;
    use crate::auth::migrate_policy_mode::{
        principal_label, simulate_migration_delta, MigratePolicyDelta,
    };
    use crate::auth::policies::ResourceRef;

    let Some(store) = self.auth_store.as_ref() else {
        return json_error(503, "auth store not configured");
    };
    // Reject non-UTF-8 bodies explicitly (same message as the other IAM
    // handlers) instead of silently treating them as an empty document,
    // which produced a misleading JSON parse error.
    let Ok(text) = std::str::from_utf8(&body) else {
        return json_error(400, "body must be utf-8 JSON");
    };
    let parsed = match crate::serde_json::from_str::<crate::serde_json::Value>(text) {
        Ok(v) => v,
        Err(e) => return json_error(400, format!("invalid JSON body: {e}")),
    };
    let obj = match parsed.as_object() {
        Some(o) => o,
        None => return json_error(400, "body must be a JSON object"),
    };
    let target = match obj.get("target").and_then(|v| v.as_str()) {
        Some(s) => s.to_string(),
        None => return json_error(400, "missing `target`"),
    };
    let dry_run = obj
        .get("dry_run")
        .and_then(|v| v.as_bool())
        .unwrap_or(false);

    let parsed_mode = match PolicyEnforcementMode::parse(&target) {
        Some(m) => m,
        None => {
            return json_error(
                400,
                format!("invalid target `{target}` (expected `policy_only`)"),
            );
        }
    };
    // Other modes parse fine but cannot be migrated to through this route.
    if parsed_mode != PolicyEnforcementMode::PolicyOnly {
        return json_error(
            400,
            format!("target `{target}` is not supported — only `policy_only` may be migrated to via this endpoint"),
        );
    }

    let snapshot = self.runtime.catalog();
    let resources: Vec<ResourceRef> = snapshot
        .collections
        .iter()
        .map(|c| ResourceRef::new("table", c.name.clone()))
        .collect();
    let now_ms = crate::utils::now_unix_millis() as u128;
    let deltas: Vec<MigratePolicyDelta> =
        simulate_migration_delta(store.as_ref(), &resources, now_ms);

    // A real (non-dry-run) migration is refused whenever the simulation
    // reports any delta.
    let refused = !dry_run && !deltas.is_empty();
    let outcome_str = if dry_run {
        "dry_run"
    } else if refused {
        "refused"
    } else {
        "applied"
    };
    self.iam_audit("iam/policy.migrate_mode", &target, outcome_str);

    let items: Vec<JsonValue> = deltas
        .iter()
        .map(|d| {
            let mut row = Map::new();
            row.insert(
                "principal".to_string(),
                JsonValue::String(principal_label(&d.principal)),
            );
            row.insert(
                "role".to_string(),
                JsonValue::String(d.role.as_str().to_string()),
            );
            row.insert("action".to_string(), JsonValue::String(d.action.clone()));
            row.insert(
                "resource_kind".to_string(),
                JsonValue::String(d.resource_kind.clone()),
            );
            row.insert(
                "resource_name".to_string(),
                JsonValue::String(d.resource_name.clone()),
            );
            JsonValue::Object(row)
        })
        .collect();

    let mut envelope = Map::new();
    envelope.insert("target".to_string(), JsonValue::String(target.clone()));
    envelope.insert("dry_run".to_string(), JsonValue::Bool(dry_run));
    envelope.insert(
        "outcome".to_string(),
        JsonValue::String(outcome_str.to_string()),
    );
    envelope.insert("count".to_string(), JsonValue::Number(items.len() as f64));
    envelope.insert("delta".to_string(), JsonValue::Array(items));

    if refused {
        return json_response(409, JsonValue::Object(envelope));
    }
    if !dry_run {
        store.set_enforcement_mode(parsed_mode);
    }
    json_response(200, JsonValue::Object(envelope))
}
/// Deletes the policy stored under `id`.
///
/// * `503` — no auth store configured.
/// * `404` — the store reported an error (its message is returned).
/// * `204` — deleted; the result cache is invalidated and an
///   `iam/policy.drop` audit entry is written.
pub(crate) fn handle_iam_policy_delete(&self, id: &str) -> HttpResponse {
    let store = match self.auth_store.as_ref() {
        Some(store) => store,
        None => return json_error(503, "auth store not configured"),
    };
    if let Err(e) = store.delete_policy(id) {
        return json_error(404, e.to_string());
    }

    self.runtime.invalidate_result_cache();
    self.iam_audit("iam/policy.drop", id, "ok");
    HttpResponse {
        status: 204,
        content_type: "application/json",
        body: Vec::new(),
        extra_headers: Vec::new(),
    }
}
/// Attaches `policy_id` to the user identified by `user`
/// (decoded with `decode_user_arg`).
///
/// * `503` — no auth store configured.
/// * `400` — the store rejected the attachment.
/// * `200` — `{"ok": true}`; the result cache is invalidated and an
///   `iam/policy.attach` audit entry is written.
pub(crate) fn handle_iam_attach_user(&self, user: &str, policy_id: &str) -> HttpResponse {
    let store = match self.auth_store.as_ref() {
        Some(store) => store,
        None => return json_error(503, "auth store not configured"),
    };
    let uid = decode_user_arg(user);
    let principal = crate::auth::store::PrincipalRef::User(uid.clone());
    if let Err(e) = store.attach_policy(principal, policy_id) {
        return json_error(400, e.to_string());
    }

    self.runtime.invalidate_result_cache();
    let audit_target = format!("user:{uid}::{policy_id}");
    self.iam_audit("iam/policy.attach", &audit_target, "ok");

    let mut reply = Map::new();
    reply.insert("ok".to_string(), JsonValue::Bool(true));
    json_response(200, JsonValue::Object(reply))
}
/// Detaches `policy_id` from the user identified by `user`
/// (decoded with `decode_user_arg`).
///
/// * `503` — no auth store configured.
/// * `400` — the store rejected the detachment.
/// * `204` — detached; the result cache is invalidated and an
///   `iam/policy.detach` audit entry is written.
pub(crate) fn handle_iam_detach_user(&self, user: &str, policy_id: &str) -> HttpResponse {
    let store = match self.auth_store.as_ref() {
        Some(store) => store,
        None => return json_error(503, "auth store not configured"),
    };
    let uid = decode_user_arg(user);
    let principal = crate::auth::store::PrincipalRef::User(uid.clone());
    if let Err(e) = store.detach_policy(principal, policy_id) {
        return json_error(400, e.to_string());
    }

    self.runtime.invalidate_result_cache();
    let audit_target = format!("user:{uid}::{policy_id}");
    self.iam_audit("iam/policy.detach", &audit_target, "ok");
    HttpResponse {
        status: 204,
        content_type: "application/json",
        body: Vec::new(),
        extra_headers: Vec::new(),
    }
}
/// Adds the user identified by `user` (decoded with `decode_user_arg`)
/// to `group`.
///
/// * `503` — no auth store configured.
/// * `400` — the store rejected the change.
/// * `200` — `{"ok": true}`; the result cache is invalidated and an
///   `iam/group.add` audit entry is written.
pub(crate) fn handle_iam_add_user_group(&self, user: &str, group: &str) -> HttpResponse {
    let store = match self.auth_store.as_ref() {
        Some(store) => store,
        None => return json_error(503, "auth store not configured"),
    };
    let uid = decode_user_arg(user);
    if let Err(e) = store.add_user_to_group(&uid, group) {
        return json_error(400, e.to_string());
    }

    self.runtime.invalidate_result_cache();
    let audit_target = format!("user:{uid}::group:{group}");
    self.iam_audit("iam/group.add", &audit_target, "ok");

    let mut reply = Map::new();
    reply.insert("ok".to_string(), JsonValue::Bool(true));
    json_response(200, JsonValue::Object(reply))
}
/// Removes the user identified by `user` (decoded with `decode_user_arg`)
/// from `group`.
///
/// * `503` — no auth store configured.
/// * `400` — the store rejected the change.
/// * `204` — removed; the result cache is invalidated and an
///   `iam/group.remove` audit entry is written.
pub(crate) fn handle_iam_remove_user_group(&self, user: &str, group: &str) -> HttpResponse {
    let store = match self.auth_store.as_ref() {
        Some(store) => store,
        None => return json_error(503, "auth store not configured"),
    };
    let uid = decode_user_arg(user);
    if let Err(e) = store.remove_user_from_group(&uid, group) {
        return json_error(400, e.to_string());
    }

    self.runtime.invalidate_result_cache();
    let audit_target = format!("user:{uid}::group:{group}");
    self.iam_audit("iam/group.remove", &audit_target, "ok");
    HttpResponse {
        status: 204,
        content_type: "application/json",
        body: Vec::new(),
        extra_headers: Vec::new(),
    }
}
/// Attaches `policy_id` to the group named `group`.
///
/// * `503` — no auth store configured.
/// * `400` — the store rejected the attachment.
/// * `200` — `{"ok": true}`; the result cache is invalidated and an
///   `iam/policy.attach` audit entry is written.
pub(crate) fn handle_iam_attach_group(&self, group: &str, policy_id: &str) -> HttpResponse {
    let store = match self.auth_store.as_ref() {
        Some(store) => store,
        None => return json_error(503, "auth store not configured"),
    };
    let principal = crate::auth::store::PrincipalRef::Group(group.to_string());
    if let Err(e) = store.attach_policy(principal, policy_id) {
        return json_error(400, e.to_string());
    }

    self.runtime.invalidate_result_cache();
    let audit_target = format!("group:{group}::{policy_id}");
    self.iam_audit("iam/policy.attach", &audit_target, "ok");

    let mut reply = Map::new();
    reply.insert("ok".to_string(), JsonValue::Bool(true));
    json_response(200, JsonValue::Object(reply))
}
/// Detaches `policy_id` from the group named `group`.
///
/// * `503` — no auth store configured.
/// * `400` — the store rejected the detachment.
/// * `204` — detached; the result cache is invalidated and an
///   `iam/policy.detach` audit entry is written.
pub(crate) fn handle_iam_detach_group(&self, group: &str, policy_id: &str) -> HttpResponse {
    let store = match self.auth_store.as_ref() {
        Some(store) => store,
        None => return json_error(503, "auth store not configured"),
    };
    let principal = crate::auth::store::PrincipalRef::Group(group.to_string());
    if let Err(e) = store.detach_policy(principal, policy_id) {
        return json_error(400, e.to_string());
    }

    self.runtime.invalidate_result_cache();
    let audit_target = format!("group:{group}::{policy_id}");
    self.iam_audit("iam/policy.detach", &audit_target, "ok");
    HttpResponse {
        status: 204,
        content_type: "application/json",
        body: Vec::new(),
        extra_headers: Vec::new(),
    }
}
/// Lists the policies that apply to `user` (decoded with
/// `decode_user_arg`), as returned by `effective_policies`.
///
/// The response envelope contains `user`, `count` and `policies` (each with
/// `id` and statement count). A `resource` query parameter, when present,
/// is only echoed back in the envelope; this handler does not filter by it.
///
/// Answers `503` when no auth store is configured.
pub(crate) fn handle_iam_effective_permissions(
    &self,
    user: &str,
    query: &std::collections::BTreeMap<String, String>,
) -> HttpResponse {
    let store = match self.auth_store.as_ref() {
        Some(store) => store,
        None => return json_error(503, "auth store not configured"),
    };
    let uid = decode_user_arg(user);

    let policies = store.effective_policies(&uid);
    let mut rows: Vec<JsonValue> = Vec::new();
    for policy in policies.iter() {
        let mut row = Map::new();
        row.insert("id".to_string(), JsonValue::String(policy.id.clone()));
        row.insert(
            "statements".to_string(),
            JsonValue::Number(policy.statements.len() as f64),
        );
        rows.push(JsonValue::Object(row));
    }

    let mut envelope = Map::new();
    envelope.insert("user".to_string(), JsonValue::String(uid.to_string()));
    if let Some(resource) = query.get("resource") {
        envelope.insert("resource".to_string(), JsonValue::String(resource.clone()));
    }
    envelope.insert("count".to_string(), JsonValue::Number(rows.len() as f64));
    envelope.insert("policies".to_string(), JsonValue::Array(rows));
    json_response(200, JsonValue::Object(envelope))
}
/// Simulates a policy decision for a principal/action/resource triple
/// without performing the action.
///
/// Request body (JSON object):
/// * `principal` — string, decoded with `decode_user_arg` (required).
/// * `action` — string (required).
/// * `resource` — either `{"kind", "name", "tenant"?}` or a
///   `"kind:name"` string (required). Kind and name must both be
///   non-empty in either form.
/// * `ctx` — optional object with `current_tenant`, `mfa`,
///   `source_ip` (alias `peer_ip`) and `now_ms`.
///
/// Response (`200`): `decision`, `matched_policy_id`, `matched_sid`,
/// `reason`, and a `trail` array describing each evaluated statement.
/// An `iam/policy.simulate` audit entry is written with the decision as
/// its outcome.
///
/// Errors: `503` without an auth store; `400` for a non-UTF-8 body,
/// invalid JSON, a non-object body, or a missing/invalid field.
pub(crate) fn handle_iam_simulate(&self, body: Vec<u8>) -> HttpResponse {
    let Some(store) = self.auth_store.as_ref() else {
        return json_error(503, "auth store not configured");
    };
    // Reject non-UTF-8 bodies explicitly (same message as the other IAM
    // handlers) instead of silently treating them as an empty document,
    // which produced a misleading JSON parse error.
    let Ok(text) = std::str::from_utf8(&body) else {
        return json_error(400, "body must be utf-8 JSON");
    };
    let parsed = match crate::serde_json::from_str::<crate::serde_json::Value>(text) {
        Ok(v) => v,
        Err(e) => return json_error(400, format!("invalid JSON body: {e}")),
    };
    let obj = match parsed.as_object() {
        Some(o) => o,
        None => return json_error(400, "body must be a JSON object"),
    };
    let principal = match obj.get("principal").and_then(|v| v.as_str()) {
        Some(s) => decode_user_arg(s),
        None => return json_error(400, "missing `principal`"),
    };
    let action = match obj.get("action").and_then(|v| v.as_str()) {
        Some(s) => s.to_string(),
        None => return json_error(400, "missing `action`"),
    };
    let resource = match obj.get("resource") {
        Some(JsonValue::Object(r)) => {
            let kind = r
                .get("kind")
                .and_then(|v| v.as_str())
                .unwrap_or("")
                .to_string();
            let name = r
                .get("name")
                .and_then(|v| v.as_str())
                .unwrap_or("")
                .to_string();
            if kind.is_empty() || name.is_empty() {
                return json_error(400, "resource needs kind+name");
            }
            let mut rr = crate::auth::policies::ResourceRef::new(kind, name);
            if let Some(t) = r.get("tenant").and_then(|v| v.as_str()) {
                rr = rr.with_tenant(t.to_string());
            }
            rr
        }
        Some(JsonValue::String(s)) => match s.split_once(':') {
            // Apply the same non-empty rule as the object form so inputs
            // like ":name" or "kind:" are rejected instead of simulated
            // against an empty kind/name.
            Some((k, n)) if k.is_empty() || n.is_empty() => {
                return json_error(400, "resource needs kind+name");
            }
            Some((k, n)) => crate::auth::policies::ResourceRef::new(k, n),
            None => return json_error(400, "resource string must be `kind:name`"),
        },
        _ => return json_error(400, "missing `resource`"),
    };

    // Optional evaluation context; unknown or mistyped fields are ignored.
    let mut sim_ctx = crate::auth::store::SimCtx::default();
    if let Some(c) = obj.get("ctx").and_then(|v| v.as_object()) {
        if let Some(t) = c.get("current_tenant").and_then(|v| v.as_str()) {
            sim_ctx.current_tenant = Some(t.to_string());
        }
        if let Some(true) = c.get("mfa").and_then(|v| v.as_bool()) {
            sim_ctx.mfa_present = true;
        }
        if let Some(ip) = c
            .get("source_ip")
            .or_else(|| c.get("peer_ip"))
            .and_then(|v| v.as_str())
        {
            // NOTE(review): an unparsable address is dropped silently, so
            // the simulation then runs without a peer IP — confirm this
            // best-effort behaviour is intended.
            if let Ok(addr) = ip.parse() {
                sim_ctx.peer_ip = Some(addr);
            }
        }
        if let Some(ms) = c.get("now_ms").and_then(|v| v.as_u64()) {
            sim_ctx.now_ms = Some(ms as u128);
        }
    }

    let outcome = store.simulate(&principal, &action, &resource, sim_ctx);
    let (decision_str, matched_pid, matched_sid) =
        crate::runtime::impl_core::decision_to_strings(&outcome.decision);
    self.iam_audit("iam/policy.simulate", &principal.to_string(), &decision_str);

    let mut envelope = Map::new();
    envelope.insert("decision".to_string(), JsonValue::String(decision_str));
    envelope.insert(
        "matched_policy_id".to_string(),
        matched_pid
            .map(JsonValue::String)
            .unwrap_or(JsonValue::Null),
    );
    envelope.insert(
        "matched_sid".to_string(),
        matched_sid
            .map(JsonValue::String)
            .unwrap_or(JsonValue::Null),
    );
    envelope.insert("reason".to_string(), JsonValue::String(outcome.reason));

    // One trail row per evaluated statement, in evaluation order.
    let trail: Vec<JsonValue> = outcome
        .trail
        .into_iter()
        .map(|t| {
            let mut obj = Map::new();
            obj.insert("policy_id".to_string(), JsonValue::String(t.policy_id));
            obj.insert(
                "sid".to_string(),
                t.sid.map(JsonValue::String).unwrap_or(JsonValue::Null),
            );
            obj.insert("matched".to_string(), JsonValue::Bool(t.matched));
            obj.insert(
                "effect".to_string(),
                JsonValue::String(
                    match t.effect {
                        crate::auth::policies::Effect::Allow => "allow",
                        crate::auth::policies::Effect::Deny => "deny",
                    }
                    .to_string(),
                ),
            );
            obj.insert(
                "why_skipped".to_string(),
                t.why_skipped
                    .map(|s| JsonValue::String(s.to_string()))
                    .unwrap_or(JsonValue::Null),
            );
            JsonValue::Object(obj)
        })
        .collect();
    envelope.insert("trail".to_string(), JsonValue::Array(trail));
    json_response(200, JsonValue::Object(envelope))
}
}
/// Turns a raw user argument into a `UserId`.
///
/// `tenant/name` is tried first, then `tenant.name`; in both cases the
/// split happens at the first separator and yields a tenant-scoped id.
/// Input with neither separator becomes a platform-level id.
fn decode_user_arg(raw: &str) -> crate::auth::UserId {
    // '/' takes precedence over '.', so "a.b/c" splits into ("a.b", "c").
    let scoped = raw.split_once('/').or_else(|| raw.split_once('.'));
    match scoped {
        Some((tenant, name)) => {
            crate::auth::UserId::scoped(tenant.to_string(), name.to_string())
        }
        None => crate::auth::UserId::platform(raw.to_string()),
    }
}
#[cfg(test)]
mod tests {
use super::*;
// Verifies that the metrics output contains every expected blob-cache
// series (name plus label set) for the `runtime.result_cache` namespace
// after the result cache has been switched to the `blob_cache` backend,
// exercised with two identical queries, and invalidated.
#[test]
fn metrics_expose_result_blob_cache_label_set() {
let runtime =
crate::runtime::RedDBRuntime::with_options(crate::api::RedDBOptions::in_memory())
.expect("runtime");
runtime
.db()
.store()
.set_config_tree("runtime.result_cache.backend", &crate::json!("blob_cache"));
// Same query twice: the expect messages label the first as the populating
// miss and the second as the hit.
runtime.execute_query("SELECT 1").expect("populate miss");
runtime.execute_query("SELECT 1").expect("blob hit");
runtime.invalidate_result_cache();
let server = RedDBServer::new(runtime);
let response = server.handle_metrics();
let body = String::from_utf8(response.body).expect("utf8 metrics");
// Only the series prefix (name + labels) is matched, not the sample value.
for needle in [
"reddb_cache_blob_get_total{namespace=\"runtime.result_cache\",result=\"hit_l1\"}",
"reddb_cache_blob_get_total{namespace=\"runtime.result_cache\",result=\"hit_l2\"}",
"reddb_cache_blob_get_total{namespace=\"runtime.result_cache\",result=\"miss\"}",
"reddb_cache_blob_put_total{namespace=\"runtime.result_cache\",outcome=\"ok\"}",
"reddb_cache_blob_put_total{namespace=\"runtime.result_cache\",outcome=\"version_mismatch\"}",
"reddb_cache_blob_put_total{namespace=\"runtime.result_cache\",outcome=\"too_large\"}",
"reddb_cache_blob_put_total{namespace=\"runtime.result_cache\",outcome=\"metadata_too_large\"}",
"reddb_cache_blob_invalidate_total{namespace=\"runtime.result_cache\",kind=\"dependency\"}",
"reddb_cache_blob_invalidate_total{namespace=\"runtime.result_cache\",kind=\"namespace\"}",
"reddb_cache_blob_evict_total{namespace=\"runtime.result_cache\",reason=\"capacity\"}",
"reddb_cache_blob_evict_total{namespace=\"runtime.result_cache\",reason=\"expiry\"}",
"reddb_cache_blob_evict_total{namespace=\"runtime.result_cache\",reason=\"policy\"}",
"reddb_cache_blob_l1_bytes_in_use{namespace=\"runtime.result_cache\"}",
"reddb_cache_blob_l1_entries{namespace=\"runtime.result_cache\"}",
"reddb_cache_blob_l2_bytes_in_use{namespace=\"runtime.result_cache\"}",
"reddb_cache_blob_l2_full_rejections_total{namespace=\"runtime.result_cache\"}",
"reddb_cache_blob_version_mismatch_total{namespace=\"runtime.result_cache\"}",
] {
assert!(body.contains(needle), "missing metric line for {needle}");
}
}
/// Builds a server over a fresh in-memory runtime with default server
/// settings; panics if the runtime cannot be created.
fn test_server() -> RedDBServer {
    let options = crate::api::RedDBOptions::in_memory();
    let runtime = crate::runtime::RedDBRuntime::with_options(options).expect("runtime");
    RedDBServer::new(runtime)
}
/// Decodes a response body as UTF-8 and parses it as JSON, panicking with
/// a descriptive message if either step fails.
fn parse_body(resp: &HttpResponse) -> JsonValue {
    let text = std::str::from_utf8(&resp.body).expect("utf8 body");
    let value: JsonValue = crate::serde_json::from_str::<JsonValue>(text).expect("JSON body");
    value
}
/// Returns the `state` string of capability `key` inside `section`,
/// panicking when the capability or its `state` field is absent or not a
/// string.
fn cap_state<'a>(section: &'a JsonValue, key: &str) -> &'a str {
    let state = section
        .get(key)
        .and_then(|capability| capability.get("state"))
        .and_then(JsonValue::as_str);
    match state {
        Some(value) => value,
        None => panic!("missing capability state for `{key}`"),
    }
}
// Baseline capabilities document for a default in-memory server: checks the
// top-level version fields, the standalone replication role, SIMD support,
// auth being disabled, the presence of every top-level section, and the
// `anthropic` AI provider state.
#[test]
fn capabilities_baseline_standalone_reports_supported_contract() {
let server = test_server();
let resp = server.handle_capabilities();
assert_eq!(resp.status, 200);
let body = parse_body(&resp);
assert!(body
.get("discovery_version")
.and_then(JsonValue::as_u64)
.is_some());
assert!(body.get("version").and_then(JsonValue::as_str).is_some());
let repl = body.get("replication").expect("replication");
assert_eq!(
repl.get("role").and_then(JsonValue::as_str),
Some("standalone")
);
assert!(repl
.get("commit_policy")
.and_then(JsonValue::as_str)
.is_some());
let simd = body
.get("vector")
.and_then(|v| v.get("simd"))
.expect("simd");
assert_eq!(
simd.get("state").and_then(JsonValue::as_str),
Some("supported")
);
assert!(simd.get("level").and_then(JsonValue::as_str).is_some());
let auth = body.get("auth").expect("auth");
assert_eq!(
auth.get("enabled").and_then(JsonValue::as_bool),
Some(false)
);
// With no auth store attached, bearer auth is reported as disabled.
assert_eq!(
cap_state(auth.get("methods").expect("methods"), "bearer"),
"disabled"
);
// Every top-level section of the document must be present.
for section in [
"build",
"vector",
"ai",
"auth",
"replication",
"api_contracts",
"preview_features",
] {
assert!(body.get(section).is_some(), "missing section `{section}`");
}
let providers = body
.get("ai")
.and_then(|a| a.get("providers"))
.expect("providers");
assert_eq!(cap_state(providers, "anthropic"), "supported");
}
// With an auth store whose config has `enabled: true`, the capabilities
// document reports auth as enabled, `password` and `bearer` as supported,
// and `mtls` as disabled.
#[test]
fn capabilities_auth_enabled_marks_password_and_bearer_supported() {
let runtime =
crate::runtime::RedDBRuntime::with_options(crate::api::RedDBOptions::in_memory())
.expect("runtime");
let auth = std::sync::Arc::new(crate::auth::store::AuthStore::new(
crate::auth::AuthConfig {
enabled: true,
..Default::default()
},
));
let server = RedDBServer::new(runtime).with_auth(auth);
let resp = server.handle_capabilities();
assert_eq!(resp.status, 200);
let body = parse_body(&resp);
let auth = body.get("auth").expect("auth");
assert_eq!(auth.get("enabled").and_then(JsonValue::as_bool), Some(true));
let methods = auth.get("methods").expect("methods");
assert_eq!(cap_state(methods, "password"), "supported");
assert_eq!(cap_state(methods, "bearer"), "supported");
assert_eq!(cap_state(methods, "mtls"), "disabled");
}
// A transport listed under `failed` in the transport readiness must show up
// in `api_contracts` as `unavailable` together with its failure reason,
// while an active transport is reported as `supported`.
#[test]
fn capabilities_failed_transport_is_unavailable_with_reason() {
let runtime =
crate::runtime::RedDBRuntime::with_options(crate::api::RedDBOptions::in_memory())
.expect("runtime");
let mut options = crate::server::ServerOptions::default();
// One active HTTP listener and one failed gRPC listener.
options.transport_readiness = crate::service_cli::TransportReadiness {
active: vec![crate::service_cli::TransportListenerState {
transport: "http".to_string(),
bind_addr: "127.0.0.1:5055".to_string(),
explicit: false,
}],
failed: vec![crate::service_cli::TransportListenerFailure {
transport: "grpc".to_string(),
bind_addr: "127.0.0.1:50051".to_string(),
explicit: true,
reason: "address in use".to_string(),
}],
};
let server = RedDBServer::with_options(runtime, options);
let resp = server.handle_capabilities();
assert_eq!(resp.status, 200);
let body = parse_body(&resp);
let contracts = body.get("api_contracts").expect("api_contracts");
assert_eq!(cap_state(contracts, "http"), "supported");
let grpc = contracts.get("grpc").expect("grpc contract");
assert_eq!(
grpc.get("state").and_then(JsonValue::as_str),
Some("unavailable")
);
assert_eq!(
grpc.get("reason").and_then(JsonValue::as_str),
Some("address in use")
);
}
// Without an auth store, a request with no headers is reported as
// unauthenticated with auth disabled, yet effective read and write are
// both `supported`.
#[test]
fn auth_capabilities_anonymous_no_auth_grants_open_access() {
let server = test_server();
let headers = std::collections::BTreeMap::new();
let resp = server.handle_auth_capabilities(&headers);
assert_eq!(resp.status, 200);
let body = parse_body(&resp);
assert_eq!(
body.get("auth_enabled").and_then(JsonValue::as_bool),
Some(false)
);
assert_eq!(
body.get("authenticated").and_then(JsonValue::as_bool),
Some(false)
);
let eff = body.get("effective").expect("effective");
assert_eq!(cap_state(eff, "read"), "supported");
assert_eq!(cap_state(eff, "write"), "supported");
}
// With auth enabled (all other config left at its default) and no
// credentials supplied, the caller is unauthenticated: read is still
// `supported`, while write and admin are `disabled`.
#[test]
fn auth_capabilities_enabled_anonymous_is_unauthenticated() {
let runtime =
crate::runtime::RedDBRuntime::with_options(crate::api::RedDBOptions::in_memory())
.expect("runtime");
let auth = std::sync::Arc::new(crate::auth::store::AuthStore::new(
crate::auth::AuthConfig {
enabled: true,
..Default::default()
},
));
let server = RedDBServer::new(runtime).with_auth(auth);
let headers = std::collections::BTreeMap::new();
let resp = server.handle_auth_capabilities(&headers);
assert_eq!(resp.status, 200);
let body = parse_body(&resp);
assert_eq!(
body.get("auth_enabled").and_then(JsonValue::as_bool),
Some(true)
);
assert_eq!(
body.get("authenticated").and_then(JsonValue::as_bool),
Some(false)
);
let eff = body.get("effective").expect("effective");
assert_eq!(cap_state(eff, "read"), "supported");
assert_eq!(cap_state(eff, "write"), "disabled");
assert_eq!(cap_state(eff, "admin"), "disabled");
}
// A sweep request with explicit limits succeeds and the report contains
// every expected counter field.
#[test]
fn admin_blob_cache_sweep_happy_path_returns_well_formed_report() {
let server = test_server();
let body = br#"{"limit_entries": 100, "limit_millis": 50}"#.to_vec();
let resp = server.handle_admin_blob_cache_sweep(body);
assert_eq!(resp.status, 200);
let parsed = parse_body(&resp);
assert_eq!(parsed.get("ok").and_then(|v| v.as_bool()), Some(true));
// Only presence is checked; the values depend on cache contents.
for field in [
"entries_scanned",
"entries_evicted",
"bytes_reclaimed",
"elapsed_ms",
"truncated_due_to_limit",
] {
assert!(
parsed.get(field).is_some(),
"missing field {field} in response: {parsed:?}"
);
}
}
/// An empty request body is accepted by the sweep handler: 200 with `ok: true`.
#[test]
fn admin_blob_cache_sweep_empty_body_uses_unbounded_default() {
    let server = test_server();
    let empty_body: Vec<u8> = Vec::new();
    let resp = server.handle_admin_blob_cache_sweep(empty_body);
    assert_eq!(resp.status, 200);

    let parsed = parse_body(&resp);
    let ok = parsed.get("ok").and_then(|v| v.as_bool());
    assert_eq!(ok, Some(true));
}
/// A body that is not JSON must be rejected by the sweep handler with 400.
#[test]
fn admin_blob_cache_sweep_invalid_json_returns_400() {
    let server = test_server();
    let garbage = b"not json".to_vec();
    let status = server.handle_admin_blob_cache_sweep(garbage).status;
    assert_eq!(status, 400);
}
/// Flushing a well-formed namespace answers 200, echoes the namespace back and
/// reports timing plus the generation before and after the flush.
#[test]
fn admin_blob_cache_flush_namespace_happy_path() {
    let server = test_server();
    let request = br#"{"namespace": "tenant-42:results"}"#.to_vec();
    let resp = server.handle_admin_blob_cache_flush_namespace(request);
    assert_eq!(resp.status, 200);

    let parsed = parse_body(&resp);
    let ok = parsed.get("ok").and_then(|v| v.as_bool());
    assert_eq!(ok, Some(true));
    let echoed = parsed.get("namespace").and_then(|v| v.as_str());
    assert_eq!(echoed, Some("tenant-42:results"));

    // Presence only: the concrete numbers are not pinned by this test.
    for field in ["elapsed_micros", "generation_before", "generation_after"] {
        assert!(parsed.get(field).is_some());
    }
}
/// An empty body is rejected with 400 and an error text that mentions
/// `namespace`.
#[test]
fn admin_blob_cache_flush_namespace_missing_body_returns_400() {
    let server = test_server();
    let resp = server.handle_admin_blob_cache_flush_namespace(Vec::new());
    assert_eq!(resp.status, 400);

    let parsed = parse_body(&resp);
    let error_text = parsed.get("error").and_then(|v| v.as_str());
    // A missing or non-string `error` field counts as a failure too.
    let mentions_namespace = matches!(error_text, Some(text) if text.contains("namespace"));
    assert!(mentions_namespace);
}
/// Valid JSON without a `namespace` key is rejected with 400.
#[test]
fn admin_blob_cache_flush_namespace_missing_field_returns_400() {
    let server = test_server();
    let request = br#"{"other": "x"}"#.to_vec();
    let status = server
        .handle_admin_blob_cache_flush_namespace(request)
        .status;
    assert_eq!(status, 400);
}
/// An empty-string `namespace` is rejected with 400.
#[test]
fn admin_blob_cache_flush_namespace_empty_string_returns_400() {
    let server = test_server();
    let request = br#"{"namespace": ""}"#.to_vec();
    let status = server
        .handle_admin_blob_cache_flush_namespace(request)
        .status;
    assert_eq!(status, 400);
}
/// A namespace containing a JSON-escaped `\r\n` sequence is rejected with 400
/// and an error message that names `CR/LF`.
#[test]
fn admin_blob_cache_flush_namespace_rejects_crlf_smuggling_attempt() {
    let server = test_server();
    // Raw byte string: `\r\n` here is the JSON escape, decoded by the handler's parser.
    let request = br#"{"namespace": "real-ns\r\nfake-audit: spliced"}"#.to_vec();
    let resp = server.handle_admin_blob_cache_flush_namespace(request);
    assert_eq!(resp.status, 400);

    let parsed = parse_body(&resp);
    // A missing or non-string `error` degrades to "" so the assertion below reports it.
    let msg = parsed.get("error").and_then(|v| v.as_str()).unwrap_or("");
    assert!(msg.contains("CR/LF"), "unexpected error: {msg}");
}
/// A namespace containing a JSON-escaped NUL (`\u0000`) is rejected with 400
/// and an error message that names `NUL`.
#[test]
fn admin_blob_cache_flush_namespace_rejects_nul_byte() {
    let server = test_server();
    // `\\u0000` keeps the escape as JSON text; no raw NUL byte is sent on the wire.
    let request = b"{\"namespace\": \"with-nul-\\u0000-here\"}".to_vec();
    let resp = server.handle_admin_blob_cache_flush_namespace(request);
    assert_eq!(resp.status, 400);

    let parsed = parse_body(&resp);
    let msg = parsed.get("error").and_then(|v| v.as_str()).unwrap_or("");
    assert!(msg.contains("NUL"), "unexpected error: {msg}");
}
/// A non-ASCII namespace (CJK text plus an emoji) is accepted and echoed back
/// unchanged in the response.
#[test]
fn admin_blob_cache_flush_namespace_response_round_trips_unicode() {
    let server = test_server();
    let request = String::from(r#"{"namespace": "日本語-ns-🦀"}"#).into_bytes();
    let resp = server.handle_admin_blob_cache_flush_namespace(request);
    assert_eq!(resp.status, 200);

    let parsed = parse_body(&resp);
    let echoed = parsed.get("namespace").and_then(|v| v.as_str());
    assert_eq!(echoed, Some("日本語-ns-🦀"));
}
/// Builds the JSON request body for a compare-and-set call.
///
/// `new_value` is encoded as standard, `=`-padded base64 into `new_value_b64`;
/// `expected_version` is always emitted as `0`. `namespace` and `key` are
/// spliced into the JSON text verbatim (no escaping), so callers must pass
/// values that are already valid inside a JSON string.
fn cas_body(namespace: &str, key: &str, new_value: &[u8], new_version: u64) -> Vec<u8> {
    const ALPHABET: &[u8; 64] =
        b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
    // Maps the 6-bit group of `word` that starts at bit `shift` to its base64 character.
    let sextet = |word: u32, shift: u32| ALPHABET[((word >> shift) & 0x3f) as usize] as char;

    let mut b64 = String::new();
    for group in new_value.chunks(3) {
        // Pack up to three bytes big-endian into 24 bits; absent bytes stay zero.
        let word = group
            .iter()
            .enumerate()
            .fold(0u32, |acc, (index, &byte)| {
                acc | (u32::from(byte) << (16 - 8 * index))
            });
        b64.push(sextet(word, 18));
        b64.push(sextet(word, 12));
        // Positions not backed by an input byte are written as padding.
        b64.push(if group.len() > 1 { sextet(word, 6) } else { '=' });
        b64.push(if group.len() > 2 { sextet(word, 0) } else { '=' });
    }

    let json = format!(
        r#"{{"namespace":"{namespace}","key":"{key}","expected_version":0,"new_value_b64":"{b64}","new_version":{new_version}}}"#
    );
    json.into_bytes()
}
/// The first compare-and-set on a fresh key commits and reports the written
/// version as current.
#[test]
fn cas_happy_first_write() {
    let server = test_server();
    let request = cas_body("ns1", "k1", b"hello", 1);
    let resp = server.handle_admin_blob_cache_compare_and_set(request);
    assert_eq!(resp.status, 200);

    let parsed = parse_body(&resp);
    let committed = parsed.get("committed").and_then(|v| v.as_bool());
    let current_version = parsed.get("current_version").and_then(|v| v.as_u64());
    assert_eq!(committed, Some(true));
    assert_eq!(current_version, Some(1));
}
/// After seeding a key at version 1, a second compare-and-set with version 2
/// commits and reports version 2 as current.
#[test]
fn cas_happy_update_increments_version() {
    let server = test_server();
    // Seed write; its response is deliberately not inspected.
    let seed = cas_body("ns2", "k2", b"v1", 1);
    let _ = server.handle_admin_blob_cache_compare_and_set(seed);

    let update = cas_body("ns2", "k2", b"v2", 2);
    let resp = server.handle_admin_blob_cache_compare_and_set(update);
    assert_eq!(resp.status, 200);

    let parsed = parse_body(&resp);
    let committed = parsed.get("committed").and_then(|v| v.as_bool());
    let current_version = parsed.get("current_version").and_then(|v| v.as_u64());
    assert_eq!(committed, Some(true));
    assert_eq!(current_version, Some(2));
}
/// Writing the same version twice must fail the second time with 409,
/// `committed: false` and reason `VersionMismatch`.
#[test]
fn cas_conflict_same_version_returns_409() {
    let server = test_server();
    // Seed write at version 5; its response is deliberately not inspected.
    let seed = cas_body("ns3", "k3", b"v1", 5);
    let _ = server.handle_admin_blob_cache_compare_and_set(seed);

    let duplicate = cas_body("ns3", "k3", b"v2", 5);
    let resp = server.handle_admin_blob_cache_compare_and_set(duplicate);
    assert_eq!(resp.status, 409);

    let parsed = parse_body(&resp);
    let committed = parsed.get("committed").and_then(|v| v.as_bool());
    let reason = parsed.get("reason").and_then(|v| v.as_str());
    assert_eq!(committed, Some(false));
    assert_eq!(reason, Some("VersionMismatch"));
}
/// Writing version 9 over a key already at version 10 must fail with 409 and
/// report 10 as the current version.
#[test]
fn cas_stale_expected_version_returns_409() {
    let server = test_server();
    // Seed write at version 10; its response is deliberately not inspected.
    let seed = cas_body("ns4", "k4", b"v1", 10);
    let _ = server.handle_admin_blob_cache_compare_and_set(seed);

    let stale = cas_body("ns4", "k4", b"v2", 9);
    let resp = server.handle_admin_blob_cache_compare_and_set(stale);
    assert_eq!(resp.status, 409);

    let parsed = parse_body(&resp);
    let current_version = parsed.get("current_version").and_then(|v| v.as_u64());
    assert_eq!(current_version, Some(10));
}
/// A compare-and-set whose namespace contains a JSON-escaped `\r\n` is rejected
/// with 400 and an error message that names `CR/LF`.
#[test]
fn cas_crlf_in_namespace_returns_400() {
    let server = test_server();
    // `\\r\\n` keeps the escapes as JSON text; the handler's parser decodes them.
    let request = b"{\"namespace\":\"real\\r\\ninjected\",\"key\":\"k\",\"expected_version\":0,\"new_value_b64\":\"aGk=\",\"new_version\":1}".to_vec();
    let resp = server.handle_admin_blob_cache_compare_and_set(request);
    assert_eq!(resp.status, 400);

    let parsed = parse_body(&resp);
    let msg = parsed
        .get("error")
        .and_then(|v| v.as_str())
        .unwrap_or_default();
    assert!(msg.contains("CR/LF"), "expected CR/LF error, got: {msg}");
}
/// A compare-and-set whose key contains a JSON-escaped NUL (`\u0000`) is
/// rejected with 400 and an error message that names `NUL`.
#[test]
fn cas_nul_in_key_returns_400() {
    let server = test_server();
    // `\\u0000` keeps the escape as JSON text; no raw NUL byte is sent.
    let request = b"{\"namespace\":\"ns\",\"key\":\"k\\u0000nul\",\"expected_version\":0,\"new_value_b64\":\"aGk=\",\"new_version\":1}".to_vec();
    let resp = server.handle_admin_blob_cache_compare_and_set(request);
    assert_eq!(resp.status, 400);

    let parsed = parse_body(&resp);
    let msg = parsed
        .get("error")
        .and_then(|v| v.as_str())
        .unwrap_or_default();
    assert!(msg.contains("NUL"), "expected NUL error, got: {msg}");
}
/// A `new_value_b64` that is not valid base64 is rejected with 400 and an error
/// message that mentions `base64`.
#[test]
fn cas_bad_base64_returns_400() {
    let server = test_server();
    let request = br#"{"namespace":"ns","key":"k","expected_version":0,"new_value_b64":"!!!invalid!!!","new_version":1}"#.to_vec();
    let resp = server.handle_admin_blob_cache_compare_and_set(request);
    assert_eq!(resp.status, 400);

    let parsed = parse_body(&resp);
    let msg = parsed
        .get("error")
        .and_then(|v| v.as_str())
        .unwrap_or_default();
    assert!(msg.contains("base64"), "expected base64 error, got: {msg}");
}
#[test]
fn cas_missing_bearer_returns_401_via_route() {
use std::sync::Mutex;
static GUARD: Mutex<()> = Mutex::new(());
let _g = GUARD.lock().unwrap_or_else(|e| e.into_inner());
let prev = std::env::var("RED_ADMIN_TOKEN").ok();
unsafe {
std::env::set_var("RED_ADMIN_TOKEN", "test-token-195");
}
let server = test_server();
let req = crate::server::transport::HttpRequest {
method: "POST".to_string(),
path: "/admin/cache/compare-and-set".to_string(),
query: std::collections::BTreeMap::new(),
headers: std::collections::BTreeMap::new(),
body: cas_body("ns", "k", b"v", 1),
};
let resp = server.route(req);
assert_eq!(resp.status, 401);
unsafe {
match prev {
Some(v) => std::env::set_var("RED_ADMIN_TOKEN", v),
None => std::env::remove_var("RED_ADMIN_TOKEN"),
}
}
}
#[test]
fn cas_wrong_bearer_returns_401_via_route() {
use std::sync::Mutex;
static GUARD: Mutex<()> = Mutex::new(());
let _g = GUARD.lock().unwrap_or_else(|e| e.into_inner());
let prev = std::env::var("RED_ADMIN_TOKEN").ok();
unsafe {
std::env::set_var("RED_ADMIN_TOKEN", "correct-token");
}
let server = test_server();
let mut headers = std::collections::BTreeMap::new();
headers.insert(
"authorization".to_string(),
"Bearer wrong-token".to_string(),
);
let req = crate::server::transport::HttpRequest {
method: "POST".to_string(),
path: "/admin/cache/compare-and-set".to_string(),
query: std::collections::BTreeMap::new(),
headers,
body: cas_body("ns", "k", b"v", 1),
};
let resp = server.route(req);
assert_eq!(resp.status, 401);
unsafe {
match prev {
Some(v) => std::env::set_var("RED_ADMIN_TOKEN", v),
None => std::env::remove_var("RED_ADMIN_TOKEN"),
}
}
}
#[test]
fn cas_concurrent_race_exactly_one_commits() {
use std::sync::{Arc, Mutex};
let server = Arc::new(Mutex::new(test_server()));
let committed = Arc::new(Mutex::new(0u32));
let conflicted = Arc::new(Mutex::new(0u32));
let handles: Vec<_> = (0..8)
.map(|_| {
let server = Arc::clone(&server);
let committed = Arc::clone(&committed);
let conflicted = Arc::clone(&conflicted);
std::thread::spawn(move || {
let body = cas_body("race-ns", "race-key", b"payload", 1);
let resp = {
let s = server.lock().unwrap();
s.handle_admin_blob_cache_compare_and_set(body)
};
match resp.status {
200 => *committed.lock().unwrap() += 1,
409 => *conflicted.lock().unwrap() += 1,
s => panic!("unexpected status {s}"),
}
})
})
.collect();
for h in handles {
h.join().expect("thread panicked");
}
assert_eq!(
*committed.lock().unwrap(),
1,
"exactly one CAS should commit (version 1 can only be written once)"
);
}
#[test]
fn admin_blob_cache_routes_reject_unauth_when_admin_token_set() {
use std::sync::Mutex;
static GUARD: Mutex<()> = Mutex::new(());
let _g = GUARD.lock().unwrap_or_else(|e| e.into_inner());
let prev = std::env::var("RED_ADMIN_TOKEN").ok();
unsafe {
std::env::set_var("RED_ADMIN_TOKEN", "test-token-148");
}
let server = test_server();
let req = crate::server::transport::HttpRequest {
method: "POST".to_string(),
path: "/admin/blob_cache/sweep".to_string(),
query: std::collections::BTreeMap::new(),
headers: std::collections::BTreeMap::new(),
body: br#"{"limit_entries":1}"#.to_vec(),
};
let resp = server.route(req);
assert_eq!(resp.status, 401, "sweep without admin token must be 401");
let req = crate::server::transport::HttpRequest {
method: "POST".to_string(),
path: "/admin/blob_cache/flush_namespace".to_string(),
query: std::collections::BTreeMap::new(),
headers: std::collections::BTreeMap::new(),
body: br#"{"namespace":"x"}"#.to_vec(),
};
let resp = server.route(req);
assert_eq!(resp.status, 401, "flush without admin token must be 401");
let mut headers = std::collections::BTreeMap::new();
headers.insert(
"authorization".to_string(),
"Bearer test-token-148".to_string(),
);
let req = crate::server::transport::HttpRequest {
method: "POST".to_string(),
path: "/admin/blob_cache/flush_namespace".to_string(),
query: std::collections::BTreeMap::new(),
headers,
body: br#"{"namespace":"ok"}"#.to_vec(),
};
let resp = server.route(req);
assert_eq!(resp.status, 200, "flush with admin token must be 200");
unsafe {
match prev {
Some(v) => std::env::set_var("RED_ADMIN_TOKEN", v),
None => std::env::remove_var("RED_ADMIN_TOKEN"),
}
}
}
}