Skip to main content

reddb_server/runtime/
impl_config.rs

1//! Stable Config keyed command execution.
2
3use std::sync::Arc;
4
5use crate::catalog::{CollectionModel, SchemaMode};
6use crate::physical::{CollectionContract, ContractOrigin};
7use crate::storage::query::ast::ConfigValueType;
8use crate::storage::{EntityData, EntityId, EntityKind, RowData, UnifiedEntity};
9
10use super::impl_core::{current_auth_identity, current_connection_id, current_tenant};
11use super::*;
12
/// Maximum number of stored versions kept per config key. Once a key holds
/// more than this many versions, `prune_config_history` deletes the surplus.
const CONFIG_HISTORY_LIMIT: usize = 16;
14
/// One stored version of a config key, decoded from a row of the config
/// collection (see `ConfigVersion::from_keyed_row`).
#[derive(Clone)]
struct ConfigVersion {
    /// Storage entity id of the row; used when pruning old versions.
    id: EntityId,
    /// Config key this version belongs to.
    key: String,
    /// Per-key version number; writes store `latest + 1`.
    version: i64,
    /// Stored value (`Value::Null` for delete tombstones).
    value: Value,
    /// `true` when this version was written by a delete.
    tombstone: bool,
    /// Value of `current_unix_ms()` captured when the row was appended.
    created_at_ms: i64,
    /// Operation label recorded with the row: `"put"`, `"rotate"` or `"delete"`.
    op: String,
    /// Declared value type, read from the row's `value_type` field if present.
    value_type: Option<ConfigValueType>,
    /// Schema version, read from the row's `schema_version` field if present.
    schema_version: Option<i64>,
    /// Tags read from the row's `tags` field.
    tags: Vec<String>,
}
28
29impl super::keyed_spine::KeyedVersion for ConfigVersion {
30    fn key(&self) -> &str {
31        &self.key
32    }
33
34    fn version(&self) -> i64 {
35        self.version
36    }
37}
38
39impl ConfigVersion {
40    fn from_keyed_row(version: super::keyed_spine::KeyedRowVersion, row: &RowData) -> Self {
41        Self {
42            id: version.id,
43            key: version.key,
44            version: version.version,
45            value: version.value,
46            tombstone: version.tombstone,
47            created_at_ms: version.created_at_ms,
48            op: version.op,
49            value_type: row
50                .get_field("value_type")
51                .and_then(config_value_type_from_value),
52            schema_version: super::keyed_spine::value_i64(row.get_field("schema_version")),
53            tags: config_tags_from_value(row.get_field("tags")),
54        }
55    }
56}
57
/// Pointer from a config value to a vault secret. `config_resolve_result`
/// obtains it from `parse_config_secret_ref` and passes both fields to
/// `resolve_vault_secret_value`.
struct ConfigSecretRef {
    /// Collection that holds the referenced secret.
    collection: String,
    /// Key of the referenced secret inside that collection.
    key: String,
}
62
63impl RedDBRuntime {
64    pub fn execute_config_command(
65        &self,
66        raw_query: &str,
67        cmd: &crate::storage::query::ast::ConfigCommand,
68    ) -> RedDBResult<RuntimeQueryResult> {
69        use crate::storage::query::ast::ConfigCommand;
70
71        match cmd {
72            ConfigCommand::Put {
73                collection,
74                key,
75                value,
76                value_type,
77                tags,
78            } => self.config_write_result(
79                raw_query,
80                collection,
81                key,
82                value.clone(),
83                *value_type,
84                tags,
85                "put",
86            ),
87            ConfigCommand::Rotate {
88                collection,
89                key,
90                value,
91                value_type,
92                tags,
93            } => self.config_write_result(
94                raw_query,
95                collection,
96                key,
97                value.clone(),
98                *value_type,
99                tags,
100                "rotate",
101            ),
102            ConfigCommand::Get { collection, key } => {
103                self.config_get_result(raw_query, collection, key)
104            }
105            ConfigCommand::Resolve { collection, key } => {
106                self.config_resolve_result(raw_query, collection, key)
107            }
108            ConfigCommand::Delete { collection, key } => {
109                self.config_delete_result(raw_query, collection, key)
110            }
111            ConfigCommand::History { collection, key } => {
112                self.config_history_result(raw_query, collection, key)
113            }
114            ConfigCommand::List {
115                collection,
116                prefix,
117                limit,
118                offset,
119            } => self.config_list_result(raw_query, collection, prefix.as_deref(), *limit, *offset),
120            ConfigCommand::Watch {
121                collection,
122                key,
123                prefix,
124                from_lsn,
125            } => self.config_watch_result(raw_query, collection, key, *prefix, *from_lsn),
126            ConfigCommand::InvalidVolatileOperation { operation, .. } => {
127                Err(invalid_config_volatility(operation))
128            }
129        }
130    }
131
132    pub(crate) fn validate_config_command_before_auth(
133        &self,
134        cmd: &crate::storage::query::ast::ConfigCommand,
135    ) -> RedDBResult<()> {
136        use crate::storage::query::ast::ConfigCommand;
137        match cmd {
138            ConfigCommand::InvalidVolatileOperation { operation, .. } => {
139                Err(invalid_config_volatility(operation))
140            }
141            ConfigCommand::Put { collection, .. }
142            | ConfigCommand::Get { collection, .. }
143            | ConfigCommand::Resolve { collection, .. }
144            | ConfigCommand::Rotate { collection, .. }
145            | ConfigCommand::Delete { collection, .. }
146            | ConfigCommand::History { collection, .. }
147            | ConfigCommand::List { collection, .. }
148            | ConfigCommand::Watch { collection, .. } => {
149                let snapshot = self.inner.db.catalog_model_snapshot();
150                let Some(actual_model) = snapshot
151                    .collections
152                    .iter()
153                    .find(|c| c.name == *collection)
154                    .map(|c| c.declared_model.unwrap_or(c.model))
155                else {
156                    return Ok(());
157                };
158                crate::runtime::ddl::polymorphic_resolver::ensure_model_match(
159                    CollectionModel::Config,
160                    actual_model,
161                )
162            }
163        }
164    }
165
166    fn config_resolve_result(
167        &self,
168        raw_query: &str,
169        collection: &str,
170        key: &str,
171    ) -> RedDBResult<RuntimeQueryResult> {
172        let latest = self.latest_config_version(collection, key)?;
173        if let Err(reason) = self.check_config_capability("config:read", collection, key) {
174            self.audit_config_resolve(
175                collection,
176                key,
177                None,
178                crate::runtime::audit_log::Outcome::Denied,
179                &reason,
180            );
181            return Err(RedDBError::Query(reason));
182        }
183
184        let Some(version) = latest else {
185            let reason = "not_found";
186            self.audit_config_resolve(
187                collection,
188                key,
189                None,
190                crate::runtime::audit_log::Outcome::Denied,
191                reason,
192            );
193            return Err(RedDBError::NotFound(format!(
194                "config '{}.{}' not found",
195                collection, key
196            )));
197        };
198        if version.tombstone {
199            let reason = "deleted";
200            self.audit_config_resolve(
201                collection,
202                key,
203                None,
204                crate::runtime::audit_log::Outcome::Denied,
205                reason,
206            );
207            return Err(RedDBError::NotFound(format!(
208                "config '{}.{}' is deleted",
209                collection, key
210            )));
211        }
212
213        let secret_ref = parse_config_secret_ref(&version.value).inspect_err(|err| {
214            self.audit_config_resolve(
215                collection,
216                key,
217                None,
218                crate::runtime::audit_log::Outcome::Error,
219                &err.to_string(),
220            );
221        })?;
222
223        match self.resolve_vault_secret_value(&secret_ref.collection, &secret_ref.key) {
224            Ok(value) => {
225                self.audit_config_resolve(
226                    collection,
227                    key,
228                    Some(&secret_ref),
229                    crate::runtime::audit_log::Outcome::Success,
230                    "ok",
231                );
232                let mut result = UnifiedResult::with_columns(vec![
233                    "collection".into(),
234                    "key".into(),
235                    "value".into(),
236                    "resolved_store".into(),
237                    "resolved_collection".into(),
238                    "resolved_key".into(),
239                ]);
240                let mut record = UnifiedRecord::new();
241                record.set("collection", Value::text(collection.to_string()));
242                record.set("key", Value::text(key.to_string()));
243                record.set("value", value);
244                record.set("resolved_store", Value::text("vault"));
245                record.set("resolved_collection", Value::text(secret_ref.collection));
246                record.set("resolved_key", Value::text(secret_ref.key));
247                result.push(record);
248                Ok(RuntimeQueryResult {
249                    query: raw_query.to_string(),
250                    mode: crate::storage::query::modes::QueryMode::Sql,
251                    statement: "config_resolve",
252                    engine: "config",
253                    result,
254                    affected_rows: 0,
255                    statement_type: "select",
256                })
257            }
258            Err(err) => {
259                let reason = err.to_string();
260                let outcome = if reason.contains("denied") {
261                    crate::runtime::audit_log::Outcome::Denied
262                } else {
263                    crate::runtime::audit_log::Outcome::Error
264                };
265                self.audit_config_resolve(collection, key, Some(&secret_ref), outcome, &reason);
266                Err(err)
267            }
268        }
269    }
270
271    fn config_write_result(
272        &self,
273        raw_query: &str,
274        collection: &str,
275        key: &str,
276        value: Value,
277        requested_type: Option<ConfigValueType>,
278        tags: &[String],
279        op: &str,
280    ) -> RedDBResult<RuntimeQueryResult> {
281        self.check_system_config_capability("config:write", collection, key)
282            .map_err(RedDBError::Query)?;
283        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
284        self.ensure_config_collection(collection)?;
285        let latest = self.latest_config_version(collection, key)?;
286        let version = latest.as_ref().map(|version| version.version).unwrap_or(0) + 1;
287        let (value_type, schema_version) = resolve_config_schema(latest.as_ref(), requested_type);
288        if let Some(value_type) = value_type {
289            validate_config_value_type(&value, value_type)?;
290        }
291        let before = latest.as_ref().and_then(|version| {
292            if version.tombstone {
293                None
294            } else {
295                Some(crate::presentation::entity_json::storage_value_to_json(
296                    &version.value,
297                ))
298            }
299        });
300        let after = Some(crate::presentation::entity_json::storage_value_to_json(
301            &value,
302        ));
303        let change_op = if latest.is_some() {
304            crate::replication::cdc::ChangeOperation::Update
305        } else {
306            crate::replication::cdc::ChangeOperation::Insert
307        };
308        let id = self.append_config_version(
309            collection,
310            key,
311            value,
312            version,
313            false,
314            op,
315            value_type,
316            schema_version,
317            tags,
318        )?;
319        self.record_kv_watch_event(change_op, collection, key, id.raw(), before, after);
320        self.prune_config_history(collection, key)?;
321        self.invalidate_result_cache();
322        Ok(config_write_output(
323            raw_query,
324            collection,
325            key,
326            version,
327            id,
328            value_type,
329            schema_version,
330            tags,
331            match op {
332                "rotate" => "config_rotate",
333                _ => "config_put",
334            },
335            1,
336        ))
337    }
338
339    fn config_delete_result(
340        &self,
341        raw_query: &str,
342        collection: &str,
343        key: &str,
344    ) -> RedDBResult<RuntimeQueryResult> {
345        self.check_system_config_capability("config:write", collection, key)
346            .map_err(RedDBError::Query)?;
347        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
348        self.ensure_config_collection(collection)?;
349        let latest = self.latest_config_version(collection, key)?;
350        let version = latest.as_ref().map(|version| version.version).unwrap_or(0) + 1;
351        let value_type = latest.as_ref().and_then(|version| version.value_type);
352        let schema_version = latest.as_ref().and_then(|version| version.schema_version);
353        let id = self.append_config_version(
354            collection,
355            key,
356            Value::Null,
357            version,
358            true,
359            "delete",
360            value_type,
361            schema_version,
362            &[],
363        )?;
364        if let Some(before) = latest.as_ref().and_then(|version| {
365            if version.tombstone {
366                None
367            } else {
368                Some(crate::presentation::entity_json::storage_value_to_json(
369                    &version.value,
370                ))
371            }
372        }) {
373            self.record_kv_watch_event(
374                crate::replication::cdc::ChangeOperation::Delete,
375                collection,
376                key,
377                id.raw(),
378                Some(before),
379                None,
380            );
381        }
382        self.prune_config_history(collection, key)?;
383        self.invalidate_result_cache();
384        Ok(config_write_output(
385            raw_query,
386            collection,
387            key,
388            version,
389            id,
390            value_type,
391            schema_version,
392            &[],
393            "delete",
394            1,
395        ))
396    }
397
398    fn config_get_result(
399        &self,
400        raw_query: &str,
401        collection: &str,
402        key: &str,
403    ) -> RedDBResult<RuntimeQueryResult> {
404        self.check_system_config_capability("config:read", collection, key)
405            .map_err(RedDBError::Query)?;
406        let latest = self.latest_config_version(collection, key)?;
407        let mut result = UnifiedResult::with_columns(vec![
408            "collection".into(),
409            "key".into(),
410            "value".into(),
411            "version".into(),
412            "value_type".into(),
413            "schema_version".into(),
414            "tags".into(),
415            "tombstone".into(),
416        ]);
417        let mut record = UnifiedRecord::new();
418        record.set("collection", Value::text(collection.to_string()));
419        record.set("key", Value::text(key.to_string()));
420        if let Some(version) = latest {
421            record.set("value", version.value);
422            record.set("version", Value::Integer(version.version));
423            record.set("value_type", config_value_type_value(version.value_type));
424            record.set(
425                "schema_version",
426                version
427                    .schema_version
428                    .map(Value::Integer)
429                    .unwrap_or(Value::Null),
430            );
431            record.set("tags", config_tags_value(&version.tags));
432            record.set("tombstone", Value::Boolean(version.tombstone));
433        } else {
434            record.set("value", Value::Null);
435            record.set("version", Value::Null);
436            record.set("value_type", Value::Null);
437            record.set("schema_version", Value::Null);
438            record.set("tags", Value::Null);
439            record.set("tombstone", Value::Boolean(false));
440        }
441        result.push(record);
442        Ok(RuntimeQueryResult {
443            query: raw_query.to_string(),
444            mode: crate::storage::query::modes::QueryMode::Sql,
445            statement: "config_get",
446            engine: "config",
447            result,
448            affected_rows: 0,
449            statement_type: "select",
450        })
451    }
452
453    fn config_history_result(
454        &self,
455        raw_query: &str,
456        collection: &str,
457        key: &str,
458    ) -> RedDBResult<RuntimeQueryResult> {
459        self.check_system_config_capability("config:read", collection, key)
460            .map_err(RedDBError::Query)?;
461        let versions = super::keyed_spine::history_versions(self.config_versions(collection, key)?);
462        let mut result = UnifiedResult::with_columns(vec![
463            "collection".into(),
464            "key".into(),
465            "version".into(),
466            "value".into(),
467            "value_type".into(),
468            "schema_version".into(),
469            "tags".into(),
470            "tombstone".into(),
471            "op".into(),
472            "created_at_ms".into(),
473        ]);
474        for version in versions {
475            let mut record = UnifiedRecord::new();
476            record.set("collection", Value::text(collection.to_string()));
477            record.set("key", Value::text(key.to_string()));
478            record.set("version", Value::Integer(version.version));
479            record.set("value", version.value);
480            record.set("value_type", config_value_type_value(version.value_type));
481            record.set(
482                "schema_version",
483                version
484                    .schema_version
485                    .map(Value::Integer)
486                    .unwrap_or(Value::Null),
487            );
488            record.set("tags", Value::Null);
489            record.set("tombstone", Value::Boolean(version.tombstone));
490            record.set("op", Value::text(version.op));
491            record.set("created_at_ms", Value::Integer(version.created_at_ms));
492            result.push(record);
493        }
494        Ok(RuntimeQueryResult {
495            query: raw_query.to_string(),
496            mode: crate::storage::query::modes::QueryMode::Sql,
497            statement: "config_history",
498            engine: "config",
499            result,
500            affected_rows: 0,
501            statement_type: "select",
502        })
503    }
504
505    fn config_list_result(
506        &self,
507        raw_query: &str,
508        collection: &str,
509        prefix: Option<&str>,
510        limit: Option<usize>,
511        offset: usize,
512    ) -> RedDBResult<RuntimeQueryResult> {
513        let mut versions = self.latest_config_versions(collection, prefix)?;
514        versions.sort_by(|left, right| left.key.cmp(&right.key));
515        let mut result = UnifiedResult::with_columns(vec![
516            "collection".into(),
517            "key".into(),
518            "value".into(),
519            "version".into(),
520            "value_type".into(),
521            "schema_version".into(),
522            "tags".into(),
523            "tombstone".into(),
524            "op".into(),
525            "created_at_ms".into(),
526        ]);
527        for version in versions
528            .into_iter()
529            .filter(|version| {
530                self.check_config_capability("config:read", collection, &version.key)
531                    .is_ok()
532            })
533            .skip(offset)
534            .take(limit.unwrap_or(usize::MAX))
535        {
536            let mut record = UnifiedRecord::new();
537            record.set("collection", Value::text(collection.to_string()));
538            record.set("key", Value::text(version.key));
539            record.set("value", version.value);
540            record.set("version", Value::Integer(version.version));
541            record.set("value_type", config_value_type_value(version.value_type));
542            record.set(
543                "schema_version",
544                version
545                    .schema_version
546                    .map(Value::Integer)
547                    .unwrap_or(Value::Null),
548            );
549            record.set("tags", config_tags_value(&version.tags));
550            record.set("tombstone", Value::Boolean(version.tombstone));
551            record.set("op", Value::text(version.op));
552            record.set("created_at_ms", Value::Integer(version.created_at_ms));
553            result.push(record);
554        }
555        Ok(RuntimeQueryResult {
556            query: raw_query.to_string(),
557            mode: crate::storage::query::modes::QueryMode::Sql,
558            statement: "config_list",
559            engine: "config",
560            result,
561            affected_rows: 0,
562            statement_type: "select",
563        })
564    }
565
566    fn config_watch_result(
567        &self,
568        raw_query: &str,
569        collection: &str,
570        key: &str,
571        prefix: bool,
572        from_lsn: Option<u64>,
573    ) -> RedDBResult<RuntimeQueryResult> {
574        let watch_key = if prefix {
575            format!("{key}.*")
576        } else {
577            key.to_string()
578        };
579        let endpoint = match from_lsn {
580            Some(lsn) => {
581                format!("/collections/{collection}/config/{watch_key}/watch?since_lsn={lsn}")
582            }
583            None => format!("/collections/{collection}/config/{watch_key}/watch"),
584        };
585        let mut result = UnifiedResult::with_columns(vec![
586            "collection".into(),
587            "key".into(),
588            "prefix".into(),
589            "from_lsn".into(),
590            "watch_url".into(),
591            "streaming".into(),
592        ]);
593        let mut record = UnifiedRecord::new();
594        record.set("collection", Value::text(collection.to_string()));
595        record.set("key", Value::text(watch_key));
596        record.set("prefix", Value::Boolean(prefix));
597        record.set(
598            "from_lsn",
599            from_lsn
600                .map(Value::UnsignedInteger)
601                .unwrap_or(crate::storage::schema::Value::Null),
602        );
603        record.set("watch_url", Value::text(endpoint));
604        record.set("streaming", Value::Boolean(true));
605        result.push(record);
606        Ok(RuntimeQueryResult {
607            query: raw_query.to_string(),
608            mode: crate::storage::query::modes::QueryMode::Sql,
609            statement: "config_watch",
610            engine: "config",
611            result,
612            affected_rows: 0,
613            statement_type: "stream",
614        })
615    }
616
617    fn ensure_config_collection(&self, collection: &str) -> RedDBResult<()> {
618        let store = self.inner.db.store();
619        if store.get_collection(collection).is_none() {
620            store
621                .create_collection(collection)
622                .map_err(|err| RedDBError::Internal(err.to_string()))?;
623        }
624        if let Some(contract) = self.inner.db.collection_contract(collection) {
625            crate::runtime::ddl::polymorphic_resolver::ensure_model_match(
626                CollectionModel::Config,
627                contract.declared_model,
628            )?;
629            return Ok(());
630        }
631        let now = current_unix_ms();
632        self.inner
633            .db
634            .save_collection_contract(CollectionContract {
635                name: collection.to_string(),
636                declared_model: CollectionModel::Config,
637                schema_mode: SchemaMode::Dynamic,
638                origin: ContractOrigin::Explicit,
639                version: 1,
640                created_at_unix_ms: now as u128,
641                updated_at_unix_ms: now as u128,
642                default_ttl_ms: None,
643                vector_dimension: None,
644                vector_metric: None,
645                context_index_fields: Vec::new(),
646                declared_columns: Vec::new(),
647                table_def: None,
648                timestamps_enabled: false,
649                context_index_enabled: false,
650                metrics_raw_retention_ms: None,
651                metrics_rollup_policies: Vec::new(),
652                metrics_tenant_identity: None,
653                metrics_namespace: None,
654                append_only: false,
655                subscriptions: Vec::new(),
656            })
657            .map(|_| ())
658            .map_err(|err| RedDBError::Internal(err.to_string()))
659    }
660
661    fn append_config_version(
662        &self,
663        collection: &str,
664        key: &str,
665        value: Value,
666        version: i64,
667        tombstone: bool,
668        op: &str,
669        value_type: Option<ConfigValueType>,
670        schema_version: Option<i64>,
671        tags: &[String],
672    ) -> RedDBResult<EntityId> {
673        let now = current_unix_ms() as i64;
674        let fields = vec![
675            ("key".to_string(), Value::text(key.to_string())),
676            ("value".to_string(), value),
677            ("version".to_string(), Value::Integer(version)),
678            (
679                "value_type".to_string(),
680                config_value_type_value(value_type),
681            ),
682            (
683                "schema_version".to_string(),
684                schema_version.map(Value::Integer).unwrap_or(Value::Null),
685            ),
686            ("tombstone".to_string(), Value::Boolean(tombstone)),
687            ("op".to_string(), Value::text(op.to_string())),
688            ("created_at_ms".to_string(), Value::Integer(now)),
689            ("tags".to_string(), config_tags_value(tags)),
690        ];
691        let mut row = RowData::new(Vec::new());
692        row.named = Some(fields.into_iter().collect());
693        let entity = UnifiedEntity::new(
694            EntityId::new(0),
695            EntityKind::TableRow {
696                table: Arc::from(collection),
697                row_id: 0,
698            },
699            EntityData::Row(row),
700        );
701        self.inner
702            .db
703            .store()
704            .insert(collection, entity)
705            .map_err(|err| RedDBError::Internal(err.to_string()))
706    }
707
708    fn latest_config_version(
709        &self,
710        collection: &str,
711        key: &str,
712    ) -> RedDBResult<Option<ConfigVersion>> {
713        Ok(super::keyed_spine::latest_version(
714            self.config_versions(collection, key)?,
715        ))
716    }
717
718    fn config_versions(&self, collection: &str, key: &str) -> RedDBResult<Vec<ConfigVersion>> {
719        let store = self.inner.db.store();
720        let Some(manager) = store.get_collection(collection) else {
721            return Ok(Vec::new());
722        };
723        let mut versions = Vec::new();
724        for entity in manager.query_all(|_| true) {
725            let EntityData::Row(row) = &entity.data else {
726                continue;
727            };
728            let Some(version) = super::keyed_spine::row_version(entity.id, row, 0) else {
729                continue;
730            };
731            if version.key != key {
732                continue;
733            }
734            versions.push(ConfigVersion::from_keyed_row(version, row));
735        }
736        Ok(versions)
737    }
738
739    fn latest_config_versions(
740        &self,
741        collection: &str,
742        prefix: Option<&str>,
743    ) -> RedDBResult<Vec<ConfigVersion>> {
744        let store = self.inner.db.store();
745        let Some(manager) = store.get_collection(collection) else {
746            return Ok(Vec::new());
747        };
748        let mut versions = Vec::new();
749        for entity in manager.query_all(|_| true) {
750            let EntityData::Row(row) = &entity.data else {
751                continue;
752            };
753            let Some(version) = super::keyed_spine::row_version(entity.id, row, 0) else {
754                continue;
755            };
756            versions.push(ConfigVersion::from_keyed_row(version, row));
757        }
758        Ok(super::keyed_spine::latest_versions(versions, prefix))
759    }
760
761    fn prune_config_history(&self, collection: &str, key: &str) -> RedDBResult<()> {
762        let mut versions = self.config_versions(collection, key)?;
763        if versions.len() <= CONFIG_HISTORY_LIMIT {
764            return Ok(());
765        }
766        versions = super::keyed_spine::history_versions(versions);
767        let drop_count = versions.len() - CONFIG_HISTORY_LIMIT;
768        let store = self.inner.db.store();
769        for version in versions.into_iter().take(drop_count) {
770            store
771                .delete(collection, version.id)
772                .map_err(|err| RedDBError::Internal(err.to_string()))?;
773        }
774        Ok(())
775    }
776
777    fn check_config_capability(
778        &self,
779        action: &str,
780        collection: &str,
781        key: &str,
782    ) -> Result<(), String> {
783        let Some(auth_store) = self.inner.auth_store.read().clone() else {
784            return Ok(());
785        };
786        if !auth_store.iam_authorization_enabled() {
787            return Ok(());
788        }
789        let Some((principal, role)) = current_auth_identity() else {
790            return Err(
791                "IAM authorization is enabled; config capability check requires an authenticated principal"
792                    .to_string(),
793            );
794        };
795        let tenant = current_tenant();
796        let principal_id = crate::auth::UserId::from_parts(tenant.as_deref(), &principal);
797        let mut resource = crate::auth::policies::ResourceRef::new(
798            "config",
799            config_target_resource(collection, key),
800        );
801        if let Some(ref tenant) = tenant {
802            resource = resource.with_tenant(tenant.clone());
803        }
804        let ctx = crate::auth::policies::EvalContext {
805            principal_tenant: tenant.clone(),
806            current_tenant: tenant,
807            peer_ip: None,
808            mfa_present: false,
809            now_ms: crate::utils::now_unix_millis() as u128,
810            principal_is_admin_role: role == crate::auth::Role::Admin,
811        };
812        if auth_store.check_policy_authz(&principal_id, action, &resource, &ctx) {
813            Ok(())
814        } else {
815            Err(format!(
816                "principal=`{}` action=`{}` resource=`config:{}` denied by IAM policy",
817                principal,
818                action,
819                config_target_resource(collection, key)
820            ))
821        }
822    }
823
824    fn check_system_config_capability(
825        &self,
826        action: &str,
827        collection: &str,
828        key: &str,
829    ) -> Result<(), String> {
830        if collection != "red.config" {
831            return Ok(());
832        }
833        self.check_config_capability(action, collection, key)
834    }
835
836    pub fn config_watch_events_since(
837        &self,
838        collection: &str,
839        key: &str,
840        since_lsn: u64,
841        max_count: usize,
842    ) -> Vec<crate::replication::cdc::KvWatchEvent> {
843        self.kv_watch_events_since(collection, key, since_lsn, max_count)
844            .into_iter()
845            .map(|event| self.policy_filter_config_watch_event(event))
846            .collect()
847    }
848
849    pub fn config_watch_events_since_prefix(
850        &self,
851        collection: &str,
852        prefix: &str,
853        since_lsn: u64,
854        max_count: usize,
855    ) -> Vec<crate::replication::cdc::KvWatchEvent> {
856        self.kv_watch_events_since_prefix(collection, prefix, since_lsn, max_count)
857            .into_iter()
858            .map(|event| self.policy_filter_config_watch_event(event))
859            .collect()
860    }
861
862    fn policy_filter_config_watch_event(
863        &self,
864        mut event: crate::replication::cdc::KvWatchEvent,
865    ) -> crate::replication::cdc::KvWatchEvent {
866        if self
867            .check_config_capability("config:read", &event.collection, &event.key)
868            .is_err()
869        {
870            event.before = None;
871            event.after = None;
872        }
873        event
874    }
875
876    fn audit_config_resolve(
877        &self,
878        collection: &str,
879        key: &str,
880        secret_ref: Option<&ConfigSecretRef>,
881        outcome: crate::runtime::audit_log::Outcome,
882        reason: &str,
883    ) {
884        let actor = current_auth_identity()
885            .map(|(principal, _)| principal)
886            .unwrap_or_else(|| "anonymous".to_string());
887        let request_id = match current_connection_id() {
888            0 => "embedded".to_string(),
889            id => format!("conn-{id}"),
890        };
891        let mut builder = crate::runtime::audit_log::AuditEvent::builder("config/resolve")
892            .principal(actor.clone())
893            .source(crate::runtime::audit_log::AuditAuthSource::Password)
894            .resource(format!(
895                "config:{}",
896                config_target_resource(collection, key)
897            ))
898            .outcome(outcome)
899            .correlation_id(request_id.clone())
900            .fields([
901                crate::runtime::audit_log::AuditFieldEscaper::field("actor", actor),
902                crate::runtime::audit_log::AuditFieldEscaper::field("collection", collection),
903                crate::runtime::audit_log::AuditFieldEscaper::field("key", key),
904                crate::runtime::audit_log::AuditFieldEscaper::field(
905                    "target",
906                    config_target_resource(collection, key),
907                ),
908                crate::runtime::audit_log::AuditFieldEscaper::field("reason", reason),
909                crate::runtime::audit_log::AuditFieldEscaper::field("request_id", request_id),
910                crate::runtime::audit_log::AuditFieldEscaper::field(
911                    "connection_id",
912                    current_connection_id(),
913                ),
914            ]);
915        if let Some(tenant) = current_tenant() {
916            builder = builder.tenant(tenant);
917        }
918        if let Some(secret_ref) = secret_ref {
919            builder = builder.fields([
920                crate::runtime::audit_log::AuditFieldEscaper::field("resolved_store", "vault"),
921                crate::runtime::audit_log::AuditFieldEscaper::field(
922                    "resolved_collection",
923                    secret_ref.collection.as_str(),
924                ),
925                crate::runtime::audit_log::AuditFieldEscaper::field(
926                    "resolved_key",
927                    secret_ref.key.as_str(),
928                ),
929                crate::runtime::audit_log::AuditFieldEscaper::field(
930                    "resolved_target",
931                    format!("{}.{}", secret_ref.collection, secret_ref.key),
932                ),
933            ]);
934        }
935        self.audit_log().record_event(builder.build());
936    }
937}
938
939fn parse_config_secret_ref(value: &Value) -> RedDBResult<ConfigSecretRef> {
940    let Value::Json(bytes) = value else {
941        return Err(RedDBError::InvalidConfig(
942            "CONFIG value is not a SecretRef".to_string(),
943        ));
944    };
945    let json = crate::json::from_slice::<crate::json::Value>(bytes).map_err(|err| {
946        RedDBError::InvalidConfig(format!("CONFIG SecretRef is malformed: {err}"))
947    })?;
948    let Some(object) = json.as_object() else {
949        return Err(RedDBError::InvalidConfig(
950            "CONFIG SecretRef must be an object".to_string(),
951        ));
952    };
953    let get_str = |field: &str| -> RedDBResult<&str> {
954        object
955            .get(field)
956            .and_then(|value| value.as_str())
957            .ok_or_else(|| RedDBError::InvalidConfig(format!("CONFIG SecretRef missing {field}")))
958    };
959    if get_str("type")? != "secret_ref" {
960        return Err(RedDBError::InvalidConfig(
961            "CONFIG value is not a SecretRef".to_string(),
962        ));
963    }
964    if get_str("store")? != "vault" {
965        return Err(RedDBError::InvalidConfig(
966            "CONFIG SecretRef store is unsupported".to_string(),
967        ));
968    }
969    Ok(ConfigSecretRef {
970        collection: get_str("collection")?.to_string(),
971        key: get_str("key")?.to_string(),
972    })
973}
974
/// Formats the resource name for a config entry.
///
/// Entries in the `red.config` collection are rendered as
/// `red.config/<key>` with the key ASCII-lowercased; every other collection
/// is rendered as `<collection>.<key>` with the key left as given.
fn config_target_resource(collection: &str, key: &str) -> String {
    match collection {
        "red.config" => format!("red.config/{}", key.to_ascii_lowercase()),
        _ => format!("{collection}.{key}"),
    }
}
982
983fn config_write_output(
984    raw_query: &str,
985    collection: &str,
986    key: &str,
987    version: i64,
988    id: EntityId,
989    value_type: Option<ConfigValueType>,
990    schema_version: Option<i64>,
991    tags: &[String],
992    statement: &'static str,
993    affected_rows: u64,
994) -> RuntimeQueryResult {
995    let mut result = UnifiedResult::with_columns(vec![
996        "ok".into(),
997        "collection".into(),
998        "key".into(),
999        "version".into(),
1000        "value_type".into(),
1001        "schema_version".into(),
1002        "tags".into(),
1003        "id".into(),
1004    ]);
1005    let mut record = UnifiedRecord::new();
1006    record.set("ok", Value::Boolean(true));
1007    record.set("collection", Value::text(collection.to_string()));
1008    record.set("key", Value::text(key.to_string()));
1009    record.set("version", Value::Integer(version));
1010    record.set("value_type", config_value_type_value(value_type));
1011    record.set(
1012        "schema_version",
1013        schema_version.map(Value::Integer).unwrap_or(Value::Null),
1014    );
1015    record.set("tags", config_tags_value(tags));
1016    record.set("id", Value::Integer(id.raw() as i64));
1017    result.push(record);
1018    RuntimeQueryResult {
1019        query: raw_query.to_string(),
1020        mode: crate::storage::query::modes::QueryMode::Sql,
1021        statement,
1022        engine: "config",
1023        result,
1024        affected_rows,
1025        statement_type: if statement == "delete" {
1026            "delete"
1027        } else {
1028            "update"
1029        },
1030    }
1031}
1032
1033fn invalid_config_volatility(operation: &str) -> RedDBError {
1034    RedDBError::InvalidOperation(format!(
1035        "CONFIG does not support KV-only volatility operation {operation}"
1036    ))
1037}
1038
1039fn resolve_config_schema(
1040    latest: Option<&ConfigVersion>,
1041    requested_type: Option<ConfigValueType>,
1042) -> (Option<ConfigValueType>, Option<i64>) {
1043    let previous_type = latest.and_then(|version| version.value_type);
1044    let previous_schema_version = latest.and_then(|version| version.schema_version);
1045    match requested_type {
1046        Some(value_type) if Some(value_type) != previous_type => (
1047            Some(value_type),
1048            Some(previous_schema_version.unwrap_or(0) + 1),
1049        ),
1050        Some(value_type) => (Some(value_type), previous_schema_version.or(Some(1))),
1051        None => (previous_type, previous_schema_version),
1052    }
1053}
1054
1055fn validate_config_value_type(value: &Value, value_type: ConfigValueType) -> RedDBResult<()> {
1056    let valid = match value_type {
1057        ConfigValueType::Bool => matches!(value, Value::Boolean(_)),
1058        ConfigValueType::Int => matches!(
1059            value,
1060            Value::Integer(_) | Value::UnsignedInteger(_) | Value::BigInt(_)
1061        ),
1062        ConfigValueType::String => matches!(value, Value::Text(_)),
1063        ConfigValueType::Url => validate_config_url(value),
1064        ConfigValueType::Object => validate_config_json_shape(value, true),
1065        ConfigValueType::Array => {
1066            matches!(value, Value::Array(_) | Value::Vector(_))
1067                || validate_config_json_shape(value, false)
1068        }
1069    };
1070    if valid {
1071        Ok(())
1072    } else {
1073        Err(RedDBError::InvalidConfig(format!(
1074            "CONFIG value type mismatch: expected {}, got {}",
1075            value_type.as_str(),
1076            config_actual_value_type(value),
1077        )))
1078    }
1079}
1080
1081fn validate_config_url(value: &Value) -> bool {
1082    let url = match value {
1083        Value::Url(value) => value.as_str(),
1084        Value::Text(value) => value.as_ref(),
1085        _ => return false,
1086    };
1087    url.starts_with("http://") || url.starts_with("https://") || url.starts_with("ftp://")
1088}
1089
1090fn validate_config_json_shape(value: &Value, object: bool) -> bool {
1091    let Value::Json(bytes) = value else {
1092        return false;
1093    };
1094    let Ok(json) = crate::json::from_slice::<crate::json::Value>(bytes) else {
1095        return false;
1096    };
1097    matches!(
1098        (object, json),
1099        (true, crate::json::Value::Object(_)) | (false, crate::json::Value::Array(_))
1100    )
1101}
1102
1103fn config_actual_value_type(value: &Value) -> &'static str {
1104    match value {
1105        Value::Null => "null",
1106        Value::Boolean(_) => "bool",
1107        Value::Integer(_) | Value::UnsignedInteger(_) | Value::BigInt(_) => "int",
1108        Value::Text(_) => "string",
1109        Value::Url(_) => "url",
1110        Value::Json(bytes) => match crate::json::from_slice::<crate::json::Value>(bytes) {
1111            Ok(crate::json::Value::Object(_)) => "object",
1112            Ok(crate::json::Value::Array(_)) => "array",
1113            _ => "json",
1114        },
1115        Value::Array(_) | Value::Vector(_) => "array",
1116        _ => "other",
1117    }
1118}
1119
1120fn config_value_type_value(value_type: Option<ConfigValueType>) -> Value {
1121    value_type
1122        .map(|value_type| Value::text(value_type.as_str()))
1123        .unwrap_or(Value::Null)
1124}
1125
1126fn config_value_type_from_value(value: &Value) -> Option<ConfigValueType> {
1127    match value {
1128        Value::Text(value) => ConfigValueType::parse(value.as_ref()),
1129        _ => None,
1130    }
1131}
1132
1133fn config_tags_value(tags: &[String]) -> Value {
1134    if tags.is_empty() {
1135        return Value::Null;
1136    }
1137    Value::Array(tags.iter().map(|tag| Value::text(tag.clone())).collect())
1138}
1139
1140fn config_tags_from_value(value: Option<&Value>) -> Vec<String> {
1141    match value {
1142        Some(Value::Array(values)) => values
1143            .iter()
1144            .filter_map(|value| match value {
1145                Value::Text(tag) => Some(tag.to_string()),
1146                _ => None,
1147            })
1148            .collect(),
1149        Some(Value::Json(bytes)) => crate::json::from_slice::<crate::json::Value>(bytes)
1150            .ok()
1151            .and_then(|value| value.as_array().map(|values| values.to_vec()))
1152            .map(|values| {
1153                values
1154                    .into_iter()
1155                    .filter_map(|value| value.as_str().map(ToOwned::to_owned))
1156                    .collect()
1157            })
1158            .unwrap_or_default(),
1159        _ => Vec::new(),
1160    }
1161}
1162
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// If the system clock reports a time before the epoch, `0` is returned.
fn current_unix_ms() -> u64 {
    use std::time::{Duration, SystemTime, UNIX_EPOCH};

    let since_epoch = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        Err(_) => Duration::default(),
    };
    since_epoch.as_millis() as u64
}