Skip to main content

reddb_server/runtime/
impl_config.rs

1//! Stable Config keyed command execution.
2
3use std::sync::Arc;
4
5use crate::catalog::{CollectionModel, SchemaMode};
6use crate::physical::{CollectionContract, ContractOrigin};
7use crate::storage::query::ast::ConfigValueType;
8use crate::storage::{EntityData, EntityId, EntityKind, RowData, UnifiedEntity};
9
10use super::impl_core::{current_auth_identity, current_connection_id, current_tenant};
11use super::*;
12
13const CONFIG_HISTORY_LIMIT: usize = 16;
14
15#[derive(Clone)]
16struct ConfigVersion {
17    id: EntityId,
18    key: String,
19    version: i64,
20    value: Value,
21    tombstone: bool,
22    created_at_ms: i64,
23    op: String,
24    value_type: Option<ConfigValueType>,
25    schema_version: Option<i64>,
26    tags: Vec<String>,
27}
28
29impl super::keyed_spine::KeyedVersion for ConfigVersion {
30    fn key(&self) -> &str {
31        &self.key
32    }
33
34    fn version(&self) -> i64 {
35        self.version
36    }
37}
38
39impl ConfigVersion {
40    fn from_keyed_row(version: super::keyed_spine::KeyedRowVersion, row: &RowData) -> Self {
41        Self {
42            id: version.id,
43            key: version.key,
44            version: version.version,
45            value: version.value,
46            tombstone: version.tombstone,
47            created_at_ms: version.created_at_ms,
48            op: version.op,
49            value_type: row
50                .get_field("value_type")
51                .and_then(config_value_type_from_value),
52            schema_version: super::keyed_spine::value_i64(row.get_field("schema_version")),
53            tags: config_tags_from_value(row.get_field("tags")),
54        }
55    }
56}
57
58struct ConfigSecretRef {
59    collection: String,
60    key: String,
61}
62
63impl RedDBRuntime {
64    pub fn execute_config_command(
65        &self,
66        raw_query: &str,
67        cmd: &crate::storage::query::ast::ConfigCommand,
68    ) -> RedDBResult<RuntimeQueryResult> {
69        use crate::storage::query::ast::ConfigCommand;
70
71        match cmd {
72            ConfigCommand::Put {
73                collection,
74                key,
75                value,
76                value_type,
77                tags,
78            } => self.config_write_result(
79                raw_query,
80                collection,
81                key,
82                value.clone(),
83                *value_type,
84                tags,
85                "put",
86            ),
87            ConfigCommand::Rotate {
88                collection,
89                key,
90                value,
91                value_type,
92                tags,
93            } => self.config_write_result(
94                raw_query,
95                collection,
96                key,
97                value.clone(),
98                *value_type,
99                tags,
100                "rotate",
101            ),
102            ConfigCommand::Get { collection, key } => {
103                self.config_get_result(raw_query, collection, key)
104            }
105            ConfigCommand::Resolve { collection, key } => {
106                self.config_resolve_result(raw_query, collection, key)
107            }
108            ConfigCommand::Delete { collection, key } => {
109                self.config_delete_result(raw_query, collection, key)
110            }
111            ConfigCommand::History { collection, key } => {
112                self.config_history_result(raw_query, collection, key)
113            }
114            ConfigCommand::List {
115                collection,
116                prefix,
117                limit,
118                offset,
119            } => self.config_list_result(raw_query, collection, prefix.as_deref(), *limit, *offset),
120            ConfigCommand::Watch {
121                collection,
122                key,
123                prefix,
124                from_lsn,
125            } => self.config_watch_result(raw_query, collection, key, *prefix, *from_lsn),
126            ConfigCommand::InvalidVolatileOperation { operation, .. } => {
127                Err(invalid_config_volatility(operation))
128            }
129        }
130    }
131
132    pub(crate) fn validate_config_command_before_auth(
133        &self,
134        cmd: &crate::storage::query::ast::ConfigCommand,
135    ) -> RedDBResult<()> {
136        use crate::storage::query::ast::ConfigCommand;
137        match cmd {
138            ConfigCommand::InvalidVolatileOperation { operation, .. } => {
139                Err(invalid_config_volatility(operation))
140            }
141            ConfigCommand::Put { collection, .. }
142            | ConfigCommand::Get { collection, .. }
143            | ConfigCommand::Resolve { collection, .. }
144            | ConfigCommand::Rotate { collection, .. }
145            | ConfigCommand::Delete { collection, .. }
146            | ConfigCommand::History { collection, .. }
147            | ConfigCommand::List { collection, .. }
148            | ConfigCommand::Watch { collection, .. } => {
149                let snapshot = self.inner.db.catalog_model_snapshot();
150                let Some(actual_model) = snapshot
151                    .collections
152                    .iter()
153                    .find(|c| c.name == *collection)
154                    .map(|c| c.declared_model.unwrap_or(c.model))
155                else {
156                    return Ok(());
157                };
158                crate::runtime::ddl::polymorphic_resolver::ensure_model_match(
159                    CollectionModel::Config,
160                    actual_model,
161                )
162            }
163        }
164    }
165
166    fn config_resolve_result(
167        &self,
168        raw_query: &str,
169        collection: &str,
170        key: &str,
171    ) -> RedDBResult<RuntimeQueryResult> {
172        let latest = self.latest_config_version(collection, key)?;
173        if let Err(reason) = self.check_config_capability("config:read", collection, key) {
174            self.audit_config_resolve(
175                collection,
176                key,
177                None,
178                crate::runtime::audit_log::Outcome::Denied,
179                &reason,
180            );
181            return Err(RedDBError::Query(reason));
182        }
183
184        let Some(version) = latest else {
185            let reason = "not_found";
186            self.audit_config_resolve(
187                collection,
188                key,
189                None,
190                crate::runtime::audit_log::Outcome::Denied,
191                reason,
192            );
193            return Err(RedDBError::NotFound(format!(
194                "config '{}.{}' not found",
195                collection, key
196            )));
197        };
198        if version.tombstone {
199            let reason = "deleted";
200            self.audit_config_resolve(
201                collection,
202                key,
203                None,
204                crate::runtime::audit_log::Outcome::Denied,
205                reason,
206            );
207            return Err(RedDBError::NotFound(format!(
208                "config '{}.{}' is deleted",
209                collection, key
210            )));
211        }
212
213        let secret_ref = parse_config_secret_ref(&version.value).inspect_err(|err| {
214            self.audit_config_resolve(
215                collection,
216                key,
217                None,
218                crate::runtime::audit_log::Outcome::Error,
219                &err.to_string(),
220            );
221        })?;
222
223        match self.resolve_vault_secret_value(&secret_ref.collection, &secret_ref.key) {
224            Ok(value) => {
225                self.audit_config_resolve(
226                    collection,
227                    key,
228                    Some(&secret_ref),
229                    crate::runtime::audit_log::Outcome::Success,
230                    "ok",
231                );
232                let mut result = UnifiedResult::with_columns(vec![
233                    "collection".into(),
234                    "key".into(),
235                    "value".into(),
236                    "resolved_store".into(),
237                    "resolved_collection".into(),
238                    "resolved_key".into(),
239                ]);
240                let mut record = UnifiedRecord::new();
241                record.set("collection", Value::text(collection.to_string()));
242                record.set("key", Value::text(key.to_string()));
243                record.set("value", value);
244                record.set("resolved_store", Value::text("vault"));
245                record.set("resolved_collection", Value::text(secret_ref.collection));
246                record.set("resolved_key", Value::text(secret_ref.key));
247                result.push(record);
248                Ok(RuntimeQueryResult {
249                    query: raw_query.to_string(),
250                    mode: crate::storage::query::modes::QueryMode::Sql,
251                    statement: "config_resolve",
252                    engine: "config",
253                    result,
254                    affected_rows: 0,
255                    statement_type: "select",
256                })
257            }
258            Err(err) => {
259                let reason = err.to_string();
260                let outcome = if reason.contains("denied") {
261                    crate::runtime::audit_log::Outcome::Denied
262                } else {
263                    crate::runtime::audit_log::Outcome::Error
264                };
265                self.audit_config_resolve(collection, key, Some(&secret_ref), outcome, &reason);
266                Err(err)
267            }
268        }
269    }
270
271    fn config_write_result(
272        &self,
273        raw_query: &str,
274        collection: &str,
275        key: &str,
276        value: Value,
277        requested_type: Option<ConfigValueType>,
278        tags: &[String],
279        op: &str,
280    ) -> RedDBResult<RuntimeQueryResult> {
281        self.check_system_config_capability("config:write", collection, key)
282            .map_err(RedDBError::Query)?;
283        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
284        self.ensure_config_collection(collection)?;
285        let latest = self.latest_config_version(collection, key)?;
286        let version = latest.as_ref().map(|version| version.version).unwrap_or(0) + 1;
287        let (value_type, schema_version) = resolve_config_schema(latest.as_ref(), requested_type);
288        if let Some(value_type) = value_type {
289            validate_config_value_type(&value, value_type)?;
290        }
291        let before = latest.as_ref().and_then(|version| {
292            if version.tombstone {
293                None
294            } else {
295                Some(crate::presentation::entity_json::storage_value_to_json(
296                    &version.value,
297                ))
298            }
299        });
300        let after = Some(crate::presentation::entity_json::storage_value_to_json(
301            &value,
302        ));
303        let change_op = if latest.is_some() {
304            crate::replication::cdc::ChangeOperation::Update
305        } else {
306            crate::replication::cdc::ChangeOperation::Insert
307        };
308        let id = self.append_config_version(
309            collection,
310            key,
311            value,
312            version,
313            false,
314            op,
315            value_type,
316            schema_version,
317            tags,
318        )?;
319        self.record_kv_watch_event(change_op, collection, key, id.raw(), before, after);
320        self.prune_config_history(collection, key)?;
321        self.invalidate_result_cache();
322        Ok(config_write_output(
323            raw_query,
324            collection,
325            key,
326            version,
327            id,
328            value_type,
329            schema_version,
330            tags,
331            match op {
332                "rotate" => "config_rotate",
333                _ => "config_put",
334            },
335            1,
336        ))
337    }
338
339    fn config_delete_result(
340        &self,
341        raw_query: &str,
342        collection: &str,
343        key: &str,
344    ) -> RedDBResult<RuntimeQueryResult> {
345        self.check_system_config_capability("config:write", collection, key)
346            .map_err(RedDBError::Query)?;
347        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
348        self.ensure_config_collection(collection)?;
349        let latest = self.latest_config_version(collection, key)?;
350        let version = latest.as_ref().map(|version| version.version).unwrap_or(0) + 1;
351        let value_type = latest.as_ref().and_then(|version| version.value_type);
352        let schema_version = latest.as_ref().and_then(|version| version.schema_version);
353        let id = self.append_config_version(
354            collection,
355            key,
356            Value::Null,
357            version,
358            true,
359            "delete",
360            value_type,
361            schema_version,
362            &[],
363        )?;
364        if let Some(before) = latest.as_ref().and_then(|version| {
365            if version.tombstone {
366                None
367            } else {
368                Some(crate::presentation::entity_json::storage_value_to_json(
369                    &version.value,
370                ))
371            }
372        }) {
373            self.record_kv_watch_event(
374                crate::replication::cdc::ChangeOperation::Delete,
375                collection,
376                key,
377                id.raw(),
378                Some(before),
379                None,
380            );
381        }
382        self.prune_config_history(collection, key)?;
383        self.invalidate_result_cache();
384        Ok(config_write_output(
385            raw_query,
386            collection,
387            key,
388            version,
389            id,
390            value_type,
391            schema_version,
392            &[],
393            "delete",
394            1,
395        ))
396    }
397
398    fn config_get_result(
399        &self,
400        raw_query: &str,
401        collection: &str,
402        key: &str,
403    ) -> RedDBResult<RuntimeQueryResult> {
404        self.check_system_config_capability("config:read", collection, key)
405            .map_err(RedDBError::Query)?;
406        let latest = self.latest_config_version(collection, key)?;
407        let mut result = UnifiedResult::with_columns(vec![
408            "collection".into(),
409            "key".into(),
410            "value".into(),
411            "version".into(),
412            "value_type".into(),
413            "schema_version".into(),
414            "tags".into(),
415            "tombstone".into(),
416        ]);
417        let mut record = UnifiedRecord::new();
418        record.set("collection", Value::text(collection.to_string()));
419        record.set("key", Value::text(key.to_string()));
420        if let Some(version) = latest {
421            record.set("value", version.value);
422            record.set("version", Value::Integer(version.version));
423            record.set("value_type", config_value_type_value(version.value_type));
424            record.set(
425                "schema_version",
426                version
427                    .schema_version
428                    .map(Value::Integer)
429                    .unwrap_or(Value::Null),
430            );
431            record.set("tags", config_tags_value(&version.tags));
432            record.set("tombstone", Value::Boolean(version.tombstone));
433        } else {
434            record.set("value", Value::Null);
435            record.set("version", Value::Null);
436            record.set("value_type", Value::Null);
437            record.set("schema_version", Value::Null);
438            record.set("tags", Value::Null);
439            record.set("tombstone", Value::Boolean(false));
440        }
441        result.push(record);
442        Ok(RuntimeQueryResult {
443            query: raw_query.to_string(),
444            mode: crate::storage::query::modes::QueryMode::Sql,
445            statement: "config_get",
446            engine: "config",
447            result,
448            affected_rows: 0,
449            statement_type: "select",
450        })
451    }
452
453    fn config_history_result(
454        &self,
455        raw_query: &str,
456        collection: &str,
457        key: &str,
458    ) -> RedDBResult<RuntimeQueryResult> {
459        self.check_system_config_capability("config:read", collection, key)
460            .map_err(RedDBError::Query)?;
461        let versions = super::keyed_spine::history_versions(self.config_versions(collection, key)?);
462        let mut result = UnifiedResult::with_columns(vec![
463            "collection".into(),
464            "key".into(),
465            "version".into(),
466            "value".into(),
467            "value_type".into(),
468            "schema_version".into(),
469            "tags".into(),
470            "tombstone".into(),
471            "op".into(),
472            "created_at_ms".into(),
473        ]);
474        for version in versions {
475            let mut record = UnifiedRecord::new();
476            record.set("collection", Value::text(collection.to_string()));
477            record.set("key", Value::text(key.to_string()));
478            record.set("version", Value::Integer(version.version));
479            record.set("value", version.value);
480            record.set("value_type", config_value_type_value(version.value_type));
481            record.set(
482                "schema_version",
483                version
484                    .schema_version
485                    .map(Value::Integer)
486                    .unwrap_or(Value::Null),
487            );
488            record.set("tags", Value::Null);
489            record.set("tombstone", Value::Boolean(version.tombstone));
490            record.set("op", Value::text(version.op));
491            record.set("created_at_ms", Value::Integer(version.created_at_ms));
492            result.push(record);
493        }
494        Ok(RuntimeQueryResult {
495            query: raw_query.to_string(),
496            mode: crate::storage::query::modes::QueryMode::Sql,
497            statement: "config_history",
498            engine: "config",
499            result,
500            affected_rows: 0,
501            statement_type: "select",
502        })
503    }
504
505    fn config_list_result(
506        &self,
507        raw_query: &str,
508        collection: &str,
509        prefix: Option<&str>,
510        limit: Option<usize>,
511        offset: usize,
512    ) -> RedDBResult<RuntimeQueryResult> {
513        let mut versions = self.latest_config_versions(collection, prefix)?;
514        versions.sort_by(|left, right| left.key.cmp(&right.key));
515        let mut result = UnifiedResult::with_columns(vec![
516            "collection".into(),
517            "key".into(),
518            "value".into(),
519            "version".into(),
520            "value_type".into(),
521            "schema_version".into(),
522            "tags".into(),
523            "tombstone".into(),
524            "op".into(),
525            "created_at_ms".into(),
526        ]);
527        for version in versions
528            .into_iter()
529            .filter(|version| {
530                self.check_config_capability("config:read", collection, &version.key)
531                    .is_ok()
532            })
533            .skip(offset)
534            .take(limit.unwrap_or(usize::MAX))
535        {
536            let mut record = UnifiedRecord::new();
537            record.set("collection", Value::text(collection.to_string()));
538            record.set("key", Value::text(version.key));
539            record.set("value", version.value);
540            record.set("version", Value::Integer(version.version));
541            record.set("value_type", config_value_type_value(version.value_type));
542            record.set(
543                "schema_version",
544                version
545                    .schema_version
546                    .map(Value::Integer)
547                    .unwrap_or(Value::Null),
548            );
549            record.set("tags", config_tags_value(&version.tags));
550            record.set("tombstone", Value::Boolean(version.tombstone));
551            record.set("op", Value::text(version.op));
552            record.set("created_at_ms", Value::Integer(version.created_at_ms));
553            result.push(record);
554        }
555        Ok(RuntimeQueryResult {
556            query: raw_query.to_string(),
557            mode: crate::storage::query::modes::QueryMode::Sql,
558            statement: "config_list",
559            engine: "config",
560            result,
561            affected_rows: 0,
562            statement_type: "select",
563        })
564    }
565
566    fn config_watch_result(
567        &self,
568        raw_query: &str,
569        collection: &str,
570        key: &str,
571        prefix: bool,
572        from_lsn: Option<u64>,
573    ) -> RedDBResult<RuntimeQueryResult> {
574        let watch_key = if prefix {
575            format!("{key}.*")
576        } else {
577            key.to_string()
578        };
579        let endpoint = match from_lsn {
580            Some(lsn) => {
581                format!("/collections/{collection}/config/{watch_key}/watch?since_lsn={lsn}")
582            }
583            None => format!("/collections/{collection}/config/{watch_key}/watch"),
584        };
585        let mut result = UnifiedResult::with_columns(vec![
586            "collection".into(),
587            "key".into(),
588            "prefix".into(),
589            "from_lsn".into(),
590            "watch_url".into(),
591            "streaming".into(),
592        ]);
593        let mut record = UnifiedRecord::new();
594        record.set("collection", Value::text(collection.to_string()));
595        record.set("key", Value::text(watch_key));
596        record.set("prefix", Value::Boolean(prefix));
597        record.set(
598            "from_lsn",
599            from_lsn
600                .map(Value::UnsignedInteger)
601                .unwrap_or(crate::storage::schema::Value::Null),
602        );
603        record.set("watch_url", Value::text(endpoint));
604        record.set("streaming", Value::Boolean(true));
605        result.push(record);
606        Ok(RuntimeQueryResult {
607            query: raw_query.to_string(),
608            mode: crate::storage::query::modes::QueryMode::Sql,
609            statement: "config_watch",
610            engine: "config",
611            result,
612            affected_rows: 0,
613            statement_type: "stream",
614        })
615    }
616
617    fn ensure_config_collection(&self, collection: &str) -> RedDBResult<()> {
618        let store = self.inner.db.store();
619        if store.get_collection(collection).is_none() {
620            store
621                .create_collection(collection)
622                .map_err(|err| RedDBError::Internal(err.to_string()))?;
623        }
624        if let Some(contract) = self.inner.db.collection_contract(collection) {
625            crate::runtime::ddl::polymorphic_resolver::ensure_model_match(
626                CollectionModel::Config,
627                contract.declared_model,
628            )?;
629            return Ok(());
630        }
631        let now = current_unix_ms();
632        self.inner
633            .db
634            .save_collection_contract(CollectionContract {
635                name: collection.to_string(),
636                declared_model: CollectionModel::Config,
637                schema_mode: SchemaMode::Dynamic,
638                origin: ContractOrigin::Explicit,
639                version: 1,
640                created_at_unix_ms: now as u128,
641                updated_at_unix_ms: now as u128,
642                default_ttl_ms: None,
643                vector_dimension: None,
644                vector_metric: None,
645                context_index_fields: Vec::new(),
646                declared_columns: Vec::new(),
647                table_def: None,
648                timestamps_enabled: false,
649                context_index_enabled: false,
650                metrics_raw_retention_ms: None,
651                metrics_rollup_policies: Vec::new(),
652                metrics_tenant_identity: None,
653                metrics_namespace: None,
654                append_only: false,
655                subscriptions: Vec::new(),
656                session_key: None,
657                session_gap_ms: None,
658                retention_duration_ms: None,
659            })
660            .map(|_| ())
661            .map_err(|err| RedDBError::Internal(err.to_string()))
662    }
663
664    fn append_config_version(
665        &self,
666        collection: &str,
667        key: &str,
668        value: Value,
669        version: i64,
670        tombstone: bool,
671        op: &str,
672        value_type: Option<ConfigValueType>,
673        schema_version: Option<i64>,
674        tags: &[String],
675    ) -> RedDBResult<EntityId> {
676        let now = current_unix_ms() as i64;
677        let fields = vec![
678            ("key".to_string(), Value::text(key.to_string())),
679            ("value".to_string(), value),
680            ("version".to_string(), Value::Integer(version)),
681            (
682                "value_type".to_string(),
683                config_value_type_value(value_type),
684            ),
685            (
686                "schema_version".to_string(),
687                schema_version.map(Value::Integer).unwrap_or(Value::Null),
688            ),
689            ("tombstone".to_string(), Value::Boolean(tombstone)),
690            ("op".to_string(), Value::text(op.to_string())),
691            ("created_at_ms".to_string(), Value::Integer(now)),
692            ("tags".to_string(), config_tags_value(tags)),
693        ];
694        let mut row = RowData::new(Vec::new());
695        row.named = Some(fields.into_iter().collect());
696        let entity = UnifiedEntity::new(
697            EntityId::new(0),
698            EntityKind::TableRow {
699                table: Arc::from(collection),
700                row_id: 0,
701            },
702            EntityData::Row(row),
703        );
704        self.inner
705            .db
706            .store()
707            .insert(collection, entity)
708            .map_err(|err| RedDBError::Internal(err.to_string()))
709    }
710
711    fn latest_config_version(
712        &self,
713        collection: &str,
714        key: &str,
715    ) -> RedDBResult<Option<ConfigVersion>> {
716        Ok(super::keyed_spine::latest_version(
717            self.config_versions(collection, key)?,
718        ))
719    }
720
721    fn config_versions(&self, collection: &str, key: &str) -> RedDBResult<Vec<ConfigVersion>> {
722        let store = self.inner.db.store();
723        let Some(manager) = store.get_collection(collection) else {
724            return Ok(Vec::new());
725        };
726        let mut versions = Vec::new();
727        for entity in manager.query_all(|_| true) {
728            let EntityData::Row(row) = &entity.data else {
729                continue;
730            };
731            let Some(version) = super::keyed_spine::row_version(entity.id, row, 0) else {
732                continue;
733            };
734            if version.key != key {
735                continue;
736            }
737            versions.push(ConfigVersion::from_keyed_row(version, row));
738        }
739        Ok(versions)
740    }
741
742    fn latest_config_versions(
743        &self,
744        collection: &str,
745        prefix: Option<&str>,
746    ) -> RedDBResult<Vec<ConfigVersion>> {
747        let store = self.inner.db.store();
748        let Some(manager) = store.get_collection(collection) else {
749            return Ok(Vec::new());
750        };
751        let mut versions = Vec::new();
752        for entity in manager.query_all(|_| true) {
753            let EntityData::Row(row) = &entity.data else {
754                continue;
755            };
756            let Some(version) = super::keyed_spine::row_version(entity.id, row, 0) else {
757                continue;
758            };
759            versions.push(ConfigVersion::from_keyed_row(version, row));
760        }
761        Ok(super::keyed_spine::latest_versions(versions, prefix))
762    }
763
764    fn prune_config_history(&self, collection: &str, key: &str) -> RedDBResult<()> {
765        let mut versions = self.config_versions(collection, key)?;
766        if versions.len() <= CONFIG_HISTORY_LIMIT {
767            return Ok(());
768        }
769        versions = super::keyed_spine::history_versions(versions);
770        let drop_count = versions.len() - CONFIG_HISTORY_LIMIT;
771        let store = self.inner.db.store();
772        for version in versions.into_iter().take(drop_count) {
773            store
774                .delete(collection, version.id)
775                .map_err(|err| RedDBError::Internal(err.to_string()))?;
776        }
777        Ok(())
778    }
779
780    fn check_config_capability(
781        &self,
782        action: &str,
783        collection: &str,
784        key: &str,
785    ) -> Result<(), String> {
786        let Some(auth_store) = self.inner.auth_store.read().clone() else {
787            return Ok(());
788        };
789        if !auth_store.iam_authorization_enabled() {
790            return Ok(());
791        }
792        let Some((principal, role)) = current_auth_identity() else {
793            return Err(
794                "IAM authorization is enabled; config capability check requires an authenticated principal"
795                    .to_string(),
796            );
797        };
798        let tenant = current_tenant();
799        let principal_id = crate::auth::UserId::from_parts(tenant.as_deref(), &principal);
800        let mut resource = crate::auth::policies::ResourceRef::new(
801            "config",
802            config_target_resource(collection, key),
803        );
804        if let Some(ref tenant) = tenant {
805            resource = resource.with_tenant(tenant.clone());
806        }
807        let ctx = crate::auth::policies::EvalContext {
808            principal_tenant: tenant.clone(),
809            current_tenant: tenant,
810            peer_ip: None,
811            mfa_present: false,
812            now_ms: crate::utils::now_unix_millis() as u128,
813            principal_is_admin_role: role == crate::auth::Role::Admin,
814        };
815        if auth_store.check_policy_authz(&principal_id, action, &resource, &ctx) {
816            Ok(())
817        } else {
818            Err(format!(
819                "principal=`{}` action=`{}` resource=`config:{}` denied by IAM policy",
820                principal,
821                action,
822                config_target_resource(collection, key)
823            ))
824        }
825    }
826
827    fn check_system_config_capability(
828        &self,
829        action: &str,
830        collection: &str,
831        key: &str,
832    ) -> Result<(), String> {
833        if collection != "red.config" {
834            return Ok(());
835        }
836        self.check_config_capability(action, collection, key)
837    }
838
839    pub fn config_watch_events_since(
840        &self,
841        collection: &str,
842        key: &str,
843        since_lsn: u64,
844        max_count: usize,
845    ) -> Vec<crate::replication::cdc::KvWatchEvent> {
846        self.kv_watch_events_since(collection, key, since_lsn, max_count)
847            .into_iter()
848            .map(|event| self.policy_filter_config_watch_event(event))
849            .collect()
850    }
851
852    pub fn config_watch_events_since_prefix(
853        &self,
854        collection: &str,
855        prefix: &str,
856        since_lsn: u64,
857        max_count: usize,
858    ) -> Vec<crate::replication::cdc::KvWatchEvent> {
859        self.kv_watch_events_since_prefix(collection, prefix, since_lsn, max_count)
860            .into_iter()
861            .map(|event| self.policy_filter_config_watch_event(event))
862            .collect()
863    }
864
865    fn policy_filter_config_watch_event(
866        &self,
867        mut event: crate::replication::cdc::KvWatchEvent,
868    ) -> crate::replication::cdc::KvWatchEvent {
869        if self
870            .check_config_capability("config:read", &event.collection, &event.key)
871            .is_err()
872        {
873            event.before = None;
874            event.after = None;
875        }
876        event
877    }
878
879    fn audit_config_resolve(
880        &self,
881        collection: &str,
882        key: &str,
883        secret_ref: Option<&ConfigSecretRef>,
884        outcome: crate::runtime::audit_log::Outcome,
885        reason: &str,
886    ) {
887        let actor = current_auth_identity()
888            .map(|(principal, _)| principal)
889            .unwrap_or_else(|| "anonymous".to_string());
890        let request_id = match current_connection_id() {
891            0 => "embedded".to_string(),
892            id => format!("conn-{id}"),
893        };
894        let mut builder = crate::runtime::audit_log::AuditEvent::builder("config/resolve")
895            .principal(actor.clone())
896            .source(crate::runtime::audit_log::AuditAuthSource::Password)
897            .resource(format!(
898                "config:{}",
899                config_target_resource(collection, key)
900            ))
901            .outcome(outcome)
902            .correlation_id(request_id.clone())
903            .fields([
904                crate::runtime::audit_log::AuditFieldEscaper::field("actor", actor),
905                crate::runtime::audit_log::AuditFieldEscaper::field("collection", collection),
906                crate::runtime::audit_log::AuditFieldEscaper::field("key", key),
907                crate::runtime::audit_log::AuditFieldEscaper::field(
908                    "target",
909                    config_target_resource(collection, key),
910                ),
911                crate::runtime::audit_log::AuditFieldEscaper::field("reason", reason),
912                crate::runtime::audit_log::AuditFieldEscaper::field("request_id", request_id),
913                crate::runtime::audit_log::AuditFieldEscaper::field(
914                    "connection_id",
915                    current_connection_id(),
916                ),
917            ]);
918        if let Some(tenant) = current_tenant() {
919            builder = builder.tenant(tenant);
920        }
921        if let Some(secret_ref) = secret_ref {
922            builder = builder.fields([
923                crate::runtime::audit_log::AuditFieldEscaper::field("resolved_store", "vault"),
924                crate::runtime::audit_log::AuditFieldEscaper::field(
925                    "resolved_collection",
926                    secret_ref.collection.as_str(),
927                ),
928                crate::runtime::audit_log::AuditFieldEscaper::field(
929                    "resolved_key",
930                    secret_ref.key.as_str(),
931                ),
932                crate::runtime::audit_log::AuditFieldEscaper::field(
933                    "resolved_target",
934                    format!("{}.{}", secret_ref.collection, secret_ref.key),
935                ),
936            ]);
937        }
938        self.audit_log().record_event(builder.build());
939    }
940}
941
942fn parse_config_secret_ref(value: &Value) -> RedDBResult<ConfigSecretRef> {
943    let Value::Json(bytes) = value else {
944        return Err(RedDBError::InvalidConfig(
945            "CONFIG value is not a SecretRef".to_string(),
946        ));
947    };
948    let json = crate::json::from_slice::<crate::json::Value>(bytes).map_err(|err| {
949        RedDBError::InvalidConfig(format!("CONFIG SecretRef is malformed: {err}"))
950    })?;
951    let Some(object) = json.as_object() else {
952        return Err(RedDBError::InvalidConfig(
953            "CONFIG SecretRef must be an object".to_string(),
954        ));
955    };
956    let get_str = |field: &str| -> RedDBResult<&str> {
957        object
958            .get(field)
959            .and_then(|value| value.as_str())
960            .ok_or_else(|| RedDBError::InvalidConfig(format!("CONFIG SecretRef missing {field}")))
961    };
962    if get_str("type")? != "secret_ref" {
963        return Err(RedDBError::InvalidConfig(
964            "CONFIG value is not a SecretRef".to_string(),
965        ));
966    }
967    if get_str("store")? != "vault" {
968        return Err(RedDBError::InvalidConfig(
969            "CONFIG SecretRef store is unsupported".to_string(),
970        ));
971    }
972    Ok(ConfigSecretRef {
973        collection: get_str("collection")?.to_string(),
974        key: get_str("key")?.to_string(),
975    })
976}
977
978fn config_target_resource(collection: &str, key: &str) -> String {
979    if collection == "red.config" {
980        format!("red.config/{}", key.to_ascii_lowercase())
981    } else {
982        format!("{collection}.{key}")
983    }
984}
985
986fn config_write_output(
987    raw_query: &str,
988    collection: &str,
989    key: &str,
990    version: i64,
991    id: EntityId,
992    value_type: Option<ConfigValueType>,
993    schema_version: Option<i64>,
994    tags: &[String],
995    statement: &'static str,
996    affected_rows: u64,
997) -> RuntimeQueryResult {
998    let mut result = UnifiedResult::with_columns(vec![
999        "ok".into(),
1000        "collection".into(),
1001        "key".into(),
1002        "version".into(),
1003        "value_type".into(),
1004        "schema_version".into(),
1005        "tags".into(),
1006        "id".into(),
1007    ]);
1008    let mut record = UnifiedRecord::new();
1009    record.set("ok", Value::Boolean(true));
1010    record.set("collection", Value::text(collection.to_string()));
1011    record.set("key", Value::text(key.to_string()));
1012    record.set("version", Value::Integer(version));
1013    record.set("value_type", config_value_type_value(value_type));
1014    record.set(
1015        "schema_version",
1016        schema_version.map(Value::Integer).unwrap_or(Value::Null),
1017    );
1018    record.set("tags", config_tags_value(tags));
1019    record.set("id", Value::Integer(id.raw() as i64));
1020    result.push(record);
1021    RuntimeQueryResult {
1022        query: raw_query.to_string(),
1023        mode: crate::storage::query::modes::QueryMode::Sql,
1024        statement,
1025        engine: "config",
1026        result,
1027        affected_rows,
1028        statement_type: if statement == "delete" {
1029            "delete"
1030        } else {
1031            "update"
1032        },
1033    }
1034}
1035
1036fn invalid_config_volatility(operation: &str) -> RedDBError {
1037    RedDBError::InvalidOperation(format!(
1038        "CONFIG does not support KV-only volatility operation {operation}"
1039    ))
1040}
1041
1042fn resolve_config_schema(
1043    latest: Option<&ConfigVersion>,
1044    requested_type: Option<ConfigValueType>,
1045) -> (Option<ConfigValueType>, Option<i64>) {
1046    let previous_type = latest.and_then(|version| version.value_type);
1047    let previous_schema_version = latest.and_then(|version| version.schema_version);
1048    match requested_type {
1049        Some(value_type) if Some(value_type) != previous_type => (
1050            Some(value_type),
1051            Some(previous_schema_version.unwrap_or(0) + 1),
1052        ),
1053        Some(value_type) => (Some(value_type), previous_schema_version.or(Some(1))),
1054        None => (previous_type, previous_schema_version),
1055    }
1056}
1057
1058fn validate_config_value_type(value: &Value, value_type: ConfigValueType) -> RedDBResult<()> {
1059    let valid = match value_type {
1060        ConfigValueType::Bool => matches!(value, Value::Boolean(_)),
1061        ConfigValueType::Int => matches!(
1062            value,
1063            Value::Integer(_) | Value::UnsignedInteger(_) | Value::BigInt(_)
1064        ),
1065        ConfigValueType::String => matches!(value, Value::Text(_)),
1066        ConfigValueType::Url => validate_config_url(value),
1067        ConfigValueType::Object => validate_config_json_shape(value, true),
1068        ConfigValueType::Array => {
1069            matches!(value, Value::Array(_) | Value::Vector(_))
1070                || validate_config_json_shape(value, false)
1071        }
1072    };
1073    if valid {
1074        Ok(())
1075    } else {
1076        Err(RedDBError::InvalidConfig(format!(
1077            "CONFIG value type mismatch: expected {}, got {}",
1078            value_type.as_str(),
1079            config_actual_value_type(value),
1080        )))
1081    }
1082}
1083
1084fn validate_config_url(value: &Value) -> bool {
1085    let url = match value {
1086        Value::Url(value) => value.as_str(),
1087        Value::Text(value) => value.as_ref(),
1088        _ => return false,
1089    };
1090    url.starts_with("http://") || url.starts_with("https://") || url.starts_with("ftp://")
1091}
1092
1093fn validate_config_json_shape(value: &Value, object: bool) -> bool {
1094    let Value::Json(bytes) = value else {
1095        return false;
1096    };
1097    let Ok(json) = crate::json::from_slice::<crate::json::Value>(bytes) else {
1098        return false;
1099    };
1100    matches!(
1101        (object, json),
1102        (true, crate::json::Value::Object(_)) | (false, crate::json::Value::Array(_))
1103    )
1104}
1105
1106fn config_actual_value_type(value: &Value) -> &'static str {
1107    match value {
1108        Value::Null => "null",
1109        Value::Boolean(_) => "bool",
1110        Value::Integer(_) | Value::UnsignedInteger(_) | Value::BigInt(_) => "int",
1111        Value::Text(_) => "string",
1112        Value::Url(_) => "url",
1113        Value::Json(bytes) => match crate::json::from_slice::<crate::json::Value>(bytes) {
1114            Ok(crate::json::Value::Object(_)) => "object",
1115            Ok(crate::json::Value::Array(_)) => "array",
1116            _ => "json",
1117        },
1118        Value::Array(_) | Value::Vector(_) => "array",
1119        _ => "other",
1120    }
1121}
1122
1123fn config_value_type_value(value_type: Option<ConfigValueType>) -> Value {
1124    value_type
1125        .map(|value_type| Value::text(value_type.as_str()))
1126        .unwrap_or(Value::Null)
1127}
1128
1129fn config_value_type_from_value(value: &Value) -> Option<ConfigValueType> {
1130    match value {
1131        Value::Text(value) => ConfigValueType::parse(value.as_ref()),
1132        _ => None,
1133    }
1134}
1135
1136fn config_tags_value(tags: &[String]) -> Value {
1137    if tags.is_empty() {
1138        return Value::Null;
1139    }
1140    Value::Array(tags.iter().map(|tag| Value::text(tag.clone())).collect())
1141}
1142
1143fn config_tags_from_value(value: Option<&Value>) -> Vec<String> {
1144    match value {
1145        Some(Value::Array(values)) => values
1146            .iter()
1147            .filter_map(|value| match value {
1148                Value::Text(tag) => Some(tag.to_string()),
1149                _ => None,
1150            })
1151            .collect(),
1152        Some(Value::Json(bytes)) => crate::json::from_slice::<crate::json::Value>(bytes)
1153            .ok()
1154            .and_then(|value| value.as_array().map(|values| values.to_vec()))
1155            .map(|values| {
1156                values
1157                    .into_iter()
1158                    .filter_map(|value| value.as_str().map(ToOwned::to_owned))
1159                    .collect()
1160            })
1161            .unwrap_or_default(),
1162        _ => Vec::new(),
1163    }
1164}
1165
1166fn current_unix_ms() -> u64 {
1167    std::time::SystemTime::now()
1168        .duration_since(std::time::UNIX_EPOCH)
1169        .unwrap_or_default()
1170        .as_millis() as u64
1171}