1use super::*;
16use crate::runtime::lifecycle::Phase;
17use std::path::{Path, PathBuf};
18
/// Path of the JSON sidecar file that records runtime flags (e.g. the
/// read-only gate) next to the database file. When `data_path` has no
/// parent directory, the current directory is used instead.
pub(crate) fn runtime_state_path(data_path: &Path) -> PathBuf {
    data_path
        .parent()
        .unwrap_or_else(|| Path::new("."))
        .join(".runtime-state.json")
}
26
27pub(crate) fn persist_runtime_readonly(state_path: &Path, enabled: bool) -> std::io::Result<()> {
31 use std::io::Write;
32 let mut object = crate::json::Map::new();
33 object.insert("read_only".to_string(), crate::json::Value::Bool(enabled));
34 let body = crate::serde_json::to_string_pretty(&crate::json::Value::Object(object))
35 .map_err(|err| std::io::Error::other(err.to_string()))?;
36 if let Some(parent) = state_path.parent() {
37 if !parent.as_os_str().is_empty() {
38 std::fs::create_dir_all(parent)?;
39 }
40 }
41 let tmp = state_path.with_extension("json.tmp");
42 {
43 let mut f = std::fs::File::create(&tmp)?;
44 f.write_all(body.as_bytes())?;
45 f.sync_all()?;
46 }
47 std::fs::rename(&tmp, state_path)?;
48 Ok(())
49}
50
51pub fn load_runtime_readonly(data_path: &Path) -> Option<bool> {
55 let state_path = runtime_state_path(data_path);
56 let bytes = std::fs::read(&state_path).ok()?;
57 let parsed: crate::json::Value = crate::json::from_slice(&bytes).ok()?;
58 parsed.get("read_only").and_then(|v| v.as_bool())
59}
60
61fn default_holder_id() -> String {
65 if let Some(explicit) = crate::utils::env_with_file_fallback("RED_LEASE_HOLDER_ID") {
66 return explicit;
67 }
68 let host = std::env::var("HOSTNAME")
69 .or_else(|_| std::env::var("HOST"))
70 .unwrap_or_else(|_| "unknown-host".to_string());
71 format!("{host}-{}", std::process::id())
72}
73
/// Escape a string for use inside a double-quoted Prometheus label value:
/// `"` , `\`, newline, and carriage return become backslash escapes; every
/// other character passes through unchanged.
fn sanitize_label(value: &str) -> String {
    value
        .chars()
        .fold(String::with_capacity(value.len()), |mut escaped, ch| {
            match ch {
                '"' => escaped.push_str("\\\""),
                '\\' => escaped.push_str("\\\\"),
                '\n' => escaped.push_str("\\n"),
                '\r' => escaped.push_str("\\r"),
                other => escaped.push(other),
            }
            escaped
        })
}
91
/// Decode standard (RFC 4648) base64, accepting input with or without `=`
/// padding.
///
/// Returns the decoded bytes, or an `Err` describing the first invalid
/// character or a truncated input. A trailing group of a single character
/// carries only 6 bits and can never encode a whole byte, so it is rejected
/// (previously it was silently dropped, letting malformed input "succeed").
fn b64_decode(input: &str) -> Result<Vec<u8>, String> {
    let input = input.trim_end_matches('=');
    let mut buf = Vec::with_capacity(input.len() * 3 / 4 + 1);

    // Map one base64 alphabet byte to its 6-bit value.
    let lookup = |c: u8| -> Result<u32, String> {
        match c {
            b'A'..=b'Z' => Ok((c - b'A') as u32),
            b'a'..=b'z' => Ok((c - b'a' + 26) as u32),
            b'0'..=b'9' => Ok((c - b'0' + 52) as u32),
            b'+' => Ok(62),
            b'/' => Ok(63),
            other => Err(format!("invalid base64 character: {}", other as char)),
        }
    };

    // Process 4-character groups directly from the input bytes; only the
    // final group may be shorter.
    for chunk in input.as_bytes().chunks(4) {
        let v: Vec<u32> = chunk.iter().map(|&b| lookup(b)).collect::<Result<_, _>>()?;
        match v.len() {
            4 => {
                let n = (v[0] << 18) | (v[1] << 12) | (v[2] << 6) | v[3];
                buf.push((n >> 16) as u8);
                buf.push((n >> 8) as u8);
                buf.push(n as u8);
            }
            3 => {
                // Three characters encode two bytes.
                let n = (v[0] << 18) | (v[1] << 12) | (v[2] << 6);
                buf.push((n >> 16) as u8);
                buf.push((n >> 8) as u8);
            }
            2 => {
                // Two characters encode one byte.
                let n = (v[0] << 18) | (v[1] << 12);
                buf.push((n >> 16) as u8);
            }
            // One leftover character (or an empty chunk, which chunks()
            // never yields) is malformed input.
            _ => return Err("truncated base64 input (dangling character)".to_string()),
        }
    }
    Ok(buf)
}
133
134fn reject_smuggling_bytes(field: &str, value: &str) -> Option<HttpResponse> {
137 for (idx, byte) in value.as_bytes().iter().enumerate() {
138 match *byte {
139 b'\0' => {
140 return Some(json_error(
141 400,
142 format!("field `{field}` contains forbidden NUL byte at index {idx}"),
143 ));
144 }
145 b'\r' | b'\n' => {
146 return Some(json_error(
147 400,
148 format!("field `{field}` contains forbidden CR/LF byte at index {idx}"),
149 ));
150 }
151 _ => {}
152 }
153 }
154 None
155}
156
157impl RedDBServer {
158 pub(crate) fn handle_admin_shutdown(&self) -> HttpResponse {
168 let backup_on_shutdown = std::env::var("RED_BACKUP_ON_SHUTDOWN")
169 .ok()
170 .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
171 .unwrap_or(true);
172
173 match self.runtime.graceful_shutdown(backup_on_shutdown) {
174 Ok(report) => {
175 let mut details = Map::new();
179 details.insert(
180 "backup_uploaded".to_string(),
181 JsonValue::Bool(report.backup_uploaded),
182 );
183 details.insert(
184 "duration_ms".to_string(),
185 JsonValue::Number(report.duration_ms as f64),
186 );
187 self.runtime.audit_log().record(
188 "admin/shutdown",
189 "operator",
190 "instance",
191 "ok",
192 JsonValue::Object(details),
193 );
194 let mut object = Map::new();
195 object.insert("ok".to_string(), JsonValue::Bool(true));
196 object.insert(
197 "phase".to_string(),
198 JsonValue::String(self.runtime.lifecycle().phase().as_str().to_string()),
199 );
200 object.insert(
201 "flushed_wal".to_string(),
202 JsonValue::Bool(report.flushed_wal),
203 );
204 object.insert(
205 "final_checkpoint".to_string(),
206 JsonValue::Bool(report.final_checkpoint),
207 );
208 object.insert(
209 "backup_uploaded".to_string(),
210 JsonValue::Bool(report.backup_uploaded),
211 );
212 object.insert(
213 "duration_ms".to_string(),
214 JsonValue::Number(report.duration_ms as f64),
215 );
216 json_response(200, JsonValue::Object(object))
217 }
218 Err(err) => json_error(500, err.to_string()),
219 }
220 }
221
    /// POST /admin/restore — rebuild the local data file from the remote
    /// backend via point-in-time recovery.
    ///
    /// Guards (in order): the write gate must already be read-only (409),
    /// a remote backend must be configured (412), and the database must be
    /// file-backed rather than in-memory (412).
    ///
    /// Optional JSON body selects the recovery target:
    /// `{"to_timestamp_ms": <u64>}` or `{"to_timestamp": <u64 seconds>}`
    /// (seconds are scaled to ms with a saturating multiply). An empty body
    /// or missing field yields target time 0 — presumably "restore to
    /// latest"; confirm against `PointInTimeRecovery::restore_to`.
    ///
    /// The outcome (success with recovery counters, or the error) is always
    /// written to the audit log before the HTTP response is returned.
    pub(crate) fn handle_admin_restore(&self, body: Vec<u8>) -> HttpResponse {
        // Refuse to overwrite a live, writable database.
        if !self.runtime.write_gate().is_read_only() {
            return json_error(
                409,
                "POST /admin/restore requires the runtime to be read_only or replica-role; \
                toggle via RED_READONLY=true or POST /admin/readonly first",
            );
        }
        let db = self.runtime.db();
        let Some(backend) = db.options().remote_backend.clone() else {
            return json_error(412, "no remote backend configured (RED_BACKEND=none)");
        };
        let Some(local_path) = db.path().map(|p| p.to_path_buf()) else {
            return json_error(412, "in-memory runtime cannot be restored from remote");
        };
        let snapshot_prefix = db.options().default_snapshot_prefix();
        let wal_prefix = db.options().default_wal_archive_prefix();
        // Resolve the recovery target (milliseconds since epoch; 0 when the
        // body is empty or carries neither timestamp field).
        let target_time_ms = if body.is_empty() {
            0u64
        } else {
            match crate::serde_json::from_slice::<crate::serde_json::Value>(&body) {
                Ok(v) => v
                    .get("to_timestamp_ms")
                    .and_then(|n| n.as_u64())
                    .or_else(|| {
                        // Fallback field is in whole seconds.
                        v.get("to_timestamp")
                            .and_then(|n| n.as_u64())
                            .map(|s| s.saturating_mul(1000))
                    })
                    .unwrap_or(0),
                Err(err) => return json_error(400, format!("invalid JSON body: {err}")),
            }
        };
        let recovery =
            crate::storage::wal::PointInTimeRecovery::new(backend, snapshot_prefix, wal_prefix);
        match recovery.restore_to(target_time_ms, &local_path) {
            Ok(report) => {
                // The audit record and the HTTP response share these fields.
                let mut details = Map::new();
                details.insert(
                    "snapshot_used".to_string(),
                    JsonValue::Number(report.snapshot_used as f64),
                );
                details.insert(
                    "wal_segments_replayed".to_string(),
                    JsonValue::Number(report.wal_segments_replayed as f64),
                );
                details.insert(
                    "records_applied".to_string(),
                    JsonValue::Number(report.records_applied as f64),
                );
                details.insert(
                    "recovered_to_lsn".to_string(),
                    JsonValue::Number(report.recovered_to_lsn as f64),
                );
                details.insert(
                    "recovered_to_time".to_string(),
                    JsonValue::Number(report.recovered_to_time as f64),
                );
                self.runtime.audit_log().record(
                    "admin/restore",
                    "operator",
                    "instance",
                    "ok",
                    JsonValue::Object(details.clone()),
                );
                let mut object = Map::new();
                object.insert("ok".to_string(), JsonValue::Bool(true));
                for (k, v) in details {
                    object.insert(k, v);
                }
                json_response(200, JsonValue::Object(object))
            }
            Err(err) => {
                // Record the failure before surfacing it to the caller.
                self.runtime.audit_log().record(
                    "admin/restore",
                    "operator",
                    "instance",
                    &format!("err: {err}"),
                    JsonValue::Null,
                );
                json_error(500, err.to_string())
            }
        }
    }
312
313 pub(crate) fn handle_admin_backup(
317 &self,
318 _query: &std::collections::BTreeMap<String, String>,
319 ) -> HttpResponse {
320 match self.runtime.trigger_backup() {
321 Ok(result) => {
322 let mut details = Map::new();
323 details.insert(
324 "snapshot_id".to_string(),
325 JsonValue::Number(result.snapshot_id as f64),
326 );
327 details.insert("uploaded".to_string(), JsonValue::Bool(result.uploaded));
328 details.insert(
329 "duration_ms".to_string(),
330 JsonValue::Number(result.duration_ms as f64),
331 );
332 self.runtime.audit_log().record(
333 "admin/backup",
334 "operator",
335 "instance",
336 "ok",
337 JsonValue::Object(details.clone()),
338 );
339 let mut object = Map::new();
340 object.insert("ok".to_string(), JsonValue::Bool(true));
341 for (k, v) in details {
342 object.insert(k, v);
343 }
344 json_response(200, JsonValue::Object(object))
345 }
346 Err(err) => {
347 self.runtime.audit_log().record(
348 "admin/backup",
349 "operator",
350 "instance",
351 &format!("err: {err}"),
352 JsonValue::Null,
353 );
354 json_error(500, err.to_string())
355 }
356 }
357 }
358
359 pub(crate) fn handle_admin_blob_cache_sweep(&self, body: Vec<u8>) -> HttpResponse {
381 use crate::storage::cache::sweeper::{BlobCacheSweeper, SweepLimit};
382
383 let (limit_entries, limit_millis) = if body.is_empty() {
384 (None, None)
385 } else {
386 match crate::serde_json::from_slice::<crate::serde_json::Value>(&body) {
387 Ok(v) => {
388 let entries = v
389 .get("limit_entries")
390 .and_then(|n| n.as_u64())
391 .map(|n| usize::try_from(n).unwrap_or(usize::MAX));
392 let millis = v
393 .get("limit_millis")
394 .and_then(|n| n.as_u64())
395 .map(|n| u32::try_from(n).unwrap_or(u32::MAX));
396 (entries, millis)
397 }
398 Err(err) => return json_error(400, format!("invalid JSON body: {err}")),
399 }
400 };
401
402 let limit = match (limit_entries, limit_millis) {
403 (None, None) => SweepLimit::Either {
404 entries: usize::MAX,
405 millis: u32::MAX,
406 },
407 (Some(e), None) => SweepLimit::Entries(e),
408 (None, Some(m)) => SweepLimit::Millis(m),
409 (Some(e), Some(m)) => SweepLimit::Either {
410 entries: e,
411 millis: m,
412 },
413 };
414
415 let report = BlobCacheSweeper::sweep_expired(self.runtime.result_blob_cache(), limit);
416
417 let mut object = Map::new();
418 object.insert("ok".to_string(), JsonValue::Bool(true));
419 object.insert(
420 "entries_scanned".to_string(),
421 JsonValue::Number(report.entries_scanned as f64),
422 );
423 object.insert(
424 "entries_evicted".to_string(),
425 JsonValue::Number(report.entries_evicted as f64),
426 );
427 object.insert(
428 "bytes_reclaimed".to_string(),
429 JsonValue::Number(report.bytes_reclaimed as f64),
430 );
431 object.insert(
432 "elapsed_ms".to_string(),
433 JsonValue::Number(report.elapsed_ms as f64),
434 );
435 object.insert(
436 "truncated_due_to_limit".to_string(),
437 JsonValue::Bool(report.truncated_due_to_limit),
438 );
439
440 let mut details = Map::new();
443 details.insert(
444 "entries_evicted".to_string(),
445 JsonValue::Number(report.entries_evicted as f64),
446 );
447 details.insert(
448 "bytes_reclaimed".to_string(),
449 JsonValue::Number(report.bytes_reclaimed as f64),
450 );
451 details.insert(
452 "elapsed_ms".to_string(),
453 JsonValue::Number(report.elapsed_ms as f64),
454 );
455 self.runtime.audit_log().record(
456 "admin/blob_cache/sweep",
457 "operator",
458 "instance",
459 "ok",
460 JsonValue::Object(details),
461 );
462
463 json_response(200, JsonValue::Object(object))
464 }
465
466 pub(crate) fn handle_admin_blob_cache_flush_namespace(&self, body: Vec<u8>) -> HttpResponse {
490 use crate::storage::cache::sweeper::BlobCacheSweeper;
491
492 if body.is_empty() {
493 return json_error(400, "missing JSON body with required `namespace` field");
494 }
495 let parsed: crate::serde_json::Value = match crate::serde_json::from_slice(&body) {
496 Ok(v) => v,
497 Err(err) => return json_error(400, format!("invalid JSON body: {err}")),
498 };
499 let namespace = match parsed.get("namespace").and_then(|v| v.as_str()) {
500 Some(n) => n.to_string(),
501 None => return json_error(400, "field `namespace` is required and must be a string"),
502 };
503 if namespace.is_empty() {
504 return json_error(400, "field `namespace` must not be empty");
505 }
506 for (idx, byte) in namespace.as_bytes().iter().enumerate() {
511 match *byte {
512 b'\0' => {
513 return json_error(
514 400,
515 format!("field `namespace` contains forbidden NUL byte at index {idx}"),
516 );
517 }
518 b'\r' | b'\n' => {
519 return json_error(
520 400,
521 format!("field `namespace` contains forbidden CR/LF byte at index {idx}"),
522 );
523 }
524 _ => {}
525 }
526 }
527
528 let report =
529 BlobCacheSweeper::flush_namespace(self.runtime.result_blob_cache(), &namespace);
530
531 let mut object = Map::new();
532 object.insert("ok".to_string(), JsonValue::Bool(true));
533 object.insert(
537 "namespace".to_string(),
538 crate::json_field::SerializedJsonField::tainted(&report.namespace),
539 );
540 object.insert(
541 "generation_before".to_string(),
542 JsonValue::Number(report.generation_before as f64),
543 );
544 object.insert(
545 "generation_after".to_string(),
546 JsonValue::Number(report.generation_after as f64),
547 );
548 object.insert(
549 "elapsed_micros".to_string(),
550 JsonValue::Number(report.elapsed_micros as f64),
551 );
552
553 let mut details = Map::new();
554 details.insert(
555 "namespace".to_string(),
556 crate::json_field::SerializedJsonField::tainted(&report.namespace),
557 );
558 details.insert(
559 "elapsed_micros".to_string(),
560 JsonValue::Number(report.elapsed_micros as f64),
561 );
562 self.runtime.audit_log().record(
563 "admin/blob_cache/flush_namespace",
564 "operator",
565 "instance",
566 "ok",
567 JsonValue::Object(details),
568 );
569
570 json_response(200, JsonValue::Object(object))
571 }
572
    /// POST /admin/cache/compare_and_set — versioned put into the result
    /// blob cache.
    ///
    /// Required JSON fields: non-empty strings `namespace` and `key`,
    /// `new_value_b64` (base64 payload), and `new_version` (non-negative
    /// integer). Optional: `expected_version` (validated as u64 when
    /// present) and `ttl_ms`.
    ///
    /// Responses: 200 `{committed: true, current_version}` on success;
    /// 409 with the existing version on a version conflict; 400 for any
    /// validation failure; 500 for other cache errors.
    ///
    /// NOTE(review): `expected_version` is validated but never passed to
    /// `put()` — presumably the cache's CAS semantics key off
    /// `BlobCachePolicy::version(new_version)`; confirm this is intentional.
    pub(crate) fn handle_admin_blob_cache_compare_and_set(&self, body: Vec<u8>) -> HttpResponse {
        use crate::storage::cache::blob::{BlobCachePolicy, BlobCachePut, CacheError};

        if body.is_empty() {
            return json_error(400, "missing JSON body");
        }
        let parsed: crate::serde_json::Value = match crate::serde_json::from_slice(&body) {
            Ok(v) => v,
            Err(err) => return json_error(400, format!("invalid JSON body: {err}")),
        };

        // Field extraction: each failure maps to a distinct 400 message.
        let namespace = match parsed.get("namespace").and_then(|v| v.as_str()) {
            Some(n) if !n.is_empty() => n.to_string(),
            Some(_) => return json_error(400, "field `namespace` must not be empty"),
            None => return json_error(400, "field `namespace` is required and must be a string"),
        };
        let key = match parsed.get("key").and_then(|v| v.as_str()) {
            Some(k) if !k.is_empty() => k.to_string(),
            Some(_) => return json_error(400, "field `key` must not be empty"),
            None => return json_error(400, "field `key` is required and must be a string"),
        };
        let new_value_b64 = match parsed.get("new_value_b64").and_then(|v| v.as_str()) {
            Some(v) => v.to_string(),
            None => {
                return json_error(
                    400,
                    "field `new_value_b64` is required and must be a string",
                )
            }
        };
        let new_version = match parsed.get("new_version").and_then(|v| v.as_u64()) {
            Some(v) => v,
            None => {
                return json_error(
                    400,
                    "field `new_version` is required and must be a non-negative integer",
                )
            }
        };
        // `expected_version` is type-checked if supplied (see NOTE above).
        if let Some(ev) = parsed.get("expected_version") {
            if ev.as_u64().is_none() {
                return json_error(
                    400,
                    "field `expected_version` must be a non-negative integer",
                );
            }
        }
        let ttl_ms = parsed.get("ttl_ms").and_then(|v| v.as_u64());

        // Operator-supplied identifiers must not contain NUL/CR/LF bytes.
        if let Some(err) = reject_smuggling_bytes("namespace", &namespace) {
            return err;
        }
        if let Some(err) = reject_smuggling_bytes("key", &key) {
            return err;
        }

        let bytes = match b64_decode(&new_value_b64) {
            Ok(b) => b,
            Err(e) => return json_error(400, format!("invalid base64 in `new_value_b64`: {e}")),
        };

        // Build the put policy; TTL is only attached when requested.
        let policy = if let Some(ttl) = ttl_ms {
            BlobCachePolicy::default().version(new_version).ttl_ms(ttl)
        } else {
            BlobCachePolicy::default().version(new_version)
        };
        let put = BlobCachePut::new(bytes).with_policy(policy);

        match self.runtime.result_blob_cache().put(&namespace, &key, put) {
            Ok(()) => {
                let mut obj = Map::new();
                obj.insert("committed".to_string(), JsonValue::Bool(true));
                obj.insert(
                    "current_version".to_string(),
                    JsonValue::Number(new_version as f64),
                );

                // Audit the successful write with its tainted identifiers.
                let mut details = Map::new();
                details.insert(
                    "namespace".to_string(),
                    crate::json_field::SerializedJsonField::tainted(&namespace),
                );
                details.insert(
                    "key".to_string(),
                    crate::json_field::SerializedJsonField::tainted(&key),
                );
                details.insert(
                    "new_version".to_string(),
                    JsonValue::Number(new_version as f64),
                );
                self.runtime.audit_log().record(
                    "admin/cache/compare_and_set",
                    "operator",
                    "instance",
                    "ok",
                    JsonValue::Object(details),
                );

                json_response(200, JsonValue::Object(obj))
            }
            // Conflict: report the version currently stored, no audit entry.
            Err(CacheError::VersionMismatch { existing, .. }) => {
                let mut obj = Map::new();
                obj.insert("committed".to_string(), JsonValue::Bool(false));
                obj.insert(
                    "current_version".to_string(),
                    JsonValue::Number(existing as f64),
                );
                obj.insert(
                    "reason".to_string(),
                    JsonValue::String("VersionMismatch".to_string()),
                );
                json_response(409, JsonValue::Object(obj))
            }
            Err(err) => json_error(500, format!("cache put failed: {err:?}")),
        }
    }
719
720 pub(crate) fn handle_admin_blob_cache_stats(
727 &self,
728 _query: &std::collections::BTreeMap<String, String>,
729 ) -> HttpResponse {
730 let s = self.runtime.result_blob_cache().stats();
731 let mut obj = Map::new();
732 obj.insert("ok".to_string(), JsonValue::Bool(true));
733 obj.insert("hits".to_string(), JsonValue::Number(s.hits() as f64));
734 obj.insert("misses".to_string(), JsonValue::Number(s.misses() as f64));
735 obj.insert(
736 "insertions".to_string(),
737 JsonValue::Number(s.insertions() as f64),
738 );
739 obj.insert(
740 "evictions".to_string(),
741 JsonValue::Number(s.evictions() as f64),
742 );
743 obj.insert(
744 "expirations".to_string(),
745 JsonValue::Number(s.expirations() as f64),
746 );
747 obj.insert(
748 "invalidations".to_string(),
749 JsonValue::Number(s.invalidations() as f64),
750 );
751 obj.insert(
752 "namespace_flushes".to_string(),
753 JsonValue::Number(s.namespace_flushes() as f64),
754 );
755 obj.insert(
756 "version_mismatches".to_string(),
757 JsonValue::Number(s.version_mismatches() as f64),
758 );
759 obj.insert("entries".to_string(), JsonValue::Number(s.entries() as f64));
760 obj.insert(
761 "bytes_in_use".to_string(),
762 JsonValue::Number(s.bytes_in_use() as f64),
763 );
764 obj.insert(
765 "l1_bytes_max".to_string(),
766 JsonValue::Number(s.l1_bytes_max() as f64),
767 );
768 obj.insert(
769 "l2_bytes_in_use".to_string(),
770 JsonValue::Number(s.l2_bytes_in_use() as f64),
771 );
772 obj.insert(
773 "l2_bytes_max".to_string(),
774 JsonValue::Number(s.l2_bytes_max() as f64),
775 );
776 obj.insert(
777 "l2_full_rejections".to_string(),
778 JsonValue::Number(s.l2_full_rejections() as f64),
779 );
780 obj.insert(
781 "l2_metadata_reads".to_string(),
782 JsonValue::Number(s.l2_metadata_reads() as f64),
783 );
784 obj.insert(
785 "l2_negative_skips".to_string(),
786 JsonValue::Number(s.l2_negative_skips() as f64),
787 );
788 obj.insert(
789 "synopsis_metadata_reads".to_string(),
790 JsonValue::Number(s.synopsis_metadata_reads() as f64),
791 );
792 obj.insert(
793 "synopsis_bytes".to_string(),
794 JsonValue::Number(s.synopsis_bytes() as f64),
795 );
796 obj.insert(
797 "namespaces".to_string(),
798 JsonValue::Number(s.namespaces() as f64),
799 );
800 obj.insert(
801 "max_namespaces".to_string(),
802 JsonValue::Number(s.max_namespaces() as f64),
803 );
804 obj.insert(
805 "promotion_queued".to_string(),
806 JsonValue::Number(s.promotion_queued() as f64),
807 );
808 obj.insert(
809 "promotion_dropped".to_string(),
810 JsonValue::Number(s.promotion_dropped() as f64),
811 );
812 obj.insert(
813 "promotion_completed".to_string(),
814 JsonValue::Number(s.promotion_completed() as f64),
815 );
816 obj.insert(
817 "promotion_queue_depth".to_string(),
818 JsonValue::Number(s.promotion_queue_depth() as f64),
819 );
820 obj.insert(
821 "l2_compression_ratio_observed".to_string(),
822 JsonValue::Number(s.l2_compression_ratio_observed()),
823 );
824 obj.insert(
825 "l2_compression_skipped_total".to_string(),
826 JsonValue::Number(s.l2_compression_skipped_total() as f64),
827 );
828 obj.insert(
829 "l2_bytes_saved_total".to_string(),
830 JsonValue::Number(s.l2_bytes_saved_total() as f64),
831 );
832 json_response(200, JsonValue::Object(obj))
833 }
834
835 pub(crate) fn handle_admin_readonly(&self, body: Vec<u8>) -> HttpResponse {
850 let enabled = if body.is_empty() {
851 true
852 } else {
853 match crate::serde_json::from_slice::<crate::serde_json::Value>(&body) {
854 Ok(v) => v.get("enabled").and_then(|n| n.as_bool()).unwrap_or(true),
855 Err(err) => return json_error(400, format!("invalid JSON body: {err}")),
856 }
857 };
858
859 let previous = self.runtime.write_gate().set_read_only(enabled);
860
861 if let Some(data_path) = self.runtime.db().path() {
867 let state_path = runtime_state_path(data_path);
868 if let Err(err) = persist_runtime_readonly(&state_path, enabled) {
869 self.runtime.write_gate().set_read_only(previous);
870 return json_error(
871 500,
872 format!("read_only persisted to {state_path:?} failed: {err}"),
873 );
874 }
875 }
876
877 let mut details = Map::new();
878 details.insert("enabled".to_string(), JsonValue::Bool(enabled));
879 details.insert("previous".to_string(), JsonValue::Bool(previous));
880 self.runtime.audit_log().record(
881 "admin/readonly",
882 "operator",
883 "instance",
884 "ok",
885 JsonValue::Object(details),
886 );
887 let mut object = Map::new();
888 object.insert("ok".to_string(), JsonValue::Bool(true));
889 object.insert("read_only".to_string(), JsonValue::Bool(enabled));
890 object.insert("previous".to_string(), JsonValue::Bool(previous));
891 json_response(200, JsonValue::Object(object))
892 }
893
894 pub(crate) fn handle_metrics(&self) -> HttpResponse {
904 use std::fmt::Write;
905 let lifecycle = self.runtime.lifecycle();
906 let phase = lifecycle.phase();
907 let now_ms = std::time::SystemTime::now()
908 .duration_since(std::time::UNIX_EPOCH)
909 .map(|d| d.as_millis() as u64)
910 .unwrap_or(0);
911 let uptime_secs = (now_ms.saturating_sub(lifecycle.started_at_ms()) as f64) / 1000.0;
912 let cold_start_secs = lifecycle
913 .ready_at_ms()
914 .map(|ready_ms| (ready_ms.saturating_sub(lifecycle.started_at_ms()) as f64) / 1000.0);
915 let health_status: u8 = match phase {
916 Phase::Stopped => 0,
917 Phase::Starting | Phase::ShuttingDown => 0,
918 Phase::Draining => 1,
919 Phase::Ready => 2,
920 };
921 let read_only = self.runtime.write_gate().is_read_only();
922 let role = match self.runtime.write_gate().role() {
923 crate::replication::ReplicationRole::Standalone => "standalone",
924 crate::replication::ReplicationRole::Primary => "primary",
925 crate::replication::ReplicationRole::Replica { .. } => "replica",
926 };
927 let db_size_bytes = self
928 .runtime
929 .db()
930 .path()
931 .and_then(|p| std::fs::metadata(p).ok())
932 .map(|m| m.len())
933 .unwrap_or(0);
934 let runtime_stats = self.runtime.stats();
935 let result_blob_stats = runtime_stats.result_blob_cache;
936 let kv_stats = runtime_stats.kv;
937 let metrics_ingest = runtime_stats.metrics_ingest;
938
939 let mut body = String::with_capacity(1024);
940 let _ = writeln!(
941 body,
942 "# HELP reddb_uptime_seconds Seconds since the runtime was constructed."
943 );
944 let _ = writeln!(body, "# TYPE reddb_uptime_seconds gauge");
945 let _ = writeln!(body, "reddb_uptime_seconds {}", uptime_secs);
946
947 let _ = writeln!(
948 body,
949 "# HELP reddb_health_status 0=down/starting, 1=degraded/draining, 2=ready."
950 );
951 let _ = writeln!(body, "# TYPE reddb_health_status gauge");
952 let _ = writeln!(body, "reddb_health_status {}", health_status);
953
954 let _ = writeln!(
955 body,
956 "# HELP reddb_phase Lifecycle phase as a labeled gauge (always 1; phase in label)."
957 );
958 let _ = writeln!(body, "# TYPE reddb_phase gauge");
959 let _ = writeln!(body, "reddb_phase{{phase=\"{}\"}} 1", phase.as_str());
960
961 let _ = writeln!(
962 body,
963 "# HELP reddb_read_only 1 when public mutations are gated, 0 otherwise."
964 );
965 let _ = writeln!(body, "# TYPE reddb_read_only gauge");
966 let _ = writeln!(body, "reddb_read_only {}", if read_only { 1 } else { 0 });
967
968 let _ = writeln!(
969 body,
970 "# HELP reddb_replication_role Replication role of this instance."
971 );
972 let _ = writeln!(body, "# TYPE reddb_replication_role gauge");
973 let _ = writeln!(body, "reddb_replication_role{{role=\"{}\"}} 1", role);
974
975 let lease_state = self.runtime.write_gate().lease_state();
980 let _ = writeln!(
981 body,
982 "# HELP reddb_writer_lease_state Serverless writer-lease gate state (label)."
983 );
984 let _ = writeln!(body, "# TYPE reddb_writer_lease_state gauge");
985 let _ = writeln!(
986 body,
987 "reddb_writer_lease_state{{state=\"{}\"}} 1",
988 lease_state.label()
989 );
990
991 let backup_status = self.runtime.backup_status();
996 if let Some(last) = backup_status.last_backup.as_ref() {
997 let last_ts_secs = (last.timestamp as f64) / 1000.0;
998 let _ = writeln!(
999 body,
1000 "# HELP reddb_backup_last_success_timestamp_seconds Unix ts (s) of the most recent successful backup."
1001 );
1002 let _ = writeln!(
1003 body,
1004 "# TYPE reddb_backup_last_success_timestamp_seconds gauge"
1005 );
1006 let _ = writeln!(
1007 body,
1008 "reddb_backup_last_success_timestamp_seconds {}",
1009 last_ts_secs
1010 );
1011 let age_secs = ((now_ms.saturating_sub(last.timestamp)) as f64) / 1000.0;
1012 let _ = writeln!(
1013 body,
1014 "# HELP reddb_backup_age_seconds Seconds since last successful backup."
1015 );
1016 let _ = writeln!(body, "# TYPE reddb_backup_age_seconds gauge");
1017 let _ = writeln!(body, "reddb_backup_age_seconds {}", age_secs);
1018 let _ = writeln!(
1019 body,
1020 "# HELP reddb_backup_last_duration_seconds Wall-clock duration of the most recent backup."
1021 );
1022 let _ = writeln!(body, "# TYPE reddb_backup_last_duration_seconds gauge");
1023 let _ = writeln!(
1024 body,
1025 "reddb_backup_last_duration_seconds {}",
1026 (last.duration_ms as f64) / 1000.0
1027 );
1028 }
1029 let _ = writeln!(
1030 body,
1031 "# HELP reddb_backup_failures_total Total backup failures since process start."
1032 );
1033 let _ = writeln!(body, "# TYPE reddb_backup_failures_total counter");
1034 let _ = writeln!(
1035 body,
1036 "reddb_backup_failures_total {}",
1037 backup_status.total_failures
1038 );
1039 let _ = writeln!(
1040 body,
1041 "# HELP reddb_backup_total_total Total successful backups since process start."
1042 );
1043 let _ = writeln!(body, "# TYPE reddb_backup_total_total counter");
1044 let _ = writeln!(
1045 body,
1046 "reddb_backup_total_total {}",
1047 backup_status.total_backups
1048 );
1049
1050 let (current_lsn, last_archived_lsn) = self.runtime.wal_archive_progress();
1055 let lag = current_lsn.saturating_sub(last_archived_lsn);
1056 let _ = writeln!(
1057 body,
1058 "# HELP reddb_wal_current_lsn Current local LSN (most recent record visible to writers)."
1059 );
1060 let _ = writeln!(body, "# TYPE reddb_wal_current_lsn gauge");
1061 let _ = writeln!(body, "reddb_wal_current_lsn {}", current_lsn);
1062 let _ = writeln!(
1063 body,
1064 "# HELP reddb_wal_last_archived_lsn LSN of the most recently archived WAL segment."
1065 );
1066 let _ = writeln!(body, "# TYPE reddb_wal_last_archived_lsn gauge");
1067 let _ = writeln!(body, "reddb_wal_last_archived_lsn {}", last_archived_lsn);
1068 let _ = writeln!(
1069 body,
1070 "# HELP reddb_wal_archive_lag_records Records between current LSN and last archived LSN."
1071 );
1072 let _ = writeln!(body, "# TYPE reddb_wal_archive_lag_records gauge");
1073 let _ = writeln!(body, "reddb_wal_archive_lag_records {}", lag);
1074
1075 let _ = writeln!(
1076 body,
1077 "# HELP reddb_metrics_remote_write_samples_accepted_total Metrics remote-write samples accepted since process start."
1078 );
1079 let _ = writeln!(
1080 body,
1081 "# TYPE reddb_metrics_remote_write_samples_accepted_total counter"
1082 );
1083 let _ = writeln!(
1084 body,
1085 "reddb_metrics_remote_write_samples_accepted_total {}",
1086 metrics_ingest.samples_accepted
1087 );
1088 let _ = writeln!(
1089 body,
1090 "# HELP reddb_metrics_remote_write_series_accepted_total Metrics remote-write series accepted since process start."
1091 );
1092 let _ = writeln!(
1093 body,
1094 "# TYPE reddb_metrics_remote_write_series_accepted_total counter"
1095 );
1096 let _ = writeln!(
1097 body,
1098 "reddb_metrics_remote_write_series_accepted_total {}",
1099 metrics_ingest.series_accepted
1100 );
1101 let _ = writeln!(
1102 body,
1103 "# HELP reddb_metrics_remote_write_samples_rejected_total Metrics remote-write samples rejected since process start."
1104 );
1105 let _ = writeln!(
1106 body,
1107 "# TYPE reddb_metrics_remote_write_samples_rejected_total counter"
1108 );
1109 let _ = writeln!(
1110 body,
1111 "reddb_metrics_remote_write_samples_rejected_total {}",
1112 metrics_ingest.samples_rejected
1113 );
1114 let _ = writeln!(
1115 body,
1116 "# HELP reddb_metrics_remote_write_series_rejected_total Metrics remote-write series rejected since process start."
1117 );
1118 let _ = writeln!(
1119 body,
1120 "# TYPE reddb_metrics_remote_write_series_rejected_total counter"
1121 );
1122 let _ = writeln!(
1123 body,
1124 "reddb_metrics_remote_write_series_rejected_total {}",
1125 metrics_ingest.series_rejected
1126 );
1127 let _ = writeln!(
1128 body,
1129 "# HELP reddb_metrics_remote_write_series_rejected_by_reason_total Metrics remote-write series rejected since process start by reason."
1130 );
1131 let _ = writeln!(
1132 body,
1133 "# TYPE reddb_metrics_remote_write_series_rejected_by_reason_total counter"
1134 );
1135 let _ = writeln!(
1136 body,
1137 "reddb_metrics_remote_write_series_rejected_by_reason_total{{reason=\"cardinality_budget\"}} {}",
1138 metrics_ingest.series_rejected_cardinality_budget
1139 );
1140 let _ = writeln!(
1141 body,
1142 "# HELP reddb_metrics_tenant_activity_total Metrics adapter requests by tenant, namespace, and operation since process start."
1143 );
1144 let _ = writeln!(body, "# TYPE reddb_metrics_tenant_activity_total counter");
1145 for activity in self.runtime.metrics_tenant_activity_snapshot() {
1146 let _ = writeln!(
1147 body,
1148 "reddb_metrics_tenant_activity_total{{tenant=\"{}\",namespace=\"{}\",operation=\"{}\"}} {}",
1149 sanitize_label(&activity.tenant),
1150 sanitize_label(&activity.namespace),
1151 sanitize_label(&activity.operation),
1152 activity.count
1153 );
1154 }
1155
1156 let replicas = self.runtime.primary_replica_snapshots();
1161 let _ = writeln!(
1162 body,
1163 "# HELP reddb_replica_count Currently registered replicas."
1164 );
1165 let _ = writeln!(body, "# TYPE reddb_replica_count gauge");
1166 let _ = writeln!(body, "reddb_replica_count {}", replicas.len());
1167 if !replicas.is_empty() {
1168 let replica_lag_budget_secs = std::env::var("RED_SLO_REPLICA_LAG_BUDGET_SECONDS")
1169 .ok()
1170 .and_then(|value| value.parse::<f64>().ok())
1171 .filter(|value| value.is_finite() && *value >= 0.0)
1172 .unwrap_or(60.0);
1173 let _ = writeln!(
1174 body,
1175 "# HELP reddb_replica_ack_lsn Most recent LSN acked by each replica."
1176 );
1177 let _ = writeln!(body, "# TYPE reddb_replica_ack_lsn gauge");
1178 for r in &replicas {
1179 let _ = writeln!(
1180 body,
1181 "reddb_replica_ack_lsn{{replica_id=\"{}\"}} {}",
1182 sanitize_label(&r.id),
1183 r.last_acked_lsn
1184 );
1185 }
1186 let _ = writeln!(
1187 body,
1188 "# HELP reddb_replica_lag_records Distance from primary current LSN to replica acked LSN."
1189 );
1190 let _ = writeln!(body, "# TYPE reddb_replica_lag_records gauge");
1191 for r in &replicas {
1192 let _ = writeln!(
1193 body,
1194 "reddb_replica_lag_records{{replica_id=\"{}\"}} {}",
1195 sanitize_label(&r.id),
1196 current_lsn.saturating_sub(r.last_acked_lsn)
1197 );
1198 }
1199 let _ = writeln!(
1200 body,
1201 "# HELP reddb_replica_lag_seconds Wall-clock seconds since the replica was last seen."
1202 );
1203 let _ = writeln!(body, "# TYPE reddb_replica_lag_seconds gauge");
1204 let _ = writeln!(
1205 body,
1206 "# HELP reddb_slo_lag_budget_remaining_seconds Remaining per-replica lag budget; negative means SLO breach."
1207 );
1208 let _ = writeln!(body, "# TYPE reddb_slo_lag_budget_remaining_seconds gauge");
1209 for r in &replicas {
1210 let lag_ms = (now_ms as u128).saturating_sub(r.last_seen_at_unix_ms);
1211 let lag_secs = (lag_ms as f64) / 1000.0;
1212 let _ = writeln!(
1213 body,
1214 "reddb_replica_lag_seconds{{replica_id=\"{}\"}} {}",
1215 sanitize_label(&r.id),
1216 lag_secs
1217 );
1218 let _ = writeln!(
1219 body,
1220 "reddb_slo_lag_budget_remaining_seconds{{replica_id=\"{}\"}} {}",
1221 sanitize_label(&r.id),
1222 replica_lag_budget_secs - lag_secs
1223 );
1224 }
1225 }
1226
1227 let _ = writeln!(
1233 body,
1234 "# HELP reddb_replica_apply_errors_total Replica WAL apply errors since process start, by kind."
1235 );
1236 let _ = writeln!(body, "# TYPE reddb_replica_apply_errors_total counter");
1237 for (kind, count) in self.runtime.replica_apply_error_counts() {
1238 let _ = writeln!(
1239 body,
1240 "reddb_replica_apply_errors_total{{kind=\"{}\"}} {}",
1241 kind.label(),
1242 count
1243 );
1244 }
1245 if let Some(health) = self.runtime.replica_apply_health() {
1246 let _ = writeln!(
1247 body,
1248 "# HELP reddb_replica_apply_health Replica apply state (label, value=1)."
1249 );
1250 let _ = writeln!(body, "# TYPE reddb_replica_apply_health gauge");
1251 let _ = writeln!(
1252 body,
1253 "reddb_replica_apply_health{{state=\"{}\"}} 1",
1254 sanitize_label(&health)
1255 );
1256 }
1257
1258 self.runtime.quota_bucket().evict_idle();
1263 let rejections = self.runtime.quota_bucket().rejection_snapshot();
1264 if !rejections.is_empty() {
1265 let _ = writeln!(
1266 body,
1267 "# HELP reddb_quota_rejected_total Requests rejected by per-caller QPS quota."
1268 );
1269 let _ = writeln!(body, "# TYPE reddb_quota_rejected_total counter");
1270 for (principal, count) in &rejections {
1271 let _ = writeln!(
1272 body,
1273 "reddb_quota_rejected_total{{principal=\"{}\"}} {}",
1274 sanitize_label(principal),
1275 count
1276 );
1277 }
1278 }
1279
1280 let (reached, timed_out, not_required, last_micros) =
1285 self.runtime.commit_waiter_metrics_snapshot();
1286 let _ = writeln!(
1287 body,
1288 "# HELP reddb_commit_wait_total Commit-wait outcomes by kind."
1289 );
1290 let _ = writeln!(body, "# TYPE reddb_commit_wait_total counter");
1291 let _ = writeln!(
1292 body,
1293 "reddb_commit_wait_total{{outcome=\"reached\"}} {}",
1294 reached
1295 );
1296 let _ = writeln!(
1297 body,
1298 "reddb_commit_wait_total{{outcome=\"timed_out\"}} {}",
1299 timed_out
1300 );
1301 let _ = writeln!(
1302 body,
1303 "reddb_commit_wait_total{{outcome=\"not_required\"}} {}",
1304 not_required
1305 );
1306 let _ = writeln!(
1307 body,
1308 "# HELP reddb_commit_wait_last_seconds Wall-clock seconds of the most recent commit wait."
1309 );
1310 let _ = writeln!(body, "# TYPE reddb_commit_wait_last_seconds gauge");
1311 let _ = writeln!(
1312 body,
1313 "reddb_commit_wait_last_seconds {}",
1314 (last_micros as f64) / 1_000_000.0
1315 );
1316
1317 let policy = self.runtime.commit_policy();
1322 let _ = writeln!(
1323 body,
1324 "# HELP reddb_primary_commit_policy Active commit policy on the primary."
1325 );
1326 let _ = writeln!(body, "# TYPE reddb_primary_commit_policy gauge");
1327 let _ = writeln!(
1328 body,
1329 "reddb_primary_commit_policy{{policy=\"{}\"}} 1",
1330 policy.label()
1331 );
1332
1333 let blob_ns = "runtime.result_cache";
1338 let _ = writeln!(
1339 body,
1340 "# HELP reddb_cache_blob_get_total Blob Cache get outcomes by namespace."
1341 );
1342 let _ = writeln!(body, "# TYPE reddb_cache_blob_get_total counter");
1343 let _ = writeln!(
1344 body,
1345 "reddb_cache_blob_get_total{{namespace=\"{}\",result=\"hit_l1\"}} {}",
1346 blob_ns,
1347 result_blob_stats.hits()
1348 );
1349 let _ = writeln!(
1350 body,
1351 "reddb_cache_blob_get_total{{namespace=\"{}\",result=\"hit_l2\"}} 0",
1352 blob_ns
1353 );
1354 let _ = writeln!(
1355 body,
1356 "reddb_cache_blob_get_total{{namespace=\"{}\",result=\"miss\"}} {}",
1357 blob_ns,
1358 result_blob_stats.misses()
1359 );
1360 let _ = writeln!(
1361 body,
1362 "# HELP reddb_cache_blob_put_total Blob Cache put outcomes by namespace."
1363 );
1364 let _ = writeln!(body, "# TYPE reddb_cache_blob_put_total counter");
1365 let _ = writeln!(
1366 body,
1367 "reddb_cache_blob_put_total{{namespace=\"{}\",outcome=\"ok\"}} {}",
1368 blob_ns,
1369 result_blob_stats.insertions()
1370 );
1371 let _ = writeln!(
1372 body,
1373 "reddb_cache_blob_put_total{{namespace=\"{}\",outcome=\"version_mismatch\"}} {}",
1374 blob_ns,
1375 result_blob_stats.version_mismatches()
1376 );
1377 let _ = writeln!(
1378 body,
1379 "reddb_cache_blob_put_total{{namespace=\"{}\",outcome=\"too_large\"}} 0",
1380 blob_ns
1381 );
1382 let _ = writeln!(
1383 body,
1384 "reddb_cache_blob_put_total{{namespace=\"{}\",outcome=\"metadata_too_large\"}} 0",
1385 blob_ns
1386 );
1387 let _ = writeln!(
1388 body,
1389 "# HELP reddb_cache_blob_invalidate_total Blob Cache invalidations by namespace and kind."
1390 );
1391 let _ = writeln!(body, "# TYPE reddb_cache_blob_invalidate_total counter");
1392 for (kind, count) in [
1393 ("key", 0),
1394 ("prefix", 0),
1395 ("tag", 0),
1396 ("dependency", result_blob_stats.invalidations()),
1397 ("namespace", result_blob_stats.namespace_flushes()),
1398 ] {
1399 let _ = writeln!(
1400 body,
1401 "reddb_cache_blob_invalidate_total{{namespace=\"{}\",kind=\"{}\"}} {}",
1402 blob_ns, kind, count
1403 );
1404 }
1405 let _ = writeln!(
1406 body,
1407 "# HELP reddb_cache_blob_evict_total Blob Cache evictions by namespace and reason."
1408 );
1409 let _ = writeln!(body, "# TYPE reddb_cache_blob_evict_total counter");
1410 for (reason, count) in [
1411 ("capacity", result_blob_stats.evictions()),
1412 ("expiry", result_blob_stats.expirations()),
1413 ("policy", 0),
1414 ] {
1415 let _ = writeln!(
1416 body,
1417 "reddb_cache_blob_evict_total{{namespace=\"{}\",reason=\"{}\"}} {}",
1418 blob_ns, reason, count
1419 );
1420 }
1421 let _ = writeln!(
1422 body,
1423 "# HELP reddb_cache_blob_l1_bytes_in_use L1 bytes currently used by Blob Cache namespace."
1424 );
1425 let _ = writeln!(body, "# TYPE reddb_cache_blob_l1_bytes_in_use gauge");
1426 let _ = writeln!(
1427 body,
1428 "reddb_cache_blob_l1_bytes_in_use{{namespace=\"{}\"}} {}",
1429 blob_ns,
1430 result_blob_stats.bytes_in_use()
1431 );
1432 let _ = writeln!(
1433 body,
1434 "# HELP reddb_cache_blob_l1_entries L1 entries currently held by Blob Cache namespace."
1435 );
1436 let _ = writeln!(body, "# TYPE reddb_cache_blob_l1_entries gauge");
1437 let _ = writeln!(
1438 body,
1439 "reddb_cache_blob_l1_entries{{namespace=\"{}\"}} {}",
1440 blob_ns,
1441 result_blob_stats.entries()
1442 );
1443 let _ = writeln!(
1444 body,
1445 "# HELP reddb_cache_blob_l2_bytes_in_use L2 bytes currently used by Blob Cache namespace."
1446 );
1447 let _ = writeln!(body, "# TYPE reddb_cache_blob_l2_bytes_in_use gauge");
1448 let _ = writeln!(
1449 body,
1450 "reddb_cache_blob_l2_bytes_in_use{{namespace=\"{}\"}} {}",
1451 blob_ns,
1452 result_blob_stats.l2_bytes_in_use()
1453 );
1454 let _ = writeln!(
1455 body,
1456 "# HELP reddb_cache_blob_l2_full_rejections_total Blob Cache puts rejected because L2 is full."
1457 );
1458 let _ = writeln!(
1459 body,
1460 "# TYPE reddb_cache_blob_l2_full_rejections_total counter"
1461 );
1462 let _ = writeln!(
1463 body,
1464 "reddb_cache_blob_l2_full_rejections_total{{namespace=\"{}\"}} {}",
1465 blob_ns,
1466 result_blob_stats.l2_full_rejections()
1467 );
1468 let _ = writeln!(
1469 body,
1470 "# HELP reddb_cache_blob_version_mismatch_total Blob Cache CAS version mismatches by namespace."
1471 );
1472 let _ = writeln!(
1473 body,
1474 "# TYPE reddb_cache_blob_version_mismatch_total counter"
1475 );
1476 let _ = writeln!(
1477 body,
1478 "reddb_cache_blob_version_mismatch_total{{namespace=\"{}\"}} {}",
1479 blob_ns,
1480 result_blob_stats.version_mismatches()
1481 );
1482
1483 let _ = writeln!(
1484 body,
1485 "# HELP reddb_kv_ops_total Normal-KV operations since process start."
1486 );
1487 let _ = writeln!(body, "# TYPE reddb_kv_ops_total counter");
1488 for (verb, count) in [
1489 ("put", kv_stats.puts),
1490 ("get", kv_stats.gets),
1491 ("delete", kv_stats.deletes),
1492 ("incr", kv_stats.incrs),
1493 ] {
1494 let _ = writeln!(body, "reddb_kv_ops_total{{verb=\"{}\"}} {}", verb, count);
1495 }
1496 let _ = writeln!(
1497 body,
1498 "# HELP reddb_kv_cas_total Normal-KV CAS outcomes since process start."
1499 );
1500 let _ = writeln!(body, "# TYPE reddb_kv_cas_total counter");
1501 let _ = writeln!(
1502 body,
1503 "reddb_kv_cas_total{{outcome=\"success\"}} {}",
1504 kv_stats.cas_success
1505 );
1506 let _ = writeln!(
1507 body,
1508 "reddb_kv_cas_total{{outcome=\"conflict\"}} {}",
1509 kv_stats.cas_conflict
1510 );
1511 let _ = writeln!(
1512 body,
1513 "# HELP reddb_kv_watch_streams_active Active normal-KV WATCH streams."
1514 );
1515 let _ = writeln!(body, "# TYPE reddb_kv_watch_streams_active gauge");
1516 let _ = writeln!(
1517 body,
1518 "reddb_kv_watch_streams_active {}",
1519 kv_stats.watch_streams_active
1520 );
1521 let _ = writeln!(
1522 body,
1523 "# HELP reddb_kv_watch_events_emitted_total Normal-KV WATCH events emitted since process start."
1524 );
1525 let _ = writeln!(body, "# TYPE reddb_kv_watch_events_emitted_total counter");
1526 let _ = writeln!(
1527 body,
1528 "reddb_kv_watch_events_emitted_total {}",
1529 kv_stats.watch_events_emitted
1530 );
1531 let _ = writeln!(
1532 body,
1533 "# HELP reddb_kv_watch_drops_total Normal-KV WATCH events dropped by bounded subscriber buffers."
1534 );
1535 let _ = writeln!(body, "# TYPE reddb_kv_watch_drops_total counter");
1536 let _ = writeln!(body, "reddb_kv_watch_drops_total {}", kv_stats.watch_drops);
1537
1538 let _ = writeln!(
1539 body,
1540 "# HELP reddb_db_size_bytes On-disk size of the primary database file."
1541 );
1542 let _ = writeln!(body, "# TYPE reddb_db_size_bytes gauge");
1543 let _ = writeln!(body, "reddb_db_size_bytes {}", db_size_bytes);
1544
1545 if let Some(secs) = cold_start_secs {
1546 let _ = writeln!(
1547 body,
1548 "# HELP reddb_cold_start_duration_seconds Seconds from process start to /health/ready 200."
1549 );
1550 let _ = writeln!(body, "# TYPE reddb_cold_start_duration_seconds gauge");
1551 let _ = writeln!(body, "reddb_cold_start_duration_seconds {}", secs);
1552 }
1553
1554 let phases = lifecycle.cold_start_phases().durations_ms();
1560 if !phases.is_empty() {
1561 let _ = writeln!(
1562 body,
1563 "# HELP reddb_cold_start_phase_seconds Per-phase cold-start duration."
1564 );
1565 let _ = writeln!(body, "# TYPE reddb_cold_start_phase_seconds gauge");
1566 for (name, dur_ms) in phases {
1567 let _ = writeln!(
1568 body,
1569 "reddb_cold_start_phase_seconds{{phase=\"{}\"}} {}",
1570 name,
1571 (dur_ms as f64) / 1000.0
1572 );
1573 }
1574 }
1575
1576 let limits = self.runtime.resource_limits();
1581 if let Some(v) = limits.max_db_size_bytes {
1582 let _ = writeln!(
1583 body,
1584 "# HELP reddb_limit_db_size_bytes Operator-pinned cap on the primary DB file size."
1585 );
1586 let _ = writeln!(body, "# TYPE reddb_limit_db_size_bytes gauge");
1587 let _ = writeln!(body, "reddb_limit_db_size_bytes {}", v);
1588 }
1589 if let Some(v) = limits.max_connections {
1590 let _ = writeln!(body, "# TYPE reddb_limit_connections gauge");
1591 let _ = writeln!(body, "reddb_limit_connections {}", v);
1592 }
1593 if let Some(v) = limits.max_qps {
1594 let _ = writeln!(body, "# TYPE reddb_limit_qps gauge");
1595 let _ = writeln!(body, "reddb_limit_qps {}", v);
1596 }
1597 if let Some(v) = limits.max_batch_size {
1598 let _ = writeln!(body, "# TYPE reddb_limit_batch_size gauge");
1599 let _ = writeln!(body, "reddb_limit_batch_size {}", v);
1600 }
1601 if let Some(v) = limits.max_memory_bytes {
1602 let _ = writeln!(body, "# TYPE reddb_limit_memory_bytes gauge");
1603 let _ = writeln!(body, "reddb_limit_memory_bytes {}", v);
1604 }
1605
1606 {
1608 use crate::runtime::impl_queue::{
1609 EVENTS_DLQ_TOTAL, EVENTS_DRAIN_RETRIES_TOTAL, EVENTS_ENQUEUED_TOTAL,
1610 };
1611 let enqueued = EVENTS_ENQUEUED_TOTAL.load(std::sync::atomic::Ordering::Relaxed);
1612 let retries = EVENTS_DRAIN_RETRIES_TOTAL.load(std::sync::atomic::Ordering::Relaxed);
1613 let dlq_total = EVENTS_DLQ_TOTAL.load(std::sync::atomic::Ordering::Relaxed);
1614
1615 let _ = writeln!(
1616 body,
1617 "# HELP reddb_events_enqueued_total Total events successfully pushed to target queues."
1618 );
1619 let _ = writeln!(body, "# TYPE reddb_events_enqueued_total counter");
1620 let _ = writeln!(body, "reddb_events_enqueued_total {enqueued}");
1621
1622 let _ = writeln!(
1623 body,
1624 "# HELP reddb_events_drain_retries_total Total event push failures that triggered DLQ routing."
1625 );
1626 let _ = writeln!(body, "# TYPE reddb_events_drain_retries_total counter");
1627 let _ = writeln!(
1628 body,
1629 "reddb_events_drain_retries_total{{reason=\"queue_full\"}} {retries}"
1630 );
1631
1632 let _ = writeln!(
1633 body,
1634 "# HELP reddb_events_dlq_total Total events routed to dead-letter queues."
1635 );
1636 let _ = writeln!(body, "# TYPE reddb_events_dlq_total counter");
1637 let _ = writeln!(body, "reddb_events_dlq_total {dlq_total}");
1638 }
1639
1640 crate::runtime::ai::metrics::render_ai_metrics(&mut body);
1642
1643 HttpResponse {
1644 status: 200,
1645 content_type: "text/plain; version=0.0.4",
1646 body: body.into_bytes(),
1647 extra_headers: Vec::new(),
1648 }
1649 }
1650
    /// Builds the `/admin/status` JSON document: version, lifecycle phase,
    /// uptime, on-disk DB size, replication role and writer-lease state,
    /// encryption-at-rest status, backup progress, WAL archive lag, replica
    /// views, operator resource limits, and (if one happened) the last
    /// shutdown report. Always responds 200 with the assembled object.
    pub(crate) fn handle_admin_status(&self) -> HttpResponse {
        let lifecycle = self.runtime.lifecycle();
        let phase = lifecycle.phase();
        // Wall-clock "now" in unix ms; a pre-epoch clock degrades to 0
        // instead of panicking.
        let now_ms = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| d.as_millis() as u64)
            .unwrap_or(0);
        let uptime_secs = (now_ms.saturating_sub(lifecycle.started_at_ms()) as f64) / 1000.0;
        let read_only = self.runtime.write_gate().is_read_only();
        let role = match self.runtime.write_gate().role() {
            crate::replication::ReplicationRole::Standalone => "standalone",
            crate::replication::ReplicationRole::Primary => "primary",
            crate::replication::ReplicationRole::Replica { .. } => "replica",
        };
        let db = self.runtime.db();
        // Size of the primary DB file; 0 when the DB has no path or the
        // metadata call fails (e.g. purely remote backend).
        let db_size_bytes = db
            .path()
            .and_then(|p| std::fs::metadata(p).ok())
            .map(|m| m.len())
            .unwrap_or(0);
        let backend_kind = db
            .options()
            .remote_backend
            .as_ref()
            .map(|b| b.name().to_string());

        let mut object = Map::new();
        object.insert(
            "version".to_string(),
            JsonValue::String(env!("CARGO_PKG_VERSION").to_string()),
        );
        object.insert(
            "phase".to_string(),
            JsonValue::String(phase.as_str().to_string()),
        );
        // Round uptime to millisecond precision for a stable JSON number.
        object.insert(
            "uptime_secs".to_string(),
            JsonValue::Number((uptime_secs * 1000.0).round() / 1000.0),
        );
        object.insert(
            "started_at_unix_ms".to_string(),
            JsonValue::Number(lifecycle.started_at_ms() as f64),
        );
        // Only present after the instance has reached Ready at least once.
        if let Some(ready_at) = lifecycle.ready_at_ms() {
            object.insert(
                "ready_at_unix_ms".to_string(),
                JsonValue::Number(ready_at as f64),
            );
        }
        object.insert(
            "db_size_bytes".to_string(),
            JsonValue::Number(db_size_bytes as f64),
        );
        object.insert("read_only".to_string(), JsonValue::Bool(read_only));
        object.insert(
            "replication_role".to_string(),
            JsonValue::String(role.to_string()),
        );
        object.insert(
            "writer_lease".to_string(),
            JsonValue::String(self.runtime.write_gate().lease_state().label().to_string()),
        );

        // Encryption-at-rest: state label plus an optional error string.
        let (enc_state, enc_error) = self.runtime.encryption_at_rest_status();
        let mut enc_obj = Map::new();
        enc_obj.insert(
            "state".to_string(),
            JsonValue::String(enc_state.to_string()),
        );
        if let Some(err) = enc_error {
            enc_obj.insert("error".to_string(), JsonValue::String(err));
        }
        object.insert("encryption_at_rest".to_string(), JsonValue::Object(enc_obj));

        // Backup status: last-success details only when a backup has run.
        let backup = self.runtime.backup_status();
        let mut backup_obj = Map::new();
        if let Some(last) = backup.last_backup.as_ref() {
            backup_obj.insert(
                "last_success_unix_ms".to_string(),
                JsonValue::Number(last.timestamp as f64),
            );
            backup_obj.insert(
                "last_duration_ms".to_string(),
                JsonValue::Number(last.duration_ms as f64),
            );
            backup_obj.insert(
                "age_seconds".to_string(),
                JsonValue::Number(((now_ms.saturating_sub(last.timestamp)) as f64) / 1000.0),
            );
        }
        backup_obj.insert(
            "total_successes".to_string(),
            JsonValue::Number(backup.total_backups as f64),
        );
        backup_obj.insert(
            "total_failures".to_string(),
            JsonValue::Number(backup.total_failures as f64),
        );
        backup_obj.insert(
            "interval_secs".to_string(),
            JsonValue::Number(backup.interval_secs as f64),
        );
        object.insert("backup".to_string(), JsonValue::Object(backup_obj));

        // WAL archiving: current vs last-archived LSN and the resulting lag.
        let (current_lsn, last_archived_lsn) = self.runtime.wal_archive_progress();
        let mut wal_obj = Map::new();
        wal_obj.insert(
            "current_lsn".to_string(),
            JsonValue::Number(current_lsn as f64),
        );
        wal_obj.insert(
            "last_archived_lsn".to_string(),
            JsonValue::Number(last_archived_lsn as f64),
        );
        wal_obj.insert(
            "archive_lag_records".to_string(),
            JsonValue::Number(current_lsn.saturating_sub(last_archived_lsn) as f64),
        );
        object.insert("wal".to_string(), JsonValue::Object(wal_obj));

        // Replica section: apply health (if this node applies WAL), apply
        // error counters, the primary's view of registered replicas, the
        // active commit policy, and durable-LSN acknowledgements.
        let mut replica_obj = Map::new();
        if let Some(health) = self.runtime.replica_apply_health() {
            replica_obj.insert("apply_health".to_string(), JsonValue::String(health));
        }
        let mut errors_obj = Map::new();
        for (kind, count) in self.runtime.replica_apply_error_counts() {
            errors_obj.insert(kind.label().to_string(), JsonValue::Number(count as f64));
        }
        replica_obj.insert("apply_errors".to_string(), JsonValue::Object(errors_obj));
        let snaps = self.runtime.primary_replica_snapshots();
        if !snaps.is_empty() {
            let arr: Vec<JsonValue> = snaps
                .iter()
                .map(|r| {
                    let mut o = Map::new();
                    o.insert("id".to_string(), JsonValue::String(r.id.clone()));
                    o.insert(
                        "last_acked_lsn".to_string(),
                        JsonValue::Number(r.last_acked_lsn as f64),
                    );
                    o.insert(
                        "last_sent_lsn".to_string(),
                        JsonValue::Number(r.last_sent_lsn as f64),
                    );
                    o.insert(
                        "last_durable_lsn".to_string(),
                        JsonValue::Number(r.last_durable_lsn as f64),
                    );
                    o.insert(
                        "last_seen_at_unix_ms".to_string(),
                        JsonValue::Number(r.last_seen_at_unix_ms as f64),
                    );
                    // Lag relative to the primary's current LSN, clamped at 0.
                    o.insert(
                        "lag_records".to_string(),
                        JsonValue::Number(current_lsn.saturating_sub(r.last_acked_lsn) as f64),
                    );
                    if let Some(region) = &r.region {
                        o.insert("region".to_string(), JsonValue::String(region.clone()));
                    }
                    JsonValue::Object(o)
                })
                .collect();
            replica_obj.insert("primary_view".to_string(), JsonValue::Array(arr));
        }
        replica_obj.insert(
            "commit_policy".to_string(),
            JsonValue::String(self.runtime.commit_policy().label().to_string()),
        );
        let durable = self.runtime.commit_waiter_snapshot();
        if !durable.is_empty() {
            let arr: Vec<JsonValue> = durable
                .into_iter()
                .map(|(id, lsn)| {
                    let mut o = Map::new();
                    o.insert("replica_id".to_string(), JsonValue::String(id));
                    o.insert("durable_lsn".to_string(), JsonValue::Number(lsn as f64));
                    JsonValue::Object(o)
                })
                .collect();
            replica_obj.insert("durable_view".to_string(), JsonValue::Array(arr));
        }
        object.insert("replica".to_string(), JsonValue::Object(replica_obj));
        if let Some(backend) = backend_kind {
            object.insert("remote_backend".to_string(), JsonValue::String(backend));
        }
        // Resource limits: only operator-configured caps are reported.
        let limits = self.runtime.resource_limits();
        let mut limits_obj = Map::new();
        if let Some(v) = limits.max_db_size_bytes {
            limits_obj.insert("max_db_size_bytes".to_string(), JsonValue::Number(v as f64));
        }
        if let Some(v) = limits.max_connections {
            limits_obj.insert("max_connections".to_string(), JsonValue::Number(v as f64));
        }
        if let Some(v) = limits.max_qps {
            limits_obj.insert("max_qps".to_string(), JsonValue::Number(v as f64));
        }
        if let Some(v) = limits.max_batch_size {
            limits_obj.insert("max_batch_size".to_string(), JsonValue::Number(v as f64));
        }
        if let Some(v) = limits.max_memory_bytes {
            limits_obj.insert("max_memory_bytes".to_string(), JsonValue::Number(v as f64));
        }
        if let Some(d) = limits.max_query_duration {
            limits_obj.insert(
                "max_query_duration_ms".to_string(),
                JsonValue::Number(d.as_millis() as f64),
            );
        }
        if let Some(v) = limits.max_result_bytes {
            limits_obj.insert("max_result_bytes".to_string(), JsonValue::Number(v as f64));
        }
        object.insert("limits".to_string(), JsonValue::Object(limits_obj));

        // Included only after a graceful shutdown has produced a report
        // (e.g. when status is served during late shutdown).
        if let Some(report) = lifecycle.shutdown_report() {
            let mut shutdown_obj = Map::new();
            shutdown_obj.insert(
                "duration_ms".to_string(),
                JsonValue::Number(report.duration_ms as f64),
            );
            shutdown_obj.insert(
                "flushed_wal".to_string(),
                JsonValue::Bool(report.flushed_wal),
            );
            shutdown_obj.insert(
                "backup_uploaded".to_string(),
                JsonValue::Bool(report.backup_uploaded),
            );
            object.insert("shutdown".to_string(), JsonValue::Object(shutdown_obj));
        }
        json_response(200, JsonValue::Object(object))
    }
