Skip to main content

reddb_server/runtime/
impl_config.rs

1//! Stable Config keyed command execution.
2
3use std::sync::Arc;
4
5use crate::catalog::{CollectionModel, SchemaMode};
6use crate::physical::{CollectionContract, ContractOrigin};
7use crate::storage::query::ast::ConfigValueType;
8use crate::storage::{EntityData, EntityId, EntityKind, RowData, UnifiedEntity};
9
10use super::impl_core::{current_auth_identity, current_connection_id, current_tenant};
11use super::*;
12
/// Maximum number of stored version rows kept per config key.
/// `prune_config_history` deletes the surplus rows once a key exceeds this count.
const CONFIG_HISTORY_LIMIT: usize = 16;
14
/// One stored version of a config key, decoded from a collection row.
///
/// The first seven fields are copied from the generic keyed-row decoding in
/// `keyed_spine`; the last three are config-specific columns read from the row.
#[derive(Clone)]
struct ConfigVersion {
    /// Entity id of the row holding this version (used when pruning history).
    id: EntityId,
    /// Config key this version belongs to.
    key: String,
    /// Version number; writers store the previous latest version plus one.
    version: i64,
    /// Stored value (`Value::Null` for rows written by the delete path).
    value: Value,
    /// `true` when this row was written by the delete path.
    tombstone: bool,
    /// Creation timestamp in milliseconds, as recorded when the row was appended.
    created_at_ms: i64,
    /// Operation label recorded with the row (`"put"`, `"rotate"`, `"delete"`).
    op: String,
    /// Declared value type from the `value_type` column, if present and parseable.
    value_type: Option<ConfigValueType>,
    /// Schema version from the `schema_version` column, if present.
    schema_version: Option<i64>,
    /// Tags decoded from the `tags` column.
    tags: Vec<String>,
}
28
29impl super::keyed_spine::KeyedVersion for ConfigVersion {
30    fn key(&self) -> &str {
31        &self.key
32    }
33
34    fn version(&self) -> i64 {
35        self.version
36    }
37}
38
39impl ConfigVersion {
40    fn from_keyed_row(version: super::keyed_spine::KeyedRowVersion, row: &RowData) -> Self {
41        Self {
42            id: version.id,
43            key: version.key,
44            version: version.version,
45            value: version.value,
46            tombstone: version.tombstone,
47            created_at_ms: version.created_at_ms,
48            op: version.op,
49            value_type: row
50                .get_field("value_type")
51                .and_then(config_value_type_from_value),
52            schema_version: super::keyed_spine::value_i64(row.get_field("schema_version")),
53            tags: config_tags_from_value(row.get_field("tags")),
54        }
55    }
56}
57
/// Reference to a vault secret, parsed from a config value by
/// `parse_config_secret_ref` and passed to `resolve_vault_secret_value`.
struct ConfigSecretRef {
    /// Collection that holds the referenced secret.
    collection: String,
    /// Key of the referenced secret within that collection.
    key: String,
}
62
63impl RedDBRuntime {
64    pub fn execute_config_command(
65        &self,
66        raw_query: &str,
67        cmd: &crate::storage::query::ast::ConfigCommand,
68    ) -> RedDBResult<RuntimeQueryResult> {
69        use crate::storage::query::ast::ConfigCommand;
70
71        match cmd {
72            ConfigCommand::Put {
73                collection,
74                key,
75                value,
76                value_type,
77                tags,
78            } => self.config_write_result(
79                raw_query,
80                collection,
81                key,
82                value.clone(),
83                *value_type,
84                tags,
85                "put",
86            ),
87            ConfigCommand::Rotate {
88                collection,
89                key,
90                value,
91                value_type,
92                tags,
93            } => self.config_write_result(
94                raw_query,
95                collection,
96                key,
97                value.clone(),
98                *value_type,
99                tags,
100                "rotate",
101            ),
102            ConfigCommand::Get { collection, key } => {
103                self.config_get_result(raw_query, collection, key)
104            }
105            ConfigCommand::Resolve { collection, key } => {
106                self.config_resolve_result(raw_query, collection, key)
107            }
108            ConfigCommand::Delete { collection, key } => {
109                self.config_delete_result(raw_query, collection, key)
110            }
111            ConfigCommand::History { collection, key } => {
112                self.config_history_result(raw_query, collection, key)
113            }
114            ConfigCommand::List {
115                collection,
116                prefix,
117                limit,
118                offset,
119            } => self.config_list_result(raw_query, collection, prefix.as_deref(), *limit, *offset),
120            ConfigCommand::Watch {
121                collection,
122                key,
123                prefix,
124                from_lsn,
125            } => self.config_watch_result(raw_query, collection, key, *prefix, *from_lsn),
126            ConfigCommand::InvalidVolatileOperation { operation, .. } => {
127                Err(invalid_config_volatility(operation))
128            }
129        }
130    }
131
132    pub(crate) fn validate_config_command_before_auth(
133        &self,
134        cmd: &crate::storage::query::ast::ConfigCommand,
135    ) -> RedDBResult<()> {
136        use crate::storage::query::ast::ConfigCommand;
137        match cmd {
138            ConfigCommand::InvalidVolatileOperation { operation, .. } => {
139                Err(invalid_config_volatility(operation))
140            }
141            ConfigCommand::Put { collection, .. }
142            | ConfigCommand::Get { collection, .. }
143            | ConfigCommand::Resolve { collection, .. }
144            | ConfigCommand::Rotate { collection, .. }
145            | ConfigCommand::Delete { collection, .. }
146            | ConfigCommand::History { collection, .. }
147            | ConfigCommand::List { collection, .. }
148            | ConfigCommand::Watch { collection, .. } => {
149                let snapshot = self.inner.db.catalog_model_snapshot();
150                let Some(actual_model) = snapshot
151                    .collections
152                    .iter()
153                    .find(|c| c.name == *collection)
154                    .map(|c| c.declared_model.unwrap_or(c.model))
155                else {
156                    return Ok(());
157                };
158                crate::runtime::ddl::polymorphic_resolver::ensure_model_match(
159                    CollectionModel::Config,
160                    actual_model,
161                )
162            }
163        }
164    }
165
166    fn config_resolve_result(
167        &self,
168        raw_query: &str,
169        collection: &str,
170        key: &str,
171    ) -> RedDBResult<RuntimeQueryResult> {
172        let latest = self.latest_config_version(collection, key)?;
173        if let Err(reason) = self.check_config_capability("config:read", collection, key) {
174            self.audit_config_resolve(
175                collection,
176                key,
177                None,
178                crate::runtime::audit_log::Outcome::Denied,
179                &reason,
180            );
181            return Err(RedDBError::Query(reason));
182        }
183
184        let Some(version) = latest else {
185            let reason = "not_found";
186            self.audit_config_resolve(
187                collection,
188                key,
189                None,
190                crate::runtime::audit_log::Outcome::Denied,
191                reason,
192            );
193            return Err(RedDBError::NotFound(format!(
194                "config '{}.{}' not found",
195                collection, key
196            )));
197        };
198        if version.tombstone {
199            let reason = "deleted";
200            self.audit_config_resolve(
201                collection,
202                key,
203                None,
204                crate::runtime::audit_log::Outcome::Denied,
205                reason,
206            );
207            return Err(RedDBError::NotFound(format!(
208                "config '{}.{}' is deleted",
209                collection, key
210            )));
211        }
212
213        let secret_ref = parse_config_secret_ref(&version.value).inspect_err(|err| {
214            self.audit_config_resolve(
215                collection,
216                key,
217                None,
218                crate::runtime::audit_log::Outcome::Error,
219                &err.to_string(),
220            );
221        })?;
222
223        match self.resolve_vault_secret_value(&secret_ref.collection, &secret_ref.key) {
224            Ok(value) => {
225                self.audit_config_resolve(
226                    collection,
227                    key,
228                    Some(&secret_ref),
229                    crate::runtime::audit_log::Outcome::Success,
230                    "ok",
231                );
232                let mut result = UnifiedResult::with_columns(vec![
233                    "collection".into(),
234                    "key".into(),
235                    "value".into(),
236                    "resolved_store".into(),
237                    "resolved_collection".into(),
238                    "resolved_key".into(),
239                ]);
240                let mut record = UnifiedRecord::new();
241                record.set("collection", Value::text(collection.to_string()));
242                record.set("key", Value::text(key.to_string()));
243                record.set("value", value);
244                record.set("resolved_store", Value::text("vault"));
245                record.set("resolved_collection", Value::text(secret_ref.collection));
246                record.set("resolved_key", Value::text(secret_ref.key));
247                result.push(record);
248                Ok(RuntimeQueryResult {
249                    query: raw_query.to_string(),
250                    mode: crate::storage::query::modes::QueryMode::Sql,
251                    statement: "config_resolve",
252                    engine: "config",
253                    result,
254                    affected_rows: 0,
255                    statement_type: "select",
256                })
257            }
258            Err(err) => {
259                let reason = err.to_string();
260                let outcome = if reason.contains("denied") {
261                    crate::runtime::audit_log::Outcome::Denied
262                } else {
263                    crate::runtime::audit_log::Outcome::Error
264                };
265                self.audit_config_resolve(collection, key, Some(&secret_ref), outcome, &reason);
266                Err(err)
267            }
268        }
269    }
270
271    fn config_write_result(
272        &self,
273        raw_query: &str,
274        collection: &str,
275        key: &str,
276        value: Value,
277        requested_type: Option<ConfigValueType>,
278        tags: &[String],
279        op: &str,
280    ) -> RedDBResult<RuntimeQueryResult> {
281        self.check_system_config_capability("config:write", collection, key)
282            .map_err(RedDBError::Query)?;
283        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
284        self.ensure_config_collection(collection)?;
285        let latest = self.latest_config_version(collection, key)?;
286        let version = latest.as_ref().map(|version| version.version).unwrap_or(0) + 1;
287        let (value_type, schema_version) = resolve_config_schema(latest.as_ref(), requested_type);
288        if let Some(value_type) = value_type {
289            validate_config_value_type(&value, value_type)?;
290        }
291        let before = latest.as_ref().and_then(|version| {
292            if version.tombstone {
293                None
294            } else {
295                Some(crate::presentation::entity_json::storage_value_to_json(
296                    &version.value,
297                ))
298            }
299        });
300        let after = Some(crate::presentation::entity_json::storage_value_to_json(
301            &value,
302        ));
303        let change_op = if latest.is_some() {
304            crate::replication::cdc::ChangeOperation::Update
305        } else {
306            crate::replication::cdc::ChangeOperation::Insert
307        };
308        let id = self.append_config_version(
309            collection,
310            key,
311            value,
312            version,
313            false,
314            op,
315            value_type,
316            schema_version,
317            tags,
318        )?;
319        self.record_kv_watch_event(change_op, collection, key, id.raw(), before, after);
320        self.prune_config_history(collection, key)?;
321        self.invalidate_result_cache();
322        Ok(config_write_output(
323            raw_query,
324            collection,
325            key,
326            version,
327            id,
328            value_type,
329            schema_version,
330            tags,
331            match op {
332                "rotate" => "config_rotate",
333                _ => "config_put",
334            },
335            1,
336        ))
337    }
338
339    fn config_delete_result(
340        &self,
341        raw_query: &str,
342        collection: &str,
343        key: &str,
344    ) -> RedDBResult<RuntimeQueryResult> {
345        self.check_system_config_capability("config:write", collection, key)
346            .map_err(RedDBError::Query)?;
347        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
348        self.ensure_config_collection(collection)?;
349        let latest = self.latest_config_version(collection, key)?;
350        let version = latest.as_ref().map(|version| version.version).unwrap_or(0) + 1;
351        let value_type = latest.as_ref().and_then(|version| version.value_type);
352        let schema_version = latest.as_ref().and_then(|version| version.schema_version);
353        let id = self.append_config_version(
354            collection,
355            key,
356            Value::Null,
357            version,
358            true,
359            "delete",
360            value_type,
361            schema_version,
362            &[],
363        )?;
364        if let Some(before) = latest.as_ref().and_then(|version| {
365            if version.tombstone {
366                None
367            } else {
368                Some(crate::presentation::entity_json::storage_value_to_json(
369                    &version.value,
370                ))
371            }
372        }) {
373            self.record_kv_watch_event(
374                crate::replication::cdc::ChangeOperation::Delete,
375                collection,
376                key,
377                id.raw(),
378                Some(before),
379                None,
380            );
381        }
382        self.prune_config_history(collection, key)?;
383        self.invalidate_result_cache();
384        Ok(config_write_output(
385            raw_query,
386            collection,
387            key,
388            version,
389            id,
390            value_type,
391            schema_version,
392            &[],
393            "delete",
394            1,
395        ))
396    }
397
398    fn config_get_result(
399        &self,
400        raw_query: &str,
401        collection: &str,
402        key: &str,
403    ) -> RedDBResult<RuntimeQueryResult> {
404        self.check_system_config_capability("config:read", collection, key)
405            .map_err(RedDBError::Query)?;
406        let latest = self.latest_config_version(collection, key)?;
407        let mut result = UnifiedResult::with_columns(vec![
408            "collection".into(),
409            "key".into(),
410            "value".into(),
411            "version".into(),
412            "value_type".into(),
413            "schema_version".into(),
414            "tags".into(),
415            "tombstone".into(),
416        ]);
417        let mut record = UnifiedRecord::new();
418        record.set("collection", Value::text(collection.to_string()));
419        record.set("key", Value::text(key.to_string()));
420        if let Some(version) = latest {
421            record.set("value", version.value);
422            record.set("version", Value::Integer(version.version));
423            record.set("value_type", config_value_type_value(version.value_type));
424            record.set(
425                "schema_version",
426                version
427                    .schema_version
428                    .map(Value::Integer)
429                    .unwrap_or(Value::Null),
430            );
431            record.set("tags", config_tags_value(&version.tags));
432            record.set("tombstone", Value::Boolean(version.tombstone));
433        } else {
434            record.set("value", Value::Null);
435            record.set("version", Value::Null);
436            record.set("value_type", Value::Null);
437            record.set("schema_version", Value::Null);
438            record.set("tags", Value::Null);
439            record.set("tombstone", Value::Boolean(false));
440        }
441        result.push(record);
442        Ok(RuntimeQueryResult {
443            query: raw_query.to_string(),
444            mode: crate::storage::query::modes::QueryMode::Sql,
445            statement: "config_get",
446            engine: "config",
447            result,
448            affected_rows: 0,
449            statement_type: "select",
450        })
451    }
452
453    fn config_history_result(
454        &self,
455        raw_query: &str,
456        collection: &str,
457        key: &str,
458    ) -> RedDBResult<RuntimeQueryResult> {
459        self.check_system_config_capability("config:read", collection, key)
460            .map_err(RedDBError::Query)?;
461        let versions = super::keyed_spine::history_versions(self.config_versions(collection, key)?);
462        let mut result = UnifiedResult::with_columns(vec![
463            "collection".into(),
464            "key".into(),
465            "version".into(),
466            "value".into(),
467            "value_type".into(),
468            "schema_version".into(),
469            "tags".into(),
470            "tombstone".into(),
471            "op".into(),
472            "created_at_ms".into(),
473        ]);
474        for version in versions {
475            let mut record = UnifiedRecord::new();
476            record.set("collection", Value::text(collection.to_string()));
477            record.set("key", Value::text(key.to_string()));
478            record.set("version", Value::Integer(version.version));
479            record.set("value", version.value);
480            record.set("value_type", config_value_type_value(version.value_type));
481            record.set(
482                "schema_version",
483                version
484                    .schema_version
485                    .map(Value::Integer)
486                    .unwrap_or(Value::Null),
487            );
488            record.set("tags", Value::Null);
489            record.set("tombstone", Value::Boolean(version.tombstone));
490            record.set("op", Value::text(version.op));
491            record.set("created_at_ms", Value::Integer(version.created_at_ms));
492            result.push(record);
493        }
494        Ok(RuntimeQueryResult {
495            query: raw_query.to_string(),
496            mode: crate::storage::query::modes::QueryMode::Sql,
497            statement: "config_history",
498            engine: "config",
499            result,
500            affected_rows: 0,
501            statement_type: "select",
502        })
503    }
504
505    fn config_list_result(
506        &self,
507        raw_query: &str,
508        collection: &str,
509        prefix: Option<&str>,
510        limit: Option<usize>,
511        offset: usize,
512    ) -> RedDBResult<RuntimeQueryResult> {
513        let mut versions = self.latest_config_versions(collection, prefix)?;
514        versions.sort_by(|left, right| left.key.cmp(&right.key));
515        let mut result = UnifiedResult::with_columns(vec![
516            "collection".into(),
517            "key".into(),
518            "value".into(),
519            "version".into(),
520            "value_type".into(),
521            "schema_version".into(),
522            "tags".into(),
523            "tombstone".into(),
524            "op".into(),
525            "created_at_ms".into(),
526        ]);
527        for version in versions
528            .into_iter()
529            .filter(|version| {
530                self.check_config_capability("config:read", collection, &version.key)
531                    .is_ok()
532            })
533            .skip(offset)
534            .take(limit.unwrap_or(usize::MAX))
535        {
536            let mut record = UnifiedRecord::new();
537            record.set("collection", Value::text(collection.to_string()));
538            record.set("key", Value::text(version.key));
539            record.set("value", version.value);
540            record.set("version", Value::Integer(version.version));
541            record.set("value_type", config_value_type_value(version.value_type));
542            record.set(
543                "schema_version",
544                version
545                    .schema_version
546                    .map(Value::Integer)
547                    .unwrap_or(Value::Null),
548            );
549            record.set("tags", config_tags_value(&version.tags));
550            record.set("tombstone", Value::Boolean(version.tombstone));
551            record.set("op", Value::text(version.op));
552            record.set("created_at_ms", Value::Integer(version.created_at_ms));
553            result.push(record);
554        }
555        Ok(RuntimeQueryResult {
556            query: raw_query.to_string(),
557            mode: crate::storage::query::modes::QueryMode::Sql,
558            statement: "config_list",
559            engine: "config",
560            result,
561            affected_rows: 0,
562            statement_type: "select",
563        })
564    }
565
566    fn config_watch_result(
567        &self,
568        raw_query: &str,
569        collection: &str,
570        key: &str,
571        prefix: bool,
572        from_lsn: Option<u64>,
573    ) -> RedDBResult<RuntimeQueryResult> {
574        let watch_key = if prefix {
575            format!("{key}.*")
576        } else {
577            key.to_string()
578        };
579        let endpoint = match from_lsn {
580            Some(lsn) => {
581                format!("/collections/{collection}/config/{watch_key}/watch?since_lsn={lsn}")
582            }
583            None => format!("/collections/{collection}/config/{watch_key}/watch"),
584        };
585        let mut result = UnifiedResult::with_columns(vec![
586            "collection".into(),
587            "key".into(),
588            "prefix".into(),
589            "from_lsn".into(),
590            "watch_url".into(),
591            "streaming".into(),
592        ]);
593        let mut record = UnifiedRecord::new();
594        record.set("collection", Value::text(collection.to_string()));
595        record.set("key", Value::text(watch_key));
596        record.set("prefix", Value::Boolean(prefix));
597        record.set(
598            "from_lsn",
599            from_lsn
600                .map(Value::UnsignedInteger)
601                .unwrap_or(crate::storage::schema::Value::Null),
602        );
603        record.set("watch_url", Value::text(endpoint));
604        record.set("streaming", Value::Boolean(true));
605        result.push(record);
606        Ok(RuntimeQueryResult {
607            query: raw_query.to_string(),
608            mode: crate::storage::query::modes::QueryMode::Sql,
609            statement: "config_watch",
610            engine: "config",
611            result,
612            affected_rows: 0,
613            statement_type: "stream",
614        })
615    }
616
617    fn ensure_config_collection(&self, collection: &str) -> RedDBResult<()> {
618        let store = self.inner.db.store();
619        if store.get_collection(collection).is_none() {
620            store
621                .create_collection(collection)
622                .map_err(|err| RedDBError::Internal(err.to_string()))?;
623        }
624        if let Some(contract) = self.inner.db.collection_contract(collection) {
625            crate::runtime::ddl::polymorphic_resolver::ensure_model_match(
626                CollectionModel::Config,
627                contract.declared_model,
628            )?;
629            return Ok(());
630        }
631        let now = current_unix_ms();
632        self.inner
633            .db
634            .save_collection_contract(CollectionContract {
635                name: collection.to_string(),
636                declared_model: CollectionModel::Config,
637                schema_mode: SchemaMode::Dynamic,
638                origin: ContractOrigin::Explicit,
639                version: 1,
640                created_at_unix_ms: now as u128,
641                updated_at_unix_ms: now as u128,
642                default_ttl_ms: None,
643                vector_dimension: None,
644                vector_metric: None,
645                context_index_fields: Vec::new(),
646                declared_columns: Vec::new(),
647                table_def: None,
648                timestamps_enabled: false,
649                context_index_enabled: false,
650                metrics_raw_retention_ms: None,
651                metrics_rollup_policies: Vec::new(),
652                metrics_tenant_identity: None,
653                metrics_namespace: None,
654                append_only: false,
655                subscriptions: Vec::new(),
656                session_key: None,
657                session_gap_ms: None,
658                retention_duration_ms: None,
659            })
660            .map(|_| ())
661            .map_err(|err| RedDBError::Internal(err.to_string()))
662    }
663
664    fn append_config_version(
665        &self,
666        collection: &str,
667        key: &str,
668        value: Value,
669        version: i64,
670        tombstone: bool,
671        op: &str,
672        value_type: Option<ConfigValueType>,
673        schema_version: Option<i64>,
674        tags: &[String],
675    ) -> RedDBResult<EntityId> {
676        let now = current_unix_ms() as i64;
677        let fields = vec![
678            ("key".to_string(), Value::text(key.to_string())),
679            ("value".to_string(), value),
680            ("version".to_string(), Value::Integer(version)),
681            (
682                "value_type".to_string(),
683                config_value_type_value(value_type),
684            ),
685            (
686                "schema_version".to_string(),
687                schema_version.map(Value::Integer).unwrap_or(Value::Null),
688            ),
689            ("tombstone".to_string(), Value::Boolean(tombstone)),
690            ("op".to_string(), Value::text(op.to_string())),
691            ("created_at_ms".to_string(), Value::Integer(now)),
692            ("tags".to_string(), config_tags_value(tags)),
693        ];
694        let mut row = RowData::new(Vec::new());
695        row.named = Some(fields.into_iter().collect());
696        let entity = UnifiedEntity::new(
697            EntityId::new(0),
698            EntityKind::TableRow {
699                table: Arc::from(collection),
700                row_id: 0,
701            },
702            EntityData::Row(row),
703        );
704        self.inner
705            .db
706            .store()
707            .insert(collection, entity)
708            .map_err(|err| RedDBError::Internal(err.to_string()))
709    }
710
711    fn latest_config_version(
712        &self,
713        collection: &str,
714        key: &str,
715    ) -> RedDBResult<Option<ConfigVersion>> {
716        Ok(super::keyed_spine::latest_version(
717            self.config_versions(collection, key)?,
718        ))
719    }
720
721    fn config_versions(&self, collection: &str, key: &str) -> RedDBResult<Vec<ConfigVersion>> {
722        let store = self.inner.db.store();
723        let Some(manager) = store.get_collection(collection) else {
724            return Ok(Vec::new());
725        };
726        let mut versions = Vec::new();
727        for entity in manager.query_all(|_| true) {
728            let EntityData::Row(row) = &entity.data else {
729                continue;
730            };
731            let Some(version) = super::keyed_spine::row_version(entity.id, row, 0) else {
732                continue;
733            };
734            if version.key != key {
735                continue;
736            }
737            versions.push(ConfigVersion::from_keyed_row(version, row));
738        }
739        Ok(versions)
740    }
741
742    fn latest_config_versions(
743        &self,
744        collection: &str,
745        prefix: Option<&str>,
746    ) -> RedDBResult<Vec<ConfigVersion>> {
747        let store = self.inner.db.store();
748        let Some(manager) = store.get_collection(collection) else {
749            return Ok(Vec::new());
750        };
751        let mut versions = Vec::new();
752        for entity in manager.query_all(|_| true) {
753            let EntityData::Row(row) = &entity.data else {
754                continue;
755            };
756            let Some(version) = super::keyed_spine::row_version(entity.id, row, 0) else {
757                continue;
758            };
759            versions.push(ConfigVersion::from_keyed_row(version, row));
760        }
761        Ok(super::keyed_spine::latest_versions(versions, prefix))
762    }
763
764    fn prune_config_history(&self, collection: &str, key: &str) -> RedDBResult<()> {
765        let mut versions = self.config_versions(collection, key)?;
766        if versions.len() <= CONFIG_HISTORY_LIMIT {
767            return Ok(());
768        }
769        versions = super::keyed_spine::history_versions(versions);
770        let drop_count = versions.len() - CONFIG_HISTORY_LIMIT;
771        let store = self.inner.db.store();
772        for version in versions.into_iter().take(drop_count) {
773            store
774                .delete(collection, version.id)
775                .map_err(|err| RedDBError::Internal(err.to_string()))?;
776        }
777        Ok(())
778    }
779
780    fn check_config_capability(
781        &self,
782        action: &str,
783        collection: &str,
784        key: &str,
785    ) -> Result<(), String> {
786        let Some(auth_store) = self.inner.auth_store.read().clone() else {
787            return Ok(());
788        };
789        if !auth_store.iam_authorization_enabled() {
790            return Ok(());
791        }
792        let Some((principal, role)) = current_auth_identity() else {
793            return Err(
794                "IAM authorization is enabled; config capability check requires an authenticated principal"
795                    .to_string(),
796            );
797        };
798        let tenant = current_tenant();
799        let principal_id = crate::auth::UserId::from_parts(tenant.as_deref(), &principal);
800        let mut resource = crate::auth::policies::ResourceRef::new(
801            "config",
802            config_target_resource(collection, key),
803        );
804        if let Some(ref tenant) = tenant {
805            resource = resource.with_tenant(tenant.clone());
806        }
807        let ctx = crate::auth::policies::EvalContext {
808            principal_tenant: tenant.clone(),
809            current_tenant: tenant,
810            peer_ip: None,
811            mfa_present: false,
812            now_ms: crate::utils::now_unix_millis() as u128,
813            principal_is_admin_role: role == crate::auth::Role::Admin,
814            principal_is_system_owned: auth_store.principal_is_system_owned(&principal_id),
815            principal_is_platform_scoped: principal_id.tenant.is_none(),
816        };
817        if auth_store.check_policy_authz(&principal_id, action, &resource, &ctx) {
818            Ok(())
819        } else {
820            Err(format!(
821                "principal=`{}` action=`{}` resource=`config:{}` denied by IAM policy",
822                principal,
823                action,
824                config_target_resource(collection, key)
825            ))
826        }
827    }
828
829    fn check_system_config_capability(
830        &self,
831        action: &str,
832        collection: &str,
833        key: &str,
834    ) -> Result<(), String> {
835        if collection != "red.config" {
836            return Ok(());
837        }
838        self.check_config_capability(action, collection, key)
839    }
840
841    pub fn config_watch_events_since(
842        &self,
843        collection: &str,
844        key: &str,
845        since_lsn: u64,
846        max_count: usize,
847    ) -> Vec<crate::replication::cdc::KvWatchEvent> {
848        self.kv_watch_events_since(collection, key, since_lsn, max_count)
849            .into_iter()
850            .map(|event| self.policy_filter_config_watch_event(event))
851            .collect()
852    }
853
854    pub fn config_watch_events_since_prefix(
855        &self,
856        collection: &str,
857        prefix: &str,
858        since_lsn: u64,
859        max_count: usize,
860    ) -> Vec<crate::replication::cdc::KvWatchEvent> {
861        self.kv_watch_events_since_prefix(collection, prefix, since_lsn, max_count)
862            .into_iter()
863            .map(|event| self.policy_filter_config_watch_event(event))
864            .collect()
865    }
866
867    fn policy_filter_config_watch_event(
868        &self,
869        mut event: crate::replication::cdc::KvWatchEvent,
870    ) -> crate::replication::cdc::KvWatchEvent {
871        if self
872            .check_config_capability("config:read", &event.collection, &event.key)
873            .is_err()
874        {
875            event.before = None;
876            event.after = None;
877        }
878        event
879    }
880
881    fn audit_config_resolve(
882        &self,
883        collection: &str,
884        key: &str,
885        secret_ref: Option<&ConfigSecretRef>,
886        outcome: crate::runtime::audit_log::Outcome,
887        reason: &str,
888    ) {
889        let actor = current_auth_identity()
890            .map(|(principal, _)| principal)
891            .unwrap_or_else(|| "anonymous".to_string());
892        let request_id = match current_connection_id() {
893            0 => "embedded".to_string(),
894            id => format!("conn-{id}"),
895        };
896        let mut builder = crate::runtime::audit_log::AuditEvent::builder("config/resolve")
897            .principal(actor.clone())
898            .source(crate::runtime::audit_log::AuditAuthSource::Password)
899            .resource(format!(
900                "config:{}",
901                config_target_resource(collection, key)
902            ))
903            .outcome(outcome)
904            .correlation_id(request_id.clone())
905            .fields([
906                crate::runtime::audit_log::AuditFieldEscaper::field("actor", actor),
907                crate::runtime::audit_log::AuditFieldEscaper::field("collection", collection),
908                crate::runtime::audit_log::AuditFieldEscaper::field("key", key),
909                crate::runtime::audit_log::AuditFieldEscaper::field(
910                    "target",
911                    config_target_resource(collection, key),
912                ),
913                crate::runtime::audit_log::AuditFieldEscaper::field("reason", reason),
914                crate::runtime::audit_log::AuditFieldEscaper::field("request_id", request_id),
915                crate::runtime::audit_log::AuditFieldEscaper::field(
916                    "connection_id",
917                    current_connection_id(),
918                ),
919            ]);
920        if let Some(tenant) = current_tenant() {
921            builder = builder.tenant(tenant);
922        }
923        if let Some(secret_ref) = secret_ref {
924            builder = builder.fields([
925                crate::runtime::audit_log::AuditFieldEscaper::field("resolved_store", "vault"),
926                crate::runtime::audit_log::AuditFieldEscaper::field(
927                    "resolved_collection",
928                    secret_ref.collection.as_str(),
929                ),
930                crate::runtime::audit_log::AuditFieldEscaper::field(
931                    "resolved_key",
932                    secret_ref.key.as_str(),
933                ),
934                crate::runtime::audit_log::AuditFieldEscaper::field(
935                    "resolved_target",
936                    format!("{}.{}", secret_ref.collection, secret_ref.key),
937                ),
938            ]);
939        }
940        self.audit_log().record_event(builder.build());
941    }
942}
943
944fn parse_config_secret_ref(value: &Value) -> RedDBResult<ConfigSecretRef> {
945    let Value::Json(bytes) = value else {
946        return Err(RedDBError::InvalidConfig(
947            "CONFIG value is not a SecretRef".to_string(),
948        ));
949    };
950    let json = crate::json::from_slice::<crate::json::Value>(bytes).map_err(|err| {
951        RedDBError::InvalidConfig(format!("CONFIG SecretRef is malformed: {err}"))
952    })?;
953    let Some(object) = json.as_object() else {
954        return Err(RedDBError::InvalidConfig(
955            "CONFIG SecretRef must be an object".to_string(),
956        ));
957    };
958    let get_str = |field: &str| -> RedDBResult<&str> {
959        object
960            .get(field)
961            .and_then(|value| value.as_str())
962            .ok_or_else(|| RedDBError::InvalidConfig(format!("CONFIG SecretRef missing {field}")))
963    };
964    if get_str("type")? != "secret_ref" {
965        return Err(RedDBError::InvalidConfig(
966            "CONFIG value is not a SecretRef".to_string(),
967        ));
968    }
969    if get_str("store")? != "vault" {
970        return Err(RedDBError::InvalidConfig(
971            "CONFIG SecretRef store is unsupported".to_string(),
972        ));
973    }
974    Ok(ConfigSecretRef {
975        collection: get_str("collection")?.to_string(),
976        key: get_str("key")?.to_string(),
977    })
978}
979
/// Formats the resource name used for a config entry.
///
/// Entries in the `red.config` collection become `red.config/<key>` with the
/// key ASCII-lowercased; entries in any other collection become
/// `<collection>.<key>` with the key left untouched.
fn config_target_resource(collection: &str, key: &str) -> String {
    match collection {
        "red.config" => {
            let normalized_key = key.to_ascii_lowercase();
            format!("red.config/{}", normalized_key)
        }
        other => format!("{other}.{key}"),
    }
}
987
988fn config_write_output(
989    raw_query: &str,
990    collection: &str,
991    key: &str,
992    version: i64,
993    id: EntityId,
994    value_type: Option<ConfigValueType>,
995    schema_version: Option<i64>,
996    tags: &[String],
997    statement: &'static str,
998    affected_rows: u64,
999) -> RuntimeQueryResult {
1000    let mut result = UnifiedResult::with_columns(vec![
1001        "ok".into(),
1002        "collection".into(),
1003        "key".into(),
1004        "version".into(),
1005        "value_type".into(),
1006        "schema_version".into(),
1007        "tags".into(),
1008        "id".into(),
1009    ]);
1010    let mut record = UnifiedRecord::new();
1011    record.set("ok", Value::Boolean(true));
1012    record.set("collection", Value::text(collection.to_string()));
1013    record.set("key", Value::text(key.to_string()));
1014    record.set("version", Value::Integer(version));
1015    record.set("value_type", config_value_type_value(value_type));
1016    record.set(
1017        "schema_version",
1018        schema_version.map(Value::Integer).unwrap_or(Value::Null),
1019    );
1020    record.set("tags", config_tags_value(tags));
1021    record.set("id", Value::Integer(id.raw() as i64));
1022    result.push(record);
1023    RuntimeQueryResult {
1024        query: raw_query.to_string(),
1025        mode: crate::storage::query::modes::QueryMode::Sql,
1026        statement,
1027        engine: "config",
1028        result,
1029        affected_rows,
1030        statement_type: if statement == "delete" {
1031            "delete"
1032        } else {
1033            "update"
1034        },
1035    }
1036}
1037
1038fn invalid_config_volatility(operation: &str) -> RedDBError {
1039    RedDBError::InvalidOperation(format!(
1040        "CONFIG does not support KV-only volatility operation {operation}"
1041    ))
1042}
1043
1044fn resolve_config_schema(
1045    latest: Option<&ConfigVersion>,
1046    requested_type: Option<ConfigValueType>,
1047) -> (Option<ConfigValueType>, Option<i64>) {
1048    let previous_type = latest.and_then(|version| version.value_type);
1049    let previous_schema_version = latest.and_then(|version| version.schema_version);
1050    match requested_type {
1051        Some(value_type) if Some(value_type) != previous_type => (
1052            Some(value_type),
1053            Some(previous_schema_version.unwrap_or(0) + 1),
1054        ),
1055        Some(value_type) => (Some(value_type), previous_schema_version.or(Some(1))),
1056        None => (previous_type, previous_schema_version),
1057    }
1058}
1059
1060fn validate_config_value_type(value: &Value, value_type: ConfigValueType) -> RedDBResult<()> {
1061    let valid = match value_type {
1062        ConfigValueType::Bool => matches!(value, Value::Boolean(_)),
1063        ConfigValueType::Int => matches!(
1064            value,
1065            Value::Integer(_) | Value::UnsignedInteger(_) | Value::BigInt(_)
1066        ),
1067        ConfigValueType::String => matches!(value, Value::Text(_)),
1068        ConfigValueType::Url => validate_config_url(value),
1069        ConfigValueType::Object => validate_config_json_shape(value, true),
1070        ConfigValueType::Array => {
1071            matches!(value, Value::Array(_) | Value::Vector(_))
1072                || validate_config_json_shape(value, false)
1073        }
1074    };
1075    if valid {
1076        Ok(())
1077    } else {
1078        Err(RedDBError::InvalidConfig(format!(
1079            "CONFIG value type mismatch: expected {}, got {}",
1080            value_type.as_str(),
1081            config_actual_value_type(value),
1082        )))
1083    }
1084}
1085
1086fn validate_config_url(value: &Value) -> bool {
1087    let url = match value {
1088        Value::Url(value) => value.as_str(),
1089        Value::Text(value) => value.as_ref(),
1090        _ => return false,
1091    };
1092    url.starts_with("http://") || url.starts_with("https://") || url.starts_with("ftp://")
1093}
1094
1095fn validate_config_json_shape(value: &Value, object: bool) -> bool {
1096    let Value::Json(bytes) = value else {
1097        return false;
1098    };
1099    let Ok(json) = crate::json::from_slice::<crate::json::Value>(bytes) else {
1100        return false;
1101    };
1102    matches!(
1103        (object, json),
1104        (true, crate::json::Value::Object(_)) | (false, crate::json::Value::Array(_))
1105    )
1106}
1107
1108fn config_actual_value_type(value: &Value) -> &'static str {
1109    match value {
1110        Value::Null => "null",
1111        Value::Boolean(_) => "bool",
1112        Value::Integer(_) | Value::UnsignedInteger(_) | Value::BigInt(_) => "int",
1113        Value::Text(_) => "string",
1114        Value::Url(_) => "url",
1115        Value::Json(bytes) => match crate::json::from_slice::<crate::json::Value>(bytes) {
1116            Ok(crate::json::Value::Object(_)) => "object",
1117            Ok(crate::json::Value::Array(_)) => "array",
1118            _ => "json",
1119        },
1120        Value::Array(_) | Value::Vector(_) => "array",
1121        _ => "other",
1122    }
1123}
1124
1125fn config_value_type_value(value_type: Option<ConfigValueType>) -> Value {
1126    value_type
1127        .map(|value_type| Value::text(value_type.as_str()))
1128        .unwrap_or(Value::Null)
1129}
1130
1131fn config_value_type_from_value(value: &Value) -> Option<ConfigValueType> {
1132    match value {
1133        Value::Text(value) => ConfigValueType::parse(value.as_ref()),
1134        _ => None,
1135    }
1136}
1137
1138fn config_tags_value(tags: &[String]) -> Value {
1139    if tags.is_empty() {
1140        return Value::Null;
1141    }
1142    Value::Array(tags.iter().map(|tag| Value::text(tag.clone())).collect())
1143}
1144
1145fn config_tags_from_value(value: Option<&Value>) -> Vec<String> {
1146    match value {
1147        Some(Value::Array(values)) => values
1148            .iter()
1149            .filter_map(|value| match value {
1150                Value::Text(tag) => Some(tag.to_string()),
1151                _ => None,
1152            })
1153            .collect(),
1154        Some(Value::Json(bytes)) => crate::json::from_slice::<crate::json::Value>(bytes)
1155            .ok()
1156            .and_then(|value| value.as_array().map(|values| values.to_vec()))
1157            .map(|values| {
1158                values
1159                    .into_iter()
1160                    .filter_map(|value| value.as_str().map(ToOwned::to_owned))
1161                    .collect()
1162            })
1163            .unwrap_or_default(),
1164        _ => Vec::new(),
1165    }
1166}
1167
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// A system clock set before the epoch is reported as 0.
fn current_unix_ms() -> u64 {
    let since_epoch = match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        Err(_) => std::time::Duration::default(),
    };
    since_epoch.as_millis() as u64
}