1use std::collections::HashMap;
4use std::sync::Arc;
5
6use crate::catalog::{CollectionModel, SchemaMode};
7use crate::physical::{CollectionContract, ContractOrigin};
8use crate::storage::query::ast::ConfigValueType;
9use crate::storage::{EntityData, EntityId, EntityKind, RowData, UnifiedEntity};
10
11use super::impl_core::{current_auth_identity, current_connection_id, current_tenant};
12use super::*;
13
14const CONFIG_HISTORY_LIMIT: usize = 16;
15
16#[derive(Clone)]
17struct ConfigVersion {
18 id: EntityId,
19 key: String,
20 version: i64,
21 value: Value,
22 tombstone: bool,
23 created_at_ms: i64,
24 op: String,
25 value_type: Option<ConfigValueType>,
26 schema_version: Option<i64>,
27 tags: Vec<String>,
28}
29
30impl super::keyed_spine::KeyedVersion for ConfigVersion {
31 fn key(&self) -> &str {
32 &self.key
33 }
34
35 fn version(&self) -> i64 {
36 self.version
37 }
38}
39
40impl ConfigVersion {
41 fn from_keyed_row(version: super::keyed_spine::KeyedRowVersion, row: &RowData) -> Self {
42 Self {
43 id: version.id,
44 key: version.key,
45 version: version.version,
46 value: version.value,
47 tombstone: version.tombstone,
48 created_at_ms: version.created_at_ms,
49 op: version.op,
50 value_type: row
51 .get_field("value_type")
52 .and_then(config_value_type_from_value),
53 schema_version: super::keyed_spine::value_i64(row.get_field("schema_version")),
54 tags: config_tags_from_value(row.get_field("tags")),
55 }
56 }
57}
58
59struct ConfigSecretRef {
60 collection: String,
61 key: String,
62}
63
64struct ConfigMutationEvidence {
65 id: String,
66 resource_type: String,
67 managed: bool,
68 mutability: crate::auth::registry::Mutability,
69 matched_action: Option<String>,
70 matched_resource: Option<String>,
71 payload: Option<Value>,
72}
73
74enum ConfigMutationAuthz {
75 Allowed(ConfigMutationEvidence),
76 Denied {
77 reason: String,
78 evidence: ConfigMutationEvidence,
79 },
80}
81
82impl RedDBRuntime {
83 pub fn execute_config_command(
84 &self,
85 raw_query: &str,
86 cmd: &crate::storage::query::ast::ConfigCommand,
87 ) -> RedDBResult<RuntimeQueryResult> {
88 use crate::storage::query::ast::ConfigCommand;
89
90 match cmd {
91 ConfigCommand::Put {
92 collection,
93 key,
94 value,
95 value_type,
96 tags,
97 } => self.config_write_result(
98 raw_query,
99 collection,
100 key,
101 value.clone(),
102 *value_type,
103 tags,
104 "put",
105 ),
106 ConfigCommand::Rotate {
107 collection,
108 key,
109 value,
110 value_type,
111 tags,
112 } => self.config_write_result(
113 raw_query,
114 collection,
115 key,
116 value.clone(),
117 *value_type,
118 tags,
119 "rotate",
120 ),
121 ConfigCommand::Get { collection, key } => {
122 self.config_get_result(raw_query, collection, key)
123 }
124 ConfigCommand::Resolve { collection, key } => {
125 self.config_resolve_result(raw_query, collection, key)
126 }
127 ConfigCommand::Delete { collection, key } => {
128 self.config_delete_result(raw_query, collection, key)
129 }
130 ConfigCommand::History { collection, key } => {
131 self.config_history_result(raw_query, collection, key)
132 }
133 ConfigCommand::List {
134 collection,
135 prefix,
136 limit,
137 offset,
138 } => self.config_list_result(raw_query, collection, prefix.as_deref(), *limit, *offset),
139 ConfigCommand::Watch {
140 collection,
141 key,
142 prefix,
143 from_lsn,
144 } => self.config_watch_result(raw_query, collection, key, *prefix, *from_lsn),
145 ConfigCommand::InvalidVolatileOperation { operation, .. } => {
146 Err(invalid_config_volatility(operation))
147 }
148 }
149 }
150
151 pub(crate) fn validate_config_command_before_auth(
152 &self,
153 cmd: &crate::storage::query::ast::ConfigCommand,
154 ) -> RedDBResult<()> {
155 use crate::storage::query::ast::ConfigCommand;
156 match cmd {
157 ConfigCommand::InvalidVolatileOperation { operation, .. } => {
158 Err(invalid_config_volatility(operation))
159 }
160 ConfigCommand::Put { collection, .. }
161 | ConfigCommand::Get { collection, .. }
162 | ConfigCommand::Resolve { collection, .. }
163 | ConfigCommand::Rotate { collection, .. }
164 | ConfigCommand::Delete { collection, .. }
165 | ConfigCommand::History { collection, .. }
166 | ConfigCommand::List { collection, .. }
167 | ConfigCommand::Watch { collection, .. } => {
168 let snapshot = self.inner.db.catalog_model_snapshot();
169 let Some(actual_model) = snapshot
170 .collections
171 .iter()
172 .find(|c| c.name == *collection)
173 .map(|c| c.declared_model.unwrap_or(c.model))
174 else {
175 return Ok(());
176 };
177 crate::runtime::ddl::polymorphic_resolver::ensure_model_match(
178 CollectionModel::Config,
179 actual_model,
180 )
181 }
182 }
183 }
184
185 fn config_resolve_result(
186 &self,
187 raw_query: &str,
188 collection: &str,
189 key: &str,
190 ) -> RedDBResult<RuntimeQueryResult> {
191 let latest = self.latest_config_version(collection, key)?;
192 if let Err(reason) = self.check_config_capability("config:read", collection, key) {
193 self.audit_config_resolve(
194 collection,
195 key,
196 None,
197 crate::runtime::audit_log::Outcome::Denied,
198 &reason,
199 );
200 return Err(RedDBError::Query(reason));
201 }
202
203 let Some(version) = latest else {
204 let reason = "not_found";
205 self.audit_config_resolve(
206 collection,
207 key,
208 None,
209 crate::runtime::audit_log::Outcome::Denied,
210 reason,
211 );
212 return Err(RedDBError::NotFound(format!(
213 "config '{}.{}' not found",
214 collection, key
215 )));
216 };
217 if version.tombstone {
218 let reason = "deleted";
219 self.audit_config_resolve(
220 collection,
221 key,
222 None,
223 crate::runtime::audit_log::Outcome::Denied,
224 reason,
225 );
226 return Err(RedDBError::NotFound(format!(
227 "config '{}.{}' is deleted",
228 collection, key
229 )));
230 }
231
232 let secret_ref = parse_config_secret_ref(&version.value).inspect_err(|err| {
233 self.audit_config_resolve(
234 collection,
235 key,
236 None,
237 crate::runtime::audit_log::Outcome::Error,
238 &err.to_string(),
239 );
240 })?;
241
242 match self.resolve_vault_secret_value(&secret_ref.collection, &secret_ref.key) {
243 Ok(value) => {
244 self.audit_config_resolve(
245 collection,
246 key,
247 Some(&secret_ref),
248 crate::runtime::audit_log::Outcome::Success,
249 "ok",
250 );
251 let mut result = UnifiedResult::with_columns(vec![
252 "collection".into(),
253 "key".into(),
254 "value".into(),
255 "resolved_store".into(),
256 "resolved_collection".into(),
257 "resolved_key".into(),
258 ]);
259 let mut record = UnifiedRecord::new();
260 record.set("collection", Value::text(collection.to_string()));
261 record.set("key", Value::text(key.to_string()));
262 record.set("value", value);
263 record.set("resolved_store", Value::text("vault"));
264 record.set("resolved_collection", Value::text(secret_ref.collection));
265 record.set("resolved_key", Value::text(secret_ref.key));
266 result.push(record);
267 Ok(RuntimeQueryResult {
268 query: raw_query.to_string(),
269 mode: crate::storage::query::modes::QueryMode::Sql,
270 statement: "config_resolve",
271 engine: "config",
272 result,
273 affected_rows: 0,
274 statement_type: "select",
275 })
276 }
277 Err(err) => {
278 let reason = err.to_string();
279 let outcome = if reason.contains("denied") {
280 crate::runtime::audit_log::Outcome::Denied
281 } else {
282 crate::runtime::audit_log::Outcome::Error
283 };
284 self.audit_config_resolve(collection, key, Some(&secret_ref), outcome, &reason);
285 Err(err)
286 }
287 }
288 }
289
290 fn config_write_result(
291 &self,
292 raw_query: &str,
293 collection: &str,
294 key: &str,
295 value: Value,
296 requested_type: Option<ConfigValueType>,
297 tags: &[String],
298 op: &str,
299 ) -> RedDBResult<RuntimeQueryResult> {
300 let mut evidence = match self.authorize_config_write_for_event(collection, key) {
301 ConfigMutationAuthz::Allowed(evidence) => evidence,
302 ConfigMutationAuthz::Denied {
303 reason,
304 mut evidence,
305 } => {
306 evidence.payload = Some(value.clone());
307 let _ = self.emit_config_mutation_event(
308 crate::runtime::control_events::EventKind::ConfigWrite,
309 crate::runtime::control_events::Outcome::Denied,
310 "config:write",
311 collection,
312 key,
313 Some(reason.clone()),
314 &evidence,
315 );
316 return Err(RedDBError::Query(reason));
317 }
318 };
319 if let Err(err) = self.check_write(crate::runtime::write_gate::WriteKind::Dml) {
320 let _ = self.emit_config_mutation_event(
321 crate::runtime::control_events::EventKind::ConfigWrite,
322 crate::runtime::control_events::Outcome::Error,
323 "config:write",
324 collection,
325 key,
326 Some(err.to_string()),
327 &evidence,
328 );
329 return Err(err);
330 }
331 if is_enforcement_mode_config(collection, key) {
336 if let Err(err) = validate_enforcement_mode_value(&value) {
337 let _ = self.emit_config_mutation_event(
338 crate::runtime::control_events::EventKind::ConfigWrite,
339 crate::runtime::control_events::Outcome::Denied,
340 "config:write",
341 collection,
342 key,
343 Some(err.to_string()),
344 &evidence,
345 );
346 return Err(err);
347 }
348 }
349 if let Err(err) = self.ensure_config_collection(collection) {
350 let _ = self.emit_config_mutation_event(
351 crate::runtime::control_events::EventKind::ConfigWrite,
352 crate::runtime::control_events::Outcome::Error,
353 "config:write",
354 collection,
355 key,
356 Some(err.to_string()),
357 &evidence,
358 );
359 return Err(err);
360 }
361 let latest = match self.latest_config_version(collection, key) {
362 Ok(latest) => latest,
363 Err(err) => {
364 let _ = self.emit_config_mutation_event(
365 crate::runtime::control_events::EventKind::ConfigWrite,
366 crate::runtime::control_events::Outcome::Error,
367 "config:write",
368 collection,
369 key,
370 Some(err.to_string()),
371 &evidence,
372 );
373 return Err(err);
374 }
375 };
376 let version = latest.as_ref().map(|version| version.version).unwrap_or(0) + 1;
377 let (value_type, schema_version) = resolve_config_schema(latest.as_ref(), requested_type);
378 if let Some(value_type) = value_type {
379 if let Err(err) = validate_config_value_type(&value, value_type) {
380 let _ = self.emit_config_mutation_event(
381 crate::runtime::control_events::EventKind::ConfigWrite,
382 crate::runtime::control_events::Outcome::Error,
383 "config:write",
384 collection,
385 key,
386 Some(err.to_string()),
387 &evidence,
388 );
389 return Err(err);
390 }
391 }
392 evidence.payload = Some(value.clone());
393 let before = latest.as_ref().and_then(|version| {
394 if version.tombstone {
395 None
396 } else {
397 Some(crate::presentation::entity_json::storage_value_to_json(
398 &version.value,
399 ))
400 }
401 });
402 let after = Some(crate::presentation::entity_json::storage_value_to_json(
403 &value,
404 ));
405 let change_op = if latest.is_some() {
406 crate::replication::cdc::ChangeOperation::Update
407 } else {
408 crate::replication::cdc::ChangeOperation::Insert
409 };
410 let id = match self.append_config_version(
411 collection,
412 key,
413 value,
414 version,
415 false,
416 op,
417 value_type,
418 schema_version,
419 tags,
420 ) {
421 Ok(id) => id,
422 Err(err) => {
423 let _ = self.emit_config_mutation_event(
424 crate::runtime::control_events::EventKind::ConfigWrite,
425 crate::runtime::control_events::Outcome::Error,
426 "config:write",
427 collection,
428 key,
429 Some(err.to_string()),
430 &evidence,
431 );
432 return Err(err);
433 }
434 };
435 self.record_kv_watch_event(change_op, collection, key, id.raw(), before, after);
436 if let Err(err) = self.prune_config_history(collection, key) {
437 let _ = self.emit_config_mutation_event(
438 crate::runtime::control_events::EventKind::ConfigWrite,
439 crate::runtime::control_events::Outcome::Error,
440 "config:write",
441 collection,
442 key,
443 Some(err.to_string()),
444 &evidence,
445 );
446 return Err(err);
447 }
448 self.invalidate_result_cache();
449 if let Err(err) = self.emit_config_mutation_event(
450 crate::runtime::control_events::EventKind::ConfigWrite,
451 crate::runtime::control_events::Outcome::Allowed,
452 "config:write",
453 collection,
454 key,
455 None,
456 &evidence,
457 ) {
458 let _ = self.inner.db.store().delete(collection, id);
459 self.invalidate_result_cache();
460 return Err(err);
461 }
462 if is_enforcement_mode_config(collection, key) {
466 if let Some(auth_store) = self.inner.auth_store.read().clone() {
467 if let Value::Text(text) =
468 &evidence.payload.as_ref().cloned().unwrap_or(Value::Null)
469 {
470 if let Some(mode) =
471 crate::auth::enforcement_mode::PolicyEnforcementMode::parse(text)
472 {
473 auth_store.set_enforcement_mode(mode);
474 }
475 }
476 }
477 }
478 Ok(config_write_output(
479 raw_query,
480 collection,
481 key,
482 version,
483 id,
484 value_type,
485 schema_version,
486 tags,
487 match op {
488 "rotate" => "config_rotate",
489 _ => "config_put",
490 },
491 1,
492 ))
493 }
494
495 fn config_delete_result(
496 &self,
497 raw_query: &str,
498 collection: &str,
499 key: &str,
500 ) -> RedDBResult<RuntimeQueryResult> {
501 let mut evidence = match self.authorize_config_write_for_event(collection, key) {
502 ConfigMutationAuthz::Allowed(evidence) => evidence,
503 ConfigMutationAuthz::Denied { reason, evidence } => {
504 let _ = self.emit_config_mutation_event(
505 crate::runtime::control_events::EventKind::ConfigDelete,
506 crate::runtime::control_events::Outcome::Denied,
507 "config:delete",
508 collection,
509 key,
510 Some(reason.clone()),
511 &evidence,
512 );
513 return Err(RedDBError::Query(reason));
514 }
515 };
516 if let Err(err) = self.check_write(crate::runtime::write_gate::WriteKind::Dml) {
517 let _ = self.emit_config_mutation_event(
518 crate::runtime::control_events::EventKind::ConfigDelete,
519 crate::runtime::control_events::Outcome::Error,
520 "config:delete",
521 collection,
522 key,
523 Some(err.to_string()),
524 &evidence,
525 );
526 return Err(err);
527 }
528 if let Err(err) = self.ensure_config_collection(collection) {
529 let _ = self.emit_config_mutation_event(
530 crate::runtime::control_events::EventKind::ConfigDelete,
531 crate::runtime::control_events::Outcome::Error,
532 "config:delete",
533 collection,
534 key,
535 Some(err.to_string()),
536 &evidence,
537 );
538 return Err(err);
539 }
540 let latest = match self.latest_config_version(collection, key) {
541 Ok(latest) => latest,
542 Err(err) => {
543 let _ = self.emit_config_mutation_event(
544 crate::runtime::control_events::EventKind::ConfigDelete,
545 crate::runtime::control_events::Outcome::Error,
546 "config:delete",
547 collection,
548 key,
549 Some(err.to_string()),
550 &evidence,
551 );
552 return Err(err);
553 }
554 };
555 evidence.payload = latest.as_ref().map(|version| version.value.clone());
556 let version = latest.as_ref().map(|version| version.version).unwrap_or(0) + 1;
557 let value_type = latest.as_ref().and_then(|version| version.value_type);
558 let schema_version = latest.as_ref().and_then(|version| version.schema_version);
559 let id = match self.append_config_version(
560 collection,
561 key,
562 Value::Null,
563 version,
564 true,
565 "delete",
566 value_type,
567 schema_version,
568 &[],
569 ) {
570 Ok(id) => id,
571 Err(err) => {
572 let _ = self.emit_config_mutation_event(
573 crate::runtime::control_events::EventKind::ConfigDelete,
574 crate::runtime::control_events::Outcome::Error,
575 "config:delete",
576 collection,
577 key,
578 Some(err.to_string()),
579 &evidence,
580 );
581 return Err(err);
582 }
583 };
584 if let Some(before) = latest.as_ref().and_then(|version| {
585 if version.tombstone {
586 None
587 } else {
588 Some(crate::presentation::entity_json::storage_value_to_json(
589 &version.value,
590 ))
591 }
592 }) {
593 self.record_kv_watch_event(
594 crate::replication::cdc::ChangeOperation::Delete,
595 collection,
596 key,
597 id.raw(),
598 Some(before),
599 None,
600 );
601 }
602 if let Err(err) = self.prune_config_history(collection, key) {
603 let _ = self.emit_config_mutation_event(
604 crate::runtime::control_events::EventKind::ConfigDelete,
605 crate::runtime::control_events::Outcome::Error,
606 "config:delete",
607 collection,
608 key,
609 Some(err.to_string()),
610 &evidence,
611 );
612 return Err(err);
613 }
614 self.invalidate_result_cache();
615 if let Err(err) = self.emit_config_mutation_event(
616 crate::runtime::control_events::EventKind::ConfigDelete,
617 crate::runtime::control_events::Outcome::Allowed,
618 "config:delete",
619 collection,
620 key,
621 None,
622 &evidence,
623 ) {
624 let _ = self.inner.db.store().delete(collection, id);
625 self.invalidate_result_cache();
626 return Err(err);
627 }
628 Ok(config_write_output(
629 raw_query,
630 collection,
631 key,
632 version,
633 id,
634 value_type,
635 schema_version,
636 &[],
637 "delete",
638 1,
639 ))
640 }
641
642 fn config_get_result(
643 &self,
644 raw_query: &str,
645 collection: &str,
646 key: &str,
647 ) -> RedDBResult<RuntimeQueryResult> {
648 self.check_system_config_capability("config:read", collection, key)
649 .map_err(RedDBError::Query)?;
650 let latest = self.latest_config_version(collection, key)?;
651 let mut result = UnifiedResult::with_columns(vec![
652 "collection".into(),
653 "key".into(),
654 "value".into(),
655 "version".into(),
656 "value_type".into(),
657 "schema_version".into(),
658 "tags".into(),
659 "tombstone".into(),
660 ]);
661 let mut record = UnifiedRecord::new();
662 record.set("collection", Value::text(collection.to_string()));
663 record.set("key", Value::text(key.to_string()));
664 if let Some(version) = latest {
665 record.set("value", version.value);
666 record.set("version", Value::Integer(version.version));
667 record.set("value_type", config_value_type_value(version.value_type));
668 record.set(
669 "schema_version",
670 version
671 .schema_version
672 .map(Value::Integer)
673 .unwrap_or(Value::Null),
674 );
675 record.set("tags", config_tags_value(&version.tags));
676 record.set("tombstone", Value::Boolean(version.tombstone));
677 } else {
678 record.set("value", Value::Null);
679 record.set("version", Value::Null);
680 record.set("value_type", Value::Null);
681 record.set("schema_version", Value::Null);
682 record.set("tags", Value::Null);
683 record.set("tombstone", Value::Boolean(false));
684 }
685 result.push(record);
686 Ok(RuntimeQueryResult {
687 query: raw_query.to_string(),
688 mode: crate::storage::query::modes::QueryMode::Sql,
689 statement: "config_get",
690 engine: "config",
691 result,
692 affected_rows: 0,
693 statement_type: "select",
694 })
695 }
696
697 fn config_history_result(
698 &self,
699 raw_query: &str,
700 collection: &str,
701 key: &str,
702 ) -> RedDBResult<RuntimeQueryResult> {
703 self.check_system_config_capability("config:read", collection, key)
704 .map_err(RedDBError::Query)?;
705 let versions = super::keyed_spine::history_versions(self.config_versions(collection, key)?);
706 let mut result = UnifiedResult::with_columns(vec![
707 "collection".into(),
708 "key".into(),
709 "version".into(),
710 "value".into(),
711 "value_type".into(),
712 "schema_version".into(),
713 "tags".into(),
714 "tombstone".into(),
715 "op".into(),
716 "created_at_ms".into(),
717 ]);
718 for version in versions {
719 let mut record = UnifiedRecord::new();
720 record.set("collection", Value::text(collection.to_string()));
721 record.set("key", Value::text(key.to_string()));
722 record.set("version", Value::Integer(version.version));
723 record.set("value", version.value);
724 record.set("value_type", config_value_type_value(version.value_type));
725 record.set(
726 "schema_version",
727 version
728 .schema_version
729 .map(Value::Integer)
730 .unwrap_or(Value::Null),
731 );
732 record.set("tags", Value::Null);
733 record.set("tombstone", Value::Boolean(version.tombstone));
734 record.set("op", Value::text(version.op));
735 record.set("created_at_ms", Value::Integer(version.created_at_ms));
736 result.push(record);
737 }
738 Ok(RuntimeQueryResult {
739 query: raw_query.to_string(),
740 mode: crate::storage::query::modes::QueryMode::Sql,
741 statement: "config_history",
742 engine: "config",
743 result,
744 affected_rows: 0,
745 statement_type: "select",
746 })
747 }
748
749 fn config_list_result(
750 &self,
751 raw_query: &str,
752 collection: &str,
753 prefix: Option<&str>,
754 limit: Option<usize>,
755 offset: usize,
756 ) -> RedDBResult<RuntimeQueryResult> {
757 let mut versions = self.latest_config_versions(collection, prefix)?;
758 versions.sort_by(|left, right| left.key.cmp(&right.key));
759 let mut result = UnifiedResult::with_columns(vec![
760 "collection".into(),
761 "key".into(),
762 "value".into(),
763 "version".into(),
764 "value_type".into(),
765 "schema_version".into(),
766 "tags".into(),
767 "tombstone".into(),
768 "op".into(),
769 "created_at_ms".into(),
770 ]);
771 for version in versions
772 .into_iter()
773 .filter(|version| {
774 self.check_config_capability("config:read", collection, &version.key)
775 .is_ok()
776 })
777 .skip(offset)
778 .take(limit.unwrap_or(usize::MAX))
779 {
780 let mut record = UnifiedRecord::new();
781 record.set("collection", Value::text(collection.to_string()));
782 record.set("key", Value::text(version.key));
783 record.set("value", version.value);
784 record.set("version", Value::Integer(version.version));
785 record.set("value_type", config_value_type_value(version.value_type));
786 record.set(
787 "schema_version",
788 version
789 .schema_version
790 .map(Value::Integer)
791 .unwrap_or(Value::Null),
792 );
793 record.set("tags", config_tags_value(&version.tags));
794 record.set("tombstone", Value::Boolean(version.tombstone));
795 record.set("op", Value::text(version.op));
796 record.set("created_at_ms", Value::Integer(version.created_at_ms));
797 result.push(record);
798 }
799 Ok(RuntimeQueryResult {
800 query: raw_query.to_string(),
801 mode: crate::storage::query::modes::QueryMode::Sql,
802 statement: "config_list",
803 engine: "config",
804 result,
805 affected_rows: 0,
806 statement_type: "select",
807 })
808 }
809
810 fn config_watch_result(
811 &self,
812 raw_query: &str,
813 collection: &str,
814 key: &str,
815 prefix: bool,
816 from_lsn: Option<u64>,
817 ) -> RedDBResult<RuntimeQueryResult> {
818 let watch_key = if prefix {
819 format!("{key}.*")
820 } else {
821 key.to_string()
822 };
823 let endpoint = match from_lsn {
824 Some(lsn) => {
825 format!("/collections/{collection}/config/{watch_key}/watch?since_lsn={lsn}")
826 }
827 None => format!("/collections/{collection}/config/{watch_key}/watch"),
828 };
829 let mut result = UnifiedResult::with_columns(vec![
830 "collection".into(),
831 "key".into(),
832 "prefix".into(),
833 "from_lsn".into(),
834 "watch_url".into(),
835 "streaming".into(),
836 ]);
837 let mut record = UnifiedRecord::new();
838 record.set("collection", Value::text(collection.to_string()));
839 record.set("key", Value::text(watch_key));
840 record.set("prefix", Value::Boolean(prefix));
841 record.set(
842 "from_lsn",
843 from_lsn
844 .map(Value::UnsignedInteger)
845 .unwrap_or(crate::storage::schema::Value::Null),
846 );
847 record.set("watch_url", Value::text(endpoint));
848 record.set("streaming", Value::Boolean(true));
849 result.push(record);
850 Ok(RuntimeQueryResult {
851 query: raw_query.to_string(),
852 mode: crate::storage::query::modes::QueryMode::Sql,
853 statement: "config_watch",
854 engine: "config",
855 result,
856 affected_rows: 0,
857 statement_type: "stream",
858 })
859 }
860
861 fn ensure_config_collection(&self, collection: &str) -> RedDBResult<()> {
862 let store = self.inner.db.store();
863 if store.get_collection(collection).is_none() {
864 store
865 .create_collection(collection)
866 .map_err(|err| RedDBError::Internal(err.to_string()))?;
867 }
868 if let Some(contract) = self.inner.db.collection_contract(collection) {
869 crate::runtime::ddl::polymorphic_resolver::ensure_model_match(
870 CollectionModel::Config,
871 contract.declared_model,
872 )?;
873 return Ok(());
874 }
875 let now = current_unix_ms();
876 self.inner
877 .db
878 .save_collection_contract(CollectionContract {
879 name: collection.to_string(),
880 declared_model: CollectionModel::Config,
881 schema_mode: SchemaMode::Dynamic,
882 origin: ContractOrigin::Explicit,
883 version: 1,
884 created_at_unix_ms: now as u128,
885 updated_at_unix_ms: now as u128,
886 default_ttl_ms: None,
887 vector_dimension: None,
888 vector_metric: None,
889 context_index_fields: Vec::new(),
890 declared_columns: Vec::new(),
891 table_def: None,
892 timestamps_enabled: false,
893 context_index_enabled: false,
894 metrics_raw_retention_ms: None,
895 metrics_rollup_policies: Vec::new(),
896 metrics_tenant_identity: None,
897 metrics_namespace: None,
898 append_only: false,
899 subscriptions: Vec::new(),
900 session_key: None,
901 session_gap_ms: None,
902 retention_duration_ms: None,
903 })
904 .map(|_| ())
905 .map_err(|err| RedDBError::Internal(err.to_string()))
906 }
907
908 fn append_config_version(
909 &self,
910 collection: &str,
911 key: &str,
912 value: Value,
913 version: i64,
914 tombstone: bool,
915 op: &str,
916 value_type: Option<ConfigValueType>,
917 schema_version: Option<i64>,
918 tags: &[String],
919 ) -> RedDBResult<EntityId> {
920 let now = current_unix_ms() as i64;
921 let fields = vec![
922 ("key".to_string(), Value::text(key.to_string())),
923 ("value".to_string(), value),
924 ("version".to_string(), Value::Integer(version)),
925 (
926 "value_type".to_string(),
927 config_value_type_value(value_type),
928 ),
929 (
930 "schema_version".to_string(),
931 schema_version.map(Value::Integer).unwrap_or(Value::Null),
932 ),
933 ("tombstone".to_string(), Value::Boolean(tombstone)),
934 ("op".to_string(), Value::text(op.to_string())),
935 ("created_at_ms".to_string(), Value::Integer(now)),
936 ("tags".to_string(), config_tags_value(tags)),
937 ];
938 let mut row = RowData::new(Vec::new());
939 row.named = Some(fields.into_iter().collect());
940 let entity = UnifiedEntity::new(
941 EntityId::new(0),
942 EntityKind::TableRow {
943 table: Arc::from(collection),
944 row_id: 0,
945 },
946 EntityData::Row(row),
947 );
948 self.inner
949 .db
950 .store()
951 .insert(collection, entity)
952 .map_err(|err| RedDBError::Internal(err.to_string()))
953 }
954
955 fn latest_config_version(
956 &self,
957 collection: &str,
958 key: &str,
959 ) -> RedDBResult<Option<ConfigVersion>> {
960 Ok(super::keyed_spine::latest_version(
961 self.config_versions(collection, key)?,
962 ))
963 }
964
965 fn config_versions(&self, collection: &str, key: &str) -> RedDBResult<Vec<ConfigVersion>> {
966 let store = self.inner.db.store();
967 let Some(manager) = store.get_collection(collection) else {
968 return Ok(Vec::new());
969 };
970 let mut versions = Vec::new();
971 for entity in manager.query_all(|_| true) {
972 let EntityData::Row(row) = &entity.data else {
973 continue;
974 };
975 let Some(version) = super::keyed_spine::row_version(entity.id, row, 0) else {
976 continue;
977 };
978 if version.key != key {
979 continue;
980 }
981 versions.push(ConfigVersion::from_keyed_row(version, row));
982 }
983 Ok(versions)
984 }
985
986 fn latest_config_versions(
987 &self,
988 collection: &str,
989 prefix: Option<&str>,
990 ) -> RedDBResult<Vec<ConfigVersion>> {
991 let store = self.inner.db.store();
992 let Some(manager) = store.get_collection(collection) else {
993 return Ok(Vec::new());
994 };
995 let mut versions = Vec::new();
996 for entity in manager.query_all(|_| true) {
997 let EntityData::Row(row) = &entity.data else {
998 continue;
999 };
1000 let Some(version) = super::keyed_spine::row_version(entity.id, row, 0) else {
1001 continue;
1002 };
1003 versions.push(ConfigVersion::from_keyed_row(version, row));
1004 }
1005 Ok(super::keyed_spine::latest_versions(versions, prefix))
1006 }
1007
1008 fn prune_config_history(&self, collection: &str, key: &str) -> RedDBResult<()> {
1009 let mut versions = self.config_versions(collection, key)?;
1010 if versions.len() <= CONFIG_HISTORY_LIMIT {
1011 return Ok(());
1012 }
1013 versions = super::keyed_spine::history_versions(versions);
1014 let drop_count = versions.len() - CONFIG_HISTORY_LIMIT;
1015 let store = self.inner.db.store();
1016 for version in versions.into_iter().take(drop_count) {
1017 store
1018 .delete(collection, version.id)
1019 .map_err(|err| RedDBError::Internal(err.to_string()))?;
1020 }
1021 Ok(())
1022 }
1023
1024 fn authorize_config_write_for_event(&self, collection: &str, key: &str) -> ConfigMutationAuthz {
1025 let default_evidence = self.default_config_mutation_evidence(collection, key);
1026 let Some(auth_store) = self.inner.auth_store.read().clone() else {
1027 return ConfigMutationAuthz::Allowed(default_evidence);
1028 };
1029 if !auth_store.iam_authorization_enabled() {
1030 return ConfigMutationAuthz::Allowed(default_evidence);
1031 }
1032 let Some((principal, role)) = current_auth_identity() else {
1033 return ConfigMutationAuthz::Denied {
1034 reason:
1035 "IAM authorization is enabled; config capability check requires an authenticated principal"
1036 .to_string(),
1037 evidence: default_evidence,
1038 };
1039 };
1040 let tenant = current_tenant();
1041 let principal_id = crate::auth::UserId::from_parts(tenant.as_deref(), &principal);
1042 let ctx = crate::auth::policies::EvalContext {
1043 principal_tenant: tenant.clone(),
1044 current_tenant: tenant.clone(),
1045 peer_ip: None,
1046 mfa_present: false,
1047 now_ms: crate::utils::now_unix_millis() as u128,
1048 principal_is_admin_role: role == crate::auth::Role::Admin,
1049 principal_is_system_owned: auth_store.principal_is_system_owned(&principal_id),
1050 principal_is_platform_scoped: principal_id.tenant.is_none(),
1051 };
1052 let managed_key = if collection == "red.config" {
1053 format!("red.config.{key}")
1054 } else {
1055 key.to_string()
1056 };
1057 let gate = crate::auth::managed_config::ManagedConfigGate::new(
1058 self.inner.config_registry.as_ref(),
1059 );
1060 match gate.check_write(&auth_store, &principal_id, &ctx, &managed_key) {
1061 crate::auth::managed_config::ManagedConfigDecision::Allow {
1062 entry_id,
1063 resource_type,
1064 managed,
1065 mutability,
1066 matched_action,
1067 matched_resource,
1068 ..
1069 } => {
1070 return ConfigMutationAuthz::Allowed(ConfigMutationEvidence {
1071 id: entry_id,
1072 resource_type,
1073 managed,
1074 mutability,
1075 matched_action: Some(matched_action),
1076 matched_resource: Some(matched_resource),
1077 payload: None,
1078 });
1079 }
1080 crate::auth::managed_config::ManagedConfigDecision::Deny {
1081 entry_id,
1082 resource_type,
1083 managed,
1084 mutability,
1085 matched_action,
1086 matched_resource,
1087 reason,
1088 ..
1089 } => {
1090 return ConfigMutationAuthz::Denied {
1091 reason: format!(
1092 "permission denied: managed config mutation blocked for `{managed_key}`: {reason}"
1093 ),
1094 evidence: ConfigMutationEvidence {
1095 id: entry_id,
1096 resource_type,
1097 managed,
1098 mutability,
1099 matched_action: Some(matched_action),
1100 matched_resource: Some(matched_resource),
1101 payload: None,
1102 },
1103 };
1104 }
1105 crate::auth::managed_config::ManagedConfigDecision::PassThrough { .. } => {}
1106 }
1107
1108 let mut resource = crate::auth::policies::ResourceRef::new(
1109 "config",
1110 config_target_resource(collection, key),
1111 );
1112 if let Some(ref tenant) = tenant {
1113 resource = resource.with_tenant(tenant.clone());
1114 }
1115 if auth_store.check_policy_authz_with_role(
1116 &principal_id,
1117 "config:write",
1118 &resource,
1119 &ctx,
1120 role,
1121 ) {
1122 ConfigMutationAuthz::Allowed(default_evidence)
1123 } else {
1124 ConfigMutationAuthz::Denied {
1125 reason: format!(
1126 "principal=`{}` action=`config:write` resource=`config:{}` denied by IAM policy",
1127 principal,
1128 config_target_resource(collection, key)
1129 ),
1130 evidence: default_evidence,
1131 }
1132 }
1133 }
1134
1135 fn default_config_mutation_evidence(
1136 &self,
1137 collection: &str,
1138 key: &str,
1139 ) -> ConfigMutationEvidence {
1140 let id = if collection == "red.config" {
1141 format!("red.config.{key}")
1142 } else {
1143 key.to_string()
1144 };
1145 ConfigMutationEvidence {
1146 id,
1147 resource_type: crate::auth::managed_config::RESOURCE_TYPE_CONFIG_KEY.to_string(),
1148 managed: false,
1149 mutability: crate::auth::registry::Mutability::MutableViaGovernance,
1150 matched_action: None,
1151 matched_resource: None,
1152 payload: None,
1153 }
1154 }
1155
1156 fn emit_config_mutation_event(
1157 &self,
1158 kind: crate::runtime::control_events::EventKind,
1159 outcome: crate::runtime::control_events::Outcome,
1160 action: &'static str,
1161 collection: &str,
1162 key: &str,
1163 reason: Option<String>,
1164 evidence: &ConfigMutationEvidence,
1165 ) -> RedDBResult<()> {
1166 use crate::runtime::control_events::{
1167 ActorRef, ControlEvent, ControlEventCtx, Sensitivity,
1168 };
1169
1170 let tenant = current_tenant();
1171 let principal = current_auth_identity();
1172 let actor_user = principal
1173 .as_ref()
1174 .map(|(principal, _)| crate::auth::UserId::from_parts(tenant.as_deref(), principal));
1175 let actor = actor_user
1176 .as_ref()
1177 .map(ActorRef::User)
1178 .unwrap_or(ActorRef::Anonymous);
1179 let ctx = ControlEventCtx {
1180 actor,
1181 scope: tenant
1182 .as_ref()
1183 .map(|scope| std::borrow::Cow::Borrowed(scope.as_str())),
1184 request_id: Some(std::borrow::Cow::Owned(format!(
1185 "conn-{}",
1186 current_connection_id()
1187 ))),
1188 trace_id: None,
1189 };
1190
1191 let mut fields = HashMap::new();
1192 fields.insert("id".to_string(), Sensitivity::raw(evidence.id.clone()));
1193 fields.insert(
1194 "resource_type".to_string(),
1195 Sensitivity::raw(evidence.resource_type.clone()),
1196 );
1197 fields.insert(
1198 "managed".to_string(),
1199 Sensitivity::raw(evidence.managed.to_string()),
1200 );
1201 fields.insert(
1202 "mutability".to_string(),
1203 Sensitivity::raw(config_mutability_label(evidence.mutability)),
1204 );
1205 fields.insert("collection".to_string(), Sensitivity::raw(collection));
1206 fields.insert("key".to_string(), Sensitivity::raw(key));
1207 fields.insert(
1208 "connection_id".to_string(),
1209 Sensitivity::raw(current_connection_id().to_string()),
1210 );
1211 if let Some((_, role)) = principal {
1212 fields.insert("actor_role".to_string(), Sensitivity::raw(role.as_str()));
1213 }
1214 if let Some(matched_action) = &evidence.matched_action {
1215 fields.insert(
1216 "matched_action".to_string(),
1217 Sensitivity::raw(matched_action.clone()),
1218 );
1219 }
1220 if let Some(matched_resource) = &evidence.matched_resource {
1221 fields.insert(
1222 "matched_resource".to_string(),
1223 Sensitivity::raw(matched_resource.clone()),
1224 );
1225 }
1226 if let Some(payload) = &evidence.payload {
1227 fields.insert(
1228 "payload".to_string(),
1229 config_payload_sensitivity(&evidence.resource_type, "payload", payload),
1230 );
1231 }
1232
1233 let event = ControlEvent {
1234 kind,
1235 outcome,
1236 action: std::borrow::Cow::Borrowed(action),
1237 resource: Some(format!(
1238 "config:{}",
1239 config_target_resource(collection, key)
1240 )),
1241 reason,
1242 matched_policy_id: None,
1243 fields,
1244 };
1245 let ledger = self.inner.control_event_ledger.read();
1246 match ledger.emit(&ctx, event) {
1247 Ok(_) => Ok(()),
1248 Err(err) if self.inner.control_event_config.require_persistence() => {
1249 Err(RedDBError::Internal(err.to_string()))
1250 }
1251 Err(_) => Ok(()),
1252 }
1253 }
1254
1255 fn check_config_capability(
1256 &self,
1257 action: &str,
1258 collection: &str,
1259 key: &str,
1260 ) -> Result<(), String> {
1261 let Some(auth_store) = self.inner.auth_store.read().clone() else {
1262 return Ok(());
1263 };
1264 if !auth_store.iam_authorization_enabled() {
1265 return Ok(());
1266 }
1267 let Some((principal, role)) = current_auth_identity() else {
1268 return Err(
1269 "IAM authorization is enabled; config capability check requires an authenticated principal"
1270 .to_string(),
1271 );
1272 };
1273 let tenant = current_tenant();
1274 let principal_id = crate::auth::UserId::from_parts(tenant.as_deref(), &principal);
1275 let mut resource = crate::auth::policies::ResourceRef::new(
1276 "config",
1277 config_target_resource(collection, key),
1278 );
1279 if let Some(ref tenant) = tenant {
1280 resource = resource.with_tenant(tenant.clone());
1281 }
1282 let ctx = crate::auth::policies::EvalContext {
1283 principal_tenant: tenant.clone(),
1284 current_tenant: tenant,
1285 peer_ip: None,
1286 mfa_present: false,
1287 now_ms: crate::utils::now_unix_millis() as u128,
1288 principal_is_admin_role: role == crate::auth::Role::Admin,
1289 principal_is_system_owned: auth_store.principal_is_system_owned(&principal_id),
1290 principal_is_platform_scoped: principal_id.tenant.is_none(),
1291 };
1292 if action == "config:write" {
1293 let managed_key = if collection == "red.config" {
1294 format!("red.config.{key}")
1295 } else {
1296 key.to_string()
1297 };
1298 let gate = crate::auth::managed_config::ManagedConfigGate::new(
1299 self.inner.config_registry.as_ref(),
1300 );
1301 match gate.check_write(&auth_store, &principal_id, &ctx, &managed_key) {
1302 crate::auth::managed_config::ManagedConfigDecision::PassThrough { .. } => {}
1303 crate::auth::managed_config::ManagedConfigDecision::Allow { .. } => return Ok(()),
1304 crate::auth::managed_config::ManagedConfigDecision::Deny { reason, .. } => {
1305 return Err(format!(
1306 "permission denied: managed config mutation blocked for `{managed_key}`: {reason}"
1307 ));
1308 }
1309 }
1310 }
1311 if auth_store.check_policy_authz_with_role(&principal_id, action, &resource, &ctx, role) {
1312 Ok(())
1313 } else {
1314 Err(format!(
1315 "principal=`{}` action=`{}` resource=`config:{}` denied by IAM policy",
1316 principal,
1317 action,
1318 config_target_resource(collection, key)
1319 ))
1320 }
1321 }
1322
1323 fn check_system_config_capability(
1324 &self,
1325 action: &str,
1326 collection: &str,
1327 key: &str,
1328 ) -> Result<(), String> {
1329 if collection != "red.config" {
1330 return Ok(());
1331 }
1332 self.check_config_capability(action, collection, key)
1333 }
1334
1335 pub fn config_watch_events_since(
1336 &self,
1337 collection: &str,
1338 key: &str,
1339 since_lsn: u64,
1340 max_count: usize,
1341 ) -> Vec<crate::replication::cdc::KvWatchEvent> {
1342 self.kv_watch_events_since(collection, key, since_lsn, max_count)
1343 .into_iter()
1344 .map(|event| self.policy_filter_config_watch_event(event))
1345 .collect()
1346 }
1347
1348 pub fn config_watch_events_since_prefix(
1349 &self,
1350 collection: &str,
1351 prefix: &str,
1352 since_lsn: u64,
1353 max_count: usize,
1354 ) -> Vec<crate::replication::cdc::KvWatchEvent> {
1355 self.kv_watch_events_since_prefix(collection, prefix, since_lsn, max_count)
1356 .into_iter()
1357 .map(|event| self.policy_filter_config_watch_event(event))
1358 .collect()
1359 }
1360
1361 fn policy_filter_config_watch_event(
1362 &self,
1363 mut event: crate::replication::cdc::KvWatchEvent,
1364 ) -> crate::replication::cdc::KvWatchEvent {
1365 if self
1366 .check_config_capability("config:read", &event.collection, &event.key)
1367 .is_err()
1368 {
1369 event.before = None;
1370 event.after = None;
1371 }
1372 event
1373 }
1374
1375 fn audit_config_resolve(
1376 &self,
1377 collection: &str,
1378 key: &str,
1379 secret_ref: Option<&ConfigSecretRef>,
1380 outcome: crate::runtime::audit_log::Outcome,
1381 reason: &str,
1382 ) {
1383 let actor = current_auth_identity()
1384 .map(|(principal, _)| principal)
1385 .unwrap_or_else(|| "anonymous".to_string());
1386 let request_id = match current_connection_id() {
1387 0 => "embedded".to_string(),
1388 id => format!("conn-{id}"),
1389 };
1390 let mut builder = crate::runtime::audit_log::AuditEvent::builder("config/resolve")
1391 .principal(actor.clone())
1392 .source(crate::runtime::audit_log::AuditAuthSource::Password)
1393 .resource(format!(
1394 "config:{}",
1395 config_target_resource(collection, key)
1396 ))
1397 .outcome(outcome)
1398 .correlation_id(request_id.clone())
1399 .fields([
1400 crate::runtime::audit_log::AuditFieldEscaper::field("actor", actor),
1401 crate::runtime::audit_log::AuditFieldEscaper::field("collection", collection),
1402 crate::runtime::audit_log::AuditFieldEscaper::field("key", key),
1403 crate::runtime::audit_log::AuditFieldEscaper::field(
1404 "target",
1405 config_target_resource(collection, key),
1406 ),
1407 crate::runtime::audit_log::AuditFieldEscaper::field("reason", reason),
1408 crate::runtime::audit_log::AuditFieldEscaper::field("request_id", request_id),
1409 crate::runtime::audit_log::AuditFieldEscaper::field(
1410 "connection_id",
1411 current_connection_id(),
1412 ),
1413 ]);
1414 if let Some(tenant) = current_tenant() {
1415 builder = builder.tenant(tenant);
1416 }
1417 if let Some(secret_ref) = secret_ref {
1418 builder = builder.fields([
1419 crate::runtime::audit_log::AuditFieldEscaper::field("resolved_store", "vault"),
1420 crate::runtime::audit_log::AuditFieldEscaper::field(
1421 "resolved_collection",
1422 secret_ref.collection.as_str(),
1423 ),
1424 crate::runtime::audit_log::AuditFieldEscaper::field(
1425 "resolved_key",
1426 secret_ref.key.as_str(),
1427 ),
1428 crate::runtime::audit_log::AuditFieldEscaper::field(
1429 "resolved_target",
1430 format!("{}.{}", secret_ref.collection, secret_ref.key),
1431 ),
1432 ]);
1433 }
1434 self.audit_log().record_event(builder.build());
1435 }
1436}
1437
1438fn parse_config_secret_ref(value: &Value) -> RedDBResult<ConfigSecretRef> {
1439 let Value::Json(bytes) = value else {
1440 return Err(RedDBError::InvalidConfig(
1441 "CONFIG value is not a SecretRef".to_string(),
1442 ));
1443 };
1444 let json = crate::json::from_slice::<crate::json::Value>(bytes).map_err(|err| {
1445 RedDBError::InvalidConfig(format!("CONFIG SecretRef is malformed: {err}"))
1446 })?;
1447 let Some(object) = json.as_object() else {
1448 return Err(RedDBError::InvalidConfig(
1449 "CONFIG SecretRef must be an object".to_string(),
1450 ));
1451 };
1452 let get_str = |field: &str| -> RedDBResult<&str> {
1453 object
1454 .get(field)
1455 .and_then(|value| value.as_str())
1456 .ok_or_else(|| RedDBError::InvalidConfig(format!("CONFIG SecretRef missing {field}")))
1457 };
1458 if get_str("type")? != "secret_ref" {
1459 return Err(RedDBError::InvalidConfig(
1460 "CONFIG value is not a SecretRef".to_string(),
1461 ));
1462 }
1463 if get_str("store")? != "vault" {
1464 return Err(RedDBError::InvalidConfig(
1465 "CONFIG SecretRef store is unsupported".to_string(),
1466 ));
1467 }
1468 Ok(ConfigSecretRef {
1469 collection: get_str("collection")?.to_string(),
1470 key: get_str("key")?.to_string(),
1471 })
1472}
1473
1474fn config_target_resource(collection: &str, key: &str) -> String {
1475 if collection == "red.config" {
1476 format!("red.config/{}", key.to_ascii_lowercase())
1477 } else {
1478 format!("{collection}.{key}")
1479 }
1480}
1481
1482fn config_write_output(
1483 raw_query: &str,
1484 collection: &str,
1485 key: &str,
1486 version: i64,
1487 id: EntityId,
1488 value_type: Option<ConfigValueType>,
1489 schema_version: Option<i64>,
1490 tags: &[String],
1491 statement: &'static str,
1492 affected_rows: u64,
1493) -> RuntimeQueryResult {
1494 let mut result = UnifiedResult::with_columns(vec![
1495 "ok".into(),
1496 "collection".into(),
1497 "key".into(),
1498 "version".into(),
1499 "value_type".into(),
1500 "schema_version".into(),
1501 "tags".into(),
1502 "id".into(),
1503 ]);
1504 let mut record = UnifiedRecord::new();
1505 record.set("ok", Value::Boolean(true));
1506 record.set("collection", Value::text(collection.to_string()));
1507 record.set("key", Value::text(key.to_string()));
1508 record.set("version", Value::Integer(version));
1509 record.set("value_type", config_value_type_value(value_type));
1510 record.set(
1511 "schema_version",
1512 schema_version.map(Value::Integer).unwrap_or(Value::Null),
1513 );
1514 record.set("tags", config_tags_value(tags));
1515 record.set("id", Value::Integer(id.raw() as i64));
1516 result.push(record);
1517 RuntimeQueryResult {
1518 query: raw_query.to_string(),
1519 mode: crate::storage::query::modes::QueryMode::Sql,
1520 statement,
1521 engine: "config",
1522 result,
1523 affected_rows,
1524 statement_type: if statement == "delete" {
1525 "delete"
1526 } else {
1527 "update"
1528 },
1529 }
1530}
1531
1532fn invalid_config_volatility(operation: &str) -> RedDBError {
1533 RedDBError::InvalidOperation(format!(
1534 "CONFIG does not support KV-only volatility operation {operation}"
1535 ))
1536}
1537
1538fn resolve_config_schema(
1539 latest: Option<&ConfigVersion>,
1540 requested_type: Option<ConfigValueType>,
1541) -> (Option<ConfigValueType>, Option<i64>) {
1542 let previous_type = latest.and_then(|version| version.value_type);
1543 let previous_schema_version = latest.and_then(|version| version.schema_version);
1544 match requested_type {
1545 Some(value_type) if Some(value_type) != previous_type => (
1546 Some(value_type),
1547 Some(previous_schema_version.unwrap_or(0) + 1),
1548 ),
1549 Some(value_type) => (Some(value_type), previous_schema_version.or(Some(1))),
1550 None => (previous_type, previous_schema_version),
1551 }
1552}
1553
1554fn validate_config_value_type(value: &Value, value_type: ConfigValueType) -> RedDBResult<()> {
1555 let valid = match value_type {
1556 ConfigValueType::Bool => matches!(value, Value::Boolean(_)),
1557 ConfigValueType::Int => matches!(
1558 value,
1559 Value::Integer(_) | Value::UnsignedInteger(_) | Value::BigInt(_)
1560 ),
1561 ConfigValueType::String => matches!(value, Value::Text(_)),
1562 ConfigValueType::Url => validate_config_url(value),
1563 ConfigValueType::Object => validate_config_json_shape(value, true),
1564 ConfigValueType::Array => {
1565 matches!(value, Value::Array(_) | Value::Vector(_))
1566 || validate_config_json_shape(value, false)
1567 }
1568 };
1569 if valid {
1570 Ok(())
1571 } else {
1572 Err(RedDBError::InvalidConfig(format!(
1573 "CONFIG value type mismatch: expected {}, got {}",
1574 value_type.as_str(),
1575 config_actual_value_type(value),
1576 )))
1577 }
1578}
1579
1580fn is_enforcement_mode_config(collection: &str, key: &str) -> bool {
1585 collection == "red.config" && key == "policy.enforcement_mode"
1586}
1587
1588fn validate_enforcement_mode_value(value: &Value) -> RedDBResult<()> {
1592 let text = match value {
1593 Value::Text(text) => text.as_ref(),
1594 _ => {
1595 return Err(RedDBError::InvalidConfig(format!(
1596 "config key `{}` must be a string ({} or {}); got {}",
1597 crate::auth::enforcement_mode::ENFORCEMENT_MODE_CONFIG_KEY,
1598 crate::auth::enforcement_mode::PolicyEnforcementMode::LegacyRbac.as_str(),
1599 crate::auth::enforcement_mode::PolicyEnforcementMode::PolicyOnly.as_str(),
1600 config_actual_value_type(value),
1601 )));
1602 }
1603 };
1604 if crate::auth::enforcement_mode::PolicyEnforcementMode::parse(text).is_some() {
1605 Ok(())
1606 } else {
1607 Err(RedDBError::InvalidConfig(format!(
1608 "config key `{}` accepts only `{}` or `{}`, got `{}`",
1609 crate::auth::enforcement_mode::ENFORCEMENT_MODE_CONFIG_KEY,
1610 crate::auth::enforcement_mode::PolicyEnforcementMode::LegacyRbac.as_str(),
1611 crate::auth::enforcement_mode::PolicyEnforcementMode::PolicyOnly.as_str(),
1612 text,
1613 )))
1614 }
1615}
1616
1617fn validate_config_url(value: &Value) -> bool {
1618 let url = match value {
1619 Value::Url(value) => value.as_str(),
1620 Value::Text(value) => value.as_ref(),
1621 _ => return false,
1622 };
1623 url.starts_with("http://") || url.starts_with("https://") || url.starts_with("ftp://")
1624}
1625
1626fn validate_config_json_shape(value: &Value, object: bool) -> bool {
1627 let Value::Json(bytes) = value else {
1628 return false;
1629 };
1630 let Ok(json) = crate::json::from_slice::<crate::json::Value>(bytes) else {
1631 return false;
1632 };
1633 matches!(
1634 (object, json),
1635 (true, crate::json::Value::Object(_)) | (false, crate::json::Value::Array(_))
1636 )
1637}
1638
1639fn config_actual_value_type(value: &Value) -> &'static str {
1640 match value {
1641 Value::Null => "null",
1642 Value::Boolean(_) => "bool",
1643 Value::Integer(_) | Value::UnsignedInteger(_) | Value::BigInt(_) => "int",
1644 Value::Text(_) => "string",
1645 Value::Url(_) => "url",
1646 Value::Json(bytes) => match crate::json::from_slice::<crate::json::Value>(bytes) {
1647 Ok(crate::json::Value::Object(_)) => "object",
1648 Ok(crate::json::Value::Array(_)) => "array",
1649 _ => "json",
1650 },
1651 Value::Array(_) | Value::Vector(_) => "array",
1652 _ => "other",
1653 }
1654}
1655
1656fn config_value_type_value(value_type: Option<ConfigValueType>) -> Value {
1657 value_type
1658 .map(|value_type| Value::text(value_type.as_str()))
1659 .unwrap_or(Value::Null)
1660}
1661
1662fn config_value_type_from_value(value: &Value) -> Option<ConfigValueType> {
1663 match value {
1664 Value::Text(value) => ConfigValueType::parse(value.as_ref()),
1665 _ => None,
1666 }
1667}
1668
1669fn config_tags_value(tags: &[String]) -> Value {
1670 if tags.is_empty() {
1671 return Value::Null;
1672 }
1673 Value::Array(tags.iter().map(|tag| Value::text(tag.clone())).collect())
1674}
1675
1676fn config_tags_from_value(value: Option<&Value>) -> Vec<String> {
1677 match value {
1678 Some(Value::Array(values)) => values
1679 .iter()
1680 .filter_map(|value| match value {
1681 Value::Text(tag) => Some(tag.to_string()),
1682 _ => None,
1683 })
1684 .collect(),
1685 Some(Value::Json(bytes)) => crate::json::from_slice::<crate::json::Value>(bytes)
1686 .ok()
1687 .and_then(|value| value.as_array().map(|values| values.to_vec()))
1688 .map(|values| {
1689 values
1690 .into_iter()
1691 .filter_map(|value| value.as_str().map(ToOwned::to_owned))
1692 .collect()
1693 })
1694 .unwrap_or_default(),
1695 _ => Vec::new(),
1696 }
1697}
1698
1699fn config_payload_sensitivity(
1700 resource_type: &str,
1701 field: &str,
1702 value: &Value,
1703) -> crate::runtime::control_events::Sensitivity {
1704 let payload = config_payload_bytes(value);
1705 if config_payload_raw_allowed(resource_type, field) {
1706 crate::runtime::control_events::Sensitivity::raw(
1707 String::from_utf8_lossy(&payload).into_owned(),
1708 )
1709 } else {
1710 crate::runtime::control_events::Sensitivity::hashed(&payload)
1711 }
1712}
1713
1714fn config_payload_bytes(value: &Value) -> Vec<u8> {
1715 let json = crate::presentation::entity_json::storage_value_to_json(value);
1716 crate::serde_json::to_vec(&json).unwrap_or_else(|_| value.to_string().into_bytes())
1717}
1718
1719fn config_payload_raw_allowed(resource_type: &str, field: &str) -> bool {
1720 const RAW_PAYLOAD_FIELDS: &[(&str, &str)] = &[("audit_surface", "payload")];
1721 RAW_PAYLOAD_FIELDS
1722 .iter()
1723 .any(|(allowed_type, allowed_field)| {
1724 *allowed_type == resource_type && *allowed_field == field
1725 })
1726}
1727
1728fn config_mutability_label(mutability: crate::auth::registry::Mutability) -> &'static str {
1729 match mutability {
1730 crate::auth::registry::Mutability::Immutable => "immutable",
1731 crate::auth::registry::Mutability::MutableViaGovernance => "mutable_via_governance",
1732 }
1733}
1734
1735fn current_unix_ms() -> u64 {
1736 std::time::SystemTime::now()
1737 .duration_since(std::time::UNIX_EPOCH)
1738 .unwrap_or_default()
1739 .as_millis() as u64
1740}