reddb_server/server/handlers_admin.rs

//! Lifecycle / admin HTTP endpoints (PLAN.md Phase 1).
//!
//! Universal contract surface consumed by orchestrators (K8s preStop,
//! Fly autostop, ECS drain, systemd, custom).
//!
//! - `POST /admin/shutdown` — flush + checkpoint + optional backup,
//!   200 only when safe to die. Idempotent.
//! - `POST /admin/drain` — stop accepting new writes, let in-flight
//!   requests finish, 200 once drain is complete. Soft pre-shutdown step.
//! - `GET  /health/live` — process responsive (always cheap).
//! - `GET  /health/ready` — accepts queries (WAL replay + restore done).
//! - `GET  /health/startup` — same logic as ready, K8s-style longer
//!   timeout window.

use super::*;
use crate::runtime::lifecycle::Phase;
use std::path::{Path, PathBuf};

/// Path to the persistent runtime-toggle file kept beside the
/// `.rdb` data file. Operators can prepare a fresh deploy to come up
/// locked by writing `{"read_only": true}` before first boot.
pub(crate) fn runtime_state_path(data_path: &Path) -> PathBuf {
    let parent = data_path.parent().unwrap_or_else(|| Path::new("."));
    parent.join(".runtime-state.json")
}

/// Atomically persist the read_only toggle. Writes to a sibling
/// `.tmp` file then renames to defeat torn writes — same pattern
/// the snapshot publish path uses.
pub(crate) fn persist_runtime_readonly(state_path: &Path, enabled: bool) -> std::io::Result<()> {
    use std::io::Write;
    let mut object = crate::json::Map::new();
    object.insert("read_only".to_string(), crate::json::Value::Bool(enabled));
    let body = crate::serde_json::to_string_pretty(&crate::json::Value::Object(object))
        .map_err(|err| std::io::Error::other(err.to_string()))?;
    if let Some(parent) = state_path.parent() {
        if !parent.as_os_str().is_empty() {
            std::fs::create_dir_all(parent)?;
        }
    }
    let tmp = state_path.with_extension("json.tmp");
    {
        let mut f = std::fs::File::create(&tmp)?;
        f.write_all(body.as_bytes())?;
        f.sync_all()?;
    }
    std::fs::rename(&tmp, state_path)?;
    Ok(())
}

/// Read a previously-persisted read_only toggle. Returns `None`
/// when the file doesn't exist or doesn't parse — boot continues
/// from the env-var / RedDBOptions value in that case.
pub fn load_runtime_readonly(data_path: &Path) -> Option<bool> {
    let state_path = runtime_state_path(data_path);
    let bytes = std::fs::read(&state_path).ok()?;
    let parsed: crate::json::Value = crate::json::from_slice(&bytes).ok()?;
    parsed.get("read_only").and_then(|v| v.as_bool())
}
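
// A minimal round-trip sketch for the persistence pair above — an
// assumption-level illustration, not shipped coverage: it relies only on
// the documented contract (atomic write beside the data file, `None` on a
// missing file) and on `std::env::temp_dir` being writable.
#[cfg(test)]
mod runtime_state_roundtrip_sketch {
    use super::*;

    #[test]
    fn persisted_toggle_is_readable_after_rename() {
        let dir = std::env::temp_dir().join(format!("reddb-rtstate-{}", std::process::id()));
        std::fs::create_dir_all(&dir).unwrap();
        let data_path = dir.join("db.rdb");

        // Nothing persisted yet — boot would fall back to env / options.
        assert_eq!(load_runtime_readonly(&data_path), None);

        // Persist `true`, read it back through the same resolution path.
        let state_path = runtime_state_path(&data_path);
        persist_runtime_readonly(&state_path, true).unwrap();
        assert_eq!(load_runtime_readonly(&data_path), Some(true));

        // Flipping to `false` overwrites atomically via the `.tmp` rename.
        persist_runtime_readonly(&state_path, false).unwrap();
        assert_eq!(load_runtime_readonly(&data_path), Some(false));

        let _ = std::fs::remove_dir_all(&dir);
    }
}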

/// PLAN.md Phase 11.6 — default lease holder id when the operator
/// doesn't pin one in the promotion request body. Mirrors the boot
/// loop's resolution (`RED_LEASE_HOLDER_ID` → `<hostname>-<pid>`).
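/// For illustration only: with `HOSTNAME=web-1` and pid 4242 the
/// fallback resolves to `web-1-4242` (values invented).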
fn default_holder_id() -> String {
    if let Some(explicit) = crate::utils::env_with_file_fallback("RED_LEASE_HOLDER_ID") {
        return explicit;
    }
    let host = std::env::var("HOSTNAME")
        .or_else(|_| std::env::var("HOST"))
        .unwrap_or_else(|_| "unknown-host".to_string());
    format!("{host}-{}", std::process::id())
}

/// Sanitize replica IDs for use as Prometheus label values.
/// Replaces double quotes, backslashes, and newlines so the resulting
/// metric line stays parseable. Operators rarely pick adversarial
/// replica IDs, but malicious input must not break `/metrics`.
fn sanitize_label(value: &str) -> String {
    let mut out = String::with_capacity(value.len());
    for ch in value.chars() {
        match ch {
            '"' => out.push_str("\\\""),
            '\\' => out.push_str("\\\\"),
            '\n' => out.push_str("\\n"),
            '\r' => out.push_str("\\r"),
            _ => out.push(ch),
        }
    }
    out
}
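
// A quick sketch of the escaping contract — assumption-level, mirroring
// the match arms above rather than any external Prometheus test vector.
#[cfg(test)]
mod sanitize_label_sketch {
    use super::sanitize_label;

    #[test]
    fn escapes_quotes_backslashes_and_newlines() {
        assert_eq!(sanitize_label("plain-replica-1"), "plain-replica-1");
        assert_eq!(sanitize_label("a\"b"), "a\\\"b");
        assert_eq!(sanitize_label("a\\b"), "a\\\\b");
        assert_eq!(sanitize_label("a\nb\r"), "a\\nb\\r");
    }
}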

/// Standard base64 decode (RFC 4648 §4, alphabet `A-Za-z0-9+/`).
/// Returns `Err` on any invalid character or dangling trailing
/// character; padding `=` is optional.
fn b64_decode(input: &str) -> Result<Vec<u8>, String> {
    let input = input.trim_end_matches('=');
    let mut buf = Vec::with_capacity(input.len() * 3 / 4 + 1);

    let lookup = |c: u8| -> Result<u32, String> {
        match c {
            b'A'..=b'Z' => Ok((c - b'A') as u32),
            b'a'..=b'z' => Ok((c - b'a' + 26) as u32),
            b'0'..=b'9' => Ok((c - b'0' + 52) as u32),
            b'+' => Ok(62),
            b'/' => Ok(63),
            other => Err(format!("invalid base64 character: {}", other as char)),
        }
    };

    let bytes: Vec<u8> = input.bytes().collect();
    for chunk in bytes.chunks(4) {
        let v: Vec<u32> = chunk.iter().map(|&b| lookup(b)).collect::<Result<_, _>>()?;
        match v.len() {
            4 => {
                let n = (v[0] << 18) | (v[1] << 12) | (v[2] << 6) | v[3];
                buf.push((n >> 16) as u8);
                buf.push((n >> 8) as u8);
                buf.push(n as u8);
            }
            3 => {
                let n = (v[0] << 18) | (v[1] << 12) | (v[2] << 6);
                buf.push((n >> 16) as u8);
                buf.push((n >> 8) as u8);
            }
            2 => {
                let n = (v[0] << 18) | (v[1] << 12);
                buf.push((n >> 16) as u8);
            }
            // One leftover character can never encode a whole byte;
            // reject it instead of silently dropping it.
            _ => return Err("invalid base64 length: dangling character".to_string()),
        }
    }
    Ok(buf)
}
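
// Decode-path sketch against the RFC 4648 §10 test vectors — illustrative
// only; it asserts just the contract documented on `b64_decode` above.
#[cfg(test)]
mod b64_decode_sketch {
    use super::b64_decode;

    #[test]
    fn rfc4648_vectors_and_rejections() {
        assert_eq!(b64_decode("").unwrap(), b"");
        assert_eq!(b64_decode("Zg==").unwrap(), b"f");
        assert_eq!(b64_decode("Zm8=").unwrap(), b"fo");
        assert_eq!(b64_decode("Zm9v").unwrap(), b"foo");
        assert_eq!(b64_decode("Zm9vYmFy").unwrap(), b"foobar");
        // Unpadded input is accepted.
        assert_eq!(b64_decode("Zm9vYg").unwrap(), b"foob");
        // Invalid alphabet byte and dangling length are both errors.
        assert!(b64_decode("Zm9v!A==").is_err());
        assert!(b64_decode("Zm9vA").is_err());
    }
}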

/// Reject CR, LF, and NUL bytes in caller-controlled strings that flow into
/// audit logs and response envelopes (ADR 0010).
fn reject_smuggling_bytes(field: &str, value: &str) -> Option<HttpResponse> {
    for (idx, byte) in value.as_bytes().iter().enumerate() {
        match *byte {
            b'\0' => {
                return Some(json_error(
                    400,
                    format!("field `{field}` contains forbidden NUL byte at index {idx}"),
                ));
            }
            b'\r' | b'\n' => {
                return Some(json_error(
                    400,
                    format!("field `{field}` contains forbidden CR/LF byte at index {idx}"),
                ));
            }
            _ => {}
        }
    }
    None
}
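
// Behavioral sketch: `None` means the value is safe to log verbatim,
// `Some` carries the ready-made 400. Assumes only that `json_error`
// builds a response without panicking on these inputs.
#[cfg(test)]
mod reject_smuggling_bytes_sketch {
    use super::reject_smuggling_bytes;

    #[test]
    fn clean_strings_pass_and_crlf_nul_fail() {
        assert!(reject_smuggling_bytes("ns", "tenant-42:results").is_none());
        assert!(reject_smuggling_bytes("ns", "bad\r\nheader").is_some());
        assert!(reject_smuggling_bytes("ns", "nul\0byte").is_some());
    }
}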

impl RedDBServer {
    /// `POST /admin/shutdown` — graceful shutdown coordinator.
    /// Returns 200 with the shutdown report when complete; 200 with
    /// the cached report when already shut down (idempotent); 500
    /// on flush failure (the process should still exit afterwards).
    ///
    /// The HTTP layer does not own process exit — that's the
    /// signal-handler / `run_server` driver. This handler reports
    /// state; orchestrators that sent SIGTERM separately will see
    /// the process die when their grace window elapses.
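    ///
    /// Response shape on success (field values illustrative):
    ///
    /// ```json
    /// { "ok": true, "phase": "shutting_down", "flushed_wal": true,
    ///   "final_checkpoint": true, "backup_uploaded": true, "duration_ms": 412 }
    /// ```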
    pub(crate) fn handle_admin_shutdown(&self) -> HttpResponse {
        let backup_on_shutdown = std::env::var("RED_BACKUP_ON_SHUTDOWN")
            .ok()
            .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
            .unwrap_or(true);

        match self.runtime.graceful_shutdown(backup_on_shutdown) {
            Ok(report) => {
                // PLAN.md Phase 6.5 — audit operator-triggered
                // shutdown. Recorded as "ok" + duration so the log
                // shipper can graph shutdown latency over time.
                let mut details = Map::new();
                details.insert(
                    "backup_uploaded".to_string(),
                    JsonValue::Bool(report.backup_uploaded),
                );
                details.insert(
                    "duration_ms".to_string(),
                    JsonValue::Number(report.duration_ms as f64),
                );
                self.runtime.audit_log().record(
                    "admin/shutdown",
                    "operator",
                    "instance",
                    "ok",
                    JsonValue::Object(details),
                );
                let mut object = Map::new();
                object.insert("ok".to_string(), JsonValue::Bool(true));
                object.insert(
                    "phase".to_string(),
                    JsonValue::String(self.runtime.lifecycle().phase().as_str().to_string()),
                );
                object.insert(
                    "flushed_wal".to_string(),
                    JsonValue::Bool(report.flushed_wal),
                );
                object.insert(
                    "final_checkpoint".to_string(),
                    JsonValue::Bool(report.final_checkpoint),
                );
                object.insert(
                    "backup_uploaded".to_string(),
                    JsonValue::Bool(report.backup_uploaded),
                );
                object.insert(
                    "duration_ms".to_string(),
                    JsonValue::Number(report.duration_ms as f64),
                );
                json_response(200, JsonValue::Object(object))
            }
            Err(err) => json_error(500, err.to_string()),
        }
    }

    /// `POST /admin/restore` — operator-triggered restore from the
    /// configured remote backend (PLAN.md Phase 3.2). Refuses unless
    /// the runtime is read_only / replica so live writes can't race
    /// the swap. Body fields are optional: `{"to_timestamp_ms": u64}`,
    /// or `{"to_timestamp": u64}` in whole seconds. An empty body (or
    /// a target of 0) restores to latest.
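    ///
    /// Response shape on success (field values illustrative):
    ///
    /// ```json
    /// { "ok": true, "snapshot_used": 12, "wal_segments_replayed": 3,
    ///   "records_applied": 8192, "recovered_to_lsn": 40961,
    ///   "recovered_to_time": 1719000000000 }
    /// ```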
    pub(crate) fn handle_admin_restore(&self, body: Vec<u8>) -> HttpResponse {
        if !self.runtime.write_gate().is_read_only() {
            return json_error(
                409,
                "POST /admin/restore requires the runtime to be read_only or replica-role; \
                 toggle via RED_READONLY=true or POST /admin/readonly first",
            );
        }
        let db = self.runtime.db();
        let Some(backend) = db.options().remote_backend.clone() else {
            return json_error(412, "no remote backend configured (RED_BACKEND=none)");
        };
        let Some(local_path) = db.path().map(|p| p.to_path_buf()) else {
            return json_error(412, "in-memory runtime cannot be restored from remote");
        };
        let snapshot_prefix = db.options().default_snapshot_prefix();
        let wal_prefix = db.options().default_wal_archive_prefix();
        let target_time_ms = if body.is_empty() {
            0u64
        } else {
            match crate::serde_json::from_slice::<crate::serde_json::Value>(&body) {
                Ok(v) => v
                    .get("to_timestamp_ms")
                    .and_then(|n| n.as_u64())
                    .or_else(|| {
                        v.get("to_timestamp")
                            .and_then(|n| n.as_u64())
                            .map(|s| s.saturating_mul(1000))
                    })
                    .unwrap_or(0),
                Err(err) => return json_error(400, format!("invalid JSON body: {err}")),
            }
        };
        let recovery =
            crate::storage::wal::PointInTimeRecovery::new(backend, snapshot_prefix, wal_prefix);
        match recovery.restore_to(target_time_ms, &local_path) {
            Ok(report) => {
                let mut details = Map::new();
                details.insert(
                    "snapshot_used".to_string(),
                    JsonValue::Number(report.snapshot_used as f64),
                );
                details.insert(
                    "wal_segments_replayed".to_string(),
                    JsonValue::Number(report.wal_segments_replayed as f64),
                );
                details.insert(
                    "records_applied".to_string(),
                    JsonValue::Number(report.records_applied as f64),
                );
                details.insert(
                    "recovered_to_lsn".to_string(),
                    JsonValue::Number(report.recovered_to_lsn as f64),
                );
                details.insert(
                    "recovered_to_time".to_string(),
                    JsonValue::Number(report.recovered_to_time as f64),
                );
                self.runtime.audit_log().record(
                    "admin/restore",
                    "operator",
                    "instance",
                    "ok",
                    JsonValue::Object(details.clone()),
                );
                let mut object = Map::new();
                object.insert("ok".to_string(), JsonValue::Bool(true));
                for (k, v) in details {
                    object.insert(k, v);
                }
                json_response(200, JsonValue::Object(object))
            }
            Err(err) => {
                self.runtime.audit_log().record(
                    "admin/restore",
                    "operator",
                    "instance",
                    &format!("err: {err}"),
                    JsonValue::Null,
                );
                json_error(500, err.to_string())
            }
        }
    }

    /// `POST /admin/backup` — operator-triggered backup, alias of
    /// `/backup/trigger` placed under the universal `/admin/*`
    /// namespace per PLAN.md Phase 3.3.
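    ///
    /// Response shape on success (field values illustrative):
    ///
    /// ```json
    /// { "ok": true, "snapshot_id": 13, "uploaded": true, "duration_ms": 87 }
    /// ```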
    pub(crate) fn handle_admin_backup(
        &self,
        _query: &std::collections::BTreeMap<String, String>,
    ) -> HttpResponse {
        match self.runtime.trigger_backup() {
            Ok(result) => {
                let mut details = Map::new();
                details.insert(
                    "snapshot_id".to_string(),
                    JsonValue::Number(result.snapshot_id as f64),
                );
                details.insert("uploaded".to_string(), JsonValue::Bool(result.uploaded));
                details.insert(
                    "duration_ms".to_string(),
                    JsonValue::Number(result.duration_ms as f64),
                );
                self.runtime.audit_log().record(
                    "admin/backup",
                    "operator",
                    "instance",
                    "ok",
                    JsonValue::Object(details.clone()),
                );
                let mut object = Map::new();
                object.insert("ok".to_string(), JsonValue::Bool(true));
                for (k, v) in details {
                    object.insert(k, v);
                }
                json_response(200, JsonValue::Object(object))
            }
            Err(err) => {
                self.runtime.audit_log().record(
                    "admin/backup",
                    "operator",
                    "instance",
                    &format!("err: {err}"),
                    JsonValue::Null,
                );
                json_error(500, err.to_string())
            }
        }
    }

    /// `POST /admin/blob_cache/sweep` — bounded sweep of expired L1
    /// entries on the runtime result Blob Cache (issue #148 follow-up,
    /// closing the deferred half wired by sweeper.rs flag #4).
    ///
    /// Body (JSON, all fields optional):
    ///
    /// ```json
    /// { "limit_entries": 1000, "limit_millis": 100 }
    /// ```
    ///
    /// - Both null / missing → unbounded sweep (`SweepLimit::Either`
    ///   with `usize::MAX` / `u32::MAX` so the sweeper still has the
    ///   single composite-bound code path).
    /// - One field set → `SweepLimit::Entries` or `SweepLimit::Millis`.
    /// - Both set → `SweepLimit::Either { entries, millis }` (first
    ///   bound to fire wins).
    ///
    /// Returns the [`SweepReport`](crate::storage::cache::sweeper::SweepReport)
    /// fields plus `ok:true`. Caller-influenced strings (none today —
    /// the report holds only numeric fields) would round-trip through
    /// `SerializedJsonField::tainted` per ADR 0010 §3.
    pub(crate) fn handle_admin_blob_cache_sweep(&self, body: Vec<u8>) -> HttpResponse {
        use crate::storage::cache::sweeper::{BlobCacheSweeper, SweepLimit};

        let (limit_entries, limit_millis) = if body.is_empty() {
            (None, None)
        } else {
            match crate::serde_json::from_slice::<crate::serde_json::Value>(&body) {
                Ok(v) => {
                    let entries = v
                        .get("limit_entries")
                        .and_then(|n| n.as_u64())
                        .map(|n| usize::try_from(n).unwrap_or(usize::MAX));
                    let millis = v
                        .get("limit_millis")
                        .and_then(|n| n.as_u64())
                        .map(|n| u32::try_from(n).unwrap_or(u32::MAX));
                    (entries, millis)
                }
                Err(err) => return json_error(400, format!("invalid JSON body: {err}")),
            }
        };

        let limit = match (limit_entries, limit_millis) {
            (None, None) => SweepLimit::Either {
                entries: usize::MAX,
                millis: u32::MAX,
            },
            (Some(e), None) => SweepLimit::Entries(e),
            (None, Some(m)) => SweepLimit::Millis(m),
            (Some(e), Some(m)) => SweepLimit::Either {
                entries: e,
                millis: m,
            },
        };

        let report = BlobCacheSweeper::sweep_expired(self.runtime.result_blob_cache(), limit);

        let mut object = Map::new();
        object.insert("ok".to_string(), JsonValue::Bool(true));
        object.insert(
            "entries_scanned".to_string(),
            JsonValue::Number(report.entries_scanned as f64),
        );
        object.insert(
            "entries_evicted".to_string(),
            JsonValue::Number(report.entries_evicted as f64),
        );
        object.insert(
            "bytes_reclaimed".to_string(),
            JsonValue::Number(report.bytes_reclaimed as f64),
        );
        object.insert(
            "elapsed_ms".to_string(),
            JsonValue::Number(report.elapsed_ms as f64),
        );
        object.insert(
            "truncated_due_to_limit".to_string(),
            JsonValue::Bool(report.truncated_due_to_limit),
        );

        // Audit. Operator-driven sweeps are rare; logging them gives the
        // log shipper a stable key on which to graph cache-sweep cadence.
        let mut details = Map::new();
        details.insert(
            "entries_evicted".to_string(),
            JsonValue::Number(report.entries_evicted as f64),
        );
        details.insert(
            "bytes_reclaimed".to_string(),
            JsonValue::Number(report.bytes_reclaimed as f64),
        );
        details.insert(
            "elapsed_ms".to_string(),
            JsonValue::Number(report.elapsed_ms as f64),
        );
        self.runtime.audit_log().record(
            "admin/blob_cache/sweep",
            "operator",
            "instance",
            "ok",
            JsonValue::Object(details),
        );

        json_response(200, JsonValue::Object(object))
    }

    /// `POST /admin/blob_cache/flush_namespace` — foreground-fast
    /// namespace flush on the runtime result Blob Cache (issue #148
    /// follow-up, closing sweeper.rs flag #4).
    ///
    /// Body (JSON):
    ///
    /// ```json
    /// { "namespace": "tenant-42:results" }
    /// ```
    ///
    /// Validation contract:
    ///
    /// - `namespace` is **required**, **non-empty**, and must contain
    ///   no NUL or CR/LF bytes — the same constraints
    ///   [`crate::server::header_escape_guard::HeaderEscapeGuard`]
    ///   enforces on response headers, applied here on the request side
    ///   so a CRLF-laden namespace cannot smuggle audit-log lines or
    ///   sneak past the JSON-envelope guard. Reflected back into the
    ///   response through
    ///   [`crate::json_field::SerializedJsonField::tainted`] per ADR 0010 §3.
    ///
    /// Returns the [`NamespaceFlushReport`](crate::storage::cache::sweeper::NamespaceFlushReport)
    /// fields plus `ok:true`.
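    ///
    /// Response shape (field values illustrative):
    ///
    /// ```json
    /// { "ok": true, "namespace": "tenant-42:results",
    ///   "generation_before": 4, "generation_after": 5,
    ///   "elapsed_micros": 180 }
    /// ```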
    pub(crate) fn handle_admin_blob_cache_flush_namespace(&self, body: Vec<u8>) -> HttpResponse {
        use crate::storage::cache::sweeper::BlobCacheSweeper;

        if body.is_empty() {
            return json_error(400, "missing JSON body with required `namespace` field");
        }
        let parsed: crate::serde_json::Value = match crate::serde_json::from_slice(&body) {
            Ok(v) => v,
            Err(err) => return json_error(400, format!("invalid JSON body: {err}")),
        };
        let namespace = match parsed.get("namespace").and_then(|v| v.as_str()) {
            Some(n) => n.to_string(),
            None => return json_error(400, "field `namespace` is required and must be a string"),
        };
        if namespace.is_empty() {
            return json_error(400, "field `namespace` must not be empty");
        }
        // Adversarial-byte rejection. The namespace string is
        // caller-controlled and may end up in audit logs, dashboards,
        // and the response envelope. Reject CR/LF/NUL on the request
        // side rather than relying on every downstream sink to escape.
        if let Some(err) = reject_smuggling_bytes("namespace", &namespace) {
            return err;
        }

        let report =
            BlobCacheSweeper::flush_namespace(self.runtime.result_blob_cache(), &namespace);

        let mut object = Map::new();
        object.insert("ok".to_string(), JsonValue::Bool(true));
        // Reflect the caller-supplied namespace back through the
        // JSON-boundary guard so any high-bit / Unicode bytes round-trip
        // through canonical RFC-8259 escaping.
        object.insert(
            "namespace".to_string(),
            crate::json_field::SerializedJsonField::tainted(&report.namespace),
        );
        object.insert(
            "generation_before".to_string(),
            JsonValue::Number(report.generation_before as f64),
        );
        object.insert(
            "generation_after".to_string(),
            JsonValue::Number(report.generation_after as f64),
        );
        object.insert(
            "elapsed_micros".to_string(),
            JsonValue::Number(report.elapsed_micros as f64),
        );

        let mut details = Map::new();
        details.insert(
            "namespace".to_string(),
            crate::json_field::SerializedJsonField::tainted(&report.namespace),
        );
        details.insert(
            "elapsed_micros".to_string(),
            JsonValue::Number(report.elapsed_micros as f64),
        );
        self.runtime.audit_log().record(
            "admin/blob_cache/flush_namespace",
            "operator",
            "instance",
            "ok",
            JsonValue::Object(details),
        );

        json_response(200, JsonValue::Object(object))
    }

    /// `POST /admin/cache/compare-and-set` — optimistic-lock put on the
    /// runtime result Blob Cache (issue #195).
    ///
    /// Body (JSON):
    ///
    /// ```json
    /// {
    ///   "namespace":      "tenant-42:results",
    ///   "key":            "query-abc",
    ///   "expected_version": 3,
    ///   "new_value_b64":  "<standard base64>",
    ///   "new_version":    4,
    ///   "ttl_ms":         60000
    /// }
    /// ```
    ///
    /// `ttl_ms` is optional. `expected_version` is informational —
    /// the atomic guard comes from `BlobCache::put`'s internal
    /// `check_version`: if the stored version ≥ `new_version` the
    /// write is rejected with 409.
    ///
    /// Returns:
    /// - 200 `{ committed: true, current_version }`
    /// - 409 `{ committed: false, current_version, reason: "VersionMismatch" }`
    /// - 400 malformed body / CRLF/NUL injection / bad base64
    /// - 401 missing or wrong admin bearer token
    pub(crate) fn handle_admin_blob_cache_compare_and_set(&self, body: Vec<u8>) -> HttpResponse {
        use crate::storage::cache::blob::{BlobCachePolicy, BlobCachePut, CacheError};

        if body.is_empty() {
            return json_error(400, "missing JSON body");
        }
        let parsed: crate::serde_json::Value = match crate::serde_json::from_slice(&body) {
            Ok(v) => v,
            Err(err) => return json_error(400, format!("invalid JSON body: {err}")),
        };

        let namespace = match parsed.get("namespace").and_then(|v| v.as_str()) {
            Some(n) if !n.is_empty() => n.to_string(),
            Some(_) => return json_error(400, "field `namespace` must not be empty"),
            None => return json_error(400, "field `namespace` is required and must be a string"),
        };
        let key = match parsed.get("key").and_then(|v| v.as_str()) {
            Some(k) if !k.is_empty() => k.to_string(),
            Some(_) => return json_error(400, "field `key` must not be empty"),
            None => return json_error(400, "field `key` is required and must be a string"),
        };
        let new_value_b64 = match parsed.get("new_value_b64").and_then(|v| v.as_str()) {
            Some(v) => v.to_string(),
            None => {
                return json_error(
                    400,
                    "field `new_value_b64` is required and must be a string",
                )
            }
        };
        let new_version = match parsed.get("new_version").and_then(|v| v.as_u64()) {
            Some(v) => v,
            None => {
                return json_error(
                    400,
                    "field `new_version` is required and must be a non-negative integer",
                )
            }
        };
        // expected_version is optional and informational; validate type if present.
        if let Some(ev) = parsed.get("expected_version") {
            if ev.as_u64().is_none() {
                return json_error(
                    400,
                    "field `expected_version` must be a non-negative integer",
                );
            }
        }
        let ttl_ms = parsed.get("ttl_ms").and_then(|v| v.as_u64());

        // Adversarial-byte rejection on caller-controlled strings.
        if let Some(err) = reject_smuggling_bytes("namespace", &namespace) {
            return err;
        }
        if let Some(err) = reject_smuggling_bytes("key", &key) {
            return err;
        }

        // Base64 decode the payload.
        let bytes = match b64_decode(&new_value_b64) {
            Ok(b) => b,
            Err(e) => return json_error(400, format!("invalid base64 in `new_value_b64`: {e}")),
        };

        // Build and execute the versioned put.
        let policy = if let Some(ttl) = ttl_ms {
            BlobCachePolicy::default().version(new_version).ttl_ms(ttl)
        } else {
            BlobCachePolicy::default().version(new_version)
        };
        let put = BlobCachePut::new(bytes).with_policy(policy);

        match self.runtime.result_blob_cache().put(&namespace, &key, put) {
            Ok(()) => {
                let mut obj = Map::new();
                obj.insert("committed".to_string(), JsonValue::Bool(true));
                obj.insert(
                    "current_version".to_string(),
                    JsonValue::Number(new_version as f64),
                );

                let mut details = Map::new();
                details.insert(
                    "namespace".to_string(),
                    crate::json_field::SerializedJsonField::tainted(&namespace),
                );
                details.insert(
                    "key".to_string(),
                    crate::json_field::SerializedJsonField::tainted(&key),
                );
                details.insert(
                    "new_version".to_string(),
                    JsonValue::Number(new_version as f64),
                );
                self.runtime.audit_log().record(
                    "admin/cache/compare_and_set",
                    "operator",
                    "instance",
                    "ok",
                    JsonValue::Object(details),
                );

                json_response(200, JsonValue::Object(obj))
            }
            Err(CacheError::VersionMismatch { existing, .. }) => {
                let mut obj = Map::new();
                obj.insert("committed".to_string(), JsonValue::Bool(false));
                obj.insert(
                    "current_version".to_string(),
                    JsonValue::Number(existing as f64),
                );
                obj.insert(
                    "reason".to_string(),
                    JsonValue::String("VersionMismatch".to_string()),
                );
                json_response(409, JsonValue::Object(obj))
            }
            Err(err) => json_error(500, format!("cache put failed: {err:?}")),
        }
    }

    /// `GET /admin/blob_cache/stats` — blob cache telemetry snapshot.
    ///
    /// Returns a JSON object with the global [`BlobCacheStats`] counters.
    /// Optional query parameter `namespace` is reserved for future
    /// per-namespace filtering; the current implementation ignores it
    /// and always returns instance-global counters.
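    ///
    /// Response shape, abridged to a few of the counters emitted below
    /// (field values illustrative):
    ///
    /// ```json
    /// { "ok": true, "hits": 1042, "misses": 73, "entries": 15,
    ///   "bytes_in_use": 52480, "namespaces": 3 }
    /// ```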
    pub(crate) fn handle_admin_blob_cache_stats(
        &self,
        _query: &std::collections::BTreeMap<String, String>,
    ) -> HttpResponse {
        let s = self.runtime.result_blob_cache().stats();
        let mut obj = Map::new();
        obj.insert("ok".to_string(), JsonValue::Bool(true));
        obj.insert("hits".to_string(), JsonValue::Number(s.hits() as f64));
        obj.insert("misses".to_string(), JsonValue::Number(s.misses() as f64));
        obj.insert(
            "insertions".to_string(),
            JsonValue::Number(s.insertions() as f64),
        );
        obj.insert(
            "evictions".to_string(),
            JsonValue::Number(s.evictions() as f64),
        );
        obj.insert(
            "expirations".to_string(),
            JsonValue::Number(s.expirations() as f64),
        );
        obj.insert(
            "invalidations".to_string(),
            JsonValue::Number(s.invalidations() as f64),
        );
        obj.insert(
            "namespace_flushes".to_string(),
            JsonValue::Number(s.namespace_flushes() as f64),
        );
        obj.insert(
            "version_mismatches".to_string(),
            JsonValue::Number(s.version_mismatches() as f64),
        );
        obj.insert("entries".to_string(), JsonValue::Number(s.entries() as f64));
        obj.insert(
            "bytes_in_use".to_string(),
            JsonValue::Number(s.bytes_in_use() as f64),
        );
        obj.insert(
            "l1_bytes_max".to_string(),
            JsonValue::Number(s.l1_bytes_max() as f64),
        );
        obj.insert(
            "l2_bytes_in_use".to_string(),
            JsonValue::Number(s.l2_bytes_in_use() as f64),
        );
        obj.insert(
            "l2_bytes_max".to_string(),
            JsonValue::Number(s.l2_bytes_max() as f64),
        );
        obj.insert(
            "l2_full_rejections".to_string(),
            JsonValue::Number(s.l2_full_rejections() as f64),
        );
        obj.insert(
            "l2_metadata_reads".to_string(),
            JsonValue::Number(s.l2_metadata_reads() as f64),
        );
        obj.insert(
            "l2_negative_skips".to_string(),
            JsonValue::Number(s.l2_negative_skips() as f64),
        );
        obj.insert(
            "synopsis_metadata_reads".to_string(),
            JsonValue::Number(s.synopsis_metadata_reads() as f64),
        );
        obj.insert(
            "synopsis_bytes".to_string(),
            JsonValue::Number(s.synopsis_bytes() as f64),
        );
        obj.insert(
            "namespaces".to_string(),
            JsonValue::Number(s.namespaces() as f64),
        );
        obj.insert(
            "max_namespaces".to_string(),
            JsonValue::Number(s.max_namespaces() as f64),
        );
        obj.insert(
            "promotion_queued".to_string(),
            JsonValue::Number(s.promotion_queued() as f64),
        );
        obj.insert(
            "promotion_dropped".to_string(),
            JsonValue::Number(s.promotion_dropped() as f64),
        );
        obj.insert(
            "promotion_completed".to_string(),
            JsonValue::Number(s.promotion_completed() as f64),
        );
        obj.insert(
            "promotion_queue_depth".to_string(),
            JsonValue::Number(s.promotion_queue_depth() as f64),
        );
        obj.insert(
            "l2_compression_ratio_observed".to_string(),
            JsonValue::Number(s.l2_compression_ratio_observed()),
        );
        obj.insert(
            "l2_compression_skipped_total".to_string(),
            JsonValue::Number(s.l2_compression_skipped_total() as f64),
        );
        obj.insert(
            "l2_bytes_saved_total".to_string(),
            JsonValue::Number(s.l2_bytes_saved_total() as f64),
        );
        json_response(200, JsonValue::Object(obj))
    }

    /// `POST /admin/readonly` — flip the public-mutation gate
    /// (PLAN.md Phase 4.3).
    ///
    /// Body: `{"enabled": true|false}`. Returns the new state. Useful
    /// for orchestrators that need to suspend writes (maintenance,
    /// billing suspension, hot key rotation) without killing the
    /// process or detaching the volume. Replicas reject writes
    /// regardless of this flag — the replication-role gate fires
    /// first.
    ///
    /// Persistence: the new state is written to
    /// `<data_dir>/.runtime-state.json` so a subsequent restart
    /// re-applies it. Failure to persist returns 500 — the in-memory
    /// flag is reverted so caller and disk stay consistent.
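    ///
    /// Request / response sketch (field values illustrative):
    ///
    /// ```json
    /// { "enabled": true }
    /// ```
    ///
    /// ```json
    /// { "ok": true, "read_only": true, "previous": false }
    /// ```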
    pub(crate) fn handle_admin_readonly(&self, body: Vec<u8>) -> HttpResponse {
        let enabled = if body.is_empty() {
            true
        } else {
            match crate::serde_json::from_slice::<crate::serde_json::Value>(&body) {
                Ok(v) => v.get("enabled").and_then(|n| n.as_bool()).unwrap_or(true),
                Err(err) => return json_error(400, format!("invalid JSON body: {err}")),
            }
        };

        let previous = self.runtime.write_gate().set_read_only(enabled);

        // Persist the toggle so a subsequent restart re-applies it
        // before any client surface comes online. Best-effort: on
        // failure we revert the in-memory flag so disk and runtime
        // agree (the operator can re-issue once the storage issue
        // is resolved).
        if let Some(data_path) = self.runtime.db().path() {
            let state_path = runtime_state_path(data_path);
            if let Err(err) = persist_runtime_readonly(&state_path, enabled) {
                self.runtime.write_gate().set_read_only(previous);
                return json_error(
                    500,
                    format!("failed to persist read_only to {state_path:?}: {err}"),
                );
            }
        }

        let mut details = Map::new();
        details.insert("enabled".to_string(), JsonValue::Bool(enabled));
        details.insert("previous".to_string(), JsonValue::Bool(previous));
        self.runtime.audit_log().record(
            "admin/readonly",
            "operator",
            "instance",
            "ok",
            JsonValue::Object(details),
        );
        let mut object = Map::new();
        object.insert("ok".to_string(), JsonValue::Bool(true));
        object.insert("read_only".to_string(), JsonValue::Bool(enabled));
        object.insert("previous".to_string(), JsonValue::Bool(previous));
        json_response(200, JsonValue::Object(object))
    }

    /// `GET /metrics` — Prometheus / OpenMetrics exposition.
    ///
    /// The initial metric set (PLAN.md Phase 5.1) covers the
    /// orchestrator-relevant signals: uptime, health phase,
    /// read-only state, replication role, last-backup outcome, and
    /// on-disk size when known. Counters that need request-path
    /// instrumentation (ops_total, query_duration_seconds_bucket)
    /// land in a follow-up commit so this endpoint can ship today
    /// against the existing data sources.
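    ///
    /// A few illustrative exposition lines in the format this handler
    /// emits (values invented):
    ///
    /// ```text
    /// reddb_uptime_seconds 3621.4
    /// reddb_health_status 2
    /// reddb_phase{phase="ready"} 1
    /// reddb_read_only 0
    /// reddb_replication_role{role="standalone"} 1
    /// ```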
    pub(crate) fn handle_metrics(&self) -> HttpResponse {
        use std::fmt::Write;
        let lifecycle = self.runtime.lifecycle();
        let phase = lifecycle.phase();
        let now_ms = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| d.as_millis() as u64)
            .unwrap_or(0);
        let uptime_secs = (now_ms.saturating_sub(lifecycle.started_at_ms()) as f64) / 1000.0;
        let cold_start_secs = lifecycle
            .ready_at_ms()
            .map(|ready_ms| (ready_ms.saturating_sub(lifecycle.started_at_ms()) as f64) / 1000.0);
        let health_status: u8 = match phase {
            Phase::Stopped => 0,
            Phase::Starting | Phase::ShuttingDown => 0,
            Phase::Draining => 1,
            Phase::Ready => 2,
        };
        let read_only = self.runtime.write_gate().is_read_only();
        let role = match self.runtime.write_gate().role() {
            crate::replication::ReplicationRole::Standalone => "standalone",
            crate::replication::ReplicationRole::Primary => "primary",
            crate::replication::ReplicationRole::Replica { .. } => "replica",
        };
        let db_size_bytes = self
            .runtime
            .db()
            .path()
            .and_then(|p| std::fs::metadata(p).ok())
            .map(|m| m.len())
            .unwrap_or(0);
        let runtime_stats = self.runtime.stats();
        let result_blob_stats = runtime_stats.result_blob_cache;
        let kv_stats = runtime_stats.kv;
        let metrics_ingest = runtime_stats.metrics_ingest;

        let mut body = String::with_capacity(1024);
        let _ = writeln!(
            body,
            "# HELP reddb_uptime_seconds Seconds since the runtime was constructed."
        );
        let _ = writeln!(body, "# TYPE reddb_uptime_seconds gauge");
        let _ = writeln!(body, "reddb_uptime_seconds {}", uptime_secs);

        let _ = writeln!(
            body,
            "# HELP reddb_health_status 0=down/starting, 1=degraded/draining, 2=ready."
        );
        let _ = writeln!(body, "# TYPE reddb_health_status gauge");
        let _ = writeln!(body, "reddb_health_status {}", health_status);

        let _ = writeln!(
            body,
            "# HELP reddb_phase Lifecycle phase as a labeled gauge (always 1; phase in label)."
        );
        let _ = writeln!(body, "# TYPE reddb_phase gauge");
        let _ = writeln!(body, "reddb_phase{{phase=\"{}\"}} 1", phase.as_str());

        let _ = writeln!(
            body,
            "# HELP reddb_read_only 1 when public mutations are gated, 0 otherwise."
        );
        let _ = writeln!(body, "# TYPE reddb_read_only gauge");
        let _ = writeln!(body, "reddb_read_only {}", if read_only { 1 } else { 0 });

        let _ = writeln!(
            body,
            "# HELP reddb_replication_role Replication role of this instance."
        );
        let _ = writeln!(body, "# TYPE reddb_replication_role gauge");
        let _ = writeln!(body, "reddb_replication_role{{role=\"{}\"}} 1", role);

        // PLAN.md Phase 5 / W6 — serverless writer lease state.
        // `not_required` for instances that opted out of lease fencing;
        // `held` / `not_held` for instances behind the fence so dashboards
        // can alert on lease loss without scraping logs.
        let lease_state = self.runtime.write_gate().lease_state();
        let _ = writeln!(
            body,
            "# HELP reddb_writer_lease_state Serverless writer-lease gate state (label)."
        );
        let _ = writeln!(body, "# TYPE reddb_writer_lease_state gauge");
        let _ = writeln!(
            body,
            "reddb_writer_lease_state{{state=\"{}\"}} 1",
            lease_state.label()
        );

        // PLAN.md Phase 5.1 — backup + WAL archive lag.
        // These are the SRE signals an orchestrator alerts on when a
        // serverless instance is healthy on the surface but its DR
        // posture has degraded silently.
        let backup_status = self.runtime.backup_status();
        if let Some(last) = backup_status.last_backup.as_ref() {
            let last_ts_secs = (last.timestamp as f64) / 1000.0;
            let _ = writeln!(
                body,
                "# HELP reddb_backup_last_success_timestamp_seconds Unix ts (s) of the most recent successful backup."
            );
            let _ = writeln!(
                body,
                "# TYPE reddb_backup_last_success_timestamp_seconds gauge"
            );
            let _ = writeln!(
                body,
                "reddb_backup_last_success_timestamp_seconds {}",
                last_ts_secs
            );
            let age_secs = ((now_ms.saturating_sub(last.timestamp)) as f64) / 1000.0;
            let _ = writeln!(
                body,
                "# HELP reddb_backup_age_seconds Seconds since last successful backup."
            );
            let _ = writeln!(body, "# TYPE reddb_backup_age_seconds gauge");
            let _ = writeln!(body, "reddb_backup_age_seconds {}", age_secs);
            let _ = writeln!(
                body,
                "# HELP reddb_backup_last_duration_seconds Wall-clock duration of the most recent backup."
            );
            let _ = writeln!(body, "# TYPE reddb_backup_last_duration_seconds gauge");
            let _ = writeln!(
                body,
                "reddb_backup_last_duration_seconds {}",
                (last.duration_ms as f64) / 1000.0
            );
        }
        let _ = writeln!(
            body,
            "# HELP reddb_backup_failures_total Total backup failures since process start."
        );
        let _ = writeln!(body, "# TYPE reddb_backup_failures_total counter");
        let _ = writeln!(
            body,
            "reddb_backup_failures_total {}",
            backup_status.total_failures
        );
        let _ = writeln!(
            body,
            "# HELP reddb_backup_total_total Total successful backups since process start."
        );
        let _ = writeln!(body, "# TYPE reddb_backup_total_total counter");
        let _ = writeln!(
            body,
            "reddb_backup_total_total {}",
            backup_status.total_backups
        );

        // WAL archive lag — distance between the engine's current LSN
        // and the last archived LSN. Operators alert when this grows
        // unbounded; it means archive uploads are failing or paused
        // (e.g. backend unreachable, lease lost).
        let (current_lsn, last_archived_lsn) = self.runtime.wal_archive_progress();
        let lag = current_lsn.saturating_sub(last_archived_lsn);
        let _ = writeln!(
            body,
            "# HELP reddb_wal_current_lsn Current local LSN (most recent record visible to writers)."
        );
        let _ = writeln!(body, "# TYPE reddb_wal_current_lsn gauge");
        let _ = writeln!(body, "reddb_wal_current_lsn {}", current_lsn);
        let _ = writeln!(
            body,
            "# HELP reddb_wal_last_archived_lsn LSN of the most recently archived WAL segment."
        );
        let _ = writeln!(body, "# TYPE reddb_wal_last_archived_lsn gauge");
        let _ = writeln!(body, "reddb_wal_last_archived_lsn {}", last_archived_lsn);
        let _ = writeln!(
            body,
            "# HELP reddb_wal_archive_lag_records Records between current LSN and last archived LSN."
        );
        let _ = writeln!(body, "# TYPE reddb_wal_archive_lag_records gauge");
        let _ = writeln!(body, "reddb_wal_archive_lag_records {}", lag);

        let _ = writeln!(
            body,
            "# HELP reddb_metrics_remote_write_samples_accepted_total Metrics remote-write samples accepted since process start."
        );
        let _ = writeln!(
            body,
            "# TYPE reddb_metrics_remote_write_samples_accepted_total counter"
        );
        let _ = writeln!(
            body,
            "reddb_metrics_remote_write_samples_accepted_total {}",
            metrics_ingest.samples_accepted
        );
        let _ = writeln!(
            body,
            "# HELP reddb_metrics_remote_write_series_accepted_total Metrics remote-write series accepted since process start."
        );
        let _ = writeln!(
            body,
            "# TYPE reddb_metrics_remote_write_series_accepted_total counter"
        );
        let _ = writeln!(
            body,
            "reddb_metrics_remote_write_series_accepted_total {}",
            metrics_ingest.series_accepted
        );
        let _ = writeln!(
            body,
            "# HELP reddb_metrics_remote_write_samples_rejected_total Metrics remote-write samples rejected since process start."
        );
        let _ = writeln!(
            body,
            "# TYPE reddb_metrics_remote_write_samples_rejected_total counter"
        );
        let _ = writeln!(
            body,
            "reddb_metrics_remote_write_samples_rejected_total {}",
            metrics_ingest.samples_rejected
        );
        let _ = writeln!(
            body,
            "# HELP reddb_metrics_remote_write_series_rejected_total Metrics remote-write series rejected since process start."
        );
        let _ = writeln!(
            body,
            "# TYPE reddb_metrics_remote_write_series_rejected_total counter"
        );
        let _ = writeln!(
            body,
            "reddb_metrics_remote_write_series_rejected_total {}",
            metrics_ingest.series_rejected
        );
        let _ = writeln!(
            body,
            "# HELP reddb_metrics_remote_write_series_rejected_by_reason_total Metrics remote-write series rejected since process start by reason."
        );
        let _ = writeln!(
            body,
            "# TYPE reddb_metrics_remote_write_series_rejected_by_reason_total counter"
        );
        let _ = writeln!(
            body,
            "reddb_metrics_remote_write_series_rejected_by_reason_total{{reason=\"cardinality_budget\"}} {}",
            metrics_ingest.series_rejected_cardinality_budget
        );
        let _ = writeln!(
            body,
            "# HELP reddb_metrics_tenant_activity_total Metrics adapter requests by tenant, namespace, and operation since process start."
        );
        let _ = writeln!(body, "# TYPE reddb_metrics_tenant_activity_total counter");
        for activity in self.runtime.metrics_tenant_activity_snapshot() {
            let _ = writeln!(
                body,
                "reddb_metrics_tenant_activity_total{{tenant=\"{}\",namespace=\"{}\",operation=\"{}\"}} {}",
                sanitize_label(&activity.tenant),
                sanitize_label(&activity.namespace),
                sanitize_label(&activity.operation),
                activity.count
            );
        }

        // PLAN.md Phase 11.4 — per-replica lag visibility. Emitted
        // when this primary has registered replicas; replicas that
        // haven't ack'd anything yet (`last_acked_lsn == 0`) still
        // show up so dashboards can detect "registered but stuck".
        let replicas = self.runtime.primary_replica_snapshots();
        let _ = writeln!(
            body,
            "# HELP reddb_replica_count Currently registered replicas."
        );
        let _ = writeln!(body, "# TYPE reddb_replica_count gauge");
        let _ = writeln!(body, "reddb_replica_count {}", replicas.len());
        if !replicas.is_empty() {
            let replica_lag_budget_secs = std::env::var("RED_SLO_REPLICA_LAG_BUDGET_SECONDS")
                .ok()
                .and_then(|value| value.parse::<f64>().ok())
                .filter(|value| value.is_finite() && *value >= 0.0)
                .unwrap_or(60.0);
            let _ = writeln!(
                body,
                "# HELP reddb_replica_ack_lsn Most recent LSN acked by each replica."
            );
            let _ = writeln!(body, "# TYPE reddb_replica_ack_lsn gauge");
            for r in &replicas {
                let _ = writeln!(
                    body,
                    "reddb_replica_ack_lsn{{replica_id=\"{}\"}} {}",
                    sanitize_label(&r.id),
                    r.last_acked_lsn
                );
            }
            let _ = writeln!(
                body,
                "# HELP reddb_replica_lag_records Distance from primary current LSN to replica acked LSN."
            );
            let _ = writeln!(body, "# TYPE reddb_replica_lag_records gauge");
            for r in &replicas {
                let _ = writeln!(
                    body,
                    "reddb_replica_lag_records{{replica_id=\"{}\"}} {}",
                    sanitize_label(&r.id),
                    current_lsn.saturating_sub(r.last_acked_lsn)
                );
            }
            let _ = writeln!(
                body,
                "# HELP reddb_replica_lag_seconds Wall-clock seconds since the replica was last seen."
            );
            let _ = writeln!(body, "# TYPE reddb_replica_lag_seconds gauge");
            let _ = writeln!(
                body,
                "# HELP reddb_slo_lag_budget_remaining_seconds Remaining per-replica lag budget; negative means SLO breach."
            );
            let _ = writeln!(body, "# TYPE reddb_slo_lag_budget_remaining_seconds gauge");
            for r in &replicas {
                let lag_ms = (now_ms as u128).saturating_sub(r.last_seen_at_unix_ms);
                let lag_secs = (lag_ms as f64) / 1000.0;
                let _ = writeln!(
                    body,
                    "reddb_replica_lag_seconds{{replica_id=\"{}\"}} {}",
                    sanitize_label(&r.id),
                    lag_secs
                );
                let _ = writeln!(
                    body,
                    "reddb_slo_lag_budget_remaining_seconds{{replica_id=\"{}\"}} {}",
                    sanitize_label(&r.id),
                    replica_lag_budget_secs - lag_secs
                );
            }
        }

        // PLAN.md Phase 11.5 — replica apply error counters and
        // current health label. Counters are global across the
        // instance lifetime; the health label reflects whatever the
        // replica loop last persisted (`ok`, `connecting`, `gap`,
        // `divergence`, `apply_error`, `stalled_gap`).
        let _ = writeln!(
            body,
            "# HELP reddb_replica_apply_errors_total Replica WAL apply errors since process start, by kind."
        );
        let _ = writeln!(body, "# TYPE reddb_replica_apply_errors_total counter");
        for (kind, count) in self.runtime.replica_apply_error_counts() {
            let _ = writeln!(
                body,
                "reddb_replica_apply_errors_total{{kind=\"{}\"}} {}",
                kind.label(),
                count
            );
        }
        if let Some(health) = self.runtime.replica_apply_health() {
            let _ = writeln!(
                body,
                "# HELP reddb_replica_apply_health Replica apply state (label, value=1)."
            );
            let _ = writeln!(body, "# TYPE reddb_replica_apply_health gauge");
            let _ = writeln!(
                body,
                "reddb_replica_apply_health{{state=\"{}\"}} 1",
                sanitize_label(&health)
            );
        }

        // PLAN.md Phase 4.4 — per-caller quota rejections. Empty
        // when the quota is unconfigured or no caller has been
        // throttled yet. Opportunistic eviction here keeps the
        // rejection map bounded on long-lived processes.
        self.runtime.quota_bucket().evict_idle();
        let rejections = self.runtime.quota_bucket().rejection_snapshot();
        if !rejections.is_empty() {
            let _ = writeln!(
                body,
                "# HELP reddb_quota_rejected_total Requests rejected by per-caller QPS quota."
            );
            let _ = writeln!(body, "# TYPE reddb_quota_rejected_total counter");
            for (principal, count) in &rejections {
                let _ = writeln!(
                    body,
                    "reddb_quota_rejected_total{{principal=\"{}\"}} {}",
                    sanitize_label(principal),
                    count
                );
            }
        }

        // PLAN.md Phase 11.4 — commit waiter outcome counters and
        // last-wait gauge. Operators alert when `timed_out` rises
        // (policy too tight or replicas stalled) and watch the
        // last-wait gauge for p95 trends.
        let (reached, timed_out, not_required, last_micros) =
            self.runtime.commit_waiter_metrics_snapshot();
        let _ = writeln!(
            body,
            "# HELP reddb_commit_wait_total Commit-wait outcomes by kind."
        );
        let _ = writeln!(body, "# TYPE reddb_commit_wait_total counter");
        let _ = writeln!(
            body,
            "reddb_commit_wait_total{{outcome=\"reached\"}} {}",
            reached
        );
        let _ = writeln!(
            body,
            "reddb_commit_wait_total{{outcome=\"timed_out\"}} {}",
            timed_out
        );
        let _ = writeln!(
            body,
            "reddb_commit_wait_total{{outcome=\"not_required\"}} {}",
            not_required
        );
        let _ = writeln!(
            body,
            "# HELP reddb_commit_wait_last_seconds Wall-clock seconds of the most recent commit wait."
        );
        let _ = writeln!(body, "# TYPE reddb_commit_wait_last_seconds gauge");
        let _ = writeln!(
            body,
            "reddb_commit_wait_last_seconds {}",
            (last_micros as f64) / 1_000_000.0
        );

        // PLAN.md Phase 11.4 — declared commit policy as a labeled
        // gauge so dashboards can confirm what the operator pinned.
        // The default `local` is emitted even when no replication is
        // configured, so the metric is always present.
        let policy = self.runtime.commit_policy();
        let _ = writeln!(
            body,
            "# HELP reddb_primary_commit_policy Active commit policy on the primary."
        );
        let _ = writeln!(body, "# TYPE reddb_primary_commit_policy gauge");
        let _ = writeln!(
            body,
            "reddb_primary_commit_policy{{policy=\"{}\"}} 1",
            policy.label()
        );

        // Blob Cache observability for the SQL result-cache adapter.
        // Per-namespace label cardinality is acceptable while the MVP namespace
        // cap stays near 256; raising that cap should move per-namespace detail
        // to an on-demand admin query and keep scrape metrics rolled up.
        let blob_ns = "runtime.result_cache";
        let _ = writeln!(
            body,
            "# HELP reddb_cache_blob_get_total Blob Cache get outcomes by namespace."
        );
        let _ = writeln!(body, "# TYPE reddb_cache_blob_get_total counter");
        let _ = writeln!(
            body,
            "reddb_cache_blob_get_total{{namespace=\"{}\",result=\"hit_l1\"}} {}",
            blob_ns,
            result_blob_stats.hits()
        );
        let _ = writeln!(
            body,
            "reddb_cache_blob_get_total{{namespace=\"{}\",result=\"hit_l2\"}} 0",
            blob_ns
        );
        let _ = writeln!(
            body,
            "reddb_cache_blob_get_total{{namespace=\"{}\",result=\"miss\"}} {}",
            blob_ns,
            result_blob_stats.misses()
        );
        let _ = writeln!(
            body,
            "# HELP reddb_cache_blob_put_total Blob Cache put outcomes by namespace."
        );
        let _ = writeln!(body, "# TYPE reddb_cache_blob_put_total counter");
        let _ = writeln!(
            body,
            "reddb_cache_blob_put_total{{namespace=\"{}\",outcome=\"ok\"}} {}",
1368            blob_ns,
1369            result_blob_stats.insertions()
1370        );
1371        let _ = writeln!(
1372            body,
1373            "reddb_cache_blob_put_total{{namespace=\"{}\",outcome=\"version_mismatch\"}} {}",
1374            blob_ns,
1375            result_blob_stats.version_mismatches()
1376        );
1377        let _ = writeln!(
1378            body,
1379            "reddb_cache_blob_put_total{{namespace=\"{}\",outcome=\"too_large\"}} 0",
1380            blob_ns
1381        );
1382        let _ = writeln!(
1383            body,
1384            "reddb_cache_blob_put_total{{namespace=\"{}\",outcome=\"metadata_too_large\"}} 0",
1385            blob_ns
1386        );
1387        let _ = writeln!(
1388            body,
1389            "# HELP reddb_cache_blob_invalidate_total Blob Cache invalidations by namespace and kind."
1390        );
1391        let _ = writeln!(body, "# TYPE reddb_cache_blob_invalidate_total counter");
1392        for (kind, count) in [
1393            ("key", 0),
1394            ("prefix", 0),
1395            ("tag", 0),
1396            ("dependency", result_blob_stats.invalidations()),
1397            ("namespace", result_blob_stats.namespace_flushes()),
1398        ] {
1399            let _ = writeln!(
1400                body,
1401                "reddb_cache_blob_invalidate_total{{namespace=\"{}\",kind=\"{}\"}} {}",
1402                blob_ns, kind, count
1403            );
1404        }
1405        let _ = writeln!(
1406            body,
1407            "# HELP reddb_cache_blob_evict_total Blob Cache evictions by namespace and reason."
1408        );
1409        let _ = writeln!(body, "# TYPE reddb_cache_blob_evict_total counter");
1410        for (reason, count) in [
1411            ("capacity", result_blob_stats.evictions()),
1412            ("expiry", result_blob_stats.expirations()),
1413            ("policy", 0),
1414        ] {
1415            let _ = writeln!(
1416                body,
1417                "reddb_cache_blob_evict_total{{namespace=\"{}\",reason=\"{}\"}} {}",
1418                blob_ns, reason, count
1419            );
1420        }
1421        let _ = writeln!(
1422            body,
1423            "# HELP reddb_cache_blob_l1_bytes_in_use L1 bytes currently used by Blob Cache namespace."
1424        );
1425        let _ = writeln!(body, "# TYPE reddb_cache_blob_l1_bytes_in_use gauge");
1426        let _ = writeln!(
1427            body,
1428            "reddb_cache_blob_l1_bytes_in_use{{namespace=\"{}\"}} {}",
1429            blob_ns,
1430            result_blob_stats.bytes_in_use()
1431        );
1432        let _ = writeln!(
1433            body,
1434            "# HELP reddb_cache_blob_l1_entries L1 entries currently held by Blob Cache namespace."
1435        );
1436        let _ = writeln!(body, "# TYPE reddb_cache_blob_l1_entries gauge");
1437        let _ = writeln!(
1438            body,
1439            "reddb_cache_blob_l1_entries{{namespace=\"{}\"}} {}",
1440            blob_ns,
1441            result_blob_stats.entries()
1442        );
1443        let _ = writeln!(
1444            body,
1445            "# HELP reddb_cache_blob_l2_bytes_in_use L2 bytes currently used by Blob Cache namespace."
1446        );
1447        let _ = writeln!(body, "# TYPE reddb_cache_blob_l2_bytes_in_use gauge");
1448        let _ = writeln!(
1449            body,
1450            "reddb_cache_blob_l2_bytes_in_use{{namespace=\"{}\"}} {}",
1451            blob_ns,
1452            result_blob_stats.l2_bytes_in_use()
1453        );
1454        let _ = writeln!(
1455            body,
1456            "# HELP reddb_cache_blob_l2_full_rejections_total Blob Cache puts rejected because L2 is full."
1457        );
1458        let _ = writeln!(
1459            body,
1460            "# TYPE reddb_cache_blob_l2_full_rejections_total counter"
1461        );
1462        let _ = writeln!(
1463            body,
1464            "reddb_cache_blob_l2_full_rejections_total{{namespace=\"{}\"}} {}",
1465            blob_ns,
1466            result_blob_stats.l2_full_rejections()
1467        );
1468        let _ = writeln!(
1469            body,
1470            "# HELP reddb_cache_blob_version_mismatch_total Blob Cache CAS version mismatches by namespace."
1471        );
1472        let _ = writeln!(
1473            body,
1474            "# TYPE reddb_cache_blob_version_mismatch_total counter"
1475        );
1476        let _ = writeln!(
1477            body,
1478            "reddb_cache_blob_version_mismatch_total{{namespace=\"{}\"}} {}",
1479            blob_ns,
1480            result_blob_stats.version_mismatches()
1481        );
1482
1483        let _ = writeln!(
1484            body,
1485            "# HELP reddb_kv_ops_total Normal-KV operations since process start."
1486        );
1487        let _ = writeln!(body, "# TYPE reddb_kv_ops_total counter");
1488        for (verb, count) in [
1489            ("put", kv_stats.puts),
1490            ("get", kv_stats.gets),
1491            ("delete", kv_stats.deletes),
1492            ("incr", kv_stats.incrs),
1493        ] {
1494            let _ = writeln!(body, "reddb_kv_ops_total{{verb=\"{}\"}} {}", verb, count);
1495        }
1496        let _ = writeln!(
1497            body,
1498            "# HELP reddb_kv_cas_total Normal-KV CAS outcomes since process start."
1499        );
1500        let _ = writeln!(body, "# TYPE reddb_kv_cas_total counter");
1501        let _ = writeln!(
1502            body,
1503            "reddb_kv_cas_total{{outcome=\"success\"}} {}",
1504            kv_stats.cas_success
1505        );
1506        let _ = writeln!(
1507            body,
1508            "reddb_kv_cas_total{{outcome=\"conflict\"}} {}",
1509            kv_stats.cas_conflict
1510        );
1511        let _ = writeln!(
1512            body,
1513            "# HELP reddb_kv_watch_streams_active Active normal-KV WATCH streams."
1514        );
1515        let _ = writeln!(body, "# TYPE reddb_kv_watch_streams_active gauge");
1516        let _ = writeln!(
1517            body,
1518            "reddb_kv_watch_streams_active {}",
1519            kv_stats.watch_streams_active
1520        );
1521        let _ = writeln!(
1522            body,
1523            "# HELP reddb_kv_watch_events_emitted_total Normal-KV WATCH events emitted since process start."
1524        );
1525        let _ = writeln!(body, "# TYPE reddb_kv_watch_events_emitted_total counter");
1526        let _ = writeln!(
1527            body,
1528            "reddb_kv_watch_events_emitted_total {}",
1529            kv_stats.watch_events_emitted
1530        );
1531        let _ = writeln!(
1532            body,
1533            "# HELP reddb_kv_watch_drops_total Normal-KV WATCH events dropped by bounded subscriber buffers."
1534        );
1535        let _ = writeln!(body, "# TYPE reddb_kv_watch_drops_total counter");
1536        let _ = writeln!(body, "reddb_kv_watch_drops_total {}", kv_stats.watch_drops);
1537
1538        let _ = writeln!(
1539            body,
1540            "# HELP reddb_db_size_bytes On-disk size of the primary database file."
1541        );
1542        let _ = writeln!(body, "# TYPE reddb_db_size_bytes gauge");
1543        let _ = writeln!(body, "reddb_db_size_bytes {}", db_size_bytes);
1544
1545        if let Some(secs) = cold_start_secs {
1546            let _ = writeln!(
1547                body,
1548                "# HELP reddb_cold_start_duration_seconds Seconds from process start to /health/ready 200."
1549            );
1550            let _ = writeln!(body, "# TYPE reddb_cold_start_duration_seconds gauge");
1551            let _ = writeln!(body, "reddb_cold_start_duration_seconds {}", secs);
1552        }
1553
1554        // PLAN.md Phase 9.1 — per-phase cold-start breakdown.
1555        // Operators use this to identify which phase dominates the
1556        // cold-start budget (restore, WAL replay, index warmup).
1557        // Phases that haven't fired yet are simply absent — no zero
1558        // entries to confuse alert rules.
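        // e.g. reddb_cold_start_phase_seconds{phase="wal_replay"} 1.8
        // (phase label value illustrative).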
1559        let phases = lifecycle.cold_start_phases().durations_ms();
1560        if !phases.is_empty() {
1561            let _ = writeln!(
1562                body,
1563                "# HELP reddb_cold_start_phase_seconds Per-phase cold-start duration."
1564            );
1565            let _ = writeln!(body, "# TYPE reddb_cold_start_phase_seconds gauge");
1566            for (name, dur_ms) in phases {
1567                let _ = writeln!(
1568                    body,
1569                    "reddb_cold_start_phase_seconds{{phase=\"{}\"}} {}",
1570                    name,
1571                    (dur_ms as f64) / 1000.0
1572                );
1573            }
1574        }
1575
1576        // Operator-imposed limits (PLAN.md Phase 4.1). Emitted as
1577        // gauges so external dashboards can graph headroom against
1578        // current usage. Only caps pinned at boot are emitted, so an
1579        // absent series means "no cap configured".
1580        let limits = self.runtime.resource_limits();
1581        if let Some(v) = limits.max_db_size_bytes {
1582            let _ = writeln!(
1583                body,
1584                "# HELP reddb_limit_db_size_bytes Operator-pinned cap on the primary DB file size."
1585            );
1586            let _ = writeln!(body, "# TYPE reddb_limit_db_size_bytes gauge");
1587            let _ = writeln!(body, "reddb_limit_db_size_bytes {}", v);
1588        }
1589        if let Some(v) = limits.max_connections {
1590            let _ = writeln!(body, "# TYPE reddb_limit_connections gauge");
1591            let _ = writeln!(body, "reddb_limit_connections {}", v);
1592        }
1593        if let Some(v) = limits.max_qps {
1594            let _ = writeln!(body, "# TYPE reddb_limit_qps gauge");
1595            let _ = writeln!(body, "reddb_limit_qps {}", v);
1596        }
1597        if let Some(v) = limits.max_batch_size {
1598            let _ = writeln!(body, "# TYPE reddb_limit_batch_size gauge");
1599            let _ = writeln!(body, "reddb_limit_batch_size {}", v);
1600        }
1601        if let Some(v) = limits.max_memory_bytes {
1602            let _ = writeln!(body, "# TYPE reddb_limit_memory_bytes gauge");
1603            let _ = writeln!(body, "reddb_limit_memory_bytes {}", v);
1604        }
1605
1606        // Events outbox metrics — issue #299
1607        {
1608            use crate::runtime::impl_queue::{
1609                EVENTS_DLQ_TOTAL, EVENTS_DRAIN_RETRIES_TOTAL, EVENTS_ENQUEUED_TOTAL,
1610            };
1611            let enqueued = EVENTS_ENQUEUED_TOTAL.load(std::sync::atomic::Ordering::Relaxed);
1612            let retries = EVENTS_DRAIN_RETRIES_TOTAL.load(std::sync::atomic::Ordering::Relaxed);
1613            let dlq_total = EVENTS_DLQ_TOTAL.load(std::sync::atomic::Ordering::Relaxed);
1614
1615            let _ = writeln!(
1616                body,
1617                "# HELP reddb_events_enqueued_total Total events successfully pushed to target queues."
1618            );
1619            let _ = writeln!(body, "# TYPE reddb_events_enqueued_total counter");
1620            let _ = writeln!(body, "reddb_events_enqueued_total {enqueued}");
1621
1622            let _ = writeln!(
1623                body,
1624            "# HELP reddb_events_drain_retries_total Total event push retries during outbox drain, by reason."
1625            );
1626            let _ = writeln!(body, "# TYPE reddb_events_drain_retries_total counter");
1627            let _ = writeln!(
1628                body,
1629                "reddb_events_drain_retries_total{{reason=\"queue_full\"}} {retries}"
1630            );
1631
1632            let _ = writeln!(
1633                body,
1634                "# HELP reddb_events_dlq_total Total events routed to dead-letter queues."
1635            );
1636            let _ = writeln!(body, "# TYPE reddb_events_dlq_total counter");
1637            let _ = writeln!(body, "reddb_events_dlq_total {dlq_total}");
1638        }
1639
1640        // AI provider and embedding metrics — issue #280.
1641        crate::runtime::ai::metrics::render_ai_metrics(&mut body);
1642
1643        HttpResponse {
1644            status: 200,
1645            content_type: "text/plain; version=0.0.4",
1646            body: body.into_bytes(),
1647            extra_headers: Vec::new(),
1648        }
1649    }
1650
1651    /// `GET /admin/status` — full structured snapshot of operator-
1652    /// relevant state (PLAN.md Phase 5.4). One JSON object that
1653    /// frontend dashboards / control-plane sidecars can poll
1654    /// without scraping multiple endpoints.
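    ///
    /// A trimmed, illustrative response (values hypothetical; the exact
    /// key set depends on role and configuration):
    ///
    /// ```json
    /// {
    ///   "version": "1.2.3",
    ///   "phase": "ready",
    ///   "uptime_secs": 4212.5,
    ///   "db_size_bytes": 1048576,
    ///   "read_only": false,
    ///   "replication_role": "primary",
    ///   "writer_lease": "held",
    ///   "encryption_at_rest": {"state": "disabled"},
    ///   "backup": {"total_successes": 12, "total_failures": 0, "interval_secs": 300},
    ///   "wal": {"current_lsn": 10500, "last_archived_lsn": 10400, "archive_lag_records": 100},
    ///   "replica": {"apply_errors": {}, "commit_policy": "local"},
    ///   "limits": {}
    /// }
    /// ```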
1655    pub(crate) fn handle_admin_status(&self) -> HttpResponse {
1656        let lifecycle = self.runtime.lifecycle();
1657        let phase = lifecycle.phase();
1658        let now_ms = std::time::SystemTime::now()
1659            .duration_since(std::time::UNIX_EPOCH)
1660            .map(|d| d.as_millis() as u64)
1661            .unwrap_or(0);
1662        let uptime_secs = (now_ms.saturating_sub(lifecycle.started_at_ms()) as f64) / 1000.0;
1663        let read_only = self.runtime.write_gate().is_read_only();
1664        let role = match self.runtime.write_gate().role() {
1665            crate::replication::ReplicationRole::Standalone => "standalone",
1666            crate::replication::ReplicationRole::Primary => "primary",
1667            crate::replication::ReplicationRole::Replica { .. } => "replica",
1668        };
1669        let db = self.runtime.db();
1670        let db_size_bytes = db
1671            .path()
1672            .and_then(|p| std::fs::metadata(p).ok())
1673            .map(|m| m.len())
1674            .unwrap_or(0);
1675        let backend_kind = db
1676            .options()
1677            .remote_backend
1678            .as_ref()
1679            .map(|b| b.name().to_string());
1680
1681        let mut object = Map::new();
1682        object.insert(
1683            "version".to_string(),
1684            JsonValue::String(env!("CARGO_PKG_VERSION").to_string()),
1685        );
1686        object.insert(
1687            "phase".to_string(),
1688            JsonValue::String(phase.as_str().to_string()),
1689        );
1690        object.insert(
1691            "uptime_secs".to_string(),
1692            JsonValue::Number((uptime_secs * 1000.0).round() / 1000.0),
1693        );
1694        object.insert(
1695            "started_at_unix_ms".to_string(),
1696            JsonValue::Number(lifecycle.started_at_ms() as f64),
1697        );
1698        if let Some(ready_at) = lifecycle.ready_at_ms() {
1699            object.insert(
1700                "ready_at_unix_ms".to_string(),
1701                JsonValue::Number(ready_at as f64),
1702            );
1703        }
1704        object.insert(
1705            "db_size_bytes".to_string(),
1706            JsonValue::Number(db_size_bytes as f64),
1707        );
1708        object.insert("read_only".to_string(), JsonValue::Bool(read_only));
1709        object.insert(
1710            "replication_role".to_string(),
1711            JsonValue::String(role.to_string()),
1712        );
1713        object.insert(
1714            "writer_lease".to_string(),
1715            JsonValue::String(self.runtime.write_gate().lease_state().label().to_string()),
1716        );
1717
1718        // PLAN.md Phase 6.3 — surface encryption-at-rest configuration
1719        // so dashboards / `red doctor` can flag a misconfigured key
1720        // (an Err at parse time) before it silently leaves data in plaintext.
1721        let (enc_state, enc_error) = self.runtime.encryption_at_rest_status();
1722        let mut enc_obj = Map::new();
1723        enc_obj.insert(
1724            "state".to_string(),
1725            JsonValue::String(enc_state.to_string()),
1726        );
1727        if let Some(err) = enc_error {
1728            enc_obj.insert("error".to_string(), JsonValue::String(err));
1729        }
1730        object.insert("encryption_at_rest".to_string(), JsonValue::Object(enc_obj));
1731
1732        // Backup posture (PLAN.md Phase 5.1). `last_backup` carries
1733        // the same shape /metrics emits so dashboards and alert rules
1734        // share a single contract.
1735        let backup = self.runtime.backup_status();
1736        let mut backup_obj = Map::new();
1737        if let Some(last) = backup.last_backup.as_ref() {
1738            backup_obj.insert(
1739                "last_success_unix_ms".to_string(),
1740                JsonValue::Number(last.timestamp as f64),
1741            );
1742            backup_obj.insert(
1743                "last_duration_ms".to_string(),
1744                JsonValue::Number(last.duration_ms as f64),
1745            );
1746            backup_obj.insert(
1747                "age_seconds".to_string(),
1748                JsonValue::Number(((now_ms.saturating_sub(last.timestamp)) as f64) / 1000.0),
1749            );
1750        }
1751        backup_obj.insert(
1752            "total_successes".to_string(),
1753            JsonValue::Number(backup.total_backups as f64),
1754        );
1755        backup_obj.insert(
1756            "total_failures".to_string(),
1757            JsonValue::Number(backup.total_failures as f64),
1758        );
1759        backup_obj.insert(
1760            "interval_secs".to_string(),
1761            JsonValue::Number(backup.interval_secs as f64),
1762        );
1763        object.insert("backup".to_string(), JsonValue::Object(backup_obj));
1764
1765        // WAL archive lag.
1766        let (current_lsn, last_archived_lsn) = self.runtime.wal_archive_progress();
1767        let mut wal_obj = Map::new();
1768        wal_obj.insert(
1769            "current_lsn".to_string(),
1770            JsonValue::Number(current_lsn as f64),
1771        );
1772        wal_obj.insert(
1773            "last_archived_lsn".to_string(),
1774            JsonValue::Number(last_archived_lsn as f64),
1775        );
1776        wal_obj.insert(
1777            "archive_lag_records".to_string(),
1778            JsonValue::Number(current_lsn.saturating_sub(last_archived_lsn) as f64),
1779        );
1780        object.insert("wal".to_string(), JsonValue::Object(wal_obj));
1781
1782        // PLAN.md Phase 11.5 — replica apply health + counters.
1783        // Always emit so dashboards have a stable shape; missing
1784        // health label means this isn't a replica or no apply has
1785        // happened yet.
1786        let mut replica_obj = Map::new();
1787        if let Some(health) = self.runtime.replica_apply_health() {
1788            replica_obj.insert("apply_health".to_string(), JsonValue::String(health));
1789        }
1790        let mut errors_obj = Map::new();
1791        for (kind, count) in self.runtime.replica_apply_error_counts() {
1792            errors_obj.insert(kind.label().to_string(), JsonValue::Number(count as f64));
1793        }
1794        replica_obj.insert("apply_errors".to_string(), JsonValue::Object(errors_obj));
1795        // Per-replica array (primary view). Omitted on replica/standalone.
1796        let snaps = self.runtime.primary_replica_snapshots();
1797        if !snaps.is_empty() {
1798            let arr: Vec<JsonValue> = snaps
1799                .iter()
1800                .map(|r| {
1801                    let mut o = Map::new();
1802                    o.insert("id".to_string(), JsonValue::String(r.id.clone()));
1803                    o.insert(
1804                        "last_acked_lsn".to_string(),
1805                        JsonValue::Number(r.last_acked_lsn as f64),
1806                    );
1807                    o.insert(
1808                        "last_sent_lsn".to_string(),
1809                        JsonValue::Number(r.last_sent_lsn as f64),
1810                    );
1811                    o.insert(
1812                        "last_durable_lsn".to_string(),
1813                        JsonValue::Number(r.last_durable_lsn as f64),
1814                    );
1815                    o.insert(
1816                        "last_seen_at_unix_ms".to_string(),
1817                        JsonValue::Number(r.last_seen_at_unix_ms as f64),
1818                    );
1819                    o.insert(
1820                        "lag_records".to_string(),
1821                        JsonValue::Number(current_lsn.saturating_sub(r.last_acked_lsn) as f64),
1822                    );
1823                    if let Some(region) = &r.region {
1824                        o.insert("region".to_string(), JsonValue::String(region.clone()));
1825                    }
1826                    JsonValue::Object(o)
1827                })
1828                .collect();
1829            replica_obj.insert("primary_view".to_string(), JsonValue::Array(arr));
1830        }
1831        replica_obj.insert(
1832            "commit_policy".to_string(),
1833            JsonValue::String(self.runtime.commit_policy().label().to_string()),
1834        );
1835        // PLAN.md Phase 11.4 — durable-LSN map per replica for
1836        // ack_n debugging. Omitted until at least one ack lands.
1837        let durable = self.runtime.commit_waiter_snapshot();
1838        if !durable.is_empty() {
1839            let arr: Vec<JsonValue> = durable
1840                .into_iter()
1841                .map(|(id, lsn)| {
1842                    let mut o = Map::new();
1843                    o.insert("replica_id".to_string(), JsonValue::String(id));
1844                    o.insert("durable_lsn".to_string(), JsonValue::Number(lsn as f64));
1845                    JsonValue::Object(o)
1846                })
1847                .collect();
1848            replica_obj.insert("durable_view".to_string(), JsonValue::Array(arr));
1849        }
1850        object.insert("replica".to_string(), JsonValue::Object(replica_obj));
1851        if let Some(backend) = backend_kind {
1852            object.insert("remote_backend".to_string(), JsonValue::String(backend));
1853        }
1854        // PLAN.md Phase 4.1 — operator-imposed limits surface so
1855        // external dashboards can show headroom alongside usage.
1856        let limits = self.runtime.resource_limits();
1857        let mut limits_obj = Map::new();
1858        if let Some(v) = limits.max_db_size_bytes {
1859            limits_obj.insert("max_db_size_bytes".to_string(), JsonValue::Number(v as f64));
1860        }
1861        if let Some(v) = limits.max_connections {
1862            limits_obj.insert("max_connections".to_string(), JsonValue::Number(v as f64));
1863        }
1864        if let Some(v) = limits.max_qps {
1865            limits_obj.insert("max_qps".to_string(), JsonValue::Number(v as f64));
1866        }
1867        if let Some(v) = limits.max_batch_size {
1868            limits_obj.insert("max_batch_size".to_string(), JsonValue::Number(v as f64));
1869        }
1870        if let Some(v) = limits.max_memory_bytes {
1871            limits_obj.insert("max_memory_bytes".to_string(), JsonValue::Number(v as f64));
1872        }
1873        if let Some(d) = limits.max_query_duration {
1874            limits_obj.insert(
1875                "max_query_duration_ms".to_string(),
1876                JsonValue::Number(d.as_millis() as f64),
1877            );
1878        }
1879        if let Some(v) = limits.max_result_bytes {
1880            limits_obj.insert("max_result_bytes".to_string(), JsonValue::Number(v as f64));
1881        }
1882        object.insert("limits".to_string(), JsonValue::Object(limits_obj));
1883
1884        if let Some(report) = lifecycle.shutdown_report() {
1885            let mut shutdown_obj = Map::new();
1886            shutdown_obj.insert(
1887                "duration_ms".to_string(),
1888                JsonValue::Number(report.duration_ms as f64),
1889            );
1890            shutdown_obj.insert(
1891                "flushed_wal".to_string(),
1892                JsonValue::Bool(report.flushed_wal),
1893            );
1894            shutdown_obj.insert(
1895                "backup_uploaded".to_string(),
1896                JsonValue::Bool(report.backup_uploaded),
1897            );
1898            object.insert("shutdown".to_string(), JsonValue::Object(shutdown_obj));
1899        }
1900        json_response(200, JsonValue::Object(object))
1901    }
1902
1907    /// `POST /admin/failover/promote` — manual replica → primary
1908    /// promotion (PLAN.md Phase 11.6).
1909    ///
1910    /// Hard checks before bumping the lease generation:
1911    ///   * Caller is currently a replica (role guard) — primaries
1912    ///     don't promote themselves.
1913    ///   * Remote backend is configured (lease lives there).
1914    ///   * Replica apply health shows no `stalled_gap`, `divergence`,
1915    ///     or `apply_error` — a replica that's behind cannot become
1916    ///     the authoritative writer.
1917    ///   * Lease can be acquired — `try_acquire` returns success.
1918    ///     Failure surfaces the existing holder so the operator
1919    ///     understands why.
1920    ///
1921    /// Body: `{"holder_id": "...", "ttl_ms": <u64>}`. `holder_id`
1922    /// defaults to `RED_LEASE_HOLDER_ID` env / `<hostname>-<pid>`.
1923    /// `ttl_ms` defaults to 60_000.
1924    ///
1925    /// On success the response includes the new lease's generation
1926    /// and acquired_at. **Promotion does NOT flip the running role
1927    /// to primary** — the operator's runbook is to restart the
1928    /// process with `RED_REPLICATION_MODE=primary` after a
1929    /// successful promotion. Auto-role-flip is a Phase 11.6 follow-
1930    /// up that requires draining live read traffic safely.
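    ///
    /// Minimal invocation sketch (host/port are deployment-specific; any
    /// `RED_ADMIN_TOKEN` auth header is omitted):
    ///
    /// ```text
    /// curl -X POST http://localhost:8080/admin/failover/promote \
    ///   -d '{"holder_id": "replica-b-4321", "ttl_ms": 30000}'
    /// # 200 => {"ok": true, "holder_id": "replica-b-4321", "generation": 7, ...}
    /// ```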
1931    pub(crate) fn handle_admin_failover_promote(&self, body: Vec<u8>) -> HttpResponse {
1932        // Role guard.
1933        if !matches!(
1934            self.runtime.write_gate().role(),
1935            crate::replication::ReplicationRole::Replica { .. }
1936        ) {
1937            return json_error(
1938                409,
1939                "promotion only allowed on a replica (current role is not Replica)",
1940            );
1941        }
1942
1943        // Backend guard.
1944        let Some(backend) = self.runtime.db().options().remote_backend_atomic.clone() else {
1945            return json_error(
1946                412,
1947                "promotion requires a CAS-capable remote backend (use s3, fs, or http with RED_HTTP_CONDITIONAL_WRITES=true)",
1948            );
1949        };
1950
1951        // Apply health guard. `stalled_gap` / `divergence` /
1952        // `apply_error` mean the replica isn't current and block promotion.
1953        let health = self.runtime.replica_apply_health().unwrap_or_default();
1954        if matches!(
1955            health.as_str(),
1956            "stalled_gap" | "divergence" | "apply_error"
1957        ) {
1958            return json_error(
1959                409,
1960                format!(
1961                    "promotion refused — replica apply state is `{health}`; resolve before promoting"
1962                ),
1963            );
1964        }
1965
1966        // Body parsing.
1967        let (holder_id, ttl_ms) = if body.is_empty() {
1968            (default_holder_id(), 60_000u64)
1969        } else {
1970            match crate::serde_json::from_slice::<crate::serde_json::Value>(&body) {
1971                Ok(v) => {
1972                    let holder = v
1973                        .get("holder_id")
1974                        .and_then(|n| n.as_str())
1975                        .map(|s| s.to_string())
1976                        .unwrap_or_else(default_holder_id);
1977                    let ttl = v
1978                        .get("ttl_ms")
1979                        .and_then(|n| n.as_u64())
1980                        .filter(|t| *t > 0)
1981                        .unwrap_or(60_000);
1982                    (holder, ttl)
1983                }
1984                Err(err) => return json_error(400, format!("invalid JSON body: {err}")),
1985            }
1986        };
1987
1988        let database_key = self
1989            .runtime
1990            .db()
1991            .options()
1992            .remote_key
1993            .clone()
1994            .unwrap_or_else(|| "main".to_string());
1995        let store = crate::replication::LeaseStore::new(backend);
1996
1997        match crate::runtime::lease_lifecycle::admin_promote_lease(
1998            &store,
1999            self.runtime.audit_log(),
2000            &database_key,
2001            &holder_id,
2002            ttl_ms,
2003        ) {
2004            Ok(lease) => {
2005                let mut object = Map::new();
2006                object.insert("ok".to_string(), JsonValue::Bool(true));
2007                object.insert("holder_id".to_string(), JsonValue::String(lease.holder_id));
2008                object.insert(
2009                    "generation".to_string(),
2010                    JsonValue::Number(lease.generation as f64),
2011                );
2012                object.insert(
2013                    "acquired_at_ms".to_string(),
2014                    JsonValue::Number(lease.acquired_at_ms as f64),
2015                );
2016                object.insert(
2017                    "expires_at_ms".to_string(),
2018                    JsonValue::Number(lease.expires_at_ms as f64),
2019                );
2020                object.insert(
2021                    "next_step".to_string(),
2022                    JsonValue::String(
2023                        "restart with RED_REPLICATION_MODE=primary to start accepting writes"
2024                            .to_string(),
2025                    ),
2026                );
2027                json_response(200, JsonValue::Object(object))
2028            }
2029            Err(err) => json_error(409, format!("promotion refused: {err}")),
2030        }
2031    }
2032
2033    /// `GET /admin/audit` — structured audit log query for compliance
2034    /// (SOC 2 / HIPAA / ISO 27001). Reads the active `.audit.log`
2035    /// plus rotated `.audit.log.<ms>.zst` archives, applies the
2036    /// query filters, and returns the matching events as a JSON
2037    /// object: `{"count": n, "events": [...]}`.
2038    ///
2039    /// Supported query params:
2040    ///   * `since` / `until` — RFC 3339 (`...Z`) or ms epoch.
2041    ///   * `principal` — exact match (e.g. `alice@acme`).
2042    ///   * `tenant` — exact match.
2043    ///   * `action` — prefix match (e.g. `auth/`, `admin/`).
2044    ///   * `outcome` — `success` / `denied` / `error`.
2045    ///   * `limit` — default 100, max 1000.
2046    ///   * `format` — `json` (default) or `jsonl`.
2047    ///
2048    /// Auth: relies on the `RED_ADMIN_TOKEN` gate already enforced
2049    /// for every `/admin/*` path in `is_authorized`. When that env
2050    /// var is unset the endpoint is open — same posture as every
2051    /// other admin endpoint.
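    ///
    /// Query sketch (values illustrative):
    ///
    /// ```text
    /// curl 'http://localhost:8080/admin/audit?since=2024-01-01T00:00:00Z&action=admin/&outcome=denied&limit=50'
    /// # => {"count": 2, "events": [ ... ]}
    /// ```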
2052    pub(crate) fn handle_admin_audit_query(
2053        &self,
2054        query: &std::collections::BTreeMap<String, String>,
2055    ) -> HttpResponse {
2056        use crate::runtime::audit_log::Outcome;
2057        use crate::runtime::audit_query::{
2058            events_to_json_array, parse_time_arg, run_query, AuditQuery,
2059        };
2060
2061        let mut q = AuditQuery::new();
2062        if let Some(s) = query.get("since") {
2063            q.since_ms = parse_time_arg(s);
2064            if q.since_ms.is_none() {
2065                return json_error(400, format!("invalid 'since' value: {s}"));
2066            }
2067        }
2068        if let Some(u) = query.get("until") {
2069            q.until_ms = parse_time_arg(u);
2070            if q.until_ms.is_none() {
2071                return json_error(400, format!("invalid 'until' value: {u}"));
2072            }
2073        }
2074        if let Some(p) = query.get("principal") {
2075            if !p.is_empty() {
2076                q.principal = Some(p.clone());
2077            }
2078        }
2079        if let Some(t) = query.get("tenant") {
2080            if !t.is_empty() {
2081                q.tenant = Some(t.clone());
2082            }
2083        }
2084        if let Some(a) = query.get("action") {
2085            if !a.is_empty() {
2086                q.action_prefix = Some(a.clone());
2087            }
2088        }
2089        if let Some(o) = query.get("outcome") {
2090            if let Some(parsed) = Outcome::parse(o) {
2091                q.outcome = Some(parsed);
2092            } else {
2093                return json_error(
2094                    400,
2095                    format!("invalid 'outcome' value: {o} (expected success|denied|error)"),
2096                );
2097            }
2098        }
2099        if let Some(l) = query.get("limit") {
2100            match l.parse::<usize>() {
2101                Ok(n) if n > 0 => q.limit = n.min(1000),
2102                _ => return json_error(400, format!("invalid 'limit' value: {l}")),
2103            }
2104        } else {
2105            q.limit = 100;
2106        }
2107
2108        let format = query
2109            .get("format")
2110            .map(|s| s.to_ascii_lowercase())
2111            .unwrap_or_default();
2112
2113        let path = self.runtime.audit_log().path().to_path_buf();
2114        let events = run_query(&path, &q);
2115
2116        if format == "jsonl" || format == "ndjson" {
2117            let mut body = String::new();
2118            for ev in &events {
2119                body.push_str(&ev.to_json_line(None));
2120                body.push('\n');
2121            }
2122            return HttpResponse {
2123                status: 200,
2124                content_type: "application/x-ndjson",
2125                body: body.into_bytes(),
2126                extra_headers: Vec::new(),
2127            };
2128        }
2129
2130        json_response(200, events_to_json_array(&events))
2131    }
2132
    /// `POST /admin/drain` — flip to Draining phase. Subsequent
    /// `WriteGate`-checked writes will be rejected until shutdown
    /// completes or another phase override re-enables Ready.
    /// Idempotent.
2133    pub(crate) fn handle_admin_drain(&self) -> HttpResponse {
2134        self.runtime.lifecycle().mark_draining();
2135        self.runtime.audit_log().record(
2136            "admin/drain",
2137            "operator",
2138            "instance",
2139            "ok",
2140            JsonValue::Null,
2141        );
2142        let mut object = Map::new();
2143        object.insert("ok".to_string(), JsonValue::Bool(true));
2144        object.insert(
2145            "phase".to_string(),
2146            JsonValue::String(self.runtime.lifecycle().phase().as_str().to_string()),
2147        );
2148        json_response(200, JsonValue::Object(object))
2149    }
2150
2151    /// `GET /health/live` — process is alive and responsive. Always
2152    /// 200 once the runtime is constructed; 503 only after Stopped.
2153    /// Never touches I/O.
2154    pub(crate) fn handle_health_live(&self) -> HttpResponse {
2155        let phase = self.runtime.lifecycle().phase();
2156        let alive = !matches!(phase, Phase::Stopped);
2157        let status = if alive { 200 } else { 503 };
2158        let mut object = Map::new();
2159        object.insert(
2160            "status".to_string(),
2161            JsonValue::String(if alive { "alive" } else { "stopped" }.to_string()),
2162        );
2163        object.insert(
2164            "phase".to_string(),
2165            JsonValue::String(phase.as_str().to_string()),
2166        );
2167        json_response(status, JsonValue::Object(object))
2168    }
2169
2170    /// `GET /health/ready` — runtime is fully past WAL replay /
2171    /// restore-from-remote and accepts queries.
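    ///
    /// Response sketch (values illustrative): 200 with
    /// `{"probe": "ready", "phase": "ready", "status": "ready", ...}` once
    /// the phase accepts queries, otherwise 503 with a `reason` string
    /// such as `"starting"`.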
2172    pub(crate) fn handle_health_ready(&self) -> HttpResponse {
2173        self.health_ready_response("ready")
2174    }
2175
2176    /// `GET /health/startup` — Kubernetes startup probe variant.
2177    /// Same readiness logic as `/health/ready`; orchestrator gives
2178    /// it a longer grace window before failing the pod.
2179    pub(crate) fn handle_health_startup(&self) -> HttpResponse {
2180        self.health_ready_response("startup")
2181    }
2182
2183    fn health_ready_response(&self, probe: &str) -> HttpResponse {
2184        let lifecycle = self.runtime.lifecycle();
2185        let phase = lifecycle.phase();
2186        let now = std::time::SystemTime::now()
2187            .duration_since(std::time::UNIX_EPOCH)
2188            .map(|d| d.as_millis() as u64)
2189            .unwrap_or(0);
2190        let started_at = lifecycle.started_at_ms();
2191        let since_secs = (now.saturating_sub(started_at) as f64) / 1000.0;
2192        let mut object = Map::new();
2193        object.insert("probe".to_string(), JsonValue::String(probe.to_string()));
2194        object.insert(
2195            "transport_listeners".to_string(),
2196            self.transport_readiness_json(),
2197        );
2198        object.insert(
2199            "phase".to_string(),
2200            JsonValue::String(phase.as_str().to_string()),
2201        );
2202        object.insert(
2203            "since_secs".to_string(),
2204            JsonValue::Number((since_secs * 1000.0).round() / 1000.0),
2205        );
2206        if let Some(ready_at) = lifecycle.ready_at_ms() {
2207            object.insert(
2208                "ready_at_unix_ms".to_string(),
2209                JsonValue::Number(ready_at as f64),
2210            );
2211        }
2212
2213        if phase.accepts_queries() {
2214            object.insert("status".to_string(), JsonValue::String("ready".to_string()));
2215            json_response(200, JsonValue::Object(object))
2216        } else {
2217            object.insert(
2218                "status".to_string(),
2219                JsonValue::String(phase.as_str().to_string()),
2220            );
2221            if let Some(reason) = lifecycle.not_ready_reason() {
2222                object.insert("reason".to_string(), JsonValue::String(reason));
2223            } else {
2224                object.insert(
2225                    "reason".to_string(),
2226                    JsonValue::String(match phase {
2227                        Phase::Starting => "starting".to_string(),
2228                        Phase::ShuttingDown => "shutting_down".to_string(),
2229                        Phase::Stopped => "stopped".to_string(),
2230                        Phase::Draining => "draining".to_string(),
2231                        Phase::Ready => "ready".to_string(),
2232                    }),
2233                );
2234            }
2235            json_response(503, JsonValue::Object(object))
2236        }
2237    }
2238
2239    // -----------------------------------------------------------------
2240    // IAM policy admin endpoints
2241    // -----------------------------------------------------------------
2242
2243    fn iam_audit(&self, action: &str, target: &str, outcome: &str) {
2244        self.runtime
2245            .audit_log()
2246            .record(action, "operator", target, outcome, JsonValue::Null);
2247    }
2248
2249    /// `PUT /admin/policies/:id` — install or replace an IAM policy.
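    ///
    /// Body is the policy JSON accepted by `Policy::from_json_str`; a
    /// hypothetical minimal sketch (statement fields here are assumptions —
    /// the parser is authoritative):
    ///
    /// ```json
    /// {"id": "read-only", "version": 1,
    ///  "statements": [{"sid": "allow-reads", "effect": "allow"}]}
    /// ```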
2250    pub(crate) fn handle_iam_policy_put(&self, id: &str, body: Vec<u8>) -> HttpResponse {
2251        let Some(store) = self.auth_store.as_ref() else {
2252            return json_error(503, "auth store not configured");
2253        };
2254        let Ok(text) = std::str::from_utf8(&body) else {
2255            return json_error(400, "body must be utf-8 JSON");
2256        };
2257        let mut policy = match crate::auth::policies::Policy::from_json_str(text) {
2258            Ok(p) => p,
2259            Err(e) => return json_error(400, format!("policy parse: {e}")),
2260        };
2261        if policy.id != id {
2262            policy.id = id.to_string();
2263        }
2264        if let Err(e) = store.put_policy(policy) {
2265            return json_error(400, e.to_string());
2266        }
2267        self.runtime.invalidate_result_cache();
2268        self.iam_audit("iam/policy.put", id, "ok");
2269        let mut obj = Map::new();
2270        obj.insert("ok".to_string(), JsonValue::Bool(true));
2271        obj.insert("id".to_string(), JsonValue::String(id.to_string()));
2272        json_response(200, JsonValue::Object(obj))
2273    }
2274
2275    /// `GET /admin/policies/:id` — fetch a single policy as JSON.
2276    pub(crate) fn handle_iam_policy_get(&self, id: &str) -> HttpResponse {
2277        let Some(store) = self.auth_store.as_ref() else {
2278            return json_error(503, "auth store not configured");
2279        };
2280        let Some(p) = store.get_policy(id) else {
2281            return json_error(404, format!("policy `{id}` not found"));
2282        };
2283        let body = p.to_json_string();
2284        HttpResponse {
2285            status: 200,
2286            content_type: "application/json",
2287            body: body.into_bytes(),
2288            extra_headers: Vec::new(),
2289        }
2290    }
2291
2292    /// `GET /admin/policies` — list policies (id-sorted summary).
2293    pub(crate) fn handle_iam_policy_list(&self) -> HttpResponse {
2294        let Some(store) = self.auth_store.as_ref() else {
2295            return json_error(503, "auth store not configured");
2296        };
2297        let pols = store.list_policies();
2298        let items: Vec<JsonValue> = pols
2299            .iter()
2300            .map(|p| {
2301                let mut obj = Map::new();
2302                obj.insert("id".to_string(), JsonValue::String(p.id.clone()));
2303                obj.insert("version".to_string(), JsonValue::Number(p.version as f64));
2304                obj.insert(
2305                    "statements".to_string(),
2306                    JsonValue::Number(p.statements.len() as f64),
2307                );
2308                obj.insert(
2309                    "tenant".to_string(),
2310                    p.tenant
2311                        .as_deref()
2312                        .map(|t| JsonValue::String(t.to_string()))
2313                        .unwrap_or(JsonValue::Null),
2314                );
2315                JsonValue::Object(obj)
2316            })
2317            .collect();
2318        let mut envelope = Map::new();
2319        envelope.insert("count".to_string(), JsonValue::Number(items.len() as f64));
2320        envelope.insert("items".to_string(), JsonValue::Array(items));
2321        json_response(200, JsonValue::Object(envelope))
2322    }
2323
2324    /// `DELETE /admin/policies/:id` — drop a policy.
2325    pub(crate) fn handle_iam_policy_delete(&self, id: &str) -> HttpResponse {
2326        let Some(store) = self.auth_store.as_ref() else {
2327            return json_error(503, "auth store not configured");
2328        };
2329        match store.delete_policy(id) {
2330            Ok(()) => {
2331                self.runtime.invalidate_result_cache();
2332                self.iam_audit("iam/policy.drop", id, "ok");
2333                HttpResponse {
2334                    status: 204,
2335                    content_type: "application/json",
2336                    body: Vec::new(),
2337                    extra_headers: Vec::new(),
2338                }
2339            }
2340            Err(e) => json_error(404, e.to_string()),
2341        }
2342    }
2343
2344    /// `PUT /admin/users/:user/policies/:policy_id`. `:user` may
2345    /// optionally be tenant-qualified as `tenant.username`.
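    /// e.g. `PUT /admin/users/acme.alice/policies/read-only`
    /// (user and policy ids illustrative).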
2346    pub(crate) fn handle_iam_attach_user(&self, user: &str, policy_id: &str) -> HttpResponse {
2347        let Some(store) = self.auth_store.as_ref() else {
2348            return json_error(503, "auth store not configured");
2349        };
2350        let uid = decode_user_arg(user);
2351        match store.attach_policy(
2352            crate::auth::store::PrincipalRef::User(uid.clone()),
2353            policy_id,
2354        ) {
2355            Ok(()) => {
2356                self.runtime.invalidate_result_cache();
2357                self.iam_audit(
2358                    "iam/policy.attach",
2359                    &format!("user:{uid}::{policy_id}"),
2360                    "ok",
2361                );
2362                let mut obj = Map::new();
2363                obj.insert("ok".to_string(), JsonValue::Bool(true));
2364                json_response(200, JsonValue::Object(obj))
2365            }
2366            Err(e) => json_error(400, e.to_string()),
2367        }
2368    }
2369
2370    /// `DELETE /admin/users/:user/policies/:policy_id`.
2371    pub(crate) fn handle_iam_detach_user(&self, user: &str, policy_id: &str) -> HttpResponse {
2372        let Some(store) = self.auth_store.as_ref() else {
2373            return json_error(503, "auth store not configured");
2374        };
2375        let uid = decode_user_arg(user);
2376        match store.detach_policy(
2377            crate::auth::store::PrincipalRef::User(uid.clone()),
2378            policy_id,
2379        ) {
2380            Ok(()) => {
2381                self.runtime.invalidate_result_cache();
2382                self.iam_audit(
2383                    "iam/policy.detach",
2384                    &format!("user:{uid}::{policy_id}"),
2385                    "ok",
2386                );
2387                HttpResponse {
2388                    status: 204,
2389                    content_type: "application/json",
2390                    body: Vec::new(),
2391                    extra_headers: Vec::new(),
2392                }
2393            }
2394            Err(e) => json_error(400, e.to_string()),
2395        }
2396    }
2397
2398    /// `PUT /admin/users/:user/groups/:group`.
2399    pub(crate) fn handle_iam_add_user_group(&self, user: &str, group: &str) -> HttpResponse {
2400        let Some(store) = self.auth_store.as_ref() else {
2401            return json_error(503, "auth store not configured");
2402        };
2403        let uid = decode_user_arg(user);
2404        match store.add_user_to_group(&uid, group) {
2405            Ok(()) => {
2406                self.runtime.invalidate_result_cache();
2407                self.iam_audit("iam/group.add", &format!("user:{uid}::group:{group}"), "ok");
2408                let mut obj = Map::new();
2409                obj.insert("ok".to_string(), JsonValue::Bool(true));
2410                json_response(200, JsonValue::Object(obj))
2411            }
2412            Err(e) => json_error(400, e.to_string()),
2413        }
2414    }
2415
2416    /// `DELETE /admin/users/:user/groups/:group`.
2417    pub(crate) fn handle_iam_remove_user_group(&self, user: &str, group: &str) -> HttpResponse {
2418        let Some(store) = self.auth_store.as_ref() else {
2419            return json_error(503, "auth store not configured");
2420        };
2421        let uid = decode_user_arg(user);
2422        match store.remove_user_from_group(&uid, group) {
2423            Ok(()) => {
2424                self.runtime.invalidate_result_cache();
2425                self.iam_audit(
2426                    "iam/group.remove",
2427                    &format!("user:{uid}::group:{group}"),
2428                    "ok",
2429                );
2430                HttpResponse {
2431                    status: 204,
2432                    content_type: "application/json",
2433                    body: Vec::new(),
2434                    extra_headers: Vec::new(),
2435                }
2436            }
2437            Err(e) => json_error(400, e.to_string()),
2438        }
2439    }
2440
2441    /// `PUT /admin/groups/:group/policies/:policy_id`.
2442    pub(crate) fn handle_iam_attach_group(&self, group: &str, policy_id: &str) -> HttpResponse {
2443        let Some(store) = self.auth_store.as_ref() else {
2444            return json_error(503, "auth store not configured");
2445        };
2446        match store.attach_policy(
2447            crate::auth::store::PrincipalRef::Group(group.to_string()),
2448            policy_id,
2449        ) {
2450            Ok(()) => {
2451                self.runtime.invalidate_result_cache();
2452                self.iam_audit(
2453                    "iam/policy.attach",
2454                    &format!("group:{group}::{policy_id}"),
2455                    "ok",
2456                );
2457                let mut obj = Map::new();
2458                obj.insert("ok".to_string(), JsonValue::Bool(true));
2459                json_response(200, JsonValue::Object(obj))
2460            }
2461            Err(e) => json_error(400, e.to_string()),
2462        }
2463    }
2464
2465    /// `DELETE /admin/groups/:group/policies/:policy_id`.
2466    pub(crate) fn handle_iam_detach_group(&self, group: &str, policy_id: &str) -> HttpResponse {
2467        let Some(store) = self.auth_store.as_ref() else {
2468            return json_error(503, "auth store not configured");
2469        };
2470        match store.detach_policy(
2471            crate::auth::store::PrincipalRef::Group(group.to_string()),
2472            policy_id,
2473        ) {
2474            Ok(()) => {
2475                self.runtime.invalidate_result_cache();
2476                self.iam_audit(
2477                    "iam/policy.detach",
2478                    &format!("group:{group}::{policy_id}"),
2479                    "ok",
2480                );
2481                HttpResponse {
2482                    status: 204,
2483                    content_type: "application/json",
2484                    body: Vec::new(),
2485                    extra_headers: Vec::new(),
2486                }
2487            }
2488            Err(e) => json_error(400, e.to_string()),
2489        }
2490    }
2491
2492    /// `GET /admin/users/:user/effective-permissions[?resource=kind:name]`.
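    ///
    /// Sketch (ids illustrative):
    /// `GET /admin/users/alice/effective-permissions?resource=table:orders`
    /// returns `{"user": "alice", "resource": "table:orders", "count": 1,
    /// "policies": [{"id": "read-only", "statements": 2}]}`.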
2493    pub(crate) fn handle_iam_effective_permissions(
2494        &self,
2495        user: &str,
2496        query: &std::collections::BTreeMap<String, String>,
2497    ) -> HttpResponse {
2498        let Some(store) = self.auth_store.as_ref() else {
2499            return json_error(503, "auth store not configured");
2500        };
2501        let uid = decode_user_arg(user);
2502        let pols = store.effective_policies(&uid);
2503
2504        // Build a JSON array of policy summaries scoped to the user.
2505        // The optional `resource` query string parameter is parsed but
2506        // currently only echoed back — fine-grained matching falls
2507        // through to `simulate`.
2508        let resource_echo = query.get("resource").cloned();
2509        let items: Vec<JsonValue> = pols
2510            .iter()
2511            .map(|p| {
2512                let mut obj = Map::new();
2513                obj.insert("id".to_string(), JsonValue::String(p.id.clone()));
2514                obj.insert(
2515                    "statements".to_string(),
2516                    JsonValue::Number(p.statements.len() as f64),
2517                );
2518                JsonValue::Object(obj)
2519            })
2520            .collect();
2521        let mut envelope = Map::new();
2522        envelope.insert("user".to_string(), JsonValue::String(uid.to_string()));
2523        if let Some(r) = resource_echo {
2524            envelope.insert("resource".to_string(), JsonValue::String(r));
2525        }
2526        envelope.insert("count".to_string(), JsonValue::Number(items.len() as f64));
2527        envelope.insert("policies".to_string(), JsonValue::Array(items));
2528        json_response(200, JsonValue::Object(envelope))
2529    }
2530
2531    /// `POST /admin/policies/simulate` —
2532    /// body: `{principal, action, resource: {kind, name, tenant?}, ctx?}`.
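    ///
    /// Request/response sketch (principal, action, and resource values
    /// are illustrative):
    ///
    /// ```text
    /// curl -X POST http://localhost:8080/admin/policies/simulate -d '{
    ///   "principal": "alice",
    ///   "action": "kv/get",
    ///   "resource": {"kind": "table", "name": "orders"},
    ///   "ctx": {"mfa": true, "source_ip": "10.0.0.7"}
    /// }'
    /// # => {"decision": "allow", "matched_policy_id": "read-only",
    /// #     "matched_sid": "allow-reads", "reason": "...",
    /// #     "trail": [{"policy_id": "read-only", "sid": "allow-reads",
    /// #                "matched": true, "effect": "allow"}]}
    /// ```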
    pub(crate) fn handle_iam_simulate(&self, body: Vec<u8>) -> HttpResponse {
        let Some(store) = self.auth_store.as_ref() else {
            return json_error(503, "auth store not configured");
        };
        let parsed = match crate::serde_json::from_str::<crate::serde_json::Value>(
            std::str::from_utf8(&body).unwrap_or(""),
        ) {
            Ok(v) => v,
            Err(e) => return json_error(400, format!("invalid JSON body: {e}")),
        };
        let obj = match parsed.as_object() {
            Some(o) => o,
            None => return json_error(400, "body must be a JSON object"),
        };
        let principal = match obj.get("principal").and_then(|v| v.as_str()) {
            Some(s) => decode_user_arg(s),
            None => return json_error(400, "missing `principal`"),
        };
        let action = match obj.get("action").and_then(|v| v.as_str()) {
            Some(s) => s.to_string(),
            None => return json_error(400, "missing `action`"),
        };
        let resource = match obj.get("resource") {
            Some(JsonValue::Object(r)) => {
                let kind = r
                    .get("kind")
                    .and_then(|v| v.as_str())
                    .unwrap_or("")
                    .to_string();
                let name = r
                    .get("name")
                    .and_then(|v| v.as_str())
                    .unwrap_or("")
                    .to_string();
                if kind.is_empty() || name.is_empty() {
                    return json_error(400, "resource needs kind+name");
                }
                let mut rr = crate::auth::policies::ResourceRef::new(kind, name);
                if let Some(t) = r.get("tenant").and_then(|v| v.as_str()) {
                    rr = rr.with_tenant(t.to_string());
                }
                rr
            }
            Some(JsonValue::String(s)) => match s.split_once(':') {
                Some((k, n)) => crate::auth::policies::ResourceRef::new(k, n),
                None => return json_error(400, "resource string must be `kind:name`"),
            },
            _ => return json_error(400, "missing `resource`"),
        };
        let mut sim_ctx = crate::auth::store::SimCtx::default();
        if let Some(c) = obj.get("ctx").and_then(|v| v.as_object()) {
            if let Some(t) = c.get("current_tenant").and_then(|v| v.as_str()) {
                sim_ctx.current_tenant = Some(t.to_string());
            }
            if let Some(true) = c.get("mfa").and_then(|v| v.as_bool()) {
                sim_ctx.mfa_present = true;
            }
            if let Some(ip) = c
                .get("source_ip")
                .or_else(|| c.get("peer_ip"))
                .and_then(|v| v.as_str())
            {
                if let Ok(addr) = ip.parse() {
                    sim_ctx.peer_ip = Some(addr);
                }
            }
            if let Some(ms) = c.get("now_ms").and_then(|v| v.as_u64()) {
                sim_ctx.now_ms = Some(ms as u128);
            }
        }
        let outcome = store.simulate(&principal, &action, &resource, sim_ctx);
        let (decision_str, matched_pid, matched_sid) =
            crate::runtime::impl_core::decision_to_strings(&outcome.decision);

        self.iam_audit("iam/policy.simulate", &principal.to_string(), &decision_str);

        let mut envelope = Map::new();
        envelope.insert("decision".to_string(), JsonValue::String(decision_str));
        envelope.insert(
            "matched_policy_id".to_string(),
            matched_pid
                .map(JsonValue::String)
                .unwrap_or(JsonValue::Null),
        );
        envelope.insert(
            "matched_sid".to_string(),
            matched_sid
                .map(JsonValue::String)
                .unwrap_or(JsonValue::Null),
        );
        envelope.insert("reason".to_string(), JsonValue::String(outcome.reason));
        let trail: Vec<JsonValue> = outcome
            .trail
            .into_iter()
            .map(|t| {
                let mut obj = Map::new();
                obj.insert("policy_id".to_string(), JsonValue::String(t.policy_id));
                obj.insert(
                    "sid".to_string(),
                    t.sid.map(JsonValue::String).unwrap_or(JsonValue::Null),
                );
                obj.insert("matched".to_string(), JsonValue::Bool(t.matched));
                obj.insert(
                    "effect".to_string(),
                    JsonValue::String(
                        match t.effect {
                            crate::auth::policies::Effect::Allow => "allow",
                            crate::auth::policies::Effect::Deny => "deny",
                        }
                        .to_string(),
                    ),
                );
                obj.insert(
                    "why_skipped".to_string(),
                    t.why_skipped
                        .map(|s| JsonValue::String(s.to_string()))
                        .unwrap_or(JsonValue::Null),
                );
                JsonValue::Object(obj)
            })
            .collect();
        envelope.insert("trail".to_string(), JsonValue::Array(trail));
        json_response(200, JsonValue::Object(envelope))
    }
}

fn decode_user_arg(raw: &str) -> crate::auth::UserId {
    // Accepts `username` (platform tenant), `tenant.username` or
    // `tenant/username` to align with the SQL path / display form.
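    //
    // Illustrative mappings (inputs assumed; `/` is checked before `.`):
    //   "acme/alice" → UserId::scoped("acme", "alice")
    //   "acme.alice" → UserId::scoped("acme", "alice")
    //   "alice"      → UserId::platform("alice")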
    if let Some((tenant, name)) = raw.split_once('/') {
        return crate::auth::UserId::scoped(tenant.to_string(), name.to_string());
    }
    if let Some((tenant, name)) = raw.split_once('.') {
        return crate::auth::UserId::scoped(tenant.to_string(), name.to_string());
    }
    crate::auth::UserId::platform(raw.to_string())
}

#[cfg(test)]
mod tests {
    use super::*;

    // RED_ADMIN_TOKEN is process-wide env state, so every test that mutates
    // it must hold this single shared lock. (Each test previously declared
    // its own local `static GUARD`, which never actually serialised the
    // tests against one another.)
    static ADMIN_ENV_GUARD: std::sync::Mutex<()> = std::sync::Mutex::new(());

    #[test]
    fn metrics_expose_result_blob_cache_label_set() {
        let runtime =
            crate::runtime::RedDBRuntime::with_options(crate::api::RedDBOptions::in_memory())
                .expect("runtime");
        runtime
            .db()
            .store()
            .set_config_tree("runtime.result_cache.backend", &crate::json!("blob_cache"));

        runtime.execute_query("SELECT 1").expect("populate miss");
        runtime.execute_query("SELECT 1").expect("blob hit");
        runtime.invalidate_result_cache();

        let server = RedDBServer::new(runtime);
        let response = server.handle_metrics();
        let body = String::from_utf8(response.body).expect("utf8 metrics");

        for needle in [
            "reddb_cache_blob_get_total{namespace=\"runtime.result_cache\",result=\"hit_l1\"}",
            "reddb_cache_blob_get_total{namespace=\"runtime.result_cache\",result=\"hit_l2\"}",
            "reddb_cache_blob_get_total{namespace=\"runtime.result_cache\",result=\"miss\"}",
            "reddb_cache_blob_put_total{namespace=\"runtime.result_cache\",outcome=\"ok\"}",
            "reddb_cache_blob_put_total{namespace=\"runtime.result_cache\",outcome=\"version_mismatch\"}",
            "reddb_cache_blob_put_total{namespace=\"runtime.result_cache\",outcome=\"too_large\"}",
            "reddb_cache_blob_put_total{namespace=\"runtime.result_cache\",outcome=\"metadata_too_large\"}",
            "reddb_cache_blob_invalidate_total{namespace=\"runtime.result_cache\",kind=\"dependency\"}",
            "reddb_cache_blob_invalidate_total{namespace=\"runtime.result_cache\",kind=\"namespace\"}",
            "reddb_cache_blob_evict_total{namespace=\"runtime.result_cache\",reason=\"capacity\"}",
            "reddb_cache_blob_evict_total{namespace=\"runtime.result_cache\",reason=\"expiry\"}",
            "reddb_cache_blob_evict_total{namespace=\"runtime.result_cache\",reason=\"policy\"}",
            "reddb_cache_blob_l1_bytes_in_use{namespace=\"runtime.result_cache\"}",
            "reddb_cache_blob_l1_entries{namespace=\"runtime.result_cache\"}",
            "reddb_cache_blob_l2_bytes_in_use{namespace=\"runtime.result_cache\"}",
            "reddb_cache_blob_l2_full_rejections_total{namespace=\"runtime.result_cache\"}",
            "reddb_cache_blob_version_mismatch_total{namespace=\"runtime.result_cache\"}",
        ] {
            assert!(body.contains(needle), "missing metric line for {needle}");
        }
    }

    // -------------------------------------------------------------------
    // Issue #148 — Blob Cache admin endpoints (smoke + adversarial input)
    // -------------------------------------------------------------------

    fn test_server() -> RedDBServer {
        let runtime =
            crate::runtime::RedDBRuntime::with_options(crate::api::RedDBOptions::in_memory())
                .expect("runtime");
        RedDBServer::new(runtime)
    }

    fn parse_body(resp: &HttpResponse) -> JsonValue {
        let s = std::str::from_utf8(&resp.body).expect("utf8 body");
        crate::serde_json::from_str::<JsonValue>(s).expect("JSON body")
    }

    #[test]
    fn admin_blob_cache_sweep_happy_path_returns_well_formed_report() {
        let server = test_server();
        let body = br#"{"limit_entries": 100, "limit_millis": 50}"#.to_vec();
        let resp = server.handle_admin_blob_cache_sweep(body);
        assert_eq!(resp.status, 200);
        let parsed = parse_body(&resp);
        assert_eq!(parsed.get("ok").and_then(|v| v.as_bool()), Some(true));
        // Sweeper today is bounded scaffolding; report must still be
        // well-formed with all expected fields present.
        for field in [
            "entries_scanned",
            "entries_evicted",
            "bytes_reclaimed",
            "elapsed_ms",
            "truncated_due_to_limit",
        ] {
            assert!(
                parsed.get(field).is_some(),
                "missing field {field} in response: {parsed:?}"
            );
        }
    }

    #[test]
    fn admin_blob_cache_sweep_empty_body_uses_unbounded_default() {
        let server = test_server();
        let resp = server.handle_admin_blob_cache_sweep(Vec::new());
        assert_eq!(resp.status, 200);
        let parsed = parse_body(&resp);
        assert_eq!(parsed.get("ok").and_then(|v| v.as_bool()), Some(true));
    }

    #[test]
    fn admin_blob_cache_sweep_invalid_json_returns_400() {
        let server = test_server();
        let resp = server.handle_admin_blob_cache_sweep(b"not json".to_vec());
        assert_eq!(resp.status, 400);
    }

    #[test]
    fn admin_blob_cache_flush_namespace_happy_path() {
        let server = test_server();
        let body = br#"{"namespace": "tenant-42:results"}"#.to_vec();
        let resp = server.handle_admin_blob_cache_flush_namespace(body);
        assert_eq!(resp.status, 200);
        let parsed = parse_body(&resp);
        assert_eq!(parsed.get("ok").and_then(|v| v.as_bool()), Some(true));
        assert_eq!(
            parsed.get("namespace").and_then(|v| v.as_str()),
            Some("tenant-42:results")
        );
        assert!(parsed.get("elapsed_micros").is_some());
        assert!(parsed.get("generation_before").is_some());
        assert!(parsed.get("generation_after").is_some());
    }

    #[test]
    fn admin_blob_cache_flush_namespace_missing_body_returns_400() {
        let server = test_server();
        let resp = server.handle_admin_blob_cache_flush_namespace(Vec::new());
        assert_eq!(resp.status, 400);
        let parsed = parse_body(&resp);
        assert!(parsed
            .get("error")
            .and_then(|v| v.as_str())
            .map(|s| s.contains("namespace"))
            .unwrap_or(false));
    }

    #[test]
    fn admin_blob_cache_flush_namespace_missing_field_returns_400() {
        let server = test_server();
        let body = br#"{"other": "x"}"#.to_vec();
        let resp = server.handle_admin_blob_cache_flush_namespace(body);
        assert_eq!(resp.status, 400);
    }

    #[test]
    fn admin_blob_cache_flush_namespace_empty_string_returns_400() {
        let server = test_server();
        let body = br#"{"namespace": ""}"#.to_vec();
        let resp = server.handle_admin_blob_cache_flush_namespace(body);
        assert_eq!(resp.status, 400);
    }

    #[test]
    fn admin_blob_cache_flush_namespace_rejects_crlf_smuggling_attempt() {
        let server = test_server();
        // Classic CRLF smuggling shape — the namespace tries to splice
        // a fake audit line into structured logs.
        let body = br#"{"namespace": "real-ns\r\nfake-audit: spliced"}"#.to_vec();
        let resp = server.handle_admin_blob_cache_flush_namespace(body);
        assert_eq!(resp.status, 400);
        let parsed = parse_body(&resp);
        let msg = parsed
            .get("error")
            .and_then(|v| v.as_str())
            .unwrap_or_default();
        assert!(msg.contains("CR/LF"), "unexpected error: {msg}");
    }

    #[test]
    fn admin_blob_cache_flush_namespace_rejects_nul_byte() {
        let server = test_server();
        // JSON `\u0000` decodes to a literal NUL byte after parse;
        // the guard must reject it (NUL truncates downstream sinks
        // like proxies and log shippers). Build the body with a
        // string literal so the source file contains no raw NUL.
        let body = b"{\"namespace\": \"with-nul-\\u0000-here\"}".to_vec();
        let resp = server.handle_admin_blob_cache_flush_namespace(body);
        assert_eq!(resp.status, 400);
        let parsed = parse_body(&resp);
        let msg = parsed
            .get("error")
            .and_then(|v| v.as_str())
            .unwrap_or_default();
        assert!(msg.contains("NUL"), "unexpected error: {msg}");
    }

    #[test]
    fn admin_blob_cache_flush_namespace_response_round_trips_unicode() {
        let server = test_server();
        let body = r#"{"namespace": "日本語-ns-🦀"}"#.as_bytes().to_vec();
        let resp = server.handle_admin_blob_cache_flush_namespace(body);
        assert_eq!(resp.status, 200);
        let parsed = parse_body(&resp);
        assert_eq!(
            parsed.get("namespace").and_then(|v| v.as_str()),
            Some("日本語-ns-🦀")
        );
    }

    // -------------------------------------------------------------------
    // Issue #195 — compare-and-set endpoint tests
    // -------------------------------------------------------------------
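    //
    // The contract these tests pin down: a write commits only while the
    // stored version is strictly below `new_version` (first writes see no
    // stored version and commit); anything else is a 409 whose body carries
    // `reason: "VersionMismatch"` and the surviving `current_version`.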

    fn cas_body(namespace: &str, key: &str, new_value: &[u8], new_version: u64) -> Vec<u8> {
        let b64 = {
            let mut s = String::new();
            for chunk in new_value.chunks(3) {
                const CHARS: &[u8] =
                    b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
                let b0 = chunk[0] as u32;
                let b1 = chunk.get(1).copied().unwrap_or(0) as u32;
                let b2 = chunk.get(2).copied().unwrap_or(0) as u32;
                let n = (b0 << 16) | (b1 << 8) | b2;
                s.push(CHARS[((n >> 18) & 63) as usize] as char);
                s.push(CHARS[((n >> 12) & 63) as usize] as char);
                s.push(if chunk.len() > 1 {
                    CHARS[((n >> 6) & 63) as usize] as char
                } else {
                    '='
                });
                s.push(if chunk.len() > 2 {
                    CHARS[(n & 63) as usize] as char
                } else {
                    '='
                });
            }
            s
        };
        format!(
            r#"{{"namespace":"{namespace}","key":"{key}","expected_version":0,"new_value_b64":"{b64}","new_version":{new_version}}}"#
        )
        .into_bytes()
    }
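
    // A minimal sanity sketch for the hand-rolled encoder above, checked
    // against RFC 4648 vectors: `b"hi"` must yield the `aGk=` literal the
    // other CAS tests embed, and a full 3-byte chunk needs no padding.
    #[test]
    fn cas_body_base64_matches_known_vectors() {
        let one_pad = String::from_utf8(cas_body("ns", "k", b"hi", 1)).expect("utf8");
        assert!(one_pad.contains(r#""new_value_b64":"aGk=""#), "got: {one_pad}");
        let no_pad = String::from_utf8(cas_body("ns", "k", b"abc", 1)).expect("utf8");
        assert!(no_pad.contains(r#""new_value_b64":"YWJj""#), "got: {no_pad}");
    }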

    #[test]
    fn cas_happy_first_write() {
        let server = test_server();
        let body = cas_body("ns1", "k1", b"hello", 1);
        let resp = server.handle_admin_blob_cache_compare_and_set(body);
        assert_eq!(resp.status, 200);
        let parsed = parse_body(&resp);
        assert_eq!(
            parsed.get("committed").and_then(|v| v.as_bool()),
            Some(true)
        );
        assert_eq!(
            parsed.get("current_version").and_then(|v| v.as_u64()),
            Some(1)
        );
    }

    #[test]
    fn cas_happy_update_increments_version() {
        let server = test_server();
        // First write at version 1.
        server.handle_admin_blob_cache_compare_and_set(cas_body("ns2", "k2", b"v1", 1));
        // Update to version 2 — existing (1) < new_version (2) → ok.
        let resp = server.handle_admin_blob_cache_compare_and_set(cas_body("ns2", "k2", b"v2", 2));
        assert_eq!(resp.status, 200);
        let parsed = parse_body(&resp);
        assert_eq!(
            parsed.get("committed").and_then(|v| v.as_bool()),
            Some(true)
        );
        assert_eq!(
            parsed.get("current_version").and_then(|v| v.as_u64()),
            Some(2)
        );
    }

    #[test]
    fn cas_conflict_same_version_returns_409() {
        let server = test_server();
        // Write version 5.
        server.handle_admin_blob_cache_compare_and_set(cas_body("ns3", "k3", b"v1", 5));
        // Try to write version 5 again — existing (5) >= new_version (5) → conflict.
        let resp = server.handle_admin_blob_cache_compare_and_set(cas_body("ns3", "k3", b"v2", 5));
        assert_eq!(resp.status, 409);
        let parsed = parse_body(&resp);
        assert_eq!(
            parsed.get("committed").and_then(|v| v.as_bool()),
            Some(false)
        );
        assert_eq!(
            parsed.get("reason").and_then(|v| v.as_str()),
            Some("VersionMismatch")
        );
    }

    #[test]
    fn cas_stale_expected_version_returns_409() {
        let server = test_server();
        // Write version 10.
        server.handle_admin_blob_cache_compare_and_set(cas_body("ns4", "k4", b"v1", 10));
        // Try version 9 (going backwards) — existing (10) >= new_version (9) → conflict.
        let resp = server.handle_admin_blob_cache_compare_and_set(cas_body("ns4", "k4", b"v2", 9));
        assert_eq!(resp.status, 409);
        let parsed = parse_body(&resp);
        assert_eq!(
            parsed.get("current_version").and_then(|v| v.as_u64()),
            Some(10)
        );
    }

    #[test]
    fn cas_crlf_in_namespace_returns_400() {
        let server = test_server();
        // Embed CRLF via JSON escape sequences.
        let body = b"{\"namespace\":\"real\\r\\ninjected\",\"key\":\"k\",\"expected_version\":0,\"new_value_b64\":\"aGk=\",\"new_version\":1}".to_vec();
        let resp = server.handle_admin_blob_cache_compare_and_set(body);
        assert_eq!(resp.status, 400);
        let parsed = parse_body(&resp);
        let msg = parsed.get("error").and_then(|v| v.as_str()).unwrap_or("");
        assert!(msg.contains("CR/LF"), "expected CR/LF error, got: {msg}");
    }

    #[test]
    fn cas_nul_in_key_returns_400() {
        let server = test_server();
        let body = b"{\"namespace\":\"ns\",\"key\":\"k\\u0000nul\",\"expected_version\":0,\"new_value_b64\":\"aGk=\",\"new_version\":1}".to_vec();
        let resp = server.handle_admin_blob_cache_compare_and_set(body);
        assert_eq!(resp.status, 400);
        let parsed = parse_body(&resp);
        let msg = parsed.get("error").and_then(|v| v.as_str()).unwrap_or("");
        assert!(msg.contains("NUL"), "expected NUL error, got: {msg}");
    }

    #[test]
    fn cas_bad_base64_returns_400() {
        let server = test_server();
        let body = br#"{"namespace":"ns","key":"k","expected_version":0,"new_value_b64":"!!!invalid!!!","new_version":1}"#.to_vec();
        let resp = server.handle_admin_blob_cache_compare_and_set(body);
        assert_eq!(resp.status, 400);
        let parsed = parse_body(&resp);
        let msg = parsed.get("error").and_then(|v| v.as_str()).unwrap_or("");
        assert!(msg.contains("base64"), "expected base64 error, got: {msg}");
    }
    #[test]
    fn cas_missing_bearer_returns_401_via_route() {
        let _g = ADMIN_ENV_GUARD.lock().unwrap_or_else(|e| e.into_inner());

        let prev = std::env::var("RED_ADMIN_TOKEN").ok();
        unsafe {
            std::env::set_var("RED_ADMIN_TOKEN", "test-token-195");
        }

        let server = test_server();
        let req = crate::server::transport::HttpRequest {
            method: "POST".to_string(),
            path: "/admin/cache/compare-and-set".to_string(),
            query: std::collections::BTreeMap::new(),
            headers: std::collections::BTreeMap::new(),
            body: cas_body("ns", "k", b"v", 1),
        };
        let resp = server.route(req);
        assert_eq!(resp.status, 401);

        unsafe {
            match prev {
                Some(v) => std::env::set_var("RED_ADMIN_TOKEN", v),
                None => std::env::remove_var("RED_ADMIN_TOKEN"),
            }
        }
    }

    #[test]
    fn cas_wrong_bearer_returns_401_via_route() {
        let _g = ADMIN_ENV_GUARD.lock().unwrap_or_else(|e| e.into_inner());

        let prev = std::env::var("RED_ADMIN_TOKEN").ok();
        unsafe {
            std::env::set_var("RED_ADMIN_TOKEN", "correct-token");
        }

        let server = test_server();
        let mut headers = std::collections::BTreeMap::new();
        headers.insert(
            "authorization".to_string(),
            "Bearer wrong-token".to_string(),
        );
        let req = crate::server::transport::HttpRequest {
            method: "POST".to_string(),
            path: "/admin/cache/compare-and-set".to_string(),
            query: std::collections::BTreeMap::new(),
            headers,
            body: cas_body("ns", "k", b"v", 1),
        };
        let resp = server.route(req);
        assert_eq!(resp.status, 401);

        unsafe {
            match prev {
                Some(v) => std::env::set_var("RED_ADMIN_TOKEN", v),
                None => std::env::remove_var("RED_ADMIN_TOKEN"),
            }
        }
    }

    #[test]
    fn cas_concurrent_race_exactly_one_commits() {
        use std::sync::{Arc, Mutex};

        // RedDBServer may not be Sync, so we protect it with a Mutex and share
        // across threads. The BlobCache's check_version runs under a shard write
        // lock, so even serialised calls exercise the version-monotonicity guard.
        let server = Arc::new(Mutex::new(test_server()));
        let committed = Arc::new(Mutex::new(0u32));
        let conflicted = Arc::new(Mutex::new(0u32));

        let handles: Vec<_> = (0..8)
            .map(|_| {
                let server = Arc::clone(&server);
                let committed = Arc::clone(&committed);
                let conflicted = Arc::clone(&conflicted);
                std::thread::spawn(move || {
                    // All threads try to write version 1 to the same key.
                    let body = cas_body("race-ns", "race-key", b"payload", 1);
                    let resp = {
                        let s = server.lock().unwrap();
                        s.handle_admin_blob_cache_compare_and_set(body)
                    };
                    match resp.status {
                        200 => *committed.lock().unwrap() += 1,
                        409 => *conflicted.lock().unwrap() += 1,
                        s => panic!("unexpected status {s}"),
                    }
                })
            })
            .collect();

        for h in handles {
            h.join().expect("thread panicked");
        }

        assert_eq!(
            *committed.lock().unwrap(),
            1,
            "exactly one CAS should commit (version 1 can only be written once)"
        );
        assert_eq!(
            *conflicted.lock().unwrap(),
            7,
            "the seven losing writers must all see a 409 conflict"
        );
    }

    // -------------------------------------------------------------------
    // Routing-layer auth gate: when RED_ADMIN_TOKEN is set the routes
    // must reject unauthenticated requests with 401. We exercise the
    // route() entrypoint, not the handler directly, so the gate (which
    // lives in is_authorized()) is on the path.
    // -------------------------------------------------------------------

    #[test]
    fn admin_blob_cache_routes_reject_unauth_when_admin_token_set() {
        // Serialize on the shared process-wide mutex because RED_ADMIN_TOKEN
        // is a process-wide env var; running other admin-auth tests in
        // parallel would race the unset/set sequence.
        let _g = ADMIN_ENV_GUARD.lock().unwrap_or_else(|e| e.into_inner());

        let prev = std::env::var("RED_ADMIN_TOKEN").ok();
        // SAFETY: env mutation is unsafe as of the 2024 edition; we serialize
        // via ADMIN_ENV_GUARD above and restore the previous value at the end.
        unsafe {
            std::env::set_var("RED_ADMIN_TOKEN", "test-token-148");
        }

        let server = test_server();

        // Sweep without auth → 401.
        let req = crate::server::transport::HttpRequest {
            method: "POST".to_string(),
            path: "/admin/blob_cache/sweep".to_string(),
            query: std::collections::BTreeMap::new(),
            headers: std::collections::BTreeMap::new(),
            body: br#"{"limit_entries":1}"#.to_vec(),
        };
        let resp = server.route(req);
        assert_eq!(resp.status, 401, "sweep without admin token must be 401");

        // Flush namespace without auth → 401.
        let req = crate::server::transport::HttpRequest {
            method: "POST".to_string(),
            path: "/admin/blob_cache/flush_namespace".to_string(),
            query: std::collections::BTreeMap::new(),
            headers: std::collections::BTreeMap::new(),
            body: br#"{"namespace":"x"}"#.to_vec(),
        };
        let resp = server.route(req);
        assert_eq!(resp.status, 401, "flush without admin token must be 401");

        // With matching bearer → 200.
        let mut headers = std::collections::BTreeMap::new();
        headers.insert(
            "authorization".to_string(),
            "Bearer test-token-148".to_string(),
        );
        let req = crate::server::transport::HttpRequest {
            method: "POST".to_string(),
            path: "/admin/blob_cache/flush_namespace".to_string(),
            query: std::collections::BTreeMap::new(),
            headers,
            body: br#"{"namespace":"ok"}"#.to_vec(),
        };
        let resp = server.route(req);
        assert_eq!(resp.status, 200, "flush with admin token must be 200");

        // Restore previous env state.
        unsafe {
            match prev {
                Some(v) => std::env::set_var("RED_ADMIN_TOKEN", v),
                None => std::env::remove_var("RED_ADMIN_TOKEN"),
            }
        }
    }
}