Skip to main content

reddb_server/server/
handlers_admin.rs

1//! Lifecycle / admin HTTP endpoints (PLAN.md Phase 1).
2//!
3//! Universal contract surface consumed by orchestrators (K8s preStop,
4//! Fly autostop, ECS drain, systemd, custom).
5//!
6//! - `POST /admin/shutdown` — flush + checkpoint + optional backup,
7//!   200 only when safe to die. Idempotent.
8//! - `POST /admin/drain` — stop accepting new writes, in-flight finish,
9//!   200 once drain complete. Soft pre-shutdown step.
10//! - `GET  /health/live` — process responsive (always cheap).
11//! - `GET  /health/ready` — accepts queries (WAL replay + restore done).
12//! - `GET  /health/startup` — same logic as ready, K8s-style longer
13//!   timeout window.
14
15use super::*;
16use crate::runtime::lifecycle::Phase;
17use std::path::{Path, PathBuf};
18
/// Location of the persistent runtime-toggle file, which sits in the
/// same directory as the `.rdb` data file. Operators can stage a fresh
/// deploy by writing `{"read_only": true}` there before first boot so
/// the instance comes up locked.
///
/// When `data_path` has no parent component the current directory
/// (`.`) is used instead.
pub(crate) fn runtime_state_path(data_path: &Path) -> PathBuf {
    match data_path.parent() {
        Some(dir) => dir.join(".runtime-state.json"),
        None => Path::new(".").join(".runtime-state.json"),
    }
}
26
27/// Atomically persist the read_only toggle. Writes to a sibling
28/// `.tmp` file then renames to defeat torn writes — same pattern
29/// the snapshot publish path uses.
30pub(crate) fn persist_runtime_readonly(state_path: &Path, enabled: bool) -> std::io::Result<()> {
31    use std::io::Write;
32    let mut object = crate::json::Map::new();
33    object.insert("read_only".to_string(), crate::json::Value::Bool(enabled));
34    let body = crate::serde_json::to_string_pretty(&crate::json::Value::Object(object))
35        .map_err(|err| std::io::Error::other(err.to_string()))?;
36    if let Some(parent) = state_path.parent() {
37        if !parent.as_os_str().is_empty() {
38            std::fs::create_dir_all(parent)?;
39        }
40    }
41    let tmp = state_path.with_extension("json.tmp");
42    {
43        let mut f = std::fs::File::create(&tmp)?;
44        f.write_all(body.as_bytes())?;
45        f.sync_all()?;
46    }
47    std::fs::rename(&tmp, state_path)?;
48    Ok(())
49}
50
51/// Read a previously-persisted read_only toggle. Returns `None`
52/// when the file doesn't exist or doesn't parse — boot continues
53/// from the env-var / RedDBOptions value in that case.
54pub fn load_runtime_readonly(data_path: &Path) -> Option<bool> {
55    let state_path = runtime_state_path(data_path);
56    let bytes = std::fs::read(&state_path).ok()?;
57    let parsed: crate::json::Value = crate::json::from_slice(&bytes).ok()?;
58    parsed.get("read_only").and_then(|v| v.as_bool())
59}
60
61/// PLAN.md Phase 11.6 — default lease holder id when the operator
62/// doesn't pin one in the promotion request body. Mirrors the boot
63/// loop's resolution (`RED_LEASE_HOLDER_ID` → `<hostname>-<pid>`).
64fn default_holder_id() -> String {
65    if let Some(explicit) = crate::utils::env_with_file_fallback("RED_LEASE_HOLDER_ID") {
66        return explicit;
67    }
68    let host = std::env::var("HOSTNAME")
69        .or_else(|_| std::env::var("HOST"))
70        .unwrap_or_else(|_| "unknown-host".to_string());
71    format!("{host}-{}", std::process::id())
72}
73
/// Escape a replica ID so it can be embedded as a Prometheus label
/// value.
///
/// Double quotes, backslashes, line feeds and carriage returns are
/// rewritten to the two-character sequences `\"`, `\\`, `\n` and `\r`;
/// every other character passes through unchanged. Hostile replica IDs
/// are unlikely, but they must not be able to corrupt the `/metrics`
/// output.
fn sanitize_label(value: &str) -> String {
    value
        .chars()
        .fold(String::with_capacity(value.len()), |mut escaped, ch| {
            match ch {
                '"' => escaped.push_str("\\\""),
                '\\' => escaped.push_str("\\\\"),
                '\n' => escaped.push_str("\\n"),
                '\r' => escaped.push_str("\\r"),
                plain => escaped.push(plain),
            }
            escaped
        })
}
91
/// Standard base64 decode (RFC 4648 §4, alphabet `A-Za-z0-9+/`).
///
/// Trailing `=` padding is optional and is stripped before decoding.
///
/// # Errors
///
/// Returns `Err` with a human-readable message when
/// - the input contains a byte outside the base64 alphabet, or
/// - the unpadded length is `4k + 1`: a single trailing symbol carries
///   only 6 bits and cannot encode a byte, so the input is truncated or
///   corrupt. It is rejected rather than silently dropped.
fn b64_decode(input: &str) -> Result<Vec<u8>, String> {
    /// Map one alphabet byte to its 6-bit value.
    fn sextet(c: u8) -> Result<u32, String> {
        match c {
            b'A'..=b'Z' => Ok(u32::from(c - b'A')),
            b'a'..=b'z' => Ok(u32::from(c - b'a') + 26),
            b'0'..=b'9' => Ok(u32::from(c - b'0') + 52),
            b'+' => Ok(62),
            b'/' => Ok(63),
            other => Err(format!("invalid base64 character: {}", other as char)),
        }
    }

    let symbols = input.trim_end_matches('=').as_bytes();
    let mut decoded = Vec::with_capacity(symbols.len() * 3 / 4 + 1);

    for chunk in symbols.chunks(4) {
        // Pack up to four sextets into the low 24 bits, first symbol highest.
        let mut group = 0u32;
        for (slot, &symbol) in chunk.iter().enumerate() {
            group |= sextet(symbol)? << (18 - 6 * slot);
        }
        if chunk.len() == 1 {
            return Err(
                "invalid base64 length: trailing symbol carries fewer than 8 bits".to_string(),
            );
        }
        // `n` symbols carry `n - 1` whole bytes; byte 0 of the big-endian
        // representation is always zero because only 24 bits are used.
        let packed = group.to_be_bytes();
        decoded.extend_from_slice(&packed[1..chunk.len()]);
    }

    Ok(decoded)
}
133
134/// Reject CR, LF, and NUL bytes in caller-controlled strings that flow into
135/// audit logs and response envelopes (ADR 0010).
136fn reject_smuggling_bytes(field: &str, value: &str) -> Option<HttpResponse> {
137    for (idx, byte) in value.as_bytes().iter().enumerate() {
138        match *byte {
139            b'\0' => {
140                return Some(json_error(
141                    400,
142                    format!("field `{field}` contains forbidden NUL byte at index {idx}"),
143                ));
144            }
145            b'\r' | b'\n' => {
146                return Some(json_error(
147                    400,
148                    format!("field `{field}` contains forbidden CR/LF byte at index {idx}"),
149                ));
150            }
151            _ => {}
152        }
153    }
154    None
155}
156
157impl RedDBServer {
158    /// `POST /admin/shutdown` — graceful shutdown coordinator.
159    /// Returns 200 with the shutdown report when complete; 200 with
160    /// the cached report when already shut down (idempotent); 500
161    /// on flush failure (process should still exit afterwards).
162    ///
163    /// The HTTP layer does not own process exit — that's the
164    /// signal-handler / `run_server` driver. This handler reports
165    /// state; orchestrators that posted SIGTERM separately will see
166    /// the process die when their grace window elapses.
167    pub(crate) fn handle_admin_shutdown(&self) -> HttpResponse {
168        let backup_on_shutdown = std::env::var("RED_BACKUP_ON_SHUTDOWN")
169            .ok()
170            .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
171            .unwrap_or(true);
172
173        match self.runtime.graceful_shutdown(backup_on_shutdown) {
174            Ok(report) => {
175                // PLAN.md Phase 6.5 — audit operator-triggered
176                // shutdown. Recorded as "ok" + duration so the log
177                // shipper can graph shutdown latency over time.
178                let mut details = Map::new();
179                details.insert(
180                    "backup_uploaded".to_string(),
181                    JsonValue::Bool(report.backup_uploaded),
182                );
183                details.insert(
184                    "duration_ms".to_string(),
185                    JsonValue::Number(report.duration_ms as f64),
186                );
187                self.runtime.audit_log().record(
188                    "admin/shutdown",
189                    "operator",
190                    "instance",
191                    "ok",
192                    JsonValue::Object(details),
193                );
194                let mut object = Map::new();
195                object.insert("ok".to_string(), JsonValue::Bool(true));
196                object.insert(
197                    "phase".to_string(),
198                    JsonValue::String(self.runtime.lifecycle().phase().as_str().to_string()),
199                );
200                object.insert(
201                    "flushed_wal".to_string(),
202                    JsonValue::Bool(report.flushed_wal),
203                );
204                object.insert(
205                    "final_checkpoint".to_string(),
206                    JsonValue::Bool(report.final_checkpoint),
207                );
208                object.insert(
209                    "backup_uploaded".to_string(),
210                    JsonValue::Bool(report.backup_uploaded),
211                );
212                object.insert(
213                    "duration_ms".to_string(),
214                    JsonValue::Number(report.duration_ms as f64),
215                );
216                json_response(200, JsonValue::Object(object))
217            }
218            Err(err) => json_error(500, err.to_string()),
219        }
220    }
221
222    /// `POST /admin/restore` — operator-triggered restore from the
223    /// configured remote backend (PLAN.md Phase 3.2). Refuses unless
224    /// the runtime is read_only / replica so live writes can't race
225    /// the swap. Body fields are optional:
226    /// `{"to_lsn": u64, "to_timestamp_ms": u64, "snapshot_id": str}`.
227    /// Empty body restores to latest.
228    pub(crate) fn handle_admin_restore(&self, body: Vec<u8>) -> HttpResponse {
229        if !self.runtime.write_gate().is_read_only() {
230            return json_error(
231                409,
232                "POST /admin/restore requires the runtime to be read_only or replica-role; \
233                 toggle via RED_READONLY=true or POST /admin/readonly first",
234            );
235        }
236        let db = self.runtime.db();
237        let Some(backend) = db.options().remote_backend.clone() else {
238            return json_error(412, "no remote backend configured (RED_BACKEND=none)");
239        };
240        let Some(local_path) = db.path().map(|p| p.to_path_buf()) else {
241            return json_error(412, "in-memory runtime cannot be restored from remote");
242        };
243        let snapshot_prefix = db.options().default_snapshot_prefix();
244        let wal_prefix = db.options().default_wal_archive_prefix();
245        let target_time_ms = if body.is_empty() {
246            0u64
247        } else {
248            match crate::serde_json::from_slice::<crate::serde_json::Value>(&body) {
249                Ok(v) => v
250                    .get("to_timestamp_ms")
251                    .and_then(|n| n.as_u64())
252                    .or_else(|| {
253                        v.get("to_timestamp")
254                            .and_then(|n| n.as_u64())
255                            .map(|s| s.saturating_mul(1000))
256                    })
257                    .unwrap_or(0),
258                Err(err) => return json_error(400, format!("invalid JSON body: {err}")),
259            }
260        };
261        let recovery =
262            crate::storage::wal::PointInTimeRecovery::new(backend, snapshot_prefix, wal_prefix);
263        match recovery.restore_to(target_time_ms, &local_path) {
264            Ok(report) => {
265                let mut details = Map::new();
266                details.insert(
267                    "snapshot_used".to_string(),
268                    JsonValue::Number(report.snapshot_used as f64),
269                );
270                details.insert(
271                    "wal_segments_replayed".to_string(),
272                    JsonValue::Number(report.wal_segments_replayed as f64),
273                );
274                details.insert(
275                    "records_applied".to_string(),
276                    JsonValue::Number(report.records_applied as f64),
277                );
278                details.insert(
279                    "recovered_to_lsn".to_string(),
280                    JsonValue::Number(report.recovered_to_lsn as f64),
281                );
282                details.insert(
283                    "recovered_to_time".to_string(),
284                    JsonValue::Number(report.recovered_to_time as f64),
285                );
286                self.runtime.audit_log().record(
287                    "admin/restore",
288                    "operator",
289                    "instance",
290                    "ok",
291                    JsonValue::Object(details.clone()),
292                );
293                let mut object = Map::new();
294                object.insert("ok".to_string(), JsonValue::Bool(true));
295                for (k, v) in details {
296                    object.insert(k, v);
297                }
298                json_response(200, JsonValue::Object(object))
299            }
300            Err(err) => {
301                self.runtime.audit_log().record(
302                    "admin/restore",
303                    "operator",
304                    "instance",
305                    &format!("err: {err}"),
306                    JsonValue::Null,
307                );
308                json_error(500, err.to_string())
309            }
310        }
311    }
312
313    /// `POST /admin/backup` — operator-triggered backup, alias of
314    /// `/backup/trigger` placed under the universal `/admin/*`
315    /// namespace per PLAN.md Phase 3.3.
316    pub(crate) fn handle_admin_backup(
317        &self,
318        _query: &std::collections::BTreeMap<String, String>,
319    ) -> HttpResponse {
320        match self.runtime.trigger_backup() {
321            Ok(result) => {
322                let mut details = Map::new();
323                details.insert(
324                    "snapshot_id".to_string(),
325                    JsonValue::Number(result.snapshot_id as f64),
326                );
327                details.insert("uploaded".to_string(), JsonValue::Bool(result.uploaded));
328                details.insert(
329                    "duration_ms".to_string(),
330                    JsonValue::Number(result.duration_ms as f64),
331                );
332                self.runtime.audit_log().record(
333                    "admin/backup",
334                    "operator",
335                    "instance",
336                    "ok",
337                    JsonValue::Object(details.clone()),
338                );
339                let mut object = Map::new();
340                object.insert("ok".to_string(), JsonValue::Bool(true));
341                for (k, v) in details {
342                    object.insert(k, v);
343                }
344                json_response(200, JsonValue::Object(object))
345            }
346            Err(err) => {
347                self.runtime.audit_log().record(
348                    "admin/backup",
349                    "operator",
350                    "instance",
351                    &format!("err: {err}"),
352                    JsonValue::Null,
353                );
354                json_error(500, err.to_string())
355            }
356        }
357    }
358
359    /// `POST /admin/blob_cache/sweep` — bounded sweep of expired L1
360    /// entries on the runtime result Blob Cache (issue #148 follow-up,
361    /// closing the deferred half wired by sweeper.rs flag #4).
362    ///
363    /// Body (JSON, all fields optional):
364    ///
365    /// ```json
366    /// { "limit_entries": 1000, "limit_millis": 100 }
367    /// ```
368    ///
369    /// - Both null / missing → unbounded sweep (`SweepLimit::Either`
370    ///   with `usize::MAX` / `u32::MAX` so the sweeper still has the
371    ///   single composite-bound code path).
372    /// - One field set → `SweepLimit::Entries` or `SweepLimit::Millis`.
373    /// - Both set → `SweepLimit::Either { entries, millis }` (first
374    ///   bound to fire wins).
375    ///
376    /// Returns the [`SweepReport`](crate::storage::cache::sweeper::SweepReport)
377    /// fields plus `ok:true`. Caller-influenced strings (none today —
378    /// the report holds only numeric fields) would round-trip through
379    /// `SerializedJsonField::tainted` per ADR 0010 §3.
380    pub(crate) fn handle_admin_blob_cache_sweep(&self, body: Vec<u8>) -> HttpResponse {
381        use crate::storage::cache::sweeper::{BlobCacheSweeper, SweepLimit};
382
383        let (limit_entries, limit_millis) = if body.is_empty() {
384            (None, None)
385        } else {
386            match crate::serde_json::from_slice::<crate::serde_json::Value>(&body) {
387                Ok(v) => {
388                    let entries = v
389                        .get("limit_entries")
390                        .and_then(|n| n.as_u64())
391                        .map(|n| usize::try_from(n).unwrap_or(usize::MAX));
392                    let millis = v
393                        .get("limit_millis")
394                        .and_then(|n| n.as_u64())
395                        .map(|n| u32::try_from(n).unwrap_or(u32::MAX));
396                    (entries, millis)
397                }
398                Err(err) => return json_error(400, format!("invalid JSON body: {err}")),
399            }
400        };
401
402        let limit = match (limit_entries, limit_millis) {
403            (None, None) => SweepLimit::Either {
404                entries: usize::MAX,
405                millis: u32::MAX,
406            },
407            (Some(e), None) => SweepLimit::Entries(e),
408            (None, Some(m)) => SweepLimit::Millis(m),
409            (Some(e), Some(m)) => SweepLimit::Either {
410                entries: e,
411                millis: m,
412            },
413        };
414
415        let report = BlobCacheSweeper::sweep_expired(self.runtime.result_blob_cache(), limit);
416
417        let mut object = Map::new();
418        object.insert("ok".to_string(), JsonValue::Bool(true));
419        object.insert(
420            "entries_scanned".to_string(),
421            JsonValue::Number(report.entries_scanned as f64),
422        );
423        object.insert(
424            "entries_evicted".to_string(),
425            JsonValue::Number(report.entries_evicted as f64),
426        );
427        object.insert(
428            "bytes_reclaimed".to_string(),
429            JsonValue::Number(report.bytes_reclaimed as f64),
430        );
431        object.insert(
432            "elapsed_ms".to_string(),
433            JsonValue::Number(report.elapsed_ms as f64),
434        );
435        object.insert(
436            "truncated_due_to_limit".to_string(),
437            JsonValue::Bool(report.truncated_due_to_limit),
438        );
439
440        // Audit. Operator-driven sweep is rare; logging it gives the
441        // log shipper a primary-key on which to graph cache-sweep cadence.
442        let mut details = Map::new();
443        details.insert(
444            "entries_evicted".to_string(),
445            JsonValue::Number(report.entries_evicted as f64),
446        );
447        details.insert(
448            "bytes_reclaimed".to_string(),
449            JsonValue::Number(report.bytes_reclaimed as f64),
450        );
451        details.insert(
452            "elapsed_ms".to_string(),
453            JsonValue::Number(report.elapsed_ms as f64),
454        );
455        self.runtime.audit_log().record(
456            "admin/blob_cache/sweep",
457            "operator",
458            "instance",
459            "ok",
460            JsonValue::Object(details),
461        );
462
463        json_response(200, JsonValue::Object(object))
464    }
465
466    /// `POST /admin/blob_cache/flush_namespace` — foreground-fast
467    /// namespace flush on the runtime result Blob Cache (issue #148
468    /// follow-up, closing sweeper.rs flag #4).
469    ///
470    /// Body (JSON):
471    ///
472    /// ```json
473    /// { "namespace": "tenant-42:results" }
474    /// ```
475    ///
476    /// Validation contract:
477    ///
478    /// - `namespace` is **required**, **non-empty**, and must contain
479    ///   no NUL or CR/LF bytes — the same constraints
480    ///   [`crate::server::header_escape_guard::HeaderEscapeGuard`]
481    ///   enforces on response headers, applied here on the request side
482    ///   so a CRLF-laden namespace cannot smuggle audit-log lines or
483    ///   sneak past the JSON-envelope guard. Reflected back into the
484    ///   response through
485    ///   [`crate::json_field::SerializedJsonField::tainted`] per ADR 0010 §3.
486    ///
487    /// Returns the [`NamespaceFlushReport`](crate::storage::cache::sweeper::NamespaceFlushReport)
488    /// fields plus `ok:true`.
489    pub(crate) fn handle_admin_blob_cache_flush_namespace(&self, body: Vec<u8>) -> HttpResponse {
490        use crate::storage::cache::sweeper::BlobCacheSweeper;
491
492        if body.is_empty() {
493            return json_error(400, "missing JSON body with required `namespace` field");
494        }
495        let parsed: crate::serde_json::Value = match crate::serde_json::from_slice(&body) {
496            Ok(v) => v,
497            Err(err) => return json_error(400, format!("invalid JSON body: {err}")),
498        };
499        let namespace = match parsed.get("namespace").and_then(|v| v.as_str()) {
500            Some(n) => n.to_string(),
501            None => return json_error(400, "field `namespace` is required and must be a string"),
502        };
503        if namespace.is_empty() {
504            return json_error(400, "field `namespace` must not be empty");
505        }
506        // Adversarial-byte rejection. The namespace string is
507        // caller-controlled and may end up in audit logs, dashboards,
508        // and the response envelope. Reject CR/LF/NUL on the request
509        // side rather than relying on every downstream sink to escape.
510        for (idx, byte) in namespace.as_bytes().iter().enumerate() {
511            match *byte {
512                b'\0' => {
513                    return json_error(
514                        400,
515                        format!("field `namespace` contains forbidden NUL byte at index {idx}"),
516                    );
517                }
518                b'\r' | b'\n' => {
519                    return json_error(
520                        400,
521                        format!("field `namespace` contains forbidden CR/LF byte at index {idx}"),
522                    );
523                }
524                _ => {}
525            }
526        }
527
528        let report =
529            BlobCacheSweeper::flush_namespace(self.runtime.result_blob_cache(), &namespace);
530
531        let mut object = Map::new();
532        object.insert("ok".to_string(), JsonValue::Bool(true));
533        // Reflect the caller-supplied namespace back through the
534        // JSON-boundary guard so any high-bit / Unicode bytes round-trip
535        // through canonical RFC-8259 escaping.
536        object.insert(
537            "namespace".to_string(),
538            crate::json_field::SerializedJsonField::tainted(&report.namespace),
539        );
540        object.insert(
541            "generation_before".to_string(),
542            JsonValue::Number(report.generation_before as f64),
543        );
544        object.insert(
545            "generation_after".to_string(),
546            JsonValue::Number(report.generation_after as f64),
547        );
548        object.insert(
549            "elapsed_micros".to_string(),
550            JsonValue::Number(report.elapsed_micros as f64),
551        );
552
553        let mut details = Map::new();
554        details.insert(
555            "namespace".to_string(),
556            crate::json_field::SerializedJsonField::tainted(&report.namespace),
557        );
558        details.insert(
559            "elapsed_micros".to_string(),
560            JsonValue::Number(report.elapsed_micros as f64),
561        );
562        self.runtime.audit_log().record(
563            "admin/blob_cache/flush_namespace",
564            "operator",
565            "instance",
566            "ok",
567            JsonValue::Object(details),
568        );
569
570        json_response(200, JsonValue::Object(object))
571    }
572
573    /// `POST /admin/cache/compare-and-set` — optimistic-lock put on the
574    /// runtime result Blob Cache (issue #195).
575    ///
576    /// Body (JSON):
577    ///
578    /// ```json
579    /// {
580    ///   "namespace":      "tenant-42:results",
581    ///   "key":            "query-abc",
582    ///   "expected_version": 3,
583    ///   "new_value_b64":  "<standard base64>",
584    ///   "new_version":    4,
585    ///   "ttl_ms":         60000
586    /// }
587    /// ```
588    ///
589    /// `ttl_ms` is optional. `expected_version` is informational —
590    /// the atomic guard comes from `BlobCache::put`'s internal
591    /// `check_version`: if the stored version ≥ `new_version` the
592    /// write is rejected with 409.
593    ///
594    /// Returns:
595    /// - 200 `{ committed: true, current_version }`
596    /// - 409 `{ committed: false, current_version, reason: "VersionMismatch" }`
597    /// - 400 malformed body / CRLF/NUL injection / bad base64
598    /// - 401 missing or wrong admin bearer token
599    pub(crate) fn handle_admin_blob_cache_compare_and_set(&self, body: Vec<u8>) -> HttpResponse {
600        use crate::storage::cache::blob::{BlobCachePolicy, BlobCachePut, CacheError};
601
602        if body.is_empty() {
603            return json_error(400, "missing JSON body");
604        }
605        let parsed: crate::serde_json::Value = match crate::serde_json::from_slice(&body) {
606            Ok(v) => v,
607            Err(err) => return json_error(400, format!("invalid JSON body: {err}")),
608        };
609
610        let namespace = match parsed.get("namespace").and_then(|v| v.as_str()) {
611            Some(n) if !n.is_empty() => n.to_string(),
612            Some(_) => return json_error(400, "field `namespace` must not be empty"),
613            None => return json_error(400, "field `namespace` is required and must be a string"),
614        };
615        let key = match parsed.get("key").and_then(|v| v.as_str()) {
616            Some(k) if !k.is_empty() => k.to_string(),
617            Some(_) => return json_error(400, "field `key` must not be empty"),
618            None => return json_error(400, "field `key` is required and must be a string"),
619        };
620        let new_value_b64 = match parsed.get("new_value_b64").and_then(|v| v.as_str()) {
621            Some(v) => v.to_string(),
622            None => {
623                return json_error(
624                    400,
625                    "field `new_value_b64` is required and must be a string",
626                )
627            }
628        };
629        let new_version = match parsed.get("new_version").and_then(|v| v.as_u64()) {
630            Some(v) => v,
631            None => {
632                return json_error(
633                    400,
634                    "field `new_version` is required and must be a non-negative integer",
635                )
636            }
637        };
638        // expected_version is optional and informational; validate type if present.
639        if let Some(ev) = parsed.get("expected_version") {
640            if ev.as_u64().is_none() {
641                return json_error(
642                    400,
643                    "field `expected_version` must be a non-negative integer",
644                );
645            }
646        }
647        let ttl_ms = parsed.get("ttl_ms").and_then(|v| v.as_u64());
648
649        // Adversarial-byte rejection on caller-controlled strings.
650        if let Some(err) = reject_smuggling_bytes("namespace", &namespace) {
651            return err;
652        }
653        if let Some(err) = reject_smuggling_bytes("key", &key) {
654            return err;
655        }
656
657        // Base64 decode the payload.
658        let bytes = match b64_decode(&new_value_b64) {
659            Ok(b) => b,
660            Err(e) => return json_error(400, format!("invalid base64 in `new_value_b64`: {e}")),
661        };
662
663        // Build and execute the versioned put.
664        let policy = if let Some(ttl) = ttl_ms {
665            BlobCachePolicy::default().version(new_version).ttl_ms(ttl)
666        } else {
667            BlobCachePolicy::default().version(new_version)
668        };
669        let put = BlobCachePut::new(bytes).with_policy(policy);
670
671        match self.runtime.result_blob_cache().put(&namespace, &key, put) {
672            Ok(()) => {
673                let mut obj = Map::new();
674                obj.insert("committed".to_string(), JsonValue::Bool(true));
675                obj.insert(
676                    "current_version".to_string(),
677                    JsonValue::Number(new_version as f64),
678                );
679
680                let mut details = Map::new();
681                details.insert(
682                    "namespace".to_string(),
683                    crate::json_field::SerializedJsonField::tainted(&namespace),
684                );
685                details.insert(
686                    "key".to_string(),
687                    crate::json_field::SerializedJsonField::tainted(&key),
688                );
689                details.insert(
690                    "new_version".to_string(),
691                    JsonValue::Number(new_version as f64),
692                );
693                self.runtime.audit_log().record(
694                    "admin/cache/compare_and_set",
695                    "operator",
696                    "instance",
697                    "ok",
698                    JsonValue::Object(details),
699                );
700
701                json_response(200, JsonValue::Object(obj))
702            }
703            Err(CacheError::VersionMismatch { existing, .. }) => {
704                let mut obj = Map::new();
705                obj.insert("committed".to_string(), JsonValue::Bool(false));
706                obj.insert(
707                    "current_version".to_string(),
708                    JsonValue::Number(existing as f64),
709                );
710                obj.insert(
711                    "reason".to_string(),
712                    JsonValue::String("VersionMismatch".to_string()),
713                );
714                json_response(409, JsonValue::Object(obj))
715            }
716            Err(err) => json_error(500, format!("cache put failed: {err:?}")),
717        }
718    }
719
720    /// `GET /admin/blob_cache/stats` — blob cache telemetry snapshot.
721    ///
722    /// Returns a JSON object with the global [`BlobCacheStats`] counters.
723    /// Optional query parameter `namespace` is reserved for future
724    /// per-namespace filtering; the current implementation ignores it
725    /// and always returns instance-global counters.
726    pub(crate) fn handle_admin_blob_cache_stats(
727        &self,
728        _query: &std::collections::BTreeMap<String, String>,
729    ) -> HttpResponse {
730        let s = self.runtime.result_blob_cache().stats();
731        let mut obj = Map::new();
732        obj.insert("ok".to_string(), JsonValue::Bool(true));
733        obj.insert("hits".to_string(), JsonValue::Number(s.hits() as f64));
734        obj.insert("misses".to_string(), JsonValue::Number(s.misses() as f64));
735        obj.insert(
736            "insertions".to_string(),
737            JsonValue::Number(s.insertions() as f64),
738        );
739        obj.insert(
740            "evictions".to_string(),
741            JsonValue::Number(s.evictions() as f64),
742        );
743        obj.insert(
744            "expirations".to_string(),
745            JsonValue::Number(s.expirations() as f64),
746        );
747        obj.insert(
748            "invalidations".to_string(),
749            JsonValue::Number(s.invalidations() as f64),
750        );
751        obj.insert(
752            "namespace_flushes".to_string(),
753            JsonValue::Number(s.namespace_flushes() as f64),
754        );
755        obj.insert(
756            "version_mismatches".to_string(),
757            JsonValue::Number(s.version_mismatches() as f64),
758        );
759        obj.insert("entries".to_string(), JsonValue::Number(s.entries() as f64));
760        obj.insert(
761            "bytes_in_use".to_string(),
762            JsonValue::Number(s.bytes_in_use() as f64),
763        );
764        obj.insert(
765            "l1_bytes_max".to_string(),
766            JsonValue::Number(s.l1_bytes_max() as f64),
767        );
768        obj.insert(
769            "l2_bytes_in_use".to_string(),
770            JsonValue::Number(s.l2_bytes_in_use() as f64),
771        );
772        obj.insert(
773            "l2_bytes_max".to_string(),
774            JsonValue::Number(s.l2_bytes_max() as f64),
775        );
776        obj.insert(
777            "l2_full_rejections".to_string(),
778            JsonValue::Number(s.l2_full_rejections() as f64),
779        );
780        obj.insert(
781            "l2_metadata_reads".to_string(),
782            JsonValue::Number(s.l2_metadata_reads() as f64),
783        );
784        obj.insert(
785            "l2_negative_skips".to_string(),
786            JsonValue::Number(s.l2_negative_skips() as f64),
787        );
788        obj.insert(
789            "synopsis_metadata_reads".to_string(),
790            JsonValue::Number(s.synopsis_metadata_reads() as f64),
791        );
792        obj.insert(
793            "synopsis_bytes".to_string(),
794            JsonValue::Number(s.synopsis_bytes() as f64),
795        );
796        obj.insert(
797            "namespaces".to_string(),
798            JsonValue::Number(s.namespaces() as f64),
799        );
800        obj.insert(
801            "max_namespaces".to_string(),
802            JsonValue::Number(s.max_namespaces() as f64),
803        );
804        obj.insert(
805            "promotion_queued".to_string(),
806            JsonValue::Number(s.promotion_queued() as f64),
807        );
808        obj.insert(
809            "promotion_dropped".to_string(),
810            JsonValue::Number(s.promotion_dropped() as f64),
811        );
812        obj.insert(
813            "promotion_completed".to_string(),
814            JsonValue::Number(s.promotion_completed() as f64),
815        );
816        obj.insert(
817            "promotion_queue_depth".to_string(),
818            JsonValue::Number(s.promotion_queue_depth() as f64),
819        );
820        obj.insert(
821            "l2_compression_ratio_observed".to_string(),
822            JsonValue::Number(s.l2_compression_ratio_observed()),
823        );
824        obj.insert(
825            "l2_compression_skipped_total".to_string(),
826            JsonValue::Number(s.l2_compression_skipped_total() as f64),
827        );
828        obj.insert(
829            "l2_bytes_saved_total".to_string(),
830            JsonValue::Number(s.l2_bytes_saved_total() as f64),
831        );
832        json_response(200, JsonValue::Object(obj))
833    }
834
835    /// `POST /admin/readonly` — flip the public-mutation gate
836    /// (PLAN.md Phase 4.3).
837    ///
838    /// Body: `{"enabled": true|false}`. Returns the new state. Useful
839    /// for orchestrators that need to suspend writes (maintenance,
840    /// billing suspension, hot key rotation) without killing the
841    /// process or detaching the volume. Replicas reject writes
842    /// regardless of this flag — the replication-role gate fires
843    /// first.
844    ///
845    /// Persistence: the new state is written to
846    /// `<data_dir>/.runtime-state.json` so a subsequent restart
847    /// re-applies it. Failure to persist returns 500 — the in-memory
848    /// flag is reverted so caller and disk stay consistent.
849    pub(crate) fn handle_admin_readonly(&self, body: Vec<u8>) -> HttpResponse {
850        let enabled = if body.is_empty() {
851            true
852        } else {
853            match crate::serde_json::from_slice::<crate::serde_json::Value>(&body) {
854                Ok(v) => v.get("enabled").and_then(|n| n.as_bool()).unwrap_or(true),
855                Err(err) => return json_error(400, format!("invalid JSON body: {err}")),
856            }
857        };
858
859        let previous = self.runtime.write_gate().set_read_only(enabled);
860
861        // Persist the toggle so a subsequent restart re-applies it
862        // before any client surface comes online. Best-effort: on
863        // failure we revert the in-memory flag so disk and runtime
864        // agree (operator can then re-issue once the storage issue
865        // is resolved).
866        if let Some(data_path) = self.runtime.db().path() {
867            let state_path = runtime_state_path(data_path);
868            if let Err(err) = persist_runtime_readonly(&state_path, enabled) {
869                self.runtime.write_gate().set_read_only(previous);
870                return json_error(
871                    500,
872                    format!("read_only persisted to {state_path:?} failed: {err}"),
873                );
874            }
875        }
876
877        let mut details = Map::new();
878        details.insert("enabled".to_string(), JsonValue::Bool(enabled));
879        details.insert("previous".to_string(), JsonValue::Bool(previous));
880        self.runtime.audit_log().record(
881            "admin/readonly",
882            "operator",
883            "instance",
884            "ok",
885            JsonValue::Object(details),
886        );
887        let mut object = Map::new();
888        object.insert("ok".to_string(), JsonValue::Bool(true));
889        object.insert("read_only".to_string(), JsonValue::Bool(enabled));
890        object.insert("previous".to_string(), JsonValue::Bool(previous));
891        json_response(200, JsonValue::Object(object))
892    }
893
894    /// `GET /metrics` — Prometheus / OpenMetrics exposition.
895    ///
896    /// Initial metric set (PLAN.md Phase 5.1) covers the
897    /// orchestrator-relevant signals: uptime, health phase, read-
898    /// only state, replication role, last-backup outcome, on-disk
899    /// size when known. Counters that need request-path
900    /// instrumentation (ops_total, query_duration_seconds_bucket)
901    /// land in a follow-up commit so this endpoint can ship today
902    /// against the existing data sources.
903    pub(crate) fn handle_metrics(&self) -> HttpResponse {
904        use std::fmt::Write;
905        let lifecycle = self.runtime.lifecycle();
906        let phase = lifecycle.phase();
907        let now_ms = std::time::SystemTime::now()
908            .duration_since(std::time::UNIX_EPOCH)
909            .map(|d| d.as_millis() as u64)
910            .unwrap_or(0);
911        let uptime_secs = (now_ms.saturating_sub(lifecycle.started_at_ms()) as f64) / 1000.0;
912        let cold_start_secs = lifecycle
913            .ready_at_ms()
914            .map(|ready_ms| (ready_ms.saturating_sub(lifecycle.started_at_ms()) as f64) / 1000.0);
915        let health_status: u8 = match phase {
916            Phase::Stopped => 0,
917            Phase::Starting | Phase::ShuttingDown => 0,
918            Phase::Draining => 1,
919            Phase::Ready => 2,
920        };
921        let read_only = self.runtime.write_gate().is_read_only();
922        let role = match self.runtime.write_gate().role() {
923            crate::replication::ReplicationRole::Standalone => "standalone",
924            crate::replication::ReplicationRole::Primary => "primary",
925            crate::replication::ReplicationRole::Replica { .. } => "replica",
926        };
927        let db_size_bytes = self
928            .runtime
929            .db()
930            .path()
931            .and_then(|p| std::fs::metadata(p).ok())
932            .map(|m| m.len())
933            .unwrap_or(0);
934        let runtime_stats = self.runtime.stats();
935        let result_blob_stats = runtime_stats.result_blob_cache;
936        let kv_stats = runtime_stats.kv;
937        let metrics_ingest = runtime_stats.metrics_ingest;
938
939        let mut body = String::with_capacity(1024);
940        let _ = writeln!(
941            body,
942            "# HELP reddb_uptime_seconds Seconds since the runtime was constructed."
943        );
944        let _ = writeln!(body, "# TYPE reddb_uptime_seconds gauge");
945        let _ = writeln!(body, "reddb_uptime_seconds {}", uptime_secs);
946
947        let _ = writeln!(
948            body,
949            "# HELP reddb_health_status 0=down/starting, 1=degraded/draining, 2=ready."
950        );
951        let _ = writeln!(body, "# TYPE reddb_health_status gauge");
952        let _ = writeln!(body, "reddb_health_status {}", health_status);
953
954        let _ = writeln!(
955            body,
956            "# HELP reddb_phase Lifecycle phase as a labeled gauge (always 1; phase in label)."
957        );
958        let _ = writeln!(body, "# TYPE reddb_phase gauge");
959        let _ = writeln!(body, "reddb_phase{{phase=\"{}\"}} 1", phase.as_str());
960
961        let _ = writeln!(
962            body,
963            "# HELP reddb_read_only 1 when public mutations are gated, 0 otherwise."
964        );
965        let _ = writeln!(body, "# TYPE reddb_read_only gauge");
966        let _ = writeln!(body, "reddb_read_only {}", if read_only { 1 } else { 0 });
967
968        let _ = writeln!(
969            body,
970            "# HELP reddb_replication_role Replication role of this instance."
971        );
972        let _ = writeln!(body, "# TYPE reddb_replication_role gauge");
973        let _ = writeln!(body, "reddb_replication_role{{role=\"{}\"}} 1", role);
974
975        // PLAN.md Phase 5 / W6 — serverless writer lease state.
976        // `not_required` for instances that opted out of lease fencing;
977        // `held` / `not_held` for instances behind the fence so dashboards
978        // can alert on lease loss without scraping logs.
979        let lease_state = self.runtime.write_gate().lease_state();
980        let _ = writeln!(
981            body,
982            "# HELP reddb_writer_lease_state Serverless writer-lease gate state (label)."
983        );
984        let _ = writeln!(body, "# TYPE reddb_writer_lease_state gauge");
985        let _ = writeln!(
986            body,
987            "reddb_writer_lease_state{{state=\"{}\"}} 1",
988            lease_state.label()
989        );
990
991        // PLAN.md Phase 5.1 — backup + WAL archive lag.
992        // These are the SRE signals an orchestrator alerts on when a
993        // serverless instance is healthy on the surface but its DR
994        // posture has degraded silently.
995        let backup_status = self.runtime.backup_status();
996        if let Some(last) = backup_status.last_backup.as_ref() {
997            let last_ts_secs = (last.timestamp as f64) / 1000.0;
998            let _ = writeln!(
999                body,
1000                "# HELP reddb_backup_last_success_timestamp_seconds Unix ts (s) of the most recent successful backup."
1001            );
1002            let _ = writeln!(
1003                body,
1004                "# TYPE reddb_backup_last_success_timestamp_seconds gauge"
1005            );
1006            let _ = writeln!(
1007                body,
1008                "reddb_backup_last_success_timestamp_seconds {}",
1009                last_ts_secs
1010            );
1011            let age_secs = ((now_ms.saturating_sub(last.timestamp)) as f64) / 1000.0;
1012            let _ = writeln!(
1013                body,
1014                "# HELP reddb_backup_age_seconds Seconds since last successful backup."
1015            );
1016            let _ = writeln!(body, "# TYPE reddb_backup_age_seconds gauge");
1017            let _ = writeln!(body, "reddb_backup_age_seconds {}", age_secs);
1018            let _ = writeln!(
1019                body,
1020                "# HELP reddb_backup_last_duration_seconds Wall-clock duration of the most recent backup."
1021            );
1022            let _ = writeln!(body, "# TYPE reddb_backup_last_duration_seconds gauge");
1023            let _ = writeln!(
1024                body,
1025                "reddb_backup_last_duration_seconds {}",
1026                (last.duration_ms as f64) / 1000.0
1027            );
1028        }
1029        let _ = writeln!(
1030            body,
1031            "# HELP reddb_backup_failures_total Total backup failures since process start."
1032        );
1033        let _ = writeln!(body, "# TYPE reddb_backup_failures_total counter");
1034        let _ = writeln!(
1035            body,
1036            "reddb_backup_failures_total {}",
1037            backup_status.total_failures
1038        );
1039        let _ = writeln!(
1040            body,
1041            "# HELP reddb_backup_total_total Total successful backups since process start."
1042        );
1043        let _ = writeln!(body, "# TYPE reddb_backup_total_total counter");
1044        let _ = writeln!(
1045            body,
1046            "reddb_backup_total_total {}",
1047            backup_status.total_backups
1048        );
1049
1050        // WAL archive lag — distance between the engine's current LSN
1051        // and the last archived LSN. Operators alert when this grows
1052        // unbounded; it means archive uploads are failing or paused
1053        // (e.g. backend unreachable, lease lost).
1054        let (current_lsn, last_archived_lsn) = self.runtime.wal_archive_progress();
1055        let lag = current_lsn.saturating_sub(last_archived_lsn);
1056        let _ = writeln!(
1057            body,
1058            "# HELP reddb_wal_current_lsn Current local LSN (most recent record visible to writers)."
1059        );
1060        let _ = writeln!(body, "# TYPE reddb_wal_current_lsn gauge");
1061        let _ = writeln!(body, "reddb_wal_current_lsn {}", current_lsn);
1062        let _ = writeln!(
1063            body,
1064            "# HELP reddb_wal_last_archived_lsn LSN of the most recently archived WAL segment."
1065        );
1066        let _ = writeln!(body, "# TYPE reddb_wal_last_archived_lsn gauge");
1067        let _ = writeln!(body, "reddb_wal_last_archived_lsn {}", last_archived_lsn);
1068        let _ = writeln!(
1069            body,
1070            "# HELP reddb_wal_archive_lag_records Records between current LSN and last archived LSN."
1071        );
1072        let _ = writeln!(body, "# TYPE reddb_wal_archive_lag_records gauge");
1073        let _ = writeln!(body, "reddb_wal_archive_lag_records {}", lag);
1074
1075        let _ = writeln!(
1076            body,
1077            "# HELP reddb_metrics_remote_write_samples_accepted_total Metrics remote-write samples accepted since process start."
1078        );
1079        let _ = writeln!(
1080            body,
1081            "# TYPE reddb_metrics_remote_write_samples_accepted_total counter"
1082        );
1083        let _ = writeln!(
1084            body,
1085            "reddb_metrics_remote_write_samples_accepted_total {}",
1086            metrics_ingest.samples_accepted
1087        );
1088        let _ = writeln!(
1089            body,
1090            "# HELP reddb_metrics_remote_write_series_accepted_total Metrics remote-write series accepted since process start."
1091        );
1092        let _ = writeln!(
1093            body,
1094            "# TYPE reddb_metrics_remote_write_series_accepted_total counter"
1095        );
1096        let _ = writeln!(
1097            body,
1098            "reddb_metrics_remote_write_series_accepted_total {}",
1099            metrics_ingest.series_accepted
1100        );
1101        let _ = writeln!(
1102            body,
1103            "# HELP reddb_metrics_remote_write_samples_rejected_total Metrics remote-write samples rejected since process start."
1104        );
1105        let _ = writeln!(
1106            body,
1107            "# TYPE reddb_metrics_remote_write_samples_rejected_total counter"
1108        );
1109        let _ = writeln!(
1110            body,
1111            "reddb_metrics_remote_write_samples_rejected_total {}",
1112            metrics_ingest.samples_rejected
1113        );
1114        let _ = writeln!(
1115            body,
1116            "# HELP reddb_metrics_remote_write_series_rejected_total Metrics remote-write series rejected since process start."
1117        );
1118        let _ = writeln!(
1119            body,
1120            "# TYPE reddb_metrics_remote_write_series_rejected_total counter"
1121        );
1122        let _ = writeln!(
1123            body,
1124            "reddb_metrics_remote_write_series_rejected_total {}",
1125            metrics_ingest.series_rejected
1126        );
1127        let _ = writeln!(
1128            body,
1129            "# HELP reddb_metrics_remote_write_series_rejected_by_reason_total Metrics remote-write series rejected since process start by reason."
1130        );
1131        let _ = writeln!(
1132            body,
1133            "# TYPE reddb_metrics_remote_write_series_rejected_by_reason_total counter"
1134        );
1135        let _ = writeln!(
1136            body,
1137            "reddb_metrics_remote_write_series_rejected_by_reason_total{{reason=\"cardinality_budget\"}} {}",
1138            metrics_ingest.series_rejected_cardinality_budget
1139        );
1140        let _ = writeln!(
1141            body,
1142            "# HELP reddb_metrics_tenant_activity_total Metrics adapter requests by tenant, namespace, and operation since process start."
1143        );
1144        let _ = writeln!(body, "# TYPE reddb_metrics_tenant_activity_total counter");
1145        for activity in self.runtime.metrics_tenant_activity_snapshot() {
1146            let _ = writeln!(
1147                body,
1148                "reddb_metrics_tenant_activity_total{{tenant=\"{}\",namespace=\"{}\",operation=\"{}\"}} {}",
1149                sanitize_label(&activity.tenant),
1150                sanitize_label(&activity.namespace),
1151                sanitize_label(&activity.operation),
1152                activity.count
1153            );
1154        }
1155
1156        // PLAN.md Phase 11.4 — per-replica lag visibility. Emitted
1157        // when this primary has registered replicas; replicas that
1158        // haven't ack'd anything yet (`last_acked_lsn == 0`) still
1159        // show up so dashboards can detect "registered but stuck".
1160        let replicas = self.runtime.primary_replica_snapshots();
1161        let _ = writeln!(
1162            body,
1163            "# HELP reddb_replica_count Currently registered replicas."
1164        );
1165        let _ = writeln!(body, "# TYPE reddb_replica_count gauge");
1166        let _ = writeln!(body, "reddb_replica_count {}", replicas.len());
1167        if !replicas.is_empty() {
1168            let replica_lag_budget_secs = std::env::var("RED_SLO_REPLICA_LAG_BUDGET_SECONDS")
1169                .ok()
1170                .and_then(|value| value.parse::<f64>().ok())
1171                .filter(|value| value.is_finite() && *value >= 0.0)
1172                .unwrap_or(60.0);
1173            let _ = writeln!(
1174                body,
1175                "# HELP reddb_replica_ack_lsn Most recent LSN acked by each replica."
1176            );
1177            let _ = writeln!(body, "# TYPE reddb_replica_ack_lsn gauge");
1178            for r in &replicas {
1179                let _ = writeln!(
1180                    body,
1181                    "reddb_replica_ack_lsn{{replica_id=\"{}\"}} {}",
1182                    sanitize_label(&r.id),
1183                    r.last_acked_lsn
1184                );
1185            }
1186            let _ = writeln!(
1187                body,
1188                "# HELP reddb_replica_lag_records Distance from primary current LSN to replica acked LSN."
1189            );
1190            let _ = writeln!(body, "# TYPE reddb_replica_lag_records gauge");
1191            for r in &replicas {
1192                let _ = writeln!(
1193                    body,
1194                    "reddb_replica_lag_records{{replica_id=\"{}\"}} {}",
1195                    sanitize_label(&r.id),
1196                    current_lsn.saturating_sub(r.last_acked_lsn)
1197                );
1198            }
1199            let _ = writeln!(
1200                body,
1201                "# HELP reddb_replica_lag_seconds Wall-clock seconds since the replica was last seen."
1202            );
1203            let _ = writeln!(body, "# TYPE reddb_replica_lag_seconds gauge");
1204            let _ = writeln!(
1205                body,
1206                "# HELP reddb_slo_lag_budget_remaining_seconds Remaining per-replica lag budget; negative means SLO breach."
1207            );
1208            let _ = writeln!(body, "# TYPE reddb_slo_lag_budget_remaining_seconds gauge");
1209            for r in &replicas {
1210                let lag_ms = (now_ms as u128).saturating_sub(r.last_seen_at_unix_ms);
1211                let lag_secs = (lag_ms as f64) / 1000.0;
1212                let _ = writeln!(
1213                    body,
1214                    "reddb_replica_lag_seconds{{replica_id=\"{}\"}} {}",
1215                    sanitize_label(&r.id),
1216                    lag_secs
1217                );
1218                let _ = writeln!(
1219                    body,
1220                    "reddb_slo_lag_budget_remaining_seconds{{replica_id=\"{}\"}} {}",
1221                    sanitize_label(&r.id),
1222                    replica_lag_budget_secs - lag_secs
1223                );
1224            }
1225        }
1226
1227        // PLAN.md Phase 11.5 — replica apply error counters and
1228        // current health label. Counters are global across the
1229        // instance lifetime; the health label reflects whatever the
1230        // replica loop last persisted (`ok`, `connecting`, `gap`,
1231        // `divergence`, `apply_error`, `stalled_gap`).
1232        let _ = writeln!(
1233            body,
1234            "# HELP reddb_replica_apply_errors_total Replica WAL apply errors since process start, by kind."
1235        );
1236        let _ = writeln!(body, "# TYPE reddb_replica_apply_errors_total counter");
1237        for (kind, count) in self.runtime.replica_apply_error_counts() {
1238            let _ = writeln!(
1239                body,
1240                "reddb_replica_apply_errors_total{{kind=\"{}\"}} {}",
1241                kind.label(),
1242                count
1243            );
1244        }
1245        if let Some(health) = self.runtime.replica_apply_health() {
1246            let _ = writeln!(
1247                body,
1248                "# HELP reddb_replica_apply_health Replica apply state (label, value=1)."
1249            );
1250            let _ = writeln!(body, "# TYPE reddb_replica_apply_health gauge");
1251            let _ = writeln!(
1252                body,
1253                "reddb_replica_apply_health{{state=\"{}\"}} 1",
1254                sanitize_label(&health)
1255            );
1256        }
1257
1258        // PLAN.md Phase 4.4 — per-caller quota rejections. Empty
1259        // when the quota is unconfigured or no caller has been
1260        // throttled yet. Opportunistic eviction here keeps the
1261        // rejection map bounded on long-lived processes.
1262        self.runtime.quota_bucket().evict_idle();
1263        let rejections = self.runtime.quota_bucket().rejection_snapshot();
1264        if !rejections.is_empty() {
1265            let _ = writeln!(
1266                body,
1267                "# HELP reddb_quota_rejected_total Requests rejected by per-caller QPS quota."
1268            );
1269            let _ = writeln!(body, "# TYPE reddb_quota_rejected_total counter");
1270            for (principal, count) in &rejections {
1271                let _ = writeln!(
1272                    body,
1273                    "reddb_quota_rejected_total{{principal=\"{}\"}} {}",
1274                    sanitize_label(principal),
1275                    count
1276                );
1277            }
1278        }
1279
1280        // PLAN.md Phase 11.4 — commit waiter outcome counters and
1281        // last-wait gauge. Operators alert when `timed_out` rises
1282        // (policy too tight or replicas stalled) and watch the
1283        // last-wait gauge for p95 trends.
1284        let (reached, timed_out, not_required, last_micros) =
1285            self.runtime.commit_waiter_metrics_snapshot();
1286        let _ = writeln!(
1287            body,
1288            "# HELP reddb_commit_wait_total Commit-wait outcomes by kind."
1289        );
1290        let _ = writeln!(body, "# TYPE reddb_commit_wait_total counter");
1291        let _ = writeln!(
1292            body,
1293            "reddb_commit_wait_total{{outcome=\"reached\"}} {}",
1294            reached
1295        );
1296        let _ = writeln!(
1297            body,
1298            "reddb_commit_wait_total{{outcome=\"timed_out\"}} {}",
1299            timed_out
1300        );
1301        let _ = writeln!(
1302            body,
1303            "reddb_commit_wait_total{{outcome=\"not_required\"}} {}",
1304            not_required
1305        );
1306        let _ = writeln!(
1307            body,
1308            "# HELP reddb_commit_wait_last_seconds Wall-clock seconds of the most recent commit wait."
1309        );
1310        let _ = writeln!(body, "# TYPE reddb_commit_wait_last_seconds gauge");
1311        let _ = writeln!(
1312            body,
1313            "reddb_commit_wait_last_seconds {}",
1314            (last_micros as f64) / 1_000_000.0
1315        );
1316
1317        // PLAN.md Phase 11.4 — declared commit policy as a labeled
1318        // gauge so dashboards can confirm what the operator pinned.
1319        // The default `local` is emitted even when no replication is
1320        // configured, so the metric is always present.
1321        let policy = self.runtime.commit_policy();
1322        let _ = writeln!(
1323            body,
1324            "# HELP reddb_primary_commit_policy Active commit policy on the primary."
1325        );
1326        let _ = writeln!(body, "# TYPE reddb_primary_commit_policy gauge");
1327        let _ = writeln!(
1328            body,
1329            "reddb_primary_commit_policy{{policy=\"{}\"}} 1",
1330            policy.label()
1331        );
1332
1333        // Blob Cache observability for the SQL result-cache adapter.
1334        // Per-namespace label cardinality is acceptable while the MVP namespace
1335        // cap stays near 256; raising that cap should move per-namespace detail
1336        // to an on-demand admin query and keep scrape metrics rolled up.
1337        let blob_ns = "runtime.result_cache";
1338        let _ = writeln!(
1339            body,
1340            "# HELP reddb_cache_blob_get_total Blob Cache get outcomes by namespace."
1341        );
1342        let _ = writeln!(body, "# TYPE reddb_cache_blob_get_total counter");
1343        let _ = writeln!(
1344            body,
1345            "reddb_cache_blob_get_total{{namespace=\"{}\",result=\"hit_l1\"}} {}",
1346            blob_ns,
1347            result_blob_stats.hits()
1348        );
1349        let _ = writeln!(
1350            body,
1351            "reddb_cache_blob_get_total{{namespace=\"{}\",result=\"hit_l2\"}} 0",
1352            blob_ns
1353        );
1354        let _ = writeln!(
1355            body,
1356            "reddb_cache_blob_get_total{{namespace=\"{}\",result=\"miss\"}} {}",
1357            blob_ns,
1358            result_blob_stats.misses()
1359        );
1360        let _ = writeln!(
1361            body,
1362            "# HELP reddb_cache_blob_put_total Blob Cache put outcomes by namespace."
1363        );
1364        let _ = writeln!(body, "# TYPE reddb_cache_blob_put_total counter");
1365        let _ = writeln!(
1366            body,
1367            "reddb_cache_blob_put_total{{namespace=\"{}\",outcome=\"ok\"}} {}",
1368            blob_ns,
1369            result_blob_stats.insertions()
1370        );
1371        let _ = writeln!(
1372            body,
1373            "reddb_cache_blob_put_total{{namespace=\"{}\",outcome=\"version_mismatch\"}} {}",
1374            blob_ns,
1375            result_blob_stats.version_mismatches()
1376        );
1377        let _ = writeln!(
1378            body,
1379            "reddb_cache_blob_put_total{{namespace=\"{}\",outcome=\"too_large\"}} 0",
1380            blob_ns
1381        );
1382        let _ = writeln!(
1383            body,
1384            "reddb_cache_blob_put_total{{namespace=\"{}\",outcome=\"metadata_too_large\"}} 0",
1385            blob_ns
1386        );
1387        let _ = writeln!(
1388            body,
1389            "# HELP reddb_cache_blob_invalidate_total Blob Cache invalidations by namespace and kind."
1390        );
1391        let _ = writeln!(body, "# TYPE reddb_cache_blob_invalidate_total counter");
1392        for (kind, count) in [
1393            ("key", 0),
1394            ("prefix", 0),
1395            ("tag", 0),
1396            ("dependency", result_blob_stats.invalidations()),
1397            ("namespace", result_blob_stats.namespace_flushes()),
1398        ] {
1399            let _ = writeln!(
1400                body,
1401                "reddb_cache_blob_invalidate_total{{namespace=\"{}\",kind=\"{}\"}} {}",
1402                blob_ns, kind, count
1403            );
1404        }
1405        let _ = writeln!(
1406            body,
1407            "# HELP reddb_cache_blob_evict_total Blob Cache evictions by namespace and reason."
1408        );
1409        let _ = writeln!(body, "# TYPE reddb_cache_blob_evict_total counter");
1410        for (reason, count) in [
1411            ("capacity", result_blob_stats.evictions()),
1412            ("expiry", result_blob_stats.expirations()),
1413            ("policy", 0),
1414        ] {
1415            let _ = writeln!(
1416                body,
1417                "reddb_cache_blob_evict_total{{namespace=\"{}\",reason=\"{}\"}} {}",
1418                blob_ns, reason, count
1419            );
1420        }
1421        let _ = writeln!(
1422            body,
1423            "# HELP reddb_cache_blob_l1_bytes_in_use L1 bytes currently used by Blob Cache namespace."
1424        );
1425        let _ = writeln!(body, "# TYPE reddb_cache_blob_l1_bytes_in_use gauge");
1426        let _ = writeln!(
1427            body,
1428            "reddb_cache_blob_l1_bytes_in_use{{namespace=\"{}\"}} {}",
1429            blob_ns,
1430            result_blob_stats.bytes_in_use()
1431        );
1432        let _ = writeln!(
1433            body,
1434            "# HELP reddb_cache_blob_l1_entries L1 entries currently held by Blob Cache namespace."
1435        );
1436        let _ = writeln!(body, "# TYPE reddb_cache_blob_l1_entries gauge");
1437        let _ = writeln!(
1438            body,
1439            "reddb_cache_blob_l1_entries{{namespace=\"{}\"}} {}",
1440            blob_ns,
1441            result_blob_stats.entries()
1442        );
1443        let _ = writeln!(
1444            body,
1445            "# HELP reddb_cache_blob_l2_bytes_in_use L2 bytes currently used by Blob Cache namespace."
1446        );
1447        let _ = writeln!(body, "# TYPE reddb_cache_blob_l2_bytes_in_use gauge");
1448        let _ = writeln!(
1449            body,
1450            "reddb_cache_blob_l2_bytes_in_use{{namespace=\"{}\"}} {}",
1451            blob_ns,
1452            result_blob_stats.l2_bytes_in_use()
1453        );
1454        let _ = writeln!(
1455            body,
1456            "# HELP reddb_cache_blob_l2_full_rejections_total Blob Cache puts rejected because L2 is full."
1457        );
1458        let _ = writeln!(
1459            body,
1460            "# TYPE reddb_cache_blob_l2_full_rejections_total counter"
1461        );
1462        let _ = writeln!(
1463            body,
1464            "reddb_cache_blob_l2_full_rejections_total{{namespace=\"{}\"}} {}",
1465            blob_ns,
1466            result_blob_stats.l2_full_rejections()
1467        );
1468        let _ = writeln!(
1469            body,
1470            "# HELP reddb_cache_blob_version_mismatch_total Blob Cache CAS version mismatches by namespace."
1471        );
1472        let _ = writeln!(
1473            body,
1474            "# TYPE reddb_cache_blob_version_mismatch_total counter"
1475        );
1476        let _ = writeln!(
1477            body,
1478            "reddb_cache_blob_version_mismatch_total{{namespace=\"{}\"}} {}",
1479            blob_ns,
1480            result_blob_stats.version_mismatches()
1481        );
1482
1483        let _ = writeln!(
1484            body,
1485            "# HELP reddb_kv_ops_total Normal-KV operations since process start."
1486        );
1487        let _ = writeln!(body, "# TYPE reddb_kv_ops_total counter");
1488        for (verb, count) in [
1489            ("put", kv_stats.puts),
1490            ("get", kv_stats.gets),
1491            ("delete", kv_stats.deletes),
1492            ("incr", kv_stats.incrs),
1493        ] {
1494            let _ = writeln!(body, "reddb_kv_ops_total{{verb=\"{}\"}} {}", verb, count);
1495        }
1496        let _ = writeln!(
1497            body,
1498            "# HELP reddb_kv_cas_total Normal-KV CAS outcomes since process start."
1499        );
1500        let _ = writeln!(body, "# TYPE reddb_kv_cas_total counter");
1501        let _ = writeln!(
1502            body,
1503            "reddb_kv_cas_total{{outcome=\"success\"}} {}",
1504            kv_stats.cas_success
1505        );
1506        let _ = writeln!(
1507            body,
1508            "reddb_kv_cas_total{{outcome=\"conflict\"}} {}",
1509            kv_stats.cas_conflict
1510        );
1511        let _ = writeln!(
1512            body,
1513            "# HELP reddb_kv_watch_streams_active Active normal-KV WATCH streams."
1514        );
1515        let _ = writeln!(body, "# TYPE reddb_kv_watch_streams_active gauge");
1516        let _ = writeln!(
1517            body,
1518            "reddb_kv_watch_streams_active {}",
1519            kv_stats.watch_streams_active
1520        );
1521        let _ = writeln!(
1522            body,
1523            "# HELP reddb_kv_watch_events_emitted_total Normal-KV WATCH events emitted since process start."
1524        );
1525        let _ = writeln!(body, "# TYPE reddb_kv_watch_events_emitted_total counter");
1526        let _ = writeln!(
1527            body,
1528            "reddb_kv_watch_events_emitted_total {}",
1529            kv_stats.watch_events_emitted
1530        );
1531        let _ = writeln!(
1532            body,
1533            "# HELP reddb_kv_watch_drops_total Normal-KV WATCH events dropped by bounded subscriber buffers."
1534        );
1535        let _ = writeln!(body, "# TYPE reddb_kv_watch_drops_total counter");
1536        let _ = writeln!(body, "reddb_kv_watch_drops_total {}", kv_stats.watch_drops);
1537
1538        let _ = writeln!(
1539            body,
1540            "# HELP reddb_db_size_bytes On-disk size of the primary database file."
1541        );
1542        let _ = writeln!(body, "# TYPE reddb_db_size_bytes gauge");
1543        let _ = writeln!(body, "reddb_db_size_bytes {}", db_size_bytes);
1544
1545        if let Some(secs) = cold_start_secs {
1546            let _ = writeln!(
1547                body,
1548                "# HELP reddb_cold_start_duration_seconds Seconds from process start to /health/ready 200."
1549            );
1550            let _ = writeln!(body, "# TYPE reddb_cold_start_duration_seconds gauge");
1551            let _ = writeln!(body, "reddb_cold_start_duration_seconds {}", secs);
1552        }
1553
1554        // PLAN.md Phase 9.1 — per-phase cold-start breakdown.
1555        // Operators use this to identify which phase dominates the
1556        // cold-start budget (restore, WAL replay, index warmup).
1557        // Phases that haven't fired yet are simply absent — no zero
1558        // entries to confuse alert rules.
1559        let phases = lifecycle.cold_start_phases().durations_ms();
1560        if !phases.is_empty() {
1561            let _ = writeln!(
1562                body,
1563                "# HELP reddb_cold_start_phase_seconds Per-phase cold-start duration."
1564            );
1565            let _ = writeln!(body, "# TYPE reddb_cold_start_phase_seconds gauge");
1566            for (name, dur_ms) in phases {
1567                let _ = writeln!(
1568                    body,
1569                    "reddb_cold_start_phase_seconds{{phase=\"{}\"}} {}",
1570                    name,
1571                    (dur_ms as f64) / 1000.0
1572                );
1573            }
1574        }
1575
1576        // Operator-imposed limits (PLAN.md Phase 4.1). Emitted as
1577        // gauges so external dashboards can graph headroom against
1578        // current usage. `0` means "no cap pinned at boot"; we
1579        // still emit it so absence vs presence is unambiguous.
1580        let limits = self.runtime.resource_limits();
1581        if let Some(v) = limits.max_db_size_bytes {
1582            let _ = writeln!(
1583                body,
1584                "# HELP reddb_limit_db_size_bytes Operator-pinned cap on the primary DB file size."
1585            );
1586            let _ = writeln!(body, "# TYPE reddb_limit_db_size_bytes gauge");
1587            let _ = writeln!(body, "reddb_limit_db_size_bytes {}", v);
1588        }
1589        if let Some(v) = limits.max_connections {
1590            let _ = writeln!(body, "# TYPE reddb_limit_connections gauge");
1591            let _ = writeln!(body, "reddb_limit_connections {}", v);
1592        }
1593        if let Some(v) = limits.max_qps {
1594            let _ = writeln!(body, "# TYPE reddb_limit_qps gauge");
1595            let _ = writeln!(body, "reddb_limit_qps {}", v);
1596        }
1597        if let Some(v) = limits.max_batch_size {
1598            let _ = writeln!(body, "# TYPE reddb_limit_batch_size gauge");
1599            let _ = writeln!(body, "reddb_limit_batch_size {}", v);
1600        }
1601        if let Some(v) = limits.max_memory_bytes {
1602            let _ = writeln!(body, "# TYPE reddb_limit_memory_bytes gauge");
1603            let _ = writeln!(body, "reddb_limit_memory_bytes {}", v);
1604        }
1605
1606        // Queue lifecycle counters — slice 10 of issue #527 / ADR-0017.
1607        // Process-local counters per (queue, group, mode); the
1608        // pending gauge is scraped live from `red_queue_meta` so it
1609        // cannot drift from the source of truth. Cardinality is
1610        // bounded by the catalog: only queues/groups the operator
1611        // already created appear here.
1612        {
1613            let queue_telemetry = self.runtime.queue_telemetry_snapshot();
1614            let _ = writeln!(
1615                body,
1616                "# HELP queue_delivered_total Messages handed to a consumer (per queue/group/mode)."
1617            );
1618            let _ = writeln!(body, "# TYPE queue_delivered_total counter");
1619            for ((queue, group, mode), n) in &queue_telemetry.delivered {
1620                let _ = writeln!(
1621                    body,
1622                    "queue_delivered_total{{queue=\"{}\",group=\"{}\",mode=\"{}\"}} {}",
1623                    sanitize_label(queue),
1624                    sanitize_label(group),
1625                    sanitize_label(mode),
1626                    n
1627                );
1628            }
1629            let _ = writeln!(
1630                body,
1631                "# HELP queue_acked_total Messages acknowledged (per queue/group/mode)."
1632            );
1633            let _ = writeln!(body, "# TYPE queue_acked_total counter");
1634            for ((queue, group, mode), n) in &queue_telemetry.acked {
1635                let _ = writeln!(
1636                    body,
1637                    "queue_acked_total{{queue=\"{}\",group=\"{}\",mode=\"{}\"}} {}",
1638                    sanitize_label(queue),
1639                    sanitize_label(group),
1640                    sanitize_label(mode),
1641                    n
1642                );
1643            }
1644            let _ = writeln!(
1645                body,
1646                "# HELP queue_nacked_total Messages negatively-acknowledged (per queue/group/mode/outcome)."
1647            );
1648            let _ = writeln!(body, "# TYPE queue_nacked_total counter");
1649            for ((queue, group, mode, outcome), n) in &queue_telemetry.nacked {
1650                let _ = writeln!(
1651                    body,
1652                    "queue_nacked_total{{queue=\"{}\",group=\"{}\",mode=\"{}\",outcome=\"{}\"}} {}",
1653                    sanitize_label(queue),
1654                    sanitize_label(group),
1655                    sanitize_label(mode),
1656                    outcome,
1657                    n
1658                );
1659            }
1660            let pending = self.runtime.queue_pending_counts();
1661            let _ = writeln!(
1662                body,
1663                "# HELP queue_pending_gauge In-flight (delivered, not yet acked) messages per queue/group."
1664            );
1665            let _ = writeln!(body, "# TYPE queue_pending_gauge gauge");
1666            for ((queue, group), n) in &pending {
1667                let _ = writeln!(
1668                    body,
1669                    "queue_pending_gauge{{queue=\"{}\",group=\"{}\"}} {}",
1670                    sanitize_label(queue),
1671                    sanitize_label(group),
1672                    n
1673                );
1674            }
1675        }
1676
1677        // Events outbox metrics — issue #299
1678        {
1679            use crate::runtime::impl_queue::{
1680                EVENTS_DLQ_TOTAL, EVENTS_DRAIN_RETRIES_TOTAL, EVENTS_ENQUEUED_TOTAL,
1681            };
1682            let enqueued = EVENTS_ENQUEUED_TOTAL.load(std::sync::atomic::Ordering::Relaxed);
1683            let retries = EVENTS_DRAIN_RETRIES_TOTAL.load(std::sync::atomic::Ordering::Relaxed);
1684            let dlq_total = EVENTS_DLQ_TOTAL.load(std::sync::atomic::Ordering::Relaxed);
1685
1686            let _ = writeln!(
1687                body,
1688                "# HELP reddb_events_enqueued_total Total events successfully pushed to target queues."
1689            );
1690            let _ = writeln!(body, "# TYPE reddb_events_enqueued_total counter");
1691            let _ = writeln!(body, "reddb_events_enqueued_total {enqueued}");
1692
1693            let _ = writeln!(
1694                body,
1695                "# HELP reddb_events_drain_retries_total Total event push failures that triggered DLQ routing."
1696            );
1697            let _ = writeln!(body, "# TYPE reddb_events_drain_retries_total counter");
1698            let _ = writeln!(
1699                body,
1700                "reddb_events_drain_retries_total{{reason=\"queue_full\"}} {retries}"
1701            );
1702
1703            let _ = writeln!(
1704                body,
1705                "# HELP reddb_events_dlq_total Total events routed to dead-letter queues."
1706            );
1707            let _ = writeln!(body, "# TYPE reddb_events_dlq_total counter");
1708            let _ = writeln!(body, "reddb_events_dlq_total {dlq_total}");
1709        }
1710
1711        // AI provider and embedding metrics — issue #280.
1712        crate::runtime::ai::metrics::render_ai_metrics(&mut body);
1713
1714        // HTTP handler-thread pool metrics — issue #573 slice 4.
1715        // Renders four series (`http_active_handler_threads`,
1716        // `http_handler_cap`, `http_handler_rejected_total`,
1717        // `http_handler_duration_seconds`) so operators can observe
1718        // saturation against the bounded handler-thread cap.
1719        self.http_metrics().render(&mut body, self.http_limiter());
1720
1721        HttpResponse {
1722            status: 200,
1723            content_type: "text/plain; version=0.0.4",
1724            body: body.into_bytes(),
1725            extra_headers: Vec::new(),
1726        }
1727    }
1728
1729    /// `GET /admin/status` — full structured snapshot of operator-
1730    /// relevant state (PLAN.md Phase 5.4). One JSON object that
1731    /// frontend dashboards / control-plane sidecars can poll
1732    /// without scraping multiple endpoints.
1733    pub(crate) fn handle_admin_status(&self) -> HttpResponse {
1734        let lifecycle = self.runtime.lifecycle();
1735        let phase = lifecycle.phase();
1736        let now_ms = std::time::SystemTime::now()
1737            .duration_since(std::time::UNIX_EPOCH)
1738            .map(|d| d.as_millis() as u64)
1739            .unwrap_or(0);
1740        let uptime_secs = (now_ms.saturating_sub(lifecycle.started_at_ms()) as f64) / 1000.0;
1741        let read_only = self.runtime.write_gate().is_read_only();
1742        let role = match self.runtime.write_gate().role() {
1743            crate::replication::ReplicationRole::Standalone => "standalone",
1744            crate::replication::ReplicationRole::Primary => "primary",
1745            crate::replication::ReplicationRole::Replica { .. } => "replica",
1746        };
1747        let db = self.runtime.db();
1748        let db_size_bytes = db
1749            .path()
1750            .and_then(|p| std::fs::metadata(p).ok())
1751            .map(|m| m.len())
1752            .unwrap_or(0);
1753        let backend_kind = db
1754            .options()
1755            .remote_backend
1756            .as_ref()
1757            .map(|b| b.name().to_string());
1758
1759        let mut object = Map::new();
1760        object.insert(
1761            "version".to_string(),
1762            JsonValue::String(env!("CARGO_PKG_VERSION").to_string()),
1763        );
1764        object.insert(
1765            "phase".to_string(),
1766            JsonValue::String(phase.as_str().to_string()),
1767        );
1768        object.insert(
1769            "uptime_secs".to_string(),
1770            JsonValue::Number((uptime_secs * 1000.0).round() / 1000.0),
1771        );
1772        object.insert(
1773            "started_at_unix_ms".to_string(),
1774            JsonValue::Number(lifecycle.started_at_ms() as f64),
1775        );
1776        if let Some(ready_at) = lifecycle.ready_at_ms() {
1777            object.insert(
1778                "ready_at_unix_ms".to_string(),
1779                JsonValue::Number(ready_at as f64),
1780            );
1781        }
1782        object.insert(
1783            "db_size_bytes".to_string(),
1784            JsonValue::Number(db_size_bytes as f64),
1785        );
1786        object.insert("read_only".to_string(), JsonValue::Bool(read_only));
1787        object.insert(
1788            "replication_role".to_string(),
1789            JsonValue::String(role.to_string()),
1790        );
1791        object.insert(
1792            "writer_lease".to_string(),
1793            JsonValue::String(self.runtime.write_gate().lease_state().label().to_string()),
1794        );
1795
1796        // PLAN.md Phase 6.3 — surface encryption-at-rest configuration
1797        // so dashboards / `red doctor` can flag a misconfigured key
1798        // (Err on parse) before it silently leaves data plaintext.
1799        let (enc_state, enc_error) = self.runtime.encryption_at_rest_status();
1800        let mut enc_obj = Map::new();
1801        enc_obj.insert(
1802            "state".to_string(),
1803            JsonValue::String(enc_state.to_string()),
1804        );
1805        if let Some(err) = enc_error {
1806            enc_obj.insert("error".to_string(), JsonValue::String(err));
1807        }
1808        object.insert("encryption_at_rest".to_string(), JsonValue::Object(enc_obj));
1809
1810        // Backup posture (PLAN.md Phase 5.1). `last_backup` carries
1811        // the same shape /metrics emits so dashboards and alert rules
1812        // share a single contract.
1813        let backup = self.runtime.backup_status();
1814        let mut backup_obj = Map::new();
1815        if let Some(last) = backup.last_backup.as_ref() {
1816            backup_obj.insert(
1817                "last_success_unix_ms".to_string(),
1818                JsonValue::Number(last.timestamp as f64),
1819            );
1820            backup_obj.insert(
1821                "last_duration_ms".to_string(),
1822                JsonValue::Number(last.duration_ms as f64),
1823            );
1824            backup_obj.insert(
1825                "age_seconds".to_string(),
1826                JsonValue::Number(((now_ms.saturating_sub(last.timestamp)) as f64) / 1000.0),
1827            );
1828        }
1829        backup_obj.insert(
1830            "total_successes".to_string(),
1831            JsonValue::Number(backup.total_backups as f64),
1832        );
1833        backup_obj.insert(
1834            "total_failures".to_string(),
1835            JsonValue::Number(backup.total_failures as f64),
1836        );
1837        backup_obj.insert(
1838            "interval_secs".to_string(),
1839            JsonValue::Number(backup.interval_secs as f64),
1840        );
1841        object.insert("backup".to_string(), JsonValue::Object(backup_obj));
1842
1843        // WAL archive lag.
1844        let (current_lsn, last_archived_lsn) = self.runtime.wal_archive_progress();
1845        let mut wal_obj = Map::new();
1846        wal_obj.insert(
1847            "current_lsn".to_string(),
1848            JsonValue::Number(current_lsn as f64),
1849        );
1850        wal_obj.insert(
1851            "last_archived_lsn".to_string(),
1852            JsonValue::Number(last_archived_lsn as f64),
1853        );
1854        wal_obj.insert(
1855            "archive_lag_records".to_string(),
1856            JsonValue::Number(current_lsn.saturating_sub(last_archived_lsn) as f64),
1857        );
1858        object.insert("wal".to_string(), JsonValue::Object(wal_obj));
1859
1860        // PLAN.md Phase 11.5 — replica apply health + counters.
1861        // Always emit so dashboards have a stable shape; missing
1862        // health label means this isn't a replica or no apply has
1863        // happened yet.
1864        let mut replica_obj = Map::new();
1865        if let Some(health) = self.runtime.replica_apply_health() {
1866            replica_obj.insert("apply_health".to_string(), JsonValue::String(health));
1867        }
1868        let mut errors_obj = Map::new();
1869        for (kind, count) in self.runtime.replica_apply_error_counts() {
1870            errors_obj.insert(kind.label().to_string(), JsonValue::Number(count as f64));
1871        }
1872        replica_obj.insert("apply_errors".to_string(), JsonValue::Object(errors_obj));
1873        // Per-replica array (primary view). Empty on replica/standalone.
1874        let snaps = self.runtime.primary_replica_snapshots();
1875        if !snaps.is_empty() {
1876            let arr: Vec<JsonValue> = snaps
1877                .iter()
1878                .map(|r| {
1879                    let mut o = Map::new();
1880                    o.insert("id".to_string(), JsonValue::String(r.id.clone()));
1881                    o.insert(
1882                        "last_acked_lsn".to_string(),
1883                        JsonValue::Number(r.last_acked_lsn as f64),
1884                    );
1885                    o.insert(
1886                        "last_sent_lsn".to_string(),
1887                        JsonValue::Number(r.last_sent_lsn as f64),
1888                    );
1889                    o.insert(
1890                        "last_durable_lsn".to_string(),
1891                        JsonValue::Number(r.last_durable_lsn as f64),
1892                    );
1893                    o.insert(
1894                        "last_seen_at_unix_ms".to_string(),
1895                        JsonValue::Number(r.last_seen_at_unix_ms as f64),
1896                    );
1897                    o.insert(
1898                        "lag_records".to_string(),
1899                        JsonValue::Number(current_lsn.saturating_sub(r.last_acked_lsn) as f64),
1900                    );
1901                    if let Some(region) = &r.region {
1902                        o.insert("region".to_string(), JsonValue::String(region.clone()));
1903                    }
1904                    JsonValue::Object(o)
1905                })
1906                .collect();
1907            replica_obj.insert("primary_view".to_string(), JsonValue::Array(arr));
1908        }
1909        replica_obj.insert(
1910            "commit_policy".to_string(),
1911            JsonValue::String(self.runtime.commit_policy().label().to_string()),
1912        );
1913        // PLAN.md Phase 11.4 — durable-LSN map per replica for
1914        // ack_n debugging. Empty until at least one ack lands.
1915        let durable = self.runtime.commit_waiter_snapshot();
1916        if !durable.is_empty() {
1917            let arr: Vec<JsonValue> = durable
1918                .into_iter()
1919                .map(|(id, lsn)| {
1920                    let mut o = Map::new();
1921                    o.insert("replica_id".to_string(), JsonValue::String(id));
1922                    o.insert("durable_lsn".to_string(), JsonValue::Number(lsn as f64));
1923                    JsonValue::Object(o)
1924                })
1925                .collect();
1926            replica_obj.insert("durable_view".to_string(), JsonValue::Array(arr));
1927        }
1928        object.insert("replica".to_string(), JsonValue::Object(replica_obj));
1929        if let Some(backend) = backend_kind {
1930            object.insert("remote_backend".to_string(), JsonValue::String(backend));
1931        }
1932        // PLAN.md Phase 4.1 — operator-imposed limits surface so
1933        // external dashboards can show headroom alongside usage.
1934        let limits = self.runtime.resource_limits();
1935        let mut limits_obj = Map::new();
1936        if let Some(v) = limits.max_db_size_bytes {
1937            limits_obj.insert("max_db_size_bytes".to_string(), JsonValue::Number(v as f64));
1938        }
1939        if let Some(v) = limits.max_connections {
1940            limits_obj.insert("max_connections".to_string(), JsonValue::Number(v as f64));
1941        }
1942        if let Some(v) = limits.max_qps {
1943            limits_obj.insert("max_qps".to_string(), JsonValue::Number(v as f64));
1944        }
1945        if let Some(v) = limits.max_batch_size {
1946            limits_obj.insert("max_batch_size".to_string(), JsonValue::Number(v as f64));
1947        }
1948        if let Some(v) = limits.max_memory_bytes {
1949            limits_obj.insert("max_memory_bytes".to_string(), JsonValue::Number(v as f64));
1950        }
1951        if let Some(d) = limits.max_query_duration {
1952            limits_obj.insert(
1953                "max_query_duration_ms".to_string(),
1954                JsonValue::Number(d.as_millis() as f64),
1955            );
1956        }
1957        if let Some(v) = limits.max_result_bytes {
1958            limits_obj.insert("max_result_bytes".to_string(), JsonValue::Number(v as f64));
1959        }
1960        object.insert("limits".to_string(), JsonValue::Object(limits_obj));
1961
1962        if let Some(report) = lifecycle.shutdown_report() {
1963            let mut shutdown_obj = Map::new();
1964            shutdown_obj.insert(
1965                "duration_ms".to_string(),
1966                JsonValue::Number(report.duration_ms as f64),
1967            );
1968            shutdown_obj.insert(
1969                "flushed_wal".to_string(),
1970                JsonValue::Bool(report.flushed_wal),
1971            );
1972            shutdown_obj.insert(
1973                "backup_uploaded".to_string(),
1974                JsonValue::Bool(report.backup_uploaded),
1975            );
1976            object.insert("shutdown".to_string(), JsonValue::Object(shutdown_obj));
1977        }
1978        json_response(200, JsonValue::Object(object))
1979    }
1980
1981    /// `POST /admin/drain` — flip to Draining phase. Subsequent
1982    /// `WriteGate`-checked writes will be rejected until shutdown
1983    /// completes or another phase override re-enables Ready.
1984    /// Idempotent.
1985    /// `POST /admin/failover/promote` — manual replica → primary
1986    /// promotion (PLAN.md Phase 11.6).
1987    ///
1988    /// Hard checks before bumping the lease generation:
1989    ///   * Caller is currently a replica (role guard) — primaries
1990    ///     don't promote themselves.
1991    ///   * Remote backend is configured (lease lives there).
1992    ///   * Replica apply health is `ok` — no unresolved WAL gap or
1993    ///     divergence. A replica that's behind cannot become the
1994    ///     authoritative writer.
1995    ///   * Lease can be acquired — `try_acquire` returns success.
1996    ///     Failure surfaces the existing holder so the operator
1997    ///     understands why.
1998    ///
1999    /// Body: `{"holder_id": "...", "ttl_ms": <u64>}`. `holder_id`
2000    /// defaults to `RED_LEASE_HOLDER_ID` env / `<hostname>-<pid>`.
2001    /// `ttl_ms` defaults to 60_000.
2002    ///
2003    /// On success the response includes the new lease's generation
2004    /// and acquired_at. **Promotion does NOT flip the running role
2005    /// to primary** — the operator's runbook is to restart the
2006    /// process with `RED_REPLICATION_MODE=primary` after a
2007    /// successful promotion. Auto-role-flip is a Phase 11.6 follow-
2008    /// up that requires draining live read traffic safely.
2009    pub(crate) fn handle_admin_failover_promote(&self, body: Vec<u8>) -> HttpResponse {
2010        // Role guard.
2011        if !matches!(
2012            self.runtime.write_gate().role(),
2013            crate::replication::ReplicationRole::Replica { .. }
2014        ) {
2015            return json_error(
2016                409,
2017                "promotion only allowed on a replica (current role is not Replica)",
2018            );
2019        }
2020
2021        // Backend guard.
2022        let Some(backend) = self.runtime.db().options().remote_backend_atomic.clone() else {
2023            return json_error(
2024                412,
2025                "promotion requires a CAS-capable remote backend (use s3, fs, or http with RED_HTTP_CONDITIONAL_WRITES=true)",
2026            );
2027        };
2028
2029        // Apply health guard. Anything other than `ok` / `healthy`
2030        // / `connecting` indicates the replica isn't current.
2031        let health = self.runtime.replica_apply_health().unwrap_or_default();
2032        if matches!(
2033            health.as_str(),
2034            "stalled_gap" | "divergence" | "apply_error"
2035        ) {
2036            return json_error(
2037                409,
2038                format!(
2039                    "promotion refused — replica apply state is `{health}`; resolve before promoting"
2040                ),
2041            );
2042        }
2043
2044        // Body parsing.
2045        let (holder_id, ttl_ms) = if body.is_empty() {
2046            (default_holder_id(), 60_000u64)
2047        } else {
2048            match crate::serde_json::from_slice::<crate::serde_json::Value>(&body) {
2049                Ok(v) => {
2050                    let holder = v
2051                        .get("holder_id")
2052                        .and_then(|n| n.as_str())
2053                        .map(|s| s.to_string())
2054                        .unwrap_or_else(default_holder_id);
2055                    let ttl = v
2056                        .get("ttl_ms")
2057                        .and_then(|n| n.as_u64())
2058                        .filter(|t| *t > 0)
2059                        .unwrap_or(60_000);
2060                    (holder, ttl)
2061                }
2062                Err(err) => return json_error(400, format!("invalid JSON body: {err}")),
2063            }
2064        };
2065
2066        let database_key = self
2067            .runtime
2068            .db()
2069            .options()
2070            .remote_key
2071            .clone()
2072            .unwrap_or_else(|| "main".to_string());
2073        let store = crate::replication::LeaseStore::new(backend);
2074
2075        match crate::runtime::lease_lifecycle::admin_promote_lease(
2076            &store,
2077            self.runtime.audit_log(),
2078            &database_key,
2079            &holder_id,
2080            ttl_ms,
2081        ) {
2082            Ok(lease) => {
2083                let mut object = Map::new();
2084                object.insert("ok".to_string(), JsonValue::Bool(true));
2085                object.insert("holder_id".to_string(), JsonValue::String(lease.holder_id));
2086                object.insert(
2087                    "generation".to_string(),
2088                    JsonValue::Number(lease.generation as f64),
2089                );
2090                object.insert(
2091                    "acquired_at_ms".to_string(),
2092                    JsonValue::Number(lease.acquired_at_ms as f64),
2093                );
2094                object.insert(
2095                    "expires_at_ms".to_string(),
2096                    JsonValue::Number(lease.expires_at_ms as f64),
2097                );
2098                object.insert(
2099                    "next_step".to_string(),
2100                    JsonValue::String(
2101                        "restart with RED_REPLICATION_MODE=primary to start accepting writes"
2102                            .to_string(),
2103                    ),
2104                );
2105                json_response(200, JsonValue::Object(object))
2106            }
2107            Err(err) => json_error(409, format!("promotion refused: {err}")),
2108        }
2109    }
2110
2111    /// `GET /admin/audit` — structured audit log query for compliance
2112    /// (SOC 2 / HIPAA / ISO 27001). Reads the active `.audit.log`
2113    /// plus rotated `.audit.log.<ms>.zst` archives, applies the
2114    /// query filters, and returns the matching events as a JSON
2115    /// object: `{"count": n, "events": [...]}`.
2116    ///
2117    /// Supported query params:
2118    ///   * `since` / `until` — RFC 3339 (`...Z`) or ms epoch.
2119    ///   * `principal` — exact match (e.g. `alice@acme`).
2120    ///   * `tenant` — exact match.
2121    ///   * `action` — prefix match (e.g. `auth/`, `admin/`).
2122    ///   * `outcome` — `success` / `denied` / `error`.
2123    ///   * `limit` — default 100, max 1000.
2124    ///   * `format` — `json` (default) or `jsonl`.
2125    ///
2126    /// Auth: relies on the `RED_ADMIN_TOKEN` gate already enforced
2127    /// for every `/admin/*` path in `is_authorized`. When that env
2128    /// var is unset the endpoint is open — same posture as every
2129    /// other admin endpoint.
2130    pub(crate) fn handle_admin_audit_query(
2131        &self,
2132        query: &std::collections::BTreeMap<String, String>,
2133    ) -> HttpResponse {
2134        use crate::runtime::audit_log::Outcome;
2135        use crate::runtime::audit_query::{
2136            events_to_json_array, parse_time_arg, run_query, AuditQuery,
2137        };
2138
2139        let mut q = AuditQuery::new();
2140        if let Some(s) = query.get("since") {
2141            q.since_ms = parse_time_arg(s);
2142            if q.since_ms.is_none() {
2143                return json_error(400, format!("invalid 'since' value: {s}"));
2144            }
2145        }
2146        if let Some(u) = query.get("until") {
2147            q.until_ms = parse_time_arg(u);
2148            if q.until_ms.is_none() {
2149                return json_error(400, format!("invalid 'until' value: {u}"));
2150            }
2151        }
2152        if let Some(p) = query.get("principal") {
2153            if !p.is_empty() {
2154                q.principal = Some(p.clone());
2155            }
2156        }
2157        if let Some(t) = query.get("tenant") {
2158            if !t.is_empty() {
2159                q.tenant = Some(t.clone());
2160            }
2161        }
2162        if let Some(a) = query.get("action") {
2163            if !a.is_empty() {
2164                q.action_prefix = Some(a.clone());
2165            }
2166        }
2167        if let Some(o) = query.get("outcome") {
2168            if let Some(parsed) = Outcome::parse(o) {
2169                q.outcome = Some(parsed);
2170            } else {
2171                return json_error(
2172                    400,
2173                    format!("invalid 'outcome' value: {o} (expected success|denied|error)"),
2174                );
2175            }
2176        }
2177        if let Some(l) = query.get("limit") {
2178            match l.parse::<usize>() {
2179                Ok(n) if n > 0 => q.limit = n.min(1000),
2180                _ => return json_error(400, format!("invalid 'limit' value: {l}")),
2181            }
2182        } else {
2183            q.limit = 100;
2184        }
2185
2186        let format = query
2187            .get("format")
2188            .map(|s| s.to_ascii_lowercase())
2189            .unwrap_or_default();
2190
2191        let path = self.runtime.audit_log().path().to_path_buf();
2192        let events = run_query(&path, &q);
2193
2194        if format == "jsonl" || format == "ndjson" {
2195            let mut body = String::new();
2196            for ev in &events {
2197                body.push_str(&ev.to_json_line(None));
2198                body.push('\n');
2199            }
2200            return HttpResponse {
2201                status: 200,
2202                content_type: "application/x-ndjson",
2203                body: body.into_bytes(),
2204                extra_headers: Vec::new(),
2205            };
2206        }
2207
2208        json_response(200, events_to_json_array(&events))
2209    }
2210
2211    pub(crate) fn handle_admin_drain(&self) -> HttpResponse {
2212        self.runtime.lifecycle().mark_draining();
2213        self.runtime.audit_log().record(
2214            "admin/drain",
2215            "operator",
2216            "instance",
2217            "ok",
2218            JsonValue::Null,
2219        );
2220        let mut object = Map::new();
2221        object.insert("ok".to_string(), JsonValue::Bool(true));
2222        object.insert(
2223            "phase".to_string(),
2224            JsonValue::String(self.runtime.lifecycle().phase().as_str().to_string()),
2225        );
2226        json_response(200, JsonValue::Object(object))
2227    }
2228
2229    /// `GET /health/live` — process is alive and responsive. Always
2230    /// 200 once the runtime is constructed; 503 only after Stopped.
2231    /// Never touches I/O.
2232    pub(crate) fn handle_health_live(&self) -> HttpResponse {
2233        let phase = self.runtime.lifecycle().phase();
2234        let alive = !matches!(phase, Phase::Stopped);
2235        let status = if alive { 200 } else { 503 };
2236        let mut object = Map::new();
2237        object.insert(
2238            "status".to_string(),
2239            JsonValue::String(if alive { "alive" } else { "stopped" }.to_string()),
2240        );
2241        object.insert(
2242            "phase".to_string(),
2243            JsonValue::String(phase.as_str().to_string()),
2244        );
2245        json_response(status, JsonValue::Object(object))
2246    }
2247
2248    /// `GET /health/ready` — runtime is fully past WAL replay /
2249    /// restore-from-remote and accepts queries.
2250    pub(crate) fn handle_health_ready(&self) -> HttpResponse {
2251        self.health_ready_response("ready")
2252    }
2253
2254    /// `GET /health/startup` — Kubernetes startup probe variant.
2255    /// Same readiness logic as `/health/ready`; orchestrator gives
2256    /// it a longer grace window before failing the pod.
2257    pub(crate) fn handle_health_startup(&self) -> HttpResponse {
2258        self.health_ready_response("startup")
2259    }
2260
2261    fn health_ready_response(&self, probe: &str) -> HttpResponse {
2262        let lifecycle = self.runtime.lifecycle();
2263        let phase = lifecycle.phase();
2264        let now = std::time::SystemTime::now()
2265            .duration_since(std::time::UNIX_EPOCH)
2266            .map(|d| d.as_millis() as u64)
2267            .unwrap_or(0);
2268        let started_at = lifecycle.started_at_ms();
2269        let since_secs = (now.saturating_sub(started_at) as f64) / 1000.0;
2270        let mut object = Map::new();
2271        object.insert("probe".to_string(), JsonValue::String(probe.to_string()));
2272        object.insert(
2273            "transport_listeners".to_string(),
2274            self.transport_readiness_json(),
2275        );
2276        object.insert(
2277            "phase".to_string(),
2278            JsonValue::String(phase.as_str().to_string()),
2279        );
2280        object.insert(
2281            "since_secs".to_string(),
2282            JsonValue::Number((since_secs * 1000.0).round() / 1000.0),
2283        );
2284        if let Some(ready_at) = lifecycle.ready_at_ms() {
2285            object.insert(
2286                "ready_at_unix_ms".to_string(),
2287                JsonValue::Number(ready_at as f64),
2288            );
2289        }
2290
2291        if phase.accepts_queries() {
2292            object.insert("status".to_string(), JsonValue::String("ready".to_string()));
2293            json_response(200, JsonValue::Object(object))
2294        } else {
2295            object.insert(
2296                "status".to_string(),
2297                JsonValue::String(phase.as_str().to_string()),
2298            );
2299            if let Some(reason) = lifecycle.not_ready_reason() {
2300                object.insert("reason".to_string(), JsonValue::String(reason));
2301            } else {
2302                object.insert(
2303                    "reason".to_string(),
2304                    JsonValue::String(match phase {
2305                        Phase::Starting => "starting".to_string(),
2306                        Phase::ShuttingDown => "shutting_down".to_string(),
2307                        Phase::Stopped => "stopped".to_string(),
2308                        Phase::Draining => "draining".to_string(),
2309                        Phase::Ready => "ready".to_string(),
2310                    }),
2311                );
2312            }
2313            json_response(503, JsonValue::Object(object))
2314        }
2315    }
2316
2317    // -----------------------------------------------------------------
2318    // IAM policy admin endpoints
2319    // -----------------------------------------------------------------
2320
2321    fn iam_audit(&self, action: &str, target: &str, outcome: &str) {
2322        self.runtime
2323            .audit_log()
2324            .record(action, "operator", target, outcome, JsonValue::Null);
2325    }
2326
2327    /// `PUT /admin/policies/:id` — install or replace an IAM policy.
2328    pub(crate) fn handle_iam_policy_put(&self, id: &str, body: Vec<u8>) -> HttpResponse {
2329        let Some(store) = self.auth_store.as_ref() else {
2330            return json_error(503, "auth store not configured");
2331        };
2332        let Ok(text) = std::str::from_utf8(&body) else {
2333            return json_error(400, "body must be utf-8 JSON");
2334        };
2335        let mut policy = match crate::auth::policies::Policy::from_json_str(text) {
2336            Ok(p) => p,
2337            Err(e) => return json_error(400, format!("policy parse: {e}")),
2338        };
2339        if policy.id != id {
2340            policy.id = id.to_string();
2341        }
2342        if let Err(e) = store.put_policy(policy) {
2343            return json_error(400, e.to_string());
2344        }
2345        self.runtime.invalidate_result_cache();
2346        self.iam_audit("iam/policy.put", id, "ok");
2347        let mut obj = Map::new();
2348        obj.insert("ok".to_string(), JsonValue::Bool(true));
2349        obj.insert("id".to_string(), JsonValue::String(id.to_string()));
2350        json_response(200, JsonValue::Object(obj))
2351    }
2352
2353    /// `GET /admin/policies/:id` — fetch a single policy as JSON.
2354    pub(crate) fn handle_iam_policy_get(&self, id: &str) -> HttpResponse {
2355        let Some(store) = self.auth_store.as_ref() else {
2356            return json_error(503, "auth store not configured");
2357        };
2358        let Some(p) = store.get_policy(id) else {
2359            return json_error(404, format!("policy `{id}` not found"));
2360        };
2361        let body = p.to_json_string();
2362        HttpResponse {
2363            status: 200,
2364            content_type: "application/json",
2365            body: body.into_bytes(),
2366            extra_headers: Vec::new(),
2367        }
2368    }
2369
2370    /// `GET /admin/policies` — list policies (id-sorted summary).
2371    pub(crate) fn handle_iam_policy_list(&self) -> HttpResponse {
2372        let Some(store) = self.auth_store.as_ref() else {
2373            return json_error(503, "auth store not configured");
2374        };
2375        let pols = store.list_policies();
2376        let items: Vec<JsonValue> = pols
2377            .iter()
2378            .map(|p| {
2379                let mut obj = Map::new();
2380                obj.insert("id".to_string(), JsonValue::String(p.id.clone()));
2381                obj.insert("version".to_string(), JsonValue::Number(p.version as f64));
2382                obj.insert(
2383                    "statements".to_string(),
2384                    JsonValue::Number(p.statements.len() as f64),
2385                );
2386                obj.insert(
2387                    "tenant".to_string(),
2388                    p.tenant
2389                        .as_deref()
2390                        .map(|t| JsonValue::String(t.to_string()))
2391                        .unwrap_or(JsonValue::Null),
2392                );
2393                JsonValue::Object(obj)
2394            })
2395            .collect();
2396        let mut envelope = Map::new();
2397        envelope.insert("count".to_string(), JsonValue::Number(items.len() as f64));
2398        envelope.insert("items".to_string(), JsonValue::Array(items));
2399        json_response(200, JsonValue::Object(envelope))
2400    }
2401
2402    /// `DELETE /admin/policies/:id` — drop a policy.
2403    pub(crate) fn handle_iam_policy_delete(&self, id: &str) -> HttpResponse {
2404        let Some(store) = self.auth_store.as_ref() else {
2405            return json_error(503, "auth store not configured");
2406        };
2407        match store.delete_policy(id) {
2408            Ok(()) => {
2409                self.runtime.invalidate_result_cache();
2410                self.iam_audit("iam/policy.drop", id, "ok");
2411                HttpResponse {
2412                    status: 204,
2413                    content_type: "application/json",
2414                    body: Vec::new(),
2415                    extra_headers: Vec::new(),
2416                }
2417            }
2418            Err(e) => json_error(404, e.to_string()),
2419        }
2420    }
2421
2422    /// `PUT /admin/users/:user/policies/:policy_id`. `:user` may
2423    /// optionally be tenant-qualified as `tenant.username`.
2424    pub(crate) fn handle_iam_attach_user(&self, user: &str, policy_id: &str) -> HttpResponse {
2425        let Some(store) = self.auth_store.as_ref() else {
2426            return json_error(503, "auth store not configured");
2427        };
2428        let uid = decode_user_arg(user);
2429        match store.attach_policy(
2430            crate::auth::store::PrincipalRef::User(uid.clone()),
2431            policy_id,
2432        ) {
2433            Ok(()) => {
2434                self.runtime.invalidate_result_cache();
2435                self.iam_audit(
2436                    "iam/policy.attach",
2437                    &format!("user:{uid}::{policy_id}"),
2438                    "ok",
2439                );
2440                let mut obj = Map::new();
2441                obj.insert("ok".to_string(), JsonValue::Bool(true));
2442                json_response(200, JsonValue::Object(obj))
2443            }
2444            Err(e) => json_error(400, e.to_string()),
2445        }
2446    }
2447
2448    /// `DELETE /admin/users/:user/policies/:policy_id`.
2449    pub(crate) fn handle_iam_detach_user(&self, user: &str, policy_id: &str) -> HttpResponse {
2450        let Some(store) = self.auth_store.as_ref() else {
2451            return json_error(503, "auth store not configured");
2452        };
2453        let uid = decode_user_arg(user);
2454        match store.detach_policy(
2455            crate::auth::store::PrincipalRef::User(uid.clone()),
2456            policy_id,
2457        ) {
2458            Ok(()) => {
2459                self.runtime.invalidate_result_cache();
2460                self.iam_audit(
2461                    "iam/policy.detach",
2462                    &format!("user:{uid}::{policy_id}"),
2463                    "ok",
2464                );
2465                HttpResponse {
2466                    status: 204,
2467                    content_type: "application/json",
2468                    body: Vec::new(),
2469                    extra_headers: Vec::new(),
2470                }
2471            }
2472            Err(e) => json_error(400, e.to_string()),
2473        }
2474    }
2475
2476    /// `PUT /admin/users/:user/groups/:group`.
2477    pub(crate) fn handle_iam_add_user_group(&self, user: &str, group: &str) -> HttpResponse {
2478        let Some(store) = self.auth_store.as_ref() else {
2479            return json_error(503, "auth store not configured");
2480        };
2481        let uid = decode_user_arg(user);
2482        match store.add_user_to_group(&uid, group) {
2483            Ok(()) => {
2484                self.runtime.invalidate_result_cache();
2485                self.iam_audit("iam/group.add", &format!("user:{uid}::group:{group}"), "ok");
2486                let mut obj = Map::new();
2487                obj.insert("ok".to_string(), JsonValue::Bool(true));
2488                json_response(200, JsonValue::Object(obj))
2489            }
2490            Err(e) => json_error(400, e.to_string()),
2491        }
2492    }
2493
2494    /// `DELETE /admin/users/:user/groups/:group`.
2495    pub(crate) fn handle_iam_remove_user_group(&self, user: &str, group: &str) -> HttpResponse {
2496        let Some(store) = self.auth_store.as_ref() else {
2497            return json_error(503, "auth store not configured");
2498        };
2499        let uid = decode_user_arg(user);
2500        match store.remove_user_from_group(&uid, group) {
2501            Ok(()) => {
2502                self.runtime.invalidate_result_cache();
2503                self.iam_audit(
2504                    "iam/group.remove",
2505                    &format!("user:{uid}::group:{group}"),
2506                    "ok",
2507                );
2508                HttpResponse {
2509                    status: 204,
2510                    content_type: "application/json",
2511                    body: Vec::new(),
2512                    extra_headers: Vec::new(),
2513                }
2514            }
2515            Err(e) => json_error(400, e.to_string()),
2516        }
2517    }
2518
2519    /// `PUT /admin/groups/:group/policies/:policy_id`.
2520    pub(crate) fn handle_iam_attach_group(&self, group: &str, policy_id: &str) -> HttpResponse {
2521        let Some(store) = self.auth_store.as_ref() else {
2522            return json_error(503, "auth store not configured");
2523        };
2524        match store.attach_policy(
2525            crate::auth::store::PrincipalRef::Group(group.to_string()),
2526            policy_id,
2527        ) {
2528            Ok(()) => {
2529                self.runtime.invalidate_result_cache();
2530                self.iam_audit(
2531                    "iam/policy.attach",
2532                    &format!("group:{group}::{policy_id}"),
2533                    "ok",
2534                );
2535                let mut obj = Map::new();
2536                obj.insert("ok".to_string(), JsonValue::Bool(true));
2537                json_response(200, JsonValue::Object(obj))
2538            }
2539            Err(e) => json_error(400, e.to_string()),
2540        }
2541    }
2542
2543    /// `DELETE /admin/groups/:group/policies/:policy_id`.
2544    pub(crate) fn handle_iam_detach_group(&self, group: &str, policy_id: &str) -> HttpResponse {
2545        let Some(store) = self.auth_store.as_ref() else {
2546            return json_error(503, "auth store not configured");
2547        };
2548        match store.detach_policy(
2549            crate::auth::store::PrincipalRef::Group(group.to_string()),
2550            policy_id,
2551        ) {
2552            Ok(()) => {
2553                self.runtime.invalidate_result_cache();
2554                self.iam_audit(
2555                    "iam/policy.detach",
2556                    &format!("group:{group}::{policy_id}"),
2557                    "ok",
2558                );
2559                HttpResponse {
2560                    status: 204,
2561                    content_type: "application/json",
2562                    body: Vec::new(),
2563                    extra_headers: Vec::new(),
2564                }
2565            }
2566            Err(e) => json_error(400, e.to_string()),
2567        }
2568    }
2569
2570    /// `GET /admin/users/:user/effective-permissions[?resource=kind:name]`.
2571    pub(crate) fn handle_iam_effective_permissions(
2572        &self,
2573        user: &str,
2574        query: &std::collections::BTreeMap<String, String>,
2575    ) -> HttpResponse {
2576        let Some(store) = self.auth_store.as_ref() else {
2577            return json_error(503, "auth store not configured");
2578        };
2579        let uid = decode_user_arg(user);
2580        let pols = store.effective_policies(&uid);
2581
2582        // Build a JSON array of policy summaries scoped to the user.
2583        // The optional `resource` query string parameter is parsed but
2584        // currently only echoed back — fine-grained matching falls
2585        // through to `simulate`.
2586        let resource_echo = query.get("resource").cloned();
2587        let items: Vec<JsonValue> = pols
2588            .iter()
2589            .map(|p| {
2590                let mut obj = Map::new();
2591                obj.insert("id".to_string(), JsonValue::String(p.id.clone()));
2592                obj.insert(
2593                    "statements".to_string(),
2594                    JsonValue::Number(p.statements.len() as f64),
2595                );
2596                JsonValue::Object(obj)
2597            })
2598            .collect();
2599        let mut envelope = Map::new();
2600        envelope.insert("user".to_string(), JsonValue::String(uid.to_string()));
2601        if let Some(r) = resource_echo {
2602            envelope.insert("resource".to_string(), JsonValue::String(r));
2603        }
2604        envelope.insert("count".to_string(), JsonValue::Number(items.len() as f64));
2605        envelope.insert("policies".to_string(), JsonValue::Array(items));
2606        json_response(200, JsonValue::Object(envelope))
2607    }
2608
2609    /// `POST /admin/policies/simulate` —
2610    /// body: `{principal, action, resource: {kind, name, tenant?}, ctx?}`.
2611    pub(crate) fn handle_iam_simulate(&self, body: Vec<u8>) -> HttpResponse {
2612        let Some(store) = self.auth_store.as_ref() else {
2613            return json_error(503, "auth store not configured");
2614        };
2615        let parsed = match crate::serde_json::from_str::<crate::serde_json::Value>(
2616            std::str::from_utf8(&body).unwrap_or(""),
2617        ) {
2618            Ok(v) => v,
2619            Err(e) => return json_error(400, format!("invalid JSON body: {e}")),
2620        };
2621        let obj = match parsed.as_object() {
2622            Some(o) => o,
2623            None => return json_error(400, "body must be a JSON object"),
2624        };
2625        let principal = match obj.get("principal").and_then(|v| v.as_str()) {
2626            Some(s) => decode_user_arg(s),
2627            None => return json_error(400, "missing `principal`"),
2628        };
2629        let action = match obj.get("action").and_then(|v| v.as_str()) {
2630            Some(s) => s.to_string(),
2631            None => return json_error(400, "missing `action`"),
2632        };
2633        let resource = match obj.get("resource") {
2634            Some(JsonValue::Object(r)) => {
2635                let kind = r
2636                    .get("kind")
2637                    .and_then(|v| v.as_str())
2638                    .unwrap_or("")
2639                    .to_string();
2640                let name = r
2641                    .get("name")
2642                    .and_then(|v| v.as_str())
2643                    .unwrap_or("")
2644                    .to_string();
2645                if kind.is_empty() || name.is_empty() {
2646                    return json_error(400, "resource needs kind+name");
2647                }
2648                let mut rr = crate::auth::policies::ResourceRef::new(kind, name);
2649                if let Some(t) = r.get("tenant").and_then(|v| v.as_str()) {
2650                    rr = rr.with_tenant(t.to_string());
2651                }
2652                rr
2653            }
2654            Some(JsonValue::String(s)) => match s.split_once(':') {
2655                Some((k, n)) => crate::auth::policies::ResourceRef::new(k, n),
2656                None => return json_error(400, "resource string must be `kind:name`"),
2657            },
2658            _ => return json_error(400, "missing `resource`"),
2659        };
2660        let mut sim_ctx = crate::auth::store::SimCtx::default();
2661        if let Some(c) = obj.get("ctx").and_then(|v| v.as_object()) {
2662            if let Some(t) = c.get("current_tenant").and_then(|v| v.as_str()) {
2663                sim_ctx.current_tenant = Some(t.to_string());
2664            }
2665            if let Some(true) = c.get("mfa").and_then(|v| v.as_bool()) {
2666                sim_ctx.mfa_present = true;
2667            }
2668            if let Some(ip) = c
2669                .get("source_ip")
2670                .or_else(|| c.get("peer_ip"))
2671                .and_then(|v| v.as_str())
2672            {
2673                if let Ok(addr) = ip.parse() {
2674                    sim_ctx.peer_ip = Some(addr);
2675                }
2676            }
2677            if let Some(ms) = c.get("now_ms").and_then(|v| v.as_u64()) {
2678                sim_ctx.now_ms = Some(ms as u128);
2679            }
2680        }
2681        let outcome = store.simulate(&principal, &action, &resource, sim_ctx);
2682        let (decision_str, matched_pid, matched_sid) =
2683            crate::runtime::impl_core::decision_to_strings(&outcome.decision);
2684
2685        self.iam_audit("iam/policy.simulate", &principal.to_string(), &decision_str);
2686
2687        let mut envelope = Map::new();
2688        envelope.insert("decision".to_string(), JsonValue::String(decision_str));
2689        envelope.insert(
2690            "matched_policy_id".to_string(),
2691            matched_pid
2692                .map(JsonValue::String)
2693                .unwrap_or(JsonValue::Null),
2694        );
2695        envelope.insert(
2696            "matched_sid".to_string(),
2697            matched_sid
2698                .map(JsonValue::String)
2699                .unwrap_or(JsonValue::Null),
2700        );
2701        envelope.insert("reason".to_string(), JsonValue::String(outcome.reason));
2702        let trail: Vec<JsonValue> = outcome
2703            .trail
2704            .into_iter()
2705            .map(|t| {
2706                let mut obj = Map::new();
2707                obj.insert("policy_id".to_string(), JsonValue::String(t.policy_id));
2708                obj.insert(
2709                    "sid".to_string(),
2710                    t.sid.map(JsonValue::String).unwrap_or(JsonValue::Null),
2711                );
2712                obj.insert("matched".to_string(), JsonValue::Bool(t.matched));
2713                obj.insert(
2714                    "effect".to_string(),
2715                    JsonValue::String(
2716                        match t.effect {
2717                            crate::auth::policies::Effect::Allow => "allow",
2718                            crate::auth::policies::Effect::Deny => "deny",
2719                        }
2720                        .to_string(),
2721                    ),
2722                );
2723                obj.insert(
2724                    "why_skipped".to_string(),
2725                    t.why_skipped
2726                        .map(|s| JsonValue::String(s.to_string()))
2727                        .unwrap_or(JsonValue::Null),
2728                );
2729                JsonValue::Object(obj)
2730            })
2731            .collect();
2732        envelope.insert("trail".to_string(), JsonValue::Array(trail));
2733        json_response(200, JsonValue::Object(envelope))
2734    }
2735}
2736
2737fn decode_user_arg(raw: &str) -> crate::auth::UserId {
2738    // Accepts `username` (platform tenant), `tenant.username` or
2739    // `tenant/username` to align with the SQL path / display form.
2740    if let Some((tenant, name)) = raw.split_once('/') {
2741        return crate::auth::UserId::scoped(tenant.to_string(), name.to_string());
2742    }
2743    if let Some((tenant, name)) = raw.split_once('.') {
2744        return crate::auth::UserId::scoped(tenant.to_string(), name.to_string());
2745    }
2746    crate::auth::UserId::platform(raw.to_string())
2747}
2748
2749#[cfg(test)]
2750mod tests {
2751    use super::*;
2752
2753    #[test]
2754    fn metrics_expose_result_blob_cache_label_set() {
2755        let runtime =
2756            crate::runtime::RedDBRuntime::with_options(crate::api::RedDBOptions::in_memory())
2757                .expect("runtime");
2758        runtime
2759            .db()
2760            .store()
2761            .set_config_tree("runtime.result_cache.backend", &crate::json!("blob_cache"));
2762
2763        runtime.execute_query("SELECT 1").expect("populate miss");
2764        runtime.execute_query("SELECT 1").expect("blob hit");
2765        runtime.invalidate_result_cache();
2766
2767        let server = RedDBServer::new(runtime);
2768        let response = server.handle_metrics();
2769        let body = String::from_utf8(response.body).expect("utf8 metrics");
2770
2771        for needle in [
2772            "reddb_cache_blob_get_total{namespace=\"runtime.result_cache\",result=\"hit_l1\"}",
2773            "reddb_cache_blob_get_total{namespace=\"runtime.result_cache\",result=\"hit_l2\"}",
2774            "reddb_cache_blob_get_total{namespace=\"runtime.result_cache\",result=\"miss\"}",
2775            "reddb_cache_blob_put_total{namespace=\"runtime.result_cache\",outcome=\"ok\"}",
2776            "reddb_cache_blob_put_total{namespace=\"runtime.result_cache\",outcome=\"version_mismatch\"}",
2777            "reddb_cache_blob_put_total{namespace=\"runtime.result_cache\",outcome=\"too_large\"}",
2778            "reddb_cache_blob_put_total{namespace=\"runtime.result_cache\",outcome=\"metadata_too_large\"}",
2779            "reddb_cache_blob_invalidate_total{namespace=\"runtime.result_cache\",kind=\"dependency\"}",
2780            "reddb_cache_blob_invalidate_total{namespace=\"runtime.result_cache\",kind=\"namespace\"}",
2781            "reddb_cache_blob_evict_total{namespace=\"runtime.result_cache\",reason=\"capacity\"}",
2782            "reddb_cache_blob_evict_total{namespace=\"runtime.result_cache\",reason=\"expiry\"}",
2783            "reddb_cache_blob_evict_total{namespace=\"runtime.result_cache\",reason=\"policy\"}",
2784            "reddb_cache_blob_l1_bytes_in_use{namespace=\"runtime.result_cache\"}",
2785            "reddb_cache_blob_l1_entries{namespace=\"runtime.result_cache\"}",
2786            "reddb_cache_blob_l2_bytes_in_use{namespace=\"runtime.result_cache\"}",
2787            "reddb_cache_blob_l2_full_rejections_total{namespace=\"runtime.result_cache\"}",
2788            "reddb_cache_blob_version_mismatch_total{namespace=\"runtime.result_cache\"}",
2789        ] {
2790            assert!(body.contains(needle), "missing metric line for {needle}");
2791        }
2792    }
2793
2794    // -------------------------------------------------------------------
2795    // Issue #148 — Blob Cache admin endpoints (smoke + adversarial input)
2796    // -------------------------------------------------------------------
2797
2798    fn test_server() -> RedDBServer {
2799        let runtime =
2800            crate::runtime::RedDBRuntime::with_options(crate::api::RedDBOptions::in_memory())
2801                .expect("runtime");
2802        RedDBServer::new(runtime)
2803    }
2804
2805    fn parse_body(resp: &HttpResponse) -> JsonValue {
2806        let s = std::str::from_utf8(&resp.body).expect("utf8 body");
2807        crate::serde_json::from_str::<JsonValue>(s).expect("JSON body")
2808    }
2809
2810    #[test]
2811    fn admin_blob_cache_sweep_happy_path_returns_well_formed_report() {
2812        let server = test_server();
2813        let body = br#"{"limit_entries": 100, "limit_millis": 50}"#.to_vec();
2814        let resp = server.handle_admin_blob_cache_sweep(body);
2815        assert_eq!(resp.status, 200);
2816        let parsed = parse_body(&resp);
2817        assert_eq!(parsed.get("ok").and_then(|v| v.as_bool()), Some(true));
2818        // Sweeper today is bounded scaffolding; report must still be
2819        // well-formed with all expected fields present.
2820        for field in [
2821            "entries_scanned",
2822            "entries_evicted",
2823            "bytes_reclaimed",
2824            "elapsed_ms",
2825            "truncated_due_to_limit",
2826        ] {
2827            assert!(
2828                parsed.get(field).is_some(),
2829                "missing field {field} in response: {parsed:?}"
2830            );
2831        }
2832    }
2833
2834    #[test]
2835    fn admin_blob_cache_sweep_empty_body_uses_unbounded_default() {
2836        let server = test_server();
2837        let resp = server.handle_admin_blob_cache_sweep(Vec::new());
2838        assert_eq!(resp.status, 200);
2839        let parsed = parse_body(&resp);
2840        assert_eq!(parsed.get("ok").and_then(|v| v.as_bool()), Some(true));
2841    }
2842
2843    #[test]
2844    fn admin_blob_cache_sweep_invalid_json_returns_400() {
2845        let server = test_server();
2846        let resp = server.handle_admin_blob_cache_sweep(b"not json".to_vec());
2847        assert_eq!(resp.status, 400);
2848    }
2849
2850    #[test]
2851    fn admin_blob_cache_flush_namespace_happy_path() {
2852        let server = test_server();
2853        let body = br#"{"namespace": "tenant-42:results"}"#.to_vec();
2854        let resp = server.handle_admin_blob_cache_flush_namespace(body);
2855        assert_eq!(resp.status, 200);
2856        let parsed = parse_body(&resp);
2857        assert_eq!(parsed.get("ok").and_then(|v| v.as_bool()), Some(true));
2858        assert_eq!(
2859            parsed.get("namespace").and_then(|v| v.as_str()),
2860            Some("tenant-42:results")
2861        );
2862        assert!(parsed.get("elapsed_micros").is_some());
2863        assert!(parsed.get("generation_before").is_some());
2864        assert!(parsed.get("generation_after").is_some());
2865    }
2866
2867    #[test]
2868    fn admin_blob_cache_flush_namespace_missing_body_returns_400() {
2869        let server = test_server();
2870        let resp = server.handle_admin_blob_cache_flush_namespace(Vec::new());
2871        assert_eq!(resp.status, 400);
2872        let parsed = parse_body(&resp);
2873        assert!(parsed
2874            .get("error")
2875            .and_then(|v| v.as_str())
2876            .map(|s| s.contains("namespace"))
2877            .unwrap_or(false));
2878    }
2879
2880    #[test]
2881    fn admin_blob_cache_flush_namespace_missing_field_returns_400() {
2882        let server = test_server();
2883        let body = br#"{"other": "x"}"#.to_vec();
2884        let resp = server.handle_admin_blob_cache_flush_namespace(body);
2885        assert_eq!(resp.status, 400);
2886    }
2887
2888    #[test]
2889    fn admin_blob_cache_flush_namespace_empty_string_returns_400() {
2890        let server = test_server();
2891        let body = br#"{"namespace": ""}"#.to_vec();
2892        let resp = server.handle_admin_blob_cache_flush_namespace(body);
2893        assert_eq!(resp.status, 400);
2894    }
2895
2896    #[test]
2897    fn admin_blob_cache_flush_namespace_rejects_crlf_smuggling_attempt() {
2898        let server = test_server();
2899        // Classic CRLF smuggling shape — the namespace tries to splice
2900        // a fake audit line into structured logs.
2901        let body = br#"{"namespace": "real-ns\r\nfake-audit: spliced"}"#.to_vec();
2902        let resp = server.handle_admin_blob_cache_flush_namespace(body);
2903        assert_eq!(resp.status, 400);
2904        let parsed = parse_body(&resp);
2905        let msg = parsed
2906            .get("error")
2907            .and_then(|v| v.as_str())
2908            .unwrap_or_default();
2909        assert!(msg.contains("CR/LF"), "unexpected error: {msg}");
2910    }
2911
2912    #[test]
2913    fn admin_blob_cache_flush_namespace_rejects_nul_byte() {
2914        let server = test_server();
2915        // JSON `` decodes to a literal NUL byte after parse;
2916        // the guard must reject it (NUL truncates downstream sinks
2917        // like proxies and log shippers). Build the body with a
2918        // string literal so the source file contains no raw NUL.
2919        let body = b"{\"namespace\": \"with-nul-\\u0000-here\"}".to_vec();
2920        let resp = server.handle_admin_blob_cache_flush_namespace(body);
2921        assert_eq!(resp.status, 400);
2922        let parsed = parse_body(&resp);
2923        let msg = parsed
2924            .get("error")
2925            .and_then(|v| v.as_str())
2926            .unwrap_or_default();
2927        assert!(msg.contains("NUL"), "unexpected error: {msg}");
2928    }
2929
2930    #[test]
2931    fn admin_blob_cache_flush_namespace_response_round_trips_unicode() {
2932        let server = test_server();
2933        let body = r#"{"namespace": "日本語-ns-🦀"}"#.as_bytes().to_vec();
2934        let resp = server.handle_admin_blob_cache_flush_namespace(body);
2935        assert_eq!(resp.status, 200);
2936        let parsed = parse_body(&resp);
2937        assert_eq!(
2938            parsed.get("namespace").and_then(|v| v.as_str()),
2939            Some("日本語-ns-🦀")
2940        );
2941    }
2942
2943    // -------------------------------------------------------------------
2944    // Issue #195 — compare-and-set endpoint tests
2945    // -------------------------------------------------------------------
2946
2947    fn cas_body(namespace: &str, key: &str, new_value: &[u8], new_version: u64) -> Vec<u8> {
2948        let b64 = {
2949            let mut s = String::new();
2950            for chunk in new_value.chunks(3) {
2951                const CHARS: &[u8] =
2952                    b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
2953                let b0 = chunk[0] as u32;
2954                let b1 = chunk.get(1).copied().unwrap_or(0) as u32;
2955                let b2 = chunk.get(2).copied().unwrap_or(0) as u32;
2956                let n = (b0 << 16) | (b1 << 8) | b2;
2957                s.push(CHARS[((n >> 18) & 63) as usize] as char);
2958                s.push(CHARS[((n >> 12) & 63) as usize] as char);
2959                s.push(if chunk.len() > 1 {
2960                    CHARS[((n >> 6) & 63) as usize] as char
2961                } else {
2962                    '='
2963                });
2964                s.push(if chunk.len() > 2 {
2965                    CHARS[(n & 63) as usize] as char
2966                } else {
2967                    '='
2968                });
2969            }
2970            s
2971        };
2972        format!(
2973            r#"{{"namespace":"{namespace}","key":"{key}","expected_version":0,"new_value_b64":"{b64}","new_version":{new_version}}}"#
2974        )
2975        .into_bytes()
2976    }
2977
2978    #[test]
2979    fn cas_happy_first_write() {
2980        let server = test_server();
2981        let body = cas_body("ns1", "k1", b"hello", 1);
2982        let resp = server.handle_admin_blob_cache_compare_and_set(body);
2983        assert_eq!(resp.status, 200);
2984        let parsed = parse_body(&resp);
2985        assert_eq!(
2986            parsed.get("committed").and_then(|v| v.as_bool()),
2987            Some(true)
2988        );
2989        assert_eq!(
2990            parsed.get("current_version").and_then(|v| v.as_u64()),
2991            Some(1)
2992        );
2993    }
2994
2995    #[test]
2996    fn cas_happy_update_increments_version() {
2997        let server = test_server();
2998        // First write at version 1.
2999        server.handle_admin_blob_cache_compare_and_set(cas_body("ns2", "k2", b"v1", 1));
3000        // Update to version 2 — existing (1) < new_version (2) → ok.
3001        let resp = server.handle_admin_blob_cache_compare_and_set(cas_body("ns2", "k2", b"v2", 2));
3002        assert_eq!(resp.status, 200);
3003        let parsed = parse_body(&resp);
3004        assert_eq!(
3005            parsed.get("committed").and_then(|v| v.as_bool()),
3006            Some(true)
3007        );
3008        assert_eq!(
3009            parsed.get("current_version").and_then(|v| v.as_u64()),
3010            Some(2)
3011        );
3012    }
3013
3014    #[test]
3015    fn cas_conflict_same_version_returns_409() {
3016        let server = test_server();
3017        // Write version 5.
3018        server.handle_admin_blob_cache_compare_and_set(cas_body("ns3", "k3", b"v1", 5));
3019        // Try to write version 5 again — existing (5) >= new_version (5) → conflict.
3020        let resp = server.handle_admin_blob_cache_compare_and_set(cas_body("ns3", "k3", b"v2", 5));
3021        assert_eq!(resp.status, 409);
3022        let parsed = parse_body(&resp);
3023        assert_eq!(
3024            parsed.get("committed").and_then(|v| v.as_bool()),
3025            Some(false)
3026        );
3027        assert_eq!(
3028            parsed.get("reason").and_then(|v| v.as_str()),
3029            Some("VersionMismatch")
3030        );
3031    }
3032
3033    #[test]
3034    fn cas_stale_expected_version_returns_409() {
3035        let server = test_server();
3036        // Write version 10.
3037        server.handle_admin_blob_cache_compare_and_set(cas_body("ns4", "k4", b"v1", 10));
3038        // Try version 9 (going backwards) — existing (10) >= new_version (9) → conflict.
3039        let resp = server.handle_admin_blob_cache_compare_and_set(cas_body("ns4", "k4", b"v2", 9));
3040        assert_eq!(resp.status, 409);
3041        let parsed = parse_body(&resp);
3042        assert_eq!(
3043            parsed.get("current_version").and_then(|v| v.as_u64()),
3044            Some(10)
3045        );
3046    }
3047
3048    #[test]
3049    fn cas_crlf_in_namespace_returns_400() {
3050        let server = test_server();
3051        // Embed CRLF via JSON unicode escapes.
3052        let body = b"{\"namespace\":\"real\\r\\ninjected\",\"key\":\"k\",\"expected_version\":0,\"new_value_b64\":\"aGk=\",\"new_version\":1}".to_vec();
3053        let resp = server.handle_admin_blob_cache_compare_and_set(body);
3054        assert_eq!(resp.status, 400);
3055        let parsed = parse_body(&resp);
3056        let msg = parsed.get("error").and_then(|v| v.as_str()).unwrap_or("");
3057        assert!(msg.contains("CR/LF"), "expected CR/LF error, got: {msg}");
3058    }
3059
3060    #[test]
3061    fn cas_nul_in_key_returns_400() {
3062        let server = test_server();
3063        let body = b"{\"namespace\":\"ns\",\"key\":\"k\\u0000nul\",\"expected_version\":0,\"new_value_b64\":\"aGk=\",\"new_version\":1}".to_vec();
3064        let resp = server.handle_admin_blob_cache_compare_and_set(body);
3065        assert_eq!(resp.status, 400);
3066        let parsed = parse_body(&resp);
3067        let msg = parsed.get("error").and_then(|v| v.as_str()).unwrap_or("");
3068        assert!(msg.contains("NUL"), "expected NUL error, got: {msg}");
3069    }
3070
3071    #[test]
3072    fn cas_bad_base64_returns_400() {
3073        let server = test_server();
3074        let body = br#"{"namespace":"ns","key":"k","expected_version":0,"new_value_b64":"!!!invalid!!!","new_version":1}"#.to_vec();
3075        let resp = server.handle_admin_blob_cache_compare_and_set(body);
3076        assert_eq!(resp.status, 400);
3077        let parsed = parse_body(&resp);
3078        let msg = parsed.get("error").and_then(|v| v.as_str()).unwrap_or("");
3079        assert!(msg.contains("base64"), "expected base64 error, got: {msg}");
3080    }
3081
3082    #[test]
3083    fn cas_missing_bearer_returns_401_via_route() {
3084        use std::sync::Mutex;
3085        static GUARD: Mutex<()> = Mutex::new(());
3086        let _g = GUARD.lock().unwrap_or_else(|e| e.into_inner());
3087
3088        let prev = std::env::var("RED_ADMIN_TOKEN").ok();
3089        unsafe {
3090            std::env::set_var("RED_ADMIN_TOKEN", "test-token-195");
3091        }
3092
3093        let server = test_server();
3094        let req = crate::server::transport::HttpRequest {
3095            method: "POST".to_string(),
3096            path: "/admin/cache/compare-and-set".to_string(),
3097            query: std::collections::BTreeMap::new(),
3098            headers: std::collections::BTreeMap::new(),
3099            body: cas_body("ns", "k", b"v", 1),
3100        };
3101        let resp = server.route(req);
3102        assert_eq!(resp.status, 401);
3103
3104        unsafe {
3105            match prev {
3106                Some(v) => std::env::set_var("RED_ADMIN_TOKEN", v),
3107                None => std::env::remove_var("RED_ADMIN_TOKEN"),
3108            }
3109        }
3110    }
3111
3112    #[test]
3113    fn cas_wrong_bearer_returns_401_via_route() {
3114        use std::sync::Mutex;
3115        static GUARD: Mutex<()> = Mutex::new(());
3116        let _g = GUARD.lock().unwrap_or_else(|e| e.into_inner());
3117
3118        let prev = std::env::var("RED_ADMIN_TOKEN").ok();
3119        unsafe {
3120            std::env::set_var("RED_ADMIN_TOKEN", "correct-token");
3121        }
3122
3123        let server = test_server();
3124        let mut headers = std::collections::BTreeMap::new();
3125        headers.insert(
3126            "authorization".to_string(),
3127            "Bearer wrong-token".to_string(),
3128        );
3129        let req = crate::server::transport::HttpRequest {
3130            method: "POST".to_string(),
3131            path: "/admin/cache/compare-and-set".to_string(),
3132            query: std::collections::BTreeMap::new(),
3133            headers,
3134            body: cas_body("ns", "k", b"v", 1),
3135        };
3136        let resp = server.route(req);
3137        assert_eq!(resp.status, 401);
3138
3139        unsafe {
3140            match prev {
3141                Some(v) => std::env::set_var("RED_ADMIN_TOKEN", v),
3142                None => std::env::remove_var("RED_ADMIN_TOKEN"),
3143            }
3144        }
3145    }
3146
3147    #[test]
3148    fn cas_concurrent_race_exactly_one_commits() {
3149        use std::sync::{Arc, Mutex};
3150
3151        // RedDBServer may not be Sync, so we protect it with a Mutex and share
3152        // across threads. The BlobCache's check_version runs under a shard write
3153        // lock, so even serialised calls exercise the version-monotonicity guard.
3154        let server = Arc::new(Mutex::new(test_server()));
3155        let committed = Arc::new(Mutex::new(0u32));
3156        let conflicted = Arc::new(Mutex::new(0u32));
3157
3158        let handles: Vec<_> = (0..8)
3159            .map(|_| {
3160                let server = Arc::clone(&server);
3161                let committed = Arc::clone(&committed);
3162                let conflicted = Arc::clone(&conflicted);
3163                std::thread::spawn(move || {
3164                    // All threads try to write version 1 to the same key.
3165                    let body = cas_body("race-ns", "race-key", b"payload", 1);
3166                    let resp = {
3167                        let s = server.lock().unwrap();
3168                        s.handle_admin_blob_cache_compare_and_set(body)
3169                    };
3170                    match resp.status {
3171                        200 => *committed.lock().unwrap() += 1,
3172                        409 => *conflicted.lock().unwrap() += 1,
3173                        s => panic!("unexpected status {s}"),
3174                    }
3175                })
3176            })
3177            .collect();
3178
3179        for h in handles {
3180            h.join().expect("thread panicked");
3181        }
3182
3183        assert_eq!(
3184            *committed.lock().unwrap(),
3185            1,
3186            "exactly one CAS should commit (version 1 can only be written once)"
3187        );
3188    }
3189
3190    // -------------------------------------------------------------------
3191    // Routing-layer auth gate: when RED_ADMIN_TOKEN is set the routes
3192    // must reject unauthenticated requests with 401. We exercise the
3193    // route() entrypoint, not the handler directly, so the gate (which
3194    // lives in is_authorized()) is on the path.
3195    // -------------------------------------------------------------------
3196
3197    #[test]
3198    fn admin_blob_cache_routes_reject_unauth_when_admin_token_set() {
3199        // Serialize on a per-process mutex because RED_ADMIN_TOKEN is a
3200        // process-wide env var; running other admin-auth tests in
3201        // parallel would race the unset/set sequence.
3202        use std::sync::Mutex;
3203        static GUARD: Mutex<()> = Mutex::new(());
3204        let _g = GUARD.lock().unwrap_or_else(|e| e.into_inner());
3205
3206        let prev = std::env::var("RED_ADMIN_TOKEN").ok();
3207        // SAFETY: env mutation is unsafe in 2024; we serialize via
3208        // GUARD above and restore the previous value at the end.
3209        unsafe {
3210            std::env::set_var("RED_ADMIN_TOKEN", "test-token-148");
3211        }
3212
3213        let server = test_server();
3214
3215        // Sweep without auth → 401.
3216        let req = crate::server::transport::HttpRequest {
3217            method: "POST".to_string(),
3218            path: "/admin/blob_cache/sweep".to_string(),
3219            query: std::collections::BTreeMap::new(),
3220            headers: std::collections::BTreeMap::new(),
3221            body: br#"{"limit_entries":1}"#.to_vec(),
3222        };
3223        let resp = server.route(req);
3224        assert_eq!(resp.status, 401, "sweep without admin token must be 401");
3225
3226        // Flush namespace without auth → 401.
3227        let req = crate::server::transport::HttpRequest {
3228            method: "POST".to_string(),
3229            path: "/admin/blob_cache/flush_namespace".to_string(),
3230            query: std::collections::BTreeMap::new(),
3231            headers: std::collections::BTreeMap::new(),
3232            body: br#"{"namespace":"x"}"#.to_vec(),
3233        };
3234        let resp = server.route(req);
3235        assert_eq!(resp.status, 401, "flush without admin token must be 401");
3236
3237        // With matching bearer → 200.
3238        let mut headers = std::collections::BTreeMap::new();
3239        headers.insert(
3240            "authorization".to_string(),
3241            "Bearer test-token-148".to_string(),
3242        );
3243        let req = crate::server::transport::HttpRequest {
3244            method: "POST".to_string(),
3245            path: "/admin/blob_cache/flush_namespace".to_string(),
3246            query: std::collections::BTreeMap::new(),
3247            headers,
3248            body: br#"{"namespace":"ok"}"#.to_vec(),
3249        };
3250        let resp = server.route(req);
3251        assert_eq!(resp.status, 200, "flush with admin token must be 200");
3252
3253        // Restore previous env state.
3254        unsafe {
3255            match prev {
3256                Some(v) => std::env::set_var("RED_ADMIN_TOKEN", v),
3257                None => std::env::remove_var("RED_ADMIN_TOKEN"),
3258            }
3259        }
3260    }
3261}