1902
    /// Handles `POST /admin/failover/promote`: attempts to acquire the
    /// writer lease so this replica can be promoted to primary.
    ///
    /// Guard order matters: (1) only a Replica may promote (409 otherwise),
    /// (2) a CAS-capable remote backend is required for safe lease
    /// acquisition (412), (3) an unhealthy apply state blocks promotion
    /// (409). The optional JSON body may carry `holder_id` (string) and
    /// `ttl_ms` (positive integer); both default when absent or invalid
    /// (holder from env/hostname+pid, ttl 60s). On success the response
    /// echoes the lease fields and tells the operator the restart step.
    pub(crate) fn handle_admin_failover_promote(&self, body: Vec<u8>) -> HttpResponse {
        if !matches!(
            self.runtime.write_gate().role(),
            crate::replication::ReplicationRole::Replica { .. }
        ) {
            return json_error(
                409,
                "promotion only allowed on a replica (current role is not Replica)",
            );
        }

        // The lease handoff relies on compare-and-swap semantics; refuse
        // early if the configured backend cannot provide them.
        let Some(backend) = self.runtime.db().options().remote_backend_atomic.clone() else {
            return json_error(
                412,
                "promotion requires a CAS-capable remote backend (use s3, fs, or http with RED_HTTP_CONDITIONAL_WRITES=true)",
            );
        };

        // Refuse promotion while WAL apply is known-bad; promoting a
        // diverged or stalled replica would lose or corrupt data.
        let health = self.runtime.replica_apply_health().unwrap_or_default();
        if matches!(
            health.as_str(),
            "stalled_gap" | "divergence" | "apply_error"
        ) {
            return json_error(
                409,
                format!(
                    "promotion refused — replica apply state is `{health}`; resolve before promoting"
                ),
            );
        }

        // Empty body → defaults; otherwise parse JSON, falling back to the
        // defaults for missing/invalid fields (ttl_ms must be > 0).
        let (holder_id, ttl_ms) = if body.is_empty() {
            (default_holder_id(), 60_000u64)
        } else {
            match crate::serde_json::from_slice::<crate::serde_json::Value>(&body) {
                Ok(v) => {
                    let holder = v
                        .get("holder_id")
                        .and_then(|n| n.as_str())
                        .map(|s| s.to_string())
                        .unwrap_or_else(default_holder_id);
                    let ttl = v
                        .get("ttl_ms")
                        .and_then(|n| n.as_u64())
                        .filter(|t| *t > 0)
                        .unwrap_or(60_000);
                    (holder, ttl)
                }
                Err(err) => return json_error(400, format!("invalid JSON body: {err}")),
            }
        };

        // The lease is keyed by the remote database key; "main" is the
        // conventional default when none is configured.
        let database_key = self
            .runtime
            .db()
            .options()
            .remote_key
            .clone()
            .unwrap_or_else(|| "main".to_string());
        let store = crate::replication::LeaseStore::new(backend);

        match crate::runtime::lease_lifecycle::admin_promote_lease(
            &store,
            self.runtime.audit_log(),
            &database_key,
            &holder_id,
            ttl_ms,
        ) {
            Ok(lease) => {
                let mut object = Map::new();
                object.insert("ok".to_string(), JsonValue::Bool(true));
                object.insert("holder_id".to_string(), JsonValue::String(lease.holder_id));
                object.insert(
                    "generation".to_string(),
                    JsonValue::Number(lease.generation as f64),
                );
                object.insert(
                    "acquired_at_ms".to_string(),
                    JsonValue::Number(lease.acquired_at_ms as f64),
                );
                object.insert(
                    "expires_at_ms".to_string(),
                    JsonValue::Number(lease.expires_at_ms as f64),
                );
                // Acquiring the lease does not flip the role in-process;
                // the operator must restart in primary mode.
                object.insert(
                    "next_step".to_string(),
                    JsonValue::String(
                        "restart with RED_REPLICATION_MODE=primary to start accepting writes"
                            .to_string(),
                    ),
                );
                json_response(200, JsonValue::Object(object))
            }
            Err(err) => json_error(409, format!("promotion refused: {err}")),
        }
    }
