Skip to main content

reddb_server/runtime/
impl_config.rs

1//! Stable Config keyed command execution.
2
3use std::sync::Arc;
4
5use crate::catalog::{CollectionModel, SchemaMode};
6use crate::physical::{CollectionContract, ContractOrigin};
7use crate::storage::query::ast::ConfigValueType;
8use crate::storage::{EntityData, EntityId, EntityKind, RowData, UnifiedEntity};
9
10use super::impl_core::{current_auth_identity, current_connection_id, current_tenant};
11use super::*;
12
13const CONFIG_HISTORY_LIMIT: usize = 16;
14
15#[derive(Clone)]
16struct ConfigVersion {
17    id: EntityId,
18    key: String,
19    version: i64,
20    value: Value,
21    tombstone: bool,
22    created_at_ms: i64,
23    op: String,
24    value_type: Option<ConfigValueType>,
25    schema_version: Option<i64>,
26    tags: Vec<String>,
27}
28
29impl super::keyed_spine::KeyedVersion for ConfigVersion {
30    fn key(&self) -> &str {
31        &self.key
32    }
33
34    fn version(&self) -> i64 {
35        self.version
36    }
37}
38
39impl ConfigVersion {
40    fn from_keyed_row(version: super::keyed_spine::KeyedRowVersion, row: &RowData) -> Self {
41        Self {
42            id: version.id,
43            key: version.key,
44            version: version.version,
45            value: version.value,
46            tombstone: version.tombstone,
47            created_at_ms: version.created_at_ms,
48            op: version.op,
49            value_type: row
50                .get_field("value_type")
51                .and_then(config_value_type_from_value),
52            schema_version: super::keyed_spine::value_i64(row.get_field("schema_version")),
53            tags: config_tags_from_value(row.get_field("tags")),
54        }
55    }
56}
57
/// Location of a vault secret referenced by a config value.
///
/// Produced by `parse_config_secret_ref` and handed to
/// `resolve_vault_secret_value` when a config entry is resolved.
struct ConfigSecretRef {
    /// Collection that holds the referenced secret.
    collection: String,
    /// Key of the referenced secret inside `collection`.
    key: String,
}
62
63impl RedDBRuntime {
64    pub fn execute_config_command(
65        &self,
66        raw_query: &str,
67        cmd: &crate::storage::query::ast::ConfigCommand,
68    ) -> RedDBResult<RuntimeQueryResult> {
69        use crate::storage::query::ast::ConfigCommand;
70
71        match cmd {
72            ConfigCommand::Put {
73                collection,
74                key,
75                value,
76                value_type,
77                tags,
78            } => self.config_write_result(
79                raw_query,
80                collection,
81                key,
82                value.clone(),
83                *value_type,
84                tags,
85                "put",
86            ),
87            ConfigCommand::Rotate {
88                collection,
89                key,
90                value,
91                value_type,
92                tags,
93            } => self.config_write_result(
94                raw_query,
95                collection,
96                key,
97                value.clone(),
98                *value_type,
99                tags,
100                "rotate",
101            ),
102            ConfigCommand::Get { collection, key } => {
103                self.config_get_result(raw_query, collection, key)
104            }
105            ConfigCommand::Resolve { collection, key } => {
106                self.config_resolve_result(raw_query, collection, key)
107            }
108            ConfigCommand::Delete { collection, key } => {
109                self.config_delete_result(raw_query, collection, key)
110            }
111            ConfigCommand::History { collection, key } => {
112                self.config_history_result(raw_query, collection, key)
113            }
114            ConfigCommand::List {
115                collection,
116                prefix,
117                limit,
118                offset,
119            } => self.config_list_result(raw_query, collection, prefix.as_deref(), *limit, *offset),
120            ConfigCommand::Watch {
121                collection,
122                key,
123                prefix,
124                from_lsn,
125            } => self.config_watch_result(raw_query, collection, key, *prefix, *from_lsn),
126            ConfigCommand::InvalidVolatileOperation { operation, .. } => {
127                Err(invalid_config_volatility(operation))
128            }
129        }
130    }
131
132    pub(crate) fn validate_config_command_before_auth(
133        &self,
134        cmd: &crate::storage::query::ast::ConfigCommand,
135    ) -> RedDBResult<()> {
136        use crate::storage::query::ast::ConfigCommand;
137        match cmd {
138            ConfigCommand::InvalidVolatileOperation { operation, .. } => {
139                Err(invalid_config_volatility(operation))
140            }
141            ConfigCommand::Put { collection, .. }
142            | ConfigCommand::Get { collection, .. }
143            | ConfigCommand::Resolve { collection, .. }
144            | ConfigCommand::Rotate { collection, .. }
145            | ConfigCommand::Delete { collection, .. }
146            | ConfigCommand::History { collection, .. }
147            | ConfigCommand::List { collection, .. }
148            | ConfigCommand::Watch { collection, .. } => {
149                let snapshot = self.inner.db.catalog_model_snapshot();
150                let Some(actual_model) = snapshot
151                    .collections
152                    .iter()
153                    .find(|c| c.name == *collection)
154                    .map(|c| c.declared_model.unwrap_or(c.model))
155                else {
156                    return Ok(());
157                };
158                crate::runtime::ddl::polymorphic_resolver::ensure_model_match(
159                    CollectionModel::Config,
160                    actual_model,
161                )
162            }
163        }
164    }
165
166    fn config_resolve_result(
167        &self,
168        raw_query: &str,
169        collection: &str,
170        key: &str,
171    ) -> RedDBResult<RuntimeQueryResult> {
172        let latest = self.latest_config_version(collection, key)?;
173        if let Err(reason) = self.check_config_capability("config:read", collection, key) {
174            self.audit_config_resolve(
175                collection,
176                key,
177                None,
178                crate::runtime::audit_log::Outcome::Denied,
179                &reason,
180            );
181            return Err(RedDBError::Query(reason));
182        }
183
184        let Some(version) = latest else {
185            let reason = "not_found";
186            self.audit_config_resolve(
187                collection,
188                key,
189                None,
190                crate::runtime::audit_log::Outcome::Denied,
191                reason,
192            );
193            return Err(RedDBError::NotFound(format!(
194                "config '{}.{}' not found",
195                collection, key
196            )));
197        };
198        if version.tombstone {
199            let reason = "deleted";
200            self.audit_config_resolve(
201                collection,
202                key,
203                None,
204                crate::runtime::audit_log::Outcome::Denied,
205                reason,
206            );
207            return Err(RedDBError::NotFound(format!(
208                "config '{}.{}' is deleted",
209                collection, key
210            )));
211        }
212
213        let secret_ref = parse_config_secret_ref(&version.value).inspect_err(|err| {
214            self.audit_config_resolve(
215                collection,
216                key,
217                None,
218                crate::runtime::audit_log::Outcome::Error,
219                &err.to_string(),
220            );
221        })?;
222
223        match self.resolve_vault_secret_value(&secret_ref.collection, &secret_ref.key) {
224            Ok(value) => {
225                self.audit_config_resolve(
226                    collection,
227                    key,
228                    Some(&secret_ref),
229                    crate::runtime::audit_log::Outcome::Success,
230                    "ok",
231                );
232                let mut result = UnifiedResult::with_columns(vec![
233                    "collection".into(),
234                    "key".into(),
235                    "value".into(),
236                    "resolved_store".into(),
237                    "resolved_collection".into(),
238                    "resolved_key".into(),
239                ]);
240                let mut record = UnifiedRecord::new();
241                record.set("collection", Value::text(collection.to_string()));
242                record.set("key", Value::text(key.to_string()));
243                record.set("value", value);
244                record.set("resolved_store", Value::text("vault"));
245                record.set("resolved_collection", Value::text(secret_ref.collection));
246                record.set("resolved_key", Value::text(secret_ref.key));
247                result.push(record);
248                Ok(RuntimeQueryResult {
249                    query: raw_query.to_string(),
250                    mode: crate::storage::query::modes::QueryMode::Sql,
251                    statement: "config_resolve",
252                    engine: "config",
253                    result,
254                    affected_rows: 0,
255                    statement_type: "select",
256                })
257            }
258            Err(err) => {
259                let reason = err.to_string();
260                let outcome = if reason.contains("denied") {
261                    crate::runtime::audit_log::Outcome::Denied
262                } else {
263                    crate::runtime::audit_log::Outcome::Error
264                };
265                self.audit_config_resolve(collection, key, Some(&secret_ref), outcome, &reason);
266                Err(err)
267            }
268        }
269    }
270
271    fn config_write_result(
272        &self,
273        raw_query: &str,
274        collection: &str,
275        key: &str,
276        value: Value,
277        requested_type: Option<ConfigValueType>,
278        tags: &[String],
279        op: &str,
280    ) -> RedDBResult<RuntimeQueryResult> {
281        self.check_system_config_capability("config:write", collection, key)
282            .map_err(RedDBError::Query)?;
283        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
284        self.ensure_config_collection(collection)?;
285        let latest = self.latest_config_version(collection, key)?;
286        let version = latest.as_ref().map(|version| version.version).unwrap_or(0) + 1;
287        let (value_type, schema_version) = resolve_config_schema(latest.as_ref(), requested_type);
288        if let Some(value_type) = value_type {
289            validate_config_value_type(&value, value_type)?;
290        }
291        let before = latest.as_ref().and_then(|version| {
292            if version.tombstone {
293                None
294            } else {
295                Some(crate::presentation::entity_json::storage_value_to_json(
296                    &version.value,
297                ))
298            }
299        });
300        let after = Some(crate::presentation::entity_json::storage_value_to_json(
301            &value,
302        ));
303        let change_op = if latest.is_some() {
304            crate::replication::cdc::ChangeOperation::Update
305        } else {
306            crate::replication::cdc::ChangeOperation::Insert
307        };
308        let id = self.append_config_version(
309            collection,
310            key,
311            value,
312            version,
313            false,
314            op,
315            value_type,
316            schema_version,
317            tags,
318        )?;
319        self.record_kv_watch_event(change_op, collection, key, id.raw(), before, after);
320        self.prune_config_history(collection, key)?;
321        self.invalidate_result_cache();
322        Ok(config_write_output(
323            raw_query,
324            collection,
325            key,
326            version,
327            id,
328            value_type,
329            schema_version,
330            tags,
331            match op {
332                "rotate" => "config_rotate",
333                _ => "config_put",
334            },
335            1,
336        ))
337    }
338
339    fn config_delete_result(
340        &self,
341        raw_query: &str,
342        collection: &str,
343        key: &str,
344    ) -> RedDBResult<RuntimeQueryResult> {
345        self.check_system_config_capability("config:write", collection, key)
346            .map_err(RedDBError::Query)?;
347        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
348        self.ensure_config_collection(collection)?;
349        let latest = self.latest_config_version(collection, key)?;
350        let version = latest.as_ref().map(|version| version.version).unwrap_or(0) + 1;
351        let value_type = latest.as_ref().and_then(|version| version.value_type);
352        let schema_version = latest.as_ref().and_then(|version| version.schema_version);
353        let id = self.append_config_version(
354            collection,
355            key,
356            Value::Null,
357            version,
358            true,
359            "delete",
360            value_type,
361            schema_version,
362            &[],
363        )?;
364        if let Some(before) = latest.as_ref().and_then(|version| {
365            if version.tombstone {
366                None
367            } else {
368                Some(crate::presentation::entity_json::storage_value_to_json(
369                    &version.value,
370                ))
371            }
372        }) {
373            self.record_kv_watch_event(
374                crate::replication::cdc::ChangeOperation::Delete,
375                collection,
376                key,
377                id.raw(),
378                Some(before),
379                None,
380            );
381        }
382        self.prune_config_history(collection, key)?;
383        self.invalidate_result_cache();
384        Ok(config_write_output(
385            raw_query,
386            collection,
387            key,
388            version,
389            id,
390            value_type,
391            schema_version,
392            &[],
393            "delete",
394            1,
395        ))
396    }
397
398    fn config_get_result(
399        &self,
400        raw_query: &str,
401        collection: &str,
402        key: &str,
403    ) -> RedDBResult<RuntimeQueryResult> {
404        self.check_system_config_capability("config:read", collection, key)
405            .map_err(RedDBError::Query)?;
406        let latest = self.latest_config_version(collection, key)?;
407        let mut result = UnifiedResult::with_columns(vec![
408            "collection".into(),
409            "key".into(),
410            "value".into(),
411            "version".into(),
412            "value_type".into(),
413            "schema_version".into(),
414            "tags".into(),
415            "tombstone".into(),
416        ]);
417        let mut record = UnifiedRecord::new();
418        record.set("collection", Value::text(collection.to_string()));
419        record.set("key", Value::text(key.to_string()));
420        if let Some(version) = latest {
421            record.set("value", version.value);
422            record.set("version", Value::Integer(version.version));
423            record.set("value_type", config_value_type_value(version.value_type));
424            record.set(
425                "schema_version",
426                version
427                    .schema_version
428                    .map(Value::Integer)
429                    .unwrap_or(Value::Null),
430            );
431            record.set("tags", config_tags_value(&version.tags));
432            record.set("tombstone", Value::Boolean(version.tombstone));
433        } else {
434            record.set("value", Value::Null);
435            record.set("version", Value::Null);
436            record.set("value_type", Value::Null);
437            record.set("schema_version", Value::Null);
438            record.set("tags", Value::Null);
439            record.set("tombstone", Value::Boolean(false));
440        }
441        result.push(record);
442        Ok(RuntimeQueryResult {
443            query: raw_query.to_string(),
444            mode: crate::storage::query::modes::QueryMode::Sql,
445            statement: "config_get",
446            engine: "config",
447            result,
448            affected_rows: 0,
449            statement_type: "select",
450        })
451    }
452
453    fn config_history_result(
454        &self,
455        raw_query: &str,
456        collection: &str,
457        key: &str,
458    ) -> RedDBResult<RuntimeQueryResult> {
459        self.check_system_config_capability("config:read", collection, key)
460            .map_err(RedDBError::Query)?;
461        let versions = super::keyed_spine::history_versions(self.config_versions(collection, key)?);
462        let mut result = UnifiedResult::with_columns(vec![
463            "collection".into(),
464            "key".into(),
465            "version".into(),
466            "value".into(),
467            "value_type".into(),
468            "schema_version".into(),
469            "tags".into(),
470            "tombstone".into(),
471            "op".into(),
472            "created_at_ms".into(),
473        ]);
474        for version in versions {
475            let mut record = UnifiedRecord::new();
476            record.set("collection", Value::text(collection.to_string()));
477            record.set("key", Value::text(key.to_string()));
478            record.set("version", Value::Integer(version.version));
479            record.set("value", version.value);
480            record.set("value_type", config_value_type_value(version.value_type));
481            record.set(
482                "schema_version",
483                version
484                    .schema_version
485                    .map(Value::Integer)
486                    .unwrap_or(Value::Null),
487            );
488            record.set("tags", Value::Null);
489            record.set("tombstone", Value::Boolean(version.tombstone));
490            record.set("op", Value::text(version.op));
491            record.set("created_at_ms", Value::Integer(version.created_at_ms));
492            result.push(record);
493        }
494        Ok(RuntimeQueryResult {
495            query: raw_query.to_string(),
496            mode: crate::storage::query::modes::QueryMode::Sql,
497            statement: "config_history",
498            engine: "config",
499            result,
500            affected_rows: 0,
501            statement_type: "select",
502        })
503    }
504
505    fn config_list_result(
506        &self,
507        raw_query: &str,
508        collection: &str,
509        prefix: Option<&str>,
510        limit: Option<usize>,
511        offset: usize,
512    ) -> RedDBResult<RuntimeQueryResult> {
513        let mut versions = self.latest_config_versions(collection, prefix)?;
514        versions.sort_by(|left, right| left.key.cmp(&right.key));
515        let mut result = UnifiedResult::with_columns(vec![
516            "collection".into(),
517            "key".into(),
518            "value".into(),
519            "version".into(),
520            "value_type".into(),
521            "schema_version".into(),
522            "tags".into(),
523            "tombstone".into(),
524            "op".into(),
525            "created_at_ms".into(),
526        ]);
527        for version in versions
528            .into_iter()
529            .filter(|version| {
530                self.check_config_capability("config:read", collection, &version.key)
531                    .is_ok()
532            })
533            .skip(offset)
534            .take(limit.unwrap_or(usize::MAX))
535        {
536            let mut record = UnifiedRecord::new();
537            record.set("collection", Value::text(collection.to_string()));
538            record.set("key", Value::text(version.key));
539            record.set("value", version.value);
540            record.set("version", Value::Integer(version.version));
541            record.set("value_type", config_value_type_value(version.value_type));
542            record.set(
543                "schema_version",
544                version
545                    .schema_version
546                    .map(Value::Integer)
547                    .unwrap_or(Value::Null),
548            );
549            record.set("tags", config_tags_value(&version.tags));
550            record.set("tombstone", Value::Boolean(version.tombstone));
551            record.set("op", Value::text(version.op));
552            record.set("created_at_ms", Value::Integer(version.created_at_ms));
553            result.push(record);
554        }
555        Ok(RuntimeQueryResult {
556            query: raw_query.to_string(),
557            mode: crate::storage::query::modes::QueryMode::Sql,
558            statement: "config_list",
559            engine: "config",
560            result,
561            affected_rows: 0,
562            statement_type: "select",
563        })
564    }
565
566    fn config_watch_result(
567        &self,
568        raw_query: &str,
569        collection: &str,
570        key: &str,
571        prefix: bool,
572        from_lsn: Option<u64>,
573    ) -> RedDBResult<RuntimeQueryResult> {
574        let watch_key = if prefix {
575            format!("{key}.*")
576        } else {
577            key.to_string()
578        };
579        let endpoint = match from_lsn {
580            Some(lsn) => {
581                format!("/collections/{collection}/config/{watch_key}/watch?since_lsn={lsn}")
582            }
583            None => format!("/collections/{collection}/config/{watch_key}/watch"),
584        };
585        let mut result = UnifiedResult::with_columns(vec![
586            "collection".into(),
587            "key".into(),
588            "prefix".into(),
589            "from_lsn".into(),
590            "watch_url".into(),
591            "streaming".into(),
592        ]);
593        let mut record = UnifiedRecord::new();
594        record.set("collection", Value::text(collection.to_string()));
595        record.set("key", Value::text(watch_key));
596        record.set("prefix", Value::Boolean(prefix));
597        record.set(
598            "from_lsn",
599            from_lsn
600                .map(Value::UnsignedInteger)
601                .unwrap_or(crate::storage::schema::Value::Null),
602        );
603        record.set("watch_url", Value::text(endpoint));
604        record.set("streaming", Value::Boolean(true));
605        result.push(record);
606        Ok(RuntimeQueryResult {
607            query: raw_query.to_string(),
608            mode: crate::storage::query::modes::QueryMode::Sql,
609            statement: "config_watch",
610            engine: "config",
611            result,
612            affected_rows: 0,
613            statement_type: "stream",
614        })
615    }
616
617    fn ensure_config_collection(&self, collection: &str) -> RedDBResult<()> {
618        let store = self.inner.db.store();
619        if store.get_collection(collection).is_none() {
620            store
621                .create_collection(collection)
622                .map_err(|err| RedDBError::Internal(err.to_string()))?;
623        }
624        if let Some(contract) = self.inner.db.collection_contract(collection) {
625            crate::runtime::ddl::polymorphic_resolver::ensure_model_match(
626                CollectionModel::Config,
627                contract.declared_model,
628            )?;
629            return Ok(());
630        }
631        let now = current_unix_ms();
632        self.inner
633            .db
634            .save_collection_contract(CollectionContract {
635                name: collection.to_string(),
636                declared_model: CollectionModel::Config,
637                schema_mode: SchemaMode::Dynamic,
638                origin: ContractOrigin::Explicit,
639                version: 1,
640                created_at_unix_ms: now as u128,
641                updated_at_unix_ms: now as u128,
642                default_ttl_ms: None,
643                vector_dimension: None,
644                vector_metric: None,
645                context_index_fields: Vec::new(),
646                declared_columns: Vec::new(),
647                table_def: None,
648                timestamps_enabled: false,
649                context_index_enabled: false,
650                append_only: false,
651                subscriptions: Vec::new(),
652            })
653            .map(|_| ())
654            .map_err(|err| RedDBError::Internal(err.to_string()))
655    }
656
657    fn append_config_version(
658        &self,
659        collection: &str,
660        key: &str,
661        value: Value,
662        version: i64,
663        tombstone: bool,
664        op: &str,
665        value_type: Option<ConfigValueType>,
666        schema_version: Option<i64>,
667        tags: &[String],
668    ) -> RedDBResult<EntityId> {
669        let now = current_unix_ms() as i64;
670        let fields = vec![
671            ("key".to_string(), Value::text(key.to_string())),
672            ("value".to_string(), value),
673            ("version".to_string(), Value::Integer(version)),
674            (
675                "value_type".to_string(),
676                config_value_type_value(value_type),
677            ),
678            (
679                "schema_version".to_string(),
680                schema_version.map(Value::Integer).unwrap_or(Value::Null),
681            ),
682            ("tombstone".to_string(), Value::Boolean(tombstone)),
683            ("op".to_string(), Value::text(op.to_string())),
684            ("created_at_ms".to_string(), Value::Integer(now)),
685            ("tags".to_string(), config_tags_value(tags)),
686        ];
687        let mut row = RowData::new(Vec::new());
688        row.named = Some(fields.into_iter().collect());
689        let entity = UnifiedEntity::new(
690            EntityId::new(0),
691            EntityKind::TableRow {
692                table: Arc::from(collection),
693                row_id: 0,
694            },
695            EntityData::Row(row),
696        );
697        self.inner
698            .db
699            .store()
700            .insert(collection, entity)
701            .map_err(|err| RedDBError::Internal(err.to_string()))
702    }
703
704    fn latest_config_version(
705        &self,
706        collection: &str,
707        key: &str,
708    ) -> RedDBResult<Option<ConfigVersion>> {
709        Ok(super::keyed_spine::latest_version(
710            self.config_versions(collection, key)?,
711        ))
712    }
713
714    fn config_versions(&self, collection: &str, key: &str) -> RedDBResult<Vec<ConfigVersion>> {
715        let store = self.inner.db.store();
716        let Some(manager) = store.get_collection(collection) else {
717            return Ok(Vec::new());
718        };
719        let mut versions = Vec::new();
720        for entity in manager.query_all(|_| true) {
721            let EntityData::Row(row) = &entity.data else {
722                continue;
723            };
724            let Some(version) = super::keyed_spine::row_version(entity.id, row, 0) else {
725                continue;
726            };
727            if version.key != key {
728                continue;
729            }
730            versions.push(ConfigVersion::from_keyed_row(version, row));
731        }
732        Ok(versions)
733    }
734
735    fn latest_config_versions(
736        &self,
737        collection: &str,
738        prefix: Option<&str>,
739    ) -> RedDBResult<Vec<ConfigVersion>> {
740        let store = self.inner.db.store();
741        let Some(manager) = store.get_collection(collection) else {
742            return Ok(Vec::new());
743        };
744        let mut versions = Vec::new();
745        for entity in manager.query_all(|_| true) {
746            let EntityData::Row(row) = &entity.data else {
747                continue;
748            };
749            let Some(version) = super::keyed_spine::row_version(entity.id, row, 0) else {
750                continue;
751            };
752            versions.push(ConfigVersion::from_keyed_row(version, row));
753        }
754        Ok(super::keyed_spine::latest_versions(versions, prefix))
755    }
756
757    fn prune_config_history(&self, collection: &str, key: &str) -> RedDBResult<()> {
758        let mut versions = self.config_versions(collection, key)?;
759        if versions.len() <= CONFIG_HISTORY_LIMIT {
760            return Ok(());
761        }
762        versions = super::keyed_spine::history_versions(versions);
763        let drop_count = versions.len() - CONFIG_HISTORY_LIMIT;
764        let store = self.inner.db.store();
765        for version in versions.into_iter().take(drop_count) {
766            store
767                .delete(collection, version.id)
768                .map_err(|err| RedDBError::Internal(err.to_string()))?;
769        }
770        Ok(())
771    }
772
773    fn check_config_capability(
774        &self,
775        action: &str,
776        collection: &str,
777        key: &str,
778    ) -> Result<(), String> {
779        let Some(auth_store) = self.inner.auth_store.read().clone() else {
780            return Ok(());
781        };
782        if !auth_store.iam_authorization_enabled() {
783            return Ok(());
784        }
785        let Some((principal, role)) = current_auth_identity() else {
786            return Err(
787                "IAM authorization is enabled; config capability check requires an authenticated principal"
788                    .to_string(),
789            );
790        };
791        let tenant = current_tenant();
792        let principal_id = crate::auth::UserId::from_parts(tenant.as_deref(), &principal);
793        let mut resource = crate::auth::policies::ResourceRef::new(
794            "config",
795            config_target_resource(collection, key),
796        );
797        if let Some(ref tenant) = tenant {
798            resource = resource.with_tenant(tenant.clone());
799        }
800        let ctx = crate::auth::policies::EvalContext {
801            principal_tenant: tenant.clone(),
802            current_tenant: tenant,
803            peer_ip: None,
804            mfa_present: false,
805            now_ms: crate::utils::now_unix_millis() as u128,
806            principal_is_admin_role: role == crate::auth::Role::Admin,
807        };
808        if auth_store.check_policy_authz(&principal_id, action, &resource, &ctx) {
809            Ok(())
810        } else {
811            Err(format!(
812                "principal=`{}` action=`{}` resource=`config:{}` denied by IAM policy",
813                principal,
814                action,
815                config_target_resource(collection, key)
816            ))
817        }
818    }
819
820    fn check_system_config_capability(
821        &self,
822        action: &str,
823        collection: &str,
824        key: &str,
825    ) -> Result<(), String> {
826        if collection != "red.config" {
827            return Ok(());
828        }
829        self.check_config_capability(action, collection, key)
830    }
831
832    pub fn config_watch_events_since(
833        &self,
834        collection: &str,
835        key: &str,
836        since_lsn: u64,
837        max_count: usize,
838    ) -> Vec<crate::replication::cdc::KvWatchEvent> {
839        self.kv_watch_events_since(collection, key, since_lsn, max_count)
840            .into_iter()
841            .map(|event| self.policy_filter_config_watch_event(event))
842            .collect()
843    }
844
845    pub fn config_watch_events_since_prefix(
846        &self,
847        collection: &str,
848        prefix: &str,
849        since_lsn: u64,
850        max_count: usize,
851    ) -> Vec<crate::replication::cdc::KvWatchEvent> {
852        self.kv_watch_events_since_prefix(collection, prefix, since_lsn, max_count)
853            .into_iter()
854            .map(|event| self.policy_filter_config_watch_event(event))
855            .collect()
856    }
857
858    fn policy_filter_config_watch_event(
859        &self,
860        mut event: crate::replication::cdc::KvWatchEvent,
861    ) -> crate::replication::cdc::KvWatchEvent {
862        if self
863            .check_config_capability("config:read", &event.collection, &event.key)
864            .is_err()
865        {
866            event.before = None;
867            event.after = None;
868        }
869        event
870    }
871
872    fn audit_config_resolve(
873        &self,
874        collection: &str,
875        key: &str,
876        secret_ref: Option<&ConfigSecretRef>,
877        outcome: crate::runtime::audit_log::Outcome,
878        reason: &str,
879    ) {
880        let actor = current_auth_identity()
881            .map(|(principal, _)| principal)
882            .unwrap_or_else(|| "anonymous".to_string());
883        let request_id = match current_connection_id() {
884            0 => "embedded".to_string(),
885            id => format!("conn-{id}"),
886        };
887        let mut builder = crate::runtime::audit_log::AuditEvent::builder("config/resolve")
888            .principal(actor.clone())
889            .source(crate::runtime::audit_log::AuditAuthSource::Password)
890            .resource(format!(
891                "config:{}",
892                config_target_resource(collection, key)
893            ))
894            .outcome(outcome)
895            .correlation_id(request_id.clone())
896            .fields([
897                crate::runtime::audit_log::AuditFieldEscaper::field("actor", actor),
898                crate::runtime::audit_log::AuditFieldEscaper::field("collection", collection),
899                crate::runtime::audit_log::AuditFieldEscaper::field("key", key),
900                crate::runtime::audit_log::AuditFieldEscaper::field(
901                    "target",
902                    config_target_resource(collection, key),
903                ),
904                crate::runtime::audit_log::AuditFieldEscaper::field("reason", reason),
905                crate::runtime::audit_log::AuditFieldEscaper::field("request_id", request_id),
906                crate::runtime::audit_log::AuditFieldEscaper::field(
907                    "connection_id",
908                    current_connection_id(),
909                ),
910            ]);
911        if let Some(tenant) = current_tenant() {
912            builder = builder.tenant(tenant);
913        }
914        if let Some(secret_ref) = secret_ref {
915            builder = builder.fields([
916                crate::runtime::audit_log::AuditFieldEscaper::field("resolved_store", "vault"),
917                crate::runtime::audit_log::AuditFieldEscaper::field(
918                    "resolved_collection",
919                    secret_ref.collection.as_str(),
920                ),
921                crate::runtime::audit_log::AuditFieldEscaper::field(
922                    "resolved_key",
923                    secret_ref.key.as_str(),
924                ),
925                crate::runtime::audit_log::AuditFieldEscaper::field(
926                    "resolved_target",
927                    format!("{}.{}", secret_ref.collection, secret_ref.key),
928                ),
929            ]);
930        }
931        self.audit_log().record_event(builder.build());
932    }
933}
934
935fn parse_config_secret_ref(value: &Value) -> RedDBResult<ConfigSecretRef> {
936    let Value::Json(bytes) = value else {
937        return Err(RedDBError::InvalidConfig(
938            "CONFIG value is not a SecretRef".to_string(),
939        ));
940    };
941    let json = crate::json::from_slice::<crate::json::Value>(bytes).map_err(|err| {
942        RedDBError::InvalidConfig(format!("CONFIG SecretRef is malformed: {err}"))
943    })?;
944    let Some(object) = json.as_object() else {
945        return Err(RedDBError::InvalidConfig(
946            "CONFIG SecretRef must be an object".to_string(),
947        ));
948    };
949    let get_str = |field: &str| -> RedDBResult<&str> {
950        object
951            .get(field)
952            .and_then(|value| value.as_str())
953            .ok_or_else(|| RedDBError::InvalidConfig(format!("CONFIG SecretRef missing {field}")))
954    };
955    if get_str("type")? != "secret_ref" {
956        return Err(RedDBError::InvalidConfig(
957            "CONFIG value is not a SecretRef".to_string(),
958        ));
959    }
960    if get_str("store")? != "vault" {
961        return Err(RedDBError::InvalidConfig(
962            "CONFIG SecretRef store is unsupported".to_string(),
963        ));
964    }
965    Ok(ConfigSecretRef {
966        collection: get_str("collection")?.to_string(),
967        key: get_str("key")?.to_string(),
968    })
969}
970
/// Builds the resource name for a config key.
///
/// Keys in the `red.config` collection are ASCII-lowercased and rendered as
/// `red.config/<key>`; keys in any other collection are rendered verbatim as
/// `<collection>.<key>`.
fn config_target_resource(collection: &str, key: &str) -> String {
    match collection {
        "red.config" => {
            let normalized_key = key.to_ascii_lowercase();
            format!("red.config/{}", normalized_key)
        }
        _ => format!("{collection}.{key}"),
    }
}
978
979fn config_write_output(
980    raw_query: &str,
981    collection: &str,
982    key: &str,
983    version: i64,
984    id: EntityId,
985    value_type: Option<ConfigValueType>,
986    schema_version: Option<i64>,
987    tags: &[String],
988    statement: &'static str,
989    affected_rows: u64,
990) -> RuntimeQueryResult {
991    let mut result = UnifiedResult::with_columns(vec![
992        "ok".into(),
993        "collection".into(),
994        "key".into(),
995        "version".into(),
996        "value_type".into(),
997        "schema_version".into(),
998        "tags".into(),
999        "id".into(),
1000    ]);
1001    let mut record = UnifiedRecord::new();
1002    record.set("ok", Value::Boolean(true));
1003    record.set("collection", Value::text(collection.to_string()));
1004    record.set("key", Value::text(key.to_string()));
1005    record.set("version", Value::Integer(version));
1006    record.set("value_type", config_value_type_value(value_type));
1007    record.set(
1008        "schema_version",
1009        schema_version.map(Value::Integer).unwrap_or(Value::Null),
1010    );
1011    record.set("tags", config_tags_value(tags));
1012    record.set("id", Value::Integer(id.raw() as i64));
1013    result.push(record);
1014    RuntimeQueryResult {
1015        query: raw_query.to_string(),
1016        mode: crate::storage::query::modes::QueryMode::Sql,
1017        statement,
1018        engine: "config",
1019        result,
1020        affected_rows,
1021        statement_type: if statement == "delete" {
1022            "delete"
1023        } else {
1024            "update"
1025        },
1026    }
1027}
1028
1029fn invalid_config_volatility(operation: &str) -> RedDBError {
1030    RedDBError::InvalidOperation(format!(
1031        "CONFIG does not support KV-only volatility operation {operation}"
1032    ))
1033}
1034
1035fn resolve_config_schema(
1036    latest: Option<&ConfigVersion>,
1037    requested_type: Option<ConfigValueType>,
1038) -> (Option<ConfigValueType>, Option<i64>) {
1039    let previous_type = latest.and_then(|version| version.value_type);
1040    let previous_schema_version = latest.and_then(|version| version.schema_version);
1041    match requested_type {
1042        Some(value_type) if Some(value_type) != previous_type => (
1043            Some(value_type),
1044            Some(previous_schema_version.unwrap_or(0) + 1),
1045        ),
1046        Some(value_type) => (Some(value_type), previous_schema_version.or(Some(1))),
1047        None => (previous_type, previous_schema_version),
1048    }
1049}
1050
1051fn validate_config_value_type(value: &Value, value_type: ConfigValueType) -> RedDBResult<()> {
1052    let valid = match value_type {
1053        ConfigValueType::Bool => matches!(value, Value::Boolean(_)),
1054        ConfigValueType::Int => matches!(
1055            value,
1056            Value::Integer(_) | Value::UnsignedInteger(_) | Value::BigInt(_)
1057        ),
1058        ConfigValueType::String => matches!(value, Value::Text(_)),
1059        ConfigValueType::Url => validate_config_url(value),
1060        ConfigValueType::Object => validate_config_json_shape(value, true),
1061        ConfigValueType::Array => {
1062            matches!(value, Value::Array(_) | Value::Vector(_))
1063                || validate_config_json_shape(value, false)
1064        }
1065    };
1066    if valid {
1067        Ok(())
1068    } else {
1069        Err(RedDBError::InvalidConfig(format!(
1070            "CONFIG value type mismatch: expected {}, got {}",
1071            value_type.as_str(),
1072            config_actual_value_type(value),
1073        )))
1074    }
1075}
1076
1077fn validate_config_url(value: &Value) -> bool {
1078    let url = match value {
1079        Value::Url(value) => value.as_str(),
1080        Value::Text(value) => value.as_ref(),
1081        _ => return false,
1082    };
1083    url.starts_with("http://") || url.starts_with("https://") || url.starts_with("ftp://")
1084}
1085
1086fn validate_config_json_shape(value: &Value, object: bool) -> bool {
1087    let Value::Json(bytes) = value else {
1088        return false;
1089    };
1090    let Ok(json) = crate::json::from_slice::<crate::json::Value>(bytes) else {
1091        return false;
1092    };
1093    matches!(
1094        (object, json),
1095        (true, crate::json::Value::Object(_)) | (false, crate::json::Value::Array(_))
1096    )
1097}
1098
1099fn config_actual_value_type(value: &Value) -> &'static str {
1100    match value {
1101        Value::Null => "null",
1102        Value::Boolean(_) => "bool",
1103        Value::Integer(_) | Value::UnsignedInteger(_) | Value::BigInt(_) => "int",
1104        Value::Text(_) => "string",
1105        Value::Url(_) => "url",
1106        Value::Json(bytes) => match crate::json::from_slice::<crate::json::Value>(bytes) {
1107            Ok(crate::json::Value::Object(_)) => "object",
1108            Ok(crate::json::Value::Array(_)) => "array",
1109            _ => "json",
1110        },
1111        Value::Array(_) | Value::Vector(_) => "array",
1112        _ => "other",
1113    }
1114}
1115
1116fn config_value_type_value(value_type: Option<ConfigValueType>) -> Value {
1117    value_type
1118        .map(|value_type| Value::text(value_type.as_str()))
1119        .unwrap_or(Value::Null)
1120}
1121
1122fn config_value_type_from_value(value: &Value) -> Option<ConfigValueType> {
1123    match value {
1124        Value::Text(value) => ConfigValueType::parse(value.as_ref()),
1125        _ => None,
1126    }
1127}
1128
1129fn config_tags_value(tags: &[String]) -> Value {
1130    if tags.is_empty() {
1131        return Value::Null;
1132    }
1133    Value::Array(tags.iter().map(|tag| Value::text(tag.clone())).collect())
1134}
1135
1136fn config_tags_from_value(value: Option<&Value>) -> Vec<String> {
1137    match value {
1138        Some(Value::Array(values)) => values
1139            .iter()
1140            .filter_map(|value| match value {
1141                Value::Text(tag) => Some(tag.to_string()),
1142                _ => None,
1143            })
1144            .collect(),
1145        Some(Value::Json(bytes)) => crate::json::from_slice::<crate::json::Value>(bytes)
1146            .ok()
1147            .and_then(|value| value.as_array().map(|values| values.to_vec()))
1148            .map(|values| {
1149                values
1150                    .into_iter()
1151                    .filter_map(|value| value.as_str().map(ToOwned::to_owned))
1152                    .collect()
1153            })
1154            .unwrap_or_default(),
1155        _ => Vec::new(),
1156    }
1157}
1158
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` when the system clock reports a time before the epoch. The
/// 128-bit millisecond count is narrowed to `u64` with an `as` cast.
fn current_unix_ms() -> u64 {
    use std::time::{Duration, SystemTime, UNIX_EPOCH};

    let since_epoch = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        Err(_) => Duration::ZERO,
    };
    since_epoch.as_millis() as u64
}