1use std::sync::Arc;
4
5use crate::catalog::{CollectionModel, SchemaMode};
6use crate::physical::{CollectionContract, ContractOrigin};
7use crate::storage::query::ast::ConfigValueType;
8use crate::storage::{EntityData, EntityId, EntityKind, RowData, UnifiedEntity};
9
10use super::impl_core::{current_auth_identity, current_connection_id, current_tenant};
11use super::*;
12
13const CONFIG_HISTORY_LIMIT: usize = 16;
14
15#[derive(Clone)]
16struct ConfigVersion {
17 id: EntityId,
18 key: String,
19 version: i64,
20 value: Value,
21 tombstone: bool,
22 created_at_ms: i64,
23 op: String,
24 value_type: Option<ConfigValueType>,
25 schema_version: Option<i64>,
26 tags: Vec<String>,
27}
28
29impl super::keyed_spine::KeyedVersion for ConfigVersion {
30 fn key(&self) -> &str {
31 &self.key
32 }
33
34 fn version(&self) -> i64 {
35 self.version
36 }
37}
38
39impl ConfigVersion {
40 fn from_keyed_row(version: super::keyed_spine::KeyedRowVersion, row: &RowData) -> Self {
41 Self {
42 id: version.id,
43 key: version.key,
44 version: version.version,
45 value: version.value,
46 tombstone: version.tombstone,
47 created_at_ms: version.created_at_ms,
48 op: version.op,
49 value_type: row
50 .get_field("value_type")
51 .and_then(config_value_type_from_value),
52 schema_version: super::keyed_spine::value_i64(row.get_field("schema_version")),
53 tags: config_tags_from_value(row.get_field("tags")),
54 }
55 }
56}
57
58struct ConfigSecretRef {
59 collection: String,
60 key: String,
61}
62
63impl RedDBRuntime {
64 pub fn execute_config_command(
65 &self,
66 raw_query: &str,
67 cmd: &crate::storage::query::ast::ConfigCommand,
68 ) -> RedDBResult<RuntimeQueryResult> {
69 use crate::storage::query::ast::ConfigCommand;
70
71 match cmd {
72 ConfigCommand::Put {
73 collection,
74 key,
75 value,
76 value_type,
77 tags,
78 } => self.config_write_result(
79 raw_query,
80 collection,
81 key,
82 value.clone(),
83 *value_type,
84 tags,
85 "put",
86 ),
87 ConfigCommand::Rotate {
88 collection,
89 key,
90 value,
91 value_type,
92 tags,
93 } => self.config_write_result(
94 raw_query,
95 collection,
96 key,
97 value.clone(),
98 *value_type,
99 tags,
100 "rotate",
101 ),
102 ConfigCommand::Get { collection, key } => {
103 self.config_get_result(raw_query, collection, key)
104 }
105 ConfigCommand::Resolve { collection, key } => {
106 self.config_resolve_result(raw_query, collection, key)
107 }
108 ConfigCommand::Delete { collection, key } => {
109 self.config_delete_result(raw_query, collection, key)
110 }
111 ConfigCommand::History { collection, key } => {
112 self.config_history_result(raw_query, collection, key)
113 }
114 ConfigCommand::List {
115 collection,
116 prefix,
117 limit,
118 offset,
119 } => self.config_list_result(raw_query, collection, prefix.as_deref(), *limit, *offset),
120 ConfigCommand::Watch {
121 collection,
122 key,
123 prefix,
124 from_lsn,
125 } => self.config_watch_result(raw_query, collection, key, *prefix, *from_lsn),
126 ConfigCommand::InvalidVolatileOperation { operation, .. } => {
127 Err(invalid_config_volatility(operation))
128 }
129 }
130 }
131
132 pub(crate) fn validate_config_command_before_auth(
133 &self,
134 cmd: &crate::storage::query::ast::ConfigCommand,
135 ) -> RedDBResult<()> {
136 use crate::storage::query::ast::ConfigCommand;
137 match cmd {
138 ConfigCommand::InvalidVolatileOperation { operation, .. } => {
139 Err(invalid_config_volatility(operation))
140 }
141 ConfigCommand::Put { collection, .. }
142 | ConfigCommand::Get { collection, .. }
143 | ConfigCommand::Resolve { collection, .. }
144 | ConfigCommand::Rotate { collection, .. }
145 | ConfigCommand::Delete { collection, .. }
146 | ConfigCommand::History { collection, .. }
147 | ConfigCommand::List { collection, .. }
148 | ConfigCommand::Watch { collection, .. } => {
149 let snapshot = self.inner.db.catalog_model_snapshot();
150 let Some(actual_model) = snapshot
151 .collections
152 .iter()
153 .find(|c| c.name == *collection)
154 .map(|c| c.declared_model.unwrap_or(c.model))
155 else {
156 return Ok(());
157 };
158 crate::runtime::ddl::polymorphic_resolver::ensure_model_match(
159 CollectionModel::Config,
160 actual_model,
161 )
162 }
163 }
164 }
165
166 fn config_resolve_result(
167 &self,
168 raw_query: &str,
169 collection: &str,
170 key: &str,
171 ) -> RedDBResult<RuntimeQueryResult> {
172 let latest = self.latest_config_version(collection, key)?;
173 if let Err(reason) = self.check_config_capability("config:read", collection, key) {
174 self.audit_config_resolve(
175 collection,
176 key,
177 None,
178 crate::runtime::audit_log::Outcome::Denied,
179 &reason,
180 );
181 return Err(RedDBError::Query(reason));
182 }
183
184 let Some(version) = latest else {
185 let reason = "not_found";
186 self.audit_config_resolve(
187 collection,
188 key,
189 None,
190 crate::runtime::audit_log::Outcome::Denied,
191 reason,
192 );
193 return Err(RedDBError::NotFound(format!(
194 "config '{}.{}' not found",
195 collection, key
196 )));
197 };
198 if version.tombstone {
199 let reason = "deleted";
200 self.audit_config_resolve(
201 collection,
202 key,
203 None,
204 crate::runtime::audit_log::Outcome::Denied,
205 reason,
206 );
207 return Err(RedDBError::NotFound(format!(
208 "config '{}.{}' is deleted",
209 collection, key
210 )));
211 }
212
213 let secret_ref = parse_config_secret_ref(&version.value).inspect_err(|err| {
214 self.audit_config_resolve(
215 collection,
216 key,
217 None,
218 crate::runtime::audit_log::Outcome::Error,
219 &err.to_string(),
220 );
221 })?;
222
223 match self.resolve_vault_secret_value(&secret_ref.collection, &secret_ref.key) {
224 Ok(value) => {
225 self.audit_config_resolve(
226 collection,
227 key,
228 Some(&secret_ref),
229 crate::runtime::audit_log::Outcome::Success,
230 "ok",
231 );
232 let mut result = UnifiedResult::with_columns(vec![
233 "collection".into(),
234 "key".into(),
235 "value".into(),
236 "resolved_store".into(),
237 "resolved_collection".into(),
238 "resolved_key".into(),
239 ]);
240 let mut record = UnifiedRecord::new();
241 record.set("collection", Value::text(collection.to_string()));
242 record.set("key", Value::text(key.to_string()));
243 record.set("value", value);
244 record.set("resolved_store", Value::text("vault"));
245 record.set("resolved_collection", Value::text(secret_ref.collection));
246 record.set("resolved_key", Value::text(secret_ref.key));
247 result.push(record);
248 Ok(RuntimeQueryResult {
249 query: raw_query.to_string(),
250 mode: crate::storage::query::modes::QueryMode::Sql,
251 statement: "config_resolve",
252 engine: "config",
253 result,
254 affected_rows: 0,
255 statement_type: "select",
256 })
257 }
258 Err(err) => {
259 let reason = err.to_string();
260 let outcome = if reason.contains("denied") {
261 crate::runtime::audit_log::Outcome::Denied
262 } else {
263 crate::runtime::audit_log::Outcome::Error
264 };
265 self.audit_config_resolve(collection, key, Some(&secret_ref), outcome, &reason);
266 Err(err)
267 }
268 }
269 }
270
271 fn config_write_result(
272 &self,
273 raw_query: &str,
274 collection: &str,
275 key: &str,
276 value: Value,
277 requested_type: Option<ConfigValueType>,
278 tags: &[String],
279 op: &str,
280 ) -> RedDBResult<RuntimeQueryResult> {
281 self.check_system_config_capability("config:write", collection, key)
282 .map_err(RedDBError::Query)?;
283 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
284 self.ensure_config_collection(collection)?;
285 let latest = self.latest_config_version(collection, key)?;
286 let version = latest.as_ref().map(|version| version.version).unwrap_or(0) + 1;
287 let (value_type, schema_version) = resolve_config_schema(latest.as_ref(), requested_type);
288 if let Some(value_type) = value_type {
289 validate_config_value_type(&value, value_type)?;
290 }
291 let before = latest.as_ref().and_then(|version| {
292 if version.tombstone {
293 None
294 } else {
295 Some(crate::presentation::entity_json::storage_value_to_json(
296 &version.value,
297 ))
298 }
299 });
300 let after = Some(crate::presentation::entity_json::storage_value_to_json(
301 &value,
302 ));
303 let change_op = if latest.is_some() {
304 crate::replication::cdc::ChangeOperation::Update
305 } else {
306 crate::replication::cdc::ChangeOperation::Insert
307 };
308 let id = self.append_config_version(
309 collection,
310 key,
311 value,
312 version,
313 false,
314 op,
315 value_type,
316 schema_version,
317 tags,
318 )?;
319 self.record_kv_watch_event(change_op, collection, key, id.raw(), before, after);
320 self.prune_config_history(collection, key)?;
321 self.invalidate_result_cache();
322 Ok(config_write_output(
323 raw_query,
324 collection,
325 key,
326 version,
327 id,
328 value_type,
329 schema_version,
330 tags,
331 match op {
332 "rotate" => "config_rotate",
333 _ => "config_put",
334 },
335 1,
336 ))
337 }
338
339 fn config_delete_result(
340 &self,
341 raw_query: &str,
342 collection: &str,
343 key: &str,
344 ) -> RedDBResult<RuntimeQueryResult> {
345 self.check_system_config_capability("config:write", collection, key)
346 .map_err(RedDBError::Query)?;
347 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
348 self.ensure_config_collection(collection)?;
349 let latest = self.latest_config_version(collection, key)?;
350 let version = latest.as_ref().map(|version| version.version).unwrap_or(0) + 1;
351 let value_type = latest.as_ref().and_then(|version| version.value_type);
352 let schema_version = latest.as_ref().and_then(|version| version.schema_version);
353 let id = self.append_config_version(
354 collection,
355 key,
356 Value::Null,
357 version,
358 true,
359 "delete",
360 value_type,
361 schema_version,
362 &[],
363 )?;
364 if let Some(before) = latest.as_ref().and_then(|version| {
365 if version.tombstone {
366 None
367 } else {
368 Some(crate::presentation::entity_json::storage_value_to_json(
369 &version.value,
370 ))
371 }
372 }) {
373 self.record_kv_watch_event(
374 crate::replication::cdc::ChangeOperation::Delete,
375 collection,
376 key,
377 id.raw(),
378 Some(before),
379 None,
380 );
381 }
382 self.prune_config_history(collection, key)?;
383 self.invalidate_result_cache();
384 Ok(config_write_output(
385 raw_query,
386 collection,
387 key,
388 version,
389 id,
390 value_type,
391 schema_version,
392 &[],
393 "delete",
394 1,
395 ))
396 }
397
398 fn config_get_result(
399 &self,
400 raw_query: &str,
401 collection: &str,
402 key: &str,
403 ) -> RedDBResult<RuntimeQueryResult> {
404 self.check_system_config_capability("config:read", collection, key)
405 .map_err(RedDBError::Query)?;
406 let latest = self.latest_config_version(collection, key)?;
407 let mut result = UnifiedResult::with_columns(vec![
408 "collection".into(),
409 "key".into(),
410 "value".into(),
411 "version".into(),
412 "value_type".into(),
413 "schema_version".into(),
414 "tags".into(),
415 "tombstone".into(),
416 ]);
417 let mut record = UnifiedRecord::new();
418 record.set("collection", Value::text(collection.to_string()));
419 record.set("key", Value::text(key.to_string()));
420 if let Some(version) = latest {
421 record.set("value", version.value);
422 record.set("version", Value::Integer(version.version));
423 record.set("value_type", config_value_type_value(version.value_type));
424 record.set(
425 "schema_version",
426 version
427 .schema_version
428 .map(Value::Integer)
429 .unwrap_or(Value::Null),
430 );
431 record.set("tags", config_tags_value(&version.tags));
432 record.set("tombstone", Value::Boolean(version.tombstone));
433 } else {
434 record.set("value", Value::Null);
435 record.set("version", Value::Null);
436 record.set("value_type", Value::Null);
437 record.set("schema_version", Value::Null);
438 record.set("tags", Value::Null);
439 record.set("tombstone", Value::Boolean(false));
440 }
441 result.push(record);
442 Ok(RuntimeQueryResult {
443 query: raw_query.to_string(),
444 mode: crate::storage::query::modes::QueryMode::Sql,
445 statement: "config_get",
446 engine: "config",
447 result,
448 affected_rows: 0,
449 statement_type: "select",
450 })
451 }
452
453 fn config_history_result(
454 &self,
455 raw_query: &str,
456 collection: &str,
457 key: &str,
458 ) -> RedDBResult<RuntimeQueryResult> {
459 self.check_system_config_capability("config:read", collection, key)
460 .map_err(RedDBError::Query)?;
461 let versions = super::keyed_spine::history_versions(self.config_versions(collection, key)?);
462 let mut result = UnifiedResult::with_columns(vec![
463 "collection".into(),
464 "key".into(),
465 "version".into(),
466 "value".into(),
467 "value_type".into(),
468 "schema_version".into(),
469 "tags".into(),
470 "tombstone".into(),
471 "op".into(),
472 "created_at_ms".into(),
473 ]);
474 for version in versions {
475 let mut record = UnifiedRecord::new();
476 record.set("collection", Value::text(collection.to_string()));
477 record.set("key", Value::text(key.to_string()));
478 record.set("version", Value::Integer(version.version));
479 record.set("value", version.value);
480 record.set("value_type", config_value_type_value(version.value_type));
481 record.set(
482 "schema_version",
483 version
484 .schema_version
485 .map(Value::Integer)
486 .unwrap_or(Value::Null),
487 );
488 record.set("tags", Value::Null);
489 record.set("tombstone", Value::Boolean(version.tombstone));
490 record.set("op", Value::text(version.op));
491 record.set("created_at_ms", Value::Integer(version.created_at_ms));
492 result.push(record);
493 }
494 Ok(RuntimeQueryResult {
495 query: raw_query.to_string(),
496 mode: crate::storage::query::modes::QueryMode::Sql,
497 statement: "config_history",
498 engine: "config",
499 result,
500 affected_rows: 0,
501 statement_type: "select",
502 })
503 }
504
505 fn config_list_result(
506 &self,
507 raw_query: &str,
508 collection: &str,
509 prefix: Option<&str>,
510 limit: Option<usize>,
511 offset: usize,
512 ) -> RedDBResult<RuntimeQueryResult> {
513 let mut versions = self.latest_config_versions(collection, prefix)?;
514 versions.sort_by(|left, right| left.key.cmp(&right.key));
515 let mut result = UnifiedResult::with_columns(vec![
516 "collection".into(),
517 "key".into(),
518 "value".into(),
519 "version".into(),
520 "value_type".into(),
521 "schema_version".into(),
522 "tags".into(),
523 "tombstone".into(),
524 "op".into(),
525 "created_at_ms".into(),
526 ]);
527 for version in versions
528 .into_iter()
529 .filter(|version| {
530 self.check_config_capability("config:read", collection, &version.key)
531 .is_ok()
532 })
533 .skip(offset)
534 .take(limit.unwrap_or(usize::MAX))
535 {
536 let mut record = UnifiedRecord::new();
537 record.set("collection", Value::text(collection.to_string()));
538 record.set("key", Value::text(version.key));
539 record.set("value", version.value);
540 record.set("version", Value::Integer(version.version));
541 record.set("value_type", config_value_type_value(version.value_type));
542 record.set(
543 "schema_version",
544 version
545 .schema_version
546 .map(Value::Integer)
547 .unwrap_or(Value::Null),
548 );
549 record.set("tags", config_tags_value(&version.tags));
550 record.set("tombstone", Value::Boolean(version.tombstone));
551 record.set("op", Value::text(version.op));
552 record.set("created_at_ms", Value::Integer(version.created_at_ms));
553 result.push(record);
554 }
555 Ok(RuntimeQueryResult {
556 query: raw_query.to_string(),
557 mode: crate::storage::query::modes::QueryMode::Sql,
558 statement: "config_list",
559 engine: "config",
560 result,
561 affected_rows: 0,
562 statement_type: "select",
563 })
564 }
565
566 fn config_watch_result(
567 &self,
568 raw_query: &str,
569 collection: &str,
570 key: &str,
571 prefix: bool,
572 from_lsn: Option<u64>,
573 ) -> RedDBResult<RuntimeQueryResult> {
574 let watch_key = if prefix {
575 format!("{key}.*")
576 } else {
577 key.to_string()
578 };
579 let endpoint = match from_lsn {
580 Some(lsn) => {
581 format!("/collections/{collection}/config/{watch_key}/watch?since_lsn={lsn}")
582 }
583 None => format!("/collections/{collection}/config/{watch_key}/watch"),
584 };
585 let mut result = UnifiedResult::with_columns(vec![
586 "collection".into(),
587 "key".into(),
588 "prefix".into(),
589 "from_lsn".into(),
590 "watch_url".into(),
591 "streaming".into(),
592 ]);
593 let mut record = UnifiedRecord::new();
594 record.set("collection", Value::text(collection.to_string()));
595 record.set("key", Value::text(watch_key));
596 record.set("prefix", Value::Boolean(prefix));
597 record.set(
598 "from_lsn",
599 from_lsn
600 .map(Value::UnsignedInteger)
601 .unwrap_or(crate::storage::schema::Value::Null),
602 );
603 record.set("watch_url", Value::text(endpoint));
604 record.set("streaming", Value::Boolean(true));
605 result.push(record);
606 Ok(RuntimeQueryResult {
607 query: raw_query.to_string(),
608 mode: crate::storage::query::modes::QueryMode::Sql,
609 statement: "config_watch",
610 engine: "config",
611 result,
612 affected_rows: 0,
613 statement_type: "stream",
614 })
615 }
616
617 fn ensure_config_collection(&self, collection: &str) -> RedDBResult<()> {
618 let store = self.inner.db.store();
619 if store.get_collection(collection).is_none() {
620 store
621 .create_collection(collection)
622 .map_err(|err| RedDBError::Internal(err.to_string()))?;
623 }
624 if let Some(contract) = self.inner.db.collection_contract(collection) {
625 crate::runtime::ddl::polymorphic_resolver::ensure_model_match(
626 CollectionModel::Config,
627 contract.declared_model,
628 )?;
629 return Ok(());
630 }
631 let now = current_unix_ms();
632 self.inner
633 .db
634 .save_collection_contract(CollectionContract {
635 name: collection.to_string(),
636 declared_model: CollectionModel::Config,
637 schema_mode: SchemaMode::Dynamic,
638 origin: ContractOrigin::Explicit,
639 version: 1,
640 created_at_unix_ms: now as u128,
641 updated_at_unix_ms: now as u128,
642 default_ttl_ms: None,
643 context_index_fields: Vec::new(),
644 declared_columns: Vec::new(),
645 table_def: None,
646 timestamps_enabled: false,
647 context_index_enabled: false,
648 append_only: false,
649 subscriptions: Vec::new(),
650 })
651 .map(|_| ())
652 .map_err(|err| RedDBError::Internal(err.to_string()))
653 }
654
655 fn append_config_version(
656 &self,
657 collection: &str,
658 key: &str,
659 value: Value,
660 version: i64,
661 tombstone: bool,
662 op: &str,
663 value_type: Option<ConfigValueType>,
664 schema_version: Option<i64>,
665 tags: &[String],
666 ) -> RedDBResult<EntityId> {
667 let now = current_unix_ms() as i64;
668 let fields = vec![
669 ("key".to_string(), Value::text(key.to_string())),
670 ("value".to_string(), value),
671 ("version".to_string(), Value::Integer(version)),
672 (
673 "value_type".to_string(),
674 config_value_type_value(value_type),
675 ),
676 (
677 "schema_version".to_string(),
678 schema_version.map(Value::Integer).unwrap_or(Value::Null),
679 ),
680 ("tombstone".to_string(), Value::Boolean(tombstone)),
681 ("op".to_string(), Value::text(op.to_string())),
682 ("created_at_ms".to_string(), Value::Integer(now)),
683 ("tags".to_string(), config_tags_value(tags)),
684 ];
685 let mut row = RowData::new(Vec::new());
686 row.named = Some(fields.into_iter().collect());
687 let entity = UnifiedEntity::new(
688 EntityId::new(0),
689 EntityKind::TableRow {
690 table: Arc::from(collection),
691 row_id: 0,
692 },
693 EntityData::Row(row),
694 );
695 self.inner
696 .db
697 .store()
698 .insert(collection, entity)
699 .map_err(|err| RedDBError::Internal(err.to_string()))
700 }
701
702 fn latest_config_version(
703 &self,
704 collection: &str,
705 key: &str,
706 ) -> RedDBResult<Option<ConfigVersion>> {
707 Ok(super::keyed_spine::latest_version(
708 self.config_versions(collection, key)?,
709 ))
710 }
711
712 fn config_versions(&self, collection: &str, key: &str) -> RedDBResult<Vec<ConfigVersion>> {
713 let store = self.inner.db.store();
714 let Some(manager) = store.get_collection(collection) else {
715 return Ok(Vec::new());
716 };
717 let mut versions = Vec::new();
718 for entity in manager.query_all(|_| true) {
719 let EntityData::Row(row) = &entity.data else {
720 continue;
721 };
722 let Some(version) = super::keyed_spine::row_version(entity.id, row, 0) else {
723 continue;
724 };
725 if version.key != key {
726 continue;
727 }
728 versions.push(ConfigVersion::from_keyed_row(version, row));
729 }
730 Ok(versions)
731 }
732
733 fn latest_config_versions(
734 &self,
735 collection: &str,
736 prefix: Option<&str>,
737 ) -> RedDBResult<Vec<ConfigVersion>> {
738 let store = self.inner.db.store();
739 let Some(manager) = store.get_collection(collection) else {
740 return Ok(Vec::new());
741 };
742 let mut versions = Vec::new();
743 for entity in manager.query_all(|_| true) {
744 let EntityData::Row(row) = &entity.data else {
745 continue;
746 };
747 let Some(version) = super::keyed_spine::row_version(entity.id, row, 0) else {
748 continue;
749 };
750 versions.push(ConfigVersion::from_keyed_row(version, row));
751 }
752 Ok(super::keyed_spine::latest_versions(versions, prefix))
753 }
754
755 fn prune_config_history(&self, collection: &str, key: &str) -> RedDBResult<()> {
756 let mut versions = self.config_versions(collection, key)?;
757 if versions.len() <= CONFIG_HISTORY_LIMIT {
758 return Ok(());
759 }
760 versions = super::keyed_spine::history_versions(versions);
761 let drop_count = versions.len() - CONFIG_HISTORY_LIMIT;
762 let store = self.inner.db.store();
763 for version in versions.into_iter().take(drop_count) {
764 store
765 .delete(collection, version.id)
766 .map_err(|err| RedDBError::Internal(err.to_string()))?;
767 }
768 Ok(())
769 }
770
771 fn check_config_capability(
772 &self,
773 action: &str,
774 collection: &str,
775 key: &str,
776 ) -> Result<(), String> {
777 let Some(auth_store) = self.inner.auth_store.read().clone() else {
778 return Ok(());
779 };
780 if !auth_store.iam_authorization_enabled() {
781 return Ok(());
782 }
783 let Some((principal, role)) = current_auth_identity() else {
784 return Err(
785 "IAM authorization is enabled; config capability check requires an authenticated principal"
786 .to_string(),
787 );
788 };
789 let tenant = current_tenant();
790 let principal_id = crate::auth::UserId::from_parts(tenant.as_deref(), &principal);
791 let mut resource = crate::auth::policies::ResourceRef::new(
792 "config",
793 config_target_resource(collection, key),
794 );
795 if let Some(ref tenant) = tenant {
796 resource = resource.with_tenant(tenant.clone());
797 }
798 let ctx = crate::auth::policies::EvalContext {
799 principal_tenant: tenant.clone(),
800 current_tenant: tenant,
801 peer_ip: None,
802 mfa_present: false,
803 now_ms: crate::utils::now_unix_millis() as u128,
804 principal_is_admin_role: role == crate::auth::Role::Admin,
805 };
806 if auth_store.check_policy_authz(&principal_id, action, &resource, &ctx) {
807 Ok(())
808 } else {
809 Err(format!(
810 "principal=`{}` action=`{}` resource=`config:{}` denied by IAM policy",
811 principal,
812 action,
813 config_target_resource(collection, key)
814 ))
815 }
816 }
817
818 fn check_system_config_capability(
819 &self,
820 action: &str,
821 collection: &str,
822 key: &str,
823 ) -> Result<(), String> {
824 if collection != "red.config" {
825 return Ok(());
826 }
827 self.check_config_capability(action, collection, key)
828 }
829
830 pub fn config_watch_events_since(
831 &self,
832 collection: &str,
833 key: &str,
834 since_lsn: u64,
835 max_count: usize,
836 ) -> Vec<crate::replication::cdc::KvWatchEvent> {
837 self.kv_watch_events_since(collection, key, since_lsn, max_count)
838 .into_iter()
839 .map(|event| self.policy_filter_config_watch_event(event))
840 .collect()
841 }
842
843 pub fn config_watch_events_since_prefix(
844 &self,
845 collection: &str,
846 prefix: &str,
847 since_lsn: u64,
848 max_count: usize,
849 ) -> Vec<crate::replication::cdc::KvWatchEvent> {
850 self.kv_watch_events_since_prefix(collection, prefix, since_lsn, max_count)
851 .into_iter()
852 .map(|event| self.policy_filter_config_watch_event(event))
853 .collect()
854 }
855
856 fn policy_filter_config_watch_event(
857 &self,
858 mut event: crate::replication::cdc::KvWatchEvent,
859 ) -> crate::replication::cdc::KvWatchEvent {
860 if self
861 .check_config_capability("config:read", &event.collection, &event.key)
862 .is_err()
863 {
864 event.before = None;
865 event.after = None;
866 }
867 event
868 }
869
870 fn audit_config_resolve(
871 &self,
872 collection: &str,
873 key: &str,
874 secret_ref: Option<&ConfigSecretRef>,
875 outcome: crate::runtime::audit_log::Outcome,
876 reason: &str,
877 ) {
878 let actor = current_auth_identity()
879 .map(|(principal, _)| principal)
880 .unwrap_or_else(|| "anonymous".to_string());
881 let request_id = match current_connection_id() {
882 0 => "embedded".to_string(),
883 id => format!("conn-{id}"),
884 };
885 let mut builder = crate::runtime::audit_log::AuditEvent::builder("config/resolve")
886 .principal(actor.clone())
887 .source(crate::runtime::audit_log::AuditAuthSource::Password)
888 .resource(format!(
889 "config:{}",
890 config_target_resource(collection, key)
891 ))
892 .outcome(outcome)
893 .correlation_id(request_id.clone())
894 .fields([
895 crate::runtime::audit_log::AuditFieldEscaper::field("actor", actor),
896 crate::runtime::audit_log::AuditFieldEscaper::field("collection", collection),
897 crate::runtime::audit_log::AuditFieldEscaper::field("key", key),
898 crate::runtime::audit_log::AuditFieldEscaper::field(
899 "target",
900 config_target_resource(collection, key),
901 ),
902 crate::runtime::audit_log::AuditFieldEscaper::field("reason", reason),
903 crate::runtime::audit_log::AuditFieldEscaper::field("request_id", request_id),
904 crate::runtime::audit_log::AuditFieldEscaper::field(
905 "connection_id",
906 current_connection_id(),
907 ),
908 ]);
909 if let Some(tenant) = current_tenant() {
910 builder = builder.tenant(tenant);
911 }
912 if let Some(secret_ref) = secret_ref {
913 builder = builder.fields([
914 crate::runtime::audit_log::AuditFieldEscaper::field("resolved_store", "vault"),
915 crate::runtime::audit_log::AuditFieldEscaper::field(
916 "resolved_collection",
917 secret_ref.collection.as_str(),
918 ),
919 crate::runtime::audit_log::AuditFieldEscaper::field(
920 "resolved_key",
921 secret_ref.key.as_str(),
922 ),
923 crate::runtime::audit_log::AuditFieldEscaper::field(
924 "resolved_target",
925 format!("{}.{}", secret_ref.collection, secret_ref.key),
926 ),
927 ]);
928 }
929 self.audit_log().record_event(builder.build());
930 }
931}
932
933fn parse_config_secret_ref(value: &Value) -> RedDBResult<ConfigSecretRef> {
934 let Value::Json(bytes) = value else {
935 return Err(RedDBError::InvalidConfig(
936 "CONFIG value is not a SecretRef".to_string(),
937 ));
938 };
939 let json = crate::json::from_slice::<crate::json::Value>(bytes).map_err(|err| {
940 RedDBError::InvalidConfig(format!("CONFIG SecretRef is malformed: {err}"))
941 })?;
942 let Some(object) = json.as_object() else {
943 return Err(RedDBError::InvalidConfig(
944 "CONFIG SecretRef must be an object".to_string(),
945 ));
946 };
947 let get_str = |field: &str| -> RedDBResult<&str> {
948 object
949 .get(field)
950 .and_then(|value| value.as_str())
951 .ok_or_else(|| RedDBError::InvalidConfig(format!("CONFIG SecretRef missing {field}")))
952 };
953 if get_str("type")? != "secret_ref" {
954 return Err(RedDBError::InvalidConfig(
955 "CONFIG value is not a SecretRef".to_string(),
956 ));
957 }
958 if get_str("store")? != "vault" {
959 return Err(RedDBError::InvalidConfig(
960 "CONFIG SecretRef store is unsupported".to_string(),
961 ));
962 }
963 Ok(ConfigSecretRef {
964 collection: get_str("collection")?.to_string(),
965 key: get_str("key")?.to_string(),
966 })
967}
968
969fn config_target_resource(collection: &str, key: &str) -> String {
970 if collection == "red.config" {
971 format!("red.config/{}", key.to_ascii_lowercase())
972 } else {
973 format!("{collection}.{key}")
974 }
975}
976
977fn config_write_output(
978 raw_query: &str,
979 collection: &str,
980 key: &str,
981 version: i64,
982 id: EntityId,
983 value_type: Option<ConfigValueType>,
984 schema_version: Option<i64>,
985 tags: &[String],
986 statement: &'static str,
987 affected_rows: u64,
988) -> RuntimeQueryResult {
989 let mut result = UnifiedResult::with_columns(vec![
990 "ok".into(),
991 "collection".into(),
992 "key".into(),
993 "version".into(),
994 "value_type".into(),
995 "schema_version".into(),
996 "tags".into(),
997 "id".into(),
998 ]);
999 let mut record = UnifiedRecord::new();
1000 record.set("ok", Value::Boolean(true));
1001 record.set("collection", Value::text(collection.to_string()));
1002 record.set("key", Value::text(key.to_string()));
1003 record.set("version", Value::Integer(version));
1004 record.set("value_type", config_value_type_value(value_type));
1005 record.set(
1006 "schema_version",
1007 schema_version.map(Value::Integer).unwrap_or(Value::Null),
1008 );
1009 record.set("tags", config_tags_value(tags));
1010 record.set("id", Value::Integer(id.raw() as i64));
1011 result.push(record);
1012 RuntimeQueryResult {
1013 query: raw_query.to_string(),
1014 mode: crate::storage::query::modes::QueryMode::Sql,
1015 statement,
1016 engine: "config",
1017 result,
1018 affected_rows,
1019 statement_type: if statement == "delete" {
1020 "delete"
1021 } else {
1022 "update"
1023 },
1024 }
1025}
1026
1027fn invalid_config_volatility(operation: &str) -> RedDBError {
1028 RedDBError::InvalidOperation(format!(
1029 "CONFIG does not support KV-only volatility operation {operation}"
1030 ))
1031}
1032
1033fn resolve_config_schema(
1034 latest: Option<&ConfigVersion>,
1035 requested_type: Option<ConfigValueType>,
1036) -> (Option<ConfigValueType>, Option<i64>) {
1037 let previous_type = latest.and_then(|version| version.value_type);
1038 let previous_schema_version = latest.and_then(|version| version.schema_version);
1039 match requested_type {
1040 Some(value_type) if Some(value_type) != previous_type => (
1041 Some(value_type),
1042 Some(previous_schema_version.unwrap_or(0) + 1),
1043 ),
1044 Some(value_type) => (Some(value_type), previous_schema_version.or(Some(1))),
1045 None => (previous_type, previous_schema_version),
1046 }
1047}
1048
1049fn validate_config_value_type(value: &Value, value_type: ConfigValueType) -> RedDBResult<()> {
1050 let valid = match value_type {
1051 ConfigValueType::Bool => matches!(value, Value::Boolean(_)),
1052 ConfigValueType::Int => matches!(
1053 value,
1054 Value::Integer(_) | Value::UnsignedInteger(_) | Value::BigInt(_)
1055 ),
1056 ConfigValueType::String => matches!(value, Value::Text(_)),
1057 ConfigValueType::Url => validate_config_url(value),
1058 ConfigValueType::Object => validate_config_json_shape(value, true),
1059 ConfigValueType::Array => {
1060 matches!(value, Value::Array(_) | Value::Vector(_))
1061 || validate_config_json_shape(value, false)
1062 }
1063 };
1064 if valid {
1065 Ok(())
1066 } else {
1067 Err(RedDBError::InvalidConfig(format!(
1068 "CONFIG value type mismatch: expected {}, got {}",
1069 value_type.as_str(),
1070 config_actual_value_type(value),
1071 )))
1072 }
1073}
1074
1075fn validate_config_url(value: &Value) -> bool {
1076 let url = match value {
1077 Value::Url(value) => value.as_str(),
1078 Value::Text(value) => value.as_ref(),
1079 _ => return false,
1080 };
1081 url.starts_with("http://") || url.starts_with("https://") || url.starts_with("ftp://")
1082}
1083
1084fn validate_config_json_shape(value: &Value, object: bool) -> bool {
1085 let Value::Json(bytes) = value else {
1086 return false;
1087 };
1088 let Ok(json) = crate::json::from_slice::<crate::json::Value>(bytes) else {
1089 return false;
1090 };
1091 matches!(
1092 (object, json),
1093 (true, crate::json::Value::Object(_)) | (false, crate::json::Value::Array(_))
1094 )
1095}
1096
1097fn config_actual_value_type(value: &Value) -> &'static str {
1098 match value {
1099 Value::Null => "null",
1100 Value::Boolean(_) => "bool",
1101 Value::Integer(_) | Value::UnsignedInteger(_) | Value::BigInt(_) => "int",
1102 Value::Text(_) => "string",
1103 Value::Url(_) => "url",
1104 Value::Json(bytes) => match crate::json::from_slice::<crate::json::Value>(bytes) {
1105 Ok(crate::json::Value::Object(_)) => "object",
1106 Ok(crate::json::Value::Array(_)) => "array",
1107 _ => "json",
1108 },
1109 Value::Array(_) | Value::Vector(_) => "array",
1110 _ => "other",
1111 }
1112}
1113
1114fn config_value_type_value(value_type: Option<ConfigValueType>) -> Value {
1115 value_type
1116 .map(|value_type| Value::text(value_type.as_str()))
1117 .unwrap_or(Value::Null)
1118}
1119
1120fn config_value_type_from_value(value: &Value) -> Option<ConfigValueType> {
1121 match value {
1122 Value::Text(value) => ConfigValueType::parse(value.as_ref()),
1123 _ => None,
1124 }
1125}
1126
1127fn config_tags_value(tags: &[String]) -> Value {
1128 if tags.is_empty() {
1129 return Value::Null;
1130 }
1131 Value::Array(tags.iter().map(|tag| Value::text(tag.clone())).collect())
1132}
1133
1134fn config_tags_from_value(value: Option<&Value>) -> Vec<String> {
1135 match value {
1136 Some(Value::Array(values)) => values
1137 .iter()
1138 .filter_map(|value| match value {
1139 Value::Text(tag) => Some(tag.to_string()),
1140 _ => None,
1141 })
1142 .collect(),
1143 Some(Value::Json(bytes)) => crate::json::from_slice::<crate::json::Value>(bytes)
1144 .ok()
1145 .and_then(|value| value.as_array().map(|values| values.to_vec()))
1146 .map(|values| {
1147 values
1148 .into_iter()
1149 .filter_map(|value| value.as_str().map(ToOwned::to_owned))
1150 .collect()
1151 })
1152 .unwrap_or_default(),
1153 _ => Vec::new(),
1154 }
1155}
1156
1157fn current_unix_ms() -> u64 {
1158 std::time::SystemTime::now()
1159 .duration_since(std::time::UNIX_EPOCH)
1160 .unwrap_or_default()
1161 .as_millis() as u64
1162}