2032
2033 pub(crate) fn handle_admin_audit_query(
2053 &self,
2054 query: &std::collections::BTreeMap<String, String>,
2055 ) -> HttpResponse {
2056 use crate::runtime::audit_log::Outcome;
2057 use crate::runtime::audit_query::{
2058 events_to_json_array, parse_time_arg, run_query, AuditQuery,
2059 };
2060
2061 let mut q = AuditQuery::new();
2062 if let Some(s) = query.get("since") {
2063 q.since_ms = parse_time_arg(s);
2064 if q.since_ms.is_none() {
2065 return json_error(400, format!("invalid 'since' value: {s}"));
2066 }
2067 }
2068 if let Some(u) = query.get("until") {
2069 q.until_ms = parse_time_arg(u);
2070 if q.until_ms.is_none() {
2071 return json_error(400, format!("invalid 'until' value: {u}"));
2072 }
2073 }
2074 if let Some(p) = query.get("principal") {
2075 if !p.is_empty() {
2076 q.principal = Some(p.clone());
2077 }
2078 }
2079 if let Some(t) = query.get("tenant") {
2080 if !t.is_empty() {
2081 q.tenant = Some(t.clone());
2082 }
2083 }
2084 if let Some(a) = query.get("action") {
2085 if !a.is_empty() {
2086 q.action_prefix = Some(a.clone());
2087 }
2088 }
2089 if let Some(o) = query.get("outcome") {
2090 if let Some(parsed) = Outcome::parse(o) {
2091 q.outcome = Some(parsed);
2092 } else {
2093 return json_error(
2094 400,
2095 format!("invalid 'outcome' value: {o} (expected success|denied|error)"),
2096 );
2097 }
2098 }
2099 if let Some(l) = query.get("limit") {
2100 match l.parse::<usize>() {
2101 Ok(n) if n > 0 => q.limit = n.min(1000),
2102 _ => return json_error(400, format!("invalid 'limit' value: {l}")),
2103 }
2104 } else {
2105 q.limit = 100;
2106 }
2107
2108 let format = query
2109 .get("format")
2110 .map(|s| s.to_ascii_lowercase())
2111 .unwrap_or_default();
2112
2113 let path = self.runtime.audit_log().path().to_path_buf();
2114 let events = run_query(&path, &q);
2115
2116 if format == "jsonl" || format == "ndjson" {
2117 let mut body = String::new();
2118 for ev in &events {
2119 body.push_str(&ev.to_json_line(None));
2120 body.push('\n');
2121 }
2122 return HttpResponse {
2123 status: 200,
2124 content_type: "application/x-ndjson",
2125 body: body.into_bytes(),
2126 extra_headers: Vec::new(),
2127 };
2128 }
2129
2130 json_response(200, events_to_json_array(&events))
2131 }
2132
2133 pub(crate) fn handle_admin_drain(&self) -> HttpResponse {
2134 self.runtime.lifecycle().mark_draining();
2135 self.runtime.audit_log().record(
2136 "admin/drain",
2137 "operator",
2138 "instance",
2139 "ok",
2140 JsonValue::Null,
2141 );
2142 let mut object = Map::new();
2143 object.insert("ok".to_string(), JsonValue::Bool(true));
2144 object.insert(
2145 "phase".to_string(),
2146 JsonValue::String(self.runtime.lifecycle().phase().as_str().to_string()),
2147 );
2148 json_response(200, JsonValue::Object(object))
2149 }
2150
2151 pub(crate) fn handle_health_live(&self) -> HttpResponse {
2155 let phase = self.runtime.lifecycle().phase();
2156 let alive = !matches!(phase, Phase::Stopped);
2157 let status = if alive { 200 } else { 503 };
2158 let mut object = Map::new();
2159 object.insert(
2160 "status".to_string(),
2161 JsonValue::String(if alive { "alive" } else { "stopped" }.to_string()),
2162 );
2163 object.insert(
2164 "phase".to_string(),
2165 JsonValue::String(phase.as_str().to_string()),
2166 );
2167 json_response(status, JsonValue::Object(object))
2168 }
2169
    /// Readiness probe (`/health/ready`): 200 once the lifecycle phase
    /// accepts queries, 503 with a reason otherwise.
    pub(crate) fn handle_health_ready(&self) -> HttpResponse {
        self.health_ready_response("ready")
    }
