1use std::sync::Arc;
4
5use crate::catalog::{CollectionModel, SchemaMode};
6use crate::physical::{CollectionContract, ContractOrigin};
7use crate::storage::query::ast::ConfigValueType;
8use crate::storage::{EntityData, EntityId, EntityKind, RowData, UnifiedEntity};
9
10use super::impl_core::{current_auth_identity, current_connection_id, current_tenant};
11use super::*;
12
/// Maximum number of stored versions kept per config key. `prune_config_history`
/// deletes rows for a key once its version count exceeds this limit.
const CONFIG_HISTORY_LIMIT: usize = 16;
14
/// One stored version of a config key, decoded from a collection row.
#[derive(Clone)]
struct ConfigVersion {
    /// Entity id of the row that stores this version.
    id: EntityId,
    /// Config key this version belongs to.
    key: String,
    /// Version number; writes in this file store the previous latest version plus one.
    version: i64,
    /// Stored value; delete operations in this file store `Value::Null`.
    value: Value,
    /// `true` for versions appended by a delete.
    tombstone: bool,
    /// Creation timestamp in milliseconds (presumably Unix epoch, as written by
    /// `append_config_version`).
    created_at_ms: i64,
    /// Operation label; this file writes `"put"`, `"rotate"` or `"delete"`.
    op: String,
    /// Declared value type parsed from the row's `value_type` field, if present.
    value_type: Option<ConfigValueType>,
    /// Schema version read from the row's `schema_version` field, if present.
    schema_version: Option<i64>,
    /// Tags read from the row's `tags` field; empty when the field is absent.
    tags: Vec<String>,
}
28
29impl super::keyed_spine::KeyedVersion for ConfigVersion {
30 fn key(&self) -> &str {
31 &self.key
32 }
33
34 fn version(&self) -> i64 {
35 self.version
36 }
37}
38
39impl ConfigVersion {
40 fn from_keyed_row(version: super::keyed_spine::KeyedRowVersion, row: &RowData) -> Self {
41 Self {
42 id: version.id,
43 key: version.key,
44 version: version.version,
45 value: version.value,
46 tombstone: version.tombstone,
47 created_at_ms: version.created_at_ms,
48 op: version.op,
49 value_type: row
50 .get_field("value_type")
51 .and_then(config_value_type_from_value),
52 schema_version: super::keyed_spine::value_i64(row.get_field("schema_version")),
53 tags: config_tags_from_value(row.get_field("tags")),
54 }
55 }
56}
57
/// Target of a SecretRef stored as a config value, as parsed by `parse_config_secret_ref`.
struct ConfigSecretRef {
    /// The SecretRef's `collection` field; passed to the vault lookup as the collection.
    collection: String,
    /// The SecretRef's `key` field; passed to the vault lookup as the key.
    key: String,
}
62
63impl RedDBRuntime {
64 pub fn execute_config_command(
65 &self,
66 raw_query: &str,
67 cmd: &crate::storage::query::ast::ConfigCommand,
68 ) -> RedDBResult<RuntimeQueryResult> {
69 use crate::storage::query::ast::ConfigCommand;
70
71 match cmd {
72 ConfigCommand::Put {
73 collection,
74 key,
75 value,
76 value_type,
77 tags,
78 } => self.config_write_result(
79 raw_query,
80 collection,
81 key,
82 value.clone(),
83 *value_type,
84 tags,
85 "put",
86 ),
87 ConfigCommand::Rotate {
88 collection,
89 key,
90 value,
91 value_type,
92 tags,
93 } => self.config_write_result(
94 raw_query,
95 collection,
96 key,
97 value.clone(),
98 *value_type,
99 tags,
100 "rotate",
101 ),
102 ConfigCommand::Get { collection, key } => {
103 self.config_get_result(raw_query, collection, key)
104 }
105 ConfigCommand::Resolve { collection, key } => {
106 self.config_resolve_result(raw_query, collection, key)
107 }
108 ConfigCommand::Delete { collection, key } => {
109 self.config_delete_result(raw_query, collection, key)
110 }
111 ConfigCommand::History { collection, key } => {
112 self.config_history_result(raw_query, collection, key)
113 }
114 ConfigCommand::List {
115 collection,
116 prefix,
117 limit,
118 offset,
119 } => self.config_list_result(raw_query, collection, prefix.as_deref(), *limit, *offset),
120 ConfigCommand::Watch {
121 collection,
122 key,
123 prefix,
124 from_lsn,
125 } => self.config_watch_result(raw_query, collection, key, *prefix, *from_lsn),
126 ConfigCommand::InvalidVolatileOperation { operation, .. } => {
127 Err(invalid_config_volatility(operation))
128 }
129 }
130 }
131
132 pub(crate) fn validate_config_command_before_auth(
133 &self,
134 cmd: &crate::storage::query::ast::ConfigCommand,
135 ) -> RedDBResult<()> {
136 use crate::storage::query::ast::ConfigCommand;
137 match cmd {
138 ConfigCommand::InvalidVolatileOperation { operation, .. } => {
139 Err(invalid_config_volatility(operation))
140 }
141 ConfigCommand::Put { collection, .. }
142 | ConfigCommand::Get { collection, .. }
143 | ConfigCommand::Resolve { collection, .. }
144 | ConfigCommand::Rotate { collection, .. }
145 | ConfigCommand::Delete { collection, .. }
146 | ConfigCommand::History { collection, .. }
147 | ConfigCommand::List { collection, .. }
148 | ConfigCommand::Watch { collection, .. } => {
149 let snapshot = self.inner.db.catalog_model_snapshot();
150 let Some(actual_model) = snapshot
151 .collections
152 .iter()
153 .find(|c| c.name == *collection)
154 .map(|c| c.declared_model.unwrap_or(c.model))
155 else {
156 return Ok(());
157 };
158 crate::runtime::ddl::polymorphic_resolver::ensure_model_match(
159 CollectionModel::Config,
160 actual_model,
161 )
162 }
163 }
164 }
165
166 fn config_resolve_result(
167 &self,
168 raw_query: &str,
169 collection: &str,
170 key: &str,
171 ) -> RedDBResult<RuntimeQueryResult> {
172 let latest = self.latest_config_version(collection, key)?;
173 if let Err(reason) = self.check_config_capability("config:read", collection, key) {
174 self.audit_config_resolve(
175 collection,
176 key,
177 None,
178 crate::runtime::audit_log::Outcome::Denied,
179 &reason,
180 );
181 return Err(RedDBError::Query(reason));
182 }
183
184 let Some(version) = latest else {
185 let reason = "not_found";
186 self.audit_config_resolve(
187 collection,
188 key,
189 None,
190 crate::runtime::audit_log::Outcome::Denied,
191 reason,
192 );
193 return Err(RedDBError::NotFound(format!(
194 "config '{}.{}' not found",
195 collection, key
196 )));
197 };
198 if version.tombstone {
199 let reason = "deleted";
200 self.audit_config_resolve(
201 collection,
202 key,
203 None,
204 crate::runtime::audit_log::Outcome::Denied,
205 reason,
206 );
207 return Err(RedDBError::NotFound(format!(
208 "config '{}.{}' is deleted",
209 collection, key
210 )));
211 }
212
213 let secret_ref = parse_config_secret_ref(&version.value).inspect_err(|err| {
214 self.audit_config_resolve(
215 collection,
216 key,
217 None,
218 crate::runtime::audit_log::Outcome::Error,
219 &err.to_string(),
220 );
221 })?;
222
223 match self.resolve_vault_secret_value(&secret_ref.collection, &secret_ref.key) {
224 Ok(value) => {
225 self.audit_config_resolve(
226 collection,
227 key,
228 Some(&secret_ref),
229 crate::runtime::audit_log::Outcome::Success,
230 "ok",
231 );
232 let mut result = UnifiedResult::with_columns(vec![
233 "collection".into(),
234 "key".into(),
235 "value".into(),
236 "resolved_store".into(),
237 "resolved_collection".into(),
238 "resolved_key".into(),
239 ]);
240 let mut record = UnifiedRecord::new();
241 record.set("collection", Value::text(collection.to_string()));
242 record.set("key", Value::text(key.to_string()));
243 record.set("value", value);
244 record.set("resolved_store", Value::text("vault"));
245 record.set("resolved_collection", Value::text(secret_ref.collection));
246 record.set("resolved_key", Value::text(secret_ref.key));
247 result.push(record);
248 Ok(RuntimeQueryResult {
249 query: raw_query.to_string(),
250 mode: crate::storage::query::modes::QueryMode::Sql,
251 statement: "config_resolve",
252 engine: "config",
253 result,
254 affected_rows: 0,
255 statement_type: "select",
256 })
257 }
258 Err(err) => {
259 let reason = err.to_string();
260 let outcome = if reason.contains("denied") {
261 crate::runtime::audit_log::Outcome::Denied
262 } else {
263 crate::runtime::audit_log::Outcome::Error
264 };
265 self.audit_config_resolve(collection, key, Some(&secret_ref), outcome, &reason);
266 Err(err)
267 }
268 }
269 }
270
271 fn config_write_result(
272 &self,
273 raw_query: &str,
274 collection: &str,
275 key: &str,
276 value: Value,
277 requested_type: Option<ConfigValueType>,
278 tags: &[String],
279 op: &str,
280 ) -> RedDBResult<RuntimeQueryResult> {
281 self.check_system_config_capability("config:write", collection, key)
282 .map_err(RedDBError::Query)?;
283 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
284 self.ensure_config_collection(collection)?;
285 let latest = self.latest_config_version(collection, key)?;
286 let version = latest.as_ref().map(|version| version.version).unwrap_or(0) + 1;
287 let (value_type, schema_version) = resolve_config_schema(latest.as_ref(), requested_type);
288 if let Some(value_type) = value_type {
289 validate_config_value_type(&value, value_type)?;
290 }
291 let before = latest.as_ref().and_then(|version| {
292 if version.tombstone {
293 None
294 } else {
295 Some(crate::presentation::entity_json::storage_value_to_json(
296 &version.value,
297 ))
298 }
299 });
300 let after = Some(crate::presentation::entity_json::storage_value_to_json(
301 &value,
302 ));
303 let change_op = if latest.is_some() {
304 crate::replication::cdc::ChangeOperation::Update
305 } else {
306 crate::replication::cdc::ChangeOperation::Insert
307 };
308 let id = self.append_config_version(
309 collection,
310 key,
311 value,
312 version,
313 false,
314 op,
315 value_type,
316 schema_version,
317 tags,
318 )?;
319 self.record_kv_watch_event(change_op, collection, key, id.raw(), before, after);
320 self.prune_config_history(collection, key)?;
321 self.invalidate_result_cache();
322 Ok(config_write_output(
323 raw_query,
324 collection,
325 key,
326 version,
327 id,
328 value_type,
329 schema_version,
330 tags,
331 match op {
332 "rotate" => "config_rotate",
333 _ => "config_put",
334 },
335 1,
336 ))
337 }
338
339 fn config_delete_result(
340 &self,
341 raw_query: &str,
342 collection: &str,
343 key: &str,
344 ) -> RedDBResult<RuntimeQueryResult> {
345 self.check_system_config_capability("config:write", collection, key)
346 .map_err(RedDBError::Query)?;
347 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
348 self.ensure_config_collection(collection)?;
349 let latest = self.latest_config_version(collection, key)?;
350 let version = latest.as_ref().map(|version| version.version).unwrap_or(0) + 1;
351 let value_type = latest.as_ref().and_then(|version| version.value_type);
352 let schema_version = latest.as_ref().and_then(|version| version.schema_version);
353 let id = self.append_config_version(
354 collection,
355 key,
356 Value::Null,
357 version,
358 true,
359 "delete",
360 value_type,
361 schema_version,
362 &[],
363 )?;
364 if let Some(before) = latest.as_ref().and_then(|version| {
365 if version.tombstone {
366 None
367 } else {
368 Some(crate::presentation::entity_json::storage_value_to_json(
369 &version.value,
370 ))
371 }
372 }) {
373 self.record_kv_watch_event(
374 crate::replication::cdc::ChangeOperation::Delete,
375 collection,
376 key,
377 id.raw(),
378 Some(before),
379 None,
380 );
381 }
382 self.prune_config_history(collection, key)?;
383 self.invalidate_result_cache();
384 Ok(config_write_output(
385 raw_query,
386 collection,
387 key,
388 version,
389 id,
390 value_type,
391 schema_version,
392 &[],
393 "delete",
394 1,
395 ))
396 }
397
398 fn config_get_result(
399 &self,
400 raw_query: &str,
401 collection: &str,
402 key: &str,
403 ) -> RedDBResult<RuntimeQueryResult> {
404 self.check_system_config_capability("config:read", collection, key)
405 .map_err(RedDBError::Query)?;
406 let latest = self.latest_config_version(collection, key)?;
407 let mut result = UnifiedResult::with_columns(vec![
408 "collection".into(),
409 "key".into(),
410 "value".into(),
411 "version".into(),
412 "value_type".into(),
413 "schema_version".into(),
414 "tags".into(),
415 "tombstone".into(),
416 ]);
417 let mut record = UnifiedRecord::new();
418 record.set("collection", Value::text(collection.to_string()));
419 record.set("key", Value::text(key.to_string()));
420 if let Some(version) = latest {
421 record.set("value", version.value);
422 record.set("version", Value::Integer(version.version));
423 record.set("value_type", config_value_type_value(version.value_type));
424 record.set(
425 "schema_version",
426 version
427 .schema_version
428 .map(Value::Integer)
429 .unwrap_or(Value::Null),
430 );
431 record.set("tags", config_tags_value(&version.tags));
432 record.set("tombstone", Value::Boolean(version.tombstone));
433 } else {
434 record.set("value", Value::Null);
435 record.set("version", Value::Null);
436 record.set("value_type", Value::Null);
437 record.set("schema_version", Value::Null);
438 record.set("tags", Value::Null);
439 record.set("tombstone", Value::Boolean(false));
440 }
441 result.push(record);
442 Ok(RuntimeQueryResult {
443 query: raw_query.to_string(),
444 mode: crate::storage::query::modes::QueryMode::Sql,
445 statement: "config_get",
446 engine: "config",
447 result,
448 affected_rows: 0,
449 statement_type: "select",
450 })
451 }
452
453 fn config_history_result(
454 &self,
455 raw_query: &str,
456 collection: &str,
457 key: &str,
458 ) -> RedDBResult<RuntimeQueryResult> {
459 self.check_system_config_capability("config:read", collection, key)
460 .map_err(RedDBError::Query)?;
461 let versions = super::keyed_spine::history_versions(self.config_versions(collection, key)?);
462 let mut result = UnifiedResult::with_columns(vec![
463 "collection".into(),
464 "key".into(),
465 "version".into(),
466 "value".into(),
467 "value_type".into(),
468 "schema_version".into(),
469 "tags".into(),
470 "tombstone".into(),
471 "op".into(),
472 "created_at_ms".into(),
473 ]);
474 for version in versions {
475 let mut record = UnifiedRecord::new();
476 record.set("collection", Value::text(collection.to_string()));
477 record.set("key", Value::text(key.to_string()));
478 record.set("version", Value::Integer(version.version));
479 record.set("value", version.value);
480 record.set("value_type", config_value_type_value(version.value_type));
481 record.set(
482 "schema_version",
483 version
484 .schema_version
485 .map(Value::Integer)
486 .unwrap_or(Value::Null),
487 );
488 record.set("tags", Value::Null);
489 record.set("tombstone", Value::Boolean(version.tombstone));
490 record.set("op", Value::text(version.op));
491 record.set("created_at_ms", Value::Integer(version.created_at_ms));
492 result.push(record);
493 }
494 Ok(RuntimeQueryResult {
495 query: raw_query.to_string(),
496 mode: crate::storage::query::modes::QueryMode::Sql,
497 statement: "config_history",
498 engine: "config",
499 result,
500 affected_rows: 0,
501 statement_type: "select",
502 })
503 }
504
505 fn config_list_result(
506 &self,
507 raw_query: &str,
508 collection: &str,
509 prefix: Option<&str>,
510 limit: Option<usize>,
511 offset: usize,
512 ) -> RedDBResult<RuntimeQueryResult> {
513 let mut versions = self.latest_config_versions(collection, prefix)?;
514 versions.sort_by(|left, right| left.key.cmp(&right.key));
515 let mut result = UnifiedResult::with_columns(vec![
516 "collection".into(),
517 "key".into(),
518 "value".into(),
519 "version".into(),
520 "value_type".into(),
521 "schema_version".into(),
522 "tags".into(),
523 "tombstone".into(),
524 "op".into(),
525 "created_at_ms".into(),
526 ]);
527 for version in versions
528 .into_iter()
529 .filter(|version| {
530 self.check_config_capability("config:read", collection, &version.key)
531 .is_ok()
532 })
533 .skip(offset)
534 .take(limit.unwrap_or(usize::MAX))
535 {
536 let mut record = UnifiedRecord::new();
537 record.set("collection", Value::text(collection.to_string()));
538 record.set("key", Value::text(version.key));
539 record.set("value", version.value);
540 record.set("version", Value::Integer(version.version));
541 record.set("value_type", config_value_type_value(version.value_type));
542 record.set(
543 "schema_version",
544 version
545 .schema_version
546 .map(Value::Integer)
547 .unwrap_or(Value::Null),
548 );
549 record.set("tags", config_tags_value(&version.tags));
550 record.set("tombstone", Value::Boolean(version.tombstone));
551 record.set("op", Value::text(version.op));
552 record.set("created_at_ms", Value::Integer(version.created_at_ms));
553 result.push(record);
554 }
555 Ok(RuntimeQueryResult {
556 query: raw_query.to_string(),
557 mode: crate::storage::query::modes::QueryMode::Sql,
558 statement: "config_list",
559 engine: "config",
560 result,
561 affected_rows: 0,
562 statement_type: "select",
563 })
564 }
565
566 fn config_watch_result(
567 &self,
568 raw_query: &str,
569 collection: &str,
570 key: &str,
571 prefix: bool,
572 from_lsn: Option<u64>,
573 ) -> RedDBResult<RuntimeQueryResult> {
574 let watch_key = if prefix {
575 format!("{key}.*")
576 } else {
577 key.to_string()
578 };
579 let endpoint = match from_lsn {
580 Some(lsn) => {
581 format!("/collections/{collection}/config/{watch_key}/watch?since_lsn={lsn}")
582 }
583 None => format!("/collections/{collection}/config/{watch_key}/watch"),
584 };
585 let mut result = UnifiedResult::with_columns(vec![
586 "collection".into(),
587 "key".into(),
588 "prefix".into(),
589 "from_lsn".into(),
590 "watch_url".into(),
591 "streaming".into(),
592 ]);
593 let mut record = UnifiedRecord::new();
594 record.set("collection", Value::text(collection.to_string()));
595 record.set("key", Value::text(watch_key));
596 record.set("prefix", Value::Boolean(prefix));
597 record.set(
598 "from_lsn",
599 from_lsn
600 .map(Value::UnsignedInteger)
601 .unwrap_or(crate::storage::schema::Value::Null),
602 );
603 record.set("watch_url", Value::text(endpoint));
604 record.set("streaming", Value::Boolean(true));
605 result.push(record);
606 Ok(RuntimeQueryResult {
607 query: raw_query.to_string(),
608 mode: crate::storage::query::modes::QueryMode::Sql,
609 statement: "config_watch",
610 engine: "config",
611 result,
612 affected_rows: 0,
613 statement_type: "stream",
614 })
615 }
616
617 fn ensure_config_collection(&self, collection: &str) -> RedDBResult<()> {
618 let store = self.inner.db.store();
619 if store.get_collection(collection).is_none() {
620 store
621 .create_collection(collection)
622 .map_err(|err| RedDBError::Internal(err.to_string()))?;
623 }
624 if let Some(contract) = self.inner.db.collection_contract(collection) {
625 crate::runtime::ddl::polymorphic_resolver::ensure_model_match(
626 CollectionModel::Config,
627 contract.declared_model,
628 )?;
629 return Ok(());
630 }
631 let now = current_unix_ms();
632 self.inner
633 .db
634 .save_collection_contract(CollectionContract {
635 name: collection.to_string(),
636 declared_model: CollectionModel::Config,
637 schema_mode: SchemaMode::Dynamic,
638 origin: ContractOrigin::Explicit,
639 version: 1,
640 created_at_unix_ms: now as u128,
641 updated_at_unix_ms: now as u128,
642 default_ttl_ms: None,
643 vector_dimension: None,
644 vector_metric: None,
645 context_index_fields: Vec::new(),
646 declared_columns: Vec::new(),
647 table_def: None,
648 timestamps_enabled: false,
649 context_index_enabled: false,
650 metrics_raw_retention_ms: None,
651 metrics_rollup_policies: Vec::new(),
652 metrics_tenant_identity: None,
653 metrics_namespace: None,
654 append_only: false,
655 subscriptions: Vec::new(),
656 })
657 .map(|_| ())
658 .map_err(|err| RedDBError::Internal(err.to_string()))
659 }
660
661 fn append_config_version(
662 &self,
663 collection: &str,
664 key: &str,
665 value: Value,
666 version: i64,
667 tombstone: bool,
668 op: &str,
669 value_type: Option<ConfigValueType>,
670 schema_version: Option<i64>,
671 tags: &[String],
672 ) -> RedDBResult<EntityId> {
673 let now = current_unix_ms() as i64;
674 let fields = vec![
675 ("key".to_string(), Value::text(key.to_string())),
676 ("value".to_string(), value),
677 ("version".to_string(), Value::Integer(version)),
678 (
679 "value_type".to_string(),
680 config_value_type_value(value_type),
681 ),
682 (
683 "schema_version".to_string(),
684 schema_version.map(Value::Integer).unwrap_or(Value::Null),
685 ),
686 ("tombstone".to_string(), Value::Boolean(tombstone)),
687 ("op".to_string(), Value::text(op.to_string())),
688 ("created_at_ms".to_string(), Value::Integer(now)),
689 ("tags".to_string(), config_tags_value(tags)),
690 ];
691 let mut row = RowData::new(Vec::new());
692 row.named = Some(fields.into_iter().collect());
693 let entity = UnifiedEntity::new(
694 EntityId::new(0),
695 EntityKind::TableRow {
696 table: Arc::from(collection),
697 row_id: 0,
698 },
699 EntityData::Row(row),
700 );
701 self.inner
702 .db
703 .store()
704 .insert(collection, entity)
705 .map_err(|err| RedDBError::Internal(err.to_string()))
706 }
707
708 fn latest_config_version(
709 &self,
710 collection: &str,
711 key: &str,
712 ) -> RedDBResult<Option<ConfigVersion>> {
713 Ok(super::keyed_spine::latest_version(
714 self.config_versions(collection, key)?,
715 ))
716 }
717
718 fn config_versions(&self, collection: &str, key: &str) -> RedDBResult<Vec<ConfigVersion>> {
719 let store = self.inner.db.store();
720 let Some(manager) = store.get_collection(collection) else {
721 return Ok(Vec::new());
722 };
723 let mut versions = Vec::new();
724 for entity in manager.query_all(|_| true) {
725 let EntityData::Row(row) = &entity.data else {
726 continue;
727 };
728 let Some(version) = super::keyed_spine::row_version(entity.id, row, 0) else {
729 continue;
730 };
731 if version.key != key {
732 continue;
733 }
734 versions.push(ConfigVersion::from_keyed_row(version, row));
735 }
736 Ok(versions)
737 }
738
739 fn latest_config_versions(
740 &self,
741 collection: &str,
742 prefix: Option<&str>,
743 ) -> RedDBResult<Vec<ConfigVersion>> {
744 let store = self.inner.db.store();
745 let Some(manager) = store.get_collection(collection) else {
746 return Ok(Vec::new());
747 };
748 let mut versions = Vec::new();
749 for entity in manager.query_all(|_| true) {
750 let EntityData::Row(row) = &entity.data else {
751 continue;
752 };
753 let Some(version) = super::keyed_spine::row_version(entity.id, row, 0) else {
754 continue;
755 };
756 versions.push(ConfigVersion::from_keyed_row(version, row));
757 }
758 Ok(super::keyed_spine::latest_versions(versions, prefix))
759 }
760
761 fn prune_config_history(&self, collection: &str, key: &str) -> RedDBResult<()> {
762 let mut versions = self.config_versions(collection, key)?;
763 if versions.len() <= CONFIG_HISTORY_LIMIT {
764 return Ok(());
765 }
766 versions = super::keyed_spine::history_versions(versions);
767 let drop_count = versions.len() - CONFIG_HISTORY_LIMIT;
768 let store = self.inner.db.store();
769 for version in versions.into_iter().take(drop_count) {
770 store
771 .delete(collection, version.id)
772 .map_err(|err| RedDBError::Internal(err.to_string()))?;
773 }
774 Ok(())
775 }
776
777 fn check_config_capability(
778 &self,
779 action: &str,
780 collection: &str,
781 key: &str,
782 ) -> Result<(), String> {
783 let Some(auth_store) = self.inner.auth_store.read().clone() else {
784 return Ok(());
785 };
786 if !auth_store.iam_authorization_enabled() {
787 return Ok(());
788 }
789 let Some((principal, role)) = current_auth_identity() else {
790 return Err(
791 "IAM authorization is enabled; config capability check requires an authenticated principal"
792 .to_string(),
793 );
794 };
795 let tenant = current_tenant();
796 let principal_id = crate::auth::UserId::from_parts(tenant.as_deref(), &principal);
797 let mut resource = crate::auth::policies::ResourceRef::new(
798 "config",
799 config_target_resource(collection, key),
800 );
801 if let Some(ref tenant) = tenant {
802 resource = resource.with_tenant(tenant.clone());
803 }
804 let ctx = crate::auth::policies::EvalContext {
805 principal_tenant: tenant.clone(),
806 current_tenant: tenant,
807 peer_ip: None,
808 mfa_present: false,
809 now_ms: crate::utils::now_unix_millis() as u128,
810 principal_is_admin_role: role == crate::auth::Role::Admin,
811 };
812 if auth_store.check_policy_authz(&principal_id, action, &resource, &ctx) {
813 Ok(())
814 } else {
815 Err(format!(
816 "principal=`{}` action=`{}` resource=`config:{}` denied by IAM policy",
817 principal,
818 action,
819 config_target_resource(collection, key)
820 ))
821 }
822 }
823
824 fn check_system_config_capability(
825 &self,
826 action: &str,
827 collection: &str,
828 key: &str,
829 ) -> Result<(), String> {
830 if collection != "red.config" {
831 return Ok(());
832 }
833 self.check_config_capability(action, collection, key)
834 }
835
836 pub fn config_watch_events_since(
837 &self,
838 collection: &str,
839 key: &str,
840 since_lsn: u64,
841 max_count: usize,
842 ) -> Vec<crate::replication::cdc::KvWatchEvent> {
843 self.kv_watch_events_since(collection, key, since_lsn, max_count)
844 .into_iter()
845 .map(|event| self.policy_filter_config_watch_event(event))
846 .collect()
847 }
848
849 pub fn config_watch_events_since_prefix(
850 &self,
851 collection: &str,
852 prefix: &str,
853 since_lsn: u64,
854 max_count: usize,
855 ) -> Vec<crate::replication::cdc::KvWatchEvent> {
856 self.kv_watch_events_since_prefix(collection, prefix, since_lsn, max_count)
857 .into_iter()
858 .map(|event| self.policy_filter_config_watch_event(event))
859 .collect()
860 }
861
862 fn policy_filter_config_watch_event(
863 &self,
864 mut event: crate::replication::cdc::KvWatchEvent,
865 ) -> crate::replication::cdc::KvWatchEvent {
866 if self
867 .check_config_capability("config:read", &event.collection, &event.key)
868 .is_err()
869 {
870 event.before = None;
871 event.after = None;
872 }
873 event
874 }
875
876 fn audit_config_resolve(
877 &self,
878 collection: &str,
879 key: &str,
880 secret_ref: Option<&ConfigSecretRef>,
881 outcome: crate::runtime::audit_log::Outcome,
882 reason: &str,
883 ) {
884 let actor = current_auth_identity()
885 .map(|(principal, _)| principal)
886 .unwrap_or_else(|| "anonymous".to_string());
887 let request_id = match current_connection_id() {
888 0 => "embedded".to_string(),
889 id => format!("conn-{id}"),
890 };
891 let mut builder = crate::runtime::audit_log::AuditEvent::builder("config/resolve")
892 .principal(actor.clone())
893 .source(crate::runtime::audit_log::AuditAuthSource::Password)
894 .resource(format!(
895 "config:{}",
896 config_target_resource(collection, key)
897 ))
898 .outcome(outcome)
899 .correlation_id(request_id.clone())
900 .fields([
901 crate::runtime::audit_log::AuditFieldEscaper::field("actor", actor),
902 crate::runtime::audit_log::AuditFieldEscaper::field("collection", collection),
903 crate::runtime::audit_log::AuditFieldEscaper::field("key", key),
904 crate::runtime::audit_log::AuditFieldEscaper::field(
905 "target",
906 config_target_resource(collection, key),
907 ),
908 crate::runtime::audit_log::AuditFieldEscaper::field("reason", reason),
909 crate::runtime::audit_log::AuditFieldEscaper::field("request_id", request_id),
910 crate::runtime::audit_log::AuditFieldEscaper::field(
911 "connection_id",
912 current_connection_id(),
913 ),
914 ]);
915 if let Some(tenant) = current_tenant() {
916 builder = builder.tenant(tenant);
917 }
918 if let Some(secret_ref) = secret_ref {
919 builder = builder.fields([
920 crate::runtime::audit_log::AuditFieldEscaper::field("resolved_store", "vault"),
921 crate::runtime::audit_log::AuditFieldEscaper::field(
922 "resolved_collection",
923 secret_ref.collection.as_str(),
924 ),
925 crate::runtime::audit_log::AuditFieldEscaper::field(
926 "resolved_key",
927 secret_ref.key.as_str(),
928 ),
929 crate::runtime::audit_log::AuditFieldEscaper::field(
930 "resolved_target",
931 format!("{}.{}", secret_ref.collection, secret_ref.key),
932 ),
933 ]);
934 }
935 self.audit_log().record_event(builder.build());
936 }
937}
938
939fn parse_config_secret_ref(value: &Value) -> RedDBResult<ConfigSecretRef> {
940 let Value::Json(bytes) = value else {
941 return Err(RedDBError::InvalidConfig(
942 "CONFIG value is not a SecretRef".to_string(),
943 ));
944 };
945 let json = crate::json::from_slice::<crate::json::Value>(bytes).map_err(|err| {
946 RedDBError::InvalidConfig(format!("CONFIG SecretRef is malformed: {err}"))
947 })?;
948 let Some(object) = json.as_object() else {
949 return Err(RedDBError::InvalidConfig(
950 "CONFIG SecretRef must be an object".to_string(),
951 ));
952 };
953 let get_str = |field: &str| -> RedDBResult<&str> {
954 object
955 .get(field)
956 .and_then(|value| value.as_str())
957 .ok_or_else(|| RedDBError::InvalidConfig(format!("CONFIG SecretRef missing {field}")))
958 };
959 if get_str("type")? != "secret_ref" {
960 return Err(RedDBError::InvalidConfig(
961 "CONFIG value is not a SecretRef".to_string(),
962 ));
963 }
964 if get_str("store")? != "vault" {
965 return Err(RedDBError::InvalidConfig(
966 "CONFIG SecretRef store is unsupported".to_string(),
967 ));
968 }
969 Ok(ConfigSecretRef {
970 collection: get_str("collection")?.to_string(),
971 key: get_str("key")?.to_string(),
972 })
973}
974
/// Builds the resource name used in policy checks and audit records.
///
/// Keys in the `red.config` collection are ASCII-lowercased and joined with `/`;
/// for any other collection the name is `<collection>.<key>` with the key unchanged.
fn config_target_resource(collection: &str, key: &str) -> String {
    match collection {
        "red.config" => format!("red.config/{}", key.to_ascii_lowercase()),
        _ => format!("{collection}.{key}"),
    }
}
982
983fn config_write_output(
984 raw_query: &str,
985 collection: &str,
986 key: &str,
987 version: i64,
988 id: EntityId,
989 value_type: Option<ConfigValueType>,
990 schema_version: Option<i64>,
991 tags: &[String],
992 statement: &'static str,
993 affected_rows: u64,
994) -> RuntimeQueryResult {
995 let mut result = UnifiedResult::with_columns(vec![
996 "ok".into(),
997 "collection".into(),
998 "key".into(),
999 "version".into(),
1000 "value_type".into(),
1001 "schema_version".into(),
1002 "tags".into(),
1003 "id".into(),
1004 ]);
1005 let mut record = UnifiedRecord::new();
1006 record.set("ok", Value::Boolean(true));
1007 record.set("collection", Value::text(collection.to_string()));
1008 record.set("key", Value::text(key.to_string()));
1009 record.set("version", Value::Integer(version));
1010 record.set("value_type", config_value_type_value(value_type));
1011 record.set(
1012 "schema_version",
1013 schema_version.map(Value::Integer).unwrap_or(Value::Null),
1014 );
1015 record.set("tags", config_tags_value(tags));
1016 record.set("id", Value::Integer(id.raw() as i64));
1017 result.push(record);
1018 RuntimeQueryResult {
1019 query: raw_query.to_string(),
1020 mode: crate::storage::query::modes::QueryMode::Sql,
1021 statement,
1022 engine: "config",
1023 result,
1024 affected_rows,
1025 statement_type: if statement == "delete" {
1026 "delete"
1027 } else {
1028 "update"
1029 },
1030 }
1031}
1032
1033fn invalid_config_volatility(operation: &str) -> RedDBError {
1034 RedDBError::InvalidOperation(format!(
1035 "CONFIG does not support KV-only volatility operation {operation}"
1036 ))
1037}
1038
1039fn resolve_config_schema(
1040 latest: Option<&ConfigVersion>,
1041 requested_type: Option<ConfigValueType>,
1042) -> (Option<ConfigValueType>, Option<i64>) {
1043 let previous_type = latest.and_then(|version| version.value_type);
1044 let previous_schema_version = latest.and_then(|version| version.schema_version);
1045 match requested_type {
1046 Some(value_type) if Some(value_type) != previous_type => (
1047 Some(value_type),
1048 Some(previous_schema_version.unwrap_or(0) + 1),
1049 ),
1050 Some(value_type) => (Some(value_type), previous_schema_version.or(Some(1))),
1051 None => (previous_type, previous_schema_version),
1052 }
1053}
1054
1055fn validate_config_value_type(value: &Value, value_type: ConfigValueType) -> RedDBResult<()> {
1056 let valid = match value_type {
1057 ConfigValueType::Bool => matches!(value, Value::Boolean(_)),
1058 ConfigValueType::Int => matches!(
1059 value,
1060 Value::Integer(_) | Value::UnsignedInteger(_) | Value::BigInt(_)
1061 ),
1062 ConfigValueType::String => matches!(value, Value::Text(_)),
1063 ConfigValueType::Url => validate_config_url(value),
1064 ConfigValueType::Object => validate_config_json_shape(value, true),
1065 ConfigValueType::Array => {
1066 matches!(value, Value::Array(_) | Value::Vector(_))
1067 || validate_config_json_shape(value, false)
1068 }
1069 };
1070 if valid {
1071 Ok(())
1072 } else {
1073 Err(RedDBError::InvalidConfig(format!(
1074 "CONFIG value type mismatch: expected {}, got {}",
1075 value_type.as_str(),
1076 config_actual_value_type(value),
1077 )))
1078 }
1079}
1080
1081fn validate_config_url(value: &Value) -> bool {
1082 let url = match value {
1083 Value::Url(value) => value.as_str(),
1084 Value::Text(value) => value.as_ref(),
1085 _ => return false,
1086 };
1087 url.starts_with("http://") || url.starts_with("https://") || url.starts_with("ftp://")
1088}
1089
1090fn validate_config_json_shape(value: &Value, object: bool) -> bool {
1091 let Value::Json(bytes) = value else {
1092 return false;
1093 };
1094 let Ok(json) = crate::json::from_slice::<crate::json::Value>(bytes) else {
1095 return false;
1096 };
1097 matches!(
1098 (object, json),
1099 (true, crate::json::Value::Object(_)) | (false, crate::json::Value::Array(_))
1100 )
1101}
1102
1103fn config_actual_value_type(value: &Value) -> &'static str {
1104 match value {
1105 Value::Null => "null",
1106 Value::Boolean(_) => "bool",
1107 Value::Integer(_) | Value::UnsignedInteger(_) | Value::BigInt(_) => "int",
1108 Value::Text(_) => "string",
1109 Value::Url(_) => "url",
1110 Value::Json(bytes) => match crate::json::from_slice::<crate::json::Value>(bytes) {
1111 Ok(crate::json::Value::Object(_)) => "object",
1112 Ok(crate::json::Value::Array(_)) => "array",
1113 _ => "json",
1114 },
1115 Value::Array(_) | Value::Vector(_) => "array",
1116 _ => "other",
1117 }
1118}
1119
1120fn config_value_type_value(value_type: Option<ConfigValueType>) -> Value {
1121 value_type
1122 .map(|value_type| Value::text(value_type.as_str()))
1123 .unwrap_or(Value::Null)
1124}
1125
1126fn config_value_type_from_value(value: &Value) -> Option<ConfigValueType> {
1127 match value {
1128 Value::Text(value) => ConfigValueType::parse(value.as_ref()),
1129 _ => None,
1130 }
1131}
1132
1133fn config_tags_value(tags: &[String]) -> Value {
1134 if tags.is_empty() {
1135 return Value::Null;
1136 }
1137 Value::Array(tags.iter().map(|tag| Value::text(tag.clone())).collect())
1138}
1139
1140fn config_tags_from_value(value: Option<&Value>) -> Vec<String> {
1141 match value {
1142 Some(Value::Array(values)) => values
1143 .iter()
1144 .filter_map(|value| match value {
1145 Value::Text(tag) => Some(tag.to_string()),
1146 _ => None,
1147 })
1148 .collect(),
1149 Some(Value::Json(bytes)) => crate::json::from_slice::<crate::json::Value>(bytes)
1150 .ok()
1151 .and_then(|value| value.as_array().map(|values| values.to_vec()))
1152 .map(|values| {
1153 values
1154 .into_iter()
1155 .filter_map(|value| value.as_str().map(ToOwned::to_owned))
1156 .collect()
1157 })
1158 .unwrap_or_default(),
1159 _ => Vec::new(),
1160 }
1161}
1162
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields 0 when the system clock reports a time before the epoch.
fn current_unix_ms() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}