1use super::*;
16use crate::runtime::lifecycle::Phase;
17use std::path::{Path, PathBuf};
18
/// Location of the runtime-state sidecar file for the database at `data_path`.
///
/// The sidecar sits in the same directory as the data file under the fixed name
/// `.runtime-state.json`. If `data_path` has no parent component (e.g. it is a
/// filesystem root) the current directory `.` is used instead.
pub(crate) fn runtime_state_path(data_path: &Path) -> PathBuf {
    const STATE_FILE_NAME: &str = ".runtime-state.json";
    match data_path.parent() {
        Some(dir) => dir.join(STATE_FILE_NAME),
        None => Path::new(".").join(STATE_FILE_NAME),
    }
}
26
27pub(crate) fn persist_runtime_readonly(state_path: &Path, enabled: bool) -> std::io::Result<()> {
31 use std::io::Write;
32 let mut object = crate::json::Map::new();
33 object.insert("read_only".to_string(), crate::json::Value::Bool(enabled));
34 let body = crate::serde_json::to_string_pretty(&crate::json::Value::Object(object))
35 .map_err(|err| std::io::Error::other(err.to_string()))?;
36 if let Some(parent) = state_path.parent() {
37 if !parent.as_os_str().is_empty() {
38 std::fs::create_dir_all(parent)?;
39 }
40 }
41 let tmp = state_path.with_extension("json.tmp");
42 {
43 let mut f = std::fs::File::create(&tmp)?;
44 f.write_all(body.as_bytes())?;
45 f.sync_all()?;
46 }
47 std::fs::rename(&tmp, state_path)?;
48 Ok(())
49}
50
51pub fn load_runtime_readonly(data_path: &Path) -> Option<bool> {
55 let state_path = runtime_state_path(data_path);
56 let bytes = std::fs::read(&state_path).ok()?;
57 let parsed: crate::json::Value = crate::json::from_slice(&bytes).ok()?;
58 parsed.get("read_only").and_then(|v| v.as_bool())
59}
60
61fn default_holder_id() -> String {
65 if let Some(explicit) = crate::utils::env_with_file_fallback("RED_LEASE_HOLDER_ID") {
66 return explicit;
67 }
68 let host = std::env::var("HOSTNAME")
69 .or_else(|_| std::env::var("HOST"))
70 .unwrap_or_else(|_| "unknown-host".to_string());
71 format!("{host}-{}", std::process::id())
72}
73
/// Escape `value` for use inside a double-quoted Prometheus label value.
///
/// The text exposition format defines exactly three escapes in label values:
/// `\\` (backslash), `\"` (double quote) and `\n` (line feed). Every other
/// character is emitted verbatim.
///
/// Carriage return is intentionally *not* escaped. `\r` is not a valid escape
/// in the format; the reference parser rejects it as an invalid escape sequence
/// and fails the whole scrape. A raw CR, by contrast, is a legal label-value
/// character.
fn sanitize_label(value: &str) -> String {
    let mut out = String::with_capacity(value.len());
    for ch in value.chars() {
        match ch {
            '"' => out.push_str("\\\""),
            '\\' => out.push_str("\\\\"),
            '\n' => out.push_str("\\n"),
            _ => out.push(ch),
        }
    }
    out
}
91
/// Decode standard-alphabet base64 (RFC 4648 §4, with `+` and `/`).
///
/// Trailing `=` padding is optional and is stripped before decoding, so
/// unpadded input is accepted. Unused low bits in the final group are ignored
/// rather than rejected.
///
/// # Errors
///
/// Returns a human-readable message in two cases:
/// - a byte lies outside the base64 alphabet. This includes whitespace and any
///   `=` that is not part of the trailing padding.
/// - the significant characters leave a single dangling character. One 6-bit
///   group cannot encode a whole byte, so such input is truncated or corrupt
///   and must not be silently shortened.
fn b64_decode(input: &str) -> Result<Vec<u8>, String> {
    /// Map one alphabet byte to its 6-bit value.
    fn sextet(c: u8) -> Result<u32, String> {
        match c {
            b'A'..=b'Z' => Ok(u32::from(c - b'A')),
            b'a'..=b'z' => Ok(u32::from(c - b'a' + 26)),
            b'0'..=b'9' => Ok(u32::from(c - b'0' + 52)),
            b'+' => Ok(62),
            b'/' => Ok(63),
            other if other.is_ascii() => {
                Err(format!("invalid base64 character: {}", other as char))
            }
            // A non-ASCII byte belongs to a multi-byte UTF-8 sequence. Printing it
            // `as char` would show an unrelated Latin-1 glyph, so report the raw byte.
            other => Err(format!("invalid base64 byte: 0x{other:02x}")),
        }
    }

    let symbols = input.trim_end_matches('=').as_bytes();
    if symbols.len() % 4 == 1 {
        return Err(format!(
            "invalid base64 length: {} significant characters leave a dangling 6-bit group",
            symbols.len()
        ));
    }

    let mut out = Vec::with_capacity(symbols.len() * 3 / 4 + 1);
    for group in symbols.chunks(4) {
        // Pack the 2-4 sextets MSB-first, then left-align them in a 24-bit word.
        // The output bytes then always sit at bits 23..16, 15..8 and 7..0.
        let mut word = 0u32;
        for &c in group {
            word = (word << 6) | sextet(c)?;
        }
        word <<= 6 * (4 - group.len() as u32);
        let decoded = [(word >> 16) as u8, (word >> 8) as u8, word as u8];
        // n sextets carry n - 1 whole bytes (4 -> 3, 3 -> 2, 2 -> 1).
        out.extend_from_slice(&decoded[..group.len() - 1]);
    }
    Ok(out)
}
133
134fn reject_smuggling_bytes(field: &str, value: &str) -> Option<HttpResponse> {
137 for (idx, byte) in value.as_bytes().iter().enumerate() {
138 match *byte {
139 b'\0' => {
140 return Some(json_error(
141 400,
142 format!("field `{field}` contains forbidden NUL byte at index {idx}"),
143 ));
144 }
145 b'\r' | b'\n' => {
146 return Some(json_error(
147 400,
148 format!("field `{field}` contains forbidden CR/LF byte at index {idx}"),
149 ));
150 }
151 _ => {}
152 }
153 }
154 None
155}
156
157impl RedDBServer {
158 pub(crate) fn handle_admin_shutdown(&self) -> HttpResponse {
168 let backup_on_shutdown = std::env::var("RED_BACKUP_ON_SHUTDOWN")
169 .ok()
170 .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
171 .unwrap_or(true);
172
173 match self.runtime.graceful_shutdown(backup_on_shutdown) {
174 Ok(report) => {
175 let mut details = Map::new();
179 details.insert(
180 "backup_uploaded".to_string(),
181 JsonValue::Bool(report.backup_uploaded),
182 );
183 details.insert(
184 "duration_ms".to_string(),
185 JsonValue::Number(report.duration_ms as f64),
186 );
187 self.runtime.audit_log().record(
188 "admin/shutdown",
189 "operator",
190 "instance",
191 "ok",
192 JsonValue::Object(details),
193 );
194 let mut object = Map::new();
195 object.insert("ok".to_string(), JsonValue::Bool(true));
196 object.insert(
197 "phase".to_string(),
198 JsonValue::String(self.runtime.lifecycle().phase().as_str().to_string()),
199 );
200 object.insert(
201 "flushed_wal".to_string(),
202 JsonValue::Bool(report.flushed_wal),
203 );
204 object.insert(
205 "final_checkpoint".to_string(),
206 JsonValue::Bool(report.final_checkpoint),
207 );
208 object.insert(
209 "backup_uploaded".to_string(),
210 JsonValue::Bool(report.backup_uploaded),
211 );
212 object.insert(
213 "duration_ms".to_string(),
214 JsonValue::Number(report.duration_ms as f64),
215 );
216 json_response(200, JsonValue::Object(object))
217 }
218 Err(err) => json_error(500, err.to_string()),
219 }
220 }
221
222 pub(crate) fn handle_admin_restore(&self, body: Vec<u8>) -> HttpResponse {
229 if !self.runtime.write_gate().is_read_only() {
230 return json_error(
231 409,
232 "POST /admin/restore requires the runtime to be read_only or replica-role; \
233 toggle via RED_READONLY=true or POST /admin/readonly first",
234 );
235 }
236 let db = self.runtime.db();
237 let Some(backend) = db.options().remote_backend.clone() else {
238 return json_error(412, "no remote backend configured (RED_BACKEND=none)");
239 };
240 let Some(local_path) = db.path().map(|p| p.to_path_buf()) else {
241 return json_error(412, "in-memory runtime cannot be restored from remote");
242 };
243 let snapshot_prefix = db.options().default_snapshot_prefix();
244 let wal_prefix = db.options().default_wal_archive_prefix();
245 let target_time_ms = if body.is_empty() {
246 0u64
247 } else {
248 match crate::serde_json::from_slice::<crate::serde_json::Value>(&body) {
249 Ok(v) => v
250 .get("to_timestamp_ms")
251 .and_then(|n| n.as_u64())
252 .or_else(|| {
253 v.get("to_timestamp")
254 .and_then(|n| n.as_u64())
255 .map(|s| s.saturating_mul(1000))
256 })
257 .unwrap_or(0),
258 Err(err) => return json_error(400, format!("invalid JSON body: {err}")),
259 }
260 };
261 let recovery =
262 crate::storage::wal::PointInTimeRecovery::new(backend, snapshot_prefix, wal_prefix);
263 match recovery.restore_to(target_time_ms, &local_path) {
264 Ok(report) => {
265 let mut details = Map::new();
266 details.insert(
267 "snapshot_used".to_string(),
268 JsonValue::Number(report.snapshot_used as f64),
269 );
270 details.insert(
271 "wal_segments_replayed".to_string(),
272 JsonValue::Number(report.wal_segments_replayed as f64),
273 );
274 details.insert(
275 "records_applied".to_string(),
276 JsonValue::Number(report.records_applied as f64),
277 );
278 details.insert(
279 "recovered_to_lsn".to_string(),
280 JsonValue::Number(report.recovered_to_lsn as f64),
281 );
282 details.insert(
283 "recovered_to_time".to_string(),
284 JsonValue::Number(report.recovered_to_time as f64),
285 );
286 self.runtime.audit_log().record(
287 "admin/restore",
288 "operator",
289 "instance",
290 "ok",
291 JsonValue::Object(details.clone()),
292 );
293 let mut object = Map::new();
294 object.insert("ok".to_string(), JsonValue::Bool(true));
295 for (k, v) in details {
296 object.insert(k, v);
297 }
298 json_response(200, JsonValue::Object(object))
299 }
300 Err(err) => {
301 self.runtime.audit_log().record(
302 "admin/restore",
303 "operator",
304 "instance",
305 &format!("err: {err}"),
306 JsonValue::Null,
307 );
308 json_error(500, err.to_string())
309 }
310 }
311 }
312
313 pub(crate) fn handle_admin_backup(
317 &self,
318 _query: &std::collections::BTreeMap<String, String>,
319 ) -> HttpResponse {
320 match self.runtime.trigger_backup() {
321 Ok(result) => {
322 let mut details = Map::new();
323 details.insert(
324 "snapshot_id".to_string(),
325 JsonValue::Number(result.snapshot_id as f64),
326 );
327 details.insert("uploaded".to_string(), JsonValue::Bool(result.uploaded));
328 details.insert(
329 "duration_ms".to_string(),
330 JsonValue::Number(result.duration_ms as f64),
331 );
332 self.runtime.audit_log().record(
333 "admin/backup",
334 "operator",
335 "instance",
336 "ok",
337 JsonValue::Object(details.clone()),
338 );
339 let mut object = Map::new();
340 object.insert("ok".to_string(), JsonValue::Bool(true));
341 for (k, v) in details {
342 object.insert(k, v);
343 }
344 json_response(200, JsonValue::Object(object))
345 }
346 Err(err) => {
347 self.runtime.audit_log().record(
348 "admin/backup",
349 "operator",
350 "instance",
351 &format!("err: {err}"),
352 JsonValue::Null,
353 );
354 json_error(500, err.to_string())
355 }
356 }
357 }
358
359 pub(crate) fn handle_admin_blob_cache_sweep(&self, body: Vec<u8>) -> HttpResponse {
381 use crate::storage::cache::sweeper::{BlobCacheSweeper, SweepLimit};
382
383 let (limit_entries, limit_millis) = if body.is_empty() {
384 (None, None)
385 } else {
386 match crate::serde_json::from_slice::<crate::serde_json::Value>(&body) {
387 Ok(v) => {
388 let entries = v
389 .get("limit_entries")
390 .and_then(|n| n.as_u64())
391 .map(|n| usize::try_from(n).unwrap_or(usize::MAX));
392 let millis = v
393 .get("limit_millis")
394 .and_then(|n| n.as_u64())
395 .map(|n| u32::try_from(n).unwrap_or(u32::MAX));
396 (entries, millis)
397 }
398 Err(err) => return json_error(400, format!("invalid JSON body: {err}")),
399 }
400 };
401
402 let limit = match (limit_entries, limit_millis) {
403 (None, None) => SweepLimit::Either {
404 entries: usize::MAX,
405 millis: u32::MAX,
406 },
407 (Some(e), None) => SweepLimit::Entries(e),
408 (None, Some(m)) => SweepLimit::Millis(m),
409 (Some(e), Some(m)) => SweepLimit::Either {
410 entries: e,
411 millis: m,
412 },
413 };
414
415 let report = BlobCacheSweeper::sweep_expired(self.runtime.result_blob_cache(), limit);
416
417 let mut object = Map::new();
418 object.insert("ok".to_string(), JsonValue::Bool(true));
419 object.insert(
420 "entries_scanned".to_string(),
421 JsonValue::Number(report.entries_scanned as f64),
422 );
423 object.insert(
424 "entries_evicted".to_string(),
425 JsonValue::Number(report.entries_evicted as f64),
426 );
427 object.insert(
428 "bytes_reclaimed".to_string(),
429 JsonValue::Number(report.bytes_reclaimed as f64),
430 );
431 object.insert(
432 "elapsed_ms".to_string(),
433 JsonValue::Number(report.elapsed_ms as f64),
434 );
435 object.insert(
436 "truncated_due_to_limit".to_string(),
437 JsonValue::Bool(report.truncated_due_to_limit),
438 );
439
440 let mut details = Map::new();
443 details.insert(
444 "entries_evicted".to_string(),
445 JsonValue::Number(report.entries_evicted as f64),
446 );
447 details.insert(
448 "bytes_reclaimed".to_string(),
449 JsonValue::Number(report.bytes_reclaimed as f64),
450 );
451 details.insert(
452 "elapsed_ms".to_string(),
453 JsonValue::Number(report.elapsed_ms as f64),
454 );
455 self.runtime.audit_log().record(
456 "admin/blob_cache/sweep",
457 "operator",
458 "instance",
459 "ok",
460 JsonValue::Object(details),
461 );
462
463 json_response(200, JsonValue::Object(object))
464 }
465
466 pub(crate) fn handle_admin_blob_cache_flush_namespace(&self, body: Vec<u8>) -> HttpResponse {
490 use crate::storage::cache::sweeper::BlobCacheSweeper;
491
492 if body.is_empty() {
493 return json_error(400, "missing JSON body with required `namespace` field");
494 }
495 let parsed: crate::serde_json::Value = match crate::serde_json::from_slice(&body) {
496 Ok(v) => v,
497 Err(err) => return json_error(400, format!("invalid JSON body: {err}")),
498 };
499 let namespace = match parsed.get("namespace").and_then(|v| v.as_str()) {
500 Some(n) => n.to_string(),
501 None => return json_error(400, "field `namespace` is required and must be a string"),
502 };
503 if namespace.is_empty() {
504 return json_error(400, "field `namespace` must not be empty");
505 }
506 for (idx, byte) in namespace.as_bytes().iter().enumerate() {
511 match *byte {
512 b'\0' => {
513 return json_error(
514 400,
515 format!("field `namespace` contains forbidden NUL byte at index {idx}"),
516 );
517 }
518 b'\r' | b'\n' => {
519 return json_error(
520 400,
521 format!("field `namespace` contains forbidden CR/LF byte at index {idx}"),
522 );
523 }
524 _ => {}
525 }
526 }
527
528 let report =
529 BlobCacheSweeper::flush_namespace(self.runtime.result_blob_cache(), &namespace);
530
531 let mut object = Map::new();
532 object.insert("ok".to_string(), JsonValue::Bool(true));
533 object.insert(
537 "namespace".to_string(),
538 crate::json_field::SerializedJsonField::tainted(&report.namespace),
539 );
540 object.insert(
541 "generation_before".to_string(),
542 JsonValue::Number(report.generation_before as f64),
543 );
544 object.insert(
545 "generation_after".to_string(),
546 JsonValue::Number(report.generation_after as f64),
547 );
548 object.insert(
549 "elapsed_micros".to_string(),
550 JsonValue::Number(report.elapsed_micros as f64),
551 );
552
553 let mut details = Map::new();
554 details.insert(
555 "namespace".to_string(),
556 crate::json_field::SerializedJsonField::tainted(&report.namespace),
557 );
558 details.insert(
559 "elapsed_micros".to_string(),
560 JsonValue::Number(report.elapsed_micros as f64),
561 );
562 self.runtime.audit_log().record(
563 "admin/blob_cache/flush_namespace",
564 "operator",
565 "instance",
566 "ok",
567 JsonValue::Object(details),
568 );
569
570 json_response(200, JsonValue::Object(object))
571 }
572
573 pub(crate) fn handle_admin_blob_cache_compare_and_set(&self, body: Vec<u8>) -> HttpResponse {
600 use crate::storage::cache::blob::{BlobCachePolicy, BlobCachePut, CacheError};
601
602 if body.is_empty() {
603 return json_error(400, "missing JSON body");
604 }
605 let parsed: crate::serde_json::Value = match crate::serde_json::from_slice(&body) {
606 Ok(v) => v,
607 Err(err) => return json_error(400, format!("invalid JSON body: {err}")),
608 };
609
610 let namespace = match parsed.get("namespace").and_then(|v| v.as_str()) {
611 Some(n) if !n.is_empty() => n.to_string(),
612 Some(_) => return json_error(400, "field `namespace` must not be empty"),
613 None => return json_error(400, "field `namespace` is required and must be a string"),
614 };
615 let key = match parsed.get("key").and_then(|v| v.as_str()) {
616 Some(k) if !k.is_empty() => k.to_string(),
617 Some(_) => return json_error(400, "field `key` must not be empty"),
618 None => return json_error(400, "field `key` is required and must be a string"),
619 };
620 let new_value_b64 = match parsed.get("new_value_b64").and_then(|v| v.as_str()) {
621 Some(v) => v.to_string(),
622 None => {
623 return json_error(
624 400,
625 "field `new_value_b64` is required and must be a string",
626 )
627 }
628 };
629 let new_version = match parsed.get("new_version").and_then(|v| v.as_u64()) {
630 Some(v) => v,
631 None => {
632 return json_error(
633 400,
634 "field `new_version` is required and must be a non-negative integer",
635 )
636 }
637 };
638 if let Some(ev) = parsed.get("expected_version") {
640 if ev.as_u64().is_none() {
641 return json_error(
642 400,
643 "field `expected_version` must be a non-negative integer",
644 );
645 }
646 }
647 let ttl_ms = parsed.get("ttl_ms").and_then(|v| v.as_u64());
648
649 if let Some(err) = reject_smuggling_bytes("namespace", &namespace) {
651 return err;
652 }
653 if let Some(err) = reject_smuggling_bytes("key", &key) {
654 return err;
655 }
656
657 let bytes = match b64_decode(&new_value_b64) {
659 Ok(b) => b,
660 Err(e) => return json_error(400, format!("invalid base64 in `new_value_b64`: {e}")),
661 };
662
663 let policy = if let Some(ttl) = ttl_ms {
665 BlobCachePolicy::default().version(new_version).ttl_ms(ttl)
666 } else {
667 BlobCachePolicy::default().version(new_version)
668 };
669 let put = BlobCachePut::new(bytes).with_policy(policy);
670
671 match self.runtime.result_blob_cache().put(&namespace, &key, put) {
672 Ok(()) => {
673 let mut obj = Map::new();
674 obj.insert("committed".to_string(), JsonValue::Bool(true));
675 obj.insert(
676 "current_version".to_string(),
677 JsonValue::Number(new_version as f64),
678 );
679
680 let mut details = Map::new();
681 details.insert(
682 "namespace".to_string(),
683 crate::json_field::SerializedJsonField::tainted(&namespace),
684 );
685 details.insert(
686 "key".to_string(),
687 crate::json_field::SerializedJsonField::tainted(&key),
688 );
689 details.insert(
690 "new_version".to_string(),
691 JsonValue::Number(new_version as f64),
692 );
693 self.runtime.audit_log().record(
694 "admin/cache/compare_and_set",
695 "operator",
696 "instance",
697 "ok",
698 JsonValue::Object(details),
699 );
700
701 json_response(200, JsonValue::Object(obj))
702 }
703 Err(CacheError::VersionMismatch { existing, .. }) => {
704 let mut obj = Map::new();
705 obj.insert("committed".to_string(), JsonValue::Bool(false));
706 obj.insert(
707 "current_version".to_string(),
708 JsonValue::Number(existing as f64),
709 );
710 obj.insert(
711 "reason".to_string(),
712 JsonValue::String("VersionMismatch".to_string()),
713 );
714 json_response(409, JsonValue::Object(obj))
715 }
716 Err(err) => json_error(500, format!("cache put failed: {err:?}")),
717 }
718 }
719
720 pub(crate) fn handle_admin_blob_cache_stats(
727 &self,
728 _query: &std::collections::BTreeMap<String, String>,
729 ) -> HttpResponse {
730 let s = self.runtime.result_blob_cache().stats();
731 let mut obj = Map::new();
732 obj.insert("ok".to_string(), JsonValue::Bool(true));
733 obj.insert("hits".to_string(), JsonValue::Number(s.hits() as f64));
734 obj.insert("misses".to_string(), JsonValue::Number(s.misses() as f64));
735 obj.insert(
736 "insertions".to_string(),
737 JsonValue::Number(s.insertions() as f64),
738 );
739 obj.insert(
740 "evictions".to_string(),
741 JsonValue::Number(s.evictions() as f64),
742 );
743 obj.insert(
744 "expirations".to_string(),
745 JsonValue::Number(s.expirations() as f64),
746 );
747 obj.insert(
748 "invalidations".to_string(),
749 JsonValue::Number(s.invalidations() as f64),
750 );
751 obj.insert(
752 "namespace_flushes".to_string(),
753 JsonValue::Number(s.namespace_flushes() as f64),
754 );
755 obj.insert(
756 "version_mismatches".to_string(),
757 JsonValue::Number(s.version_mismatches() as f64),
758 );
759 obj.insert("entries".to_string(), JsonValue::Number(s.entries() as f64));
760 obj.insert(
761 "bytes_in_use".to_string(),
762 JsonValue::Number(s.bytes_in_use() as f64),
763 );
764 obj.insert(
765 "l1_bytes_max".to_string(),
766 JsonValue::Number(s.l1_bytes_max() as f64),
767 );
768 obj.insert(
769 "l2_bytes_in_use".to_string(),
770 JsonValue::Number(s.l2_bytes_in_use() as f64),
771 );
772 obj.insert(
773 "l2_bytes_max".to_string(),
774 JsonValue::Number(s.l2_bytes_max() as f64),
775 );
776 obj.insert(
777 "l2_full_rejections".to_string(),
778 JsonValue::Number(s.l2_full_rejections() as f64),
779 );
780 obj.insert(
781 "l2_metadata_reads".to_string(),
782 JsonValue::Number(s.l2_metadata_reads() as f64),
783 );
784 obj.insert(
785 "l2_negative_skips".to_string(),
786 JsonValue::Number(s.l2_negative_skips() as f64),
787 );
788 obj.insert(
789 "synopsis_metadata_reads".to_string(),
790 JsonValue::Number(s.synopsis_metadata_reads() as f64),
791 );
792 obj.insert(
793 "synopsis_bytes".to_string(),
794 JsonValue::Number(s.synopsis_bytes() as f64),
795 );
796 obj.insert(
797 "namespaces".to_string(),
798 JsonValue::Number(s.namespaces() as f64),
799 );
800 obj.insert(
801 "max_namespaces".to_string(),
802 JsonValue::Number(s.max_namespaces() as f64),
803 );
804 obj.insert(
805 "promotion_queued".to_string(),
806 JsonValue::Number(s.promotion_queued() as f64),
807 );
808 obj.insert(
809 "promotion_dropped".to_string(),
810 JsonValue::Number(s.promotion_dropped() as f64),
811 );
812 obj.insert(
813 "promotion_completed".to_string(),
814 JsonValue::Number(s.promotion_completed() as f64),
815 );
816 obj.insert(
817 "promotion_queue_depth".to_string(),
818 JsonValue::Number(s.promotion_queue_depth() as f64),
819 );
820 obj.insert(
821 "l2_compression_ratio_observed".to_string(),
822 JsonValue::Number(s.l2_compression_ratio_observed()),
823 );
824 obj.insert(
825 "l2_compression_skipped_total".to_string(),
826 JsonValue::Number(s.l2_compression_skipped_total() as f64),
827 );
828 obj.insert(
829 "l2_bytes_saved_total".to_string(),
830 JsonValue::Number(s.l2_bytes_saved_total() as f64),
831 );
832 json_response(200, JsonValue::Object(obj))
833 }
834
835 pub(crate) fn handle_admin_readonly(&self, body: Vec<u8>) -> HttpResponse {
850 let enabled = if body.is_empty() {
851 true
852 } else {
853 match crate::serde_json::from_slice::<crate::serde_json::Value>(&body) {
854 Ok(v) => v.get("enabled").and_then(|n| n.as_bool()).unwrap_or(true),
855 Err(err) => return json_error(400, format!("invalid JSON body: {err}")),
856 }
857 };
858
859 let previous = self.runtime.write_gate().set_read_only(enabled);
860
861 if let Some(data_path) = self.runtime.db().path() {
867 let state_path = runtime_state_path(data_path);
868 if let Err(err) = persist_runtime_readonly(&state_path, enabled) {
869 self.runtime.write_gate().set_read_only(previous);
870 return json_error(
871 500,
872 format!("read_only persisted to {state_path:?} failed: {err}"),
873 );
874 }
875 }
876
877 let mut details = Map::new();
878 details.insert("enabled".to_string(), JsonValue::Bool(enabled));
879 details.insert("previous".to_string(), JsonValue::Bool(previous));
880 self.runtime.audit_log().record(
881 "admin/readonly",
882 "operator",
883 "instance",
884 "ok",
885 JsonValue::Object(details),
886 );
887 let mut object = Map::new();
888 object.insert("ok".to_string(), JsonValue::Bool(true));
889 object.insert("read_only".to_string(), JsonValue::Bool(enabled));
890 object.insert("previous".to_string(), JsonValue::Bool(previous));
891 json_response(200, JsonValue::Object(object))
892 }
893
894 pub(crate) fn handle_metrics(&self) -> HttpResponse {
904 use std::fmt::Write;
905 let lifecycle = self.runtime.lifecycle();
906 let phase = lifecycle.phase();
907 let now_ms = std::time::SystemTime::now()
908 .duration_since(std::time::UNIX_EPOCH)
909 .map(|d| d.as_millis() as u64)
910 .unwrap_or(0);
911 let uptime_secs = (now_ms.saturating_sub(lifecycle.started_at_ms()) as f64) / 1000.0;
912 let cold_start_secs = lifecycle
913 .ready_at_ms()
914 .map(|ready_ms| (ready_ms.saturating_sub(lifecycle.started_at_ms()) as f64) / 1000.0);
915 let health_status: u8 = match phase {
916 Phase::Stopped => 0,
917 Phase::Starting | Phase::ShuttingDown => 0,
918 Phase::Draining => 1,
919 Phase::Ready => 2,
920 };
921 let read_only = self.runtime.write_gate().is_read_only();
922 let role = match self.runtime.write_gate().role() {
923 crate::replication::ReplicationRole::Standalone => "standalone",
924 crate::replication::ReplicationRole::Primary => "primary",
925 crate::replication::ReplicationRole::Replica { .. } => "replica",
926 };
927 let db_size_bytes = self
928 .runtime
929 .db()
930 .path()
931 .and_then(|p| std::fs::metadata(p).ok())
932 .map(|m| m.len())
933 .unwrap_or(0);
934 let runtime_stats = self.runtime.stats();
935 let result_blob_stats = runtime_stats.result_blob_cache;
936 let kv_stats = runtime_stats.kv;
937 let metrics_ingest = runtime_stats.metrics_ingest;
938
939 let mut body = String::with_capacity(1024);
940 let _ = writeln!(
941 body,
942 "# HELP reddb_uptime_seconds Seconds since the runtime was constructed."
943 );
944 let _ = writeln!(body, "# TYPE reddb_uptime_seconds gauge");
945 let _ = writeln!(body, "reddb_uptime_seconds {}", uptime_secs);
946
947 let _ = writeln!(
948 body,
949 "# HELP reddb_health_status 0=down/starting, 1=degraded/draining, 2=ready."
950 );
951 let _ = writeln!(body, "# TYPE reddb_health_status gauge");
952 let _ = writeln!(body, "reddb_health_status {}", health_status);
953
954 let _ = writeln!(
955 body,
956 "# HELP reddb_phase Lifecycle phase as a labeled gauge (always 1; phase in label)."
957 );
958 let _ = writeln!(body, "# TYPE reddb_phase gauge");
959 let _ = writeln!(body, "reddb_phase{{phase=\"{}\"}} 1", phase.as_str());
960
961 let _ = writeln!(
962 body,
963 "# HELP reddb_read_only 1 when public mutations are gated, 0 otherwise."
964 );
965 let _ = writeln!(body, "# TYPE reddb_read_only gauge");
966 let _ = writeln!(body, "reddb_read_only {}", if read_only { 1 } else { 0 });
967
968 let _ = writeln!(
969 body,
970 "# HELP reddb_replication_role Replication role of this instance."
971 );
972 let _ = writeln!(body, "# TYPE reddb_replication_role gauge");
973 let _ = writeln!(body, "reddb_replication_role{{role=\"{}\"}} 1", role);
974
975 let lease_state = self.runtime.write_gate().lease_state();
980 let _ = writeln!(
981 body,
982 "# HELP reddb_writer_lease_state Serverless writer-lease gate state (label)."
983 );
984 let _ = writeln!(body, "# TYPE reddb_writer_lease_state gauge");
985 let _ = writeln!(
986 body,
987 "reddb_writer_lease_state{{state=\"{}\"}} 1",
988 lease_state.label()
989 );
990
991 let backup_status = self.runtime.backup_status();
996 if let Some(last) = backup_status.last_backup.as_ref() {
997 let last_ts_secs = (last.timestamp as f64) / 1000.0;
998 let _ = writeln!(
999 body,
1000 "# HELP reddb_backup_last_success_timestamp_seconds Unix ts (s) of the most recent successful backup."
1001 );
1002 let _ = writeln!(
1003 body,
1004 "# TYPE reddb_backup_last_success_timestamp_seconds gauge"
1005 );
1006 let _ = writeln!(
1007 body,
1008 "reddb_backup_last_success_timestamp_seconds {}",
1009 last_ts_secs
1010 );
1011 let age_secs = ((now_ms.saturating_sub(last.timestamp)) as f64) / 1000.0;
1012 let _ = writeln!(
1013 body,
1014 "# HELP reddb_backup_age_seconds Seconds since last successful backup."
1015 );
1016 let _ = writeln!(body, "# TYPE reddb_backup_age_seconds gauge");
1017 let _ = writeln!(body, "reddb_backup_age_seconds {}", age_secs);
1018 let _ = writeln!(
1019 body,
1020 "# HELP reddb_backup_last_duration_seconds Wall-clock duration of the most recent backup."
1021 );
1022 let _ = writeln!(body, "# TYPE reddb_backup_last_duration_seconds gauge");
1023 let _ = writeln!(
1024 body,
1025 "reddb_backup_last_duration_seconds {}",
1026 (last.duration_ms as f64) / 1000.0
1027 );
1028 }
1029 let _ = writeln!(
1030 body,
1031 "# HELP reddb_backup_failures_total Total backup failures since process start."
1032 );
1033 let _ = writeln!(body, "# TYPE reddb_backup_failures_total counter");
1034 let _ = writeln!(
1035 body,
1036 "reddb_backup_failures_total {}",
1037 backup_status.total_failures
1038 );
1039 let _ = writeln!(
1040 body,
1041 "# HELP reddb_backup_total_total Total successful backups since process start."
1042 );
1043 let _ = writeln!(body, "# TYPE reddb_backup_total_total counter");
1044 let _ = writeln!(
1045 body,
1046 "reddb_backup_total_total {}",
1047 backup_status.total_backups
1048 );
1049
1050 let (current_lsn, last_archived_lsn) = self.runtime.wal_archive_progress();
1055 let lag = current_lsn.saturating_sub(last_archived_lsn);
1056 let _ = writeln!(
1057 body,
1058 "# HELP reddb_wal_current_lsn Current local LSN (most recent record visible to writers)."
1059 );
1060 let _ = writeln!(body, "# TYPE reddb_wal_current_lsn gauge");
1061 let _ = writeln!(body, "reddb_wal_current_lsn {}", current_lsn);
1062 let _ = writeln!(
1063 body,
1064 "# HELP reddb_wal_last_archived_lsn LSN of the most recently archived WAL segment."
1065 );
1066 let _ = writeln!(body, "# TYPE reddb_wal_last_archived_lsn gauge");
1067 let _ = writeln!(body, "reddb_wal_last_archived_lsn {}", last_archived_lsn);
1068 let _ = writeln!(
1069 body,
1070 "# HELP reddb_wal_archive_lag_records Records between current LSN and last archived LSN."
1071 );
1072 let _ = writeln!(body, "# TYPE reddb_wal_archive_lag_records gauge");
1073 let _ = writeln!(body, "reddb_wal_archive_lag_records {}", lag);
1074
1075 let _ = writeln!(
1076 body,
1077 "# HELP reddb_metrics_remote_write_samples_accepted_total Metrics remote-write samples accepted since process start."
1078 );
1079 let _ = writeln!(
1080 body,
1081 "# TYPE reddb_metrics_remote_write_samples_accepted_total counter"
1082 );
1083 let _ = writeln!(
1084 body,
1085 "reddb_metrics_remote_write_samples_accepted_total {}",
1086 metrics_ingest.samples_accepted
1087 );
1088 let _ = writeln!(
1089 body,
1090 "# HELP reddb_metrics_remote_write_series_accepted_total Metrics remote-write series accepted since process start."
1091 );
1092 let _ = writeln!(
1093 body,
1094 "# TYPE reddb_metrics_remote_write_series_accepted_total counter"
1095 );
1096 let _ = writeln!(
1097 body,
1098 "reddb_metrics_remote_write_series_accepted_total {}",
1099 metrics_ingest.series_accepted
1100 );
1101 let _ = writeln!(
1102 body,
1103 "# HELP reddb_metrics_remote_write_samples_rejected_total Metrics remote-write samples rejected since process start."
1104 );
1105 let _ = writeln!(
1106 body,
1107 "# TYPE reddb_metrics_remote_write_samples_rejected_total counter"
1108 );
1109 let _ = writeln!(
1110 body,
1111 "reddb_metrics_remote_write_samples_rejected_total {}",
1112 metrics_ingest.samples_rejected
1113 );
1114 let _ = writeln!(
1115 body,
1116 "# HELP reddb_metrics_remote_write_series_rejected_total Metrics remote-write series rejected since process start."
1117 );
1118 let _ = writeln!(
1119 body,
1120 "# TYPE reddb_metrics_remote_write_series_rejected_total counter"
1121 );
1122 let _ = writeln!(
1123 body,
1124 "reddb_metrics_remote_write_series_rejected_total {}",
1125 metrics_ingest.series_rejected
1126 );
1127 let _ = writeln!(
1128 body,
1129 "# HELP reddb_metrics_remote_write_series_rejected_by_reason_total Metrics remote-write series rejected since process start by reason."
1130 );
1131 let _ = writeln!(
1132 body,
1133 "# TYPE reddb_metrics_remote_write_series_rejected_by_reason_total counter"
1134 );
1135 let _ = writeln!(
1136 body,
1137 "reddb_metrics_remote_write_series_rejected_by_reason_total{{reason=\"cardinality_budget\"}} {}",
1138 metrics_ingest.series_rejected_cardinality_budget
1139 );
1140 let _ = writeln!(
1141 body,
1142 "# HELP reddb_metrics_tenant_activity_total Metrics adapter requests by tenant, namespace, and operation since process start."
1143 );
1144 let _ = writeln!(body, "# TYPE reddb_metrics_tenant_activity_total counter");
1145 for activity in self.runtime.metrics_tenant_activity_snapshot() {
1146 let _ = writeln!(
1147 body,
1148 "reddb_metrics_tenant_activity_total{{tenant=\"{}\",namespace=\"{}\",operation=\"{}\"}} {}",
1149 sanitize_label(&activity.tenant),
1150 sanitize_label(&activity.namespace),
1151 sanitize_label(&activity.operation),
1152 activity.count
1153 );
1154 }
1155
1156 let replicas = self.runtime.primary_replica_snapshots();
1161 let _ = writeln!(
1162 body,
1163 "# HELP reddb_replica_count Currently registered replicas."
1164 );
1165 let _ = writeln!(body, "# TYPE reddb_replica_count gauge");
1166 let _ = writeln!(body, "reddb_replica_count {}", replicas.len());
1167 if !replicas.is_empty() {
1168 let replica_lag_budget_secs = std::env::var("RED_SLO_REPLICA_LAG_BUDGET_SECONDS")
1169 .ok()
1170 .and_then(|value| value.parse::<f64>().ok())
1171 .filter(|value| value.is_finite() && *value >= 0.0)
1172 .unwrap_or(60.0);
1173 let _ = writeln!(
1174 body,
1175 "# HELP reddb_replica_ack_lsn Most recent LSN acked by each replica."
1176 );
1177 let _ = writeln!(body, "# TYPE reddb_replica_ack_lsn gauge");
1178 for r in &replicas {
1179 let _ = writeln!(
1180 body,
1181 "reddb_replica_ack_lsn{{replica_id=\"{}\"}} {}",
1182 sanitize_label(&r.id),
1183 r.last_acked_lsn
1184 );
1185 }
1186 let _ = writeln!(
1187 body,
1188 "# HELP reddb_replica_lag_records Distance from primary current LSN to replica acked LSN."
1189 );
1190 let _ = writeln!(body, "# TYPE reddb_replica_lag_records gauge");
1191 for r in &replicas {
1192 let _ = writeln!(
1193 body,
1194 "reddb_replica_lag_records{{replica_id=\"{}\"}} {}",
1195 sanitize_label(&r.id),
1196 current_lsn.saturating_sub(r.last_acked_lsn)
1197 );
1198 }
1199 let _ = writeln!(
1200 body,
1201 "# HELP reddb_replica_lag_seconds Wall-clock seconds since the replica was last seen."
1202 );
1203 let _ = writeln!(body, "# TYPE reddb_replica_lag_seconds gauge");
1204 let _ = writeln!(
1205 body,
1206 "# HELP reddb_slo_lag_budget_remaining_seconds Remaining per-replica lag budget; negative means SLO breach."
1207 );
1208 let _ = writeln!(body, "# TYPE reddb_slo_lag_budget_remaining_seconds gauge");
1209 for r in &replicas {
1210 let lag_ms = (now_ms as u128).saturating_sub(r.last_seen_at_unix_ms);
1211 let lag_secs = (lag_ms as f64) / 1000.0;
1212 let _ = writeln!(
1213 body,
1214 "reddb_replica_lag_seconds{{replica_id=\"{}\"}} {}",
1215 sanitize_label(&r.id),
1216 lag_secs
1217 );
1218 let _ = writeln!(
1219 body,
1220 "reddb_slo_lag_budget_remaining_seconds{{replica_id=\"{}\"}} {}",
1221 sanitize_label(&r.id),
1222 replica_lag_budget_secs - lag_secs
1223 );
1224 }
1225 }
1226
1227 let _ = writeln!(
1233 body,
1234 "# HELP reddb_replica_apply_errors_total Replica WAL apply errors since process start, by kind."
1235 );
1236 let _ = writeln!(body, "# TYPE reddb_replica_apply_errors_total counter");
1237 for (kind, count) in self.runtime.replica_apply_error_counts() {
1238 let _ = writeln!(
1239 body,
1240 "reddb_replica_apply_errors_total{{kind=\"{}\"}} {}",
1241 kind.label(),
1242 count
1243 );
1244 }
1245 if let Some(health) = self.runtime.replica_apply_health() {
1246 let _ = writeln!(
1247 body,
1248 "# HELP reddb_replica_apply_health Replica apply state (label, value=1)."
1249 );
1250 let _ = writeln!(body, "# TYPE reddb_replica_apply_health gauge");
1251 let _ = writeln!(
1252 body,
1253 "reddb_replica_apply_health{{state=\"{}\"}} 1",
1254 sanitize_label(&health)
1255 );
1256 }
1257
1258 self.runtime.quota_bucket().evict_idle();
1263 let rejections = self.runtime.quota_bucket().rejection_snapshot();
1264 if !rejections.is_empty() {
1265 let _ = writeln!(
1266 body,
1267 "# HELP reddb_quota_rejected_total Requests rejected by per-caller QPS quota."
1268 );
1269 let _ = writeln!(body, "# TYPE reddb_quota_rejected_total counter");
1270 for (principal, count) in &rejections {
1271 let _ = writeln!(
1272 body,
1273 "reddb_quota_rejected_total{{principal=\"{}\"}} {}",
1274 sanitize_label(principal),
1275 count
1276 );
1277 }
1278 }
1279
1280 let (reached, timed_out, not_required, last_micros) =
1285 self.runtime.commit_waiter_metrics_snapshot();
1286 let _ = writeln!(
1287 body,
1288 "# HELP reddb_commit_wait_total Commit-wait outcomes by kind."
1289 );
1290 let _ = writeln!(body, "# TYPE reddb_commit_wait_total counter");
1291 let _ = writeln!(
1292 body,
1293 "reddb_commit_wait_total{{outcome=\"reached\"}} {}",
1294 reached
1295 );
1296 let _ = writeln!(
1297 body,
1298 "reddb_commit_wait_total{{outcome=\"timed_out\"}} {}",
1299 timed_out
1300 );
1301 let _ = writeln!(
1302 body,
1303 "reddb_commit_wait_total{{outcome=\"not_required\"}} {}",
1304 not_required
1305 );
1306 let _ = writeln!(
1307 body,
1308 "# HELP reddb_commit_wait_last_seconds Wall-clock seconds of the most recent commit wait."
1309 );
1310 let _ = writeln!(body, "# TYPE reddb_commit_wait_last_seconds gauge");
1311 let _ = writeln!(
1312 body,
1313 "reddb_commit_wait_last_seconds {}",
1314 (last_micros as f64) / 1_000_000.0
1315 );
1316
1317 let policy = self.runtime.commit_policy();
1322 let _ = writeln!(
1323 body,
1324 "# HELP reddb_primary_commit_policy Active commit policy on the primary."
1325 );
1326 let _ = writeln!(body, "# TYPE reddb_primary_commit_policy gauge");
1327 let _ = writeln!(
1328 body,
1329 "reddb_primary_commit_policy{{policy=\"{}\"}} 1",
1330 policy.label()
1331 );
1332
1333 let blob_ns = "runtime.result_cache";
1338 let _ = writeln!(
1339 body,
1340 "# HELP reddb_cache_blob_get_total Blob Cache get outcomes by namespace."
1341 );
1342 let _ = writeln!(body, "# TYPE reddb_cache_blob_get_total counter");
1343 let _ = writeln!(
1344 body,
1345 "reddb_cache_blob_get_total{{namespace=\"{}\",result=\"hit_l1\"}} {}",
1346 blob_ns,
1347 result_blob_stats.hits()
1348 );
1349 let _ = writeln!(
1350 body,
1351 "reddb_cache_blob_get_total{{namespace=\"{}\",result=\"hit_l2\"}} 0",
1352 blob_ns
1353 );
1354 let _ = writeln!(
1355 body,
1356 "reddb_cache_blob_get_total{{namespace=\"{}\",result=\"miss\"}} {}",
1357 blob_ns,
1358 result_blob_stats.misses()
1359 );
1360 let _ = writeln!(
1361 body,
1362 "# HELP reddb_cache_blob_put_total Blob Cache put outcomes by namespace."
1363 );
1364 let _ = writeln!(body, "# TYPE reddb_cache_blob_put_total counter");
1365 let _ = writeln!(
1366 body,
1367 "reddb_cache_blob_put_total{{namespace=\"{}\",outcome=\"ok\"}} {}",
1368 blob_ns,
1369 result_blob_stats.insertions()
1370 );
1371 let _ = writeln!(
1372 body,
1373 "reddb_cache_blob_put_total{{namespace=\"{}\",outcome=\"version_mismatch\"}} {}",
1374 blob_ns,
1375 result_blob_stats.version_mismatches()
1376 );
1377 let _ = writeln!(
1378 body,
1379 "reddb_cache_blob_put_total{{namespace=\"{}\",outcome=\"too_large\"}} 0",
1380 blob_ns
1381 );
1382 let _ = writeln!(
1383 body,
1384 "reddb_cache_blob_put_total{{namespace=\"{}\",outcome=\"metadata_too_large\"}} 0",
1385 blob_ns
1386 );
1387 let _ = writeln!(
1388 body,
1389 "# HELP reddb_cache_blob_invalidate_total Blob Cache invalidations by namespace and kind."
1390 );
1391 let _ = writeln!(body, "# TYPE reddb_cache_blob_invalidate_total counter");
1392 for (kind, count) in [
1393 ("key", 0),
1394 ("prefix", 0),
1395 ("tag", 0),
1396 ("dependency", result_blob_stats.invalidations()),
1397 ("namespace", result_blob_stats.namespace_flushes()),
1398 ] {
1399 let _ = writeln!(
1400 body,
1401 "reddb_cache_blob_invalidate_total{{namespace=\"{}\",kind=\"{}\"}} {}",
1402 blob_ns, kind, count
1403 );
1404 }
1405 let _ = writeln!(
1406 body,
1407 "# HELP reddb_cache_blob_evict_total Blob Cache evictions by namespace and reason."
1408 );
1409 let _ = writeln!(body, "# TYPE reddb_cache_blob_evict_total counter");
1410 for (reason, count) in [
1411 ("capacity", result_blob_stats.evictions()),
1412 ("expiry", result_blob_stats.expirations()),
1413 ("policy", 0),
1414 ] {
1415 let _ = writeln!(
1416 body,
1417 "reddb_cache_blob_evict_total{{namespace=\"{}\",reason=\"{}\"}} {}",
1418 blob_ns, reason, count
1419 );
1420 }
1421 let _ = writeln!(
1422 body,
1423 "# HELP reddb_cache_blob_l1_bytes_in_use L1 bytes currently used by Blob Cache namespace."
1424 );
1425 let _ = writeln!(body, "# TYPE reddb_cache_blob_l1_bytes_in_use gauge");
1426 let _ = writeln!(
1427 body,
1428 "reddb_cache_blob_l1_bytes_in_use{{namespace=\"{}\"}} {}",
1429 blob_ns,
1430 result_blob_stats.bytes_in_use()
1431 );
1432 let _ = writeln!(
1433 body,
1434 "# HELP reddb_cache_blob_l1_entries L1 entries currently held by Blob Cache namespace."
1435 );
1436 let _ = writeln!(body, "# TYPE reddb_cache_blob_l1_entries gauge");
1437 let _ = writeln!(
1438 body,
1439 "reddb_cache_blob_l1_entries{{namespace=\"{}\"}} {}",
1440 blob_ns,
1441 result_blob_stats.entries()
1442 );
1443 let _ = writeln!(
1444 body,
1445 "# HELP reddb_cache_blob_l2_bytes_in_use L2 bytes currently used by Blob Cache namespace."
1446 );
1447 let _ = writeln!(body, "# TYPE reddb_cache_blob_l2_bytes_in_use gauge");
1448 let _ = writeln!(
1449 body,
1450 "reddb_cache_blob_l2_bytes_in_use{{namespace=\"{}\"}} {}",
1451 blob_ns,
1452 result_blob_stats.l2_bytes_in_use()
1453 );
1454 let _ = writeln!(
1455 body,
1456 "# HELP reddb_cache_blob_l2_full_rejections_total Blob Cache puts rejected because L2 is full."
1457 );
1458 let _ = writeln!(
1459 body,
1460 "# TYPE reddb_cache_blob_l2_full_rejections_total counter"
1461 );
1462 let _ = writeln!(
1463 body,
1464 "reddb_cache_blob_l2_full_rejections_total{{namespace=\"{}\"}} {}",
1465 blob_ns,
1466 result_blob_stats.l2_full_rejections()
1467 );
1468 let _ = writeln!(
1469 body,
1470 "# HELP reddb_cache_blob_version_mismatch_total Blob Cache CAS version mismatches by namespace."
1471 );
1472 let _ = writeln!(
1473 body,
1474 "# TYPE reddb_cache_blob_version_mismatch_total counter"
1475 );
1476 let _ = writeln!(
1477 body,
1478 "reddb_cache_blob_version_mismatch_total{{namespace=\"{}\"}} {}",
1479 blob_ns,
1480 result_blob_stats.version_mismatches()
1481 );
1482
1483 let _ = writeln!(
1484 body,
1485 "# HELP reddb_kv_ops_total Normal-KV operations since process start."
1486 );
1487 let _ = writeln!(body, "# TYPE reddb_kv_ops_total counter");
1488 for (verb, count) in [
1489 ("put", kv_stats.puts),
1490 ("get", kv_stats.gets),
1491 ("delete", kv_stats.deletes),
1492 ("incr", kv_stats.incrs),
1493 ] {
1494 let _ = writeln!(body, "reddb_kv_ops_total{{verb=\"{}\"}} {}", verb, count);
1495 }
1496 let _ = writeln!(
1497 body,
1498 "# HELP reddb_kv_cas_total Normal-KV CAS outcomes since process start."
1499 );
1500 let _ = writeln!(body, "# TYPE reddb_kv_cas_total counter");
1501 let _ = writeln!(
1502 body,
1503 "reddb_kv_cas_total{{outcome=\"success\"}} {}",
1504 kv_stats.cas_success
1505 );
1506 let _ = writeln!(
1507 body,
1508 "reddb_kv_cas_total{{outcome=\"conflict\"}} {}",
1509 kv_stats.cas_conflict
1510 );
1511 let _ = writeln!(
1512 body,
1513 "# HELP reddb_kv_watch_streams_active Active normal-KV WATCH streams."
1514 );
1515 let _ = writeln!(body, "# TYPE reddb_kv_watch_streams_active gauge");
1516 let _ = writeln!(
1517 body,
1518 "reddb_kv_watch_streams_active {}",
1519 kv_stats.watch_streams_active
1520 );
1521 let _ = writeln!(
1522 body,
1523 "# HELP reddb_kv_watch_events_emitted_total Normal-KV WATCH events emitted since process start."
1524 );
1525 let _ = writeln!(body, "# TYPE reddb_kv_watch_events_emitted_total counter");
1526 let _ = writeln!(
1527 body,
1528 "reddb_kv_watch_events_emitted_total {}",
1529 kv_stats.watch_events_emitted
1530 );
1531 let _ = writeln!(
1532 body,
1533 "# HELP reddb_kv_watch_drops_total Normal-KV WATCH events dropped by bounded subscriber buffers."
1534 );
1535 let _ = writeln!(body, "# TYPE reddb_kv_watch_drops_total counter");
1536 let _ = writeln!(body, "reddb_kv_watch_drops_total {}", kv_stats.watch_drops);
1537
1538 let _ = writeln!(
1539 body,
1540 "# HELP reddb_db_size_bytes On-disk size of the primary database file."
1541 );
1542 let _ = writeln!(body, "# TYPE reddb_db_size_bytes gauge");
1543 let _ = writeln!(body, "reddb_db_size_bytes {}", db_size_bytes);
1544
1545 if let Some(secs) = cold_start_secs {
1546 let _ = writeln!(
1547 body,
1548 "# HELP reddb_cold_start_duration_seconds Seconds from process start to /health/ready 200."
1549 );
1550 let _ = writeln!(body, "# TYPE reddb_cold_start_duration_seconds gauge");
1551 let _ = writeln!(body, "reddb_cold_start_duration_seconds {}", secs);
1552 }
1553
1554 let phases = lifecycle.cold_start_phases().durations_ms();
1560 if !phases.is_empty() {
1561 let _ = writeln!(
1562 body,
1563 "# HELP reddb_cold_start_phase_seconds Per-phase cold-start duration."
1564 );
1565 let _ = writeln!(body, "# TYPE reddb_cold_start_phase_seconds gauge");
1566 for (name, dur_ms) in phases {
1567 let _ = writeln!(
1568 body,
1569 "reddb_cold_start_phase_seconds{{phase=\"{}\"}} {}",
1570 name,
1571 (dur_ms as f64) / 1000.0
1572 );
1573 }
1574 }
1575
1576 let limits = self.runtime.resource_limits();
1581 if let Some(v) = limits.max_db_size_bytes {
1582 let _ = writeln!(
1583 body,
1584 "# HELP reddb_limit_db_size_bytes Operator-pinned cap on the primary DB file size."
1585 );
1586 let _ = writeln!(body, "# TYPE reddb_limit_db_size_bytes gauge");
1587 let _ = writeln!(body, "reddb_limit_db_size_bytes {}", v);
1588 }
1589 if let Some(v) = limits.max_connections {
1590 let _ = writeln!(body, "# TYPE reddb_limit_connections gauge");
1591 let _ = writeln!(body, "reddb_limit_connections {}", v);
1592 }
1593 if let Some(v) = limits.max_qps {
1594 let _ = writeln!(body, "# TYPE reddb_limit_qps gauge");
1595 let _ = writeln!(body, "reddb_limit_qps {}", v);
1596 }
1597 if let Some(v) = limits.max_batch_size {
1598 let _ = writeln!(body, "# TYPE reddb_limit_batch_size gauge");
1599 let _ = writeln!(body, "reddb_limit_batch_size {}", v);
1600 }
1601 if let Some(v) = limits.max_memory_bytes {
1602 let _ = writeln!(body, "# TYPE reddb_limit_memory_bytes gauge");
1603 let _ = writeln!(body, "reddb_limit_memory_bytes {}", v);
1604 }
1605
1606 {
1613 let queue_telemetry = self.runtime.queue_telemetry_snapshot();
1614 let _ = writeln!(
1615 body,
1616 "# HELP queue_delivered_total Messages handed to a consumer (per queue/group/mode)."
1617 );
1618 let _ = writeln!(body, "# TYPE queue_delivered_total counter");
1619 for ((queue, group, mode), n) in &queue_telemetry.delivered {
1620 let _ = writeln!(
1621 body,
1622 "queue_delivered_total{{queue=\"{}\",group=\"{}\",mode=\"{}\"}} {}",
1623 sanitize_label(queue),
1624 sanitize_label(group),
1625 sanitize_label(mode),
1626 n
1627 );
1628 }
1629 let _ = writeln!(
1630 body,
1631 "# HELP queue_acked_total Messages acknowledged (per queue/group/mode)."
1632 );
1633 let _ = writeln!(body, "# TYPE queue_acked_total counter");
1634 for ((queue, group, mode), n) in &queue_telemetry.acked {
1635 let _ = writeln!(
1636 body,
1637 "queue_acked_total{{queue=\"{}\",group=\"{}\",mode=\"{}\"}} {}",
1638 sanitize_label(queue),
1639 sanitize_label(group),
1640 sanitize_label(mode),
1641 n
1642 );
1643 }
1644 let _ = writeln!(
1645 body,
1646 "# HELP queue_nacked_total Messages negatively-acknowledged (per queue/group/mode/outcome)."
1647 );
1648 let _ = writeln!(body, "# TYPE queue_nacked_total counter");
1649 for ((queue, group, mode, outcome), n) in &queue_telemetry.nacked {
1650 let _ = writeln!(
1651 body,
1652 "queue_nacked_total{{queue=\"{}\",group=\"{}\",mode=\"{}\",outcome=\"{}\"}} {}",
1653 sanitize_label(queue),
1654 sanitize_label(group),
1655 sanitize_label(mode),
1656 outcome,
1657 n
1658 );
1659 }
1660 let pending = self.runtime.queue_pending_counts();
1661 let _ = writeln!(
1662 body,
1663 "# HELP queue_pending_gauge In-flight (delivered, not yet acked) messages per queue/group."
1664 );
1665 let _ = writeln!(body, "# TYPE queue_pending_gauge gauge");
1666 for ((queue, group), n) in &pending {
1667 let _ = writeln!(
1668 body,
1669 "queue_pending_gauge{{queue=\"{}\",group=\"{}\"}} {}",
1670 sanitize_label(queue),
1671 sanitize_label(group),
1672 n
1673 );
1674 }
1675 }
1676
1677 {
1679 use crate::runtime::impl_queue::{
1680 EVENTS_DLQ_TOTAL, EVENTS_DRAIN_RETRIES_TOTAL, EVENTS_ENQUEUED_TOTAL,
1681 };
1682 let enqueued = EVENTS_ENQUEUED_TOTAL.load(std::sync::atomic::Ordering::Relaxed);
1683 let retries = EVENTS_DRAIN_RETRIES_TOTAL.load(std::sync::atomic::Ordering::Relaxed);
1684 let dlq_total = EVENTS_DLQ_TOTAL.load(std::sync::atomic::Ordering::Relaxed);
1685
1686 let _ = writeln!(
1687 body,
1688 "# HELP reddb_events_enqueued_total Total events successfully pushed to target queues."
1689 );
1690 let _ = writeln!(body, "# TYPE reddb_events_enqueued_total counter");
1691 let _ = writeln!(body, "reddb_events_enqueued_total {enqueued}");
1692
1693 let _ = writeln!(
1694 body,
1695 "# HELP reddb_events_drain_retries_total Total event push failures that triggered DLQ routing."
1696 );
1697 let _ = writeln!(body, "# TYPE reddb_events_drain_retries_total counter");
1698 let _ = writeln!(
1699 body,
1700 "reddb_events_drain_retries_total{{reason=\"queue_full\"}} {retries}"
1701 );
1702
1703 let _ = writeln!(
1704 body,
1705 "# HELP reddb_events_dlq_total Total events routed to dead-letter queues."
1706 );
1707 let _ = writeln!(body, "# TYPE reddb_events_dlq_total counter");
1708 let _ = writeln!(body, "reddb_events_dlq_total {dlq_total}");
1709 }
1710
1711 crate::runtime::ai::metrics::render_ai_metrics(&mut body);
1713
1714 self.http_metrics().render(&mut body, self.http_limiter());
1720
1721 HttpResponse {
1722 status: 200,
1723 content_type: "text/plain; version=0.0.4",
1724 body: body.into_bytes(),
1725 extra_headers: Vec::new(),
1726 }
1727 }
1728
1729 pub(crate) fn handle_admin_status(&self) -> HttpResponse {
1734 let lifecycle = self.runtime.lifecycle();
1735 let phase = lifecycle.phase();
1736 let now_ms = std::time::SystemTime::now()
1737 .duration_since(std::time::UNIX_EPOCH)
1738 .map(|d| d.as_millis() as u64)
1739 .unwrap_or(0);
1740 let uptime_secs = (now_ms.saturating_sub(lifecycle.started_at_ms()) as f64) / 1000.0;
1741 let read_only = self.runtime.write_gate().is_read_only();
1742 let role = match self.runtime.write_gate().role() {
1743 crate::replication::ReplicationRole::Standalone => "standalone",
1744 crate::replication::ReplicationRole::Primary => "primary",
1745 crate::replication::ReplicationRole::Replica { .. } => "replica",
1746 };
1747 let db = self.runtime.db();
1748 let db_size_bytes = db
1749 .path()
1750 .and_then(|p| std::fs::metadata(p).ok())
1751 .map(|m| m.len())
1752 .unwrap_or(0);
1753 let backend_kind = db
1754 .options()
1755 .remote_backend
1756 .as_ref()
1757 .map(|b| b.name().to_string());
1758
1759 let mut object = Map::new();
1760 object.insert(
1761 "version".to_string(),
1762 JsonValue::String(env!("CARGO_PKG_VERSION").to_string()),
1763 );
1764 object.insert(
1765 "phase".to_string(),
1766 JsonValue::String(phase.as_str().to_string()),
1767 );
1768 object.insert(
1769 "uptime_secs".to_string(),
1770 JsonValue::Number((uptime_secs * 1000.0).round() / 1000.0),
1771 );
1772 object.insert(
1773 "started_at_unix_ms".to_string(),
1774 JsonValue::Number(lifecycle.started_at_ms() as f64),
1775 );
1776 if let Some(ready_at) = lifecycle.ready_at_ms() {
1777 object.insert(
1778 "ready_at_unix_ms".to_string(),
1779 JsonValue::Number(ready_at as f64),
1780 );
1781 }
1782 object.insert(
1783 "db_size_bytes".to_string(),
1784 JsonValue::Number(db_size_bytes as f64),
1785 );
1786 object.insert("read_only".to_string(), JsonValue::Bool(read_only));
1787 object.insert(
1788 "replication_role".to_string(),
1789 JsonValue::String(role.to_string()),
1790 );
1791 object.insert(
1792 "writer_lease".to_string(),
1793 JsonValue::String(self.runtime.write_gate().lease_state().label().to_string()),
1794 );
1795
1796 let (enc_state, enc_error) = self.runtime.encryption_at_rest_status();
1800 let mut enc_obj = Map::new();
1801 enc_obj.insert(
1802 "state".to_string(),
1803 JsonValue::String(enc_state.to_string()),
1804 );
1805 if let Some(err) = enc_error {
1806 enc_obj.insert("error".to_string(), JsonValue::String(err));
1807 }
1808 object.insert("encryption_at_rest".to_string(), JsonValue::Object(enc_obj));
1809
1810 let backup = self.runtime.backup_status();
1814 let mut backup_obj = Map::new();
1815 if let Some(last) = backup.last_backup.as_ref() {
1816 backup_obj.insert(
1817 "last_success_unix_ms".to_string(),
1818 JsonValue::Number(last.timestamp as f64),
1819 );
1820 backup_obj.insert(
1821 "last_duration_ms".to_string(),
1822 JsonValue::Number(last.duration_ms as f64),
1823 );
1824 backup_obj.insert(
1825 "age_seconds".to_string(),
1826 JsonValue::Number(((now_ms.saturating_sub(last.timestamp)) as f64) / 1000.0),
1827 );
1828 }
1829 backup_obj.insert(
1830 "total_successes".to_string(),
1831 JsonValue::Number(backup.total_backups as f64),
1832 );
1833 backup_obj.insert(
1834 "total_failures".to_string(),
1835 JsonValue::Number(backup.total_failures as f64),
1836 );
1837 backup_obj.insert(
1838 "interval_secs".to_string(),
1839 JsonValue::Number(backup.interval_secs as f64),
1840 );
1841 object.insert("backup".to_string(), JsonValue::Object(backup_obj));
1842
1843 let (current_lsn, last_archived_lsn) = self.runtime.wal_archive_progress();
1845 let mut wal_obj = Map::new();
1846 wal_obj.insert(
1847 "current_lsn".to_string(),
1848 JsonValue::Number(current_lsn as f64),
1849 );
1850 wal_obj.insert(
1851 "last_archived_lsn".to_string(),
1852 JsonValue::Number(last_archived_lsn as f64),
1853 );
1854 wal_obj.insert(
1855 "archive_lag_records".to_string(),
1856 JsonValue::Number(current_lsn.saturating_sub(last_archived_lsn) as f64),
1857 );
1858 object.insert("wal".to_string(), JsonValue::Object(wal_obj));
1859
1860 let mut replica_obj = Map::new();
1865 if let Some(health) = self.runtime.replica_apply_health() {
1866 replica_obj.insert("apply_health".to_string(), JsonValue::String(health));
1867 }
1868 let mut errors_obj = Map::new();
1869 for (kind, count) in self.runtime.replica_apply_error_counts() {
1870 errors_obj.insert(kind.label().to_string(), JsonValue::Number(count as f64));
1871 }
1872 replica_obj.insert("apply_errors".to_string(), JsonValue::Object(errors_obj));
1873 let snaps = self.runtime.primary_replica_snapshots();
1875 if !snaps.is_empty() {
1876 let arr: Vec<JsonValue> = snaps
1877 .iter()
1878 .map(|r| {
1879 let mut o = Map::new();
1880 o.insert("id".to_string(), JsonValue::String(r.id.clone()));
1881 o.insert(
1882 "last_acked_lsn".to_string(),
1883 JsonValue::Number(r.last_acked_lsn as f64),
1884 );
1885 o.insert(
1886 "last_sent_lsn".to_string(),
1887 JsonValue::Number(r.last_sent_lsn as f64),
1888 );
1889 o.insert(
1890 "last_durable_lsn".to_string(),
1891 JsonValue::Number(r.last_durable_lsn as f64),
1892 );
1893 o.insert(
1894 "last_seen_at_unix_ms".to_string(),
1895 JsonValue::Number(r.last_seen_at_unix_ms as f64),
1896 );
1897 o.insert(
1898 "lag_records".to_string(),
1899 JsonValue::Number(current_lsn.saturating_sub(r.last_acked_lsn) as f64),
1900 );
1901 if let Some(region) = &r.region {
1902 o.insert("region".to_string(), JsonValue::String(region.clone()));
1903 }
1904 JsonValue::Object(o)
1905 })
1906 .collect();
1907 replica_obj.insert("primary_view".to_string(), JsonValue::Array(arr));
1908 }
1909 replica_obj.insert(
1910 "commit_policy".to_string(),
1911 JsonValue::String(self.runtime.commit_policy().label().to_string()),
1912 );
1913 let durable = self.runtime.commit_waiter_snapshot();
1916 if !durable.is_empty() {
1917 let arr: Vec<JsonValue> = durable
1918 .into_iter()
1919 .map(|(id, lsn)| {
1920 let mut o = Map::new();
1921 o.insert("replica_id".to_string(), JsonValue::String(id));
1922 o.insert("durable_lsn".to_string(), JsonValue::Number(lsn as f64));
1923 JsonValue::Object(o)
1924 })
1925 .collect();
1926 replica_obj.insert("durable_view".to_string(), JsonValue::Array(arr));
1927 }
1928 object.insert("replica".to_string(), JsonValue::Object(replica_obj));
1929 if let Some(backend) = backend_kind {
1930 object.insert("remote_backend".to_string(), JsonValue::String(backend));
1931 }
1932 let limits = self.runtime.resource_limits();
1935 let mut limits_obj = Map::new();
1936 if let Some(v) = limits.max_db_size_bytes {
1937 limits_obj.insert("max_db_size_bytes".to_string(), JsonValue::Number(v as f64));
1938 }
1939 if let Some(v) = limits.max_connections {
1940 limits_obj.insert("max_connections".to_string(), JsonValue::Number(v as f64));
1941 }
1942 if let Some(v) = limits.max_qps {
1943 limits_obj.insert("max_qps".to_string(), JsonValue::Number(v as f64));
1944 }
1945 if let Some(v) = limits.max_batch_size {
1946 limits_obj.insert("max_batch_size".to_string(), JsonValue::Number(v as f64));
1947 }
1948 if let Some(v) = limits.max_memory_bytes {
1949 limits_obj.insert("max_memory_bytes".to_string(), JsonValue::Number(v as f64));
1950 }
1951 if let Some(d) = limits.max_query_duration {
1952 limits_obj.insert(
1953 "max_query_duration_ms".to_string(),
1954 JsonValue::Number(d.as_millis() as f64),
1955 );
1956 }
1957 if let Some(v) = limits.max_result_bytes {
1958 limits_obj.insert("max_result_bytes".to_string(), JsonValue::Number(v as f64));
1959 }
1960 object.insert("limits".to_string(), JsonValue::Object(limits_obj));
1961
1962 if let Some(report) = lifecycle.shutdown_report() {
1963 let mut shutdown_obj = Map::new();
1964 shutdown_obj.insert(
1965 "duration_ms".to_string(),
1966 JsonValue::Number(report.duration_ms as f64),
1967 );
1968 shutdown_obj.insert(
1969 "flushed_wal".to_string(),
1970 JsonValue::Bool(report.flushed_wal),
1971 );
1972 shutdown_obj.insert(
1973 "backup_uploaded".to_string(),
1974 JsonValue::Bool(report.backup_uploaded),
1975 );
1976 object.insert("shutdown".to_string(), JsonValue::Object(shutdown_obj));
1977 }
1978 json_response(200, JsonValue::Object(object))
1979 }
1980
1981 pub(crate) fn handle_admin_failover_promote(&self, body: Vec<u8>) -> HttpResponse {
2010 if !matches!(
2012 self.runtime.write_gate().role(),
2013 crate::replication::ReplicationRole::Replica { .. }
2014 ) {
2015 return json_error(
2016 409,
2017 "promotion only allowed on a replica (current role is not Replica)",
2018 );
2019 }
2020
2021 let Some(backend) = self.runtime.db().options().remote_backend_atomic.clone() else {
2023 return json_error(
2024 412,
2025 "promotion requires a CAS-capable remote backend (use s3, fs, or http with RED_HTTP_CONDITIONAL_WRITES=true)",
2026 );
2027 };
2028
2029 let health = self.runtime.replica_apply_health().unwrap_or_default();
2032 if matches!(
2033 health.as_str(),
2034 "stalled_gap" | "divergence" | "apply_error"
2035 ) {
2036 return json_error(
2037 409,
2038 format!(
2039 "promotion refused — replica apply state is `{health}`; resolve before promoting"
2040 ),
2041 );
2042 }
2043
2044 let (holder_id, ttl_ms) = if body.is_empty() {
2046 (default_holder_id(), 60_000u64)
2047 } else {
2048 match crate::serde_json::from_slice::<crate::serde_json::Value>(&body) {
2049 Ok(v) => {
2050 let holder = v
2051 .get("holder_id")
2052 .and_then(|n| n.as_str())
2053 .map(|s| s.to_string())
2054 .unwrap_or_else(default_holder_id);
2055 let ttl = v
2056 .get("ttl_ms")
2057 .and_then(|n| n.as_u64())
2058 .filter(|t| *t > 0)
2059 .unwrap_or(60_000);
2060 (holder, ttl)
2061 }
2062 Err(err) => return json_error(400, format!("invalid JSON body: {err}")),
2063 }
2064 };
2065
2066 let database_key = self
2067 .runtime
2068 .db()
2069 .options()
2070 .remote_key
2071 .clone()
2072 .unwrap_or_else(|| "main".to_string());
2073 let store = crate::replication::LeaseStore::new(backend);
2074
2075 match crate::runtime::lease_lifecycle::admin_promote_lease(
2076 &store,
2077 self.runtime.audit_log(),
2078 &database_key,
2079 &holder_id,
2080 ttl_ms,
2081 ) {
2082 Ok(lease) => {
2083 let mut object = Map::new();
2084 object.insert("ok".to_string(), JsonValue::Bool(true));
2085 object.insert("holder_id".to_string(), JsonValue::String(lease.holder_id));
2086 object.insert(
2087 "generation".to_string(),
2088 JsonValue::Number(lease.generation as f64),
2089 );
2090 object.insert(
2091 "acquired_at_ms".to_string(),
2092 JsonValue::Number(lease.acquired_at_ms as f64),
2093 );
2094 object.insert(
2095 "expires_at_ms".to_string(),
2096 JsonValue::Number(lease.expires_at_ms as f64),
2097 );
2098 object.insert(
2099 "next_step".to_string(),
2100 JsonValue::String(
2101 "restart with RED_REPLICATION_MODE=primary to start accepting writes"
2102 .to_string(),
2103 ),
2104 );
2105 json_response(200, JsonValue::Object(object))
2106 }
2107 Err(err) => json_error(409, format!("promotion refused: {err}")),
2108 }
2109 }
2110
2111 pub(crate) fn handle_admin_audit_query(
2131 &self,
2132 query: &std::collections::BTreeMap<String, String>,
2133 ) -> HttpResponse {
2134 use crate::runtime::audit_log::Outcome;
2135 use crate::runtime::audit_query::{
2136 events_to_json_array, parse_time_arg, run_query, AuditQuery,
2137 };
2138
2139 let mut q = AuditQuery::new();
2140 if let Some(s) = query.get("since") {
2141 q.since_ms = parse_time_arg(s);
2142 if q.since_ms.is_none() {
2143 return json_error(400, format!("invalid 'since' value: {s}"));
2144 }
2145 }
2146 if let Some(u) = query.get("until") {
2147 q.until_ms = parse_time_arg(u);
2148 if q.until_ms.is_none() {
2149 return json_error(400, format!("invalid 'until' value: {u}"));
2150 }
2151 }
2152 if let Some(p) = query.get("principal") {
2153 if !p.is_empty() {
2154 q.principal = Some(p.clone());
2155 }
2156 }
2157 if let Some(t) = query.get("tenant") {
2158 if !t.is_empty() {
2159 q.tenant = Some(t.clone());
2160 }
2161 }
2162 if let Some(a) = query.get("action") {
2163 if !a.is_empty() {
2164 q.action_prefix = Some(a.clone());
2165 }
2166 }
2167 if let Some(o) = query.get("outcome") {
2168 if let Some(parsed) = Outcome::parse(o) {
2169 q.outcome = Some(parsed);
2170 } else {
2171 return json_error(
2172 400,
2173 format!("invalid 'outcome' value: {o} (expected success|denied|error)"),
2174 );
2175 }
2176 }
2177 if let Some(l) = query.get("limit") {
2178 match l.parse::<usize>() {
2179 Ok(n) if n > 0 => q.limit = n.min(1000),
2180 _ => return json_error(400, format!("invalid 'limit' value: {l}")),
2181 }
2182 } else {
2183 q.limit = 100;
2184 }
2185
2186 let format = query
2187 .get("format")
2188 .map(|s| s.to_ascii_lowercase())
2189 .unwrap_or_default();
2190
2191 let path = self.runtime.audit_log().path().to_path_buf();
2192 let events = run_query(&path, &q);
2193
2194 if format == "jsonl" || format == "ndjson" {
2195 let mut body = String::new();
2196 for ev in &events {
2197 body.push_str(&ev.to_json_line(None));
2198 body.push('\n');
2199 }
2200 return HttpResponse {
2201 status: 200,
2202 content_type: "application/x-ndjson",
2203 body: body.into_bytes(),
2204 extra_headers: Vec::new(),
2205 };
2206 }
2207
2208 json_response(200, events_to_json_array(&events))
2209 }
2210
2211 pub(crate) fn handle_admin_drain(&self) -> HttpResponse {
2212 self.runtime.lifecycle().mark_draining();
2213 self.runtime.audit_log().record(
2214 "admin/drain",
2215 "operator",
2216 "instance",
2217 "ok",
2218 JsonValue::Null,
2219 );
2220 let mut object = Map::new();
2221 object.insert("ok".to_string(), JsonValue::Bool(true));
2222 object.insert(
2223 "phase".to_string(),
2224 JsonValue::String(self.runtime.lifecycle().phase().as_str().to_string()),
2225 );
2226 json_response(200, JsonValue::Object(object))
2227 }
2228
2229 pub(crate) fn handle_health_live(&self) -> HttpResponse {
2233 let phase = self.runtime.lifecycle().phase();
2234 let alive = !matches!(phase, Phase::Stopped);
2235 let status = if alive { 200 } else { 503 };
2236 let mut object = Map::new();
2237 object.insert(
2238 "status".to_string(),
2239 JsonValue::String(if alive { "alive" } else { "stopped" }.to_string()),
2240 );
2241 object.insert(
2242 "phase".to_string(),
2243 JsonValue::String(phase.as_str().to_string()),
2244 );
2245 json_response(status, JsonValue::Object(object))
2246 }
2247
2248 pub(crate) fn handle_health_ready(&self) -> HttpResponse {
2251 self.health_ready_response("ready")
2252 }
2253
2254 pub(crate) fn handle_health_startup(&self) -> HttpResponse {
2258 self.health_ready_response("startup")
2259 }
2260
2261 fn health_ready_response(&self, probe: &str) -> HttpResponse {
2262 let lifecycle = self.runtime.lifecycle();
2263 let phase = lifecycle.phase();
2264 let now = std::time::SystemTime::now()
2265 .duration_since(std::time::UNIX_EPOCH)
2266 .map(|d| d.as_millis() as u64)
2267 .unwrap_or(0);
2268 let started_at = lifecycle.started_at_ms();
2269 let since_secs = (now.saturating_sub(started_at) as f64) / 1000.0;
2270 let mut object = Map::new();
2271 object.insert("probe".to_string(), JsonValue::String(probe.to_string()));
2272 object.insert(
2273 "transport_listeners".to_string(),
2274 self.transport_readiness_json(),
2275 );
2276 object.insert(
2277 "phase".to_string(),
2278 JsonValue::String(phase.as_str().to_string()),
2279 );
2280 object.insert(
2281 "since_secs".to_string(),
2282 JsonValue::Number((since_secs * 1000.0).round() / 1000.0),
2283 );
2284 if let Some(ready_at) = lifecycle.ready_at_ms() {
2285 object.insert(
2286 "ready_at_unix_ms".to_string(),
2287 JsonValue::Number(ready_at as f64),
2288 );
2289 }
2290
2291 if phase.accepts_queries() {
2292 object.insert("status".to_string(), JsonValue::String("ready".to_string()));
2293 json_response(200, JsonValue::Object(object))
2294 } else {
2295 object.insert(
2296 "status".to_string(),
2297 JsonValue::String(phase.as_str().to_string()),
2298 );
2299 if let Some(reason) = lifecycle.not_ready_reason() {
2300 object.insert("reason".to_string(), JsonValue::String(reason));
2301 } else {
2302 object.insert(
2303 "reason".to_string(),
2304 JsonValue::String(match phase {
2305 Phase::Starting => "starting".to_string(),
2306 Phase::ShuttingDown => "shutting_down".to_string(),
2307 Phase::Stopped => "stopped".to_string(),
2308 Phase::Draining => "draining".to_string(),
2309 Phase::Ready => "ready".to_string(),
2310 }),
2311 );
2312 }
2313 json_response(503, JsonValue::Object(object))
2314 }
2315 }
2316
2317 fn iam_audit(&self, action: &str, target: &str, outcome: &str) {
2322 self.runtime
2323 .audit_log()
2324 .record(action, "operator", target, outcome, JsonValue::Null);
2325 }
2326
2327 pub(crate) fn handle_iam_policy_put(&self, id: &str, body: Vec<u8>) -> HttpResponse {
2329 let Some(store) = self.auth_store.as_ref() else {
2330 return json_error(503, "auth store not configured");
2331 };
2332 let Ok(text) = std::str::from_utf8(&body) else {
2333 return json_error(400, "body must be utf-8 JSON");
2334 };
2335 let mut policy = match crate::auth::policies::Policy::from_json_str(text) {
2336 Ok(p) => p,
2337 Err(e) => return json_error(400, format!("policy parse: {e}")),
2338 };
2339 if policy.id != id {
2340 policy.id = id.to_string();
2341 }
2342 if let Err(e) = store.put_policy(policy) {
2343 return json_error(400, e.to_string());
2344 }
2345 self.runtime.invalidate_result_cache();
2346 self.iam_audit("iam/policy.put", id, "ok");
2347 let mut obj = Map::new();
2348 obj.insert("ok".to_string(), JsonValue::Bool(true));
2349 obj.insert("id".to_string(), JsonValue::String(id.to_string()));
2350 json_response(200, JsonValue::Object(obj))
2351 }
2352
2353 pub(crate) fn handle_iam_policy_get(&self, id: &str) -> HttpResponse {
2355 let Some(store) = self.auth_store.as_ref() else {
2356 return json_error(503, "auth store not configured");
2357 };
2358 let Some(p) = store.get_policy(id) else {
2359 return json_error(404, format!("policy `{id}` not found"));
2360 };
2361 let body = p.to_json_string();
2362 HttpResponse {
2363 status: 200,
2364 content_type: "application/json",
2365 body: body.into_bytes(),
2366 extra_headers: Vec::new(),
2367 }
2368 }
2369
2370 pub(crate) fn handle_iam_policy_list(&self) -> HttpResponse {
2372 let Some(store) = self.auth_store.as_ref() else {
2373 return json_error(503, "auth store not configured");
2374 };
2375 let pols = store.list_policies();
2376 let items: Vec<JsonValue> = pols
2377 .iter()
2378 .map(|p| {
2379 let mut obj = Map::new();
2380 obj.insert("id".to_string(), JsonValue::String(p.id.clone()));
2381 obj.insert("version".to_string(), JsonValue::Number(p.version as f64));
2382 obj.insert(
2383 "statements".to_string(),
2384 JsonValue::Number(p.statements.len() as f64),
2385 );
2386 obj.insert(
2387 "tenant".to_string(),
2388 p.tenant
2389 .as_deref()
2390 .map(|t| JsonValue::String(t.to_string()))
2391 .unwrap_or(JsonValue::Null),
2392 );
2393 JsonValue::Object(obj)
2394 })
2395 .collect();
2396 let mut envelope = Map::new();
2397 envelope.insert("count".to_string(), JsonValue::Number(items.len() as f64));
2398 envelope.insert("items".to_string(), JsonValue::Array(items));
2399 json_response(200, JsonValue::Object(envelope))
2400 }
2401
2402 pub(crate) fn handle_iam_policy_delete(&self, id: &str) -> HttpResponse {
2404 let Some(store) = self.auth_store.as_ref() else {
2405 return json_error(503, "auth store not configured");
2406 };
2407 match store.delete_policy(id) {
2408 Ok(()) => {
2409 self.runtime.invalidate_result_cache();
2410 self.iam_audit("iam/policy.drop", id, "ok");
2411 HttpResponse {
2412 status: 204,
2413 content_type: "application/json",
2414 body: Vec::new(),
2415 extra_headers: Vec::new(),
2416 }
2417 }
2418 Err(e) => json_error(404, e.to_string()),
2419 }
2420 }
2421
2422 pub(crate) fn handle_iam_attach_user(&self, user: &str, policy_id: &str) -> HttpResponse {
2425 let Some(store) = self.auth_store.as_ref() else {
2426 return json_error(503, "auth store not configured");
2427 };
2428 let uid = decode_user_arg(user);
2429 match store.attach_policy(
2430 crate::auth::store::PrincipalRef::User(uid.clone()),
2431 policy_id,
2432 ) {
2433 Ok(()) => {
2434 self.runtime.invalidate_result_cache();
2435 self.iam_audit(
2436 "iam/policy.attach",
2437 &format!("user:{uid}::{policy_id}"),
2438 "ok",
2439 );
2440 let mut obj = Map::new();
2441 obj.insert("ok".to_string(), JsonValue::Bool(true));
2442 json_response(200, JsonValue::Object(obj))
2443 }
2444 Err(e) => json_error(400, e.to_string()),
2445 }
2446 }
2447
2448 pub(crate) fn handle_iam_detach_user(&self, user: &str, policy_id: &str) -> HttpResponse {
2450 let Some(store) = self.auth_store.as_ref() else {
2451 return json_error(503, "auth store not configured");
2452 };
2453 let uid = decode_user_arg(user);
2454 match store.detach_policy(
2455 crate::auth::store::PrincipalRef::User(uid.clone()),
2456 policy_id,
2457 ) {
2458 Ok(()) => {
2459 self.runtime.invalidate_result_cache();
2460 self.iam_audit(
2461 "iam/policy.detach",
2462 &format!("user:{uid}::{policy_id}"),
2463 "ok",
2464 );
2465 HttpResponse {
2466 status: 204,
2467 content_type: "application/json",
2468 body: Vec::new(),
2469 extra_headers: Vec::new(),
2470 }
2471 }
2472 Err(e) => json_error(400, e.to_string()),
2473 }
2474 }
2475
2476 pub(crate) fn handle_iam_add_user_group(&self, user: &str, group: &str) -> HttpResponse {
2478 let Some(store) = self.auth_store.as_ref() else {
2479 return json_error(503, "auth store not configured");
2480 };
2481 let uid = decode_user_arg(user);
2482 match store.add_user_to_group(&uid, group) {
2483 Ok(()) => {
2484 self.runtime.invalidate_result_cache();
2485 self.iam_audit("iam/group.add", &format!("user:{uid}::group:{group}"), "ok");
2486 let mut obj = Map::new();
2487 obj.insert("ok".to_string(), JsonValue::Bool(true));
2488 json_response(200, JsonValue::Object(obj))
2489 }
2490 Err(e) => json_error(400, e.to_string()),
2491 }
2492 }
2493
2494 pub(crate) fn handle_iam_remove_user_group(&self, user: &str, group: &str) -> HttpResponse {
2496 let Some(store) = self.auth_store.as_ref() else {
2497 return json_error(503, "auth store not configured");
2498 };
2499 let uid = decode_user_arg(user);
2500 match store.remove_user_from_group(&uid, group) {
2501 Ok(()) => {
2502 self.runtime.invalidate_result_cache();
2503 self.iam_audit(
2504 "iam/group.remove",
2505 &format!("user:{uid}::group:{group}"),
2506 "ok",
2507 );
2508 HttpResponse {
2509 status: 204,
2510 content_type: "application/json",
2511 body: Vec::new(),
2512 extra_headers: Vec::new(),
2513 }
2514 }
2515 Err(e) => json_error(400, e.to_string()),
2516 }
2517 }
2518
2519 pub(crate) fn handle_iam_attach_group(&self, group: &str, policy_id: &str) -> HttpResponse {
2521 let Some(store) = self.auth_store.as_ref() else {
2522 return json_error(503, "auth store not configured");
2523 };
2524 match store.attach_policy(
2525 crate::auth::store::PrincipalRef::Group(group.to_string()),
2526 policy_id,
2527 ) {
2528 Ok(()) => {
2529 self.runtime.invalidate_result_cache();
2530 self.iam_audit(
2531 "iam/policy.attach",
2532 &format!("group:{group}::{policy_id}"),
2533 "ok",
2534 );
2535 let mut obj = Map::new();
2536 obj.insert("ok".to_string(), JsonValue::Bool(true));
2537 json_response(200, JsonValue::Object(obj))
2538 }
2539 Err(e) => json_error(400, e.to_string()),
2540 }
2541 }
2542
2543 pub(crate) fn handle_iam_detach_group(&self, group: &str, policy_id: &str) -> HttpResponse {
2545 let Some(store) = self.auth_store.as_ref() else {
2546 return json_error(503, "auth store not configured");
2547 };
2548 match store.detach_policy(
2549 crate::auth::store::PrincipalRef::Group(group.to_string()),
2550 policy_id,
2551 ) {
2552 Ok(()) => {
2553 self.runtime.invalidate_result_cache();
2554 self.iam_audit(
2555 "iam/policy.detach",
2556 &format!("group:{group}::{policy_id}"),
2557 "ok",
2558 );
2559 HttpResponse {
2560 status: 204,
2561 content_type: "application/json",
2562 body: Vec::new(),
2563 extra_headers: Vec::new(),
2564 }
2565 }
2566 Err(e) => json_error(400, e.to_string()),
2567 }
2568 }
2569
2570 pub(crate) fn handle_iam_effective_permissions(
2572 &self,
2573 user: &str,
2574 query: &std::collections::BTreeMap<String, String>,
2575 ) -> HttpResponse {
2576 let Some(store) = self.auth_store.as_ref() else {
2577 return json_error(503, "auth store not configured");
2578 };
2579 let uid = decode_user_arg(user);
2580 let pols = store.effective_policies(&uid);
2581
2582 let resource_echo = query.get("resource").cloned();
2587 let items: Vec<JsonValue> = pols
2588 .iter()
2589 .map(|p| {
2590 let mut obj = Map::new();
2591 obj.insert("id".to_string(), JsonValue::String(p.id.clone()));
2592 obj.insert(
2593 "statements".to_string(),
2594 JsonValue::Number(p.statements.len() as f64),
2595 );
2596 JsonValue::Object(obj)
2597 })
2598 .collect();
2599 let mut envelope = Map::new();
2600 envelope.insert("user".to_string(), JsonValue::String(uid.to_string()));
2601 if let Some(r) = resource_echo {
2602 envelope.insert("resource".to_string(), JsonValue::String(r));
2603 }
2604 envelope.insert("count".to_string(), JsonValue::Number(items.len() as f64));
2605 envelope.insert("policies".to_string(), JsonValue::Array(items));
2606 json_response(200, JsonValue::Object(envelope))
2607 }
2608
2609 pub(crate) fn handle_iam_simulate(&self, body: Vec<u8>) -> HttpResponse {
2612 let Some(store) = self.auth_store.as_ref() else {
2613 return json_error(503, "auth store not configured");
2614 };
2615 let parsed = match crate::serde_json::from_str::<crate::serde_json::Value>(
2616 std::str::from_utf8(&body).unwrap_or(""),
2617 ) {
2618 Ok(v) => v,
2619 Err(e) => return json_error(400, format!("invalid JSON body: {e}")),
2620 };
2621 let obj = match parsed.as_object() {
2622 Some(o) => o,
2623 None => return json_error(400, "body must be a JSON object"),
2624 };
2625 let principal = match obj.get("principal").and_then(|v| v.as_str()) {
2626 Some(s) => decode_user_arg(s),
2627 None => return json_error(400, "missing `principal`"),
2628 };
2629 let action = match obj.get("action").and_then(|v| v.as_str()) {
2630 Some(s) => s.to_string(),
2631 None => return json_error(400, "missing `action`"),
2632 };
2633 let resource = match obj.get("resource") {
2634 Some(JsonValue::Object(r)) => {
2635 let kind = r
2636 .get("kind")
2637 .and_then(|v| v.as_str())
2638 .unwrap_or("")
2639 .to_string();
2640 let name = r
2641 .get("name")
2642 .and_then(|v| v.as_str())
2643 .unwrap_or("")
2644 .to_string();
2645 if kind.is_empty() || name.is_empty() {
2646 return json_error(400, "resource needs kind+name");
2647 }
2648 let mut rr = crate::auth::policies::ResourceRef::new(kind, name);
2649 if let Some(t) = r.get("tenant").and_then(|v| v.as_str()) {
2650 rr = rr.with_tenant(t.to_string());
2651 }
2652 rr
2653 }
2654 Some(JsonValue::String(s)) => match s.split_once(':') {
2655 Some((k, n)) => crate::auth::policies::ResourceRef::new(k, n),
2656 None => return json_error(400, "resource string must be `kind:name`"),
2657 },
2658 _ => return json_error(400, "missing `resource`"),
2659 };
2660 let mut sim_ctx = crate::auth::store::SimCtx::default();
2661 if let Some(c) = obj.get("ctx").and_then(|v| v.as_object()) {
2662 if let Some(t) = c.get("current_tenant").and_then(|v| v.as_str()) {
2663 sim_ctx.current_tenant = Some(t.to_string());
2664 }
2665 if let Some(true) = c.get("mfa").and_then(|v| v.as_bool()) {
2666 sim_ctx.mfa_present = true;
2667 }
2668 if let Some(ip) = c
2669 .get("source_ip")
2670 .or_else(|| c.get("peer_ip"))
2671 .and_then(|v| v.as_str())
2672 {
2673 if let Ok(addr) = ip.parse() {
2674 sim_ctx.peer_ip = Some(addr);
2675 }
2676 }
2677 if let Some(ms) = c.get("now_ms").and_then(|v| v.as_u64()) {
2678 sim_ctx.now_ms = Some(ms as u128);
2679 }
2680 }
2681 let outcome = store.simulate(&principal, &action, &resource, sim_ctx);
2682 let (decision_str, matched_pid, matched_sid) =
2683 crate::runtime::impl_core::decision_to_strings(&outcome.decision);
2684
2685 self.iam_audit("iam/policy.simulate", &principal.to_string(), &decision_str);
2686
2687 let mut envelope = Map::new();
2688 envelope.insert("decision".to_string(), JsonValue::String(decision_str));
2689 envelope.insert(
2690 "matched_policy_id".to_string(),
2691 matched_pid
2692 .map(JsonValue::String)
2693 .unwrap_or(JsonValue::Null),
2694 );
2695 envelope.insert(
2696 "matched_sid".to_string(),
2697 matched_sid
2698 .map(JsonValue::String)
2699 .unwrap_or(JsonValue::Null),
2700 );
2701 envelope.insert("reason".to_string(), JsonValue::String(outcome.reason));
2702 let trail: Vec<JsonValue> = outcome
2703 .trail
2704 .into_iter()
2705 .map(|t| {
2706 let mut obj = Map::new();
2707 obj.insert("policy_id".to_string(), JsonValue::String(t.policy_id));
2708 obj.insert(
2709 "sid".to_string(),
2710 t.sid.map(JsonValue::String).unwrap_or(JsonValue::Null),
2711 );
2712 obj.insert("matched".to_string(), JsonValue::Bool(t.matched));
2713 obj.insert(
2714 "effect".to_string(),
2715 JsonValue::String(
2716 match t.effect {
2717 crate::auth::policies::Effect::Allow => "allow",
2718 crate::auth::policies::Effect::Deny => "deny",
2719 }
2720 .to_string(),
2721 ),
2722 );
2723 obj.insert(
2724 "why_skipped".to_string(),
2725 t.why_skipped
2726 .map(|s| JsonValue::String(s.to_string()))
2727 .unwrap_or(JsonValue::Null),
2728 );
2729 JsonValue::Object(obj)
2730 })
2731 .collect();
2732 envelope.insert("trail".to_string(), JsonValue::Array(trail));
2733 json_response(200, JsonValue::Object(envelope))
2734 }
2735}
2736
2737fn decode_user_arg(raw: &str) -> crate::auth::UserId {
2738 if let Some((tenant, name)) = raw.split_once('/') {
2741 return crate::auth::UserId::scoped(tenant.to_string(), name.to_string());
2742 }
2743 if let Some((tenant, name)) = raw.split_once('.') {
2744 return crate::auth::UserId::scoped(tenant.to_string(), name.to_string());
2745 }
2746 crate::auth::UserId::platform(raw.to_string())
2747}
2748
#[cfg(test)]
mod tests {
    use super::*;