2175
    /// Startup probe (`/health/startup`): same readiness semantics as
    /// `/health/ready`, but labelled `probe: "startup"` in the body.
    pub(crate) fn handle_health_startup(&self) -> HttpResponse {
        self.health_ready_response("startup")
    }
2182
2183 fn health_ready_response(&self, probe: &str) -> HttpResponse {
2184 let lifecycle = self.runtime.lifecycle();
2185 let phase = lifecycle.phase();
2186 let now = std::time::SystemTime::now()
2187 .duration_since(std::time::UNIX_EPOCH)
2188 .map(|d| d.as_millis() as u64)
2189 .unwrap_or(0);
2190 let started_at = lifecycle.started_at_ms();
2191 let since_secs = (now.saturating_sub(started_at) as f64) / 1000.0;
2192 let mut object = Map::new();
2193 object.insert("probe".to_string(), JsonValue::String(probe.to_string()));
2194 object.insert(
2195 "transport_listeners".to_string(),
2196 self.transport_readiness_json(),
2197 );
2198 object.insert(
2199 "phase".to_string(),
2200 JsonValue::String(phase.as_str().to_string()),
2201 );
2202 object.insert(
2203 "since_secs".to_string(),
2204 JsonValue::Number((since_secs * 1000.0).round() / 1000.0),
2205 );
2206 if let Some(ready_at) = lifecycle.ready_at_ms() {
2207 object.insert(
2208 "ready_at_unix_ms".to_string(),
2209 JsonValue::Number(ready_at as f64),
2210 );
2211 }
2212
2213 if phase.accepts_queries() {
2214 object.insert("status".to_string(), JsonValue::String("ready".to_string()));
2215 json_response(200, JsonValue::Object(object))
2216 } else {
2217 object.insert(
2218 "status".to_string(),
2219 JsonValue::String(phase.as_str().to_string()),
2220 );
2221 if let Some(reason) = lifecycle.not_ready_reason() {
2222 object.insert("reason".to_string(), JsonValue::String(reason));
2223 } else {
2224 object.insert(
2225 "reason".to_string(),
2226 JsonValue::String(match phase {
2227 Phase::Starting => "starting".to_string(),
2228 Phase::ShuttingDown => "shutting_down".to_string(),
2229 Phase::Stopped => "stopped".to_string(),
2230 Phase::Draining => "draining".to_string(),
2231 Phase::Ready => "ready".to_string(),
2232 }),
2233 );
2234 }
2235 json_response(503, JsonValue::Object(object))
2236 }
2237 }
2238
2239 fn iam_audit(&self, action: &str, target: &str, outcome: &str) {
2244 self.runtime
2245 .audit_log()
2246 .record(action, "operator", target, outcome, JsonValue::Null);
2247 }
2248
2249 pub(crate) fn handle_iam_policy_put(&self, id: &str, body: Vec<u8>) -> HttpResponse {
2251 let Some(store) = self.auth_store.as_ref() else {
2252 return json_error(503, "auth store not configured");
2253 };
2254 let Ok(text) = std::str::from_utf8(&body) else {
2255 return json_error(400, "body must be utf-8 JSON");
2256 };
2257 let mut policy = match crate::auth::policies::Policy::from_json_str(text) {
2258 Ok(p) => p,
2259 Err(e) => return json_error(400, format!("policy parse: {e}")),
2260 };
2261 if policy.id != id {
2262 policy.id = id.to_string();
2263 }
2264 if let Err(e) = store.put_policy(policy) {
2265 return json_error(400, e.to_string());
2266 }
2267 self.runtime.invalidate_result_cache();
2268 self.iam_audit("iam/policy.put", id, "ok");
2269 let mut obj = Map::new();
2270 obj.insert("ok".to_string(), JsonValue::Bool(true));
2271 obj.insert("id".to_string(), JsonValue::String(id.to_string()));
2272 json_response(200, JsonValue::Object(obj))
2273 }
2274
2275 pub(crate) fn handle_iam_policy_get(&self, id: &str) -> HttpResponse {
2277 let Some(store) = self.auth_store.as_ref() else {
2278 return json_error(503, "auth store not configured");
2279 };
2280 let Some(p) = store.get_policy(id) else {
2281 return json_error(404, format!("policy `{id}` not found"));
2282 };
2283 let body = p.to_json_string();
2284 HttpResponse {
2285 status: 200,
2286 content_type: "application/json",
2287 body: body.into_bytes(),
2288 extra_headers: Vec::new(),
2289 }
2290 }
2291
2292 pub(crate) fn handle_iam_policy_list(&self) -> HttpResponse {
2294 let Some(store) = self.auth_store.as_ref() else {
2295 return json_error(503, "auth store not configured");
2296 };
2297 let pols = store.list_policies();
2298 let items: Vec<JsonValue> = pols
2299 .iter()
2300 .map(|p| {
2301 let mut obj = Map::new();
2302 obj.insert("id".to_string(), JsonValue::String(p.id.clone()));
2303 obj.insert("version".to_string(), JsonValue::Number(p.version as f64));
2304 obj.insert(
2305 "statements".to_string(),
2306 JsonValue::Number(p.statements.len() as f64),
2307 );
2308 obj.insert(
2309 "tenant".to_string(),
2310 p.tenant
2311 .as_deref()
2312 .map(|t| JsonValue::String(t.to_string()))
2313 .unwrap_or(JsonValue::Null),
2314 );
2315 JsonValue::Object(obj)
2316 })
2317 .collect();
2318 let mut envelope = Map::new();
2319 envelope.insert("count".to_string(), JsonValue::Number(items.len() as f64));
2320 envelope.insert("items".to_string(), JsonValue::Array(items));
2321 json_response(200, JsonValue::Object(envelope))
2322 }
2323
2324 pub(crate) fn handle_iam_policy_delete(&self, id: &str) -> HttpResponse {
2326 let Some(store) = self.auth_store.as_ref() else {
2327 return json_error(503, "auth store not configured");
2328 };
2329 match store.delete_policy(id) {
2330 Ok(()) => {
2331 self.runtime.invalidate_result_cache();
2332 self.iam_audit("iam/policy.drop", id, "ok");
2333 HttpResponse {
2334 status: 204,
2335 content_type: "application/json",
2336 body: Vec::new(),
2337 extra_headers: Vec::new(),
2338 }
2339 }
2340 Err(e) => json_error(404, e.to_string()),
2341 }
2342 }
2343
2344 pub(crate) fn handle_iam_attach_user(&self, user: &str, policy_id: &str) -> HttpResponse {
2347 let Some(store) = self.auth_store.as_ref() else {
2348 return json_error(503, "auth store not configured");
2349 };
2350 let uid = decode_user_arg(user);
2351 match store.attach_policy(
2352 crate::auth::store::PrincipalRef::User(uid.clone()),
2353 policy_id,
2354 ) {
2355 Ok(()) => {
2356 self.runtime.invalidate_result_cache();
2357 self.iam_audit(
2358 "iam/policy.attach",
2359 &format!("user:{uid}::{policy_id}"),
2360 "ok",
2361 );
2362 let mut obj = Map::new();
2363 obj.insert("ok".to_string(), JsonValue::Bool(true));
2364 json_response(200, JsonValue::Object(obj))
2365 }
2366 Err(e) => json_error(400, e.to_string()),
2367 }
2368 }
2369
2370 pub(crate) fn handle_iam_detach_user(&self, user: &str, policy_id: &str) -> HttpResponse {
2372 let Some(store) = self.auth_store.as_ref() else {
2373 return json_error(503, "auth store not configured");
2374 };
2375 let uid = decode_user_arg(user);
2376 match store.detach_policy(
2377 crate::auth::store::PrincipalRef::User(uid.clone()),
2378 policy_id,
2379 ) {
2380 Ok(()) => {
2381 self.runtime.invalidate_result_cache();
2382 self.iam_audit(
2383 "iam/policy.detach",
2384 &format!("user:{uid}::{policy_id}"),
2385 "ok",
2386 );
2387 HttpResponse {
2388 status: 204,
2389 content_type: "application/json",
2390 body: Vec::new(),
2391 extra_headers: Vec::new(),
2392 }
2393 }
2394 Err(e) => json_error(400, e.to_string()),
2395 }
2396 }
2397
2398 pub(crate) fn handle_iam_add_user_group(&self, user: &str, group: &str) -> HttpResponse {
2400 let Some(store) = self.auth_store.as_ref() else {
2401 return json_error(503, "auth store not configured");
2402 };
2403 let uid = decode_user_arg(user);
2404 match store.add_user_to_group(&uid, group) {
2405 Ok(()) => {
2406 self.runtime.invalidate_result_cache();
2407 self.iam_audit("iam/group.add", &format!("user:{uid}::group:{group}"), "ok");
2408 let mut obj = Map::new();
2409 obj.insert("ok".to_string(), JsonValue::Bool(true));
2410 json_response(200, JsonValue::Object(obj))
2411 }
2412 Err(e) => json_error(400, e.to_string()),
2413 }
2414 }
2415
2416 pub(crate) fn handle_iam_remove_user_group(&self, user: &str, group: &str) -> HttpResponse {
2418 let Some(store) = self.auth_store.as_ref() else {
2419 return json_error(503, "auth store not configured");
2420 };
2421 let uid = decode_user_arg(user);
2422 match store.remove_user_from_group(&uid, group) {
2423 Ok(()) => {
2424 self.runtime.invalidate_result_cache();
2425 self.iam_audit(
2426 "iam/group.remove",
2427 &format!("user:{uid}::group:{group}"),
2428 "ok",
2429 );
2430 HttpResponse {
2431 status: 204,
2432 content_type: "application/json",
2433 body: Vec::new(),
2434 extra_headers: Vec::new(),
2435 }
2436 }
2437 Err(e) => json_error(400, e.to_string()),
2438 }
2439 }
2440
2441 pub(crate) fn handle_iam_attach_group(&self, group: &str, policy_id: &str) -> HttpResponse {
2443 let Some(store) = self.auth_store.as_ref() else {
2444 return json_error(503, "auth store not configured");
2445 };
2446 match store.attach_policy(
2447 crate::auth::store::PrincipalRef::Group(group.to_string()),
2448 policy_id,
2449 ) {
2450 Ok(()) => {
2451 self.runtime.invalidate_result_cache();
2452 self.iam_audit(
2453 "iam/policy.attach",
2454 &format!("group:{group}::{policy_id}"),
2455 "ok",
2456 );
2457 let mut obj = Map::new();
2458 obj.insert("ok".to_string(), JsonValue::Bool(true));
2459 json_response(200, JsonValue::Object(obj))
2460 }
2461 Err(e) => json_error(400, e.to_string()),
2462 }
2463 }
2464
2465 pub(crate) fn handle_iam_detach_group(&self, group: &str, policy_id: &str) -> HttpResponse {
2467 let Some(store) = self.auth_store.as_ref() else {
2468 return json_error(503, "auth store not configured");
2469 };
2470 match store.detach_policy(
2471 crate::auth::store::PrincipalRef::Group(group.to_string()),
2472 policy_id,
2473 ) {
2474 Ok(()) => {
2475 self.runtime.invalidate_result_cache();
2476 self.iam_audit(
2477 "iam/policy.detach",
2478 &format!("group:{group}::{policy_id}"),
2479 "ok",
2480 );
2481 HttpResponse {
2482 status: 204,
2483 content_type: "application/json",
2484 body: Vec::new(),
2485 extra_headers: Vec::new(),
2486 }
2487 }
2488 Err(e) => json_error(400, e.to_string()),
2489 }
2490 }
2491
2492 pub(crate) fn handle_iam_effective_permissions(
2494 &self,
2495 user: &str,
2496 query: &std::collections::BTreeMap<String, String>,
2497 ) -> HttpResponse {
2498 let Some(store) = self.auth_store.as_ref() else {
2499 return json_error(503, "auth store not configured");
2500 };
2501 let uid = decode_user_arg(user);
2502 let pols = store.effective_policies(&uid);
2503
2504 let resource_echo = query.get("resource").cloned();
2509 let items: Vec<JsonValue> = pols
2510 .iter()
2511 .map(|p| {
2512 let mut obj = Map::new();
2513 obj.insert("id".to_string(), JsonValue::String(p.id.clone()));
2514 obj.insert(
2515 "statements".to_string(),
2516 JsonValue::Number(p.statements.len() as f64),
2517 );
2518 JsonValue::Object(obj)
2519 })
2520 .collect();
2521 let mut envelope = Map::new();
2522 envelope.insert("user".to_string(), JsonValue::String(uid.to_string()));
2523 if let Some(r) = resource_echo {
2524 envelope.insert("resource".to_string(), JsonValue::String(r));
2525 }
2526 envelope.insert("count".to_string(), JsonValue::Number(items.len() as f64));
2527 envelope.insert("policies".to_string(), JsonValue::Array(items));
2528 json_response(200, JsonValue::Object(envelope))
2529 }
2530
    /// Dry-run an authorization decision without performing any action.
    ///
    /// Expects a JSON object body with:
    /// - `principal` (string, required): user id, parsed via `decode_user_arg`
    /// - `action` (string, required)
    /// - `resource` (required): either an object `{kind, name[, tenant]}` or
    ///   a string `"kind:name"`
    /// - `ctx` (object, optional): `current_tenant`, `mfa`, `source_ip` /
    ///   `peer_ip`, `now_ms` simulation context overrides
    ///
    /// Responds 503 when no auth store is configured, 400 on malformed
    /// input, and otherwise 200 with the decision, the matched policy and
    /// statement ids (or null), a reason string, and a per-policy
    /// evaluation trail.
    pub(crate) fn handle_iam_simulate(&self, body: Vec<u8>) -> HttpResponse {
        let Some(store) = self.auth_store.as_ref() else {
            return json_error(503, "auth store not configured");
        };
        // Non-UTF-8 bodies degrade to "" and fail JSON parsing below.
        let parsed = match crate::serde_json::from_str::<crate::serde_json::Value>(
            std::str::from_utf8(&body).unwrap_or(""),
        ) {
            Ok(v) => v,
            Err(e) => return json_error(400, format!("invalid JSON body: {e}")),
        };
        let obj = match parsed.as_object() {
            Some(o) => o,
            None => return json_error(400, "body must be a JSON object"),
        };
        let principal = match obj.get("principal").and_then(|v| v.as_str()) {
            Some(s) => decode_user_arg(s),
            None => return json_error(400, "missing `principal`"),
        };
        let action = match obj.get("action").and_then(|v| v.as_str()) {
            Some(s) => s.to_string(),
            None => return json_error(400, "missing `action`"),
        };
        // `resource` accepts two shapes: a structured object or a compact
        // "kind:name" string.
        let resource = match obj.get("resource") {
            Some(JsonValue::Object(r)) => {
                let kind = r
                    .get("kind")
                    .and_then(|v| v.as_str())
                    .unwrap_or("")
                    .to_string();
                let name = r
                    .get("name")
                    .and_then(|v| v.as_str())
                    .unwrap_or("")
                    .to_string();
                if kind.is_empty() || name.is_empty() {
                    return json_error(400, "resource needs kind+name");
                }
                let mut rr = crate::auth::policies::ResourceRef::new(kind, name);
                if let Some(t) = r.get("tenant").and_then(|v| v.as_str()) {
                    rr = rr.with_tenant(t.to_string());
                }
                rr
            }
            Some(JsonValue::String(s)) => match s.split_once(':') {
                Some((k, n)) => crate::auth::policies::ResourceRef::new(k, n),
                None => return json_error(400, "resource string must be `kind:name`"),
            },
            _ => return json_error(400, "missing `resource`"),
        };
        // Optional simulation context; unspecified fields keep defaults.
        let mut sim_ctx = crate::auth::store::SimCtx::default();
        if let Some(c) = obj.get("ctx").and_then(|v| v.as_object()) {
            if let Some(t) = c.get("current_tenant").and_then(|v| v.as_str()) {
                sim_ctx.current_tenant = Some(t.to_string());
            }
            if let Some(true) = c.get("mfa").and_then(|v| v.as_bool()) {
                sim_ctx.mfa_present = true;
            }
            // `source_ip` preferred, `peer_ip` accepted as an alias;
            // unparseable addresses are silently ignored.
            if let Some(ip) = c
                .get("source_ip")
                .or_else(|| c.get("peer_ip"))
                .and_then(|v| v.as_str())
            {
                if let Ok(addr) = ip.parse() {
                    sim_ctx.peer_ip = Some(addr);
                }
            }
            if let Some(ms) = c.get("now_ms").and_then(|v| v.as_u64()) {
                sim_ctx.now_ms = Some(ms as u128);
            }
        }
        let outcome = store.simulate(&principal, &action, &resource, sim_ctx);
        let (decision_str, matched_pid, matched_sid) =
            crate::runtime::impl_core::decision_to_strings(&outcome.decision);

        // Simulations are audited with the decision as the outcome label.
        self.iam_audit("iam/policy.simulate", &principal.to_string(), &decision_str);

        let mut envelope = Map::new();
        envelope.insert("decision".to_string(), JsonValue::String(decision_str));
        envelope.insert(
            "matched_policy_id".to_string(),
            matched_pid
                .map(JsonValue::String)
                .unwrap_or(JsonValue::Null),
        );
        envelope.insert(
            "matched_sid".to_string(),
            matched_sid
                .map(JsonValue::String)
                .unwrap_or(JsonValue::Null),
        );
        envelope.insert("reason".to_string(), JsonValue::String(outcome.reason));
        // One trail entry per evaluated policy: what matched, its effect,
        // and why it was skipped (if it was).
        let trail: Vec<JsonValue> = outcome
            .trail
            .into_iter()
            .map(|t| {
                let mut obj = Map::new();
                obj.insert("policy_id".to_string(), JsonValue::String(t.policy_id));
                obj.insert(
                    "sid".to_string(),
                    t.sid.map(JsonValue::String).unwrap_or(JsonValue::Null),
                );
                obj.insert("matched".to_string(), JsonValue::Bool(t.matched));
                obj.insert(
                    "effect".to_string(),
                    JsonValue::String(
                        match t.effect {
                            crate::auth::policies::Effect::Allow => "allow",
                            crate::auth::policies::Effect::Deny => "deny",
                        }
                        .to_string(),
                    ),
                );
                obj.insert(
                    "why_skipped".to_string(),
                    t.why_skipped
                        .map(|s| JsonValue::String(s.to_string()))
                        .unwrap_or(JsonValue::Null),
                );
                JsonValue::Object(obj)
            })
            .collect();
        envelope.insert("trail".to_string(), JsonValue::Array(trail));
        json_response(200, JsonValue::Object(envelope))
    }
