1use std::collections::HashMap;
4use std::sync::Arc;
5
6use crate::catalog::{CollectionModel, SchemaMode};
7use crate::physical::{CollectionContract, ContractOrigin};
8use crate::storage::query::ast::ConfigValueType;
9use crate::storage::{EntityData, EntityId, EntityKind, RowData, UnifiedEntity};
10
11use super::impl_core::{current_auth_identity, current_connection_id, current_tenant};
12use super::*;
13
14const CONFIG_HISTORY_LIMIT: usize = 16;
15
16#[derive(Clone)]
17struct ConfigVersion {
18 id: EntityId,
19 key: String,
20 version: i64,
21 value: Value,
22 tombstone: bool,
23 created_at_ms: i64,
24 op: String,
25 value_type: Option<ConfigValueType>,
26 schema_version: Option<i64>,
27 tags: Vec<String>,
28}
29
30impl super::keyed_spine::KeyedVersion for ConfigVersion {
31 fn key(&self) -> &str {
32 &self.key
33 }
34
35 fn version(&self) -> i64 {
36 self.version
37 }
38}
39
40impl ConfigVersion {
41 fn from_keyed_row(version: super::keyed_spine::KeyedRowVersion, row: &RowData) -> Self {
42 Self {
43 id: version.id,
44 key: version.key,
45 version: version.version,
46 value: version.value,
47 tombstone: version.tombstone,
48 created_at_ms: version.created_at_ms,
49 op: version.op,
50 value_type: row
51 .get_field("value_type")
52 .and_then(config_value_type_from_value),
53 schema_version: super::keyed_spine::value_i64(row.get_field("schema_version")),
54 tags: config_tags_from_value(row.get_field("tags")),
55 }
56 }
57}
58
59struct ConfigSecretRef {
60 collection: String,
61 key: String,
62}
63
64struct ConfigMutationEvidence {
65 id: String,
66 resource_type: String,
67 managed: bool,
68 mutability: crate::auth::registry::Mutability,
69 matched_action: Option<String>,
70 matched_resource: Option<String>,
71 payload: Option<Value>,
72}
73
74enum ConfigMutationAuthz {
75 Allowed(ConfigMutationEvidence),
76 Denied {
77 reason: String,
78 evidence: ConfigMutationEvidence,
79 },
80}
81
82impl RedDBRuntime {
83 pub fn execute_config_command(
84 &self,
85 raw_query: &str,
86 cmd: &crate::storage::query::ast::ConfigCommand,
87 ) -> RedDBResult<RuntimeQueryResult> {
88 use crate::storage::query::ast::ConfigCommand;
89
90 match cmd {
91 ConfigCommand::Put {
92 collection,
93 key,
94 value,
95 value_type,
96 tags,
97 } => self.config_write_result(
98 raw_query,
99 collection,
100 key,
101 value.clone(),
102 *value_type,
103 tags,
104 "put",
105 ),
106 ConfigCommand::Rotate {
107 collection,
108 key,
109 value,
110 value_type,
111 tags,
112 } => self.config_write_result(
113 raw_query,
114 collection,
115 key,
116 value.clone(),
117 *value_type,
118 tags,
119 "rotate",
120 ),
121 ConfigCommand::Get { collection, key } => {
122 self.config_get_result(raw_query, collection, key)
123 }
124 ConfigCommand::Resolve { collection, key } => {
125 self.config_resolve_result(raw_query, collection, key)
126 }
127 ConfigCommand::Delete { collection, key } => {
128 self.config_delete_result(raw_query, collection, key)
129 }
130 ConfigCommand::History { collection, key } => {
131 self.config_history_result(raw_query, collection, key)
132 }
133 ConfigCommand::List {
134 collection,
135 prefix,
136 limit,
137 offset,
138 } => self.config_list_result(raw_query, collection, prefix.as_deref(), *limit, *offset),
139 ConfigCommand::Watch {
140 collection,
141 key,
142 prefix,
143 from_lsn,
144 } => self.config_watch_result(raw_query, collection, key, *prefix, *from_lsn),
145 ConfigCommand::InvalidVolatileOperation { operation, .. } => {
146 Err(invalid_config_volatility(operation))
147 }
148 }
149 }
150
151 pub(crate) fn validate_config_command_before_auth(
152 &self,
153 cmd: &crate::storage::query::ast::ConfigCommand,
154 ) -> RedDBResult<()> {
155 use crate::storage::query::ast::ConfigCommand;
156 match cmd {
157 ConfigCommand::InvalidVolatileOperation { operation, .. } => {
158 Err(invalid_config_volatility(operation))
159 }
160 ConfigCommand::Put { collection, .. }
161 | ConfigCommand::Get { collection, .. }
162 | ConfigCommand::Resolve { collection, .. }
163 | ConfigCommand::Rotate { collection, .. }
164 | ConfigCommand::Delete { collection, .. }
165 | ConfigCommand::History { collection, .. }
166 | ConfigCommand::List { collection, .. }
167 | ConfigCommand::Watch { collection, .. } => {
168 let snapshot = self.inner.db.catalog_model_snapshot();
169 let Some(actual_model) = snapshot
170 .collections
171 .iter()
172 .find(|c| c.name == *collection)
173 .map(|c| c.declared_model.unwrap_or(c.model))
174 else {
175 return Ok(());
176 };
177 crate::runtime::ddl::polymorphic_resolver::ensure_model_match(
178 CollectionModel::Config,
179 actual_model,
180 )
181 }
182 }
183 }
184
185 fn config_resolve_result(
186 &self,
187 raw_query: &str,
188 collection: &str,
189 key: &str,
190 ) -> RedDBResult<RuntimeQueryResult> {
191 let latest = self.latest_config_version(collection, key)?;
192 if let Err(reason) = self.check_config_capability("config:read", collection, key) {
193 self.audit_config_resolve(
194 collection,
195 key,
196 None,
197 crate::runtime::audit_log::Outcome::Denied,
198 &reason,
199 );
200 return Err(RedDBError::Query(reason));
201 }
202
203 let Some(version) = latest else {
204 let reason = "not_found";
205 self.audit_config_resolve(
206 collection,
207 key,
208 None,
209 crate::runtime::audit_log::Outcome::Denied,
210 reason,
211 );
212 return Err(RedDBError::NotFound(format!(
213 "config '{}.{}' not found",
214 collection, key
215 )));
216 };
217 if version.tombstone {
218 let reason = "deleted";
219 self.audit_config_resolve(
220 collection,
221 key,
222 None,
223 crate::runtime::audit_log::Outcome::Denied,
224 reason,
225 );
226 return Err(RedDBError::NotFound(format!(
227 "config '{}.{}' is deleted",
228 collection, key
229 )));
230 }
231
232 let secret_ref = parse_config_secret_ref(&version.value).inspect_err(|err| {
233 self.audit_config_resolve(
234 collection,
235 key,
236 None,
237 crate::runtime::audit_log::Outcome::Error,
238 &err.to_string(),
239 );
240 })?;
241
242 match self.resolve_vault_secret_value(&secret_ref.collection, &secret_ref.key) {
243 Ok(value) => {
244 if value_looks_like_secret_ref(&value) {
245 let err = secret_ref_chain_error(
246 collection,
247 key,
248 &secret_ref.collection,
249 &secret_ref.key,
250 );
251 let reason = err.to_string();
252 self.audit_config_resolve(
253 collection,
254 key,
255 Some(&secret_ref),
256 crate::runtime::audit_log::Outcome::Error,
257 &reason,
258 );
259 return Err(err);
260 }
261 self.audit_config_resolve(
262 collection,
263 key,
264 Some(&secret_ref),
265 crate::runtime::audit_log::Outcome::Success,
266 "ok",
267 );
268 let mut result = UnifiedResult::with_columns(vec![
269 "collection".into(),
270 "key".into(),
271 "value".into(),
272 "resolved_store".into(),
273 "resolved_collection".into(),
274 "resolved_key".into(),
275 ]);
276 let mut record = UnifiedRecord::new();
277 record.set("collection", Value::text(collection.to_string()));
278 record.set("key", Value::text(key.to_string()));
279 record.set("value", value);
280 record.set("resolved_store", Value::text("vault"));
281 record.set("resolved_collection", Value::text(secret_ref.collection));
282 record.set("resolved_key", Value::text(secret_ref.key));
283 result.push(record);
284 Ok(RuntimeQueryResult {
285 query: raw_query.to_string(),
286 mode: crate::storage::query::modes::QueryMode::Sql,
287 statement: "config_resolve",
288 engine: "config",
289 result,
290 affected_rows: 0,
291 statement_type: "select",
292 bookmark: None,
293 })
294 }
295 Err(err) => {
296 let reason = err.to_string();
297 let outcome = if reason.contains("denied") {
298 crate::runtime::audit_log::Outcome::Denied
299 } else {
300 crate::runtime::audit_log::Outcome::Error
301 };
302 self.audit_config_resolve(collection, key, Some(&secret_ref), outcome, &reason);
303 Err(err)
304 }
305 }
306 }
307
308 fn config_write_result(
309 &self,
310 raw_query: &str,
311 collection: &str,
312 key: &str,
313 value: Value,
314 requested_type: Option<ConfigValueType>,
315 tags: &[String],
316 op: &str,
317 ) -> RedDBResult<RuntimeQueryResult> {
318 let mut evidence = match self.authorize_config_write_for_event(collection, key) {
319 ConfigMutationAuthz::Allowed(evidence) => evidence,
320 ConfigMutationAuthz::Denied {
321 reason,
322 mut evidence,
323 } => {
324 evidence.payload = Some(value.clone());
325 let _ = self.emit_config_mutation_event(
326 crate::runtime::control_events::EventKind::ConfigWrite,
327 crate::runtime::control_events::Outcome::Denied,
328 "config:write",
329 collection,
330 key,
331 Some(reason.clone()),
332 &evidence,
333 );
334 return Err(RedDBError::Query(reason));
335 }
336 };
337 if let Err(err) = self.check_write(crate::runtime::write_gate::WriteKind::Dml) {
338 let _ = self.emit_config_mutation_event(
339 crate::runtime::control_events::EventKind::ConfigWrite,
340 crate::runtime::control_events::Outcome::Error,
341 "config:write",
342 collection,
343 key,
344 Some(err.to_string()),
345 &evidence,
346 );
347 return Err(err);
348 }
349 if is_enforcement_mode_config(collection, key) {
354 if let Err(err) = validate_enforcement_mode_value(&value) {
355 let _ = self.emit_config_mutation_event(
356 crate::runtime::control_events::EventKind::ConfigWrite,
357 crate::runtime::control_events::Outcome::Denied,
358 "config:write",
359 collection,
360 key,
361 Some(err.to_string()),
362 &evidence,
363 );
364 return Err(err);
365 }
366 }
367 if let Err(err) = self.ensure_config_collection(collection) {
368 let _ = self.emit_config_mutation_event(
369 crate::runtime::control_events::EventKind::ConfigWrite,
370 crate::runtime::control_events::Outcome::Error,
371 "config:write",
372 collection,
373 key,
374 Some(err.to_string()),
375 &evidence,
376 );
377 return Err(err);
378 }
379 let latest = match self.latest_config_version(collection, key) {
380 Ok(latest) => latest,
381 Err(err) => {
382 let _ = self.emit_config_mutation_event(
383 crate::runtime::control_events::EventKind::ConfigWrite,
384 crate::runtime::control_events::Outcome::Error,
385 "config:write",
386 collection,
387 key,
388 Some(err.to_string()),
389 &evidence,
390 );
391 return Err(err);
392 }
393 };
394 let version = latest.as_ref().map(|version| version.version).unwrap_or(0) + 1;
395 let (value_type, schema_version) = resolve_config_schema(latest.as_ref(), requested_type);
396 if let Some(value_type) = value_type {
397 if let Err(err) = validate_config_value_type(&value, value_type) {
398 let _ = self.emit_config_mutation_event(
399 crate::runtime::control_events::EventKind::ConfigWrite,
400 crate::runtime::control_events::Outcome::Error,
401 "config:write",
402 collection,
403 key,
404 Some(err.to_string()),
405 &evidence,
406 );
407 return Err(err);
408 }
409 }
410 evidence.payload = Some(value.clone());
411 if let Some(reason) = self.secret_ref_guard_write_check(collection, key, &value) {
412 let _ = self.emit_config_mutation_event(
413 crate::runtime::control_events::EventKind::ConfigWrite,
414 crate::runtime::control_events::Outcome::Denied,
415 "config:write",
416 collection,
417 key,
418 Some(reason.to_string()),
419 &evidence,
420 );
421 return Err(reason);
422 }
423 let before = latest.as_ref().and_then(|version| {
424 if version.tombstone {
425 None
426 } else {
427 Some(crate::presentation::entity_json::storage_value_to_json(
428 &version.value,
429 ))
430 }
431 });
432 let after = Some(crate::presentation::entity_json::storage_value_to_json(
433 &value,
434 ));
435 let change_op = if latest.is_some() {
436 crate::replication::cdc::ChangeOperation::Update
437 } else {
438 crate::replication::cdc::ChangeOperation::Insert
439 };
440 let id = match self.append_config_version(
441 collection,
442 key,
443 value,
444 version,
445 false,
446 op,
447 value_type,
448 schema_version,
449 tags,
450 ) {
451 Ok(id) => id,
452 Err(err) => {
453 let _ = self.emit_config_mutation_event(
454 crate::runtime::control_events::EventKind::ConfigWrite,
455 crate::runtime::control_events::Outcome::Error,
456 "config:write",
457 collection,
458 key,
459 Some(err.to_string()),
460 &evidence,
461 );
462 return Err(err);
463 }
464 };
465 self.record_kv_watch_event(change_op, collection, key, id.raw(), before, after);
466 if let Err(err) = self.prune_config_history(collection, key) {
467 let _ = self.emit_config_mutation_event(
468 crate::runtime::control_events::EventKind::ConfigWrite,
469 crate::runtime::control_events::Outcome::Error,
470 "config:write",
471 collection,
472 key,
473 Some(err.to_string()),
474 &evidence,
475 );
476 return Err(err);
477 }
478 self.invalidate_result_cache();
479 if let Err(err) = self.emit_config_mutation_event(
480 crate::runtime::control_events::EventKind::ConfigWrite,
481 crate::runtime::control_events::Outcome::Allowed,
482 "config:write",
483 collection,
484 key,
485 None,
486 &evidence,
487 ) {
488 let _ = self.inner.db.store().delete(collection, id);
489 self.invalidate_result_cache();
490 return Err(err);
491 }
492 if is_enforcement_mode_config(collection, key) {
496 if let Some(auth_store) = self.inner.auth_store.read().clone() {
497 if let Value::Text(text) =
498 &evidence.payload.as_ref().cloned().unwrap_or(Value::Null)
499 {
500 if let Some(mode) =
501 crate::auth::enforcement_mode::PolicyEnforcementMode::parse(text)
502 {
503 auth_store.set_enforcement_mode(mode);
504 }
505 }
506 }
507 }
508 Ok(config_write_output(
509 raw_query,
510 collection,
511 key,
512 version,
513 id,
514 value_type,
515 schema_version,
516 tags,
517 match op {
518 "rotate" => "config_rotate",
519 _ => "config_put",
520 },
521 1,
522 ))
523 }
524
525 fn config_delete_result(
526 &self,
527 raw_query: &str,
528 collection: &str,
529 key: &str,
530 ) -> RedDBResult<RuntimeQueryResult> {
531 let mut evidence = match self.authorize_config_write_for_event(collection, key) {
532 ConfigMutationAuthz::Allowed(evidence) => evidence,
533 ConfigMutationAuthz::Denied { reason, evidence } => {
534 let _ = self.emit_config_mutation_event(
535 crate::runtime::control_events::EventKind::ConfigDelete,
536 crate::runtime::control_events::Outcome::Denied,
537 "config:delete",
538 collection,
539 key,
540 Some(reason.clone()),
541 &evidence,
542 );
543 return Err(RedDBError::Query(reason));
544 }
545 };
546 if let Err(err) = self.check_write(crate::runtime::write_gate::WriteKind::Dml) {
547 let _ = self.emit_config_mutation_event(
548 crate::runtime::control_events::EventKind::ConfigDelete,
549 crate::runtime::control_events::Outcome::Error,
550 "config:delete",
551 collection,
552 key,
553 Some(err.to_string()),
554 &evidence,
555 );
556 return Err(err);
557 }
558 if let Err(err) = self.ensure_config_collection(collection) {
559 let _ = self.emit_config_mutation_event(
560 crate::runtime::control_events::EventKind::ConfigDelete,
561 crate::runtime::control_events::Outcome::Error,
562 "config:delete",
563 collection,
564 key,
565 Some(err.to_string()),
566 &evidence,
567 );
568 return Err(err);
569 }
570 let latest = match self.latest_config_version(collection, key) {
571 Ok(latest) => latest,
572 Err(err) => {
573 let _ = self.emit_config_mutation_event(
574 crate::runtime::control_events::EventKind::ConfigDelete,
575 crate::runtime::control_events::Outcome::Error,
576 "config:delete",
577 collection,
578 key,
579 Some(err.to_string()),
580 &evidence,
581 );
582 return Err(err);
583 }
584 };
585 evidence.payload = latest.as_ref().map(|version| version.value.clone());
586 let version = latest.as_ref().map(|version| version.version).unwrap_or(0) + 1;
587 let value_type = latest.as_ref().and_then(|version| version.value_type);
588 let schema_version = latest.as_ref().and_then(|version| version.schema_version);
589 let id = match self.append_config_version(
590 collection,
591 key,
592 Value::Null,
593 version,
594 true,
595 "delete",
596 value_type,
597 schema_version,
598 &[],
599 ) {
600 Ok(id) => id,
601 Err(err) => {
602 let _ = self.emit_config_mutation_event(
603 crate::runtime::control_events::EventKind::ConfigDelete,
604 crate::runtime::control_events::Outcome::Error,
605 "config:delete",
606 collection,
607 key,
608 Some(err.to_string()),
609 &evidence,
610 );
611 return Err(err);
612 }
613 };
614 if let Some(before) = latest.as_ref().and_then(|version| {
615 if version.tombstone {
616 None
617 } else {
618 Some(crate::presentation::entity_json::storage_value_to_json(
619 &version.value,
620 ))
621 }
622 }) {
623 self.record_kv_watch_event(
624 crate::replication::cdc::ChangeOperation::Delete,
625 collection,
626 key,
627 id.raw(),
628 Some(before),
629 None,
630 );
631 }
632 if let Err(err) = self.prune_config_history(collection, key) {
633 let _ = self.emit_config_mutation_event(
634 crate::runtime::control_events::EventKind::ConfigDelete,
635 crate::runtime::control_events::Outcome::Error,
636 "config:delete",
637 collection,
638 key,
639 Some(err.to_string()),
640 &evidence,
641 );
642 return Err(err);
643 }
644 self.invalidate_result_cache();
645 if let Err(err) = self.emit_config_mutation_event(
646 crate::runtime::control_events::EventKind::ConfigDelete,
647 crate::runtime::control_events::Outcome::Allowed,
648 "config:delete",
649 collection,
650 key,
651 None,
652 &evidence,
653 ) {
654 let _ = self.inner.db.store().delete(collection, id);
655 self.invalidate_result_cache();
656 return Err(err);
657 }
658 Ok(config_write_output(
659 raw_query,
660 collection,
661 key,
662 version,
663 id,
664 value_type,
665 schema_version,
666 &[],
667 "delete",
668 1,
669 ))
670 }
671
672 fn config_get_result(
673 &self,
674 raw_query: &str,
675 collection: &str,
676 key: &str,
677 ) -> RedDBResult<RuntimeQueryResult> {
678 self.check_system_config_capability("config:read", collection, key)
679 .map_err(RedDBError::Query)?;
680 let latest = self.latest_config_version(collection, key)?;
681 let mut result = UnifiedResult::with_columns(vec![
682 "collection".into(),
683 "key".into(),
684 "value".into(),
685 "version".into(),
686 "value_type".into(),
687 "schema_version".into(),
688 "tags".into(),
689 "tombstone".into(),
690 ]);
691 let mut record = UnifiedRecord::new();
692 record.set("collection", Value::text(collection.to_string()));
693 record.set("key", Value::text(key.to_string()));
694 if let Some(version) = latest {
695 record.set("value", version.value);
696 record.set("version", Value::Integer(version.version));
697 record.set("value_type", config_value_type_value(version.value_type));
698 record.set(
699 "schema_version",
700 version
701 .schema_version
702 .map(Value::Integer)
703 .unwrap_or(Value::Null),
704 );
705 record.set("tags", config_tags_value(&version.tags));
706 record.set("tombstone", Value::Boolean(version.tombstone));
707 } else {
708 record.set("value", Value::Null);
709 record.set("version", Value::Null);
710 record.set("value_type", Value::Null);
711 record.set("schema_version", Value::Null);
712 record.set("tags", Value::Null);
713 record.set("tombstone", Value::Boolean(false));
714 }
715 result.push(record);
716 Ok(RuntimeQueryResult {
717 query: raw_query.to_string(),
718 mode: crate::storage::query::modes::QueryMode::Sql,
719 statement: "config_get",
720 engine: "config",
721 result,
722 affected_rows: 0,
723 statement_type: "select",
724 bookmark: None,
725 })
726 }
727
728 fn config_history_result(
729 &self,
730 raw_query: &str,
731 collection: &str,
732 key: &str,
733 ) -> RedDBResult<RuntimeQueryResult> {
734 self.check_system_config_capability("config:read", collection, key)
735 .map_err(RedDBError::Query)?;
736 let versions = super::keyed_spine::history_versions(self.config_versions(collection, key)?);
737 let mut result = UnifiedResult::with_columns(vec![
738 "collection".into(),
739 "key".into(),
740 "version".into(),
741 "value".into(),
742 "value_type".into(),
743 "schema_version".into(),
744 "tags".into(),
745 "tombstone".into(),
746 "op".into(),
747 "created_at_ms".into(),
748 ]);
749 for version in versions {
750 let mut record = UnifiedRecord::new();
751 record.set("collection", Value::text(collection.to_string()));
752 record.set("key", Value::text(key.to_string()));
753 record.set("version", Value::Integer(version.version));
754 record.set("value", version.value);
755 record.set("value_type", config_value_type_value(version.value_type));
756 record.set(
757 "schema_version",
758 version
759 .schema_version
760 .map(Value::Integer)
761 .unwrap_or(Value::Null),
762 );
763 record.set("tags", Value::Null);
764 record.set("tombstone", Value::Boolean(version.tombstone));
765 record.set("op", Value::text(version.op));
766 record.set("created_at_ms", Value::Integer(version.created_at_ms));
767 result.push(record);
768 }
769 Ok(RuntimeQueryResult {
770 query: raw_query.to_string(),
771 mode: crate::storage::query::modes::QueryMode::Sql,
772 statement: "config_history",
773 engine: "config",
774 result,
775 affected_rows: 0,
776 statement_type: "select",
777 bookmark: None,
778 })
779 }
780
781 fn config_list_result(
782 &self,
783 raw_query: &str,
784 collection: &str,
785 prefix: Option<&str>,
786 limit: Option<usize>,
787 offset: usize,
788 ) -> RedDBResult<RuntimeQueryResult> {
789 let mut versions = self.latest_config_versions(collection, prefix)?;
790 versions.sort_by(|left, right| left.key.cmp(&right.key));
791 let mut result = UnifiedResult::with_columns(vec![
792 "collection".into(),
793 "key".into(),
794 "value".into(),
795 "version".into(),
796 "value_type".into(),
797 "schema_version".into(),
798 "tags".into(),
799 "tombstone".into(),
800 "op".into(),
801 "created_at_ms".into(),
802 ]);
803 for version in versions
804 .into_iter()
805 .filter(|version| {
806 self.check_config_capability("config:read", collection, &version.key)
807 .is_ok()
808 })
809 .skip(offset)
810 .take(limit.unwrap_or(usize::MAX))
811 {
812 let mut record = UnifiedRecord::new();
813 record.set("collection", Value::text(collection.to_string()));
814 record.set("key", Value::text(version.key));
815 record.set("value", version.value);
816 record.set("version", Value::Integer(version.version));
817 record.set("value_type", config_value_type_value(version.value_type));
818 record.set(
819 "schema_version",
820 version
821 .schema_version
822 .map(Value::Integer)
823 .unwrap_or(Value::Null),
824 );
825 record.set("tags", config_tags_value(&version.tags));
826 record.set("tombstone", Value::Boolean(version.tombstone));
827 record.set("op", Value::text(version.op));
828 record.set("created_at_ms", Value::Integer(version.created_at_ms));
829 result.push(record);
830 }
831 Ok(RuntimeQueryResult {
832 query: raw_query.to_string(),
833 mode: crate::storage::query::modes::QueryMode::Sql,
834 statement: "config_list",
835 engine: "config",
836 result,
837 affected_rows: 0,
838 statement_type: "select",
839 bookmark: None,
840 })
841 }
842
843 fn config_watch_result(
844 &self,
845 raw_query: &str,
846 collection: &str,
847 key: &str,
848 prefix: bool,
849 from_lsn: Option<u64>,
850 ) -> RedDBResult<RuntimeQueryResult> {
851 let watch_key = if prefix {
852 format!("{key}.*")
853 } else {
854 key.to_string()
855 };
856 let endpoint = match from_lsn {
857 Some(lsn) => {
858 format!("/collections/{collection}/config/{watch_key}/watch?since_lsn={lsn}")
859 }
860 None => format!("/collections/{collection}/config/{watch_key}/watch"),
861 };
862 let mut result = UnifiedResult::with_columns(vec![
863 "collection".into(),
864 "key".into(),
865 "prefix".into(),
866 "from_lsn".into(),
867 "watch_url".into(),
868 "streaming".into(),
869 ]);
870 let mut record = UnifiedRecord::new();
871 record.set("collection", Value::text(collection.to_string()));
872 record.set("key", Value::text(watch_key));
873 record.set("prefix", Value::Boolean(prefix));
874 record.set(
875 "from_lsn",
876 from_lsn
877 .map(Value::UnsignedInteger)
878 .unwrap_or(crate::storage::schema::Value::Null),
879 );
880 record.set("watch_url", Value::text(endpoint));
881 record.set("streaming", Value::Boolean(true));
882 result.push(record);
883 Ok(RuntimeQueryResult {
884 query: raw_query.to_string(),
885 mode: crate::storage::query::modes::QueryMode::Sql,
886 statement: "config_watch",
887 engine: "config",
888 result,
889 affected_rows: 0,
890 statement_type: "stream",
891 bookmark: None,
892 })
893 }
894
895 fn ensure_config_collection(&self, collection: &str) -> RedDBResult<()> {
896 let store = self.inner.db.store();
897 if store.get_collection(collection).is_none() {
898 store
899 .create_collection(collection)
900 .map_err(|err| RedDBError::Internal(err.to_string()))?;
901 }
902 if let Some(contract) = self.inner.db.collection_contract(collection) {
903 crate::runtime::ddl::polymorphic_resolver::ensure_model_match(
904 CollectionModel::Config,
905 contract.declared_model,
906 )?;
907 return Ok(());
908 }
909 let now = current_unix_ms();
910 self.inner
911 .db
912 .save_collection_contract(CollectionContract {
913 name: collection.to_string(),
914 declared_model: CollectionModel::Config,
915 schema_mode: SchemaMode::Dynamic,
916 origin: ContractOrigin::Explicit,
917 version: 1,
918 created_at_unix_ms: now as u128,
919 updated_at_unix_ms: now as u128,
920 default_ttl_ms: None,
921 vector_dimension: None,
922 vector_metric: None,
923 context_index_fields: Vec::new(),
924 declared_columns: Vec::new(),
925 table_def: None,
926 timestamps_enabled: false,
927 context_index_enabled: false,
928 metrics_raw_retention_ms: None,
929 metrics_rollup_policies: Vec::new(),
930 metrics_tenant_identity: None,
931 metrics_namespace: None,
932 append_only: false,
933 subscriptions: Vec::new(),
934 analytics_config: Vec::new(),
935 session_key: None,
936 session_gap_ms: None,
937 retention_duration_ms: None,
938 })
939 .map(|_| ())
940 .map_err(|err| RedDBError::Internal(err.to_string()))
941 }
942
943 fn append_config_version(
944 &self,
945 collection: &str,
946 key: &str,
947 value: Value,
948 version: i64,
949 tombstone: bool,
950 op: &str,
951 value_type: Option<ConfigValueType>,
952 schema_version: Option<i64>,
953 tags: &[String],
954 ) -> RedDBResult<EntityId> {
955 let now = current_unix_ms() as i64;
956 let fields = vec![
957 ("key".to_string(), Value::text(key.to_string())),
958 ("value".to_string(), value),
959 ("version".to_string(), Value::Integer(version)),
960 (
961 "value_type".to_string(),
962 config_value_type_value(value_type),
963 ),
964 (
965 "schema_version".to_string(),
966 schema_version.map(Value::Integer).unwrap_or(Value::Null),
967 ),
968 ("tombstone".to_string(), Value::Boolean(tombstone)),
969 ("op".to_string(), Value::text(op.to_string())),
970 ("created_at_ms".to_string(), Value::Integer(now)),
971 ("tags".to_string(), config_tags_value(tags)),
972 ];
973 let mut row = RowData::new(Vec::new());
974 row.named = Some(fields.into_iter().collect());
975 let entity = UnifiedEntity::new(
976 EntityId::new(0),
977 EntityKind::TableRow {
978 table: Arc::from(collection),
979 row_id: 0,
980 },
981 EntityData::Row(row),
982 );
983 self.inner
984 .db
985 .store()
986 .insert(collection, entity)
987 .map_err(|err| RedDBError::Internal(err.to_string()))
988 }
989
990 fn latest_config_version(
991 &self,
992 collection: &str,
993 key: &str,
994 ) -> RedDBResult<Option<ConfigVersion>> {
995 Ok(super::keyed_spine::latest_version(
996 self.config_versions(collection, key)?,
997 ))
998 }
999
1000 fn config_versions(&self, collection: &str, key: &str) -> RedDBResult<Vec<ConfigVersion>> {
1001 let store = self.inner.db.store();
1002 let Some(manager) = store.get_collection(collection) else {
1003 return Ok(Vec::new());
1004 };
1005 let mut versions = Vec::new();
1006 for entity in manager.query_all(|_| true) {
1007 let EntityData::Row(row) = &entity.data else {
1008 continue;
1009 };
1010 let Some(version) = super::keyed_spine::row_version(entity.id, row, 0) else {
1011 continue;
1012 };
1013 if version.key != key {
1014 continue;
1015 }
1016 versions.push(ConfigVersion::from_keyed_row(version, row));
1017 }
1018 Ok(versions)
1019 }
1020
1021 fn latest_config_versions(
1022 &self,
1023 collection: &str,
1024 prefix: Option<&str>,
1025 ) -> RedDBResult<Vec<ConfigVersion>> {
1026 let store = self.inner.db.store();
1027 let Some(manager) = store.get_collection(collection) else {
1028 return Ok(Vec::new());
1029 };
1030 let mut versions = Vec::new();
1031 for entity in manager.query_all(|_| true) {
1032 let EntityData::Row(row) = &entity.data else {
1033 continue;
1034 };
1035 let Some(version) = super::keyed_spine::row_version(entity.id, row, 0) else {
1036 continue;
1037 };
1038 versions.push(ConfigVersion::from_keyed_row(version, row));
1039 }
1040 Ok(super::keyed_spine::latest_versions(versions, prefix))
1041 }
1042
1043 fn prune_config_history(&self, collection: &str, key: &str) -> RedDBResult<()> {
1044 let mut versions = self.config_versions(collection, key)?;
1045 if versions.len() <= CONFIG_HISTORY_LIMIT {
1046 return Ok(());
1047 }
1048 versions = super::keyed_spine::history_versions(versions);
1049 let drop_count = versions.len() - CONFIG_HISTORY_LIMIT;
1050 let store = self.inner.db.store();
1051 for version in versions.into_iter().take(drop_count) {
1052 store
1053 .delete(collection, version.id)
1054 .map_err(|err| RedDBError::Internal(err.to_string()))?;
1055 }
1056 Ok(())
1057 }
1058
1059 fn authorize_config_write_for_event(&self, collection: &str, key: &str) -> ConfigMutationAuthz {
1060 let default_evidence = self.default_config_mutation_evidence(collection, key);
1061 let Some(auth_store) = self.inner.auth_store.read().clone() else {
1062 return ConfigMutationAuthz::Allowed(default_evidence);
1063 };
1064 if !auth_store.iam_authorization_enabled() {
1065 return ConfigMutationAuthz::Allowed(default_evidence);
1066 }
1067 let Some((principal, role)) = current_auth_identity() else {
1068 return ConfigMutationAuthz::Denied {
1069 reason:
1070 "IAM authorization is enabled; config capability check requires an authenticated principal"
1071 .to_string(),
1072 evidence: default_evidence,
1073 };
1074 };
1075 let tenant = current_tenant();
1076 let principal_id = crate::auth::UserId::from_parts(tenant.as_deref(), &principal);
1077 let ctx = crate::auth::policies::EvalContext {
1078 principal_tenant: tenant.clone(),
1079 current_tenant: tenant.clone(),
1080 peer_ip: None,
1081 mfa_present: false,
1082 now_ms: crate::utils::now_unix_millis() as u128,
1083 principal_is_admin_role: role == crate::auth::Role::Admin,
1084 principal_is_system_owned: auth_store.principal_is_system_owned(&principal_id),
1085 principal_is_platform_scoped: principal_id.tenant.is_none(),
1086 };
1087 let managed_key = if collection == "red.config" {
1088 format!("red.config.{key}")
1089 } else {
1090 key.to_string()
1091 };
1092 let gate = crate::auth::managed_config::ManagedConfigGate::new(
1093 self.inner.config_registry.as_ref(),
1094 );
1095 match gate.check_write(&auth_store, &principal_id, &ctx, &managed_key) {
1096 crate::auth::managed_config::ManagedConfigDecision::Allow {
1097 entry_id,
1098 resource_type,
1099 managed,
1100 mutability,
1101 matched_action,
1102 matched_resource,
1103 ..
1104 } => {
1105 return ConfigMutationAuthz::Allowed(ConfigMutationEvidence {
1106 id: entry_id,
1107 resource_type,
1108 managed,
1109 mutability,
1110 matched_action: Some(matched_action),
1111 matched_resource: Some(matched_resource),
1112 payload: None,
1113 });
1114 }
1115 crate::auth::managed_config::ManagedConfigDecision::Deny {
1116 entry_id,
1117 resource_type,
1118 managed,
1119 mutability,
1120 matched_action,
1121 matched_resource,
1122 reason,
1123 ..
1124 } => {
1125 return ConfigMutationAuthz::Denied {
1126 reason: format!(
1127 "permission denied: managed config mutation blocked for `{managed_key}`: {reason}"
1128 ),
1129 evidence: ConfigMutationEvidence {
1130 id: entry_id,
1131 resource_type,
1132 managed,
1133 mutability,
1134 matched_action: Some(matched_action),
1135 matched_resource: Some(matched_resource),
1136 payload: None,
1137 },
1138 };
1139 }
1140 crate::auth::managed_config::ManagedConfigDecision::PassThrough { .. } => {}
1141 }
1142
1143 let mut resource = crate::auth::policies::ResourceRef::new(
1144 "config",
1145 config_target_resource(collection, key),
1146 );
1147 if let Some(ref tenant) = tenant {
1148 resource = resource.with_tenant(tenant.clone());
1149 }
1150 if auth_store.check_policy_authz_with_role(
1151 &principal_id,
1152 "config:write",
1153 &resource,
1154 &ctx,
1155 role,
1156 ) {
1157 ConfigMutationAuthz::Allowed(default_evidence)
1158 } else {
1159 ConfigMutationAuthz::Denied {
1160 reason: format!(
1161 "principal=`{}` action=`config:write` resource=`config:{}` denied by IAM policy",
1162 principal,
1163 config_target_resource(collection, key)
1164 ),
1165 evidence: default_evidence,
1166 }
1167 }
1168 }
1169
1170 fn default_config_mutation_evidence(
1171 &self,
1172 collection: &str,
1173 key: &str,
1174 ) -> ConfigMutationEvidence {
1175 let id = if collection == "red.config" {
1176 format!("red.config.{key}")
1177 } else {
1178 key.to_string()
1179 };
1180 ConfigMutationEvidence {
1181 id,
1182 resource_type: crate::auth::managed_config::RESOURCE_TYPE_CONFIG_KEY.to_string(),
1183 managed: false,
1184 mutability: crate::auth::registry::Mutability::MutableViaGovernance,
1185 matched_action: None,
1186 matched_resource: None,
1187 payload: None,
1188 }
1189 }
1190
1191 fn emit_config_mutation_event(
1192 &self,
1193 kind: crate::runtime::control_events::EventKind,
1194 outcome: crate::runtime::control_events::Outcome,
1195 action: &'static str,
1196 collection: &str,
1197 key: &str,
1198 reason: Option<String>,
1199 evidence: &ConfigMutationEvidence,
1200 ) -> RedDBResult<()> {
1201 use crate::runtime::control_events::{
1202 ActorRef, ControlEvent, ControlEventCtx, Sensitivity,
1203 };
1204
1205 let tenant = current_tenant();
1206 let principal = current_auth_identity();
1207 let actor_user = principal
1208 .as_ref()
1209 .map(|(principal, _)| crate::auth::UserId::from_parts(tenant.as_deref(), principal));
1210 let actor = actor_user
1211 .as_ref()
1212 .map(ActorRef::User)
1213 .unwrap_or(ActorRef::Anonymous);
1214 let ctx = ControlEventCtx {
1215 actor,
1216 scope: tenant
1217 .as_ref()
1218 .map(|scope| std::borrow::Cow::Borrowed(scope.as_str())),
1219 request_id: Some(std::borrow::Cow::Owned(format!(
1220 "conn-{}",
1221 current_connection_id()
1222 ))),
1223 trace_id: None,
1224 };
1225
1226 let mut fields = HashMap::new();
1227 fields.insert("id".to_string(), Sensitivity::raw(evidence.id.clone()));
1228 fields.insert(
1229 "resource_type".to_string(),
1230 Sensitivity::raw(evidence.resource_type.clone()),
1231 );
1232 fields.insert(
1233 "managed".to_string(),
1234 Sensitivity::raw(evidence.managed.to_string()),
1235 );
1236 fields.insert(
1237 "mutability".to_string(),
1238 Sensitivity::raw(config_mutability_label(evidence.mutability)),
1239 );
1240 fields.insert("collection".to_string(), Sensitivity::raw(collection));
1241 fields.insert("key".to_string(), Sensitivity::raw(key));
1242 fields.insert(
1243 "connection_id".to_string(),
1244 Sensitivity::raw(current_connection_id().to_string()),
1245 );
1246 if let Some((_, role)) = principal {
1247 fields.insert("actor_role".to_string(), Sensitivity::raw(role.as_str()));
1248 }
1249 if let Some(matched_action) = &evidence.matched_action {
1250 fields.insert(
1251 "matched_action".to_string(),
1252 Sensitivity::raw(matched_action.clone()),
1253 );
1254 }
1255 if let Some(matched_resource) = &evidence.matched_resource {
1256 fields.insert(
1257 "matched_resource".to_string(),
1258 Sensitivity::raw(matched_resource.clone()),
1259 );
1260 }
1261 if let Some(payload) = &evidence.payload {
1262 fields.insert(
1263 "payload".to_string(),
1264 config_payload_sensitivity(&evidence.resource_type, "payload", payload),
1265 );
1266 }
1267
1268 let event = ControlEvent {
1269 kind,
1270 outcome,
1271 action: std::borrow::Cow::Borrowed(action),
1272 resource: Some(format!(
1273 "config:{}",
1274 config_target_resource(collection, key)
1275 )),
1276 reason,
1277 matched_policy_id: None,
1278 fields,
1279 };
1280 let ledger = self.inner.control_event_ledger.read();
1281 match ledger.emit(&ctx, event) {
1282 Ok(_) => Ok(()),
1283 Err(err) if self.inner.control_event_config.require_persistence() => {
1284 Err(RedDBError::Internal(err.to_string()))
1285 }
1286 Err(_) => Ok(()),
1287 }
1288 }
1289
1290 fn check_config_capability(
1291 &self,
1292 action: &str,
1293 collection: &str,
1294 key: &str,
1295 ) -> Result<(), String> {
1296 let Some(auth_store) = self.inner.auth_store.read().clone() else {
1297 return Ok(());
1298 };
1299 if !auth_store.iam_authorization_enabled() {
1300 return Ok(());
1301 }
1302 let Some((principal, role)) = current_auth_identity() else {
1303 return Err(
1304 "IAM authorization is enabled; config capability check requires an authenticated principal"
1305 .to_string(),
1306 );
1307 };
1308 let tenant = current_tenant();
1309 let principal_id = crate::auth::UserId::from_parts(tenant.as_deref(), &principal);
1310 let mut resource = crate::auth::policies::ResourceRef::new(
1311 "config",
1312 config_target_resource(collection, key),
1313 );
1314 if let Some(ref tenant) = tenant {
1315 resource = resource.with_tenant(tenant.clone());
1316 }
1317 let ctx = crate::auth::policies::EvalContext {
1318 principal_tenant: tenant.clone(),
1319 current_tenant: tenant,
1320 peer_ip: None,
1321 mfa_present: false,
1322 now_ms: crate::utils::now_unix_millis() as u128,
1323 principal_is_admin_role: role == crate::auth::Role::Admin,
1324 principal_is_system_owned: auth_store.principal_is_system_owned(&principal_id),
1325 principal_is_platform_scoped: principal_id.tenant.is_none(),
1326 };
1327 if action == "config:write" {
1328 let managed_key = if collection == "red.config" {
1329 format!("red.config.{key}")
1330 } else {
1331 key.to_string()
1332 };
1333 let gate = crate::auth::managed_config::ManagedConfigGate::new(
1334 self.inner.config_registry.as_ref(),
1335 );
1336 match gate.check_write(&auth_store, &principal_id, &ctx, &managed_key) {
1337 crate::auth::managed_config::ManagedConfigDecision::PassThrough { .. } => {}
1338 crate::auth::managed_config::ManagedConfigDecision::Allow { .. } => return Ok(()),
1339 crate::auth::managed_config::ManagedConfigDecision::Deny { reason, .. } => {
1340 return Err(format!(
1341 "permission denied: managed config mutation blocked for `{managed_key}`: {reason}"
1342 ));
1343 }
1344 }
1345 }
1346 if auth_store.check_policy_authz_with_role(&principal_id, action, &resource, &ctx, role) {
1347 Ok(())
1348 } else {
1349 Err(format!(
1350 "principal=`{}` action=`{}` resource=`config:{}` denied by IAM policy",
1351 principal,
1352 action,
1353 config_target_resource(collection, key)
1354 ))
1355 }
1356 }
1357
1358 fn check_system_config_capability(
1359 &self,
1360 action: &str,
1361 collection: &str,
1362 key: &str,
1363 ) -> Result<(), String> {
1364 if collection != "red.config" {
1365 return Ok(());
1366 }
1367 self.check_config_capability(action, collection, key)
1368 }
1369
1370 pub fn config_watch_events_since(
1371 &self,
1372 collection: &str,
1373 key: &str,
1374 since_lsn: u64,
1375 max_count: usize,
1376 ) -> Vec<crate::replication::cdc::KvWatchEvent> {
1377 self.kv_watch_events_since(collection, key, since_lsn, max_count)
1378 .into_iter()
1379 .map(|event| self.policy_filter_config_watch_event(event))
1380 .collect()
1381 }
1382
1383 pub fn config_watch_events_since_prefix(
1384 &self,
1385 collection: &str,
1386 prefix: &str,
1387 since_lsn: u64,
1388 max_count: usize,
1389 ) -> Vec<crate::replication::cdc::KvWatchEvent> {
1390 self.kv_watch_events_since_prefix(collection, prefix, since_lsn, max_count)
1391 .into_iter()
1392 .map(|event| self.policy_filter_config_watch_event(event))
1393 .collect()
1394 }
1395
1396 fn policy_filter_config_watch_event(
1397 &self,
1398 mut event: crate::replication::cdc::KvWatchEvent,
1399 ) -> crate::replication::cdc::KvWatchEvent {
1400 if self
1401 .check_config_capability("config:read", &event.collection, &event.key)
1402 .is_err()
1403 {
1404 event.before = None;
1405 event.after = None;
1406 }
1407 event
1408 }
1409
1410 fn secret_ref_guard_write_check(
1416 &self,
1417 collection: &str,
1418 key: &str,
1419 value: &Value,
1420 ) -> Option<RedDBError> {
1421 if !value_looks_like_secret_ref(value) {
1422 return None;
1423 }
1424 let Ok(secret_ref) = parse_config_secret_ref(value) else {
1425 return None;
1426 };
1427 let unsealed = match self.peek_vault_unsealed(&secret_ref.collection, &secret_ref.key) {
1428 Ok(Some(value)) => value,
1429 Ok(None) => return None,
1430 Err(_) => return None,
1431 };
1432 if value_looks_like_secret_ref(&unsealed) {
1433 return Some(secret_ref_chain_error(
1434 collection,
1435 key,
1436 &secret_ref.collection,
1437 &secret_ref.key,
1438 ));
1439 }
1440 None
1441 }
1442
1443 fn audit_config_resolve(
1444 &self,
1445 collection: &str,
1446 key: &str,
1447 secret_ref: Option<&ConfigSecretRef>,
1448 outcome: crate::runtime::audit_log::Outcome,
1449 reason: &str,
1450 ) {
1451 let actor = current_auth_identity()
1452 .map(|(principal, _)| principal)
1453 .unwrap_or_else(|| "anonymous".to_string());
1454 let request_id = match current_connection_id() {
1455 0 => "embedded".to_string(),
1456 id => format!("conn-{id}"),
1457 };
1458 let mut builder = crate::runtime::audit_log::AuditEvent::builder("config/resolve")
1459 .principal(actor.clone())
1460 .source(crate::runtime::audit_log::AuditAuthSource::Password)
1461 .resource(format!(
1462 "config:{}",
1463 config_target_resource(collection, key)
1464 ))
1465 .outcome(outcome)
1466 .correlation_id(request_id.clone())
1467 .fields([
1468 crate::runtime::audit_log::AuditFieldEscaper::field("actor", actor),
1469 crate::runtime::audit_log::AuditFieldEscaper::field("collection", collection),
1470 crate::runtime::audit_log::AuditFieldEscaper::field("key", key),
1471 crate::runtime::audit_log::AuditFieldEscaper::field(
1472 "target",
1473 config_target_resource(collection, key),
1474 ),
1475 crate::runtime::audit_log::AuditFieldEscaper::field("reason", reason),
1476 crate::runtime::audit_log::AuditFieldEscaper::field("request_id", request_id),
1477 crate::runtime::audit_log::AuditFieldEscaper::field(
1478 "connection_id",
1479 current_connection_id(),
1480 ),
1481 ]);
1482 if let Some(tenant) = current_tenant() {
1483 builder = builder.tenant(tenant);
1484 }
1485 if let Some(secret_ref) = secret_ref {
1486 builder = builder.fields([
1487 crate::runtime::audit_log::AuditFieldEscaper::field("resolved_store", "vault"),
1488 crate::runtime::audit_log::AuditFieldEscaper::field(
1489 "resolved_collection",
1490 secret_ref.collection.as_str(),
1491 ),
1492 crate::runtime::audit_log::AuditFieldEscaper::field(
1493 "resolved_key",
1494 secret_ref.key.as_str(),
1495 ),
1496 crate::runtime::audit_log::AuditFieldEscaper::field(
1497 "resolved_target",
1498 format!("{}.{}", secret_ref.collection, secret_ref.key),
1499 ),
1500 ]);
1501 }
1502 self.audit_log().record_event(builder.build());
1503 }
1504}
1505
1506fn value_looks_like_secret_ref(value: &Value) -> bool {
1511 let Value::Json(bytes) = value else {
1512 return false;
1513 };
1514 let Ok(json) = crate::json::from_slice::<crate::json::Value>(bytes) else {
1515 return false;
1516 };
1517 let Some(object) = json.as_object() else {
1518 return false;
1519 };
1520 object.get("type").and_then(|value| value.as_str()) == Some("secret_ref")
1521}
1522
1523fn secret_ref_chain_error(
1524 source_collection: &str,
1525 source_key: &str,
1526 target_collection: &str,
1527 target_key: &str,
1528) -> RedDBError {
1529 RedDBError::InvalidConfig(format!(
1530 "secret_ref chain rejected: config `{source_collection}.{source_key}` points at vault `{target_collection}.{target_key}` which is itself a secret_ref; depth-1 invariant requires the target to resolve to a non-secret_ref value"
1531 ))
1532}
1533
1534fn parse_config_secret_ref(value: &Value) -> RedDBResult<ConfigSecretRef> {
1535 let Value::Json(bytes) = value else {
1536 return Err(RedDBError::InvalidConfig(
1537 "CONFIG value is not a SecretRef".to_string(),
1538 ));
1539 };
1540 let json = crate::json::from_slice::<crate::json::Value>(bytes).map_err(|err| {
1541 RedDBError::InvalidConfig(format!("CONFIG SecretRef is malformed: {err}"))
1542 })?;
1543 let Some(object) = json.as_object() else {
1544 return Err(RedDBError::InvalidConfig(
1545 "CONFIG SecretRef must be an object".to_string(),
1546 ));
1547 };
1548 let get_str = |field: &str| -> RedDBResult<&str> {
1549 object
1550 .get(field)
1551 .and_then(|value| value.as_str())
1552 .ok_or_else(|| RedDBError::InvalidConfig(format!("CONFIG SecretRef missing {field}")))
1553 };
1554 if get_str("type")? != "secret_ref" {
1555 return Err(RedDBError::InvalidConfig(
1556 "CONFIG value is not a SecretRef".to_string(),
1557 ));
1558 }
1559 if get_str("store")? != "vault" {
1560 return Err(RedDBError::InvalidConfig(
1561 "CONFIG SecretRef store is unsupported".to_string(),
1562 ));
1563 }
1564 Ok(ConfigSecretRef {
1565 collection: get_str("collection")?.to_string(),
1566 key: get_str("key")?.to_string(),
1567 })
1568}
1569
1570fn config_target_resource(collection: &str, key: &str) -> String {
1571 if collection == "red.config" {
1572 format!("red.config/{}", key.to_ascii_lowercase())
1573 } else {
1574 format!("{collection}.{key}")
1575 }
1576}
1577
1578fn config_write_output(
1579 raw_query: &str,
1580 collection: &str,
1581 key: &str,
1582 version: i64,
1583 id: EntityId,
1584 value_type: Option<ConfigValueType>,
1585 schema_version: Option<i64>,
1586 tags: &[String],
1587 statement: &'static str,
1588 affected_rows: u64,
1589) -> RuntimeQueryResult {
1590 let mut result = UnifiedResult::with_columns(vec![
1591 "ok".into(),
1592 "collection".into(),
1593 "key".into(),
1594 "version".into(),
1595 "value_type".into(),
1596 "schema_version".into(),
1597 "tags".into(),
1598 "id".into(),
1599 ]);
1600 let mut record = UnifiedRecord::new();
1601 record.set("ok", Value::Boolean(true));
1602 record.set("collection", Value::text(collection.to_string()));
1603 record.set("key", Value::text(key.to_string()));
1604 record.set("version", Value::Integer(version));
1605 record.set("value_type", config_value_type_value(value_type));
1606 record.set(
1607 "schema_version",
1608 schema_version.map(Value::Integer).unwrap_or(Value::Null),
1609 );
1610 record.set("tags", config_tags_value(tags));
1611 record.set("id", Value::Integer(id.raw() as i64));
1612 result.push(record);
1613 RuntimeQueryResult {
1614 query: raw_query.to_string(),
1615 mode: crate::storage::query::modes::QueryMode::Sql,
1616 statement,
1617 engine: "config",
1618 result,
1619 affected_rows,
1620 statement_type: if statement == "delete" {
1621 "delete"
1622 } else {
1623 "update"
1624 },
1625 bookmark: None,
1626 }
1627}
1628
1629fn invalid_config_volatility(operation: &str) -> RedDBError {
1630 RedDBError::InvalidOperation(format!(
1631 "CONFIG does not support KV-only volatility operation {operation}"
1632 ))
1633}
1634
1635fn resolve_config_schema(
1636 latest: Option<&ConfigVersion>,
1637 requested_type: Option<ConfigValueType>,
1638) -> (Option<ConfigValueType>, Option<i64>) {
1639 let previous_type = latest.and_then(|version| version.value_type);
1640 let previous_schema_version = latest.and_then(|version| version.schema_version);
1641 match requested_type {
1642 Some(value_type) if Some(value_type) != previous_type => (
1643 Some(value_type),
1644 Some(previous_schema_version.unwrap_or(0) + 1),
1645 ),
1646 Some(value_type) => (Some(value_type), previous_schema_version.or(Some(1))),
1647 None => (previous_type, previous_schema_version),
1648 }
1649}
1650
1651fn validate_config_value_type(value: &Value, value_type: ConfigValueType) -> RedDBResult<()> {
1652 let valid = match value_type {
1653 ConfigValueType::Bool => matches!(value, Value::Boolean(_)),
1654 ConfigValueType::Int => matches!(
1655 value,
1656 Value::Integer(_) | Value::UnsignedInteger(_) | Value::BigInt(_)
1657 ),
1658 ConfigValueType::String => matches!(value, Value::Text(_)),
1659 ConfigValueType::Url => validate_config_url(value),
1660 ConfigValueType::Object => validate_config_json_shape(value, true),
1661 ConfigValueType::Array => {
1662 matches!(value, Value::Array(_) | Value::Vector(_))
1663 || validate_config_json_shape(value, false)
1664 }
1665 };
1666 if valid {
1667 Ok(())
1668 } else {
1669 Err(RedDBError::InvalidConfig(format!(
1670 "CONFIG value type mismatch: expected {}, got {}",
1671 value_type.as_str(),
1672 config_actual_value_type(value),
1673 )))
1674 }
1675}
1676
1677fn is_enforcement_mode_config(collection: &str, key: &str) -> bool {
1682 collection == "red.config" && key == "policy.enforcement_mode"
1683}
1684
1685fn validate_enforcement_mode_value(value: &Value) -> RedDBResult<()> {
1689 let text = match value {
1690 Value::Text(text) => text.as_ref(),
1691 _ => {
1692 return Err(RedDBError::InvalidConfig(format!(
1693 "config key `{}` must be a string ({} or {}); got {}",
1694 crate::auth::enforcement_mode::ENFORCEMENT_MODE_CONFIG_KEY,
1695 crate::auth::enforcement_mode::PolicyEnforcementMode::LegacyRbac.as_str(),
1696 crate::auth::enforcement_mode::PolicyEnforcementMode::PolicyOnly.as_str(),
1697 config_actual_value_type(value),
1698 )));
1699 }
1700 };
1701 if crate::auth::enforcement_mode::PolicyEnforcementMode::parse(text).is_some() {
1702 Ok(())
1703 } else {
1704 Err(RedDBError::InvalidConfig(format!(
1705 "config key `{}` accepts only `{}` or `{}`, got `{}`",
1706 crate::auth::enforcement_mode::ENFORCEMENT_MODE_CONFIG_KEY,
1707 crate::auth::enforcement_mode::PolicyEnforcementMode::LegacyRbac.as_str(),
1708 crate::auth::enforcement_mode::PolicyEnforcementMode::PolicyOnly.as_str(),
1709 text,
1710 )))
1711 }
1712}
1713
1714fn validate_config_url(value: &Value) -> bool {
1715 let url = match value {
1716 Value::Url(value) => value.as_str(),
1717 Value::Text(value) => value.as_ref(),
1718 _ => return false,
1719 };
1720 url.starts_with("http://") || url.starts_with("https://") || url.starts_with("ftp://")
1721}
1722
1723fn validate_config_json_shape(value: &Value, object: bool) -> bool {
1724 let Value::Json(bytes) = value else {
1725 return false;
1726 };
1727 let Ok(json) = crate::json::from_slice::<crate::json::Value>(bytes) else {
1728 return false;
1729 };
1730 matches!(
1731 (object, json),
1732 (true, crate::json::Value::Object(_)) | (false, crate::json::Value::Array(_))
1733 )
1734}
1735
1736fn config_actual_value_type(value: &Value) -> &'static str {
1737 match value {
1738 Value::Null => "null",
1739 Value::Boolean(_) => "bool",
1740 Value::Integer(_) | Value::UnsignedInteger(_) | Value::BigInt(_) => "int",
1741 Value::Text(_) => "string",
1742 Value::Url(_) => "url",
1743 Value::Json(bytes) => match crate::json::from_slice::<crate::json::Value>(bytes) {
1744 Ok(crate::json::Value::Object(_)) => "object",
1745 Ok(crate::json::Value::Array(_)) => "array",
1746 _ => "json",
1747 },
1748 Value::Array(_) | Value::Vector(_) => "array",
1749 _ => "other",
1750 }
1751}
1752
1753fn config_value_type_value(value_type: Option<ConfigValueType>) -> Value {
1754 value_type
1755 .map(|value_type| Value::text(value_type.as_str()))
1756 .unwrap_or(Value::Null)
1757}
1758
1759fn config_value_type_from_value(value: &Value) -> Option<ConfigValueType> {
1760 match value {
1761 Value::Text(value) => ConfigValueType::parse(value.as_ref()),
1762 _ => None,
1763 }
1764}
1765
1766fn config_tags_value(tags: &[String]) -> Value {
1767 if tags.is_empty() {
1768 return Value::Null;
1769 }
1770 Value::Array(tags.iter().map(|tag| Value::text(tag.clone())).collect())
1771}
1772
1773fn config_tags_from_value(value: Option<&Value>) -> Vec<String> {
1774 match value {
1775 Some(Value::Array(values)) => values
1776 .iter()
1777 .filter_map(|value| match value {
1778 Value::Text(tag) => Some(tag.to_string()),
1779 _ => None,
1780 })
1781 .collect(),
1782 Some(Value::Json(bytes)) => crate::json::from_slice::<crate::json::Value>(bytes)
1783 .ok()
1784 .and_then(|value| value.as_array().map(|values| values.to_vec()))
1785 .map(|values| {
1786 values
1787 .into_iter()
1788 .filter_map(|value| value.as_str().map(ToOwned::to_owned))
1789 .collect()
1790 })
1791 .unwrap_or_default(),
1792 _ => Vec::new(),
1793 }
1794}
1795
1796fn config_payload_sensitivity(
1797 resource_type: &str,
1798 field: &str,
1799 value: &Value,
1800) -> crate::runtime::control_events::Sensitivity {
1801 let payload = config_payload_bytes(value);
1802 if config_payload_raw_allowed(resource_type, field) {
1803 crate::runtime::control_events::Sensitivity::raw(
1804 String::from_utf8_lossy(&payload).into_owned(),
1805 )
1806 } else {
1807 crate::runtime::control_events::Sensitivity::hashed(&payload)
1808 }
1809}
1810
1811fn config_payload_bytes(value: &Value) -> Vec<u8> {
1812 let json = crate::presentation::entity_json::storage_value_to_json(value);
1813 crate::serde_json::to_vec(&json).unwrap_or_else(|_| value.to_string().into_bytes())
1814}
1815
1816fn config_payload_raw_allowed(resource_type: &str, field: &str) -> bool {
1817 const RAW_PAYLOAD_FIELDS: &[(&str, &str)] = &[("audit_surface", "payload")];
1818 RAW_PAYLOAD_FIELDS
1819 .iter()
1820 .any(|(allowed_type, allowed_field)| {
1821 *allowed_type == resource_type && *allowed_field == field
1822 })
1823}
1824
1825fn config_mutability_label(mutability: crate::auth::registry::Mutability) -> &'static str {
1826 match mutability {
1827 crate::auth::registry::Mutability::Immutable => "immutable",
1828 crate::auth::registry::Mutability::MutableViaGovernance => "mutable_via_governance",
1829 }
1830}
1831
1832fn current_unix_ms() -> u64 {
1833 std::time::SystemTime::now()
1834 .duration_since(std::time::UNIX_EPOCH)
1835 .unwrap_or_default()
1836 .as_millis() as u64
1837}