1use std::collections::HashMap;
4use std::sync::Arc;
5
6use crate::catalog::{CollectionModel, SchemaMode};
7use crate::physical::{CollectionContract, ContractOrigin};
8use crate::storage::query::ast::ConfigValueType;
9use crate::storage::{EntityData, EntityId, EntityKind, RowData, UnifiedEntity};
10
11use super::impl_core::{current_auth_identity, current_connection_id, current_tenant};
12use super::*;
13
14const CONFIG_HISTORY_LIMIT: usize = 16;
15
16#[derive(Clone)]
17struct ConfigVersion {
18 id: EntityId,
19 key: String,
20 version: i64,
21 value: Value,
22 tombstone: bool,
23 created_at_ms: i64,
24 op: String,
25 value_type: Option<ConfigValueType>,
26 schema_version: Option<i64>,
27 tags: Vec<String>,
28}
29
30impl super::keyed_spine::KeyedVersion for ConfigVersion {
31 fn key(&self) -> &str {
32 &self.key
33 }
34
35 fn version(&self) -> i64 {
36 self.version
37 }
38}
39
40impl ConfigVersion {
41 fn from_keyed_row(version: super::keyed_spine::KeyedRowVersion, row: &RowData) -> Self {
42 Self {
43 id: version.id,
44 key: version.key,
45 version: version.version,
46 value: version.value,
47 tombstone: version.tombstone,
48 created_at_ms: version.created_at_ms,
49 op: version.op,
50 value_type: row
51 .get_field("value_type")
52 .and_then(config_value_type_from_value),
53 schema_version: super::keyed_spine::value_i64(row.get_field("schema_version")),
54 tags: config_tags_from_value(row.get_field("tags")),
55 }
56 }
57}
58
59struct ConfigSecretRef {
60 collection: String,
61 key: String,
62}
63
64struct ConfigMutationEvidence {
65 id: String,
66 resource_type: String,
67 managed: bool,
68 mutability: crate::auth::registry::Mutability,
69 matched_action: Option<String>,
70 matched_resource: Option<String>,
71 payload: Option<Value>,
72}
73
74enum ConfigMutationAuthz {
75 Allowed(ConfigMutationEvidence),
76 Denied {
77 reason: String,
78 evidence: ConfigMutationEvidence,
79 },
80}
81
82impl RedDBRuntime {
83 pub fn execute_config_command(
84 &self,
85 raw_query: &str,
86 cmd: &crate::storage::query::ast::ConfigCommand,
87 ) -> RedDBResult<RuntimeQueryResult> {
88 use crate::storage::query::ast::ConfigCommand;
89
90 match cmd {
91 ConfigCommand::Put {
92 collection,
93 key,
94 value,
95 value_type,
96 tags,
97 } => self.config_write_result(
98 raw_query,
99 collection,
100 key,
101 value.clone(),
102 *value_type,
103 tags,
104 "put",
105 ),
106 ConfigCommand::Rotate {
107 collection,
108 key,
109 value,
110 value_type,
111 tags,
112 } => self.config_write_result(
113 raw_query,
114 collection,
115 key,
116 value.clone(),
117 *value_type,
118 tags,
119 "rotate",
120 ),
121 ConfigCommand::Get { collection, key } => {
122 self.config_get_result(raw_query, collection, key)
123 }
124 ConfigCommand::Resolve { collection, key } => {
125 self.config_resolve_result(raw_query, collection, key)
126 }
127 ConfigCommand::Delete { collection, key } => {
128 self.config_delete_result(raw_query, collection, key)
129 }
130 ConfigCommand::History { collection, key } => {
131 self.config_history_result(raw_query, collection, key)
132 }
133 ConfigCommand::List {
134 collection,
135 prefix,
136 limit,
137 offset,
138 } => self.config_list_result(raw_query, collection, prefix.as_deref(), *limit, *offset),
139 ConfigCommand::Watch {
140 collection,
141 key,
142 prefix,
143 from_lsn,
144 } => self.config_watch_result(raw_query, collection, key, *prefix, *from_lsn),
145 ConfigCommand::InvalidVolatileOperation { operation, .. } => {
146 Err(invalid_config_volatility(operation))
147 }
148 }
149 }
150
151 pub(crate) fn validate_config_command_before_auth(
152 &self,
153 cmd: &crate::storage::query::ast::ConfigCommand,
154 ) -> RedDBResult<()> {
155 use crate::storage::query::ast::ConfigCommand;
156 match cmd {
157 ConfigCommand::InvalidVolatileOperation { operation, .. } => {
158 Err(invalid_config_volatility(operation))
159 }
160 ConfigCommand::Put { collection, .. }
161 | ConfigCommand::Get { collection, .. }
162 | ConfigCommand::Resolve { collection, .. }
163 | ConfigCommand::Rotate { collection, .. }
164 | ConfigCommand::Delete { collection, .. }
165 | ConfigCommand::History { collection, .. }
166 | ConfigCommand::List { collection, .. }
167 | ConfigCommand::Watch { collection, .. } => {
168 let snapshot = self.inner.db.catalog_model_snapshot();
169 let Some(actual_model) = snapshot
170 .collections
171 .iter()
172 .find(|c| c.name == *collection)
173 .map(|c| c.declared_model.unwrap_or(c.model))
174 else {
175 return Ok(());
176 };
177 crate::runtime::ddl::polymorphic_resolver::ensure_model_match(
178 CollectionModel::Config,
179 actual_model,
180 )
181 }
182 }
183 }
184
185 fn config_resolve_result(
186 &self,
187 raw_query: &str,
188 collection: &str,
189 key: &str,
190 ) -> RedDBResult<RuntimeQueryResult> {
191 let latest = self.latest_config_version(collection, key)?;
192 if let Err(reason) = self.check_config_capability("config:read", collection, key) {
193 self.audit_config_resolve(
194 collection,
195 key,
196 None,
197 crate::runtime::audit_log::Outcome::Denied,
198 &reason,
199 );
200 return Err(RedDBError::Query(reason));
201 }
202
203 let Some(version) = latest else {
204 let reason = "not_found";
205 self.audit_config_resolve(
206 collection,
207 key,
208 None,
209 crate::runtime::audit_log::Outcome::Denied,
210 reason,
211 );
212 return Err(RedDBError::NotFound(format!(
213 "config '{}.{}' not found",
214 collection, key
215 )));
216 };
217 if version.tombstone {
218 let reason = "deleted";
219 self.audit_config_resolve(
220 collection,
221 key,
222 None,
223 crate::runtime::audit_log::Outcome::Denied,
224 reason,
225 );
226 return Err(RedDBError::NotFound(format!(
227 "config '{}.{}' is deleted",
228 collection, key
229 )));
230 }
231
232 let secret_ref = parse_config_secret_ref(&version.value).inspect_err(|err| {
233 self.audit_config_resolve(
234 collection,
235 key,
236 None,
237 crate::runtime::audit_log::Outcome::Error,
238 &err.to_string(),
239 );
240 })?;
241
242 match self.resolve_vault_secret_value(&secret_ref.collection, &secret_ref.key) {
243 Ok(value) => {
244 if value_looks_like_secret_ref(&value) {
245 let err = secret_ref_chain_error(
246 collection,
247 key,
248 &secret_ref.collection,
249 &secret_ref.key,
250 );
251 let reason = err.to_string();
252 self.audit_config_resolve(
253 collection,
254 key,
255 Some(&secret_ref),
256 crate::runtime::audit_log::Outcome::Error,
257 &reason,
258 );
259 return Err(err);
260 }
261 self.audit_config_resolve(
262 collection,
263 key,
264 Some(&secret_ref),
265 crate::runtime::audit_log::Outcome::Success,
266 "ok",
267 );
268 let mut result = UnifiedResult::with_columns(vec![
269 "collection".into(),
270 "key".into(),
271 "value".into(),
272 "resolved_store".into(),
273 "resolved_collection".into(),
274 "resolved_key".into(),
275 ]);
276 let mut record = UnifiedRecord::new();
277 record.set("collection", Value::text(collection.to_string()));
278 record.set("key", Value::text(key.to_string()));
279 record.set("value", value);
280 record.set("resolved_store", Value::text("vault"));
281 record.set("resolved_collection", Value::text(secret_ref.collection));
282 record.set("resolved_key", Value::text(secret_ref.key));
283 result.push(record);
284 Ok(RuntimeQueryResult {
285 query: raw_query.to_string(),
286 mode: crate::storage::query::modes::QueryMode::Sql,
287 statement: "config_resolve",
288 engine: "config",
289 result,
290 affected_rows: 0,
291 statement_type: "select",
292 bookmark: None,
293 })
294 }
295 Err(err) => {
296 let reason = err.to_string();
297 let outcome = if reason.contains("denied") {
298 crate::runtime::audit_log::Outcome::Denied
299 } else {
300 crate::runtime::audit_log::Outcome::Error
301 };
302 self.audit_config_resolve(collection, key, Some(&secret_ref), outcome, &reason);
303 Err(err)
304 }
305 }
306 }
307
308 fn config_write_result(
309 &self,
310 raw_query: &str,
311 collection: &str,
312 key: &str,
313 value: Value,
314 requested_type: Option<ConfigValueType>,
315 tags: &[String],
316 op: &str,
317 ) -> RedDBResult<RuntimeQueryResult> {
318 let mut evidence = match self.authorize_config_write_for_event(collection, key) {
319 ConfigMutationAuthz::Allowed(evidence) => evidence,
320 ConfigMutationAuthz::Denied {
321 reason,
322 mut evidence,
323 } => {
324 evidence.payload = Some(value.clone());
325 let _ = self.emit_config_mutation_event(
326 crate::runtime::control_events::EventKind::ConfigWrite,
327 crate::runtime::control_events::Outcome::Denied,
328 "config:write",
329 collection,
330 key,
331 Some(reason.clone()),
332 &evidence,
333 );
334 return Err(RedDBError::Query(reason));
335 }
336 };
337 if let Err(err) = self.check_write(crate::runtime::write_gate::WriteKind::Dml) {
338 let _ = self.emit_config_mutation_event(
339 crate::runtime::control_events::EventKind::ConfigWrite,
340 crate::runtime::control_events::Outcome::Error,
341 "config:write",
342 collection,
343 key,
344 Some(err.to_string()),
345 &evidence,
346 );
347 return Err(err);
348 }
349 if is_enforcement_mode_config(collection, key) {
354 if let Err(err) = validate_enforcement_mode_value(&value) {
355 let _ = self.emit_config_mutation_event(
356 crate::runtime::control_events::EventKind::ConfigWrite,
357 crate::runtime::control_events::Outcome::Denied,
358 "config:write",
359 collection,
360 key,
361 Some(err.to_string()),
362 &evidence,
363 );
364 return Err(err);
365 }
366 }
367 if let Err(err) = self.ensure_config_collection(collection) {
368 let _ = self.emit_config_mutation_event(
369 crate::runtime::control_events::EventKind::ConfigWrite,
370 crate::runtime::control_events::Outcome::Error,
371 "config:write",
372 collection,
373 key,
374 Some(err.to_string()),
375 &evidence,
376 );
377 return Err(err);
378 }
379 let latest = match self.latest_config_version(collection, key) {
380 Ok(latest) => latest,
381 Err(err) => {
382 let _ = self.emit_config_mutation_event(
383 crate::runtime::control_events::EventKind::ConfigWrite,
384 crate::runtime::control_events::Outcome::Error,
385 "config:write",
386 collection,
387 key,
388 Some(err.to_string()),
389 &evidence,
390 );
391 return Err(err);
392 }
393 };
394 let version = latest.as_ref().map(|version| version.version).unwrap_or(0) + 1;
395 let (value_type, schema_version) = resolve_config_schema(latest.as_ref(), requested_type);
396 if let Some(value_type) = value_type {
397 if let Err(err) = validate_config_value_type(&value, value_type) {
398 let _ = self.emit_config_mutation_event(
399 crate::runtime::control_events::EventKind::ConfigWrite,
400 crate::runtime::control_events::Outcome::Error,
401 "config:write",
402 collection,
403 key,
404 Some(err.to_string()),
405 &evidence,
406 );
407 return Err(err);
408 }
409 }
410 evidence.payload = Some(value.clone());
411 if let Some(reason) = self.secret_ref_guard_write_check(collection, key, &value) {
412 let _ = self.emit_config_mutation_event(
413 crate::runtime::control_events::EventKind::ConfigWrite,
414 crate::runtime::control_events::Outcome::Denied,
415 "config:write",
416 collection,
417 key,
418 Some(reason.to_string()),
419 &evidence,
420 );
421 return Err(reason);
422 }
423 let before = latest.as_ref().and_then(|version| {
424 if version.tombstone {
425 None
426 } else {
427 Some(crate::presentation::entity_json::storage_value_to_json(
428 &version.value,
429 ))
430 }
431 });
432 let after = Some(crate::presentation::entity_json::storage_value_to_json(
433 &value,
434 ));
435 let change_op = if latest.is_some() {
436 crate::replication::cdc::ChangeOperation::Update
437 } else {
438 crate::replication::cdc::ChangeOperation::Insert
439 };
440 let id = match self.append_config_version(
441 collection,
442 key,
443 value,
444 version,
445 false,
446 op,
447 value_type,
448 schema_version,
449 tags,
450 ) {
451 Ok(id) => id,
452 Err(err) => {
453 let _ = self.emit_config_mutation_event(
454 crate::runtime::control_events::EventKind::ConfigWrite,
455 crate::runtime::control_events::Outcome::Error,
456 "config:write",
457 collection,
458 key,
459 Some(err.to_string()),
460 &evidence,
461 );
462 return Err(err);
463 }
464 };
465 self.record_kv_watch_event(change_op, collection, key, id.raw(), before, after);
466 if let Err(err) = self.prune_config_history(collection, key) {
467 let _ = self.emit_config_mutation_event(
468 crate::runtime::control_events::EventKind::ConfigWrite,
469 crate::runtime::control_events::Outcome::Error,
470 "config:write",
471 collection,
472 key,
473 Some(err.to_string()),
474 &evidence,
475 );
476 return Err(err);
477 }
478 self.invalidate_result_cache();
479 if let Err(err) = self.emit_config_mutation_event(
480 crate::runtime::control_events::EventKind::ConfigWrite,
481 crate::runtime::control_events::Outcome::Allowed,
482 "config:write",
483 collection,
484 key,
485 None,
486 &evidence,
487 ) {
488 let _ = self.inner.db.store().delete(collection, id);
489 self.invalidate_result_cache();
490 return Err(err);
491 }
492 if is_enforcement_mode_config(collection, key) {
496 if let Some(auth_store) = self.inner.auth_store.read().clone() {
497 if let Value::Text(text) =
498 &evidence.payload.as_ref().cloned().unwrap_or(Value::Null)
499 {
500 if let Some(mode) =
501 crate::auth::enforcement_mode::PolicyEnforcementMode::parse(text)
502 {
503 auth_store.set_enforcement_mode(mode);
504 }
505 }
506 }
507 }
508 Ok(config_write_output(
509 raw_query,
510 collection,
511 key,
512 version,
513 id,
514 value_type,
515 schema_version,
516 tags,
517 match op {
518 "rotate" => "config_rotate",
519 _ => "config_put",
520 },
521 1,
522 ))
523 }
524
525 fn config_delete_result(
526 &self,
527 raw_query: &str,
528 collection: &str,
529 key: &str,
530 ) -> RedDBResult<RuntimeQueryResult> {
531 let mut evidence = match self.authorize_config_write_for_event(collection, key) {
532 ConfigMutationAuthz::Allowed(evidence) => evidence,
533 ConfigMutationAuthz::Denied { reason, evidence } => {
534 let _ = self.emit_config_mutation_event(
535 crate::runtime::control_events::EventKind::ConfigDelete,
536 crate::runtime::control_events::Outcome::Denied,
537 "config:delete",
538 collection,
539 key,
540 Some(reason.clone()),
541 &evidence,
542 );
543 return Err(RedDBError::Query(reason));
544 }
545 };
546 if let Err(err) = self.check_write(crate::runtime::write_gate::WriteKind::Dml) {
547 let _ = self.emit_config_mutation_event(
548 crate::runtime::control_events::EventKind::ConfigDelete,
549 crate::runtime::control_events::Outcome::Error,
550 "config:delete",
551 collection,
552 key,
553 Some(err.to_string()),
554 &evidence,
555 );
556 return Err(err);
557 }
558 if let Err(err) = self.ensure_config_collection(collection) {
559 let _ = self.emit_config_mutation_event(
560 crate::runtime::control_events::EventKind::ConfigDelete,
561 crate::runtime::control_events::Outcome::Error,
562 "config:delete",
563 collection,
564 key,
565 Some(err.to_string()),
566 &evidence,
567 );
568 return Err(err);
569 }
570 let latest = match self.latest_config_version(collection, key) {
571 Ok(latest) => latest,
572 Err(err) => {
573 let _ = self.emit_config_mutation_event(
574 crate::runtime::control_events::EventKind::ConfigDelete,
575 crate::runtime::control_events::Outcome::Error,
576 "config:delete",
577 collection,
578 key,
579 Some(err.to_string()),
580 &evidence,
581 );
582 return Err(err);
583 }
584 };
585 evidence.payload = latest.as_ref().map(|version| version.value.clone());
586 let version = latest.as_ref().map(|version| version.version).unwrap_or(0) + 1;
587 let value_type = latest.as_ref().and_then(|version| version.value_type);
588 let schema_version = latest.as_ref().and_then(|version| version.schema_version);
589 let id = match self.append_config_version(
590 collection,
591 key,
592 Value::Null,
593 version,
594 true,
595 "delete",
596 value_type,
597 schema_version,
598 &[],
599 ) {
600 Ok(id) => id,
601 Err(err) => {
602 let _ = self.emit_config_mutation_event(
603 crate::runtime::control_events::EventKind::ConfigDelete,
604 crate::runtime::control_events::Outcome::Error,
605 "config:delete",
606 collection,
607 key,
608 Some(err.to_string()),
609 &evidence,
610 );
611 return Err(err);
612 }
613 };
614 if let Some(before) = latest.as_ref().and_then(|version| {
615 if version.tombstone {
616 None
617 } else {
618 Some(crate::presentation::entity_json::storage_value_to_json(
619 &version.value,
620 ))
621 }
622 }) {
623 self.record_kv_watch_event(
624 crate::replication::cdc::ChangeOperation::Delete,
625 collection,
626 key,
627 id.raw(),
628 Some(before),
629 None,
630 );
631 }
632 if let Err(err) = self.prune_config_history(collection, key) {
633 let _ = self.emit_config_mutation_event(
634 crate::runtime::control_events::EventKind::ConfigDelete,
635 crate::runtime::control_events::Outcome::Error,
636 "config:delete",
637 collection,
638 key,
639 Some(err.to_string()),
640 &evidence,
641 );
642 return Err(err);
643 }
644 self.invalidate_result_cache();
645 if let Err(err) = self.emit_config_mutation_event(
646 crate::runtime::control_events::EventKind::ConfigDelete,
647 crate::runtime::control_events::Outcome::Allowed,
648 "config:delete",
649 collection,
650 key,
651 None,
652 &evidence,
653 ) {
654 let _ = self.inner.db.store().delete(collection, id);
655 self.invalidate_result_cache();
656 return Err(err);
657 }
658 Ok(config_write_output(
659 raw_query,
660 collection,
661 key,
662 version,
663 id,
664 value_type,
665 schema_version,
666 &[],
667 "delete",
668 1,
669 ))
670 }
671
672 fn config_get_result(
673 &self,
674 raw_query: &str,
675 collection: &str,
676 key: &str,
677 ) -> RedDBResult<RuntimeQueryResult> {
678 self.check_system_config_capability("config:read", collection, key)
679 .map_err(RedDBError::Query)?;
680 let latest = self.latest_config_version(collection, key)?;
681 let mut result = UnifiedResult::with_columns(vec![
682 "collection".into(),
683 "key".into(),
684 "value".into(),
685 "version".into(),
686 "value_type".into(),
687 "schema_version".into(),
688 "tags".into(),
689 "tombstone".into(),
690 ]);
691 let mut record = UnifiedRecord::new();
692 record.set("collection", Value::text(collection.to_string()));
693 record.set("key", Value::text(key.to_string()));
694 if let Some(version) = latest {
695 record.set("value", version.value);
696 record.set("version", Value::Integer(version.version));
697 record.set("value_type", config_value_type_value(version.value_type));
698 record.set(
699 "schema_version",
700 version
701 .schema_version
702 .map(Value::Integer)
703 .unwrap_or(Value::Null),
704 );
705 record.set("tags", config_tags_value(&version.tags));
706 record.set("tombstone", Value::Boolean(version.tombstone));
707 } else {
708 record.set("value", Value::Null);
709 record.set("version", Value::Null);
710 record.set("value_type", Value::Null);
711 record.set("schema_version", Value::Null);
712 record.set("tags", Value::Null);
713 record.set("tombstone", Value::Boolean(false));
714 }
715 result.push(record);
716 Ok(RuntimeQueryResult {
717 query: raw_query.to_string(),
718 mode: crate::storage::query::modes::QueryMode::Sql,
719 statement: "config_get",
720 engine: "config",
721 result,
722 affected_rows: 0,
723 statement_type: "select",
724 bookmark: None,
725 })
726 }
727
728 fn config_history_result(
729 &self,
730 raw_query: &str,
731 collection: &str,
732 key: &str,
733 ) -> RedDBResult<RuntimeQueryResult> {
734 self.check_system_config_capability("config:read", collection, key)
735 .map_err(RedDBError::Query)?;
736 let versions = super::keyed_spine::history_versions(self.config_versions(collection, key)?);
737 let mut result = UnifiedResult::with_columns(vec![
738 "collection".into(),
739 "key".into(),
740 "version".into(),
741 "value".into(),
742 "value_type".into(),
743 "schema_version".into(),
744 "tags".into(),
745 "tombstone".into(),
746 "op".into(),
747 "created_at_ms".into(),
748 ]);
749 for version in versions {
750 let mut record = UnifiedRecord::new();
751 record.set("collection", Value::text(collection.to_string()));
752 record.set("key", Value::text(key.to_string()));
753 record.set("version", Value::Integer(version.version));
754 record.set("value", version.value);
755 record.set("value_type", config_value_type_value(version.value_type));
756 record.set(
757 "schema_version",
758 version
759 .schema_version
760 .map(Value::Integer)
761 .unwrap_or(Value::Null),
762 );
763 record.set("tags", Value::Null);
764 record.set("tombstone", Value::Boolean(version.tombstone));
765 record.set("op", Value::text(version.op));
766 record.set("created_at_ms", Value::Integer(version.created_at_ms));
767 result.push(record);
768 }
769 Ok(RuntimeQueryResult {
770 query: raw_query.to_string(),
771 mode: crate::storage::query::modes::QueryMode::Sql,
772 statement: "config_history",
773 engine: "config",
774 result,
775 affected_rows: 0,
776 statement_type: "select",
777 bookmark: None,
778 })
779 }
780
781 fn config_list_result(
782 &self,
783 raw_query: &str,
784 collection: &str,
785 prefix: Option<&str>,
786 limit: Option<usize>,
787 offset: usize,
788 ) -> RedDBResult<RuntimeQueryResult> {
789 let mut versions = self.latest_config_versions(collection, prefix)?;
790 versions.sort_by(|left, right| left.key.cmp(&right.key));
791 let mut result = UnifiedResult::with_columns(vec![
792 "collection".into(),
793 "key".into(),
794 "value".into(),
795 "version".into(),
796 "value_type".into(),
797 "schema_version".into(),
798 "tags".into(),
799 "tombstone".into(),
800 "op".into(),
801 "created_at_ms".into(),
802 ]);
803 for version in versions
804 .into_iter()
805 .filter(|version| {
806 self.check_config_capability("config:read", collection, &version.key)
807 .is_ok()
808 })
809 .skip(offset)
810 .take(limit.unwrap_or(usize::MAX))
811 {
812 let mut record = UnifiedRecord::new();
813 record.set("collection", Value::text(collection.to_string()));
814 record.set("key", Value::text(version.key));
815 record.set("value", version.value);
816 record.set("version", Value::Integer(version.version));
817 record.set("value_type", config_value_type_value(version.value_type));
818 record.set(
819 "schema_version",
820 version
821 .schema_version
822 .map(Value::Integer)
823 .unwrap_or(Value::Null),
824 );
825 record.set("tags", config_tags_value(&version.tags));
826 record.set("tombstone", Value::Boolean(version.tombstone));
827 record.set("op", Value::text(version.op));
828 record.set("created_at_ms", Value::Integer(version.created_at_ms));
829 result.push(record);
830 }
831 Ok(RuntimeQueryResult {
832 query: raw_query.to_string(),
833 mode: crate::storage::query::modes::QueryMode::Sql,
834 statement: "config_list",
835 engine: "config",
836 result,
837 affected_rows: 0,
838 statement_type: "select",
839 bookmark: None,
840 })
841 }
842
843 fn config_watch_result(
844 &self,
845 raw_query: &str,
846 collection: &str,
847 key: &str,
848 prefix: bool,
849 from_lsn: Option<u64>,
850 ) -> RedDBResult<RuntimeQueryResult> {
851 let watch_key = if prefix {
852 format!("{key}.*")
853 } else {
854 key.to_string()
855 };
856 let endpoint = match from_lsn {
857 Some(lsn) => {
858 format!("/collections/{collection}/config/{watch_key}/watch?since_lsn={lsn}")
859 }
860 None => format!("/collections/{collection}/config/{watch_key}/watch"),
861 };
862 let mut result = UnifiedResult::with_columns(vec![
863 "collection".into(),
864 "key".into(),
865 "prefix".into(),
866 "from_lsn".into(),
867 "watch_url".into(),
868 "streaming".into(),
869 ]);
870 let mut record = UnifiedRecord::new();
871 record.set("collection", Value::text(collection.to_string()));
872 record.set("key", Value::text(watch_key));
873 record.set("prefix", Value::Boolean(prefix));
874 record.set(
875 "from_lsn",
876 from_lsn
877 .map(Value::UnsignedInteger)
878 .unwrap_or(crate::storage::schema::Value::Null),
879 );
880 record.set("watch_url", Value::text(endpoint));
881 record.set("streaming", Value::Boolean(true));
882 result.push(record);
883 Ok(RuntimeQueryResult {
884 query: raw_query.to_string(),
885 mode: crate::storage::query::modes::QueryMode::Sql,
886 statement: "config_watch",
887 engine: "config",
888 result,
889 affected_rows: 0,
890 statement_type: "stream",
891 bookmark: None,
892 })
893 }
894
895 fn ensure_config_collection(&self, collection: &str) -> RedDBResult<()> {
896 let store = self.inner.db.store();
897 if store.get_collection(collection).is_none() {
898 store
899 .create_collection(collection)
900 .map_err(|err| RedDBError::Internal(err.to_string()))?;
901 }
902 if let Some(contract) = self.inner.db.collection_contract(collection) {
903 crate::runtime::ddl::polymorphic_resolver::ensure_model_match(
904 CollectionModel::Config,
905 contract.declared_model,
906 )?;
907 return Ok(());
908 }
909 let now = current_unix_ms();
910 self.inner
911 .db
912 .save_collection_contract(CollectionContract {
913 name: collection.to_string(),
914 declared_model: CollectionModel::Config,
915 schema_mode: SchemaMode::Dynamic,
916 origin: ContractOrigin::Explicit,
917 version: 1,
918 created_at_unix_ms: now as u128,
919 updated_at_unix_ms: now as u128,
920 default_ttl_ms: None,
921 vector_dimension: None,
922 vector_metric: None,
923 context_index_fields: Vec::new(),
924 declared_columns: Vec::new(),
925 table_def: None,
926 timestamps_enabled: false,
927 context_index_enabled: false,
928 metrics_raw_retention_ms: None,
929 metrics_rollup_policies: Vec::new(),
930 metrics_tenant_identity: None,
931 metrics_namespace: None,
932 append_only: false,
933 subscriptions: Vec::new(),
934 analytics_config: Vec::new(),
935 session_key: None,
936 session_gap_ms: None,
937 retention_duration_ms: None,
938 analytical_storage: None,
939 })
940 .map(|_| ())
941 .map_err(|err| RedDBError::Internal(err.to_string()))
942 }
943
944 fn append_config_version(
945 &self,
946 collection: &str,
947 key: &str,
948 value: Value,
949 version: i64,
950 tombstone: bool,
951 op: &str,
952 value_type: Option<ConfigValueType>,
953 schema_version: Option<i64>,
954 tags: &[String],
955 ) -> RedDBResult<EntityId> {
956 let now = current_unix_ms() as i64;
957 let fields = vec![
958 ("key".to_string(), Value::text(key.to_string())),
959 ("value".to_string(), value),
960 ("version".to_string(), Value::Integer(version)),
961 (
962 "value_type".to_string(),
963 config_value_type_value(value_type),
964 ),
965 (
966 "schema_version".to_string(),
967 schema_version.map(Value::Integer).unwrap_or(Value::Null),
968 ),
969 ("tombstone".to_string(), Value::Boolean(tombstone)),
970 ("op".to_string(), Value::text(op.to_string())),
971 ("created_at_ms".to_string(), Value::Integer(now)),
972 ("tags".to_string(), config_tags_value(tags)),
973 ];
974 let mut row = RowData::new(Vec::new());
975 row.named = Some(fields.into_iter().collect());
976 let entity = UnifiedEntity::new(
977 EntityId::new(0),
978 EntityKind::TableRow {
979 table: Arc::from(collection),
980 row_id: 0,
981 },
982 EntityData::Row(row),
983 );
984 self.inner
985 .db
986 .store()
987 .insert(collection, entity)
988 .map_err(|err| RedDBError::Internal(err.to_string()))
989 }
990
991 fn latest_config_version(
992 &self,
993 collection: &str,
994 key: &str,
995 ) -> RedDBResult<Option<ConfigVersion>> {
996 Ok(super::keyed_spine::latest_version(
997 self.config_versions(collection, key)?,
998 ))
999 }
1000
1001 fn config_versions(&self, collection: &str, key: &str) -> RedDBResult<Vec<ConfigVersion>> {
1002 let store = self.inner.db.store();
1003 let Some(manager) = store.get_collection(collection) else {
1004 return Ok(Vec::new());
1005 };
1006 let mut versions = Vec::new();
1007 for entity in manager.query_all(|_| true) {
1008 let EntityData::Row(row) = &entity.data else {
1009 continue;
1010 };
1011 let Some(version) = super::keyed_spine::row_version(entity.id, row, 0) else {
1012 continue;
1013 };
1014 if version.key != key {
1015 continue;
1016 }
1017 versions.push(ConfigVersion::from_keyed_row(version, row));
1018 }
1019 Ok(versions)
1020 }
1021
1022 fn latest_config_versions(
1023 &self,
1024 collection: &str,
1025 prefix: Option<&str>,
1026 ) -> RedDBResult<Vec<ConfigVersion>> {
1027 let store = self.inner.db.store();
1028 let Some(manager) = store.get_collection(collection) else {
1029 return Ok(Vec::new());
1030 };
1031 let mut versions = Vec::new();
1032 for entity in manager.query_all(|_| true) {
1033 let EntityData::Row(row) = &entity.data else {
1034 continue;
1035 };
1036 let Some(version) = super::keyed_spine::row_version(entity.id, row, 0) else {
1037 continue;
1038 };
1039 versions.push(ConfigVersion::from_keyed_row(version, row));
1040 }
1041 Ok(super::keyed_spine::latest_versions(versions, prefix))
1042 }
1043
1044 fn prune_config_history(&self, collection: &str, key: &str) -> RedDBResult<()> {
1045 let mut versions = self.config_versions(collection, key)?;
1046 if versions.len() <= CONFIG_HISTORY_LIMIT {
1047 return Ok(());
1048 }
1049 versions = super::keyed_spine::history_versions(versions);
1050 let drop_count = versions.len() - CONFIG_HISTORY_LIMIT;
1051 let store = self.inner.db.store();
1052 for version in versions.into_iter().take(drop_count) {
1053 store
1054 .delete(collection, version.id)
1055 .map_err(|err| RedDBError::Internal(err.to_string()))?;
1056 }
1057 Ok(())
1058 }
1059
1060 fn authorize_config_write_for_event(&self, collection: &str, key: &str) -> ConfigMutationAuthz {
1061 let default_evidence = self.default_config_mutation_evidence(collection, key);
1062 let Some(auth_store) = self.inner.auth_store.read().clone() else {
1063 return ConfigMutationAuthz::Allowed(default_evidence);
1064 };
1065 if !auth_store.iam_authorization_enabled() {
1066 return ConfigMutationAuthz::Allowed(default_evidence);
1067 }
1068 let Some((principal, role)) = current_auth_identity() else {
1069 return ConfigMutationAuthz::Denied {
1070 reason:
1071 "IAM authorization is enabled; config capability check requires an authenticated principal"
1072 .to_string(),
1073 evidence: default_evidence,
1074 };
1075 };
1076 let tenant = current_tenant();
1077 let principal_id = crate::auth::UserId::from_parts(tenant.as_deref(), &principal);
1078 let ctx = crate::auth::policies::EvalContext {
1079 principal_tenant: tenant.clone(),
1080 current_tenant: tenant.clone(),
1081 peer_ip: None,
1082 mfa_present: false,
1083 now_ms: crate::utils::now_unix_millis() as u128,
1084 principal_is_admin_role: role == crate::auth::Role::Admin,
1085 principal_is_system_owned: auth_store.principal_is_system_owned(&principal_id),
1086 principal_is_platform_scoped: principal_id.tenant.is_none(),
1087 };
1088 let managed_key = if collection == "red.config" {
1089 format!("red.config.{key}")
1090 } else {
1091 key.to_string()
1092 };
1093 let gate = crate::auth::managed_config::ManagedConfigGate::new(
1094 self.inner.config_registry.as_ref(),
1095 );
1096 match gate.check_write(&auth_store, &principal_id, &ctx, &managed_key) {
1097 crate::auth::managed_config::ManagedConfigDecision::Allow {
1098 entry_id,
1099 resource_type,
1100 managed,
1101 mutability,
1102 matched_action,
1103 matched_resource,
1104 ..
1105 } => {
1106 return ConfigMutationAuthz::Allowed(ConfigMutationEvidence {
1107 id: entry_id,
1108 resource_type,
1109 managed,
1110 mutability,
1111 matched_action: Some(matched_action),
1112 matched_resource: Some(matched_resource),
1113 payload: None,
1114 });
1115 }
1116 crate::auth::managed_config::ManagedConfigDecision::Deny {
1117 entry_id,
1118 resource_type,
1119 managed,
1120 mutability,
1121 matched_action,
1122 matched_resource,
1123 reason,
1124 ..
1125 } => {
1126 return ConfigMutationAuthz::Denied {
1127 reason: format!(
1128 "permission denied: managed config mutation blocked for `{managed_key}`: {reason}"
1129 ),
1130 evidence: ConfigMutationEvidence {
1131 id: entry_id,
1132 resource_type,
1133 managed,
1134 mutability,
1135 matched_action: Some(matched_action),
1136 matched_resource: Some(matched_resource),
1137 payload: None,
1138 },
1139 };
1140 }
1141 crate::auth::managed_config::ManagedConfigDecision::PassThrough { .. } => {}
1142 }
1143
1144 let mut resource = crate::auth::policies::ResourceRef::new(
1145 "config",
1146 config_target_resource(collection, key),
1147 );
1148 if let Some(ref tenant) = tenant {
1149 resource = resource.with_tenant(tenant.clone());
1150 }
1151 if auth_store.check_policy_authz_with_role(
1152 &principal_id,
1153 "config:write",
1154 &resource,
1155 &ctx,
1156 role,
1157 ) {
1158 ConfigMutationAuthz::Allowed(default_evidence)
1159 } else {
1160 ConfigMutationAuthz::Denied {
1161 reason: format!(
1162 "principal=`{}` action=`config:write` resource=`config:{}` denied by IAM policy",
1163 principal,
1164 config_target_resource(collection, key)
1165 ),
1166 evidence: default_evidence,
1167 }
1168 }
1169 }
1170
1171 fn default_config_mutation_evidence(
1172 &self,
1173 collection: &str,
1174 key: &str,
1175 ) -> ConfigMutationEvidence {
1176 let id = if collection == "red.config" {
1177 format!("red.config.{key}")
1178 } else {
1179 key.to_string()
1180 };
1181 ConfigMutationEvidence {
1182 id,
1183 resource_type: crate::auth::managed_config::RESOURCE_TYPE_CONFIG_KEY.to_string(),
1184 managed: false,
1185 mutability: crate::auth::registry::Mutability::MutableViaGovernance,
1186 matched_action: None,
1187 matched_resource: None,
1188 payload: None,
1189 }
1190 }
1191
1192 fn emit_config_mutation_event(
1193 &self,
1194 kind: crate::runtime::control_events::EventKind,
1195 outcome: crate::runtime::control_events::Outcome,
1196 action: &'static str,
1197 collection: &str,
1198 key: &str,
1199 reason: Option<String>,
1200 evidence: &ConfigMutationEvidence,
1201 ) -> RedDBResult<()> {
1202 use crate::runtime::control_events::{
1203 ActorRef, ControlEvent, ControlEventCtx, Sensitivity,
1204 };
1205
1206 let tenant = current_tenant();
1207 let principal = current_auth_identity();
1208 let actor_user = principal
1209 .as_ref()
1210 .map(|(principal, _)| crate::auth::UserId::from_parts(tenant.as_deref(), principal));
1211 let actor = actor_user
1212 .as_ref()
1213 .map(ActorRef::User)
1214 .unwrap_or(ActorRef::Anonymous);
1215 let ctx = ControlEventCtx {
1216 actor,
1217 scope: tenant
1218 .as_ref()
1219 .map(|scope| std::borrow::Cow::Borrowed(scope.as_str())),
1220 request_id: Some(std::borrow::Cow::Owned(format!(
1221 "conn-{}",
1222 current_connection_id()
1223 ))),
1224 trace_id: None,
1225 };
1226
1227 let mut fields = HashMap::new();
1228 fields.insert("id".to_string(), Sensitivity::raw(evidence.id.clone()));
1229 fields.insert(
1230 "resource_type".to_string(),
1231 Sensitivity::raw(evidence.resource_type.clone()),
1232 );
1233 fields.insert(
1234 "managed".to_string(),
1235 Sensitivity::raw(evidence.managed.to_string()),
1236 );
1237 fields.insert(
1238 "mutability".to_string(),
1239 Sensitivity::raw(config_mutability_label(evidence.mutability)),
1240 );
1241 fields.insert("collection".to_string(), Sensitivity::raw(collection));
1242 fields.insert("key".to_string(), Sensitivity::raw(key));
1243 fields.insert(
1244 "connection_id".to_string(),
1245 Sensitivity::raw(current_connection_id().to_string()),
1246 );
1247 if let Some((_, role)) = principal {
1248 fields.insert("actor_role".to_string(), Sensitivity::raw(role.as_str()));
1249 }
1250 if let Some(matched_action) = &evidence.matched_action {
1251 fields.insert(
1252 "matched_action".to_string(),
1253 Sensitivity::raw(matched_action.clone()),
1254 );
1255 }
1256 if let Some(matched_resource) = &evidence.matched_resource {
1257 fields.insert(
1258 "matched_resource".to_string(),
1259 Sensitivity::raw(matched_resource.clone()),
1260 );
1261 }
1262 if let Some(payload) = &evidence.payload {
1263 fields.insert(
1264 "payload".to_string(),
1265 config_payload_sensitivity(&evidence.resource_type, "payload", payload),
1266 );
1267 }
1268
1269 let event = ControlEvent {
1270 kind,
1271 outcome,
1272 action: std::borrow::Cow::Borrowed(action),
1273 resource: Some(format!(
1274 "config:{}",
1275 config_target_resource(collection, key)
1276 )),
1277 reason,
1278 matched_policy_id: None,
1279 fields,
1280 };
1281 let ledger = self.inner.control_event_ledger.read();
1282 match ledger.emit(&ctx, event) {
1283 Ok(_) => Ok(()),
1284 Err(err) if self.inner.control_event_config.require_persistence() => {
1285 Err(RedDBError::Internal(err.to_string()))
1286 }
1287 Err(_) => Ok(()),
1288 }
1289 }
1290
1291 fn check_config_capability(
1292 &self,
1293 action: &str,
1294 collection: &str,
1295 key: &str,
1296 ) -> Result<(), String> {
1297 let Some(auth_store) = self.inner.auth_store.read().clone() else {
1298 return Ok(());
1299 };
1300 if !auth_store.iam_authorization_enabled() {
1301 return Ok(());
1302 }
1303 let Some((principal, role)) = current_auth_identity() else {
1304 return Err(
1305 "IAM authorization is enabled; config capability check requires an authenticated principal"
1306 .to_string(),
1307 );
1308 };
1309 let tenant = current_tenant();
1310 let principal_id = crate::auth::UserId::from_parts(tenant.as_deref(), &principal);
1311 let mut resource = crate::auth::policies::ResourceRef::new(
1312 "config",
1313 config_target_resource(collection, key),
1314 );
1315 if let Some(ref tenant) = tenant {
1316 resource = resource.with_tenant(tenant.clone());
1317 }
1318 let ctx = crate::auth::policies::EvalContext {
1319 principal_tenant: tenant.clone(),
1320 current_tenant: tenant,
1321 peer_ip: None,
1322 mfa_present: false,
1323 now_ms: crate::utils::now_unix_millis() as u128,
1324 principal_is_admin_role: role == crate::auth::Role::Admin,
1325 principal_is_system_owned: auth_store.principal_is_system_owned(&principal_id),
1326 principal_is_platform_scoped: principal_id.tenant.is_none(),
1327 };
1328 if action == "config:write" {
1329 let managed_key = if collection == "red.config" {
1330 format!("red.config.{key}")
1331 } else {
1332 key.to_string()
1333 };
1334 let gate = crate::auth::managed_config::ManagedConfigGate::new(
1335 self.inner.config_registry.as_ref(),
1336 );
1337 match gate.check_write(&auth_store, &principal_id, &ctx, &managed_key) {
1338 crate::auth::managed_config::ManagedConfigDecision::PassThrough { .. } => {}
1339 crate::auth::managed_config::ManagedConfigDecision::Allow { .. } => return Ok(()),
1340 crate::auth::managed_config::ManagedConfigDecision::Deny { reason, .. } => {
1341 return Err(format!(
1342 "permission denied: managed config mutation blocked for `{managed_key}`: {reason}"
1343 ));
1344 }
1345 }
1346 }
1347 if auth_store.check_policy_authz_with_role(&principal_id, action, &resource, &ctx, role) {
1348 Ok(())
1349 } else {
1350 Err(format!(
1351 "principal=`{}` action=`{}` resource=`config:{}` denied by IAM policy",
1352 principal,
1353 action,
1354 config_target_resource(collection, key)
1355 ))
1356 }
1357 }
1358
1359 fn check_system_config_capability(
1360 &self,
1361 action: &str,
1362 collection: &str,
1363 key: &str,
1364 ) -> Result<(), String> {
1365 if collection != "red.config" {
1366 return Ok(());
1367 }
1368 self.check_config_capability(action, collection, key)
1369 }
1370
1371 pub fn config_watch_events_since(
1372 &self,
1373 collection: &str,
1374 key: &str,
1375 since_lsn: u64,
1376 max_count: usize,
1377 ) -> Vec<crate::replication::cdc::KvWatchEvent> {
1378 self.kv_watch_events_since(collection, key, since_lsn, max_count)
1379 .into_iter()
1380 .map(|event| self.policy_filter_config_watch_event(event))
1381 .collect()
1382 }
1383
1384 pub fn config_watch_events_since_prefix(
1385 &self,
1386 collection: &str,
1387 prefix: &str,
1388 since_lsn: u64,
1389 max_count: usize,
1390 ) -> Vec<crate::replication::cdc::KvWatchEvent> {
1391 self.kv_watch_events_since_prefix(collection, prefix, since_lsn, max_count)
1392 .into_iter()
1393 .map(|event| self.policy_filter_config_watch_event(event))
1394 .collect()
1395 }
1396
1397 fn policy_filter_config_watch_event(
1398 &self,
1399 mut event: crate::replication::cdc::KvWatchEvent,
1400 ) -> crate::replication::cdc::KvWatchEvent {
1401 if self
1402 .check_config_capability("config:read", &event.collection, &event.key)
1403 .is_err()
1404 {
1405 event.before = None;
1406 event.after = None;
1407 }
1408 event
1409 }
1410
1411 fn secret_ref_guard_write_check(
1417 &self,
1418 collection: &str,
1419 key: &str,
1420 value: &Value,
1421 ) -> Option<RedDBError> {
1422 if !value_looks_like_secret_ref(value) {
1423 return None;
1424 }
1425 let Ok(secret_ref) = parse_config_secret_ref(value) else {
1426 return None;
1427 };
1428 let unsealed = match self.peek_vault_unsealed(&secret_ref.collection, &secret_ref.key) {
1429 Ok(Some(value)) => value,
1430 Ok(None) => return None,
1431 Err(_) => return None,
1432 };
1433 if value_looks_like_secret_ref(&unsealed) {
1434 return Some(secret_ref_chain_error(
1435 collection,
1436 key,
1437 &secret_ref.collection,
1438 &secret_ref.key,
1439 ));
1440 }
1441 None
1442 }
1443
1444 fn audit_config_resolve(
1445 &self,
1446 collection: &str,
1447 key: &str,
1448 secret_ref: Option<&ConfigSecretRef>,
1449 outcome: crate::runtime::audit_log::Outcome,
1450 reason: &str,
1451 ) {
1452 let actor = current_auth_identity()
1453 .map(|(principal, _)| principal)
1454 .unwrap_or_else(|| "anonymous".to_string());
1455 let request_id = match current_connection_id() {
1456 0 => "embedded".to_string(),
1457 id => format!("conn-{id}"),
1458 };
1459 let mut builder = crate::runtime::audit_log::AuditEvent::builder("config/resolve")
1460 .principal(actor.clone())
1461 .source(crate::runtime::audit_log::AuditAuthSource::Password)
1462 .resource(format!(
1463 "config:{}",
1464 config_target_resource(collection, key)
1465 ))
1466 .outcome(outcome)
1467 .correlation_id(request_id.clone())
1468 .fields([
1469 crate::runtime::audit_log::AuditFieldEscaper::field("actor", actor),
1470 crate::runtime::audit_log::AuditFieldEscaper::field("collection", collection),
1471 crate::runtime::audit_log::AuditFieldEscaper::field("key", key),
1472 crate::runtime::audit_log::AuditFieldEscaper::field(
1473 "target",
1474 config_target_resource(collection, key),
1475 ),
1476 crate::runtime::audit_log::AuditFieldEscaper::field("reason", reason),
1477 crate::runtime::audit_log::AuditFieldEscaper::field("request_id", request_id),
1478 crate::runtime::audit_log::AuditFieldEscaper::field(
1479 "connection_id",
1480 current_connection_id(),
1481 ),
1482 ]);
1483 if let Some(tenant) = current_tenant() {
1484 builder = builder.tenant(tenant);
1485 }
1486 if let Some(secret_ref) = secret_ref {
1487 builder = builder.fields([
1488 crate::runtime::audit_log::AuditFieldEscaper::field("resolved_store", "vault"),
1489 crate::runtime::audit_log::AuditFieldEscaper::field(
1490 "resolved_collection",
1491 secret_ref.collection.as_str(),
1492 ),
1493 crate::runtime::audit_log::AuditFieldEscaper::field(
1494 "resolved_key",
1495 secret_ref.key.as_str(),
1496 ),
1497 crate::runtime::audit_log::AuditFieldEscaper::field(
1498 "resolved_target",
1499 format!("{}.{}", secret_ref.collection, secret_ref.key),
1500 ),
1501 ]);
1502 }
1503 self.audit_log().record_event(builder.build());
1504 }
1505}
1506
1507fn value_looks_like_secret_ref(value: &Value) -> bool {
1512 let Value::Json(bytes) = value else {
1513 return false;
1514 };
1515 let Ok(json) = crate::json::from_slice::<crate::json::Value>(bytes) else {
1516 return false;
1517 };
1518 let Some(object) = json.as_object() else {
1519 return false;
1520 };
1521 object.get("type").and_then(|value| value.as_str()) == Some("secret_ref")
1522}
1523
1524fn secret_ref_chain_error(
1525 source_collection: &str,
1526 source_key: &str,
1527 target_collection: &str,
1528 target_key: &str,
1529) -> RedDBError {
1530 RedDBError::InvalidConfig(format!(
1531 "secret_ref chain rejected: config `{source_collection}.{source_key}` points at vault `{target_collection}.{target_key}` which is itself a secret_ref; depth-1 invariant requires the target to resolve to a non-secret_ref value"
1532 ))
1533}
1534
1535fn parse_config_secret_ref(value: &Value) -> RedDBResult<ConfigSecretRef> {
1536 let Value::Json(bytes) = value else {
1537 return Err(RedDBError::InvalidConfig(
1538 "CONFIG value is not a SecretRef".to_string(),
1539 ));
1540 };
1541 let json = crate::json::from_slice::<crate::json::Value>(bytes).map_err(|err| {
1542 RedDBError::InvalidConfig(format!("CONFIG SecretRef is malformed: {err}"))
1543 })?;
1544 let Some(object) = json.as_object() else {
1545 return Err(RedDBError::InvalidConfig(
1546 "CONFIG SecretRef must be an object".to_string(),
1547 ));
1548 };
1549 let get_str = |field: &str| -> RedDBResult<&str> {
1550 object
1551 .get(field)
1552 .and_then(|value| value.as_str())
1553 .ok_or_else(|| RedDBError::InvalidConfig(format!("CONFIG SecretRef missing {field}")))
1554 };
1555 if get_str("type")? != "secret_ref" {
1556 return Err(RedDBError::InvalidConfig(
1557 "CONFIG value is not a SecretRef".to_string(),
1558 ));
1559 }
1560 if get_str("store")? != "vault" {
1561 return Err(RedDBError::InvalidConfig(
1562 "CONFIG SecretRef store is unsupported".to_string(),
1563 ));
1564 }
1565 Ok(ConfigSecretRef {
1566 collection: get_str("collection")?.to_string(),
1567 key: get_str("key")?.to_string(),
1568 })
1569}
1570
1571fn config_target_resource(collection: &str, key: &str) -> String {
1572 if collection == "red.config" {
1573 format!("red.config/{}", key.to_ascii_lowercase())
1574 } else {
1575 format!("{collection}.{key}")
1576 }
1577}
1578
1579fn config_write_output(
1580 raw_query: &str,
1581 collection: &str,
1582 key: &str,
1583 version: i64,
1584 id: EntityId,
1585 value_type: Option<ConfigValueType>,
1586 schema_version: Option<i64>,
1587 tags: &[String],
1588 statement: &'static str,
1589 affected_rows: u64,
1590) -> RuntimeQueryResult {
1591 let mut result = UnifiedResult::with_columns(vec![
1592 "ok".into(),
1593 "collection".into(),
1594 "key".into(),
1595 "version".into(),
1596 "value_type".into(),
1597 "schema_version".into(),
1598 "tags".into(),
1599 "id".into(),
1600 ]);
1601 let mut record = UnifiedRecord::new();
1602 record.set("ok", Value::Boolean(true));
1603 record.set("collection", Value::text(collection.to_string()));
1604 record.set("key", Value::text(key.to_string()));
1605 record.set("version", Value::Integer(version));
1606 record.set("value_type", config_value_type_value(value_type));
1607 record.set(
1608 "schema_version",
1609 schema_version.map(Value::Integer).unwrap_or(Value::Null),
1610 );
1611 record.set("tags", config_tags_value(tags));
1612 record.set("id", Value::Integer(id.raw() as i64));
1613 result.push(record);
1614 RuntimeQueryResult {
1615 query: raw_query.to_string(),
1616 mode: crate::storage::query::modes::QueryMode::Sql,
1617 statement,
1618 engine: "config",
1619 result,
1620 affected_rows,
1621 statement_type: if statement == "delete" {
1622 "delete"
1623 } else {
1624 "update"
1625 },
1626 bookmark: None,
1627 }
1628}
1629
1630fn invalid_config_volatility(operation: &str) -> RedDBError {
1631 RedDBError::InvalidOperation(format!(
1632 "CONFIG does not support KV-only volatility operation {operation}"
1633 ))
1634}
1635
1636fn resolve_config_schema(
1637 latest: Option<&ConfigVersion>,
1638 requested_type: Option<ConfigValueType>,
1639) -> (Option<ConfigValueType>, Option<i64>) {
1640 let previous_type = latest.and_then(|version| version.value_type);
1641 let previous_schema_version = latest.and_then(|version| version.schema_version);
1642 match requested_type {
1643 Some(value_type) if Some(value_type) != previous_type => (
1644 Some(value_type),
1645 Some(previous_schema_version.unwrap_or(0) + 1),
1646 ),
1647 Some(value_type) => (Some(value_type), previous_schema_version.or(Some(1))),
1648 None => (previous_type, previous_schema_version),
1649 }
1650}
1651
1652fn validate_config_value_type(value: &Value, value_type: ConfigValueType) -> RedDBResult<()> {
1653 let valid = match value_type {
1654 ConfigValueType::Bool => matches!(value, Value::Boolean(_)),
1655 ConfigValueType::Int => matches!(
1656 value,
1657 Value::Integer(_) | Value::UnsignedInteger(_) | Value::BigInt(_)
1658 ),
1659 ConfigValueType::String => matches!(value, Value::Text(_)),
1660 ConfigValueType::Url => validate_config_url(value),
1661 ConfigValueType::Object => validate_config_json_shape(value, true),
1662 ConfigValueType::Array => {
1663 matches!(value, Value::Array(_) | Value::Vector(_))
1664 || validate_config_json_shape(value, false)
1665 }
1666 };
1667 if valid {
1668 Ok(())
1669 } else {
1670 Err(RedDBError::InvalidConfig(format!(
1671 "CONFIG value type mismatch: expected {}, got {}",
1672 value_type.as_str(),
1673 config_actual_value_type(value),
1674 )))
1675 }
1676}
1677
1678fn is_enforcement_mode_config(collection: &str, key: &str) -> bool {
1683 collection == "red.config" && key == "policy.enforcement_mode"
1684}
1685
1686fn validate_enforcement_mode_value(value: &Value) -> RedDBResult<()> {
1690 let text = match value {
1691 Value::Text(text) => text.as_ref(),
1692 _ => {
1693 return Err(RedDBError::InvalidConfig(format!(
1694 "config key `{}` must be a string ({} or {}); got {}",
1695 crate::auth::enforcement_mode::ENFORCEMENT_MODE_CONFIG_KEY,
1696 crate::auth::enforcement_mode::PolicyEnforcementMode::LegacyRbac.as_str(),
1697 crate::auth::enforcement_mode::PolicyEnforcementMode::PolicyOnly.as_str(),
1698 config_actual_value_type(value),
1699 )));
1700 }
1701 };
1702 if crate::auth::enforcement_mode::PolicyEnforcementMode::parse(text).is_some() {
1703 Ok(())
1704 } else {
1705 Err(RedDBError::InvalidConfig(format!(
1706 "config key `{}` accepts only `{}` or `{}`, got `{}`",
1707 crate::auth::enforcement_mode::ENFORCEMENT_MODE_CONFIG_KEY,
1708 crate::auth::enforcement_mode::PolicyEnforcementMode::LegacyRbac.as_str(),
1709 crate::auth::enforcement_mode::PolicyEnforcementMode::PolicyOnly.as_str(),
1710 text,
1711 )))
1712 }
1713}
1714
1715fn validate_config_url(value: &Value) -> bool {
1716 let url = match value {
1717 Value::Url(value) => value.as_str(),
1718 Value::Text(value) => value.as_ref(),
1719 _ => return false,
1720 };
1721 url.starts_with("http://") || url.starts_with("https://") || url.starts_with("ftp://")
1722}
1723
1724fn validate_config_json_shape(value: &Value, object: bool) -> bool {
1725 let Value::Json(bytes) = value else {
1726 return false;
1727 };
1728 let Ok(json) = crate::json::from_slice::<crate::json::Value>(bytes) else {
1729 return false;
1730 };
1731 matches!(
1732 (object, json),
1733 (true, crate::json::Value::Object(_)) | (false, crate::json::Value::Array(_))
1734 )
1735}
1736
1737fn config_actual_value_type(value: &Value) -> &'static str {
1738 match value {
1739 Value::Null => "null",
1740 Value::Boolean(_) => "bool",
1741 Value::Integer(_) | Value::UnsignedInteger(_) | Value::BigInt(_) => "int",
1742 Value::Text(_) => "string",
1743 Value::Url(_) => "url",
1744 Value::Json(bytes) => match crate::json::from_slice::<crate::json::Value>(bytes) {
1745 Ok(crate::json::Value::Object(_)) => "object",
1746 Ok(crate::json::Value::Array(_)) => "array",
1747 _ => "json",
1748 },
1749 Value::Array(_) | Value::Vector(_) => "array",
1750 _ => "other",
1751 }
1752}
1753
1754fn config_value_type_value(value_type: Option<ConfigValueType>) -> Value {
1755 value_type
1756 .map(|value_type| Value::text(value_type.as_str()))
1757 .unwrap_or(Value::Null)
1758}
1759
1760fn config_value_type_from_value(value: &Value) -> Option<ConfigValueType> {
1761 match value {
1762 Value::Text(value) => ConfigValueType::parse(value.as_ref()),
1763 _ => None,
1764 }
1765}
1766
1767fn config_tags_value(tags: &[String]) -> Value {
1768 if tags.is_empty() {
1769 return Value::Null;
1770 }
1771 Value::Array(tags.iter().map(|tag| Value::text(tag.clone())).collect())
1772}
1773
1774fn config_tags_from_value(value: Option<&Value>) -> Vec<String> {
1775 match value {
1776 Some(Value::Array(values)) => values
1777 .iter()
1778 .filter_map(|value| match value {
1779 Value::Text(tag) => Some(tag.to_string()),
1780 _ => None,
1781 })
1782 .collect(),
1783 Some(Value::Json(bytes)) => crate::json::from_slice::<crate::json::Value>(bytes)
1784 .ok()
1785 .and_then(|value| value.as_array().map(|values| values.to_vec()))
1786 .map(|values| {
1787 values
1788 .into_iter()
1789 .filter_map(|value| value.as_str().map(ToOwned::to_owned))
1790 .collect()
1791 })
1792 .unwrap_or_default(),
1793 _ => Vec::new(),
1794 }
1795}
1796
1797fn config_payload_sensitivity(
1798 resource_type: &str,
1799 field: &str,
1800 value: &Value,
1801) -> crate::runtime::control_events::Sensitivity {
1802 let payload = config_payload_bytes(value);
1803 if config_payload_raw_allowed(resource_type, field) {
1804 crate::runtime::control_events::Sensitivity::raw(
1805 String::from_utf8_lossy(&payload).into_owned(),
1806 )
1807 } else {
1808 crate::runtime::control_events::Sensitivity::hashed(&payload)
1809 }
1810}
1811
1812fn config_payload_bytes(value: &Value) -> Vec<u8> {
1813 let json = crate::presentation::entity_json::storage_value_to_json(value);
1814 crate::serde_json::to_vec(&json).unwrap_or_else(|_| value.to_string().into_bytes())
1815}
1816
1817fn config_payload_raw_allowed(resource_type: &str, field: &str) -> bool {
1818 const RAW_PAYLOAD_FIELDS: &[(&str, &str)] = &[("audit_surface", "payload")];
1819 RAW_PAYLOAD_FIELDS
1820 .iter()
1821 .any(|(allowed_type, allowed_field)| {
1822 *allowed_type == resource_type && *allowed_field == field
1823 })
1824}
1825
1826fn config_mutability_label(mutability: crate::auth::registry::Mutability) -> &'static str {
1827 match mutability {
1828 crate::auth::registry::Mutability::Immutable => "immutable",
1829 crate::auth::registry::Mutability::MutableViaGovernance => "mutable_via_governance",
1830 }
1831}
1832
1833fn current_unix_ms() -> u64 {
1834 std::time::SystemTime::now()
1835 .duration_since(std::time::UNIX_EPOCH)
1836 .unwrap_or_default()
1837 .as_millis() as u64
1838}