Skip to main content

reddb_server/runtime/
impl_config.rs

1//! Stable Config keyed command execution.
2
3use std::collections::HashMap;
4use std::sync::Arc;
5
6use crate::catalog::{CollectionModel, SchemaMode};
7use crate::physical::{CollectionContract, ContractOrigin};
8use crate::storage::query::ast::ConfigValueType;
9use crate::storage::{EntityData, EntityId, EntityKind, RowData, UnifiedEntity};
10
11use super::impl_core::{current_auth_identity, current_connection_id, current_tenant};
12use super::*;
13
14const CONFIG_HISTORY_LIMIT: usize = 16;
15
16#[derive(Clone)]
17struct ConfigVersion {
18    id: EntityId,
19    key: String,
20    version: i64,
21    value: Value,
22    tombstone: bool,
23    created_at_ms: i64,
24    op: String,
25    value_type: Option<ConfigValueType>,
26    schema_version: Option<i64>,
27    tags: Vec<String>,
28}
29
30impl super::keyed_spine::KeyedVersion for ConfigVersion {
31    fn key(&self) -> &str {
32        &self.key
33    }
34
35    fn version(&self) -> i64 {
36        self.version
37    }
38}
39
40impl ConfigVersion {
41    fn from_keyed_row(version: super::keyed_spine::KeyedRowVersion, row: &RowData) -> Self {
42        Self {
43            id: version.id,
44            key: version.key,
45            version: version.version,
46            value: version.value,
47            tombstone: version.tombstone,
48            created_at_ms: version.created_at_ms,
49            op: version.op,
50            value_type: row
51                .get_field("value_type")
52                .and_then(config_value_type_from_value),
53            schema_version: super::keyed_spine::value_i64(row.get_field("schema_version")),
54            tags: config_tags_from_value(row.get_field("tags")),
55        }
56    }
57}
58
59struct ConfigSecretRef {
60    collection: String,
61    key: String,
62}
63
64struct ConfigMutationEvidence {
65    id: String,
66    resource_type: String,
67    managed: bool,
68    mutability: crate::auth::registry::Mutability,
69    matched_action: Option<String>,
70    matched_resource: Option<String>,
71    payload: Option<Value>,
72}
73
74enum ConfigMutationAuthz {
75    Allowed(ConfigMutationEvidence),
76    Denied {
77        reason: String,
78        evidence: ConfigMutationEvidence,
79    },
80}
81
82impl RedDBRuntime {
83    pub fn execute_config_command(
84        &self,
85        raw_query: &str,
86        cmd: &crate::storage::query::ast::ConfigCommand,
87    ) -> RedDBResult<RuntimeQueryResult> {
88        use crate::storage::query::ast::ConfigCommand;
89
90        match cmd {
91            ConfigCommand::Put {
92                collection,
93                key,
94                value,
95                value_type,
96                tags,
97            } => self.config_write_result(
98                raw_query,
99                collection,
100                key,
101                value.clone(),
102                *value_type,
103                tags,
104                "put",
105            ),
106            ConfigCommand::Rotate {
107                collection,
108                key,
109                value,
110                value_type,
111                tags,
112            } => self.config_write_result(
113                raw_query,
114                collection,
115                key,
116                value.clone(),
117                *value_type,
118                tags,
119                "rotate",
120            ),
121            ConfigCommand::Get { collection, key } => {
122                self.config_get_result(raw_query, collection, key)
123            }
124            ConfigCommand::Resolve { collection, key } => {
125                self.config_resolve_result(raw_query, collection, key)
126            }
127            ConfigCommand::Delete { collection, key } => {
128                self.config_delete_result(raw_query, collection, key)
129            }
130            ConfigCommand::History { collection, key } => {
131                self.config_history_result(raw_query, collection, key)
132            }
133            ConfigCommand::List {
134                collection,
135                prefix,
136                limit,
137                offset,
138            } => self.config_list_result(raw_query, collection, prefix.as_deref(), *limit, *offset),
139            ConfigCommand::Watch {
140                collection,
141                key,
142                prefix,
143                from_lsn,
144            } => self.config_watch_result(raw_query, collection, key, *prefix, *from_lsn),
145            ConfigCommand::InvalidVolatileOperation { operation, .. } => {
146                Err(invalid_config_volatility(operation))
147            }
148        }
149    }
150
151    pub(crate) fn validate_config_command_before_auth(
152        &self,
153        cmd: &crate::storage::query::ast::ConfigCommand,
154    ) -> RedDBResult<()> {
155        use crate::storage::query::ast::ConfigCommand;
156        match cmd {
157            ConfigCommand::InvalidVolatileOperation { operation, .. } => {
158                Err(invalid_config_volatility(operation))
159            }
160            ConfigCommand::Put { collection, .. }
161            | ConfigCommand::Get { collection, .. }
162            | ConfigCommand::Resolve { collection, .. }
163            | ConfigCommand::Rotate { collection, .. }
164            | ConfigCommand::Delete { collection, .. }
165            | ConfigCommand::History { collection, .. }
166            | ConfigCommand::List { collection, .. }
167            | ConfigCommand::Watch { collection, .. } => {
168                let snapshot = self.inner.db.catalog_model_snapshot();
169                let Some(actual_model) = snapshot
170                    .collections
171                    .iter()
172                    .find(|c| c.name == *collection)
173                    .map(|c| c.declared_model.unwrap_or(c.model))
174                else {
175                    return Ok(());
176                };
177                crate::runtime::ddl::polymorphic_resolver::ensure_model_match(
178                    CollectionModel::Config,
179                    actual_model,
180                )
181            }
182        }
183    }
184
185    fn config_resolve_result(
186        &self,
187        raw_query: &str,
188        collection: &str,
189        key: &str,
190    ) -> RedDBResult<RuntimeQueryResult> {
191        let latest = self.latest_config_version(collection, key)?;
192        if let Err(reason) = self.check_config_capability("config:read", collection, key) {
193            self.audit_config_resolve(
194                collection,
195                key,
196                None,
197                crate::runtime::audit_log::Outcome::Denied,
198                &reason,
199            );
200            return Err(RedDBError::Query(reason));
201        }
202
203        let Some(version) = latest else {
204            let reason = "not_found";
205            self.audit_config_resolve(
206                collection,
207                key,
208                None,
209                crate::runtime::audit_log::Outcome::Denied,
210                reason,
211            );
212            return Err(RedDBError::NotFound(format!(
213                "config '{}.{}' not found",
214                collection, key
215            )));
216        };
217        if version.tombstone {
218            let reason = "deleted";
219            self.audit_config_resolve(
220                collection,
221                key,
222                None,
223                crate::runtime::audit_log::Outcome::Denied,
224                reason,
225            );
226            return Err(RedDBError::NotFound(format!(
227                "config '{}.{}' is deleted",
228                collection, key
229            )));
230        }
231
232        let secret_ref = parse_config_secret_ref(&version.value).inspect_err(|err| {
233            self.audit_config_resolve(
234                collection,
235                key,
236                None,
237                crate::runtime::audit_log::Outcome::Error,
238                &err.to_string(),
239            );
240        })?;
241
242        match self.resolve_vault_secret_value(&secret_ref.collection, &secret_ref.key) {
243            Ok(value) => {
244                if value_looks_like_secret_ref(&value) {
245                    let err = secret_ref_chain_error(
246                        collection,
247                        key,
248                        &secret_ref.collection,
249                        &secret_ref.key,
250                    );
251                    let reason = err.to_string();
252                    self.audit_config_resolve(
253                        collection,
254                        key,
255                        Some(&secret_ref),
256                        crate::runtime::audit_log::Outcome::Error,
257                        &reason,
258                    );
259                    return Err(err);
260                }
261                self.audit_config_resolve(
262                    collection,
263                    key,
264                    Some(&secret_ref),
265                    crate::runtime::audit_log::Outcome::Success,
266                    "ok",
267                );
268                let mut result = UnifiedResult::with_columns(vec![
269                    "collection".into(),
270                    "key".into(),
271                    "value".into(),
272                    "resolved_store".into(),
273                    "resolved_collection".into(),
274                    "resolved_key".into(),
275                ]);
276                let mut record = UnifiedRecord::new();
277                record.set("collection", Value::text(collection.to_string()));
278                record.set("key", Value::text(key.to_string()));
279                record.set("value", value);
280                record.set("resolved_store", Value::text("vault"));
281                record.set("resolved_collection", Value::text(secret_ref.collection));
282                record.set("resolved_key", Value::text(secret_ref.key));
283                result.push(record);
284                Ok(RuntimeQueryResult {
285                    query: raw_query.to_string(),
286                    mode: crate::storage::query::modes::QueryMode::Sql,
287                    statement: "config_resolve",
288                    engine: "config",
289                    result,
290                    affected_rows: 0,
291                    statement_type: "select",
292                    bookmark: None,
293                })
294            }
295            Err(err) => {
296                let reason = err.to_string();
297                let outcome = if reason.contains("denied") {
298                    crate::runtime::audit_log::Outcome::Denied
299                } else {
300                    crate::runtime::audit_log::Outcome::Error
301                };
302                self.audit_config_resolve(collection, key, Some(&secret_ref), outcome, &reason);
303                Err(err)
304            }
305        }
306    }
307
308    fn config_write_result(
309        &self,
310        raw_query: &str,
311        collection: &str,
312        key: &str,
313        value: Value,
314        requested_type: Option<ConfigValueType>,
315        tags: &[String],
316        op: &str,
317    ) -> RedDBResult<RuntimeQueryResult> {
318        let mut evidence = match self.authorize_config_write_for_event(collection, key) {
319            ConfigMutationAuthz::Allowed(evidence) => evidence,
320            ConfigMutationAuthz::Denied {
321                reason,
322                mut evidence,
323            } => {
324                evidence.payload = Some(value.clone());
325                let _ = self.emit_config_mutation_event(
326                    crate::runtime::control_events::EventKind::ConfigWrite,
327                    crate::runtime::control_events::Outcome::Denied,
328                    "config:write",
329                    collection,
330                    key,
331                    Some(reason.clone()),
332                    &evidence,
333                );
334                return Err(RedDBError::Query(reason));
335            }
336        };
337        if let Err(err) = self.check_write(crate::runtime::write_gate::WriteKind::Dml) {
338            let _ = self.emit_config_mutation_event(
339                crate::runtime::control_events::EventKind::ConfigWrite,
340                crate::runtime::control_events::Outcome::Error,
341                "config:write",
342                collection,
343                key,
344                Some(err.to_string()),
345                &evidence,
346            );
347            return Err(err);
348        }
349        // #712 / S5A: reject invalid values for the enforcement-mode
350        // config key before any storage mutation. The allowlist lives
351        // on PolicyEnforcementMode::parse so it stays in lockstep with
352        // the evaluator's understanding of the modes.
353        if is_enforcement_mode_config(collection, key) {
354            if let Err(err) = validate_enforcement_mode_value(&value) {
355                let _ = self.emit_config_mutation_event(
356                    crate::runtime::control_events::EventKind::ConfigWrite,
357                    crate::runtime::control_events::Outcome::Denied,
358                    "config:write",
359                    collection,
360                    key,
361                    Some(err.to_string()),
362                    &evidence,
363                );
364                return Err(err);
365            }
366        }
367        if let Err(err) = self.ensure_config_collection(collection) {
368            let _ = self.emit_config_mutation_event(
369                crate::runtime::control_events::EventKind::ConfigWrite,
370                crate::runtime::control_events::Outcome::Error,
371                "config:write",
372                collection,
373                key,
374                Some(err.to_string()),
375                &evidence,
376            );
377            return Err(err);
378        }
379        let latest = match self.latest_config_version(collection, key) {
380            Ok(latest) => latest,
381            Err(err) => {
382                let _ = self.emit_config_mutation_event(
383                    crate::runtime::control_events::EventKind::ConfigWrite,
384                    crate::runtime::control_events::Outcome::Error,
385                    "config:write",
386                    collection,
387                    key,
388                    Some(err.to_string()),
389                    &evidence,
390                );
391                return Err(err);
392            }
393        };
394        let version = latest.as_ref().map(|version| version.version).unwrap_or(0) + 1;
395        let (value_type, schema_version) = resolve_config_schema(latest.as_ref(), requested_type);
396        if let Some(value_type) = value_type {
397            if let Err(err) = validate_config_value_type(&value, value_type) {
398                let _ = self.emit_config_mutation_event(
399                    crate::runtime::control_events::EventKind::ConfigWrite,
400                    crate::runtime::control_events::Outcome::Error,
401                    "config:write",
402                    collection,
403                    key,
404                    Some(err.to_string()),
405                    &evidence,
406                );
407                return Err(err);
408            }
409        }
410        evidence.payload = Some(value.clone());
411        if let Some(reason) = self.secret_ref_guard_write_check(collection, key, &value) {
412            let _ = self.emit_config_mutation_event(
413                crate::runtime::control_events::EventKind::ConfigWrite,
414                crate::runtime::control_events::Outcome::Denied,
415                "config:write",
416                collection,
417                key,
418                Some(reason.to_string()),
419                &evidence,
420            );
421            return Err(reason);
422        }
423        let before = latest.as_ref().and_then(|version| {
424            if version.tombstone {
425                None
426            } else {
427                Some(crate::presentation::entity_json::storage_value_to_json(
428                    &version.value,
429                ))
430            }
431        });
432        let after = Some(crate::presentation::entity_json::storage_value_to_json(
433            &value,
434        ));
435        let change_op = if latest.is_some() {
436            crate::replication::cdc::ChangeOperation::Update
437        } else {
438            crate::replication::cdc::ChangeOperation::Insert
439        };
440        let id = match self.append_config_version(
441            collection,
442            key,
443            value,
444            version,
445            false,
446            op,
447            value_type,
448            schema_version,
449            tags,
450        ) {
451            Ok(id) => id,
452            Err(err) => {
453                let _ = self.emit_config_mutation_event(
454                    crate::runtime::control_events::EventKind::ConfigWrite,
455                    crate::runtime::control_events::Outcome::Error,
456                    "config:write",
457                    collection,
458                    key,
459                    Some(err.to_string()),
460                    &evidence,
461                );
462                return Err(err);
463            }
464        };
465        self.record_kv_watch_event(change_op, collection, key, id.raw(), before, after);
466        if let Err(err) = self.prune_config_history(collection, key) {
467            let _ = self.emit_config_mutation_event(
468                crate::runtime::control_events::EventKind::ConfigWrite,
469                crate::runtime::control_events::Outcome::Error,
470                "config:write",
471                collection,
472                key,
473                Some(err.to_string()),
474                &evidence,
475            );
476            return Err(err);
477        }
478        self.invalidate_result_cache();
479        if let Err(err) = self.emit_config_mutation_event(
480            crate::runtime::control_events::EventKind::ConfigWrite,
481            crate::runtime::control_events::Outcome::Allowed,
482            "config:write",
483            collection,
484            key,
485            None,
486            &evidence,
487        ) {
488            let _ = self.inner.db.store().delete(collection, id);
489            self.invalidate_result_cache();
490            return Err(err);
491        }
492        // #712 / S5A: now that the write is durable and audited, push
493        // the new mode into the live AuthStore so subsequent IAM
494        // decisions honour it without waiting for a restart.
495        if is_enforcement_mode_config(collection, key) {
496            if let Some(auth_store) = self.inner.auth_store.read().clone() {
497                if let Value::Text(text) =
498                    &evidence.payload.as_ref().cloned().unwrap_or(Value::Null)
499                {
500                    if let Some(mode) =
501                        crate::auth::enforcement_mode::PolicyEnforcementMode::parse(text)
502                    {
503                        auth_store.set_enforcement_mode(mode);
504                    }
505                }
506            }
507        }
508        Ok(config_write_output(
509            raw_query,
510            collection,
511            key,
512            version,
513            id,
514            value_type,
515            schema_version,
516            tags,
517            match op {
518                "rotate" => "config_rotate",
519                _ => "config_put",
520            },
521            1,
522        ))
523    }
524
525    fn config_delete_result(
526        &self,
527        raw_query: &str,
528        collection: &str,
529        key: &str,
530    ) -> RedDBResult<RuntimeQueryResult> {
531        let mut evidence = match self.authorize_config_write_for_event(collection, key) {
532            ConfigMutationAuthz::Allowed(evidence) => evidence,
533            ConfigMutationAuthz::Denied { reason, evidence } => {
534                let _ = self.emit_config_mutation_event(
535                    crate::runtime::control_events::EventKind::ConfigDelete,
536                    crate::runtime::control_events::Outcome::Denied,
537                    "config:delete",
538                    collection,
539                    key,
540                    Some(reason.clone()),
541                    &evidence,
542                );
543                return Err(RedDBError::Query(reason));
544            }
545        };
546        if let Err(err) = self.check_write(crate::runtime::write_gate::WriteKind::Dml) {
547            let _ = self.emit_config_mutation_event(
548                crate::runtime::control_events::EventKind::ConfigDelete,
549                crate::runtime::control_events::Outcome::Error,
550                "config:delete",
551                collection,
552                key,
553                Some(err.to_string()),
554                &evidence,
555            );
556            return Err(err);
557        }
558        if let Err(err) = self.ensure_config_collection(collection) {
559            let _ = self.emit_config_mutation_event(
560                crate::runtime::control_events::EventKind::ConfigDelete,
561                crate::runtime::control_events::Outcome::Error,
562                "config:delete",
563                collection,
564                key,
565                Some(err.to_string()),
566                &evidence,
567            );
568            return Err(err);
569        }
570        let latest = match self.latest_config_version(collection, key) {
571            Ok(latest) => latest,
572            Err(err) => {
573                let _ = self.emit_config_mutation_event(
574                    crate::runtime::control_events::EventKind::ConfigDelete,
575                    crate::runtime::control_events::Outcome::Error,
576                    "config:delete",
577                    collection,
578                    key,
579                    Some(err.to_string()),
580                    &evidence,
581                );
582                return Err(err);
583            }
584        };
585        evidence.payload = latest.as_ref().map(|version| version.value.clone());
586        let version = latest.as_ref().map(|version| version.version).unwrap_or(0) + 1;
587        let value_type = latest.as_ref().and_then(|version| version.value_type);
588        let schema_version = latest.as_ref().and_then(|version| version.schema_version);
589        let id = match self.append_config_version(
590            collection,
591            key,
592            Value::Null,
593            version,
594            true,
595            "delete",
596            value_type,
597            schema_version,
598            &[],
599        ) {
600            Ok(id) => id,
601            Err(err) => {
602                let _ = self.emit_config_mutation_event(
603                    crate::runtime::control_events::EventKind::ConfigDelete,
604                    crate::runtime::control_events::Outcome::Error,
605                    "config:delete",
606                    collection,
607                    key,
608                    Some(err.to_string()),
609                    &evidence,
610                );
611                return Err(err);
612            }
613        };
614        if let Some(before) = latest.as_ref().and_then(|version| {
615            if version.tombstone {
616                None
617            } else {
618                Some(crate::presentation::entity_json::storage_value_to_json(
619                    &version.value,
620                ))
621            }
622        }) {
623            self.record_kv_watch_event(
624                crate::replication::cdc::ChangeOperation::Delete,
625                collection,
626                key,
627                id.raw(),
628                Some(before),
629                None,
630            );
631        }
632        if let Err(err) = self.prune_config_history(collection, key) {
633            let _ = self.emit_config_mutation_event(
634                crate::runtime::control_events::EventKind::ConfigDelete,
635                crate::runtime::control_events::Outcome::Error,
636                "config:delete",
637                collection,
638                key,
639                Some(err.to_string()),
640                &evidence,
641            );
642            return Err(err);
643        }
644        self.invalidate_result_cache();
645        if let Err(err) = self.emit_config_mutation_event(
646            crate::runtime::control_events::EventKind::ConfigDelete,
647            crate::runtime::control_events::Outcome::Allowed,
648            "config:delete",
649            collection,
650            key,
651            None,
652            &evidence,
653        ) {
654            let _ = self.inner.db.store().delete(collection, id);
655            self.invalidate_result_cache();
656            return Err(err);
657        }
658        Ok(config_write_output(
659            raw_query,
660            collection,
661            key,
662            version,
663            id,
664            value_type,
665            schema_version,
666            &[],
667            "delete",
668            1,
669        ))
670    }
671
672    fn config_get_result(
673        &self,
674        raw_query: &str,
675        collection: &str,
676        key: &str,
677    ) -> RedDBResult<RuntimeQueryResult> {
678        self.check_system_config_capability("config:read", collection, key)
679            .map_err(RedDBError::Query)?;
680        let latest = self.latest_config_version(collection, key)?;
681        let mut result = UnifiedResult::with_columns(vec![
682            "collection".into(),
683            "key".into(),
684            "value".into(),
685            "version".into(),
686            "value_type".into(),
687            "schema_version".into(),
688            "tags".into(),
689            "tombstone".into(),
690        ]);
691        let mut record = UnifiedRecord::new();
692        record.set("collection", Value::text(collection.to_string()));
693        record.set("key", Value::text(key.to_string()));
694        if let Some(version) = latest {
695            record.set("value", version.value);
696            record.set("version", Value::Integer(version.version));
697            record.set("value_type", config_value_type_value(version.value_type));
698            record.set(
699                "schema_version",
700                version
701                    .schema_version
702                    .map(Value::Integer)
703                    .unwrap_or(Value::Null),
704            );
705            record.set("tags", config_tags_value(&version.tags));
706            record.set("tombstone", Value::Boolean(version.tombstone));
707        } else {
708            record.set("value", Value::Null);
709            record.set("version", Value::Null);
710            record.set("value_type", Value::Null);
711            record.set("schema_version", Value::Null);
712            record.set("tags", Value::Null);
713            record.set("tombstone", Value::Boolean(false));
714        }
715        result.push(record);
716        Ok(RuntimeQueryResult {
717            query: raw_query.to_string(),
718            mode: crate::storage::query::modes::QueryMode::Sql,
719            statement: "config_get",
720            engine: "config",
721            result,
722            affected_rows: 0,
723            statement_type: "select",
724            bookmark: None,
725        })
726    }
727
728    fn config_history_result(
729        &self,
730        raw_query: &str,
731        collection: &str,
732        key: &str,
733    ) -> RedDBResult<RuntimeQueryResult> {
734        self.check_system_config_capability("config:read", collection, key)
735            .map_err(RedDBError::Query)?;
736        let versions = super::keyed_spine::history_versions(self.config_versions(collection, key)?);
737        let mut result = UnifiedResult::with_columns(vec![
738            "collection".into(),
739            "key".into(),
740            "version".into(),
741            "value".into(),
742            "value_type".into(),
743            "schema_version".into(),
744            "tags".into(),
745            "tombstone".into(),
746            "op".into(),
747            "created_at_ms".into(),
748        ]);
749        for version in versions {
750            let mut record = UnifiedRecord::new();
751            record.set("collection", Value::text(collection.to_string()));
752            record.set("key", Value::text(key.to_string()));
753            record.set("version", Value::Integer(version.version));
754            record.set("value", version.value);
755            record.set("value_type", config_value_type_value(version.value_type));
756            record.set(
757                "schema_version",
758                version
759                    .schema_version
760                    .map(Value::Integer)
761                    .unwrap_or(Value::Null),
762            );
763            record.set("tags", Value::Null);
764            record.set("tombstone", Value::Boolean(version.tombstone));
765            record.set("op", Value::text(version.op));
766            record.set("created_at_ms", Value::Integer(version.created_at_ms));
767            result.push(record);
768        }
769        Ok(RuntimeQueryResult {
770            query: raw_query.to_string(),
771            mode: crate::storage::query::modes::QueryMode::Sql,
772            statement: "config_history",
773            engine: "config",
774            result,
775            affected_rows: 0,
776            statement_type: "select",
777            bookmark: None,
778        })
779    }
780
781    fn config_list_result(
782        &self,
783        raw_query: &str,
784        collection: &str,
785        prefix: Option<&str>,
786        limit: Option<usize>,
787        offset: usize,
788    ) -> RedDBResult<RuntimeQueryResult> {
789        let mut versions = self.latest_config_versions(collection, prefix)?;
790        versions.sort_by(|left, right| left.key.cmp(&right.key));
791        let mut result = UnifiedResult::with_columns(vec![
792            "collection".into(),
793            "key".into(),
794            "value".into(),
795            "version".into(),
796            "value_type".into(),
797            "schema_version".into(),
798            "tags".into(),
799            "tombstone".into(),
800            "op".into(),
801            "created_at_ms".into(),
802        ]);
803        for version in versions
804            .into_iter()
805            .filter(|version| {
806                self.check_config_capability("config:read", collection, &version.key)
807                    .is_ok()
808            })
809            .skip(offset)
810            .take(limit.unwrap_or(usize::MAX))
811        {
812            let mut record = UnifiedRecord::new();
813            record.set("collection", Value::text(collection.to_string()));
814            record.set("key", Value::text(version.key));
815            record.set("value", version.value);
816            record.set("version", Value::Integer(version.version));
817            record.set("value_type", config_value_type_value(version.value_type));
818            record.set(
819                "schema_version",
820                version
821                    .schema_version
822                    .map(Value::Integer)
823                    .unwrap_or(Value::Null),
824            );
825            record.set("tags", config_tags_value(&version.tags));
826            record.set("tombstone", Value::Boolean(version.tombstone));
827            record.set("op", Value::text(version.op));
828            record.set("created_at_ms", Value::Integer(version.created_at_ms));
829            result.push(record);
830        }
831        Ok(RuntimeQueryResult {
832            query: raw_query.to_string(),
833            mode: crate::storage::query::modes::QueryMode::Sql,
834            statement: "config_list",
835            engine: "config",
836            result,
837            affected_rows: 0,
838            statement_type: "select",
839            bookmark: None,
840        })
841    }
842
843    fn config_watch_result(
844        &self,
845        raw_query: &str,
846        collection: &str,
847        key: &str,
848        prefix: bool,
849        from_lsn: Option<u64>,
850    ) -> RedDBResult<RuntimeQueryResult> {
851        let watch_key = if prefix {
852            format!("{key}.*")
853        } else {
854            key.to_string()
855        };
856        let endpoint = match from_lsn {
857            Some(lsn) => {
858                format!("/collections/{collection}/config/{watch_key}/watch?since_lsn={lsn}")
859            }
860            None => format!("/collections/{collection}/config/{watch_key}/watch"),
861        };
862        let mut result = UnifiedResult::with_columns(vec![
863            "collection".into(),
864            "key".into(),
865            "prefix".into(),
866            "from_lsn".into(),
867            "watch_url".into(),
868            "streaming".into(),
869        ]);
870        let mut record = UnifiedRecord::new();
871        record.set("collection", Value::text(collection.to_string()));
872        record.set("key", Value::text(watch_key));
873        record.set("prefix", Value::Boolean(prefix));
874        record.set(
875            "from_lsn",
876            from_lsn
877                .map(Value::UnsignedInteger)
878                .unwrap_or(crate::storage::schema::Value::Null),
879        );
880        record.set("watch_url", Value::text(endpoint));
881        record.set("streaming", Value::Boolean(true));
882        result.push(record);
883        Ok(RuntimeQueryResult {
884            query: raw_query.to_string(),
885            mode: crate::storage::query::modes::QueryMode::Sql,
886            statement: "config_watch",
887            engine: "config",
888            result,
889            affected_rows: 0,
890            statement_type: "stream",
891            bookmark: None,
892        })
893    }
894
895    fn ensure_config_collection(&self, collection: &str) -> RedDBResult<()> {
896        let store = self.inner.db.store();
897        if store.get_collection(collection).is_none() {
898            store
899                .create_collection(collection)
900                .map_err(|err| RedDBError::Internal(err.to_string()))?;
901        }
902        if let Some(contract) = self.inner.db.collection_contract(collection) {
903            crate::runtime::ddl::polymorphic_resolver::ensure_model_match(
904                CollectionModel::Config,
905                contract.declared_model,
906            )?;
907            return Ok(());
908        }
909        let now = current_unix_ms();
910        self.inner
911            .db
912            .save_collection_contract(CollectionContract {
913                name: collection.to_string(),
914                declared_model: CollectionModel::Config,
915                schema_mode: SchemaMode::Dynamic,
916                origin: ContractOrigin::Explicit,
917                version: 1,
918                created_at_unix_ms: now as u128,
919                updated_at_unix_ms: now as u128,
920                default_ttl_ms: None,
921                vector_dimension: None,
922                vector_metric: None,
923                context_index_fields: Vec::new(),
924                declared_columns: Vec::new(),
925                table_def: None,
926                timestamps_enabled: false,
927                context_index_enabled: false,
928                metrics_raw_retention_ms: None,
929                metrics_rollup_policies: Vec::new(),
930                metrics_tenant_identity: None,
931                metrics_namespace: None,
932                append_only: false,
933                subscriptions: Vec::new(),
934                analytics_config: Vec::new(),
935                session_key: None,
936                session_gap_ms: None,
937                retention_duration_ms: None,
938                analytical_storage: None,
939            })
940            .map(|_| ())
941            .map_err(|err| RedDBError::Internal(err.to_string()))
942    }
943
944    fn append_config_version(
945        &self,
946        collection: &str,
947        key: &str,
948        value: Value,
949        version: i64,
950        tombstone: bool,
951        op: &str,
952        value_type: Option<ConfigValueType>,
953        schema_version: Option<i64>,
954        tags: &[String],
955    ) -> RedDBResult<EntityId> {
956        let now = current_unix_ms() as i64;
957        let fields = vec![
958            ("key".to_string(), Value::text(key.to_string())),
959            ("value".to_string(), value),
960            ("version".to_string(), Value::Integer(version)),
961            (
962                "value_type".to_string(),
963                config_value_type_value(value_type),
964            ),
965            (
966                "schema_version".to_string(),
967                schema_version.map(Value::Integer).unwrap_or(Value::Null),
968            ),
969            ("tombstone".to_string(), Value::Boolean(tombstone)),
970            ("op".to_string(), Value::text(op.to_string())),
971            ("created_at_ms".to_string(), Value::Integer(now)),
972            ("tags".to_string(), config_tags_value(tags)),
973        ];
974        let mut row = RowData::new(Vec::new());
975        row.named = Some(fields.into_iter().collect());
976        let entity = UnifiedEntity::new(
977            EntityId::new(0),
978            EntityKind::TableRow {
979                table: Arc::from(collection),
980                row_id: 0,
981            },
982            EntityData::Row(row),
983        );
984        self.inner
985            .db
986            .store()
987            .insert(collection, entity)
988            .map_err(|err| RedDBError::Internal(err.to_string()))
989    }
990
991    fn latest_config_version(
992        &self,
993        collection: &str,
994        key: &str,
995    ) -> RedDBResult<Option<ConfigVersion>> {
996        Ok(super::keyed_spine::latest_version(
997            self.config_versions(collection, key)?,
998        ))
999    }
1000
1001    fn config_versions(&self, collection: &str, key: &str) -> RedDBResult<Vec<ConfigVersion>> {
1002        let store = self.inner.db.store();
1003        let Some(manager) = store.get_collection(collection) else {
1004            return Ok(Vec::new());
1005        };
1006        let mut versions = Vec::new();
1007        for entity in manager.query_all(|_| true) {
1008            let EntityData::Row(row) = &entity.data else {
1009                continue;
1010            };
1011            let Some(version) = super::keyed_spine::row_version(entity.id, row, 0) else {
1012                continue;
1013            };
1014            if version.key != key {
1015                continue;
1016            }
1017            versions.push(ConfigVersion::from_keyed_row(version, row));
1018        }
1019        Ok(versions)
1020    }
1021
1022    fn latest_config_versions(
1023        &self,
1024        collection: &str,
1025        prefix: Option<&str>,
1026    ) -> RedDBResult<Vec<ConfigVersion>> {
1027        let store = self.inner.db.store();
1028        let Some(manager) = store.get_collection(collection) else {
1029            return Ok(Vec::new());
1030        };
1031        let mut versions = Vec::new();
1032        for entity in manager.query_all(|_| true) {
1033            let EntityData::Row(row) = &entity.data else {
1034                continue;
1035            };
1036            let Some(version) = super::keyed_spine::row_version(entity.id, row, 0) else {
1037                continue;
1038            };
1039            versions.push(ConfigVersion::from_keyed_row(version, row));
1040        }
1041        Ok(super::keyed_spine::latest_versions(versions, prefix))
1042    }
1043
1044    fn prune_config_history(&self, collection: &str, key: &str) -> RedDBResult<()> {
1045        let mut versions = self.config_versions(collection, key)?;
1046        if versions.len() <= CONFIG_HISTORY_LIMIT {
1047            return Ok(());
1048        }
1049        versions = super::keyed_spine::history_versions(versions);
1050        let drop_count = versions.len() - CONFIG_HISTORY_LIMIT;
1051        let store = self.inner.db.store();
1052        for version in versions.into_iter().take(drop_count) {
1053            store
1054                .delete(collection, version.id)
1055                .map_err(|err| RedDBError::Internal(err.to_string()))?;
1056        }
1057        Ok(())
1058    }
1059
1060    fn authorize_config_write_for_event(&self, collection: &str, key: &str) -> ConfigMutationAuthz {
1061        let default_evidence = self.default_config_mutation_evidence(collection, key);
1062        let Some(auth_store) = self.inner.auth_store.read().clone() else {
1063            return ConfigMutationAuthz::Allowed(default_evidence);
1064        };
1065        if !auth_store.iam_authorization_enabled() {
1066            return ConfigMutationAuthz::Allowed(default_evidence);
1067        }
1068        let Some((principal, role)) = current_auth_identity() else {
1069            return ConfigMutationAuthz::Denied {
1070                reason:
1071                    "IAM authorization is enabled; config capability check requires an authenticated principal"
1072                        .to_string(),
1073                evidence: default_evidence,
1074            };
1075        };
1076        let tenant = current_tenant();
1077        let principal_id = crate::auth::UserId::from_parts(tenant.as_deref(), &principal);
1078        let ctx = crate::auth::policies::EvalContext {
1079            principal_tenant: tenant.clone(),
1080            current_tenant: tenant.clone(),
1081            peer_ip: None,
1082            mfa_present: false,
1083            now_ms: crate::utils::now_unix_millis() as u128,
1084            principal_is_admin_role: role == crate::auth::Role::Admin,
1085            principal_is_system_owned: auth_store.principal_is_system_owned(&principal_id),
1086            principal_is_platform_scoped: principal_id.tenant.is_none(),
1087        };
1088        let managed_key = if collection == "red.config" {
1089            format!("red.config.{key}")
1090        } else {
1091            key.to_string()
1092        };
1093        let gate = crate::auth::managed_config::ManagedConfigGate::new(
1094            self.inner.config_registry.as_ref(),
1095        );
1096        match gate.check_write(&auth_store, &principal_id, &ctx, &managed_key) {
1097            crate::auth::managed_config::ManagedConfigDecision::Allow {
1098                entry_id,
1099                resource_type,
1100                managed,
1101                mutability,
1102                matched_action,
1103                matched_resource,
1104                ..
1105            } => {
1106                return ConfigMutationAuthz::Allowed(ConfigMutationEvidence {
1107                    id: entry_id,
1108                    resource_type,
1109                    managed,
1110                    mutability,
1111                    matched_action: Some(matched_action),
1112                    matched_resource: Some(matched_resource),
1113                    payload: None,
1114                });
1115            }
1116            crate::auth::managed_config::ManagedConfigDecision::Deny {
1117                entry_id,
1118                resource_type,
1119                managed,
1120                mutability,
1121                matched_action,
1122                matched_resource,
1123                reason,
1124                ..
1125            } => {
1126                return ConfigMutationAuthz::Denied {
1127                    reason: format!(
1128                        "permission denied: managed config mutation blocked for `{managed_key}`: {reason}"
1129                    ),
1130                    evidence: ConfigMutationEvidence {
1131                        id: entry_id,
1132                        resource_type,
1133                        managed,
1134                        mutability,
1135                        matched_action: Some(matched_action),
1136                        matched_resource: Some(matched_resource),
1137                        payload: None,
1138                    },
1139                };
1140            }
1141            crate::auth::managed_config::ManagedConfigDecision::PassThrough { .. } => {}
1142        }
1143
1144        let mut resource = crate::auth::policies::ResourceRef::new(
1145            "config",
1146            config_target_resource(collection, key),
1147        );
1148        if let Some(ref tenant) = tenant {
1149            resource = resource.with_tenant(tenant.clone());
1150        }
1151        if auth_store.check_policy_authz_with_role(
1152            &principal_id,
1153            "config:write",
1154            &resource,
1155            &ctx,
1156            role,
1157        ) {
1158            ConfigMutationAuthz::Allowed(default_evidence)
1159        } else {
1160            ConfigMutationAuthz::Denied {
1161                reason: format!(
1162                    "principal=`{}` action=`config:write` resource=`config:{}` denied by IAM policy",
1163                    principal,
1164                    config_target_resource(collection, key)
1165                ),
1166                evidence: default_evidence,
1167            }
1168        }
1169    }
1170
1171    fn default_config_mutation_evidence(
1172        &self,
1173        collection: &str,
1174        key: &str,
1175    ) -> ConfigMutationEvidence {
1176        let id = if collection == "red.config" {
1177            format!("red.config.{key}")
1178        } else {
1179            key.to_string()
1180        };
1181        ConfigMutationEvidence {
1182            id,
1183            resource_type: crate::auth::managed_config::RESOURCE_TYPE_CONFIG_KEY.to_string(),
1184            managed: false,
1185            mutability: crate::auth::registry::Mutability::MutableViaGovernance,
1186            matched_action: None,
1187            matched_resource: None,
1188            payload: None,
1189        }
1190    }
1191
1192    fn emit_config_mutation_event(
1193        &self,
1194        kind: crate::runtime::control_events::EventKind,
1195        outcome: crate::runtime::control_events::Outcome,
1196        action: &'static str,
1197        collection: &str,
1198        key: &str,
1199        reason: Option<String>,
1200        evidence: &ConfigMutationEvidence,
1201    ) -> RedDBResult<()> {
1202        use crate::runtime::control_events::{
1203            ActorRef, ControlEvent, ControlEventCtx, Sensitivity,
1204        };
1205
1206        let tenant = current_tenant();
1207        let principal = current_auth_identity();
1208        let actor_user = principal
1209            .as_ref()
1210            .map(|(principal, _)| crate::auth::UserId::from_parts(tenant.as_deref(), principal));
1211        let actor = actor_user
1212            .as_ref()
1213            .map(ActorRef::User)
1214            .unwrap_or(ActorRef::Anonymous);
1215        let ctx = ControlEventCtx {
1216            actor,
1217            scope: tenant
1218                .as_ref()
1219                .map(|scope| std::borrow::Cow::Borrowed(scope.as_str())),
1220            request_id: Some(std::borrow::Cow::Owned(format!(
1221                "conn-{}",
1222                current_connection_id()
1223            ))),
1224            trace_id: None,
1225        };
1226
1227        let mut fields = HashMap::new();
1228        fields.insert("id".to_string(), Sensitivity::raw(evidence.id.clone()));
1229        fields.insert(
1230            "resource_type".to_string(),
1231            Sensitivity::raw(evidence.resource_type.clone()),
1232        );
1233        fields.insert(
1234            "managed".to_string(),
1235            Sensitivity::raw(evidence.managed.to_string()),
1236        );
1237        fields.insert(
1238            "mutability".to_string(),
1239            Sensitivity::raw(config_mutability_label(evidence.mutability)),
1240        );
1241        fields.insert("collection".to_string(), Sensitivity::raw(collection));
1242        fields.insert("key".to_string(), Sensitivity::raw(key));
1243        fields.insert(
1244            "connection_id".to_string(),
1245            Sensitivity::raw(current_connection_id().to_string()),
1246        );
1247        if let Some((_, role)) = principal {
1248            fields.insert("actor_role".to_string(), Sensitivity::raw(role.as_str()));
1249        }
1250        if let Some(matched_action) = &evidence.matched_action {
1251            fields.insert(
1252                "matched_action".to_string(),
1253                Sensitivity::raw(matched_action.clone()),
1254            );
1255        }
1256        if let Some(matched_resource) = &evidence.matched_resource {
1257            fields.insert(
1258                "matched_resource".to_string(),
1259                Sensitivity::raw(matched_resource.clone()),
1260            );
1261        }
1262        if let Some(payload) = &evidence.payload {
1263            fields.insert(
1264                "payload".to_string(),
1265                config_payload_sensitivity(&evidence.resource_type, "payload", payload),
1266            );
1267        }
1268
1269        let event = ControlEvent {
1270            kind,
1271            outcome,
1272            action: std::borrow::Cow::Borrowed(action),
1273            resource: Some(format!(
1274                "config:{}",
1275                config_target_resource(collection, key)
1276            )),
1277            reason,
1278            matched_policy_id: None,
1279            fields,
1280        };
1281        let ledger = self.inner.control_event_ledger.read();
1282        match ledger.emit(&ctx, event) {
1283            Ok(_) => Ok(()),
1284            Err(err) if self.inner.control_event_config.require_persistence() => {
1285                Err(RedDBError::Internal(err.to_string()))
1286            }
1287            Err(_) => Ok(()),
1288        }
1289    }
1290
1291    fn check_config_capability(
1292        &self,
1293        action: &str,
1294        collection: &str,
1295        key: &str,
1296    ) -> Result<(), String> {
1297        let Some(auth_store) = self.inner.auth_store.read().clone() else {
1298            return Ok(());
1299        };
1300        if !auth_store.iam_authorization_enabled() {
1301            return Ok(());
1302        }
1303        let Some((principal, role)) = current_auth_identity() else {
1304            return Err(
1305                "IAM authorization is enabled; config capability check requires an authenticated principal"
1306                    .to_string(),
1307            );
1308        };
1309        let tenant = current_tenant();
1310        let principal_id = crate::auth::UserId::from_parts(tenant.as_deref(), &principal);
1311        let mut resource = crate::auth::policies::ResourceRef::new(
1312            "config",
1313            config_target_resource(collection, key),
1314        );
1315        if let Some(ref tenant) = tenant {
1316            resource = resource.with_tenant(tenant.clone());
1317        }
1318        let ctx = crate::auth::policies::EvalContext {
1319            principal_tenant: tenant.clone(),
1320            current_tenant: tenant,
1321            peer_ip: None,
1322            mfa_present: false,
1323            now_ms: crate::utils::now_unix_millis() as u128,
1324            principal_is_admin_role: role == crate::auth::Role::Admin,
1325            principal_is_system_owned: auth_store.principal_is_system_owned(&principal_id),
1326            principal_is_platform_scoped: principal_id.tenant.is_none(),
1327        };
1328        if action == "config:write" {
1329            let managed_key = if collection == "red.config" {
1330                format!("red.config.{key}")
1331            } else {
1332                key.to_string()
1333            };
1334            let gate = crate::auth::managed_config::ManagedConfigGate::new(
1335                self.inner.config_registry.as_ref(),
1336            );
1337            match gate.check_write(&auth_store, &principal_id, &ctx, &managed_key) {
1338                crate::auth::managed_config::ManagedConfigDecision::PassThrough { .. } => {}
1339                crate::auth::managed_config::ManagedConfigDecision::Allow { .. } => return Ok(()),
1340                crate::auth::managed_config::ManagedConfigDecision::Deny { reason, .. } => {
1341                    return Err(format!(
1342                        "permission denied: managed config mutation blocked for `{managed_key}`: {reason}"
1343                    ));
1344                }
1345            }
1346        }
1347        if auth_store.check_policy_authz_with_role(&principal_id, action, &resource, &ctx, role) {
1348            Ok(())
1349        } else {
1350            Err(format!(
1351                "principal=`{}` action=`{}` resource=`config:{}` denied by IAM policy",
1352                principal,
1353                action,
1354                config_target_resource(collection, key)
1355            ))
1356        }
1357    }
1358
1359    fn check_system_config_capability(
1360        &self,
1361        action: &str,
1362        collection: &str,
1363        key: &str,
1364    ) -> Result<(), String> {
1365        if collection != "red.config" {
1366            return Ok(());
1367        }
1368        self.check_config_capability(action, collection, key)
1369    }
1370
1371    pub fn config_watch_events_since(
1372        &self,
1373        collection: &str,
1374        key: &str,
1375        since_lsn: u64,
1376        max_count: usize,
1377    ) -> Vec<crate::replication::cdc::KvWatchEvent> {
1378        self.kv_watch_events_since(collection, key, since_lsn, max_count)
1379            .into_iter()
1380            .map(|event| self.policy_filter_config_watch_event(event))
1381            .collect()
1382    }
1383
1384    pub fn config_watch_events_since_prefix(
1385        &self,
1386        collection: &str,
1387        prefix: &str,
1388        since_lsn: u64,
1389        max_count: usize,
1390    ) -> Vec<crate::replication::cdc::KvWatchEvent> {
1391        self.kv_watch_events_since_prefix(collection, prefix, since_lsn, max_count)
1392            .into_iter()
1393            .map(|event| self.policy_filter_config_watch_event(event))
1394            .collect()
1395    }
1396
1397    fn policy_filter_config_watch_event(
1398        &self,
1399        mut event: crate::replication::cdc::KvWatchEvent,
1400    ) -> crate::replication::cdc::KvWatchEvent {
1401        if self
1402            .check_config_capability("config:read", &event.collection, &event.key)
1403            .is_err()
1404        {
1405            event.before = None;
1406            event.after = None;
1407        }
1408        event
1409    }
1410
1411    /// `SecretRefGuard` write-side enforcement. When the inbound config
1412    /// value is a `secret_ref`, peek the vault target without auditing and
1413    /// reject the write if the target already resolves to another
1414    /// `secret_ref` (depth ≥ 2 or a self/mutual cycle). Returns `Some(err)`
1415    /// when the write must be refused; `None` otherwise.
1416    fn secret_ref_guard_write_check(
1417        &self,
1418        collection: &str,
1419        key: &str,
1420        value: &Value,
1421    ) -> Option<RedDBError> {
1422        if !value_looks_like_secret_ref(value) {
1423            return None;
1424        }
1425        let Ok(secret_ref) = parse_config_secret_ref(value) else {
1426            return None;
1427        };
1428        let unsealed = match self.peek_vault_unsealed(&secret_ref.collection, &secret_ref.key) {
1429            Ok(Some(value)) => value,
1430            Ok(None) => return None,
1431            Err(_) => return None,
1432        };
1433        if value_looks_like_secret_ref(&unsealed) {
1434            return Some(secret_ref_chain_error(
1435                collection,
1436                key,
1437                &secret_ref.collection,
1438                &secret_ref.key,
1439            ));
1440        }
1441        None
1442    }
1443
1444    fn audit_config_resolve(
1445        &self,
1446        collection: &str,
1447        key: &str,
1448        secret_ref: Option<&ConfigSecretRef>,
1449        outcome: crate::runtime::audit_log::Outcome,
1450        reason: &str,
1451    ) {
1452        let actor = current_auth_identity()
1453            .map(|(principal, _)| principal)
1454            .unwrap_or_else(|| "anonymous".to_string());
1455        let request_id = match current_connection_id() {
1456            0 => "embedded".to_string(),
1457            id => format!("conn-{id}"),
1458        };
1459        let mut builder = crate::runtime::audit_log::AuditEvent::builder("config/resolve")
1460            .principal(actor.clone())
1461            .source(crate::runtime::audit_log::AuditAuthSource::Password)
1462            .resource(format!(
1463                "config:{}",
1464                config_target_resource(collection, key)
1465            ))
1466            .outcome(outcome)
1467            .correlation_id(request_id.clone())
1468            .fields([
1469                crate::runtime::audit_log::AuditFieldEscaper::field("actor", actor),
1470                crate::runtime::audit_log::AuditFieldEscaper::field("collection", collection),
1471                crate::runtime::audit_log::AuditFieldEscaper::field("key", key),
1472                crate::runtime::audit_log::AuditFieldEscaper::field(
1473                    "target",
1474                    config_target_resource(collection, key),
1475                ),
1476                crate::runtime::audit_log::AuditFieldEscaper::field("reason", reason),
1477                crate::runtime::audit_log::AuditFieldEscaper::field("request_id", request_id),
1478                crate::runtime::audit_log::AuditFieldEscaper::field(
1479                    "connection_id",
1480                    current_connection_id(),
1481                ),
1482            ]);
1483        if let Some(tenant) = current_tenant() {
1484            builder = builder.tenant(tenant);
1485        }
1486        if let Some(secret_ref) = secret_ref {
1487            builder = builder.fields([
1488                crate::runtime::audit_log::AuditFieldEscaper::field("resolved_store", "vault"),
1489                crate::runtime::audit_log::AuditFieldEscaper::field(
1490                    "resolved_collection",
1491                    secret_ref.collection.as_str(),
1492                ),
1493                crate::runtime::audit_log::AuditFieldEscaper::field(
1494                    "resolved_key",
1495                    secret_ref.key.as_str(),
1496                ),
1497                crate::runtime::audit_log::AuditFieldEscaper::field(
1498                    "resolved_target",
1499                    format!("{}.{}", secret_ref.collection, secret_ref.key),
1500                ),
1501            ]);
1502        }
1503        self.audit_log().record_event(builder.build());
1504    }
1505}
1506
1507/// `SecretRefGuard` — returns `true` when the storage value carries the
1508/// canonical `secret_ref` JSON shape (`{"type":"secret_ref", …}`). Drives
1509/// the depth-1 invariant: a value that looks like a secret_ref is treated
1510/// as indirection, never as terminal plaintext.
1511fn value_looks_like_secret_ref(value: &Value) -> bool {
1512    let Value::Json(bytes) = value else {
1513        return false;
1514    };
1515    let Ok(json) = crate::json::from_slice::<crate::json::Value>(bytes) else {
1516        return false;
1517    };
1518    let Some(object) = json.as_object() else {
1519        return false;
1520    };
1521    object.get("type").and_then(|value| value.as_str()) == Some("secret_ref")
1522}
1523
1524fn secret_ref_chain_error(
1525    source_collection: &str,
1526    source_key: &str,
1527    target_collection: &str,
1528    target_key: &str,
1529) -> RedDBError {
1530    RedDBError::InvalidConfig(format!(
1531        "secret_ref chain rejected: config `{source_collection}.{source_key}` points at vault `{target_collection}.{target_key}` which is itself a secret_ref; depth-1 invariant requires the target to resolve to a non-secret_ref value"
1532    ))
1533}
1534
1535fn parse_config_secret_ref(value: &Value) -> RedDBResult<ConfigSecretRef> {
1536    let Value::Json(bytes) = value else {
1537        return Err(RedDBError::InvalidConfig(
1538            "CONFIG value is not a SecretRef".to_string(),
1539        ));
1540    };
1541    let json = crate::json::from_slice::<crate::json::Value>(bytes).map_err(|err| {
1542        RedDBError::InvalidConfig(format!("CONFIG SecretRef is malformed: {err}"))
1543    })?;
1544    let Some(object) = json.as_object() else {
1545        return Err(RedDBError::InvalidConfig(
1546            "CONFIG SecretRef must be an object".to_string(),
1547        ));
1548    };
1549    let get_str = |field: &str| -> RedDBResult<&str> {
1550        object
1551            .get(field)
1552            .and_then(|value| value.as_str())
1553            .ok_or_else(|| RedDBError::InvalidConfig(format!("CONFIG SecretRef missing {field}")))
1554    };
1555    if get_str("type")? != "secret_ref" {
1556        return Err(RedDBError::InvalidConfig(
1557            "CONFIG value is not a SecretRef".to_string(),
1558        ));
1559    }
1560    if get_str("store")? != "vault" {
1561        return Err(RedDBError::InvalidConfig(
1562            "CONFIG SecretRef store is unsupported".to_string(),
1563        ));
1564    }
1565    Ok(ConfigSecretRef {
1566        collection: get_str("collection")?.to_string(),
1567        key: get_str("key")?.to_string(),
1568    })
1569}
1570
1571fn config_target_resource(collection: &str, key: &str) -> String {
1572    if collection == "red.config" {
1573        format!("red.config/{}", key.to_ascii_lowercase())
1574    } else {
1575        format!("{collection}.{key}")
1576    }
1577}
1578
1579fn config_write_output(
1580    raw_query: &str,
1581    collection: &str,
1582    key: &str,
1583    version: i64,
1584    id: EntityId,
1585    value_type: Option<ConfigValueType>,
1586    schema_version: Option<i64>,
1587    tags: &[String],
1588    statement: &'static str,
1589    affected_rows: u64,
1590) -> RuntimeQueryResult {
1591    let mut result = UnifiedResult::with_columns(vec![
1592        "ok".into(),
1593        "collection".into(),
1594        "key".into(),
1595        "version".into(),
1596        "value_type".into(),
1597        "schema_version".into(),
1598        "tags".into(),
1599        "id".into(),
1600    ]);
1601    let mut record = UnifiedRecord::new();
1602    record.set("ok", Value::Boolean(true));
1603    record.set("collection", Value::text(collection.to_string()));
1604    record.set("key", Value::text(key.to_string()));
1605    record.set("version", Value::Integer(version));
1606    record.set("value_type", config_value_type_value(value_type));
1607    record.set(
1608        "schema_version",
1609        schema_version.map(Value::Integer).unwrap_or(Value::Null),
1610    );
1611    record.set("tags", config_tags_value(tags));
1612    record.set("id", Value::Integer(id.raw() as i64));
1613    result.push(record);
1614    RuntimeQueryResult {
1615        query: raw_query.to_string(),
1616        mode: crate::storage::query::modes::QueryMode::Sql,
1617        statement,
1618        engine: "config",
1619        result,
1620        affected_rows,
1621        statement_type: if statement == "delete" {
1622            "delete"
1623        } else {
1624            "update"
1625        },
1626        bookmark: None,
1627    }
1628}
1629
1630fn invalid_config_volatility(operation: &str) -> RedDBError {
1631    RedDBError::InvalidOperation(format!(
1632        "CONFIG does not support KV-only volatility operation {operation}"
1633    ))
1634}
1635
1636fn resolve_config_schema(
1637    latest: Option<&ConfigVersion>,
1638    requested_type: Option<ConfigValueType>,
1639) -> (Option<ConfigValueType>, Option<i64>) {
1640    let previous_type = latest.and_then(|version| version.value_type);
1641    let previous_schema_version = latest.and_then(|version| version.schema_version);
1642    match requested_type {
1643        Some(value_type) if Some(value_type) != previous_type => (
1644            Some(value_type),
1645            Some(previous_schema_version.unwrap_or(0) + 1),
1646        ),
1647        Some(value_type) => (Some(value_type), previous_schema_version.or(Some(1))),
1648        None => (previous_type, previous_schema_version),
1649    }
1650}
1651
1652fn validate_config_value_type(value: &Value, value_type: ConfigValueType) -> RedDBResult<()> {
1653    let valid = match value_type {
1654        ConfigValueType::Bool => matches!(value, Value::Boolean(_)),
1655        ConfigValueType::Int => matches!(
1656            value,
1657            Value::Integer(_) | Value::UnsignedInteger(_) | Value::BigInt(_)
1658        ),
1659        ConfigValueType::String => matches!(value, Value::Text(_)),
1660        ConfigValueType::Url => validate_config_url(value),
1661        ConfigValueType::Object => validate_config_json_shape(value, true),
1662        ConfigValueType::Array => {
1663            matches!(value, Value::Array(_) | Value::Vector(_))
1664                || validate_config_json_shape(value, false)
1665        }
1666    };
1667    if valid {
1668        Ok(())
1669    } else {
1670        Err(RedDBError::InvalidConfig(format!(
1671            "CONFIG value type mismatch: expected {}, got {}",
1672            value_type.as_str(),
1673            config_actual_value_type(value),
1674        )))
1675    }
1676}
1677
1678/// `true` when `(collection, key)` addresses the policy enforcement
1679/// mode flag (#712 / S5A). The flag lives under `red.config` so the
1680/// rest of the config infrastructure (managed-config registry, audit,
1681/// history) governs it for free.
1682fn is_enforcement_mode_config(collection: &str, key: &str) -> bool {
1683    collection == "red.config" && key == "policy.enforcement_mode"
1684}
1685
1686/// Reject any value the policy evaluator does not understand. Surfaced
1687/// as `InvalidConfig` so the SQL error path mirrors how other config
1688/// validation failures are reported.
1689fn validate_enforcement_mode_value(value: &Value) -> RedDBResult<()> {
1690    let text = match value {
1691        Value::Text(text) => text.as_ref(),
1692        _ => {
1693            return Err(RedDBError::InvalidConfig(format!(
1694                "config key `{}` must be a string ({} or {}); got {}",
1695                crate::auth::enforcement_mode::ENFORCEMENT_MODE_CONFIG_KEY,
1696                crate::auth::enforcement_mode::PolicyEnforcementMode::LegacyRbac.as_str(),
1697                crate::auth::enforcement_mode::PolicyEnforcementMode::PolicyOnly.as_str(),
1698                config_actual_value_type(value),
1699            )));
1700        }
1701    };
1702    if crate::auth::enforcement_mode::PolicyEnforcementMode::parse(text).is_some() {
1703        Ok(())
1704    } else {
1705        Err(RedDBError::InvalidConfig(format!(
1706            "config key `{}` accepts only `{}` or `{}`, got `{}`",
1707            crate::auth::enforcement_mode::ENFORCEMENT_MODE_CONFIG_KEY,
1708            crate::auth::enforcement_mode::PolicyEnforcementMode::LegacyRbac.as_str(),
1709            crate::auth::enforcement_mode::PolicyEnforcementMode::PolicyOnly.as_str(),
1710            text,
1711        )))
1712    }
1713}
1714
1715fn validate_config_url(value: &Value) -> bool {
1716    let url = match value {
1717        Value::Url(value) => value.as_str(),
1718        Value::Text(value) => value.as_ref(),
1719        _ => return false,
1720    };
1721    url.starts_with("http://") || url.starts_with("https://") || url.starts_with("ftp://")
1722}
1723
1724fn validate_config_json_shape(value: &Value, object: bool) -> bool {
1725    let Value::Json(bytes) = value else {
1726        return false;
1727    };
1728    let Ok(json) = crate::json::from_slice::<crate::json::Value>(bytes) else {
1729        return false;
1730    };
1731    matches!(
1732        (object, json),
1733        (true, crate::json::Value::Object(_)) | (false, crate::json::Value::Array(_))
1734    )
1735}
1736
1737fn config_actual_value_type(value: &Value) -> &'static str {
1738    match value {
1739        Value::Null => "null",
1740        Value::Boolean(_) => "bool",
1741        Value::Integer(_) | Value::UnsignedInteger(_) | Value::BigInt(_) => "int",
1742        Value::Text(_) => "string",
1743        Value::Url(_) => "url",
1744        Value::Json(bytes) => match crate::json::from_slice::<crate::json::Value>(bytes) {
1745            Ok(crate::json::Value::Object(_)) => "object",
1746            Ok(crate::json::Value::Array(_)) => "array",
1747            _ => "json",
1748        },
1749        Value::Array(_) | Value::Vector(_) => "array",
1750        _ => "other",
1751    }
1752}
1753
1754fn config_value_type_value(value_type: Option<ConfigValueType>) -> Value {
1755    value_type
1756        .map(|value_type| Value::text(value_type.as_str()))
1757        .unwrap_or(Value::Null)
1758}
1759
1760fn config_value_type_from_value(value: &Value) -> Option<ConfigValueType> {
1761    match value {
1762        Value::Text(value) => ConfigValueType::parse(value.as_ref()),
1763        _ => None,
1764    }
1765}
1766
1767fn config_tags_value(tags: &[String]) -> Value {
1768    if tags.is_empty() {
1769        return Value::Null;
1770    }
1771    Value::Array(tags.iter().map(|tag| Value::text(tag.clone())).collect())
1772}
1773
1774fn config_tags_from_value(value: Option<&Value>) -> Vec<String> {
1775    match value {
1776        Some(Value::Array(values)) => values
1777            .iter()
1778            .filter_map(|value| match value {
1779                Value::Text(tag) => Some(tag.to_string()),
1780                _ => None,
1781            })
1782            .collect(),
1783        Some(Value::Json(bytes)) => crate::json::from_slice::<crate::json::Value>(bytes)
1784            .ok()
1785            .and_then(|value| value.as_array().map(|values| values.to_vec()))
1786            .map(|values| {
1787                values
1788                    .into_iter()
1789                    .filter_map(|value| value.as_str().map(ToOwned::to_owned))
1790                    .collect()
1791            })
1792            .unwrap_or_default(),
1793        _ => Vec::new(),
1794    }
1795}
1796
1797fn config_payload_sensitivity(
1798    resource_type: &str,
1799    field: &str,
1800    value: &Value,
1801) -> crate::runtime::control_events::Sensitivity {
1802    let payload = config_payload_bytes(value);
1803    if config_payload_raw_allowed(resource_type, field) {
1804        crate::runtime::control_events::Sensitivity::raw(
1805            String::from_utf8_lossy(&payload).into_owned(),
1806        )
1807    } else {
1808        crate::runtime::control_events::Sensitivity::hashed(&payload)
1809    }
1810}
1811
1812fn config_payload_bytes(value: &Value) -> Vec<u8> {
1813    let json = crate::presentation::entity_json::storage_value_to_json(value);
1814    crate::serde_json::to_vec(&json).unwrap_or_else(|_| value.to_string().into_bytes())
1815}
1816
1817fn config_payload_raw_allowed(resource_type: &str, field: &str) -> bool {
1818    const RAW_PAYLOAD_FIELDS: &[(&str, &str)] = &[("audit_surface", "payload")];
1819    RAW_PAYLOAD_FIELDS
1820        .iter()
1821        .any(|(allowed_type, allowed_field)| {
1822            *allowed_type == resource_type && *allowed_field == field
1823        })
1824}
1825
1826fn config_mutability_label(mutability: crate::auth::registry::Mutability) -> &'static str {
1827    match mutability {
1828        crate::auth::registry::Mutability::Immutable => "immutable",
1829        crate::auth::registry::Mutability::MutableViaGovernance => "mutable_via_governance",
1830    }
1831}
1832
1833fn current_unix_ms() -> u64 {
1834    std::time::SystemTime::now()
1835        .duration_since(std::time::UNIX_EPOCH)
1836        .unwrap_or_default()
1837        .as_millis() as u64
1838}