    /// Serialises every test in this module that mutates the process-wide
    /// `RED_ADMIN_TOKEN` environment variable.
    ///
    /// This must be one module-level lock. A `static` declared inside each test
    /// function is a distinct mutex per function and gives no mutual exclusion
    /// between tests, so parallel tests could overwrite each other's token.
    static ADMIN_TOKEN_ENV_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());

    /// Scoped override of `RED_ADMIN_TOKEN`.
    ///
    /// Holds `ADMIN_TOKEN_ENV_LOCK` for its whole lifetime. On drop it restores
    /// the previous value, or removes the variable if it was unset. Drop also
    /// runs when an assertion panics, so a failing test cannot leak its token
    /// into later tests.
    struct AdminTokenOverride {
        previous: Option<String>,
        _lock: std::sync::MutexGuard<'static, ()>,
    }

    impl AdminTokenOverride {
        fn set(value: &str) -> Self {
            // Poisoning only means another env test panicked; its own guard
            // already restored the variable while unwinding, so carry on.
            let lock = ADMIN_TOKEN_ENV_LOCK
                .lock()
                .unwrap_or_else(|e| e.into_inner());
            let previous = std::env::var("RED_ADMIN_TOKEN").ok();
            // SAFETY: writes to RED_ADMIN_TOKEN from this module are serialised
            // by ADMIN_TOKEN_ENV_LOCK. Other harness threads may still read the
            // environment; that matches the pre-existing design of these tests.
            unsafe {
                std::env::set_var("RED_ADMIN_TOKEN", value);
            }
            Self {
                previous,
                _lock: lock,
            }
        }
    }

    impl Drop for AdminTokenOverride {
        fn drop(&mut self) {
            // SAFETY: `_lock` is still held here (fields drop after this body),
            // so the restore is serialised like the original write.
            unsafe {
                match self.previous.take() {
                    Some(v) => std::env::set_var("RED_ADMIN_TOKEN", v),
                    None => std::env::remove_var("RED_ADMIN_TOKEN"),
                }
            }
        }
    }

    #[test]
    fn metrics_expose_result_blob_cache_label_set() {
        let runtime =
            crate::runtime::RedDBRuntime::with_options(crate::api::RedDBOptions::in_memory())
                .expect("runtime");
        runtime
            .db()
            .store()
            .set_config_tree("runtime.result_cache.backend", &crate::json!("blob_cache"));

        runtime.execute_query("SELECT 1").expect("populate miss");
        runtime.execute_query("SELECT 1").expect("blob hit");
        runtime.invalidate_result_cache();

        let server = RedDBServer::new(runtime);
        let response = server.handle_metrics();
        let body = String::from_utf8(response.body).expect("utf8 metrics");

        for needle in [
            "reddb_cache_blob_get_total{namespace=\"runtime.result_cache\",result=\"hit_l1\"}",
            "reddb_cache_blob_get_total{namespace=\"runtime.result_cache\",result=\"hit_l2\"}",
            "reddb_cache_blob_get_total{namespace=\"runtime.result_cache\",result=\"miss\"}",
            "reddb_cache_blob_put_total{namespace=\"runtime.result_cache\",outcome=\"ok\"}",
            "reddb_cache_blob_put_total{namespace=\"runtime.result_cache\",outcome=\"version_mismatch\"}",
            "reddb_cache_blob_put_total{namespace=\"runtime.result_cache\",outcome=\"too_large\"}",
            "reddb_cache_blob_put_total{namespace=\"runtime.result_cache\",outcome=\"metadata_too_large\"}",
            "reddb_cache_blob_invalidate_total{namespace=\"runtime.result_cache\",kind=\"dependency\"}",
            "reddb_cache_blob_invalidate_total{namespace=\"runtime.result_cache\",kind=\"namespace\"}",
            "reddb_cache_blob_evict_total{namespace=\"runtime.result_cache\",reason=\"capacity\"}",
            "reddb_cache_blob_evict_total{namespace=\"runtime.result_cache\",reason=\"expiry\"}",
            "reddb_cache_blob_evict_total{namespace=\"runtime.result_cache\",reason=\"policy\"}",
            "reddb_cache_blob_l1_bytes_in_use{namespace=\"runtime.result_cache\"}",
            "reddb_cache_blob_l1_entries{namespace=\"runtime.result_cache\"}",
            "reddb_cache_blob_l2_bytes_in_use{namespace=\"runtime.result_cache\"}",
            "reddb_cache_blob_l2_full_rejections_total{namespace=\"runtime.result_cache\"}",
            "reddb_cache_blob_version_mismatch_total{namespace=\"runtime.result_cache\"}",
        ] {
            assert!(body.contains(needle), "missing metric line for {needle}");
        }
    }

    /// Builds a server backed by a fresh in-memory runtime.
    fn test_server() -> RedDBServer {
        let runtime =
            crate::runtime::RedDBRuntime::with_options(crate::api::RedDBOptions::in_memory())
                .expect("runtime");
        RedDBServer::new(runtime)
    }

    /// Parses a response body as JSON, panicking on non-UTF-8 or invalid JSON.
    fn parse_body(resp: &HttpResponse) -> JsonValue {
        let s = std::str::from_utf8(&resp.body).expect("utf8 body");
        crate::serde_json::from_str::<JsonValue>(s).expect("JSON body")
    }

    #[test]
    fn admin_blob_cache_sweep_happy_path_returns_well_formed_report() {
        let server = test_server();
        let body = br#"{"limit_entries": 100, "limit_millis": 50}"#.to_vec();
        let resp = server.handle_admin_blob_cache_sweep(body);
        assert_eq!(resp.status, 200);
        let parsed = parse_body(&resp);
        assert_eq!(parsed.get("ok").and_then(|v| v.as_bool()), Some(true));
        for field in [
            "entries_scanned",
            "entries_evicted",
            "bytes_reclaimed",
            "elapsed_ms",
            "truncated_due_to_limit",
        ] {
            assert!(
                parsed.get(field).is_some(),
                "missing field {field} in response: {parsed:?}"
            );
        }
    }

    #[test]
    fn admin_blob_cache_sweep_empty_body_uses_unbounded_default() {
        let server = test_server();
        let resp = server.handle_admin_blob_cache_sweep(Vec::new());
        assert_eq!(resp.status, 200);
        let parsed = parse_body(&resp);
        assert_eq!(parsed.get("ok").and_then(|v| v.as_bool()), Some(true));
    }

    #[test]
    fn admin_blob_cache_sweep_invalid_json_returns_400() {
        let server = test_server();
        let resp = server.handle_admin_blob_cache_sweep(b"not json".to_vec());
        assert_eq!(resp.status, 400);
    }

    #[test]
    fn admin_blob_cache_flush_namespace_happy_path() {
        let server = test_server();
        let body = br#"{"namespace": "tenant-42:results"}"#.to_vec();
        let resp = server.handle_admin_blob_cache_flush_namespace(body);
        assert_eq!(resp.status, 200);
        let parsed = parse_body(&resp);
        assert_eq!(parsed.get("ok").and_then(|v| v.as_bool()), Some(true));
        assert_eq!(
            parsed.get("namespace").and_then(|v| v.as_str()),
            Some("tenant-42:results")
        );
        assert!(parsed.get("elapsed_micros").is_some());
        assert!(parsed.get("generation_before").is_some());
        assert!(parsed.get("generation_after").is_some());
    }

    #[test]
    fn admin_blob_cache_flush_namespace_missing_body_returns_400() {
        let server = test_server();
        let resp = server.handle_admin_blob_cache_flush_namespace(Vec::new());
        assert_eq!(resp.status, 400);
        let parsed = parse_body(&resp);
        assert!(parsed
            .get("error")
            .and_then(|v| v.as_str())
            .map(|s| s.contains("namespace"))
            .unwrap_or(false));
    }

    #[test]
    fn admin_blob_cache_flush_namespace_missing_field_returns_400() {
        let server = test_server();
        let body = br#"{"other": "x"}"#.to_vec();
        let resp = server.handle_admin_blob_cache_flush_namespace(body);
        assert_eq!(resp.status, 400);
    }

    #[test]
    fn admin_blob_cache_flush_namespace_empty_string_returns_400() {
        let server = test_server();
        let body = br#"{"namespace": ""}"#.to_vec();
        let resp = server.handle_admin_blob_cache_flush_namespace(body);
        assert_eq!(resp.status, 400);
    }

    #[test]
    fn admin_blob_cache_flush_namespace_rejects_crlf_smuggling_attempt() {
        let server = test_server();
        let body = br#"{"namespace": "real-ns\r\nfake-audit: spliced"}"#.to_vec();
        let resp = server.handle_admin_blob_cache_flush_namespace(body);
        assert_eq!(resp.status, 400);
        let parsed = parse_body(&resp);
        let msg = parsed
            .get("error")
            .and_then(|v| v.as_str())
            .unwrap_or_default();
        assert!(msg.contains("CR/LF"), "unexpected error: {msg}");
    }

    #[test]
    fn admin_blob_cache_flush_namespace_rejects_nul_byte() {
        let server = test_server();
        let body = b"{\"namespace\": \"with-nul-\\u0000-here\"}".to_vec();
        let resp = server.handle_admin_blob_cache_flush_namespace(body);
        assert_eq!(resp.status, 400);
        let parsed = parse_body(&resp);
        let msg = parsed
            .get("error")
            .and_then(|v| v.as_str())
            .unwrap_or_default();
        assert!(msg.contains("NUL"), "unexpected error: {msg}");
    }

    #[test]
    fn admin_blob_cache_flush_namespace_response_round_trips_unicode() {
        let server = test_server();
        let body = r#"{"namespace": "日本語-ns-🦀"}"#.as_bytes().to_vec();
        let resp = server.handle_admin_blob_cache_flush_namespace(body);
        assert_eq!(resp.status, 200);
        let parsed = parse_body(&resp);
        assert_eq!(
            parsed.get("namespace").and_then(|v| v.as_str()),
            Some("日本語-ns-🦀")
        );
    }

    /// Builds a compare-and-set request body.
    ///
    /// `new_value` is encoded with standard (padded) base64, and
    /// `expected_version` is always `0`. `namespace` and `key` are interpolated
    /// without JSON escaping, so callers must pass strings that need none.
    fn cas_body(namespace: &str, key: &str, new_value: &[u8], new_version: u64) -> Vec<u8> {
        let b64 = {
            let mut s = String::new();
            for chunk in new_value.chunks(3) {
                const CHARS: &[u8] =
                    b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
                let b0 = chunk[0] as u32;
                let b1 = chunk.get(1).copied().unwrap_or(0) as u32;
                let b2 = chunk.get(2).copied().unwrap_or(0) as u32;
                let n = (b0 << 16) | (b1 << 8) | b2;
                s.push(CHARS[((n >> 18) & 63) as usize] as char);
                s.push(CHARS[((n >> 12) & 63) as usize] as char);
                s.push(if chunk.len() > 1 {
                    CHARS[((n >> 6) & 63) as usize] as char
                } else {
                    '='
                });
                s.push(if chunk.len() > 2 {
                    CHARS[(n & 63) as usize] as char
                } else {
                    '='
                });
            }
            s
        };
        format!(
            r#"{{"namespace":"{namespace}","key":"{key}","expected_version":0,"new_value_b64":"{b64}","new_version":{new_version}}}"#
        )
        .into_bytes()
    }

    #[test]
    fn cas_happy_first_write() {
        let server = test_server();
        let body = cas_body("ns1", "k1", b"hello", 1);
        let resp = server.handle_admin_blob_cache_compare_and_set(body);
        assert_eq!(resp.status, 200);
        let parsed = parse_body(&resp);
        assert_eq!(
            parsed.get("committed").and_then(|v| v.as_bool()),
            Some(true)
        );
        assert_eq!(
            parsed.get("current_version").and_then(|v| v.as_u64()),
            Some(1)
        );
    }

    #[test]
    fn cas_happy_update_increments_version() {
        let server = test_server();
        server.handle_admin_blob_cache_compare_and_set(cas_body("ns2", "k2", b"v1", 1));
        let resp = server.handle_admin_blob_cache_compare_and_set(cas_body("ns2", "k2", b"v2", 2));
        assert_eq!(resp.status, 200);
        let parsed = parse_body(&resp);
        assert_eq!(
            parsed.get("committed").and_then(|v| v.as_bool()),
            Some(true)
        );
        assert_eq!(
            parsed.get("current_version").and_then(|v| v.as_u64()),
            Some(2)
        );
    }

    #[test]
    fn cas_conflict_same_version_returns_409() {
        let server = test_server();
        server.handle_admin_blob_cache_compare_and_set(cas_body("ns3", "k3", b"v1", 5));
        let resp = server.handle_admin_blob_cache_compare_and_set(cas_body("ns3", "k3", b"v2", 5));
        assert_eq!(resp.status, 409);
        let parsed = parse_body(&resp);
        assert_eq!(
            parsed.get("committed").and_then(|v| v.as_bool()),
            Some(false)
        );
        assert_eq!(
            parsed.get("reason").and_then(|v| v.as_str()),
            Some("VersionMismatch")
        );
    }

    #[test]
    fn cas_stale_expected_version_returns_409() {
        let server = test_server();
        server.handle_admin_blob_cache_compare_and_set(cas_body("ns4", "k4", b"v1", 10));
        let resp = server.handle_admin_blob_cache_compare_and_set(cas_body("ns4", "k4", b"v2", 9));
        assert_eq!(resp.status, 409);
        let parsed = parse_body(&resp);
        assert_eq!(
            parsed.get("current_version").and_then(|v| v.as_u64()),
            Some(10)
        );
    }

    #[test]
    fn cas_crlf_in_namespace_returns_400() {
        let server = test_server();
        let body = b"{\"namespace\":\"real\\r\\ninjected\",\"key\":\"k\",\"expected_version\":0,\"new_value_b64\":\"aGk=\",\"new_version\":1}".to_vec();
        let resp = server.handle_admin_blob_cache_compare_and_set(body);
        assert_eq!(resp.status, 400);
        let parsed = parse_body(&resp);
        let msg = parsed.get("error").and_then(|v| v.as_str()).unwrap_or("");
        assert!(msg.contains("CR/LF"), "expected CR/LF error, got: {msg}");
    }

    #[test]
    fn cas_nul_in_key_returns_400() {
        let server = test_server();
        let body = b"{\"namespace\":\"ns\",\"key\":\"k\\u0000nul\",\"expected_version\":0,\"new_value_b64\":\"aGk=\",\"new_version\":1}".to_vec();
        let resp = server.handle_admin_blob_cache_compare_and_set(body);
        assert_eq!(resp.status, 400);
        let parsed = parse_body(&resp);
        let msg = parsed.get("error").and_then(|v| v.as_str()).unwrap_or("");
        assert!(msg.contains("NUL"), "expected NUL error, got: {msg}");
    }

    #[test]
    fn cas_bad_base64_returns_400() {
        let server = test_server();
        let body = br#"{"namespace":"ns","key":"k","expected_version":0,"new_value_b64":"!!!invalid!!!","new_version":1}"#.to_vec();
        let resp = server.handle_admin_blob_cache_compare_and_set(body);
        assert_eq!(resp.status, 400);
        let parsed = parse_body(&resp);
        let msg = parsed.get("error").and_then(|v| v.as_str()).unwrap_or("");
        assert!(msg.contains("base64"), "expected base64 error, got: {msg}");
    }

    #[test]
    fn cas_missing_bearer_returns_401_via_route() {
        // Held until the end of the test; restores RED_ADMIN_TOKEN on drop.
        let _token = AdminTokenOverride::set("test-token-195");

        let server = test_server();
        let req = crate::server::transport::HttpRequest {
            method: "POST".to_string(),
            path: "/admin/cache/compare-and-set".to_string(),
            query: std::collections::BTreeMap::new(),
            headers: std::collections::BTreeMap::new(),
            body: cas_body("ns", "k", b"v", 1),
        };
        let resp = server.route(req);
        assert_eq!(resp.status, 401);
    }

    #[test]
    fn cas_wrong_bearer_returns_401_via_route() {
        let _token = AdminTokenOverride::set("correct-token");

        let server = test_server();
        let mut headers = std::collections::BTreeMap::new();
        headers.insert(
            "authorization".to_string(),
            "Bearer wrong-token".to_string(),
        );
        let req = crate::server::transport::HttpRequest {
            method: "POST".to_string(),
            path: "/admin/cache/compare-and-set".to_string(),
            query: std::collections::BTreeMap::new(),
            headers,
            body: cas_body("ns", "k", b"v", 1),
        };
        let resp = server.route(req);
        assert_eq!(resp.status, 401);
    }

    #[test]
    fn cas_concurrent_race_exactly_one_commits() {
        use std::sync::{Arc, Mutex};

        // NOTE(review): the server sits behind a Mutex, so the eight CAS calls
        // are serialised rather than truly concurrent. This still pins the
        // "version 1 can only be written once" contract. A lock-free variant
        // would need `RedDBServer: Sync`; confirm that before changing it.
        let server = Arc::new(Mutex::new(test_server()));
        let committed = Arc::new(Mutex::new(0u32));
        let conflicted = Arc::new(Mutex::new(0u32));

        let handles: Vec<_> = (0..8)
            .map(|_| {
                let server = Arc::clone(&server);
                let committed = Arc::clone(&committed);
                let conflicted = Arc::clone(&conflicted);
                std::thread::spawn(move || {
                    let body = cas_body("race-ns", "race-key", b"payload", 1);
                    let resp = {
                        let s = server.lock().unwrap();
                        s.handle_admin_blob_cache_compare_and_set(body)
                    };
                    match resp.status {
                        200 => *committed.lock().unwrap() += 1,
                        409 => *conflicted.lock().unwrap() += 1,
                        s => panic!("unexpected status {s}"),
                    }
                })
            })
            .collect();

        for h in handles {
            h.join().expect("thread panicked");
        }

        assert_eq!(
            *committed.lock().unwrap(),
            1,
            "exactly one CAS should commit (version 1 can only be written once)"
        );
        assert_eq!(
            *conflicted.lock().unwrap(),
            7,
            "every losing CAS should be rejected with 409"
        );
    }

    #[test]
    fn admin_blob_cache_routes_reject_unauth_when_admin_token_set() {
        let _token = AdminTokenOverride::set("test-token-148");

        let server = test_server();

        let req = crate::server::transport::HttpRequest {
            method: "POST".to_string(),
            path: "/admin/blob_cache/sweep".to_string(),
            query: std::collections::BTreeMap::new(),
            headers: std::collections::BTreeMap::new(),
            body: br#"{"limit_entries":1}"#.to_vec(),
        };
        let resp = server.route(req);
        assert_eq!(resp.status, 401, "sweep without admin token must be 401");

        let req = crate::server::transport::HttpRequest {
            method: "POST".to_string(),
            path: "/admin/blob_cache/flush_namespace".to_string(),
            query: std::collections::BTreeMap::new(),
            headers: std::collections::BTreeMap::new(),
            body: br#"{"namespace":"x"}"#.to_vec(),
        };
        let resp = server.route(req);
        assert_eq!(resp.status, 401, "flush without admin token must be 401");

        let mut headers = std::collections::BTreeMap::new();
        headers.insert(
            "authorization".to_string(),
            "Bearer test-token-148".to_string(),
        );
        let req = crate::server::transport::HttpRequest {
            method: "POST".to_string(),
            path: "/admin/blob_cache/flush_namespace".to_string(),
            query: std::collections::BTreeMap::new(),
            headers,
            body: br#"{"namespace":"ok"}"#.to_vec(),
        };
        let resp = server.route(req);
        assert_eq!(resp.status, 200, "flush with admin token must be 200");
    }
}