2657}
2658
2659fn decode_user_arg(raw: &str) -> crate::auth::UserId {
2660 if let Some((tenant, name)) = raw.split_once('/') {
2663 return crate::auth::UserId::scoped(tenant.to_string(), name.to_string());
2664 }
2665 if let Some((tenant, name)) = raw.split_once('.') {
2666 return crate::auth::UserId::scoped(tenant.to_string(), name.to_string());
2667 }
2668 crate::auth::UserId::platform(raw.to_string())
2669}
2670
#[cfg(test)]
mod tests {
    use super::*;

    /// Serializes the tests that mutate the process-wide `RED_ADMIN_TOKEN`
    /// environment variable. Rust runs tests in parallel by default, and
    /// the previous per-function `static GUARD`s were distinct locks that
    /// provided no cross-test exclusion, so those tests could race on the
    /// shared environment. A single shared lock fixes that. Lock poisoning
    /// is ignored via `into_inner` so one panicking test does not cascade.
    static ENV_GUARD: std::sync::Mutex<()> = std::sync::Mutex::new(());

    #[test]
    fn metrics_expose_result_blob_cache_label_set() {
        let runtime =
            crate::runtime::RedDBRuntime::with_options(crate::api::RedDBOptions::in_memory())
                .expect("runtime");
        runtime
            .db()
            .store()
            .set_config_tree("runtime.result_cache.backend", &crate::json!("blob_cache"));

        // Drive one miss, one hit, and an invalidation so every counter
        // family below has been touched at least once.
        runtime.execute_query("SELECT 1").expect("populate miss");
        runtime.execute_query("SELECT 1").expect("blob hit");
        runtime.invalidate_result_cache();

        let server = RedDBServer::new(runtime);
        let response = server.handle_metrics();
        let body = String::from_utf8(response.body).expect("utf8 metrics");

        for needle in [
            "reddb_cache_blob_get_total{namespace=\"runtime.result_cache\",result=\"hit_l1\"}",
            "reddb_cache_blob_get_total{namespace=\"runtime.result_cache\",result=\"hit_l2\"}",
            "reddb_cache_blob_get_total{namespace=\"runtime.result_cache\",result=\"miss\"}",
            "reddb_cache_blob_put_total{namespace=\"runtime.result_cache\",outcome=\"ok\"}",
            "reddb_cache_blob_put_total{namespace=\"runtime.result_cache\",outcome=\"version_mismatch\"}",
            "reddb_cache_blob_put_total{namespace=\"runtime.result_cache\",outcome=\"too_large\"}",
            "reddb_cache_blob_put_total{namespace=\"runtime.result_cache\",outcome=\"metadata_too_large\"}",
            "reddb_cache_blob_invalidate_total{namespace=\"runtime.result_cache\",kind=\"dependency\"}",
            "reddb_cache_blob_invalidate_total{namespace=\"runtime.result_cache\",kind=\"namespace\"}",
            "reddb_cache_blob_evict_total{namespace=\"runtime.result_cache\",reason=\"capacity\"}",
            "reddb_cache_blob_evict_total{namespace=\"runtime.result_cache\",reason=\"expiry\"}",
            "reddb_cache_blob_evict_total{namespace=\"runtime.result_cache\",reason=\"policy\"}",
            "reddb_cache_blob_l1_bytes_in_use{namespace=\"runtime.result_cache\"}",
            "reddb_cache_blob_l1_entries{namespace=\"runtime.result_cache\"}",
            "reddb_cache_blob_l2_bytes_in_use{namespace=\"runtime.result_cache\"}",
            "reddb_cache_blob_l2_full_rejections_total{namespace=\"runtime.result_cache\"}",
            "reddb_cache_blob_version_mismatch_total{namespace=\"runtime.result_cache\"}",
        ] {
            assert!(body.contains(needle), "missing metric line for {needle}");
        }
    }

