Skip to main content

reddb_server/runtime/
impl_config.rs

1//! Stable Config keyed command execution.
2
3use std::collections::HashMap;
4use std::sync::Arc;
5
6use crate::catalog::{CollectionModel, SchemaMode};
7use crate::physical::{CollectionContract, ContractOrigin};
8use crate::storage::query::ast::ConfigValueType;
9use crate::storage::{EntityData, EntityId, EntityKind, RowData, UnifiedEntity};
10
11use super::impl_core::{current_auth_identity, current_connection_id, current_tenant};
12use super::*;
13
14const CONFIG_HISTORY_LIMIT: usize = 16;
15
16#[derive(Clone)]
17struct ConfigVersion {
18    id: EntityId,
19    key: String,
20    version: i64,
21    value: Value,
22    tombstone: bool,
23    created_at_ms: i64,
24    op: String,
25    value_type: Option<ConfigValueType>,
26    schema_version: Option<i64>,
27    tags: Vec<String>,
28}
29
30impl super::keyed_spine::KeyedVersion for ConfigVersion {
31    fn key(&self) -> &str {
32        &self.key
33    }
34
35    fn version(&self) -> i64 {
36        self.version
37    }
38}
39
40impl ConfigVersion {
41    fn from_keyed_row(version: super::keyed_spine::KeyedRowVersion, row: &RowData) -> Self {
42        Self {
43            id: version.id,
44            key: version.key,
45            version: version.version,
46            value: version.value,
47            tombstone: version.tombstone,
48            created_at_ms: version.created_at_ms,
49            op: version.op,
50            value_type: row
51                .get_field("value_type")
52                .and_then(config_value_type_from_value),
53            schema_version: super::keyed_spine::value_i64(row.get_field("schema_version")),
54            tags: config_tags_from_value(row.get_field("tags")),
55        }
56    }
57}
58
59struct ConfigSecretRef {
60    collection: String,
61    key: String,
62}
63
64struct ConfigMutationEvidence {
65    id: String,
66    resource_type: String,
67    managed: bool,
68    mutability: crate::auth::registry::Mutability,
69    matched_action: Option<String>,
70    matched_resource: Option<String>,
71    payload: Option<Value>,
72}
73
74enum ConfigMutationAuthz {
75    Allowed(ConfigMutationEvidence),
76    Denied {
77        reason: String,
78        evidence: ConfigMutationEvidence,
79    },
80}
81
82impl RedDBRuntime {
83    pub fn execute_config_command(
84        &self,
85        raw_query: &str,
86        cmd: &crate::storage::query::ast::ConfigCommand,
87    ) -> RedDBResult<RuntimeQueryResult> {
88        use crate::storage::query::ast::ConfigCommand;
89
90        match cmd {
91            ConfigCommand::Put {
92                collection,
93                key,
94                value,
95                value_type,
96                tags,
97            } => self.config_write_result(
98                raw_query,
99                collection,
100                key,
101                value.clone(),
102                *value_type,
103                tags,
104                "put",
105            ),
106            ConfigCommand::Rotate {
107                collection,
108                key,
109                value,
110                value_type,
111                tags,
112            } => self.config_write_result(
113                raw_query,
114                collection,
115                key,
116                value.clone(),
117                *value_type,
118                tags,
119                "rotate",
120            ),
121            ConfigCommand::Get { collection, key } => {
122                self.config_get_result(raw_query, collection, key)
123            }
124            ConfigCommand::Resolve { collection, key } => {
125                self.config_resolve_result(raw_query, collection, key)
126            }
127            ConfigCommand::Delete { collection, key } => {
128                self.config_delete_result(raw_query, collection, key)
129            }
130            ConfigCommand::History { collection, key } => {
131                self.config_history_result(raw_query, collection, key)
132            }
133            ConfigCommand::List {
134                collection,
135                prefix,
136                limit,
137                offset,
138            } => self.config_list_result(raw_query, collection, prefix.as_deref(), *limit, *offset),
139            ConfigCommand::Watch {
140                collection,
141                key,
142                prefix,
143                from_lsn,
144            } => self.config_watch_result(raw_query, collection, key, *prefix, *from_lsn),
145            ConfigCommand::InvalidVolatileOperation { operation, .. } => {
146                Err(invalid_config_volatility(operation))
147            }
148        }
149    }
150
151    pub(crate) fn validate_config_command_before_auth(
152        &self,
153        cmd: &crate::storage::query::ast::ConfigCommand,
154    ) -> RedDBResult<()> {
155        use crate::storage::query::ast::ConfigCommand;
156        match cmd {
157            ConfigCommand::InvalidVolatileOperation { operation, .. } => {
158                Err(invalid_config_volatility(operation))
159            }
160            ConfigCommand::Put { collection, .. }
161            | ConfigCommand::Get { collection, .. }
162            | ConfigCommand::Resolve { collection, .. }
163            | ConfigCommand::Rotate { collection, .. }
164            | ConfigCommand::Delete { collection, .. }
165            | ConfigCommand::History { collection, .. }
166            | ConfigCommand::List { collection, .. }
167            | ConfigCommand::Watch { collection, .. } => {
168                let snapshot = self.inner.db.catalog_model_snapshot();
169                let Some(actual_model) = snapshot
170                    .collections
171                    .iter()
172                    .find(|c| c.name == *collection)
173                    .map(|c| c.declared_model.unwrap_or(c.model))
174                else {
175                    return Ok(());
176                };
177                crate::runtime::ddl::polymorphic_resolver::ensure_model_match(
178                    CollectionModel::Config,
179                    actual_model,
180                )
181            }
182        }
183    }
184
185    fn config_resolve_result(
186        &self,
187        raw_query: &str,
188        collection: &str,
189        key: &str,
190    ) -> RedDBResult<RuntimeQueryResult> {
191        let latest = self.latest_config_version(collection, key)?;
192        if let Err(reason) = self.check_config_capability("config:read", collection, key) {
193            self.audit_config_resolve(
194                collection,
195                key,
196                None,
197                crate::runtime::audit_log::Outcome::Denied,
198                &reason,
199            );
200            return Err(RedDBError::Query(reason));
201        }
202
203        let Some(version) = latest else {
204            let reason = "not_found";
205            self.audit_config_resolve(
206                collection,
207                key,
208                None,
209                crate::runtime::audit_log::Outcome::Denied,
210                reason,
211            );
212            return Err(RedDBError::NotFound(format!(
213                "config '{}.{}' not found",
214                collection, key
215            )));
216        };
217        if version.tombstone {
218            let reason = "deleted";
219            self.audit_config_resolve(
220                collection,
221                key,
222                None,
223                crate::runtime::audit_log::Outcome::Denied,
224                reason,
225            );
226            return Err(RedDBError::NotFound(format!(
227                "config '{}.{}' is deleted",
228                collection, key
229            )));
230        }
231
232        let secret_ref = parse_config_secret_ref(&version.value).inspect_err(|err| {
233            self.audit_config_resolve(
234                collection,
235                key,
236                None,
237                crate::runtime::audit_log::Outcome::Error,
238                &err.to_string(),
239            );
240        })?;
241
242        match self.resolve_vault_secret_value(&secret_ref.collection, &secret_ref.key) {
243            Ok(value) => {
244                self.audit_config_resolve(
245                    collection,
246                    key,
247                    Some(&secret_ref),
248                    crate::runtime::audit_log::Outcome::Success,
249                    "ok",
250                );
251                let mut result = UnifiedResult::with_columns(vec![
252                    "collection".into(),
253                    "key".into(),
254                    "value".into(),
255                    "resolved_store".into(),
256                    "resolved_collection".into(),
257                    "resolved_key".into(),
258                ]);
259                let mut record = UnifiedRecord::new();
260                record.set("collection", Value::text(collection.to_string()));
261                record.set("key", Value::text(key.to_string()));
262                record.set("value", value);
263                record.set("resolved_store", Value::text("vault"));
264                record.set("resolved_collection", Value::text(secret_ref.collection));
265                record.set("resolved_key", Value::text(secret_ref.key));
266                result.push(record);
267                Ok(RuntimeQueryResult {
268                    query: raw_query.to_string(),
269                    mode: crate::storage::query::modes::QueryMode::Sql,
270                    statement: "config_resolve",
271                    engine: "config",
272                    result,
273                    affected_rows: 0,
274                    statement_type: "select",
275                })
276            }
277            Err(err) => {
278                let reason = err.to_string();
279                let outcome = if reason.contains("denied") {
280                    crate::runtime::audit_log::Outcome::Denied
281                } else {
282                    crate::runtime::audit_log::Outcome::Error
283                };
284                self.audit_config_resolve(collection, key, Some(&secret_ref), outcome, &reason);
285                Err(err)
286            }
287        }
288    }
289
290    fn config_write_result(
291        &self,
292        raw_query: &str,
293        collection: &str,
294        key: &str,
295        value: Value,
296        requested_type: Option<ConfigValueType>,
297        tags: &[String],
298        op: &str,
299    ) -> RedDBResult<RuntimeQueryResult> {
300        let mut evidence = match self.authorize_config_write_for_event(collection, key) {
301            ConfigMutationAuthz::Allowed(evidence) => evidence,
302            ConfigMutationAuthz::Denied {
303                reason,
304                mut evidence,
305            } => {
306                evidence.payload = Some(value.clone());
307                let _ = self.emit_config_mutation_event(
308                    crate::runtime::control_events::EventKind::ConfigWrite,
309                    crate::runtime::control_events::Outcome::Denied,
310                    "config:write",
311                    collection,
312                    key,
313                    Some(reason.clone()),
314                    &evidence,
315                );
316                return Err(RedDBError::Query(reason));
317            }
318        };
319        if let Err(err) = self.check_write(crate::runtime::write_gate::WriteKind::Dml) {
320            let _ = self.emit_config_mutation_event(
321                crate::runtime::control_events::EventKind::ConfigWrite,
322                crate::runtime::control_events::Outcome::Error,
323                "config:write",
324                collection,
325                key,
326                Some(err.to_string()),
327                &evidence,
328            );
329            return Err(err);
330        }
331        // #712 / S5A: reject invalid values for the enforcement-mode
332        // config key before any storage mutation. The allowlist lives
333        // on PolicyEnforcementMode::parse so it stays in lockstep with
334        // the evaluator's understanding of the modes.
335        if is_enforcement_mode_config(collection, key) {
336            if let Err(err) = validate_enforcement_mode_value(&value) {
337                let _ = self.emit_config_mutation_event(
338                    crate::runtime::control_events::EventKind::ConfigWrite,
339                    crate::runtime::control_events::Outcome::Denied,
340                    "config:write",
341                    collection,
342                    key,
343                    Some(err.to_string()),
344                    &evidence,
345                );
346                return Err(err);
347            }
348        }
349        if let Err(err) = self.ensure_config_collection(collection) {
350            let _ = self.emit_config_mutation_event(
351                crate::runtime::control_events::EventKind::ConfigWrite,
352                crate::runtime::control_events::Outcome::Error,
353                "config:write",
354                collection,
355                key,
356                Some(err.to_string()),
357                &evidence,
358            );
359            return Err(err);
360        }
361        let latest = match self.latest_config_version(collection, key) {
362            Ok(latest) => latest,
363            Err(err) => {
364                let _ = self.emit_config_mutation_event(
365                    crate::runtime::control_events::EventKind::ConfigWrite,
366                    crate::runtime::control_events::Outcome::Error,
367                    "config:write",
368                    collection,
369                    key,
370                    Some(err.to_string()),
371                    &evidence,
372                );
373                return Err(err);
374            }
375        };
376        let version = latest.as_ref().map(|version| version.version).unwrap_or(0) + 1;
377        let (value_type, schema_version) = resolve_config_schema(latest.as_ref(), requested_type);
378        if let Some(value_type) = value_type {
379            if let Err(err) = validate_config_value_type(&value, value_type) {
380                let _ = self.emit_config_mutation_event(
381                    crate::runtime::control_events::EventKind::ConfigWrite,
382                    crate::runtime::control_events::Outcome::Error,
383                    "config:write",
384                    collection,
385                    key,
386                    Some(err.to_string()),
387                    &evidence,
388                );
389                return Err(err);
390            }
391        }
392        evidence.payload = Some(value.clone());
393        let before = latest.as_ref().and_then(|version| {
394            if version.tombstone {
395                None
396            } else {
397                Some(crate::presentation::entity_json::storage_value_to_json(
398                    &version.value,
399                ))
400            }
401        });
402        let after = Some(crate::presentation::entity_json::storage_value_to_json(
403            &value,
404        ));
405        let change_op = if latest.is_some() {
406            crate::replication::cdc::ChangeOperation::Update
407        } else {
408            crate::replication::cdc::ChangeOperation::Insert
409        };
410        let id = match self.append_config_version(
411            collection,
412            key,
413            value,
414            version,
415            false,
416            op,
417            value_type,
418            schema_version,
419            tags,
420        ) {
421            Ok(id) => id,
422            Err(err) => {
423                let _ = self.emit_config_mutation_event(
424                    crate::runtime::control_events::EventKind::ConfigWrite,
425                    crate::runtime::control_events::Outcome::Error,
426                    "config:write",
427                    collection,
428                    key,
429                    Some(err.to_string()),
430                    &evidence,
431                );
432                return Err(err);
433            }
434        };
435        self.record_kv_watch_event(change_op, collection, key, id.raw(), before, after);
436        if let Err(err) = self.prune_config_history(collection, key) {
437            let _ = self.emit_config_mutation_event(
438                crate::runtime::control_events::EventKind::ConfigWrite,
439                crate::runtime::control_events::Outcome::Error,
440                "config:write",
441                collection,
442                key,
443                Some(err.to_string()),
444                &evidence,
445            );
446            return Err(err);
447        }
448        self.invalidate_result_cache();
449        if let Err(err) = self.emit_config_mutation_event(
450            crate::runtime::control_events::EventKind::ConfigWrite,
451            crate::runtime::control_events::Outcome::Allowed,
452            "config:write",
453            collection,
454            key,
455            None,
456            &evidence,
457        ) {
458            let _ = self.inner.db.store().delete(collection, id);
459            self.invalidate_result_cache();
460            return Err(err);
461        }
462        // #712 / S5A: now that the write is durable and audited, push
463        // the new mode into the live AuthStore so subsequent IAM
464        // decisions honour it without waiting for a restart.
465        if is_enforcement_mode_config(collection, key) {
466            if let Some(auth_store) = self.inner.auth_store.read().clone() {
467                if let Value::Text(text) =
468                    &evidence.payload.as_ref().cloned().unwrap_or(Value::Null)
469                {
470                    if let Some(mode) =
471                        crate::auth::enforcement_mode::PolicyEnforcementMode::parse(text)
472                    {
473                        auth_store.set_enforcement_mode(mode);
474                    }
475                }
476            }
477        }
478        Ok(config_write_output(
479            raw_query,
480            collection,
481            key,
482            version,
483            id,
484            value_type,
485            schema_version,
486            tags,
487            match op {
488                "rotate" => "config_rotate",
489                _ => "config_put",
490            },
491            1,
492        ))
493    }
494
495    fn config_delete_result(
496        &self,
497        raw_query: &str,
498        collection: &str,
499        key: &str,
500    ) -> RedDBResult<RuntimeQueryResult> {
501        let mut evidence = match self.authorize_config_write_for_event(collection, key) {
502            ConfigMutationAuthz::Allowed(evidence) => evidence,
503            ConfigMutationAuthz::Denied { reason, evidence } => {
504                let _ = self.emit_config_mutation_event(
505                    crate::runtime::control_events::EventKind::ConfigDelete,
506                    crate::runtime::control_events::Outcome::Denied,
507                    "config:delete",
508                    collection,
509                    key,
510                    Some(reason.clone()),
511                    &evidence,
512                );
513                return Err(RedDBError::Query(reason));
514            }
515        };
516        if let Err(err) = self.check_write(crate::runtime::write_gate::WriteKind::Dml) {
517            let _ = self.emit_config_mutation_event(
518                crate::runtime::control_events::EventKind::ConfigDelete,
519                crate::runtime::control_events::Outcome::Error,
520                "config:delete",
521                collection,
522                key,
523                Some(err.to_string()),
524                &evidence,
525            );
526            return Err(err);
527        }
528        if let Err(err) = self.ensure_config_collection(collection) {
529            let _ = self.emit_config_mutation_event(
530                crate::runtime::control_events::EventKind::ConfigDelete,
531                crate::runtime::control_events::Outcome::Error,
532                "config:delete",
533                collection,
534                key,
535                Some(err.to_string()),
536                &evidence,
537            );
538            return Err(err);
539        }
540        let latest = match self.latest_config_version(collection, key) {
541            Ok(latest) => latest,
542            Err(err) => {
543                let _ = self.emit_config_mutation_event(
544                    crate::runtime::control_events::EventKind::ConfigDelete,
545                    crate::runtime::control_events::Outcome::Error,
546                    "config:delete",
547                    collection,
548                    key,
549                    Some(err.to_string()),
550                    &evidence,
551                );
552                return Err(err);
553            }
554        };
555        evidence.payload = latest.as_ref().map(|version| version.value.clone());
556        let version = latest.as_ref().map(|version| version.version).unwrap_or(0) + 1;
557        let value_type = latest.as_ref().and_then(|version| version.value_type);
558        let schema_version = latest.as_ref().and_then(|version| version.schema_version);
559        let id = match self.append_config_version(
560            collection,
561            key,
562            Value::Null,
563            version,
564            true,
565            "delete",
566            value_type,
567            schema_version,
568            &[],
569        ) {
570            Ok(id) => id,
571            Err(err) => {
572                let _ = self.emit_config_mutation_event(
573                    crate::runtime::control_events::EventKind::ConfigDelete,
574                    crate::runtime::control_events::Outcome::Error,
575                    "config:delete",
576                    collection,
577                    key,
578                    Some(err.to_string()),
579                    &evidence,
580                );
581                return Err(err);
582            }
583        };
584        if let Some(before) = latest.as_ref().and_then(|version| {
585            if version.tombstone {
586                None
587            } else {
588                Some(crate::presentation::entity_json::storage_value_to_json(
589                    &version.value,
590                ))
591            }
592        }) {
593            self.record_kv_watch_event(
594                crate::replication::cdc::ChangeOperation::Delete,
595                collection,
596                key,
597                id.raw(),
598                Some(before),
599                None,
600            );
601        }
602        if let Err(err) = self.prune_config_history(collection, key) {
603            let _ = self.emit_config_mutation_event(
604                crate::runtime::control_events::EventKind::ConfigDelete,
605                crate::runtime::control_events::Outcome::Error,
606                "config:delete",
607                collection,
608                key,
609                Some(err.to_string()),
610                &evidence,
611            );
612            return Err(err);
613        }
614        self.invalidate_result_cache();
615        if let Err(err) = self.emit_config_mutation_event(
616            crate::runtime::control_events::EventKind::ConfigDelete,
617            crate::runtime::control_events::Outcome::Allowed,
618            "config:delete",
619            collection,
620            key,
621            None,
622            &evidence,
623        ) {
624            let _ = self.inner.db.store().delete(collection, id);
625            self.invalidate_result_cache();
626            return Err(err);
627        }
628        Ok(config_write_output(
629            raw_query,
630            collection,
631            key,
632            version,
633            id,
634            value_type,
635            schema_version,
636            &[],
637            "delete",
638            1,
639        ))
640    }
641
642    fn config_get_result(
643        &self,
644        raw_query: &str,
645        collection: &str,
646        key: &str,
647    ) -> RedDBResult<RuntimeQueryResult> {
648        self.check_system_config_capability("config:read", collection, key)
649            .map_err(RedDBError::Query)?;
650        let latest = self.latest_config_version(collection, key)?;
651        let mut result = UnifiedResult::with_columns(vec![
652            "collection".into(),
653            "key".into(),
654            "value".into(),
655            "version".into(),
656            "value_type".into(),
657            "schema_version".into(),
658            "tags".into(),
659            "tombstone".into(),
660        ]);
661        let mut record = UnifiedRecord::new();
662        record.set("collection", Value::text(collection.to_string()));
663        record.set("key", Value::text(key.to_string()));
664        if let Some(version) = latest {
665            record.set("value", version.value);
666            record.set("version", Value::Integer(version.version));
667            record.set("value_type", config_value_type_value(version.value_type));
668            record.set(
669                "schema_version",
670                version
671                    .schema_version
672                    .map(Value::Integer)
673                    .unwrap_or(Value::Null),
674            );
675            record.set("tags", config_tags_value(&version.tags));
676            record.set("tombstone", Value::Boolean(version.tombstone));
677        } else {
678            record.set("value", Value::Null);
679            record.set("version", Value::Null);
680            record.set("value_type", Value::Null);
681            record.set("schema_version", Value::Null);
682            record.set("tags", Value::Null);
683            record.set("tombstone", Value::Boolean(false));
684        }
685        result.push(record);
686        Ok(RuntimeQueryResult {
687            query: raw_query.to_string(),
688            mode: crate::storage::query::modes::QueryMode::Sql,
689            statement: "config_get",
690            engine: "config",
691            result,
692            affected_rows: 0,
693            statement_type: "select",
694        })
695    }
696
697    fn config_history_result(
698        &self,
699        raw_query: &str,
700        collection: &str,
701        key: &str,
702    ) -> RedDBResult<RuntimeQueryResult> {
703        self.check_system_config_capability("config:read", collection, key)
704            .map_err(RedDBError::Query)?;
705        let versions = super::keyed_spine::history_versions(self.config_versions(collection, key)?);
706        let mut result = UnifiedResult::with_columns(vec![
707            "collection".into(),
708            "key".into(),
709            "version".into(),
710            "value".into(),
711            "value_type".into(),
712            "schema_version".into(),
713            "tags".into(),
714            "tombstone".into(),
715            "op".into(),
716            "created_at_ms".into(),
717        ]);
718        for version in versions {
719            let mut record = UnifiedRecord::new();
720            record.set("collection", Value::text(collection.to_string()));
721            record.set("key", Value::text(key.to_string()));
722            record.set("version", Value::Integer(version.version));
723            record.set("value", version.value);
724            record.set("value_type", config_value_type_value(version.value_type));
725            record.set(
726                "schema_version",
727                version
728                    .schema_version
729                    .map(Value::Integer)
730                    .unwrap_or(Value::Null),
731            );
732            record.set("tags", Value::Null);
733            record.set("tombstone", Value::Boolean(version.tombstone));
734            record.set("op", Value::text(version.op));
735            record.set("created_at_ms", Value::Integer(version.created_at_ms));
736            result.push(record);
737        }
738        Ok(RuntimeQueryResult {
739            query: raw_query.to_string(),
740            mode: crate::storage::query::modes::QueryMode::Sql,
741            statement: "config_history",
742            engine: "config",
743            result,
744            affected_rows: 0,
745            statement_type: "select",
746        })
747    }
748
749    fn config_list_result(
750        &self,
751        raw_query: &str,
752        collection: &str,
753        prefix: Option<&str>,
754        limit: Option<usize>,
755        offset: usize,
756    ) -> RedDBResult<RuntimeQueryResult> {
757        let mut versions = self.latest_config_versions(collection, prefix)?;
758        versions.sort_by(|left, right| left.key.cmp(&right.key));
759        let mut result = UnifiedResult::with_columns(vec![
760            "collection".into(),
761            "key".into(),
762            "value".into(),
763            "version".into(),
764            "value_type".into(),
765            "schema_version".into(),
766            "tags".into(),
767            "tombstone".into(),
768            "op".into(),
769            "created_at_ms".into(),
770        ]);
771        for version in versions
772            .into_iter()
773            .filter(|version| {
774                self.check_config_capability("config:read", collection, &version.key)
775                    .is_ok()
776            })
777            .skip(offset)
778            .take(limit.unwrap_or(usize::MAX))
779        {
780            let mut record = UnifiedRecord::new();
781            record.set("collection", Value::text(collection.to_string()));
782            record.set("key", Value::text(version.key));
783            record.set("value", version.value);
784            record.set("version", Value::Integer(version.version));
785            record.set("value_type", config_value_type_value(version.value_type));
786            record.set(
787                "schema_version",
788                version
789                    .schema_version
790                    .map(Value::Integer)
791                    .unwrap_or(Value::Null),
792            );
793            record.set("tags", config_tags_value(&version.tags));
794            record.set("tombstone", Value::Boolean(version.tombstone));
795            record.set("op", Value::text(version.op));
796            record.set("created_at_ms", Value::Integer(version.created_at_ms));
797            result.push(record);
798        }
799        Ok(RuntimeQueryResult {
800            query: raw_query.to_string(),
801            mode: crate::storage::query::modes::QueryMode::Sql,
802            statement: "config_list",
803            engine: "config",
804            result,
805            affected_rows: 0,
806            statement_type: "select",
807        })
808    }
809
810    fn config_watch_result(
811        &self,
812        raw_query: &str,
813        collection: &str,
814        key: &str,
815        prefix: bool,
816        from_lsn: Option<u64>,
817    ) -> RedDBResult<RuntimeQueryResult> {
818        let watch_key = if prefix {
819            format!("{key}.*")
820        } else {
821            key.to_string()
822        };
823        let endpoint = match from_lsn {
824            Some(lsn) => {
825                format!("/collections/{collection}/config/{watch_key}/watch?since_lsn={lsn}")
826            }
827            None => format!("/collections/{collection}/config/{watch_key}/watch"),
828        };
829        let mut result = UnifiedResult::with_columns(vec![
830            "collection".into(),
831            "key".into(),
832            "prefix".into(),
833            "from_lsn".into(),
834            "watch_url".into(),
835            "streaming".into(),
836        ]);
837        let mut record = UnifiedRecord::new();
838        record.set("collection", Value::text(collection.to_string()));
839        record.set("key", Value::text(watch_key));
840        record.set("prefix", Value::Boolean(prefix));
841        record.set(
842            "from_lsn",
843            from_lsn
844                .map(Value::UnsignedInteger)
845                .unwrap_or(crate::storage::schema::Value::Null),
846        );
847        record.set("watch_url", Value::text(endpoint));
848        record.set("streaming", Value::Boolean(true));
849        result.push(record);
850        Ok(RuntimeQueryResult {
851            query: raw_query.to_string(),
852            mode: crate::storage::query::modes::QueryMode::Sql,
853            statement: "config_watch",
854            engine: "config",
855            result,
856            affected_rows: 0,
857            statement_type: "stream",
858        })
859    }
860
861    fn ensure_config_collection(&self, collection: &str) -> RedDBResult<()> {
862        let store = self.inner.db.store();
863        if store.get_collection(collection).is_none() {
864            store
865                .create_collection(collection)
866                .map_err(|err| RedDBError::Internal(err.to_string()))?;
867        }
868        if let Some(contract) = self.inner.db.collection_contract(collection) {
869            crate::runtime::ddl::polymorphic_resolver::ensure_model_match(
870                CollectionModel::Config,
871                contract.declared_model,
872            )?;
873            return Ok(());
874        }
875        let now = current_unix_ms();
876        self.inner
877            .db
878            .save_collection_contract(CollectionContract {
879                name: collection.to_string(),
880                declared_model: CollectionModel::Config,
881                schema_mode: SchemaMode::Dynamic,
882                origin: ContractOrigin::Explicit,
883                version: 1,
884                created_at_unix_ms: now as u128,
885                updated_at_unix_ms: now as u128,
886                default_ttl_ms: None,
887                vector_dimension: None,
888                vector_metric: None,
889                context_index_fields: Vec::new(),
890                declared_columns: Vec::new(),
891                table_def: None,
892                timestamps_enabled: false,
893                context_index_enabled: false,
894                metrics_raw_retention_ms: None,
895                metrics_rollup_policies: Vec::new(),
896                metrics_tenant_identity: None,
897                metrics_namespace: None,
898                append_only: false,
899                subscriptions: Vec::new(),
900                session_key: None,
901                session_gap_ms: None,
902                retention_duration_ms: None,
903            })
904            .map(|_| ())
905            .map_err(|err| RedDBError::Internal(err.to_string()))
906    }
907
908    fn append_config_version(
909        &self,
910        collection: &str,
911        key: &str,
912        value: Value,
913        version: i64,
914        tombstone: bool,
915        op: &str,
916        value_type: Option<ConfigValueType>,
917        schema_version: Option<i64>,
918        tags: &[String],
919    ) -> RedDBResult<EntityId> {
920        let now = current_unix_ms() as i64;
921        let fields = vec![
922            ("key".to_string(), Value::text(key.to_string())),
923            ("value".to_string(), value),
924            ("version".to_string(), Value::Integer(version)),
925            (
926                "value_type".to_string(),
927                config_value_type_value(value_type),
928            ),
929            (
930                "schema_version".to_string(),
931                schema_version.map(Value::Integer).unwrap_or(Value::Null),
932            ),
933            ("tombstone".to_string(), Value::Boolean(tombstone)),
934            ("op".to_string(), Value::text(op.to_string())),
935            ("created_at_ms".to_string(), Value::Integer(now)),
936            ("tags".to_string(), config_tags_value(tags)),
937        ];
938        let mut row = RowData::new(Vec::new());
939        row.named = Some(fields.into_iter().collect());
940        let entity = UnifiedEntity::new(
941            EntityId::new(0),
942            EntityKind::TableRow {
943                table: Arc::from(collection),
944                row_id: 0,
945            },
946            EntityData::Row(row),
947        );
948        self.inner
949            .db
950            .store()
951            .insert(collection, entity)
952            .map_err(|err| RedDBError::Internal(err.to_string()))
953    }
954
955    fn latest_config_version(
956        &self,
957        collection: &str,
958        key: &str,
959    ) -> RedDBResult<Option<ConfigVersion>> {
960        Ok(super::keyed_spine::latest_version(
961            self.config_versions(collection, key)?,
962        ))
963    }
964
965    fn config_versions(&self, collection: &str, key: &str) -> RedDBResult<Vec<ConfigVersion>> {
966        let store = self.inner.db.store();
967        let Some(manager) = store.get_collection(collection) else {
968            return Ok(Vec::new());
969        };
970        let mut versions = Vec::new();
971        for entity in manager.query_all(|_| true) {
972            let EntityData::Row(row) = &entity.data else {
973                continue;
974            };
975            let Some(version) = super::keyed_spine::row_version(entity.id, row, 0) else {
976                continue;
977            };
978            if version.key != key {
979                continue;
980            }
981            versions.push(ConfigVersion::from_keyed_row(version, row));
982        }
983        Ok(versions)
984    }
985
986    fn latest_config_versions(
987        &self,
988        collection: &str,
989        prefix: Option<&str>,
990    ) -> RedDBResult<Vec<ConfigVersion>> {
991        let store = self.inner.db.store();
992        let Some(manager) = store.get_collection(collection) else {
993            return Ok(Vec::new());
994        };
995        let mut versions = Vec::new();
996        for entity in manager.query_all(|_| true) {
997            let EntityData::Row(row) = &entity.data else {
998                continue;
999            };
1000            let Some(version) = super::keyed_spine::row_version(entity.id, row, 0) else {
1001                continue;
1002            };
1003            versions.push(ConfigVersion::from_keyed_row(version, row));
1004        }
1005        Ok(super::keyed_spine::latest_versions(versions, prefix))
1006    }
1007
1008    fn prune_config_history(&self, collection: &str, key: &str) -> RedDBResult<()> {
1009        let mut versions = self.config_versions(collection, key)?;
1010        if versions.len() <= CONFIG_HISTORY_LIMIT {
1011            return Ok(());
1012        }
1013        versions = super::keyed_spine::history_versions(versions);
1014        let drop_count = versions.len() - CONFIG_HISTORY_LIMIT;
1015        let store = self.inner.db.store();
1016        for version in versions.into_iter().take(drop_count) {
1017            store
1018                .delete(collection, version.id)
1019                .map_err(|err| RedDBError::Internal(err.to_string()))?;
1020        }
1021        Ok(())
1022    }
1023
1024    fn authorize_config_write_for_event(&self, collection: &str, key: &str) -> ConfigMutationAuthz {
1025        let default_evidence = self.default_config_mutation_evidence(collection, key);
1026        let Some(auth_store) = self.inner.auth_store.read().clone() else {
1027            return ConfigMutationAuthz::Allowed(default_evidence);
1028        };
1029        if !auth_store.iam_authorization_enabled() {
1030            return ConfigMutationAuthz::Allowed(default_evidence);
1031        }
1032        let Some((principal, role)) = current_auth_identity() else {
1033            return ConfigMutationAuthz::Denied {
1034                reason:
1035                    "IAM authorization is enabled; config capability check requires an authenticated principal"
1036                        .to_string(),
1037                evidence: default_evidence,
1038            };
1039        };
1040        let tenant = current_tenant();
1041        let principal_id = crate::auth::UserId::from_parts(tenant.as_deref(), &principal);
1042        let ctx = crate::auth::policies::EvalContext {
1043            principal_tenant: tenant.clone(),
1044            current_tenant: tenant.clone(),
1045            peer_ip: None,
1046            mfa_present: false,
1047            now_ms: crate::utils::now_unix_millis() as u128,
1048            principal_is_admin_role: role == crate::auth::Role::Admin,
1049            principal_is_system_owned: auth_store.principal_is_system_owned(&principal_id),
1050            principal_is_platform_scoped: principal_id.tenant.is_none(),
1051        };
1052        let managed_key = if collection == "red.config" {
1053            format!("red.config.{key}")
1054        } else {
1055            key.to_string()
1056        };
1057        let gate = crate::auth::managed_config::ManagedConfigGate::new(
1058            self.inner.config_registry.as_ref(),
1059        );
1060        match gate.check_write(&auth_store, &principal_id, &ctx, &managed_key) {
1061            crate::auth::managed_config::ManagedConfigDecision::Allow {
1062                entry_id,
1063                resource_type,
1064                managed,
1065                mutability,
1066                matched_action,
1067                matched_resource,
1068                ..
1069            } => {
1070                return ConfigMutationAuthz::Allowed(ConfigMutationEvidence {
1071                    id: entry_id,
1072                    resource_type,
1073                    managed,
1074                    mutability,
1075                    matched_action: Some(matched_action),
1076                    matched_resource: Some(matched_resource),
1077                    payload: None,
1078                });
1079            }
1080            crate::auth::managed_config::ManagedConfigDecision::Deny {
1081                entry_id,
1082                resource_type,
1083                managed,
1084                mutability,
1085                matched_action,
1086                matched_resource,
1087                reason,
1088                ..
1089            } => {
1090                return ConfigMutationAuthz::Denied {
1091                    reason: format!(
1092                        "permission denied: managed config mutation blocked for `{managed_key}`: {reason}"
1093                    ),
1094                    evidence: ConfigMutationEvidence {
1095                        id: entry_id,
1096                        resource_type,
1097                        managed,
1098                        mutability,
1099                        matched_action: Some(matched_action),
1100                        matched_resource: Some(matched_resource),
1101                        payload: None,
1102                    },
1103                };
1104            }
1105            crate::auth::managed_config::ManagedConfigDecision::PassThrough { .. } => {}
1106        }
1107
1108        let mut resource = crate::auth::policies::ResourceRef::new(
1109            "config",
1110            config_target_resource(collection, key),
1111        );
1112        if let Some(ref tenant) = tenant {
1113            resource = resource.with_tenant(tenant.clone());
1114        }
1115        if auth_store.check_policy_authz_with_role(
1116            &principal_id,
1117            "config:write",
1118            &resource,
1119            &ctx,
1120            role,
1121        ) {
1122            ConfigMutationAuthz::Allowed(default_evidence)
1123        } else {
1124            ConfigMutationAuthz::Denied {
1125                reason: format!(
1126                    "principal=`{}` action=`config:write` resource=`config:{}` denied by IAM policy",
1127                    principal,
1128                    config_target_resource(collection, key)
1129                ),
1130                evidence: default_evidence,
1131            }
1132        }
1133    }
1134
1135    fn default_config_mutation_evidence(
1136        &self,
1137        collection: &str,
1138        key: &str,
1139    ) -> ConfigMutationEvidence {
1140        let id = if collection == "red.config" {
1141            format!("red.config.{key}")
1142        } else {
1143            key.to_string()
1144        };
1145        ConfigMutationEvidence {
1146            id,
1147            resource_type: crate::auth::managed_config::RESOURCE_TYPE_CONFIG_KEY.to_string(),
1148            managed: false,
1149            mutability: crate::auth::registry::Mutability::MutableViaGovernance,
1150            matched_action: None,
1151            matched_resource: None,
1152            payload: None,
1153        }
1154    }
1155
1156    fn emit_config_mutation_event(
1157        &self,
1158        kind: crate::runtime::control_events::EventKind,
1159        outcome: crate::runtime::control_events::Outcome,
1160        action: &'static str,
1161        collection: &str,
1162        key: &str,
1163        reason: Option<String>,
1164        evidence: &ConfigMutationEvidence,
1165    ) -> RedDBResult<()> {
1166        use crate::runtime::control_events::{
1167            ActorRef, ControlEvent, ControlEventCtx, Sensitivity,
1168        };
1169
1170        let tenant = current_tenant();
1171        let principal = current_auth_identity();
1172        let actor_user = principal
1173            .as_ref()
1174            .map(|(principal, _)| crate::auth::UserId::from_parts(tenant.as_deref(), principal));
1175        let actor = actor_user
1176            .as_ref()
1177            .map(ActorRef::User)
1178            .unwrap_or(ActorRef::Anonymous);
1179        let ctx = ControlEventCtx {
1180            actor,
1181            scope: tenant
1182                .as_ref()
1183                .map(|scope| std::borrow::Cow::Borrowed(scope.as_str())),
1184            request_id: Some(std::borrow::Cow::Owned(format!(
1185                "conn-{}",
1186                current_connection_id()
1187            ))),
1188            trace_id: None,
1189        };
1190
1191        let mut fields = HashMap::new();
1192        fields.insert("id".to_string(), Sensitivity::raw(evidence.id.clone()));
1193        fields.insert(
1194            "resource_type".to_string(),
1195            Sensitivity::raw(evidence.resource_type.clone()),
1196        );
1197        fields.insert(
1198            "managed".to_string(),
1199            Sensitivity::raw(evidence.managed.to_string()),
1200        );
1201        fields.insert(
1202            "mutability".to_string(),
1203            Sensitivity::raw(config_mutability_label(evidence.mutability)),
1204        );
1205        fields.insert("collection".to_string(), Sensitivity::raw(collection));
1206        fields.insert("key".to_string(), Sensitivity::raw(key));
1207        fields.insert(
1208            "connection_id".to_string(),
1209            Sensitivity::raw(current_connection_id().to_string()),
1210        );
1211        if let Some((_, role)) = principal {
1212            fields.insert("actor_role".to_string(), Sensitivity::raw(role.as_str()));
1213        }
1214        if let Some(matched_action) = &evidence.matched_action {
1215            fields.insert(
1216                "matched_action".to_string(),
1217                Sensitivity::raw(matched_action.clone()),
1218            );
1219        }
1220        if let Some(matched_resource) = &evidence.matched_resource {
1221            fields.insert(
1222                "matched_resource".to_string(),
1223                Sensitivity::raw(matched_resource.clone()),
1224            );
1225        }
1226        if let Some(payload) = &evidence.payload {
1227            fields.insert(
1228                "payload".to_string(),
1229                config_payload_sensitivity(&evidence.resource_type, "payload", payload),
1230            );
1231        }
1232
1233        let event = ControlEvent {
1234            kind,
1235            outcome,
1236            action: std::borrow::Cow::Borrowed(action),
1237            resource: Some(format!(
1238                "config:{}",
1239                config_target_resource(collection, key)
1240            )),
1241            reason,
1242            matched_policy_id: None,
1243            fields,
1244        };
1245        let ledger = self.inner.control_event_ledger.read();
1246        match ledger.emit(&ctx, event) {
1247            Ok(_) => Ok(()),
1248            Err(err) if self.inner.control_event_config.require_persistence() => {
1249                Err(RedDBError::Internal(err.to_string()))
1250            }
1251            Err(_) => Ok(()),
1252        }
1253    }
1254
1255    fn check_config_capability(
1256        &self,
1257        action: &str,
1258        collection: &str,
1259        key: &str,
1260    ) -> Result<(), String> {
1261        let Some(auth_store) = self.inner.auth_store.read().clone() else {
1262            return Ok(());
1263        };
1264        if !auth_store.iam_authorization_enabled() {
1265            return Ok(());
1266        }
1267        let Some((principal, role)) = current_auth_identity() else {
1268            return Err(
1269                "IAM authorization is enabled; config capability check requires an authenticated principal"
1270                    .to_string(),
1271            );
1272        };
1273        let tenant = current_tenant();
1274        let principal_id = crate::auth::UserId::from_parts(tenant.as_deref(), &principal);
1275        let mut resource = crate::auth::policies::ResourceRef::new(
1276            "config",
1277            config_target_resource(collection, key),
1278        );
1279        if let Some(ref tenant) = tenant {
1280            resource = resource.with_tenant(tenant.clone());
1281        }
1282        let ctx = crate::auth::policies::EvalContext {
1283            principal_tenant: tenant.clone(),
1284            current_tenant: tenant,
1285            peer_ip: None,
1286            mfa_present: false,
1287            now_ms: crate::utils::now_unix_millis() as u128,
1288            principal_is_admin_role: role == crate::auth::Role::Admin,
1289            principal_is_system_owned: auth_store.principal_is_system_owned(&principal_id),
1290            principal_is_platform_scoped: principal_id.tenant.is_none(),
1291        };
1292        if action == "config:write" {
1293            let managed_key = if collection == "red.config" {
1294                format!("red.config.{key}")
1295            } else {
1296                key.to_string()
1297            };
1298            let gate = crate::auth::managed_config::ManagedConfigGate::new(
1299                self.inner.config_registry.as_ref(),
1300            );
1301            match gate.check_write(&auth_store, &principal_id, &ctx, &managed_key) {
1302                crate::auth::managed_config::ManagedConfigDecision::PassThrough { .. } => {}
1303                crate::auth::managed_config::ManagedConfigDecision::Allow { .. } => return Ok(()),
1304                crate::auth::managed_config::ManagedConfigDecision::Deny { reason, .. } => {
1305                    return Err(format!(
1306                        "permission denied: managed config mutation blocked for `{managed_key}`: {reason}"
1307                    ));
1308                }
1309            }
1310        }
1311        if auth_store.check_policy_authz_with_role(&principal_id, action, &resource, &ctx, role) {
1312            Ok(())
1313        } else {
1314            Err(format!(
1315                "principal=`{}` action=`{}` resource=`config:{}` denied by IAM policy",
1316                principal,
1317                action,
1318                config_target_resource(collection, key)
1319            ))
1320        }
1321    }
1322
1323    fn check_system_config_capability(
1324        &self,
1325        action: &str,
1326        collection: &str,
1327        key: &str,
1328    ) -> Result<(), String> {
1329        if collection != "red.config" {
1330            return Ok(());
1331        }
1332        self.check_config_capability(action, collection, key)
1333    }
1334
1335    pub fn config_watch_events_since(
1336        &self,
1337        collection: &str,
1338        key: &str,
1339        since_lsn: u64,
1340        max_count: usize,
1341    ) -> Vec<crate::replication::cdc::KvWatchEvent> {
1342        self.kv_watch_events_since(collection, key, since_lsn, max_count)
1343            .into_iter()
1344            .map(|event| self.policy_filter_config_watch_event(event))
1345            .collect()
1346    }
1347
1348    pub fn config_watch_events_since_prefix(
1349        &self,
1350        collection: &str,
1351        prefix: &str,
1352        since_lsn: u64,
1353        max_count: usize,
1354    ) -> Vec<crate::replication::cdc::KvWatchEvent> {
1355        self.kv_watch_events_since_prefix(collection, prefix, since_lsn, max_count)
1356            .into_iter()
1357            .map(|event| self.policy_filter_config_watch_event(event))
1358            .collect()
1359    }
1360
1361    fn policy_filter_config_watch_event(
1362        &self,
1363        mut event: crate::replication::cdc::KvWatchEvent,
1364    ) -> crate::replication::cdc::KvWatchEvent {
1365        if self
1366            .check_config_capability("config:read", &event.collection, &event.key)
1367            .is_err()
1368        {
1369            event.before = None;
1370            event.after = None;
1371        }
1372        event
1373    }
1374
1375    fn audit_config_resolve(
1376        &self,
1377        collection: &str,
1378        key: &str,
1379        secret_ref: Option<&ConfigSecretRef>,
1380        outcome: crate::runtime::audit_log::Outcome,
1381        reason: &str,
1382    ) {
1383        let actor = current_auth_identity()
1384            .map(|(principal, _)| principal)
1385            .unwrap_or_else(|| "anonymous".to_string());
1386        let request_id = match current_connection_id() {
1387            0 => "embedded".to_string(),
1388            id => format!("conn-{id}"),
1389        };
1390        let mut builder = crate::runtime::audit_log::AuditEvent::builder("config/resolve")
1391            .principal(actor.clone())
1392            .source(crate::runtime::audit_log::AuditAuthSource::Password)
1393            .resource(format!(
1394                "config:{}",
1395                config_target_resource(collection, key)
1396            ))
1397            .outcome(outcome)
1398            .correlation_id(request_id.clone())
1399            .fields([
1400                crate::runtime::audit_log::AuditFieldEscaper::field("actor", actor),
1401                crate::runtime::audit_log::AuditFieldEscaper::field("collection", collection),
1402                crate::runtime::audit_log::AuditFieldEscaper::field("key", key),
1403                crate::runtime::audit_log::AuditFieldEscaper::field(
1404                    "target",
1405                    config_target_resource(collection, key),
1406                ),
1407                crate::runtime::audit_log::AuditFieldEscaper::field("reason", reason),
1408                crate::runtime::audit_log::AuditFieldEscaper::field("request_id", request_id),
1409                crate::runtime::audit_log::AuditFieldEscaper::field(
1410                    "connection_id",
1411                    current_connection_id(),
1412                ),
1413            ]);
1414        if let Some(tenant) = current_tenant() {
1415            builder = builder.tenant(tenant);
1416        }
1417        if let Some(secret_ref) = secret_ref {
1418            builder = builder.fields([
1419                crate::runtime::audit_log::AuditFieldEscaper::field("resolved_store", "vault"),
1420                crate::runtime::audit_log::AuditFieldEscaper::field(
1421                    "resolved_collection",
1422                    secret_ref.collection.as_str(),
1423                ),
1424                crate::runtime::audit_log::AuditFieldEscaper::field(
1425                    "resolved_key",
1426                    secret_ref.key.as_str(),
1427                ),
1428                crate::runtime::audit_log::AuditFieldEscaper::field(
1429                    "resolved_target",
1430                    format!("{}.{}", secret_ref.collection, secret_ref.key),
1431                ),
1432            ]);
1433        }
1434        self.audit_log().record_event(builder.build());
1435    }
1436}
1437
1438fn parse_config_secret_ref(value: &Value) -> RedDBResult<ConfigSecretRef> {
1439    let Value::Json(bytes) = value else {
1440        return Err(RedDBError::InvalidConfig(
1441            "CONFIG value is not a SecretRef".to_string(),
1442        ));
1443    };
1444    let json = crate::json::from_slice::<crate::json::Value>(bytes).map_err(|err| {
1445        RedDBError::InvalidConfig(format!("CONFIG SecretRef is malformed: {err}"))
1446    })?;
1447    let Some(object) = json.as_object() else {
1448        return Err(RedDBError::InvalidConfig(
1449            "CONFIG SecretRef must be an object".to_string(),
1450        ));
1451    };
1452    let get_str = |field: &str| -> RedDBResult<&str> {
1453        object
1454            .get(field)
1455            .and_then(|value| value.as_str())
1456            .ok_or_else(|| RedDBError::InvalidConfig(format!("CONFIG SecretRef missing {field}")))
1457    };
1458    if get_str("type")? != "secret_ref" {
1459        return Err(RedDBError::InvalidConfig(
1460            "CONFIG value is not a SecretRef".to_string(),
1461        ));
1462    }
1463    if get_str("store")? != "vault" {
1464        return Err(RedDBError::InvalidConfig(
1465            "CONFIG SecretRef store is unsupported".to_string(),
1466        ));
1467    }
1468    Ok(ConfigSecretRef {
1469        collection: get_str("collection")?.to_string(),
1470        key: get_str("key")?.to_string(),
1471    })
1472}
1473
1474fn config_target_resource(collection: &str, key: &str) -> String {
1475    if collection == "red.config" {
1476        format!("red.config/{}", key.to_ascii_lowercase())
1477    } else {
1478        format!("{collection}.{key}")
1479    }
1480}
1481
1482fn config_write_output(
1483    raw_query: &str,
1484    collection: &str,
1485    key: &str,
1486    version: i64,
1487    id: EntityId,
1488    value_type: Option<ConfigValueType>,
1489    schema_version: Option<i64>,
1490    tags: &[String],
1491    statement: &'static str,
1492    affected_rows: u64,
1493) -> RuntimeQueryResult {
1494    let mut result = UnifiedResult::with_columns(vec![
1495        "ok".into(),
1496        "collection".into(),
1497        "key".into(),
1498        "version".into(),
1499        "value_type".into(),
1500        "schema_version".into(),
1501        "tags".into(),
1502        "id".into(),
1503    ]);
1504    let mut record = UnifiedRecord::new();
1505    record.set("ok", Value::Boolean(true));
1506    record.set("collection", Value::text(collection.to_string()));
1507    record.set("key", Value::text(key.to_string()));
1508    record.set("version", Value::Integer(version));
1509    record.set("value_type", config_value_type_value(value_type));
1510    record.set(
1511        "schema_version",
1512        schema_version.map(Value::Integer).unwrap_or(Value::Null),
1513    );
1514    record.set("tags", config_tags_value(tags));
1515    record.set("id", Value::Integer(id.raw() as i64));
1516    result.push(record);
1517    RuntimeQueryResult {
1518        query: raw_query.to_string(),
1519        mode: crate::storage::query::modes::QueryMode::Sql,
1520        statement,
1521        engine: "config",
1522        result,
1523        affected_rows,
1524        statement_type: if statement == "delete" {
1525            "delete"
1526        } else {
1527            "update"
1528        },
1529    }
1530}
1531
1532fn invalid_config_volatility(operation: &str) -> RedDBError {
1533    RedDBError::InvalidOperation(format!(
1534        "CONFIG does not support KV-only volatility operation {operation}"
1535    ))
1536}
1537
1538fn resolve_config_schema(
1539    latest: Option<&ConfigVersion>,
1540    requested_type: Option<ConfigValueType>,
1541) -> (Option<ConfigValueType>, Option<i64>) {
1542    let previous_type = latest.and_then(|version| version.value_type);
1543    let previous_schema_version = latest.and_then(|version| version.schema_version);
1544    match requested_type {
1545        Some(value_type) if Some(value_type) != previous_type => (
1546            Some(value_type),
1547            Some(previous_schema_version.unwrap_or(0) + 1),
1548        ),
1549        Some(value_type) => (Some(value_type), previous_schema_version.or(Some(1))),
1550        None => (previous_type, previous_schema_version),
1551    }
1552}
1553
1554fn validate_config_value_type(value: &Value, value_type: ConfigValueType) -> RedDBResult<()> {
1555    let valid = match value_type {
1556        ConfigValueType::Bool => matches!(value, Value::Boolean(_)),
1557        ConfigValueType::Int => matches!(
1558            value,
1559            Value::Integer(_) | Value::UnsignedInteger(_) | Value::BigInt(_)
1560        ),
1561        ConfigValueType::String => matches!(value, Value::Text(_)),
1562        ConfigValueType::Url => validate_config_url(value),
1563        ConfigValueType::Object => validate_config_json_shape(value, true),
1564        ConfigValueType::Array => {
1565            matches!(value, Value::Array(_) | Value::Vector(_))
1566                || validate_config_json_shape(value, false)
1567        }
1568    };
1569    if valid {
1570        Ok(())
1571    } else {
1572        Err(RedDBError::InvalidConfig(format!(
1573            "CONFIG value type mismatch: expected {}, got {}",
1574            value_type.as_str(),
1575            config_actual_value_type(value),
1576        )))
1577    }
1578}
1579
1580/// `true` when `(collection, key)` addresses the policy enforcement
1581/// mode flag (#712 / S5A). The flag lives under `red.config` so the
1582/// rest of the config infrastructure (managed-config registry, audit,
1583/// history) governs it for free.
1584fn is_enforcement_mode_config(collection: &str, key: &str) -> bool {
1585    collection == "red.config" && key == "policy.enforcement_mode"
1586}
1587
1588/// Reject any value the policy evaluator does not understand. Surfaced
1589/// as `InvalidConfig` so the SQL error path mirrors how other config
1590/// validation failures are reported.
1591fn validate_enforcement_mode_value(value: &Value) -> RedDBResult<()> {
1592    let text = match value {
1593        Value::Text(text) => text.as_ref(),
1594        _ => {
1595            return Err(RedDBError::InvalidConfig(format!(
1596                "config key `{}` must be a string ({} or {}); got {}",
1597                crate::auth::enforcement_mode::ENFORCEMENT_MODE_CONFIG_KEY,
1598                crate::auth::enforcement_mode::PolicyEnforcementMode::LegacyRbac.as_str(),
1599                crate::auth::enforcement_mode::PolicyEnforcementMode::PolicyOnly.as_str(),
1600                config_actual_value_type(value),
1601            )));
1602        }
1603    };
1604    if crate::auth::enforcement_mode::PolicyEnforcementMode::parse(text).is_some() {
1605        Ok(())
1606    } else {
1607        Err(RedDBError::InvalidConfig(format!(
1608            "config key `{}` accepts only `{}` or `{}`, got `{}`",
1609            crate::auth::enforcement_mode::ENFORCEMENT_MODE_CONFIG_KEY,
1610            crate::auth::enforcement_mode::PolicyEnforcementMode::LegacyRbac.as_str(),
1611            crate::auth::enforcement_mode::PolicyEnforcementMode::PolicyOnly.as_str(),
1612            text,
1613        )))
1614    }
1615}
1616
1617fn validate_config_url(value: &Value) -> bool {
1618    let url = match value {
1619        Value::Url(value) => value.as_str(),
1620        Value::Text(value) => value.as_ref(),
1621        _ => return false,
1622    };
1623    url.starts_with("http://") || url.starts_with("https://") || url.starts_with("ftp://")
1624}
1625
1626fn validate_config_json_shape(value: &Value, object: bool) -> bool {
1627    let Value::Json(bytes) = value else {
1628        return false;
1629    };
1630    let Ok(json) = crate::json::from_slice::<crate::json::Value>(bytes) else {
1631        return false;
1632    };
1633    matches!(
1634        (object, json),
1635        (true, crate::json::Value::Object(_)) | (false, crate::json::Value::Array(_))
1636    )
1637}
1638
1639fn config_actual_value_type(value: &Value) -> &'static str {
1640    match value {
1641        Value::Null => "null",
1642        Value::Boolean(_) => "bool",
1643        Value::Integer(_) | Value::UnsignedInteger(_) | Value::BigInt(_) => "int",
1644        Value::Text(_) => "string",
1645        Value::Url(_) => "url",
1646        Value::Json(bytes) => match crate::json::from_slice::<crate::json::Value>(bytes) {
1647            Ok(crate::json::Value::Object(_)) => "object",
1648            Ok(crate::json::Value::Array(_)) => "array",
1649            _ => "json",
1650        },
1651        Value::Array(_) | Value::Vector(_) => "array",
1652        _ => "other",
1653    }
1654}
1655
1656fn config_value_type_value(value_type: Option<ConfigValueType>) -> Value {
1657    value_type
1658        .map(|value_type| Value::text(value_type.as_str()))
1659        .unwrap_or(Value::Null)
1660}
1661
1662fn config_value_type_from_value(value: &Value) -> Option<ConfigValueType> {
1663    match value {
1664        Value::Text(value) => ConfigValueType::parse(value.as_ref()),
1665        _ => None,
1666    }
1667}
1668
1669fn config_tags_value(tags: &[String]) -> Value {
1670    if tags.is_empty() {
1671        return Value::Null;
1672    }
1673    Value::Array(tags.iter().map(|tag| Value::text(tag.clone())).collect())
1674}
1675
1676fn config_tags_from_value(value: Option<&Value>) -> Vec<String> {
1677    match value {
1678        Some(Value::Array(values)) => values
1679            .iter()
1680            .filter_map(|value| match value {
1681                Value::Text(tag) => Some(tag.to_string()),
1682                _ => None,
1683            })
1684            .collect(),
1685        Some(Value::Json(bytes)) => crate::json::from_slice::<crate::json::Value>(bytes)
1686            .ok()
1687            .and_then(|value| value.as_array().map(|values| values.to_vec()))
1688            .map(|values| {
1689                values
1690                    .into_iter()
1691                    .filter_map(|value| value.as_str().map(ToOwned::to_owned))
1692                    .collect()
1693            })
1694            .unwrap_or_default(),
1695        _ => Vec::new(),
1696    }
1697}
1698
1699fn config_payload_sensitivity(
1700    resource_type: &str,
1701    field: &str,
1702    value: &Value,
1703) -> crate::runtime::control_events::Sensitivity {
1704    let payload = config_payload_bytes(value);
1705    if config_payload_raw_allowed(resource_type, field) {
1706        crate::runtime::control_events::Sensitivity::raw(
1707            String::from_utf8_lossy(&payload).into_owned(),
1708        )
1709    } else {
1710        crate::runtime::control_events::Sensitivity::hashed(&payload)
1711    }
1712}
1713
1714fn config_payload_bytes(value: &Value) -> Vec<u8> {
1715    let json = crate::presentation::entity_json::storage_value_to_json(value);
1716    crate::serde_json::to_vec(&json).unwrap_or_else(|_| value.to_string().into_bytes())
1717}
1718
1719fn config_payload_raw_allowed(resource_type: &str, field: &str) -> bool {
1720    const RAW_PAYLOAD_FIELDS: &[(&str, &str)] = &[("audit_surface", "payload")];
1721    RAW_PAYLOAD_FIELDS
1722        .iter()
1723        .any(|(allowed_type, allowed_field)| {
1724            *allowed_type == resource_type && *allowed_field == field
1725        })
1726}
1727
1728fn config_mutability_label(mutability: crate::auth::registry::Mutability) -> &'static str {
1729    match mutability {
1730        crate::auth::registry::Mutability::Immutable => "immutable",
1731        crate::auth::registry::Mutability::MutableViaGovernance => "mutable_via_governance",
1732    }
1733}
1734
1735fn current_unix_ms() -> u64 {
1736    std::time::SystemTime::now()
1737        .duration_since(std::time::UNIX_EPOCH)
1738        .unwrap_or_default()
1739        .as_millis() as u64
1740}