Skip to main content

reddb_server/runtime/
impl_config.rs

1//! Stable Config keyed command execution.
2
3use std::collections::HashMap;
4use std::sync::Arc;
5
6use crate::catalog::{CollectionModel, SchemaMode};
7use crate::physical::{CollectionContract, ContractOrigin};
8use crate::storage::query::ast::ConfigValueType;
9use crate::storage::{EntityData, EntityId, EntityKind, RowData, UnifiedEntity};
10
11use super::impl_core::{current_auth_identity, current_connection_id, current_tenant};
12use super::*;
13
14const CONFIG_HISTORY_LIMIT: usize = 16;
15
16#[derive(Clone)]
17struct ConfigVersion {
18    id: EntityId,
19    key: String,
20    version: i64,
21    value: Value,
22    tombstone: bool,
23    created_at_ms: i64,
24    op: String,
25    value_type: Option<ConfigValueType>,
26    schema_version: Option<i64>,
27    tags: Vec<String>,
28}
29
30impl super::keyed_spine::KeyedVersion for ConfigVersion {
31    fn key(&self) -> &str {
32        &self.key
33    }
34
35    fn version(&self) -> i64 {
36        self.version
37    }
38}
39
40impl ConfigVersion {
41    fn from_keyed_row(version: super::keyed_spine::KeyedRowVersion, row: &RowData) -> Self {
42        Self {
43            id: version.id,
44            key: version.key,
45            version: version.version,
46            value: version.value,
47            tombstone: version.tombstone,
48            created_at_ms: version.created_at_ms,
49            op: version.op,
50            value_type: row
51                .get_field("value_type")
52                .and_then(config_value_type_from_value),
53            schema_version: super::keyed_spine::value_i64(row.get_field("schema_version")),
54            tags: config_tags_from_value(row.get_field("tags")),
55        }
56    }
57}
58
59struct ConfigSecretRef {
60    collection: String,
61    key: String,
62}
63
64struct ConfigMutationEvidence {
65    id: String,
66    resource_type: String,
67    managed: bool,
68    mutability: crate::auth::registry::Mutability,
69    matched_action: Option<String>,
70    matched_resource: Option<String>,
71    payload: Option<Value>,
72}
73
74enum ConfigMutationAuthz {
75    Allowed(ConfigMutationEvidence),
76    Denied {
77        reason: String,
78        evidence: ConfigMutationEvidence,
79    },
80}
81
82impl RedDBRuntime {
83    pub fn execute_config_command(
84        &self,
85        raw_query: &str,
86        cmd: &crate::storage::query::ast::ConfigCommand,
87    ) -> RedDBResult<RuntimeQueryResult> {
88        use crate::storage::query::ast::ConfigCommand;
89
90        match cmd {
91            ConfigCommand::Put {
92                collection,
93                key,
94                value,
95                value_type,
96                tags,
97            } => self.config_write_result(
98                raw_query,
99                collection,
100                key,
101                value.clone(),
102                *value_type,
103                tags,
104                "put",
105            ),
106            ConfigCommand::Rotate {
107                collection,
108                key,
109                value,
110                value_type,
111                tags,
112            } => self.config_write_result(
113                raw_query,
114                collection,
115                key,
116                value.clone(),
117                *value_type,
118                tags,
119                "rotate",
120            ),
121            ConfigCommand::Get { collection, key } => {
122                self.config_get_result(raw_query, collection, key)
123            }
124            ConfigCommand::Resolve { collection, key } => {
125                self.config_resolve_result(raw_query, collection, key)
126            }
127            ConfigCommand::Delete { collection, key } => {
128                self.config_delete_result(raw_query, collection, key)
129            }
130            ConfigCommand::History { collection, key } => {
131                self.config_history_result(raw_query, collection, key)
132            }
133            ConfigCommand::List {
134                collection,
135                prefix,
136                limit,
137                offset,
138            } => self.config_list_result(raw_query, collection, prefix.as_deref(), *limit, *offset),
139            ConfigCommand::Watch {
140                collection,
141                key,
142                prefix,
143                from_lsn,
144            } => self.config_watch_result(raw_query, collection, key, *prefix, *from_lsn),
145            ConfigCommand::InvalidVolatileOperation { operation, .. } => {
146                Err(invalid_config_volatility(operation))
147            }
148        }
149    }
150
151    pub(crate) fn validate_config_command_before_auth(
152        &self,
153        cmd: &crate::storage::query::ast::ConfigCommand,
154    ) -> RedDBResult<()> {
155        use crate::storage::query::ast::ConfigCommand;
156        match cmd {
157            ConfigCommand::InvalidVolatileOperation { operation, .. } => {
158                Err(invalid_config_volatility(operation))
159            }
160            ConfigCommand::Put { collection, .. }
161            | ConfigCommand::Get { collection, .. }
162            | ConfigCommand::Resolve { collection, .. }
163            | ConfigCommand::Rotate { collection, .. }
164            | ConfigCommand::Delete { collection, .. }
165            | ConfigCommand::History { collection, .. }
166            | ConfigCommand::List { collection, .. }
167            | ConfigCommand::Watch { collection, .. } => {
168                let snapshot = self.inner.db.catalog_model_snapshot();
169                let Some(actual_model) = snapshot
170                    .collections
171                    .iter()
172                    .find(|c| c.name == *collection)
173                    .map(|c| c.declared_model.unwrap_or(c.model))
174                else {
175                    return Ok(());
176                };
177                crate::runtime::ddl::polymorphic_resolver::ensure_model_match(
178                    CollectionModel::Config,
179                    actual_model,
180                )
181            }
182        }
183    }
184
185    fn config_resolve_result(
186        &self,
187        raw_query: &str,
188        collection: &str,
189        key: &str,
190    ) -> RedDBResult<RuntimeQueryResult> {
191        let latest = self.latest_config_version(collection, key)?;
192        if let Err(reason) = self.check_config_capability("config:read", collection, key) {
193            self.audit_config_resolve(
194                collection,
195                key,
196                None,
197                crate::runtime::audit_log::Outcome::Denied,
198                &reason,
199            );
200            return Err(RedDBError::Query(reason));
201        }
202
203        let Some(version) = latest else {
204            let reason = "not_found";
205            self.audit_config_resolve(
206                collection,
207                key,
208                None,
209                crate::runtime::audit_log::Outcome::Denied,
210                reason,
211            );
212            return Err(RedDBError::NotFound(format!(
213                "config '{}.{}' not found",
214                collection, key
215            )));
216        };
217        if version.tombstone {
218            let reason = "deleted";
219            self.audit_config_resolve(
220                collection,
221                key,
222                None,
223                crate::runtime::audit_log::Outcome::Denied,
224                reason,
225            );
226            return Err(RedDBError::NotFound(format!(
227                "config '{}.{}' is deleted",
228                collection, key
229            )));
230        }
231
232        let secret_ref = parse_config_secret_ref(&version.value).inspect_err(|err| {
233            self.audit_config_resolve(
234                collection,
235                key,
236                None,
237                crate::runtime::audit_log::Outcome::Error,
238                &err.to_string(),
239            );
240        })?;
241
242        match self.resolve_vault_secret_value(&secret_ref.collection, &secret_ref.key) {
243            Ok(value) => {
244                if value_looks_like_secret_ref(&value) {
245                    let err = secret_ref_chain_error(
246                        collection,
247                        key,
248                        &secret_ref.collection,
249                        &secret_ref.key,
250                    );
251                    let reason = err.to_string();
252                    self.audit_config_resolve(
253                        collection,
254                        key,
255                        Some(&secret_ref),
256                        crate::runtime::audit_log::Outcome::Error,
257                        &reason,
258                    );
259                    return Err(err);
260                }
261                self.audit_config_resolve(
262                    collection,
263                    key,
264                    Some(&secret_ref),
265                    crate::runtime::audit_log::Outcome::Success,
266                    "ok",
267                );
268                let mut result = UnifiedResult::with_columns(vec![
269                    "collection".into(),
270                    "key".into(),
271                    "value".into(),
272                    "resolved_store".into(),
273                    "resolved_collection".into(),
274                    "resolved_key".into(),
275                ]);
276                let mut record = UnifiedRecord::new();
277                record.set("collection", Value::text(collection.to_string()));
278                record.set("key", Value::text(key.to_string()));
279                record.set("value", value);
280                record.set("resolved_store", Value::text("vault"));
281                record.set("resolved_collection", Value::text(secret_ref.collection));
282                record.set("resolved_key", Value::text(secret_ref.key));
283                result.push(record);
284                Ok(RuntimeQueryResult {
285                    query: raw_query.to_string(),
286                    mode: crate::storage::query::modes::QueryMode::Sql,
287                    statement: "config_resolve",
288                    engine: "config",
289                    result,
290                    affected_rows: 0,
291                    statement_type: "select",
292                    bookmark: None,
293                })
294            }
295            Err(err) => {
296                let reason = err.to_string();
297                let outcome = if reason.contains("denied") {
298                    crate::runtime::audit_log::Outcome::Denied
299                } else {
300                    crate::runtime::audit_log::Outcome::Error
301                };
302                self.audit_config_resolve(collection, key, Some(&secret_ref), outcome, &reason);
303                Err(err)
304            }
305        }
306    }
307
308    fn config_write_result(
309        &self,
310        raw_query: &str,
311        collection: &str,
312        key: &str,
313        value: Value,
314        requested_type: Option<ConfigValueType>,
315        tags: &[String],
316        op: &str,
317    ) -> RedDBResult<RuntimeQueryResult> {
318        let mut evidence = match self.authorize_config_write_for_event(collection, key) {
319            ConfigMutationAuthz::Allowed(evidence) => evidence,
320            ConfigMutationAuthz::Denied {
321                reason,
322                mut evidence,
323            } => {
324                evidence.payload = Some(value.clone());
325                let _ = self.emit_config_mutation_event(
326                    crate::runtime::control_events::EventKind::ConfigWrite,
327                    crate::runtime::control_events::Outcome::Denied,
328                    "config:write",
329                    collection,
330                    key,
331                    Some(reason.clone()),
332                    &evidence,
333                );
334                return Err(RedDBError::Query(reason));
335            }
336        };
337        if let Err(err) = self.check_write(crate::runtime::write_gate::WriteKind::Dml) {
338            let _ = self.emit_config_mutation_event(
339                crate::runtime::control_events::EventKind::ConfigWrite,
340                crate::runtime::control_events::Outcome::Error,
341                "config:write",
342                collection,
343                key,
344                Some(err.to_string()),
345                &evidence,
346            );
347            return Err(err);
348        }
349        // #712 / S5A: reject invalid values for the enforcement-mode
350        // config key before any storage mutation. The allowlist lives
351        // on PolicyEnforcementMode::parse so it stays in lockstep with
352        // the evaluator's understanding of the modes.
353        if is_enforcement_mode_config(collection, key) {
354            if let Err(err) = validate_enforcement_mode_value(&value) {
355                let _ = self.emit_config_mutation_event(
356                    crate::runtime::control_events::EventKind::ConfigWrite,
357                    crate::runtime::control_events::Outcome::Denied,
358                    "config:write",
359                    collection,
360                    key,
361                    Some(err.to_string()),
362                    &evidence,
363                );
364                return Err(err);
365            }
366        }
367        if let Err(err) = self.ensure_config_collection(collection) {
368            let _ = self.emit_config_mutation_event(
369                crate::runtime::control_events::EventKind::ConfigWrite,
370                crate::runtime::control_events::Outcome::Error,
371                "config:write",
372                collection,
373                key,
374                Some(err.to_string()),
375                &evidence,
376            );
377            return Err(err);
378        }
379        let latest = match self.latest_config_version(collection, key) {
380            Ok(latest) => latest,
381            Err(err) => {
382                let _ = self.emit_config_mutation_event(
383                    crate::runtime::control_events::EventKind::ConfigWrite,
384                    crate::runtime::control_events::Outcome::Error,
385                    "config:write",
386                    collection,
387                    key,
388                    Some(err.to_string()),
389                    &evidence,
390                );
391                return Err(err);
392            }
393        };
394        let version = latest.as_ref().map(|version| version.version).unwrap_or(0) + 1;
395        let (value_type, schema_version) = resolve_config_schema(latest.as_ref(), requested_type);
396        if let Some(value_type) = value_type {
397            if let Err(err) = validate_config_value_type(&value, value_type) {
398                let _ = self.emit_config_mutation_event(
399                    crate::runtime::control_events::EventKind::ConfigWrite,
400                    crate::runtime::control_events::Outcome::Error,
401                    "config:write",
402                    collection,
403                    key,
404                    Some(err.to_string()),
405                    &evidence,
406                );
407                return Err(err);
408            }
409        }
410        evidence.payload = Some(value.clone());
411        if let Some(reason) = self.secret_ref_guard_write_check(collection, key, &value) {
412            let _ = self.emit_config_mutation_event(
413                crate::runtime::control_events::EventKind::ConfigWrite,
414                crate::runtime::control_events::Outcome::Denied,
415                "config:write",
416                collection,
417                key,
418                Some(reason.to_string()),
419                &evidence,
420            );
421            return Err(reason);
422        }
423        let before = latest.as_ref().and_then(|version| {
424            if version.tombstone {
425                None
426            } else {
427                Some(crate::presentation::entity_json::storage_value_to_json(
428                    &version.value,
429                ))
430            }
431        });
432        let after = Some(crate::presentation::entity_json::storage_value_to_json(
433            &value,
434        ));
435        let change_op = if latest.is_some() {
436            crate::replication::cdc::ChangeOperation::Update
437        } else {
438            crate::replication::cdc::ChangeOperation::Insert
439        };
440        let id = match self.append_config_version(
441            collection,
442            key,
443            value,
444            version,
445            false,
446            op,
447            value_type,
448            schema_version,
449            tags,
450        ) {
451            Ok(id) => id,
452            Err(err) => {
453                let _ = self.emit_config_mutation_event(
454                    crate::runtime::control_events::EventKind::ConfigWrite,
455                    crate::runtime::control_events::Outcome::Error,
456                    "config:write",
457                    collection,
458                    key,
459                    Some(err.to_string()),
460                    &evidence,
461                );
462                return Err(err);
463            }
464        };
465        self.record_kv_watch_event(change_op, collection, key, id.raw(), before, after);
466        if let Err(err) = self.prune_config_history(collection, key) {
467            let _ = self.emit_config_mutation_event(
468                crate::runtime::control_events::EventKind::ConfigWrite,
469                crate::runtime::control_events::Outcome::Error,
470                "config:write",
471                collection,
472                key,
473                Some(err.to_string()),
474                &evidence,
475            );
476            return Err(err);
477        }
478        self.invalidate_result_cache();
479        if let Err(err) = self.emit_config_mutation_event(
480            crate::runtime::control_events::EventKind::ConfigWrite,
481            crate::runtime::control_events::Outcome::Allowed,
482            "config:write",
483            collection,
484            key,
485            None,
486            &evidence,
487        ) {
488            let _ = self.inner.db.store().delete(collection, id);
489            self.invalidate_result_cache();
490            return Err(err);
491        }
492        // #712 / S5A: now that the write is durable and audited, push
493        // the new mode into the live AuthStore so subsequent IAM
494        // decisions honour it without waiting for a restart.
495        if is_enforcement_mode_config(collection, key) {
496            if let Some(auth_store) = self.inner.auth_store.read().clone() {
497                if let Value::Text(text) =
498                    &evidence.payload.as_ref().cloned().unwrap_or(Value::Null)
499                {
500                    if let Some(mode) =
501                        crate::auth::enforcement_mode::PolicyEnforcementMode::parse(text)
502                    {
503                        auth_store.set_enforcement_mode(mode);
504                    }
505                }
506            }
507        }
508        Ok(config_write_output(
509            raw_query,
510            collection,
511            key,
512            version,
513            id,
514            value_type,
515            schema_version,
516            tags,
517            match op {
518                "rotate" => "config_rotate",
519                _ => "config_put",
520            },
521            1,
522        ))
523    }
524
525    fn config_delete_result(
526        &self,
527        raw_query: &str,
528        collection: &str,
529        key: &str,
530    ) -> RedDBResult<RuntimeQueryResult> {
531        let mut evidence = match self.authorize_config_write_for_event(collection, key) {
532            ConfigMutationAuthz::Allowed(evidence) => evidence,
533            ConfigMutationAuthz::Denied { reason, evidence } => {
534                let _ = self.emit_config_mutation_event(
535                    crate::runtime::control_events::EventKind::ConfigDelete,
536                    crate::runtime::control_events::Outcome::Denied,
537                    "config:delete",
538                    collection,
539                    key,
540                    Some(reason.clone()),
541                    &evidence,
542                );
543                return Err(RedDBError::Query(reason));
544            }
545        };
546        if let Err(err) = self.check_write(crate::runtime::write_gate::WriteKind::Dml) {
547            let _ = self.emit_config_mutation_event(
548                crate::runtime::control_events::EventKind::ConfigDelete,
549                crate::runtime::control_events::Outcome::Error,
550                "config:delete",
551                collection,
552                key,
553                Some(err.to_string()),
554                &evidence,
555            );
556            return Err(err);
557        }
558        if let Err(err) = self.ensure_config_collection(collection) {
559            let _ = self.emit_config_mutation_event(
560                crate::runtime::control_events::EventKind::ConfigDelete,
561                crate::runtime::control_events::Outcome::Error,
562                "config:delete",
563                collection,
564                key,
565                Some(err.to_string()),
566                &evidence,
567            );
568            return Err(err);
569        }
570        let latest = match self.latest_config_version(collection, key) {
571            Ok(latest) => latest,
572            Err(err) => {
573                let _ = self.emit_config_mutation_event(
574                    crate::runtime::control_events::EventKind::ConfigDelete,
575                    crate::runtime::control_events::Outcome::Error,
576                    "config:delete",
577                    collection,
578                    key,
579                    Some(err.to_string()),
580                    &evidence,
581                );
582                return Err(err);
583            }
584        };
585        evidence.payload = latest.as_ref().map(|version| version.value.clone());
586        let version = latest.as_ref().map(|version| version.version).unwrap_or(0) + 1;
587        let value_type = latest.as_ref().and_then(|version| version.value_type);
588        let schema_version = latest.as_ref().and_then(|version| version.schema_version);
589        let id = match self.append_config_version(
590            collection,
591            key,
592            Value::Null,
593            version,
594            true,
595            "delete",
596            value_type,
597            schema_version,
598            &[],
599        ) {
600            Ok(id) => id,
601            Err(err) => {
602                let _ = self.emit_config_mutation_event(
603                    crate::runtime::control_events::EventKind::ConfigDelete,
604                    crate::runtime::control_events::Outcome::Error,
605                    "config:delete",
606                    collection,
607                    key,
608                    Some(err.to_string()),
609                    &evidence,
610                );
611                return Err(err);
612            }
613        };
614        if let Some(before) = latest.as_ref().and_then(|version| {
615            if version.tombstone {
616                None
617            } else {
618                Some(crate::presentation::entity_json::storage_value_to_json(
619                    &version.value,
620                ))
621            }
622        }) {
623            self.record_kv_watch_event(
624                crate::replication::cdc::ChangeOperation::Delete,
625                collection,
626                key,
627                id.raw(),
628                Some(before),
629                None,
630            );
631        }
632        if let Err(err) = self.prune_config_history(collection, key) {
633            let _ = self.emit_config_mutation_event(
634                crate::runtime::control_events::EventKind::ConfigDelete,
635                crate::runtime::control_events::Outcome::Error,
636                "config:delete",
637                collection,
638                key,
639                Some(err.to_string()),
640                &evidence,
641            );
642            return Err(err);
643        }
644        self.invalidate_result_cache();
645        if let Err(err) = self.emit_config_mutation_event(
646            crate::runtime::control_events::EventKind::ConfigDelete,
647            crate::runtime::control_events::Outcome::Allowed,
648            "config:delete",
649            collection,
650            key,
651            None,
652            &evidence,
653        ) {
654            let _ = self.inner.db.store().delete(collection, id);
655            self.invalidate_result_cache();
656            return Err(err);
657        }
658        Ok(config_write_output(
659            raw_query,
660            collection,
661            key,
662            version,
663            id,
664            value_type,
665            schema_version,
666            &[],
667            "delete",
668            1,
669        ))
670    }
671
672    fn config_get_result(
673        &self,
674        raw_query: &str,
675        collection: &str,
676        key: &str,
677    ) -> RedDBResult<RuntimeQueryResult> {
678        self.check_system_config_capability("config:read", collection, key)
679            .map_err(RedDBError::Query)?;
680        let latest = self.latest_config_version(collection, key)?;
681        let mut result = UnifiedResult::with_columns(vec![
682            "collection".into(),
683            "key".into(),
684            "value".into(),
685            "version".into(),
686            "value_type".into(),
687            "schema_version".into(),
688            "tags".into(),
689            "tombstone".into(),
690        ]);
691        let mut record = UnifiedRecord::new();
692        record.set("collection", Value::text(collection.to_string()));
693        record.set("key", Value::text(key.to_string()));
694        if let Some(version) = latest {
695            record.set("value", version.value);
696            record.set("version", Value::Integer(version.version));
697            record.set("value_type", config_value_type_value(version.value_type));
698            record.set(
699                "schema_version",
700                version
701                    .schema_version
702                    .map(Value::Integer)
703                    .unwrap_or(Value::Null),
704            );
705            record.set("tags", config_tags_value(&version.tags));
706            record.set("tombstone", Value::Boolean(version.tombstone));
707        } else {
708            record.set("value", Value::Null);
709            record.set("version", Value::Null);
710            record.set("value_type", Value::Null);
711            record.set("schema_version", Value::Null);
712            record.set("tags", Value::Null);
713            record.set("tombstone", Value::Boolean(false));
714        }
715        result.push(record);
716        Ok(RuntimeQueryResult {
717            query: raw_query.to_string(),
718            mode: crate::storage::query::modes::QueryMode::Sql,
719            statement: "config_get",
720            engine: "config",
721            result,
722            affected_rows: 0,
723            statement_type: "select",
724            bookmark: None,
725        })
726    }
727
728    fn config_history_result(
729        &self,
730        raw_query: &str,
731        collection: &str,
732        key: &str,
733    ) -> RedDBResult<RuntimeQueryResult> {
734        self.check_system_config_capability("config:read", collection, key)
735            .map_err(RedDBError::Query)?;
736        let versions = super::keyed_spine::history_versions(self.config_versions(collection, key)?);
737        let mut result = UnifiedResult::with_columns(vec![
738            "collection".into(),
739            "key".into(),
740            "version".into(),
741            "value".into(),
742            "value_type".into(),
743            "schema_version".into(),
744            "tags".into(),
745            "tombstone".into(),
746            "op".into(),
747            "created_at_ms".into(),
748        ]);
749        for version in versions {
750            let mut record = UnifiedRecord::new();
751            record.set("collection", Value::text(collection.to_string()));
752            record.set("key", Value::text(key.to_string()));
753            record.set("version", Value::Integer(version.version));
754            record.set("value", version.value);
755            record.set("value_type", config_value_type_value(version.value_type));
756            record.set(
757                "schema_version",
758                version
759                    .schema_version
760                    .map(Value::Integer)
761                    .unwrap_or(Value::Null),
762            );
763            record.set("tags", Value::Null);
764            record.set("tombstone", Value::Boolean(version.tombstone));
765            record.set("op", Value::text(version.op));
766            record.set("created_at_ms", Value::Integer(version.created_at_ms));
767            result.push(record);
768        }
769        Ok(RuntimeQueryResult {
770            query: raw_query.to_string(),
771            mode: crate::storage::query::modes::QueryMode::Sql,
772            statement: "config_history",
773            engine: "config",
774            result,
775            affected_rows: 0,
776            statement_type: "select",
777            bookmark: None,
778        })
779    }
780
781    fn config_list_result(
782        &self,
783        raw_query: &str,
784        collection: &str,
785        prefix: Option<&str>,
786        limit: Option<usize>,
787        offset: usize,
788    ) -> RedDBResult<RuntimeQueryResult> {
789        let mut versions = self.latest_config_versions(collection, prefix)?;
790        versions.sort_by(|left, right| left.key.cmp(&right.key));
791        let mut result = UnifiedResult::with_columns(vec![
792            "collection".into(),
793            "key".into(),
794            "value".into(),
795            "version".into(),
796            "value_type".into(),
797            "schema_version".into(),
798            "tags".into(),
799            "tombstone".into(),
800            "op".into(),
801            "created_at_ms".into(),
802        ]);
803        for version in versions
804            .into_iter()
805            .filter(|version| {
806                self.check_config_capability("config:read", collection, &version.key)
807                    .is_ok()
808            })
809            .skip(offset)
810            .take(limit.unwrap_or(usize::MAX))
811        {
812            let mut record = UnifiedRecord::new();
813            record.set("collection", Value::text(collection.to_string()));
814            record.set("key", Value::text(version.key));
815            record.set("value", version.value);
816            record.set("version", Value::Integer(version.version));
817            record.set("value_type", config_value_type_value(version.value_type));
818            record.set(
819                "schema_version",
820                version
821                    .schema_version
822                    .map(Value::Integer)
823                    .unwrap_or(Value::Null),
824            );
825            record.set("tags", config_tags_value(&version.tags));
826            record.set("tombstone", Value::Boolean(version.tombstone));
827            record.set("op", Value::text(version.op));
828            record.set("created_at_ms", Value::Integer(version.created_at_ms));
829            result.push(record);
830        }
831        Ok(RuntimeQueryResult {
832            query: raw_query.to_string(),
833            mode: crate::storage::query::modes::QueryMode::Sql,
834            statement: "config_list",
835            engine: "config",
836            result,
837            affected_rows: 0,
838            statement_type: "select",
839            bookmark: None,
840        })
841    }
842
843    fn config_watch_result(
844        &self,
845        raw_query: &str,
846        collection: &str,
847        key: &str,
848        prefix: bool,
849        from_lsn: Option<u64>,
850    ) -> RedDBResult<RuntimeQueryResult> {
851        let watch_key = if prefix {
852            format!("{key}.*")
853        } else {
854            key.to_string()
855        };
856        let endpoint = match from_lsn {
857            Some(lsn) => {
858                format!("/collections/{collection}/config/{watch_key}/watch?since_lsn={lsn}")
859            }
860            None => format!("/collections/{collection}/config/{watch_key}/watch"),
861        };
862        let mut result = UnifiedResult::with_columns(vec![
863            "collection".into(),
864            "key".into(),
865            "prefix".into(),
866            "from_lsn".into(),
867            "watch_url".into(),
868            "streaming".into(),
869        ]);
870        let mut record = UnifiedRecord::new();
871        record.set("collection", Value::text(collection.to_string()));
872        record.set("key", Value::text(watch_key));
873        record.set("prefix", Value::Boolean(prefix));
874        record.set(
875            "from_lsn",
876            from_lsn
877                .map(Value::UnsignedInteger)
878                .unwrap_or(crate::storage::schema::Value::Null),
879        );
880        record.set("watch_url", Value::text(endpoint));
881        record.set("streaming", Value::Boolean(true));
882        result.push(record);
883        Ok(RuntimeQueryResult {
884            query: raw_query.to_string(),
885            mode: crate::storage::query::modes::QueryMode::Sql,
886            statement: "config_watch",
887            engine: "config",
888            result,
889            affected_rows: 0,
890            statement_type: "stream",
891            bookmark: None,
892        })
893    }
894
895    fn ensure_config_collection(&self, collection: &str) -> RedDBResult<()> {
896        let store = self.inner.db.store();
897        if store.get_collection(collection).is_none() {
898            store
899                .create_collection(collection)
900                .map_err(|err| RedDBError::Internal(err.to_string()))?;
901        }
902        if let Some(contract) = self.inner.db.collection_contract(collection) {
903            crate::runtime::ddl::polymorphic_resolver::ensure_model_match(
904                CollectionModel::Config,
905                contract.declared_model,
906            )?;
907            return Ok(());
908        }
909        let now = current_unix_ms();
910        self.inner
911            .db
912            .save_collection_contract(CollectionContract {
913                name: collection.to_string(),
914                declared_model: CollectionModel::Config,
915                schema_mode: SchemaMode::Dynamic,
916                origin: ContractOrigin::Explicit,
917                version: 1,
918                created_at_unix_ms: now as u128,
919                updated_at_unix_ms: now as u128,
920                default_ttl_ms: None,
921                vector_dimension: None,
922                vector_metric: None,
923                context_index_fields: Vec::new(),
924                declared_columns: Vec::new(),
925                table_def: None,
926                timestamps_enabled: false,
927                context_index_enabled: false,
928                metrics_raw_retention_ms: None,
929                metrics_rollup_policies: Vec::new(),
930                metrics_tenant_identity: None,
931                metrics_namespace: None,
932                append_only: false,
933                subscriptions: Vec::new(),
934                analytics_config: Vec::new(),
935                session_key: None,
936                session_gap_ms: None,
937                retention_duration_ms: None,
938            })
939            .map(|_| ())
940            .map_err(|err| RedDBError::Internal(err.to_string()))
941    }
942
943    fn append_config_version(
944        &self,
945        collection: &str,
946        key: &str,
947        value: Value,
948        version: i64,
949        tombstone: bool,
950        op: &str,
951        value_type: Option<ConfigValueType>,
952        schema_version: Option<i64>,
953        tags: &[String],
954    ) -> RedDBResult<EntityId> {
955        let now = current_unix_ms() as i64;
956        let fields = vec![
957            ("key".to_string(), Value::text(key.to_string())),
958            ("value".to_string(), value),
959            ("version".to_string(), Value::Integer(version)),
960            (
961                "value_type".to_string(),
962                config_value_type_value(value_type),
963            ),
964            (
965                "schema_version".to_string(),
966                schema_version.map(Value::Integer).unwrap_or(Value::Null),
967            ),
968            ("tombstone".to_string(), Value::Boolean(tombstone)),
969            ("op".to_string(), Value::text(op.to_string())),
970            ("created_at_ms".to_string(), Value::Integer(now)),
971            ("tags".to_string(), config_tags_value(tags)),
972        ];
973        let mut row = RowData::new(Vec::new());
974        row.named = Some(fields.into_iter().collect());
975        let entity = UnifiedEntity::new(
976            EntityId::new(0),
977            EntityKind::TableRow {
978                table: Arc::from(collection),
979                row_id: 0,
980            },
981            EntityData::Row(row),
982        );
983        self.inner
984            .db
985            .store()
986            .insert(collection, entity)
987            .map_err(|err| RedDBError::Internal(err.to_string()))
988    }
989
990    fn latest_config_version(
991        &self,
992        collection: &str,
993        key: &str,
994    ) -> RedDBResult<Option<ConfigVersion>> {
995        Ok(super::keyed_spine::latest_version(
996            self.config_versions(collection, key)?,
997        ))
998    }
999
1000    fn config_versions(&self, collection: &str, key: &str) -> RedDBResult<Vec<ConfigVersion>> {
1001        let store = self.inner.db.store();
1002        let Some(manager) = store.get_collection(collection) else {
1003            return Ok(Vec::new());
1004        };
1005        let mut versions = Vec::new();
1006        for entity in manager.query_all(|_| true) {
1007            let EntityData::Row(row) = &entity.data else {
1008                continue;
1009            };
1010            let Some(version) = super::keyed_spine::row_version(entity.id, row, 0) else {
1011                continue;
1012            };
1013            if version.key != key {
1014                continue;
1015            }
1016            versions.push(ConfigVersion::from_keyed_row(version, row));
1017        }
1018        Ok(versions)
1019    }
1020
1021    fn latest_config_versions(
1022        &self,
1023        collection: &str,
1024        prefix: Option<&str>,
1025    ) -> RedDBResult<Vec<ConfigVersion>> {
1026        let store = self.inner.db.store();
1027        let Some(manager) = store.get_collection(collection) else {
1028            return Ok(Vec::new());
1029        };
1030        let mut versions = Vec::new();
1031        for entity in manager.query_all(|_| true) {
1032            let EntityData::Row(row) = &entity.data else {
1033                continue;
1034            };
1035            let Some(version) = super::keyed_spine::row_version(entity.id, row, 0) else {
1036                continue;
1037            };
1038            versions.push(ConfigVersion::from_keyed_row(version, row));
1039        }
1040        Ok(super::keyed_spine::latest_versions(versions, prefix))
1041    }
1042
1043    fn prune_config_history(&self, collection: &str, key: &str) -> RedDBResult<()> {
1044        let mut versions = self.config_versions(collection, key)?;
1045        if versions.len() <= CONFIG_HISTORY_LIMIT {
1046            return Ok(());
1047        }
1048        versions = super::keyed_spine::history_versions(versions);
1049        let drop_count = versions.len() - CONFIG_HISTORY_LIMIT;
1050        let store = self.inner.db.store();
1051        for version in versions.into_iter().take(drop_count) {
1052            store
1053                .delete(collection, version.id)
1054                .map_err(|err| RedDBError::Internal(err.to_string()))?;
1055        }
1056        Ok(())
1057    }
1058
1059    fn authorize_config_write_for_event(&self, collection: &str, key: &str) -> ConfigMutationAuthz {
1060        let default_evidence = self.default_config_mutation_evidence(collection, key);
1061        let Some(auth_store) = self.inner.auth_store.read().clone() else {
1062            return ConfigMutationAuthz::Allowed(default_evidence);
1063        };
1064        if !auth_store.iam_authorization_enabled() {
1065            return ConfigMutationAuthz::Allowed(default_evidence);
1066        }
1067        let Some((principal, role)) = current_auth_identity() else {
1068            return ConfigMutationAuthz::Denied {
1069                reason:
1070                    "IAM authorization is enabled; config capability check requires an authenticated principal"
1071                        .to_string(),
1072                evidence: default_evidence,
1073            };
1074        };
1075        let tenant = current_tenant();
1076        let principal_id = crate::auth::UserId::from_parts(tenant.as_deref(), &principal);
1077        let ctx = crate::auth::policies::EvalContext {
1078            principal_tenant: tenant.clone(),
1079            current_tenant: tenant.clone(),
1080            peer_ip: None,
1081            mfa_present: false,
1082            now_ms: crate::utils::now_unix_millis() as u128,
1083            principal_is_admin_role: role == crate::auth::Role::Admin,
1084            principal_is_system_owned: auth_store.principal_is_system_owned(&principal_id),
1085            principal_is_platform_scoped: principal_id.tenant.is_none(),
1086        };
1087        let managed_key = if collection == "red.config" {
1088            format!("red.config.{key}")
1089        } else {
1090            key.to_string()
1091        };
1092        let gate = crate::auth::managed_config::ManagedConfigGate::new(
1093            self.inner.config_registry.as_ref(),
1094        );
1095        match gate.check_write(&auth_store, &principal_id, &ctx, &managed_key) {
1096            crate::auth::managed_config::ManagedConfigDecision::Allow {
1097                entry_id,
1098                resource_type,
1099                managed,
1100                mutability,
1101                matched_action,
1102                matched_resource,
1103                ..
1104            } => {
1105                return ConfigMutationAuthz::Allowed(ConfigMutationEvidence {
1106                    id: entry_id,
1107                    resource_type,
1108                    managed,
1109                    mutability,
1110                    matched_action: Some(matched_action),
1111                    matched_resource: Some(matched_resource),
1112                    payload: None,
1113                });
1114            }
1115            crate::auth::managed_config::ManagedConfigDecision::Deny {
1116                entry_id,
1117                resource_type,
1118                managed,
1119                mutability,
1120                matched_action,
1121                matched_resource,
1122                reason,
1123                ..
1124            } => {
1125                return ConfigMutationAuthz::Denied {
1126                    reason: format!(
1127                        "permission denied: managed config mutation blocked for `{managed_key}`: {reason}"
1128                    ),
1129                    evidence: ConfigMutationEvidence {
1130                        id: entry_id,
1131                        resource_type,
1132                        managed,
1133                        mutability,
1134                        matched_action: Some(matched_action),
1135                        matched_resource: Some(matched_resource),
1136                        payload: None,
1137                    },
1138                };
1139            }
1140            crate::auth::managed_config::ManagedConfigDecision::PassThrough { .. } => {}
1141        }
1142
1143        let mut resource = crate::auth::policies::ResourceRef::new(
1144            "config",
1145            config_target_resource(collection, key),
1146        );
1147        if let Some(ref tenant) = tenant {
1148            resource = resource.with_tenant(tenant.clone());
1149        }
1150        if auth_store.check_policy_authz_with_role(
1151            &principal_id,
1152            "config:write",
1153            &resource,
1154            &ctx,
1155            role,
1156        ) {
1157            ConfigMutationAuthz::Allowed(default_evidence)
1158        } else {
1159            ConfigMutationAuthz::Denied {
1160                reason: format!(
1161                    "principal=`{}` action=`config:write` resource=`config:{}` denied by IAM policy",
1162                    principal,
1163                    config_target_resource(collection, key)
1164                ),
1165                evidence: default_evidence,
1166            }
1167        }
1168    }
1169
1170    fn default_config_mutation_evidence(
1171        &self,
1172        collection: &str,
1173        key: &str,
1174    ) -> ConfigMutationEvidence {
1175        let id = if collection == "red.config" {
1176            format!("red.config.{key}")
1177        } else {
1178            key.to_string()
1179        };
1180        ConfigMutationEvidence {
1181            id,
1182            resource_type: crate::auth::managed_config::RESOURCE_TYPE_CONFIG_KEY.to_string(),
1183            managed: false,
1184            mutability: crate::auth::registry::Mutability::MutableViaGovernance,
1185            matched_action: None,
1186            matched_resource: None,
1187            payload: None,
1188        }
1189    }
1190
1191    fn emit_config_mutation_event(
1192        &self,
1193        kind: crate::runtime::control_events::EventKind,
1194        outcome: crate::runtime::control_events::Outcome,
1195        action: &'static str,
1196        collection: &str,
1197        key: &str,
1198        reason: Option<String>,
1199        evidence: &ConfigMutationEvidence,
1200    ) -> RedDBResult<()> {
1201        use crate::runtime::control_events::{
1202            ActorRef, ControlEvent, ControlEventCtx, Sensitivity,
1203        };
1204
1205        let tenant = current_tenant();
1206        let principal = current_auth_identity();
1207        let actor_user = principal
1208            .as_ref()
1209            .map(|(principal, _)| crate::auth::UserId::from_parts(tenant.as_deref(), principal));
1210        let actor = actor_user
1211            .as_ref()
1212            .map(ActorRef::User)
1213            .unwrap_or(ActorRef::Anonymous);
1214        let ctx = ControlEventCtx {
1215            actor,
1216            scope: tenant
1217                .as_ref()
1218                .map(|scope| std::borrow::Cow::Borrowed(scope.as_str())),
1219            request_id: Some(std::borrow::Cow::Owned(format!(
1220                "conn-{}",
1221                current_connection_id()
1222            ))),
1223            trace_id: None,
1224        };
1225
1226        let mut fields = HashMap::new();
1227        fields.insert("id".to_string(), Sensitivity::raw(evidence.id.clone()));
1228        fields.insert(
1229            "resource_type".to_string(),
1230            Sensitivity::raw(evidence.resource_type.clone()),
1231        );
1232        fields.insert(
1233            "managed".to_string(),
1234            Sensitivity::raw(evidence.managed.to_string()),
1235        );
1236        fields.insert(
1237            "mutability".to_string(),
1238            Sensitivity::raw(config_mutability_label(evidence.mutability)),
1239        );
1240        fields.insert("collection".to_string(), Sensitivity::raw(collection));
1241        fields.insert("key".to_string(), Sensitivity::raw(key));
1242        fields.insert(
1243            "connection_id".to_string(),
1244            Sensitivity::raw(current_connection_id().to_string()),
1245        );
1246        if let Some((_, role)) = principal {
1247            fields.insert("actor_role".to_string(), Sensitivity::raw(role.as_str()));
1248        }
1249        if let Some(matched_action) = &evidence.matched_action {
1250            fields.insert(
1251                "matched_action".to_string(),
1252                Sensitivity::raw(matched_action.clone()),
1253            );
1254        }
1255        if let Some(matched_resource) = &evidence.matched_resource {
1256            fields.insert(
1257                "matched_resource".to_string(),
1258                Sensitivity::raw(matched_resource.clone()),
1259            );
1260        }
1261        if let Some(payload) = &evidence.payload {
1262            fields.insert(
1263                "payload".to_string(),
1264                config_payload_sensitivity(&evidence.resource_type, "payload", payload),
1265            );
1266        }
1267
1268        let event = ControlEvent {
1269            kind,
1270            outcome,
1271            action: std::borrow::Cow::Borrowed(action),
1272            resource: Some(format!(
1273                "config:{}",
1274                config_target_resource(collection, key)
1275            )),
1276            reason,
1277            matched_policy_id: None,
1278            fields,
1279        };
1280        let ledger = self.inner.control_event_ledger.read();
1281        match ledger.emit(&ctx, event) {
1282            Ok(_) => Ok(()),
1283            Err(err) if self.inner.control_event_config.require_persistence() => {
1284                Err(RedDBError::Internal(err.to_string()))
1285            }
1286            Err(_) => Ok(()),
1287        }
1288    }
1289
1290    fn check_config_capability(
1291        &self,
1292        action: &str,
1293        collection: &str,
1294        key: &str,
1295    ) -> Result<(), String> {
1296        let Some(auth_store) = self.inner.auth_store.read().clone() else {
1297            return Ok(());
1298        };
1299        if !auth_store.iam_authorization_enabled() {
1300            return Ok(());
1301        }
1302        let Some((principal, role)) = current_auth_identity() else {
1303            return Err(
1304                "IAM authorization is enabled; config capability check requires an authenticated principal"
1305                    .to_string(),
1306            );
1307        };
1308        let tenant = current_tenant();
1309        let principal_id = crate::auth::UserId::from_parts(tenant.as_deref(), &principal);
1310        let mut resource = crate::auth::policies::ResourceRef::new(
1311            "config",
1312            config_target_resource(collection, key),
1313        );
1314        if let Some(ref tenant) = tenant {
1315            resource = resource.with_tenant(tenant.clone());
1316        }
1317        let ctx = crate::auth::policies::EvalContext {
1318            principal_tenant: tenant.clone(),
1319            current_tenant: tenant,
1320            peer_ip: None,
1321            mfa_present: false,
1322            now_ms: crate::utils::now_unix_millis() as u128,
1323            principal_is_admin_role: role == crate::auth::Role::Admin,
1324            principal_is_system_owned: auth_store.principal_is_system_owned(&principal_id),
1325            principal_is_platform_scoped: principal_id.tenant.is_none(),
1326        };
1327        if action == "config:write" {
1328            let managed_key = if collection == "red.config" {
1329                format!("red.config.{key}")
1330            } else {
1331                key.to_string()
1332            };
1333            let gate = crate::auth::managed_config::ManagedConfigGate::new(
1334                self.inner.config_registry.as_ref(),
1335            );
1336            match gate.check_write(&auth_store, &principal_id, &ctx, &managed_key) {
1337                crate::auth::managed_config::ManagedConfigDecision::PassThrough { .. } => {}
1338                crate::auth::managed_config::ManagedConfigDecision::Allow { .. } => return Ok(()),
1339                crate::auth::managed_config::ManagedConfigDecision::Deny { reason, .. } => {
1340                    return Err(format!(
1341                        "permission denied: managed config mutation blocked for `{managed_key}`: {reason}"
1342                    ));
1343                }
1344            }
1345        }
1346        if auth_store.check_policy_authz_with_role(&principal_id, action, &resource, &ctx, role) {
1347            Ok(())
1348        } else {
1349            Err(format!(
1350                "principal=`{}` action=`{}` resource=`config:{}` denied by IAM policy",
1351                principal,
1352                action,
1353                config_target_resource(collection, key)
1354            ))
1355        }
1356    }
1357
1358    fn check_system_config_capability(
1359        &self,
1360        action: &str,
1361        collection: &str,
1362        key: &str,
1363    ) -> Result<(), String> {
1364        if collection != "red.config" {
1365            return Ok(());
1366        }
1367        self.check_config_capability(action, collection, key)
1368    }
1369
1370    pub fn config_watch_events_since(
1371        &self,
1372        collection: &str,
1373        key: &str,
1374        since_lsn: u64,
1375        max_count: usize,
1376    ) -> Vec<crate::replication::cdc::KvWatchEvent> {
1377        self.kv_watch_events_since(collection, key, since_lsn, max_count)
1378            .into_iter()
1379            .map(|event| self.policy_filter_config_watch_event(event))
1380            .collect()
1381    }
1382
1383    pub fn config_watch_events_since_prefix(
1384        &self,
1385        collection: &str,
1386        prefix: &str,
1387        since_lsn: u64,
1388        max_count: usize,
1389    ) -> Vec<crate::replication::cdc::KvWatchEvent> {
1390        self.kv_watch_events_since_prefix(collection, prefix, since_lsn, max_count)
1391            .into_iter()
1392            .map(|event| self.policy_filter_config_watch_event(event))
1393            .collect()
1394    }
1395
1396    fn policy_filter_config_watch_event(
1397        &self,
1398        mut event: crate::replication::cdc::KvWatchEvent,
1399    ) -> crate::replication::cdc::KvWatchEvent {
1400        if self
1401            .check_config_capability("config:read", &event.collection, &event.key)
1402            .is_err()
1403        {
1404            event.before = None;
1405            event.after = None;
1406        }
1407        event
1408    }
1409
1410    /// `SecretRefGuard` write-side enforcement. When the inbound config
1411    /// value is a `secret_ref`, peek the vault target without auditing and
1412    /// reject the write if the target already resolves to another
1413    /// `secret_ref` (depth ≥ 2 or a self/mutual cycle). Returns `Some(err)`
1414    /// when the write must be refused; `None` otherwise.
1415    fn secret_ref_guard_write_check(
1416        &self,
1417        collection: &str,
1418        key: &str,
1419        value: &Value,
1420    ) -> Option<RedDBError> {
1421        if !value_looks_like_secret_ref(value) {
1422            return None;
1423        }
1424        let Ok(secret_ref) = parse_config_secret_ref(value) else {
1425            return None;
1426        };
1427        let unsealed = match self.peek_vault_unsealed(&secret_ref.collection, &secret_ref.key) {
1428            Ok(Some(value)) => value,
1429            Ok(None) => return None,
1430            Err(_) => return None,
1431        };
1432        if value_looks_like_secret_ref(&unsealed) {
1433            return Some(secret_ref_chain_error(
1434                collection,
1435                key,
1436                &secret_ref.collection,
1437                &secret_ref.key,
1438            ));
1439        }
1440        None
1441    }
1442
1443    fn audit_config_resolve(
1444        &self,
1445        collection: &str,
1446        key: &str,
1447        secret_ref: Option<&ConfigSecretRef>,
1448        outcome: crate::runtime::audit_log::Outcome,
1449        reason: &str,
1450    ) {
1451        let actor = current_auth_identity()
1452            .map(|(principal, _)| principal)
1453            .unwrap_or_else(|| "anonymous".to_string());
1454        let request_id = match current_connection_id() {
1455            0 => "embedded".to_string(),
1456            id => format!("conn-{id}"),
1457        };
1458        let mut builder = crate::runtime::audit_log::AuditEvent::builder("config/resolve")
1459            .principal(actor.clone())
1460            .source(crate::runtime::audit_log::AuditAuthSource::Password)
1461            .resource(format!(
1462                "config:{}",
1463                config_target_resource(collection, key)
1464            ))
1465            .outcome(outcome)
1466            .correlation_id(request_id.clone())
1467            .fields([
1468                crate::runtime::audit_log::AuditFieldEscaper::field("actor", actor),
1469                crate::runtime::audit_log::AuditFieldEscaper::field("collection", collection),
1470                crate::runtime::audit_log::AuditFieldEscaper::field("key", key),
1471                crate::runtime::audit_log::AuditFieldEscaper::field(
1472                    "target",
1473                    config_target_resource(collection, key),
1474                ),
1475                crate::runtime::audit_log::AuditFieldEscaper::field("reason", reason),
1476                crate::runtime::audit_log::AuditFieldEscaper::field("request_id", request_id),
1477                crate::runtime::audit_log::AuditFieldEscaper::field(
1478                    "connection_id",
1479                    current_connection_id(),
1480                ),
1481            ]);
1482        if let Some(tenant) = current_tenant() {
1483            builder = builder.tenant(tenant);
1484        }
1485        if let Some(secret_ref) = secret_ref {
1486            builder = builder.fields([
1487                crate::runtime::audit_log::AuditFieldEscaper::field("resolved_store", "vault"),
1488                crate::runtime::audit_log::AuditFieldEscaper::field(
1489                    "resolved_collection",
1490                    secret_ref.collection.as_str(),
1491                ),
1492                crate::runtime::audit_log::AuditFieldEscaper::field(
1493                    "resolved_key",
1494                    secret_ref.key.as_str(),
1495                ),
1496                crate::runtime::audit_log::AuditFieldEscaper::field(
1497                    "resolved_target",
1498                    format!("{}.{}", secret_ref.collection, secret_ref.key),
1499                ),
1500            ]);
1501        }
1502        self.audit_log().record_event(builder.build());
1503    }
1504}
1505
1506/// `SecretRefGuard` — returns `true` when the storage value carries the
1507/// canonical `secret_ref` JSON shape (`{"type":"secret_ref", …}`). Drives
1508/// the depth-1 invariant: a value that looks like a secret_ref is treated
1509/// as indirection, never as terminal plaintext.
1510fn value_looks_like_secret_ref(value: &Value) -> bool {
1511    let Value::Json(bytes) = value else {
1512        return false;
1513    };
1514    let Ok(json) = crate::json::from_slice::<crate::json::Value>(bytes) else {
1515        return false;
1516    };
1517    let Some(object) = json.as_object() else {
1518        return false;
1519    };
1520    object.get("type").and_then(|value| value.as_str()) == Some("secret_ref")
1521}
1522
1523fn secret_ref_chain_error(
1524    source_collection: &str,
1525    source_key: &str,
1526    target_collection: &str,
1527    target_key: &str,
1528) -> RedDBError {
1529    RedDBError::InvalidConfig(format!(
1530        "secret_ref chain rejected: config `{source_collection}.{source_key}` points at vault `{target_collection}.{target_key}` which is itself a secret_ref; depth-1 invariant requires the target to resolve to a non-secret_ref value"
1531    ))
1532}
1533
1534fn parse_config_secret_ref(value: &Value) -> RedDBResult<ConfigSecretRef> {
1535    let Value::Json(bytes) = value else {
1536        return Err(RedDBError::InvalidConfig(
1537            "CONFIG value is not a SecretRef".to_string(),
1538        ));
1539    };
1540    let json = crate::json::from_slice::<crate::json::Value>(bytes).map_err(|err| {
1541        RedDBError::InvalidConfig(format!("CONFIG SecretRef is malformed: {err}"))
1542    })?;
1543    let Some(object) = json.as_object() else {
1544        return Err(RedDBError::InvalidConfig(
1545            "CONFIG SecretRef must be an object".to_string(),
1546        ));
1547    };
1548    let get_str = |field: &str| -> RedDBResult<&str> {
1549        object
1550            .get(field)
1551            .and_then(|value| value.as_str())
1552            .ok_or_else(|| RedDBError::InvalidConfig(format!("CONFIG SecretRef missing {field}")))
1553    };
1554    if get_str("type")? != "secret_ref" {
1555        return Err(RedDBError::InvalidConfig(
1556            "CONFIG value is not a SecretRef".to_string(),
1557        ));
1558    }
1559    if get_str("store")? != "vault" {
1560        return Err(RedDBError::InvalidConfig(
1561            "CONFIG SecretRef store is unsupported".to_string(),
1562        ));
1563    }
1564    Ok(ConfigSecretRef {
1565        collection: get_str("collection")?.to_string(),
1566        key: get_str("key")?.to_string(),
1567    })
1568}
1569
1570fn config_target_resource(collection: &str, key: &str) -> String {
1571    if collection == "red.config" {
1572        format!("red.config/{}", key.to_ascii_lowercase())
1573    } else {
1574        format!("{collection}.{key}")
1575    }
1576}
1577
1578fn config_write_output(
1579    raw_query: &str,
1580    collection: &str,
1581    key: &str,
1582    version: i64,
1583    id: EntityId,
1584    value_type: Option<ConfigValueType>,
1585    schema_version: Option<i64>,
1586    tags: &[String],
1587    statement: &'static str,
1588    affected_rows: u64,
1589) -> RuntimeQueryResult {
1590    let mut result = UnifiedResult::with_columns(vec![
1591        "ok".into(),
1592        "collection".into(),
1593        "key".into(),
1594        "version".into(),
1595        "value_type".into(),
1596        "schema_version".into(),
1597        "tags".into(),
1598        "id".into(),
1599    ]);
1600    let mut record = UnifiedRecord::new();
1601    record.set("ok", Value::Boolean(true));
1602    record.set("collection", Value::text(collection.to_string()));
1603    record.set("key", Value::text(key.to_string()));
1604    record.set("version", Value::Integer(version));
1605    record.set("value_type", config_value_type_value(value_type));
1606    record.set(
1607        "schema_version",
1608        schema_version.map(Value::Integer).unwrap_or(Value::Null),
1609    );
1610    record.set("tags", config_tags_value(tags));
1611    record.set("id", Value::Integer(id.raw() as i64));
1612    result.push(record);
1613    RuntimeQueryResult {
1614        query: raw_query.to_string(),
1615        mode: crate::storage::query::modes::QueryMode::Sql,
1616        statement,
1617        engine: "config",
1618        result,
1619        affected_rows,
1620        statement_type: if statement == "delete" {
1621            "delete"
1622        } else {
1623            "update"
1624        },
1625        bookmark: None,
1626    }
1627}
1628
1629fn invalid_config_volatility(operation: &str) -> RedDBError {
1630    RedDBError::InvalidOperation(format!(
1631        "CONFIG does not support KV-only volatility operation {operation}"
1632    ))
1633}
1634
1635fn resolve_config_schema(
1636    latest: Option<&ConfigVersion>,
1637    requested_type: Option<ConfigValueType>,
1638) -> (Option<ConfigValueType>, Option<i64>) {
1639    let previous_type = latest.and_then(|version| version.value_type);
1640    let previous_schema_version = latest.and_then(|version| version.schema_version);
1641    match requested_type {
1642        Some(value_type) if Some(value_type) != previous_type => (
1643            Some(value_type),
1644            Some(previous_schema_version.unwrap_or(0) + 1),
1645        ),
1646        Some(value_type) => (Some(value_type), previous_schema_version.or(Some(1))),
1647        None => (previous_type, previous_schema_version),
1648    }
1649}
1650
1651fn validate_config_value_type(value: &Value, value_type: ConfigValueType) -> RedDBResult<()> {
1652    let valid = match value_type {
1653        ConfigValueType::Bool => matches!(value, Value::Boolean(_)),
1654        ConfigValueType::Int => matches!(
1655            value,
1656            Value::Integer(_) | Value::UnsignedInteger(_) | Value::BigInt(_)
1657        ),
1658        ConfigValueType::String => matches!(value, Value::Text(_)),
1659        ConfigValueType::Url => validate_config_url(value),
1660        ConfigValueType::Object => validate_config_json_shape(value, true),
1661        ConfigValueType::Array => {
1662            matches!(value, Value::Array(_) | Value::Vector(_))
1663                || validate_config_json_shape(value, false)
1664        }
1665    };
1666    if valid {
1667        Ok(())
1668    } else {
1669        Err(RedDBError::InvalidConfig(format!(
1670            "CONFIG value type mismatch: expected {}, got {}",
1671            value_type.as_str(),
1672            config_actual_value_type(value),
1673        )))
1674    }
1675}
1676
1677/// `true` when `(collection, key)` addresses the policy enforcement
1678/// mode flag (#712 / S5A). The flag lives under `red.config` so the
1679/// rest of the config infrastructure (managed-config registry, audit,
1680/// history) governs it for free.
1681fn is_enforcement_mode_config(collection: &str, key: &str) -> bool {
1682    collection == "red.config" && key == "policy.enforcement_mode"
1683}
1684
1685/// Reject any value the policy evaluator does not understand. Surfaced
1686/// as `InvalidConfig` so the SQL error path mirrors how other config
1687/// validation failures are reported.
1688fn validate_enforcement_mode_value(value: &Value) -> RedDBResult<()> {
1689    let text = match value {
1690        Value::Text(text) => text.as_ref(),
1691        _ => {
1692            return Err(RedDBError::InvalidConfig(format!(
1693                "config key `{}` must be a string ({} or {}); got {}",
1694                crate::auth::enforcement_mode::ENFORCEMENT_MODE_CONFIG_KEY,
1695                crate::auth::enforcement_mode::PolicyEnforcementMode::LegacyRbac.as_str(),
1696                crate::auth::enforcement_mode::PolicyEnforcementMode::PolicyOnly.as_str(),
1697                config_actual_value_type(value),
1698            )));
1699        }
1700    };
1701    if crate::auth::enforcement_mode::PolicyEnforcementMode::parse(text).is_some() {
1702        Ok(())
1703    } else {
1704        Err(RedDBError::InvalidConfig(format!(
1705            "config key `{}` accepts only `{}` or `{}`, got `{}`",
1706            crate::auth::enforcement_mode::ENFORCEMENT_MODE_CONFIG_KEY,
1707            crate::auth::enforcement_mode::PolicyEnforcementMode::LegacyRbac.as_str(),
1708            crate::auth::enforcement_mode::PolicyEnforcementMode::PolicyOnly.as_str(),
1709            text,
1710        )))
1711    }
1712}
1713
1714fn validate_config_url(value: &Value) -> bool {
1715    let url = match value {
1716        Value::Url(value) => value.as_str(),
1717        Value::Text(value) => value.as_ref(),
1718        _ => return false,
1719    };
1720    url.starts_with("http://") || url.starts_with("https://") || url.starts_with("ftp://")
1721}
1722
1723fn validate_config_json_shape(value: &Value, object: bool) -> bool {
1724    let Value::Json(bytes) = value else {
1725        return false;
1726    };
1727    let Ok(json) = crate::json::from_slice::<crate::json::Value>(bytes) else {
1728        return false;
1729    };
1730    matches!(
1731        (object, json),
1732        (true, crate::json::Value::Object(_)) | (false, crate::json::Value::Array(_))
1733    )
1734}
1735
1736fn config_actual_value_type(value: &Value) -> &'static str {
1737    match value {
1738        Value::Null => "null",
1739        Value::Boolean(_) => "bool",
1740        Value::Integer(_) | Value::UnsignedInteger(_) | Value::BigInt(_) => "int",
1741        Value::Text(_) => "string",
1742        Value::Url(_) => "url",
1743        Value::Json(bytes) => match crate::json::from_slice::<crate::json::Value>(bytes) {
1744            Ok(crate::json::Value::Object(_)) => "object",
1745            Ok(crate::json::Value::Array(_)) => "array",
1746            _ => "json",
1747        },
1748        Value::Array(_) | Value::Vector(_) => "array",
1749        _ => "other",
1750    }
1751}
1752
1753fn config_value_type_value(value_type: Option<ConfigValueType>) -> Value {
1754    value_type
1755        .map(|value_type| Value::text(value_type.as_str()))
1756        .unwrap_or(Value::Null)
1757}
1758
1759fn config_value_type_from_value(value: &Value) -> Option<ConfigValueType> {
1760    match value {
1761        Value::Text(value) => ConfigValueType::parse(value.as_ref()),
1762        _ => None,
1763    }
1764}
1765
1766fn config_tags_value(tags: &[String]) -> Value {
1767    if tags.is_empty() {
1768        return Value::Null;
1769    }
1770    Value::Array(tags.iter().map(|tag| Value::text(tag.clone())).collect())
1771}
1772
1773fn config_tags_from_value(value: Option<&Value>) -> Vec<String> {
1774    match value {
1775        Some(Value::Array(values)) => values
1776            .iter()
1777            .filter_map(|value| match value {
1778                Value::Text(tag) => Some(tag.to_string()),
1779                _ => None,
1780            })
1781            .collect(),
1782        Some(Value::Json(bytes)) => crate::json::from_slice::<crate::json::Value>(bytes)
1783            .ok()
1784            .and_then(|value| value.as_array().map(|values| values.to_vec()))
1785            .map(|values| {
1786                values
1787                    .into_iter()
1788                    .filter_map(|value| value.as_str().map(ToOwned::to_owned))
1789                    .collect()
1790            })
1791            .unwrap_or_default(),
1792        _ => Vec::new(),
1793    }
1794}
1795
1796fn config_payload_sensitivity(
1797    resource_type: &str,
1798    field: &str,
1799    value: &Value,
1800) -> crate::runtime::control_events::Sensitivity {
1801    let payload = config_payload_bytes(value);
1802    if config_payload_raw_allowed(resource_type, field) {
1803        crate::runtime::control_events::Sensitivity::raw(
1804            String::from_utf8_lossy(&payload).into_owned(),
1805        )
1806    } else {
1807        crate::runtime::control_events::Sensitivity::hashed(&payload)
1808    }
1809}
1810
1811fn config_payload_bytes(value: &Value) -> Vec<u8> {
1812    let json = crate::presentation::entity_json::storage_value_to_json(value);
1813    crate::serde_json::to_vec(&json).unwrap_or_else(|_| value.to_string().into_bytes())
1814}
1815
1816fn config_payload_raw_allowed(resource_type: &str, field: &str) -> bool {
1817    const RAW_PAYLOAD_FIELDS: &[(&str, &str)] = &[("audit_surface", "payload")];
1818    RAW_PAYLOAD_FIELDS
1819        .iter()
1820        .any(|(allowed_type, allowed_field)| {
1821            *allowed_type == resource_type && *allowed_field == field
1822        })
1823}
1824
1825fn config_mutability_label(mutability: crate::auth::registry::Mutability) -> &'static str {
1826    match mutability {
1827        crate::auth::registry::Mutability::Immutable => "immutable",
1828        crate::auth::registry::Mutability::MutableViaGovernance => "mutable_via_governance",
1829    }
1830}
1831
1832fn current_unix_ms() -> u64 {
1833    std::time::SystemTime::now()
1834        .duration_since(std::time::UNIX_EPOCH)
1835        .unwrap_or_default()
1836        .as_millis() as u64
1837}