    /// Build a server around a fresh in-memory runtime.
    fn test_server() -> RedDBServer {
        let runtime =
            crate::runtime::RedDBRuntime::with_options(crate::api::RedDBOptions::in_memory())
                .expect("runtime");
        RedDBServer::new(runtime)
    }

    /// Parse a response body as JSON, panicking on malformed output.
    fn parse_body(resp: &HttpResponse) -> JsonValue {
        let s = std::str::from_utf8(&resp.body).expect("utf8 body");
        crate::serde_json::from_str::<JsonValue>(s).expect("JSON body")
    }

    #[test]
    fn admin_blob_cache_sweep_happy_path_returns_well_formed_report() {
        let server = test_server();
        let body = br#"{"limit_entries": 100, "limit_millis": 50}"#.to_vec();
        let resp = server.handle_admin_blob_cache_sweep(body);
        assert_eq!(resp.status, 200);
        let parsed = parse_body(&resp);
        assert_eq!(parsed.get("ok").and_then(|v| v.as_bool()), Some(true));
        for field in [
            "entries_scanned",
            "entries_evicted",
            "bytes_reclaimed",
            "elapsed_ms",
            "truncated_due_to_limit",
        ] {
            assert!(
                parsed.get(field).is_some(),
                "missing field {field} in response: {parsed:?}"
            );
        }
    }

    #[test]
    fn admin_blob_cache_sweep_empty_body_uses_unbounded_default() {
        let server = test_server();
        let resp = server.handle_admin_blob_cache_sweep(Vec::new());
        assert_eq!(resp.status, 200);
        let parsed = parse_body(&resp);
        assert_eq!(parsed.get("ok").and_then(|v| v.as_bool()), Some(true));
    }

    #[test]
    fn admin_blob_cache_sweep_invalid_json_returns_400() {
        let server = test_server();
        let resp = server.handle_admin_blob_cache_sweep(b"not json".to_vec());
        assert_eq!(resp.status, 400);
    }

    #[test]
    fn admin_blob_cache_flush_namespace_happy_path() {
        let server = test_server();
        let body = br#"{"namespace": "tenant-42:results"}"#.to_vec();
        let resp = server.handle_admin_blob_cache_flush_namespace(body);
        assert_eq!(resp.status, 200);
        let parsed = parse_body(&resp);
        assert_eq!(parsed.get("ok").and_then(|v| v.as_bool()), Some(true));
        assert_eq!(
            parsed.get("namespace").and_then(|v| v.as_str()),
            Some("tenant-42:results")
        );
        assert!(parsed.get("elapsed_micros").is_some());
        assert!(parsed.get("generation_before").is_some());
        assert!(parsed.get("generation_after").is_some());
    }

    #[test]
    fn admin_blob_cache_flush_namespace_missing_body_returns_400() {
        let server = test_server();
        let resp = server.handle_admin_blob_cache_flush_namespace(Vec::new());
        assert_eq!(resp.status, 400);
        let parsed = parse_body(&resp);
        assert!(parsed
            .get("error")
            .and_then(|v| v.as_str())
            .map(|s| s.contains("namespace"))
            .unwrap_or(false));
    }

    #[test]
    fn admin_blob_cache_flush_namespace_missing_field_returns_400() {
        let server = test_server();
        let body = br#"{"other": "x"}"#.to_vec();
        let resp = server.handle_admin_blob_cache_flush_namespace(body);
        assert_eq!(resp.status, 400);
    }

    #[test]
    fn admin_blob_cache_flush_namespace_empty_string_returns_400() {
        let server = test_server();
        let body = br#"{"namespace": ""}"#.to_vec();
        let resp = server.handle_admin_blob_cache_flush_namespace(body);
        assert_eq!(resp.status, 400);
    }

    #[test]
    fn admin_blob_cache_flush_namespace_rejects_crlf_smuggling_attempt() {
        let server = test_server();
        // CR/LF in a namespace could be used to splice fake lines into
        // logs or headers; the handler must reject it outright.
        let body = br#"{"namespace": "real-ns\r\nfake-audit: spliced"}"#.to_vec();
        let resp = server.handle_admin_blob_cache_flush_namespace(body);
        assert_eq!(resp.status, 400);
        let parsed = parse_body(&resp);
        let msg = parsed
            .get("error")
            .and_then(|v| v.as_str())
            .unwrap_or_default();
        assert!(msg.contains("CR/LF"), "unexpected error: {msg}");
    }

    #[test]
    fn admin_blob_cache_flush_namespace_rejects_nul_byte() {
        let server = test_server();
        // Embedded NUL delivered via a JSON \u0000 escape must be rejected.
        let body = b"{\"namespace\": \"with-nul-\\u0000-here\"}".to_vec();
        let resp = server.handle_admin_blob_cache_flush_namespace(body);
        assert_eq!(resp.status, 400);
        let parsed = parse_body(&resp);
        let msg = parsed
            .get("error")
            .and_then(|v| v.as_str())
            .unwrap_or_default();
        assert!(msg.contains("NUL"), "unexpected error: {msg}");
    }

    #[test]
    fn admin_blob_cache_flush_namespace_response_round_trips_unicode() {
        let server = test_server();
        let body = r#"{"namespace": "日本語-ns-🦀"}"#.as_bytes().to_vec();
        let resp = server.handle_admin_blob_cache_flush_namespace(body);
        assert_eq!(resp.status, 200);
        let parsed = parse_body(&resp);
        assert_eq!(
            parsed.get("namespace").and_then(|v| v.as_str()),
            Some("日本語-ns-🦀")
        );
    }

    /// Build a compare-and-set request body with `expected_version: 0`.
    /// Base64 is hand-rolled (standard alphabet, `=` padding) to avoid a
    /// dev-dependency just for tests.
    fn cas_body(namespace: &str, key: &str, new_value: &[u8], new_version: u64) -> Vec<u8> {
        let b64 = {
            let mut s = String::new();
            for chunk in new_value.chunks(3) {
                const CHARS: &[u8] =
                    b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
                let b0 = chunk[0] as u32;
                let b1 = chunk.get(1).copied().unwrap_or(0) as u32;
                let b2 = chunk.get(2).copied().unwrap_or(0) as u32;
                let n = (b0 << 16) | (b1 << 8) | b2;
                s.push(CHARS[((n >> 18) & 63) as usize] as char);
                s.push(CHARS[((n >> 12) & 63) as usize] as char);
                s.push(if chunk.len() > 1 {
                    CHARS[((n >> 6) & 63) as usize] as char
                } else {
                    '='
                });
                s.push(if chunk.len() > 2 {
                    CHARS[(n & 63) as usize] as char
                } else {
                    '='
                });
            }
            s
        };
        format!(
            r#"{{"namespace":"{namespace}","key":"{key}","expected_version":0,"new_value_b64":"{b64}","new_version":{new_version}}}"#
        )
        .into_bytes()
    }

    #[test]
    fn cas_happy_first_write() {
        let server = test_server();
        let body = cas_body("ns1", "k1", b"hello", 1);
        let resp = server.handle_admin_blob_cache_compare_and_set(body);
        assert_eq!(resp.status, 200);
        let parsed = parse_body(&resp);
        assert_eq!(
            parsed.get("committed").and_then(|v| v.as_bool()),
            Some(true)
        );
        assert_eq!(
            parsed.get("current_version").and_then(|v| v.as_u64()),
            Some(1)
        );
    }

    #[test]
    fn cas_happy_update_increments_version() {
        let server = test_server();
        server.handle_admin_blob_cache_compare_and_set(cas_body("ns2", "k2", b"v1", 1));
        let resp = server.handle_admin_blob_cache_compare_and_set(cas_body("ns2", "k2", b"v2", 2));
        assert_eq!(resp.status, 200);
        let parsed = parse_body(&resp);
        assert_eq!(
            parsed.get("committed").and_then(|v| v.as_bool()),
            Some(true)
        );
        assert_eq!(
            parsed.get("current_version").and_then(|v| v.as_u64()),
            Some(2)
        );
    }

    #[test]
    fn cas_conflict_same_version_returns_409() {
        let server = test_server();
        server.handle_admin_blob_cache_compare_and_set(cas_body("ns3", "k3", b"v1", 5));
        let resp = server.handle_admin_blob_cache_compare_and_set(cas_body("ns3", "k3", b"v2", 5));
        assert_eq!(resp.status, 409);
        let parsed = parse_body(&resp);
        assert_eq!(
            parsed.get("committed").and_then(|v| v.as_bool()),
            Some(false)
        );
        assert_eq!(
            parsed.get("reason").and_then(|v| v.as_str()),
            Some("VersionMismatch")
        );
    }

    #[test]
    fn cas_stale_expected_version_returns_409() {
        let server = test_server();
        server.handle_admin_blob_cache_compare_and_set(cas_body("ns4", "k4", b"v1", 10));
        let resp = server.handle_admin_blob_cache_compare_and_set(cas_body("ns4", "k4", b"v2", 9));
        assert_eq!(resp.status, 409);
        let parsed = parse_body(&resp);
        assert_eq!(
            parsed.get("current_version").and_then(|v| v.as_u64()),
            Some(10)
        );
    }

    #[test]
    fn cas_crlf_in_namespace_returns_400() {
        let server = test_server();
        let body = b"{\"namespace\":\"real\\r\\ninjected\",\"key\":\"k\",\"expected_version\":0,\"new_value_b64\":\"aGk=\",\"new_version\":1}".to_vec();
        let resp = server.handle_admin_blob_cache_compare_and_set(body);
        assert_eq!(resp.status, 400);
        let parsed = parse_body(&resp);
        let msg = parsed.get("error").and_then(|v| v.as_str()).unwrap_or("");
        assert!(msg.contains("CR/LF"), "expected CR/LF error, got: {msg}");
    }

    #[test]
    fn cas_nul_in_key_returns_400() {
        let server = test_server();
        let body = b"{\"namespace\":\"ns\",\"key\":\"k\\u0000nul\",\"expected_version\":0,\"new_value_b64\":\"aGk=\",\"new_version\":1}".to_vec();
        let resp = server.handle_admin_blob_cache_compare_and_set(body);
        assert_eq!(resp.status, 400);
        let parsed = parse_body(&resp);
        let msg = parsed.get("error").and_then(|v| v.as_str()).unwrap_or("");
        assert!(msg.contains("NUL"), "expected NUL error, got: {msg}");
    }

    #[test]
    fn cas_bad_base64_returns_400() {
        let server = test_server();
        let body = br#"{"namespace":"ns","key":"k","expected_version":0,"new_value_b64":"!!!invalid!!!","new_version":1}"#.to_vec();
        let resp = server.handle_admin_blob_cache_compare_and_set(body);
        assert_eq!(resp.status, 400);
        let parsed = parse_body(&resp);
        let msg = parsed.get("error").and_then(|v| v.as_str()).unwrap_or("");
        assert!(msg.contains("base64"), "expected base64 error, got: {msg}");
    }

    #[test]
    fn cas_missing_bearer_returns_401_via_route() {
        // Serialize with every other test that mutates RED_ADMIN_TOKEN.
        let _g = ENV_GUARD.lock().unwrap_or_else(|e| e.into_inner());

        let prev = std::env::var("RED_ADMIN_TOKEN").ok();
        unsafe {
            std::env::set_var("RED_ADMIN_TOKEN", "test-token-195");
        }

        let server = test_server();
        let req = crate::server::transport::HttpRequest {
            method: "POST".to_string(),
            path: "/admin/cache/compare-and-set".to_string(),
            query: std::collections::BTreeMap::new(),
            headers: std::collections::BTreeMap::new(),
            body: cas_body("ns", "k", b"v", 1),
        };
        let resp = server.route(req);
        assert_eq!(resp.status, 401);

        // Restore whatever value was present before the test.
        unsafe {
            match prev {
                Some(v) => std::env::set_var("RED_ADMIN_TOKEN", v),
                None => std::env::remove_var("RED_ADMIN_TOKEN"),
            }
        }
    }

    #[test]
    fn cas_wrong_bearer_returns_401_via_route() {
        // Serialize with every other test that mutates RED_ADMIN_TOKEN.
        let _g = ENV_GUARD.lock().unwrap_or_else(|e| e.into_inner());

        let prev = std::env::var("RED_ADMIN_TOKEN").ok();
        unsafe {
            std::env::set_var("RED_ADMIN_TOKEN", "correct-token");
        }

        let server = test_server();
        let mut headers = std::collections::BTreeMap::new();
        headers.insert(
            "authorization".to_string(),
            "Bearer wrong-token".to_string(),
        );
        let req = crate::server::transport::HttpRequest {
            method: "POST".to_string(),
            path: "/admin/cache/compare-and-set".to_string(),
            query: std::collections::BTreeMap::new(),
            headers,
            body: cas_body("ns", "k", b"v", 1),
        };
        let resp = server.route(req);
        assert_eq!(resp.status, 401);

        // Restore whatever value was present before the test.
        unsafe {
            match prev {
                Some(v) => std::env::set_var("RED_ADMIN_TOKEN", v),
                None => std::env::remove_var("RED_ADMIN_TOKEN"),
            }
        }
    }

    #[test]
    fn cas_concurrent_race_exactly_one_commits() {
        use std::sync::{Arc, Mutex};

        let server = Arc::new(Mutex::new(test_server()));
        let committed = Arc::new(Mutex::new(0u32));
        let conflicted = Arc::new(Mutex::new(0u32));

        let handles: Vec<_> = (0..8)
            .map(|_| {
                let server = Arc::clone(&server);
                let committed = Arc::clone(&committed);
                let conflicted = Arc::clone(&conflicted);
                std::thread::spawn(move || {
                    let body = cas_body("race-ns", "race-key", b"payload", 1);
                    let resp = {
                        let s = server.lock().unwrap();
                        s.handle_admin_blob_cache_compare_and_set(body)
                    };
                    match resp.status {
                        200 => *committed.lock().unwrap() += 1,
                        409 => *conflicted.lock().unwrap() += 1,
                        s => panic!("unexpected status {s}"),
                    }
                })
            })
            .collect();

        for h in handles {
            h.join().expect("thread panicked");
        }

        assert_eq!(
            *committed.lock().unwrap(),
            1,
            "exactly one CAS should commit (version 1 can only be written once)"
        );
    }

    #[test]
    fn admin_blob_cache_routes_reject_unauth_when_admin_token_set() {
        // Serialize with every other test that mutates RED_ADMIN_TOKEN.
        let _g = ENV_GUARD.lock().unwrap_or_else(|e| e.into_inner());

        let prev = std::env::var("RED_ADMIN_TOKEN").ok();
        unsafe {
            std::env::set_var("RED_ADMIN_TOKEN", "test-token-148");
        }

        let server = test_server();

        let req = crate::server::transport::HttpRequest {
            method: "POST".to_string(),
            path: "/admin/blob_cache/sweep".to_string(),
            query: std::collections::BTreeMap::new(),
            headers: std::collections::BTreeMap::new(),
            body: br#"{"limit_entries":1}"#.to_vec(),
        };
        let resp = server.route(req);
        assert_eq!(resp.status, 401, "sweep without admin token must be 401");

        let req = crate::server::transport::HttpRequest {
            method: "POST".to_string(),
            path: "/admin/blob_cache/flush_namespace".to_string(),
            query: std::collections::BTreeMap::new(),
            headers: std::collections::BTreeMap::new(),
            body: br#"{"namespace":"x"}"#.to_vec(),
        };
        let resp = server.route(req);
        assert_eq!(resp.status, 401, "flush without admin token must be 401");

        let mut headers = std::collections::BTreeMap::new();
        headers.insert(
            "authorization".to_string(),
            "Bearer test-token-148".to_string(),
        );
        let req = crate::server::transport::HttpRequest {
            method: "POST".to_string(),
            path: "/admin/blob_cache/flush_namespace".to_string(),
            query: std::collections::BTreeMap::new(),
            headers,
            body: br#"{"namespace":"ok"}"#.to_vec(),
        };
        let resp = server.route(req);
        assert_eq!(resp.status, 200, "flush with admin token must be 200");

        // Restore whatever value was present before the test.
        unsafe {
            match prev {
                Some(v) => std::env::set_var("RED_ADMIN_TOKEN", v),
                None => std::env::remove_var("RED_ADMIN_TOKEN"),
            }
        }
    }
}