1use std::sync::Arc;
4
5use crate::catalog::{CollectionModel, SchemaMode};
6use crate::physical::{CollectionContract, ContractOrigin};
7use crate::storage::query::ast::ConfigValueType;
8use crate::storage::{EntityData, EntityId, EntityKind, RowData, UnifiedEntity};
9
10use super::impl_core::{current_auth_identity, current_connection_id, current_tenant};
11use super::*;
12
/// Maximum number of stored versions retained per config key.
///
/// `prune_config_history` deletes versions once a key holds more than this many.
const CONFIG_HISTORY_LIMIT: usize = 16;
14
15#[derive(Clone)]
16struct ConfigVersion {
17 id: EntityId,
18 key: String,
19 version: i64,
20 value: Value,
21 tombstone: bool,
22 created_at_ms: i64,
23 op: String,
24 value_type: Option<ConfigValueType>,
25 schema_version: Option<i64>,
26 tags: Vec<String>,
27}
28
29impl super::keyed_spine::KeyedVersion for ConfigVersion {
30 fn key(&self) -> &str {
31 &self.key
32 }
33
34 fn version(&self) -> i64 {
35 self.version
36 }
37}
38
39impl ConfigVersion {
40 fn from_keyed_row(version: super::keyed_spine::KeyedRowVersion, row: &RowData) -> Self {
41 Self {
42 id: version.id,
43 key: version.key,
44 version: version.version,
45 value: version.value,
46 tombstone: version.tombstone,
47 created_at_ms: version.created_at_ms,
48 op: version.op,
49 value_type: row
50 .get_field("value_type")
51 .and_then(config_value_type_from_value),
52 schema_version: super::keyed_spine::value_i64(row.get_field("schema_version")),
53 tags: config_tags_from_value(row.get_field("tags")),
54 }
55 }
56}
57
/// Pointer from a config value to a vault secret, as parsed by
/// `parse_config_secret_ref`.
struct ConfigSecretRef {
    /// Vault collection named by the reference's `collection` field.
    collection: String,
    /// Vault key named by the reference's `key` field.
    key: String,
}
62
63impl RedDBRuntime {
64 pub fn execute_config_command(
65 &self,
66 raw_query: &str,
67 cmd: &crate::storage::query::ast::ConfigCommand,
68 ) -> RedDBResult<RuntimeQueryResult> {
69 use crate::storage::query::ast::ConfigCommand;
70
71 match cmd {
72 ConfigCommand::Put {
73 collection,
74 key,
75 value,
76 value_type,
77 tags,
78 } => self.config_write_result(
79 raw_query,
80 collection,
81 key,
82 value.clone(),
83 *value_type,
84 tags,
85 "put",
86 ),
87 ConfigCommand::Rotate {
88 collection,
89 key,
90 value,
91 value_type,
92 tags,
93 } => self.config_write_result(
94 raw_query,
95 collection,
96 key,
97 value.clone(),
98 *value_type,
99 tags,
100 "rotate",
101 ),
102 ConfigCommand::Get { collection, key } => {
103 self.config_get_result(raw_query, collection, key)
104 }
105 ConfigCommand::Resolve { collection, key } => {
106 self.config_resolve_result(raw_query, collection, key)
107 }
108 ConfigCommand::Delete { collection, key } => {
109 self.config_delete_result(raw_query, collection, key)
110 }
111 ConfigCommand::History { collection, key } => {
112 self.config_history_result(raw_query, collection, key)
113 }
114 ConfigCommand::List {
115 collection,
116 prefix,
117 limit,
118 offset,
119 } => self.config_list_result(raw_query, collection, prefix.as_deref(), *limit, *offset),
120 ConfigCommand::Watch {
121 collection,
122 key,
123 prefix,
124 from_lsn,
125 } => self.config_watch_result(raw_query, collection, key, *prefix, *from_lsn),
126 ConfigCommand::InvalidVolatileOperation { operation, .. } => {
127 Err(invalid_config_volatility(operation))
128 }
129 }
130 }
131
132 pub(crate) fn validate_config_command_before_auth(
133 &self,
134 cmd: &crate::storage::query::ast::ConfigCommand,
135 ) -> RedDBResult<()> {
136 use crate::storage::query::ast::ConfigCommand;
137 match cmd {
138 ConfigCommand::InvalidVolatileOperation { operation, .. } => {
139 Err(invalid_config_volatility(operation))
140 }
141 ConfigCommand::Put { collection, .. }
142 | ConfigCommand::Get { collection, .. }
143 | ConfigCommand::Resolve { collection, .. }
144 | ConfigCommand::Rotate { collection, .. }
145 | ConfigCommand::Delete { collection, .. }
146 | ConfigCommand::History { collection, .. }
147 | ConfigCommand::List { collection, .. }
148 | ConfigCommand::Watch { collection, .. } => {
149 let snapshot = self.inner.db.catalog_model_snapshot();
150 let Some(actual_model) = snapshot
151 .collections
152 .iter()
153 .find(|c| c.name == *collection)
154 .map(|c| c.declared_model.unwrap_or(c.model))
155 else {
156 return Ok(());
157 };
158 crate::runtime::ddl::polymorphic_resolver::ensure_model_match(
159 CollectionModel::Config,
160 actual_model,
161 )
162 }
163 }
164 }
165
166 fn config_resolve_result(
167 &self,
168 raw_query: &str,
169 collection: &str,
170 key: &str,
171 ) -> RedDBResult<RuntimeQueryResult> {
172 let latest = self.latest_config_version(collection, key)?;
173 if let Err(reason) = self.check_config_capability("config:read", collection, key) {
174 self.audit_config_resolve(
175 collection,
176 key,
177 None,
178 crate::runtime::audit_log::Outcome::Denied,
179 &reason,
180 );
181 return Err(RedDBError::Query(reason));
182 }
183
184 let Some(version) = latest else {
185 let reason = "not_found";
186 self.audit_config_resolve(
187 collection,
188 key,
189 None,
190 crate::runtime::audit_log::Outcome::Denied,
191 reason,
192 );
193 return Err(RedDBError::NotFound(format!(
194 "config '{}.{}' not found",
195 collection, key
196 )));
197 };
198 if version.tombstone {
199 let reason = "deleted";
200 self.audit_config_resolve(
201 collection,
202 key,
203 None,
204 crate::runtime::audit_log::Outcome::Denied,
205 reason,
206 );
207 return Err(RedDBError::NotFound(format!(
208 "config '{}.{}' is deleted",
209 collection, key
210 )));
211 }
212
213 let secret_ref = parse_config_secret_ref(&version.value).inspect_err(|err| {
214 self.audit_config_resolve(
215 collection,
216 key,
217 None,
218 crate::runtime::audit_log::Outcome::Error,
219 &err.to_string(),
220 );
221 })?;
222
223 match self.resolve_vault_secret_value(&secret_ref.collection, &secret_ref.key) {
224 Ok(value) => {
225 self.audit_config_resolve(
226 collection,
227 key,
228 Some(&secret_ref),
229 crate::runtime::audit_log::Outcome::Success,
230 "ok",
231 );
232 let mut result = UnifiedResult::with_columns(vec![
233 "collection".into(),
234 "key".into(),
235 "value".into(),
236 "resolved_store".into(),
237 "resolved_collection".into(),
238 "resolved_key".into(),
239 ]);
240 let mut record = UnifiedRecord::new();
241 record.set("collection", Value::text(collection.to_string()));
242 record.set("key", Value::text(key.to_string()));
243 record.set("value", value);
244 record.set("resolved_store", Value::text("vault"));
245 record.set("resolved_collection", Value::text(secret_ref.collection));
246 record.set("resolved_key", Value::text(secret_ref.key));
247 result.push(record);
248 Ok(RuntimeQueryResult {
249 query: raw_query.to_string(),
250 mode: crate::storage::query::modes::QueryMode::Sql,
251 statement: "config_resolve",
252 engine: "config",
253 result,
254 affected_rows: 0,
255 statement_type: "select",
256 })
257 }
258 Err(err) => {
259 let reason = err.to_string();
260 let outcome = if reason.contains("denied") {
261 crate::runtime::audit_log::Outcome::Denied
262 } else {
263 crate::runtime::audit_log::Outcome::Error
264 };
265 self.audit_config_resolve(collection, key, Some(&secret_ref), outcome, &reason);
266 Err(err)
267 }
268 }
269 }
270
271 fn config_write_result(
272 &self,
273 raw_query: &str,
274 collection: &str,
275 key: &str,
276 value: Value,
277 requested_type: Option<ConfigValueType>,
278 tags: &[String],
279 op: &str,
280 ) -> RedDBResult<RuntimeQueryResult> {
281 self.check_system_config_capability("config:write", collection, key)
282 .map_err(RedDBError::Query)?;
283 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
284 self.ensure_config_collection(collection)?;
285 let latest = self.latest_config_version(collection, key)?;
286 let version = latest.as_ref().map(|version| version.version).unwrap_or(0) + 1;
287 let (value_type, schema_version) = resolve_config_schema(latest.as_ref(), requested_type);
288 if let Some(value_type) = value_type {
289 validate_config_value_type(&value, value_type)?;
290 }
291 let before = latest.as_ref().and_then(|version| {
292 if version.tombstone {
293 None
294 } else {
295 Some(crate::presentation::entity_json::storage_value_to_json(
296 &version.value,
297 ))
298 }
299 });
300 let after = Some(crate::presentation::entity_json::storage_value_to_json(
301 &value,
302 ));
303 let change_op = if latest.is_some() {
304 crate::replication::cdc::ChangeOperation::Update
305 } else {
306 crate::replication::cdc::ChangeOperation::Insert
307 };
308 let id = self.append_config_version(
309 collection,
310 key,
311 value,
312 version,
313 false,
314 op,
315 value_type,
316 schema_version,
317 tags,
318 )?;
319 self.record_kv_watch_event(change_op, collection, key, id.raw(), before, after);
320 self.prune_config_history(collection, key)?;
321 self.invalidate_result_cache();
322 Ok(config_write_output(
323 raw_query,
324 collection,
325 key,
326 version,
327 id,
328 value_type,
329 schema_version,
330 tags,
331 match op {
332 "rotate" => "config_rotate",
333 _ => "config_put",
334 },
335 1,
336 ))
337 }
338
339 fn config_delete_result(
340 &self,
341 raw_query: &str,
342 collection: &str,
343 key: &str,
344 ) -> RedDBResult<RuntimeQueryResult> {
345 self.check_system_config_capability("config:write", collection, key)
346 .map_err(RedDBError::Query)?;
347 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
348 self.ensure_config_collection(collection)?;
349 let latest = self.latest_config_version(collection, key)?;
350 let version = latest.as_ref().map(|version| version.version).unwrap_or(0) + 1;
351 let value_type = latest.as_ref().and_then(|version| version.value_type);
352 let schema_version = latest.as_ref().and_then(|version| version.schema_version);
353 let id = self.append_config_version(
354 collection,
355 key,
356 Value::Null,
357 version,
358 true,
359 "delete",
360 value_type,
361 schema_version,
362 &[],
363 )?;
364 if let Some(before) = latest.as_ref().and_then(|version| {
365 if version.tombstone {
366 None
367 } else {
368 Some(crate::presentation::entity_json::storage_value_to_json(
369 &version.value,
370 ))
371 }
372 }) {
373 self.record_kv_watch_event(
374 crate::replication::cdc::ChangeOperation::Delete,
375 collection,
376 key,
377 id.raw(),
378 Some(before),
379 None,
380 );
381 }
382 self.prune_config_history(collection, key)?;
383 self.invalidate_result_cache();
384 Ok(config_write_output(
385 raw_query,
386 collection,
387 key,
388 version,
389 id,
390 value_type,
391 schema_version,
392 &[],
393 "delete",
394 1,
395 ))
396 }
397
398 fn config_get_result(
399 &self,
400 raw_query: &str,
401 collection: &str,
402 key: &str,
403 ) -> RedDBResult<RuntimeQueryResult> {
404 self.check_system_config_capability("config:read", collection, key)
405 .map_err(RedDBError::Query)?;
406 let latest = self.latest_config_version(collection, key)?;
407 let mut result = UnifiedResult::with_columns(vec![
408 "collection".into(),
409 "key".into(),
410 "value".into(),
411 "version".into(),
412 "value_type".into(),
413 "schema_version".into(),
414 "tags".into(),
415 "tombstone".into(),
416 ]);
417 let mut record = UnifiedRecord::new();
418 record.set("collection", Value::text(collection.to_string()));
419 record.set("key", Value::text(key.to_string()));
420 if let Some(version) = latest {
421 record.set("value", version.value);
422 record.set("version", Value::Integer(version.version));
423 record.set("value_type", config_value_type_value(version.value_type));
424 record.set(
425 "schema_version",
426 version
427 .schema_version
428 .map(Value::Integer)
429 .unwrap_or(Value::Null),
430 );
431 record.set("tags", config_tags_value(&version.tags));
432 record.set("tombstone", Value::Boolean(version.tombstone));
433 } else {
434 record.set("value", Value::Null);
435 record.set("version", Value::Null);
436 record.set("value_type", Value::Null);
437 record.set("schema_version", Value::Null);
438 record.set("tags", Value::Null);
439 record.set("tombstone", Value::Boolean(false));
440 }
441 result.push(record);
442 Ok(RuntimeQueryResult {
443 query: raw_query.to_string(),
444 mode: crate::storage::query::modes::QueryMode::Sql,
445 statement: "config_get",
446 engine: "config",
447 result,
448 affected_rows: 0,
449 statement_type: "select",
450 })
451 }
452
453 fn config_history_result(
454 &self,
455 raw_query: &str,
456 collection: &str,
457 key: &str,
458 ) -> RedDBResult<RuntimeQueryResult> {
459 self.check_system_config_capability("config:read", collection, key)
460 .map_err(RedDBError::Query)?;
461 let versions = super::keyed_spine::history_versions(self.config_versions(collection, key)?);
462 let mut result = UnifiedResult::with_columns(vec![
463 "collection".into(),
464 "key".into(),
465 "version".into(),
466 "value".into(),
467 "value_type".into(),
468 "schema_version".into(),
469 "tags".into(),
470 "tombstone".into(),
471 "op".into(),
472 "created_at_ms".into(),
473 ]);
474 for version in versions {
475 let mut record = UnifiedRecord::new();
476 record.set("collection", Value::text(collection.to_string()));
477 record.set("key", Value::text(key.to_string()));
478 record.set("version", Value::Integer(version.version));
479 record.set("value", version.value);
480 record.set("value_type", config_value_type_value(version.value_type));
481 record.set(
482 "schema_version",
483 version
484 .schema_version
485 .map(Value::Integer)
486 .unwrap_or(Value::Null),
487 );
488 record.set("tags", Value::Null);
489 record.set("tombstone", Value::Boolean(version.tombstone));
490 record.set("op", Value::text(version.op));
491 record.set("created_at_ms", Value::Integer(version.created_at_ms));
492 result.push(record);
493 }
494 Ok(RuntimeQueryResult {
495 query: raw_query.to_string(),
496 mode: crate::storage::query::modes::QueryMode::Sql,
497 statement: "config_history",
498 engine: "config",
499 result,
500 affected_rows: 0,
501 statement_type: "select",
502 })
503 }
504
505 fn config_list_result(
506 &self,
507 raw_query: &str,
508 collection: &str,
509 prefix: Option<&str>,
510 limit: Option<usize>,
511 offset: usize,
512 ) -> RedDBResult<RuntimeQueryResult> {
513 let mut versions = self.latest_config_versions(collection, prefix)?;
514 versions.sort_by(|left, right| left.key.cmp(&right.key));
515 let mut result = UnifiedResult::with_columns(vec![
516 "collection".into(),
517 "key".into(),
518 "value".into(),
519 "version".into(),
520 "value_type".into(),
521 "schema_version".into(),
522 "tags".into(),
523 "tombstone".into(),
524 "op".into(),
525 "created_at_ms".into(),
526 ]);
527 for version in versions
528 .into_iter()
529 .filter(|version| {
530 self.check_config_capability("config:read", collection, &version.key)
531 .is_ok()
532 })
533 .skip(offset)
534 .take(limit.unwrap_or(usize::MAX))
535 {
536 let mut record = UnifiedRecord::new();
537 record.set("collection", Value::text(collection.to_string()));
538 record.set("key", Value::text(version.key));
539 record.set("value", version.value);
540 record.set("version", Value::Integer(version.version));
541 record.set("value_type", config_value_type_value(version.value_type));
542 record.set(
543 "schema_version",
544 version
545 .schema_version
546 .map(Value::Integer)
547 .unwrap_or(Value::Null),
548 );
549 record.set("tags", config_tags_value(&version.tags));
550 record.set("tombstone", Value::Boolean(version.tombstone));
551 record.set("op", Value::text(version.op));
552 record.set("created_at_ms", Value::Integer(version.created_at_ms));
553 result.push(record);
554 }
555 Ok(RuntimeQueryResult {
556 query: raw_query.to_string(),
557 mode: crate::storage::query::modes::QueryMode::Sql,
558 statement: "config_list",
559 engine: "config",
560 result,
561 affected_rows: 0,
562 statement_type: "select",
563 })
564 }
565
566 fn config_watch_result(
567 &self,
568 raw_query: &str,
569 collection: &str,
570 key: &str,
571 prefix: bool,
572 from_lsn: Option<u64>,
573 ) -> RedDBResult<RuntimeQueryResult> {
574 let watch_key = if prefix {
575 format!("{key}.*")
576 } else {
577 key.to_string()
578 };
579 let endpoint = match from_lsn {
580 Some(lsn) => {
581 format!("/collections/{collection}/config/{watch_key}/watch?since_lsn={lsn}")
582 }
583 None => format!("/collections/{collection}/config/{watch_key}/watch"),
584 };
585 let mut result = UnifiedResult::with_columns(vec![
586 "collection".into(),
587 "key".into(),
588 "prefix".into(),
589 "from_lsn".into(),
590 "watch_url".into(),
591 "streaming".into(),
592 ]);
593 let mut record = UnifiedRecord::new();
594 record.set("collection", Value::text(collection.to_string()));
595 record.set("key", Value::text(watch_key));
596 record.set("prefix", Value::Boolean(prefix));
597 record.set(
598 "from_lsn",
599 from_lsn
600 .map(Value::UnsignedInteger)
601 .unwrap_or(crate::storage::schema::Value::Null),
602 );
603 record.set("watch_url", Value::text(endpoint));
604 record.set("streaming", Value::Boolean(true));
605 result.push(record);
606 Ok(RuntimeQueryResult {
607 query: raw_query.to_string(),
608 mode: crate::storage::query::modes::QueryMode::Sql,
609 statement: "config_watch",
610 engine: "config",
611 result,
612 affected_rows: 0,
613 statement_type: "stream",
614 })
615 }
616
617 fn ensure_config_collection(&self, collection: &str) -> RedDBResult<()> {
618 let store = self.inner.db.store();
619 if store.get_collection(collection).is_none() {
620 store
621 .create_collection(collection)
622 .map_err(|err| RedDBError::Internal(err.to_string()))?;
623 }
624 if let Some(contract) = self.inner.db.collection_contract(collection) {
625 crate::runtime::ddl::polymorphic_resolver::ensure_model_match(
626 CollectionModel::Config,
627 contract.declared_model,
628 )?;
629 return Ok(());
630 }
631 let now = current_unix_ms();
632 self.inner
633 .db
634 .save_collection_contract(CollectionContract {
635 name: collection.to_string(),
636 declared_model: CollectionModel::Config,
637 schema_mode: SchemaMode::Dynamic,
638 origin: ContractOrigin::Explicit,
639 version: 1,
640 created_at_unix_ms: now as u128,
641 updated_at_unix_ms: now as u128,
642 default_ttl_ms: None,
643 vector_dimension: None,
644 vector_metric: None,
645 context_index_fields: Vec::new(),
646 declared_columns: Vec::new(),
647 table_def: None,
648 timestamps_enabled: false,
649 context_index_enabled: false,
650 metrics_raw_retention_ms: None,
651 metrics_rollup_policies: Vec::new(),
652 metrics_tenant_identity: None,
653 metrics_namespace: None,
654 append_only: false,
655 subscriptions: Vec::new(),
656 session_key: None,
657 session_gap_ms: None,
658 retention_duration_ms: None,
659 })
660 .map(|_| ())
661 .map_err(|err| RedDBError::Internal(err.to_string()))
662 }
663
664 fn append_config_version(
665 &self,
666 collection: &str,
667 key: &str,
668 value: Value,
669 version: i64,
670 tombstone: bool,
671 op: &str,
672 value_type: Option<ConfigValueType>,
673 schema_version: Option<i64>,
674 tags: &[String],
675 ) -> RedDBResult<EntityId> {
676 let now = current_unix_ms() as i64;
677 let fields = vec![
678 ("key".to_string(), Value::text(key.to_string())),
679 ("value".to_string(), value),
680 ("version".to_string(), Value::Integer(version)),
681 (
682 "value_type".to_string(),
683 config_value_type_value(value_type),
684 ),
685 (
686 "schema_version".to_string(),
687 schema_version.map(Value::Integer).unwrap_or(Value::Null),
688 ),
689 ("tombstone".to_string(), Value::Boolean(tombstone)),
690 ("op".to_string(), Value::text(op.to_string())),
691 ("created_at_ms".to_string(), Value::Integer(now)),
692 ("tags".to_string(), config_tags_value(tags)),
693 ];
694 let mut row = RowData::new(Vec::new());
695 row.named = Some(fields.into_iter().collect());
696 let entity = UnifiedEntity::new(
697 EntityId::new(0),
698 EntityKind::TableRow {
699 table: Arc::from(collection),
700 row_id: 0,
701 },
702 EntityData::Row(row),
703 );
704 self.inner
705 .db
706 .store()
707 .insert(collection, entity)
708 .map_err(|err| RedDBError::Internal(err.to_string()))
709 }
710
711 fn latest_config_version(
712 &self,
713 collection: &str,
714 key: &str,
715 ) -> RedDBResult<Option<ConfigVersion>> {
716 Ok(super::keyed_spine::latest_version(
717 self.config_versions(collection, key)?,
718 ))
719 }
720
721 fn config_versions(&self, collection: &str, key: &str) -> RedDBResult<Vec<ConfigVersion>> {
722 let store = self.inner.db.store();
723 let Some(manager) = store.get_collection(collection) else {
724 return Ok(Vec::new());
725 };
726 let mut versions = Vec::new();
727 for entity in manager.query_all(|_| true) {
728 let EntityData::Row(row) = &entity.data else {
729 continue;
730 };
731 let Some(version) = super::keyed_spine::row_version(entity.id, row, 0) else {
732 continue;
733 };
734 if version.key != key {
735 continue;
736 }
737 versions.push(ConfigVersion::from_keyed_row(version, row));
738 }
739 Ok(versions)
740 }
741
742 fn latest_config_versions(
743 &self,
744 collection: &str,
745 prefix: Option<&str>,
746 ) -> RedDBResult<Vec<ConfigVersion>> {
747 let store = self.inner.db.store();
748 let Some(manager) = store.get_collection(collection) else {
749 return Ok(Vec::new());
750 };
751 let mut versions = Vec::new();
752 for entity in manager.query_all(|_| true) {
753 let EntityData::Row(row) = &entity.data else {
754 continue;
755 };
756 let Some(version) = super::keyed_spine::row_version(entity.id, row, 0) else {
757 continue;
758 };
759 versions.push(ConfigVersion::from_keyed_row(version, row));
760 }
761 Ok(super::keyed_spine::latest_versions(versions, prefix))
762 }
763
764 fn prune_config_history(&self, collection: &str, key: &str) -> RedDBResult<()> {
765 let mut versions = self.config_versions(collection, key)?;
766 if versions.len() <= CONFIG_HISTORY_LIMIT {
767 return Ok(());
768 }
769 versions = super::keyed_spine::history_versions(versions);
770 let drop_count = versions.len() - CONFIG_HISTORY_LIMIT;
771 let store = self.inner.db.store();
772 for version in versions.into_iter().take(drop_count) {
773 store
774 .delete(collection, version.id)
775 .map_err(|err| RedDBError::Internal(err.to_string()))?;
776 }
777 Ok(())
778 }
779
780 fn check_config_capability(
781 &self,
782 action: &str,
783 collection: &str,
784 key: &str,
785 ) -> Result<(), String> {
786 let Some(auth_store) = self.inner.auth_store.read().clone() else {
787 return Ok(());
788 };
789 if !auth_store.iam_authorization_enabled() {
790 return Ok(());
791 }
792 let Some((principal, role)) = current_auth_identity() else {
793 return Err(
794 "IAM authorization is enabled; config capability check requires an authenticated principal"
795 .to_string(),
796 );
797 };
798 let tenant = current_tenant();
799 let principal_id = crate::auth::UserId::from_parts(tenant.as_deref(), &principal);
800 let mut resource = crate::auth::policies::ResourceRef::new(
801 "config",
802 config_target_resource(collection, key),
803 );
804 if let Some(ref tenant) = tenant {
805 resource = resource.with_tenant(tenant.clone());
806 }
807 let ctx = crate::auth::policies::EvalContext {
808 principal_tenant: tenant.clone(),
809 current_tenant: tenant,
810 peer_ip: None,
811 mfa_present: false,
812 now_ms: crate::utils::now_unix_millis() as u128,
813 principal_is_admin_role: role == crate::auth::Role::Admin,
814 };
815 if auth_store.check_policy_authz(&principal_id, action, &resource, &ctx) {
816 Ok(())
817 } else {
818 Err(format!(
819 "principal=`{}` action=`{}` resource=`config:{}` denied by IAM policy",
820 principal,
821 action,
822 config_target_resource(collection, key)
823 ))
824 }
825 }
826
827 fn check_system_config_capability(
828 &self,
829 action: &str,
830 collection: &str,
831 key: &str,
832 ) -> Result<(), String> {
833 if collection != "red.config" {
834 return Ok(());
835 }
836 self.check_config_capability(action, collection, key)
837 }
838
839 pub fn config_watch_events_since(
840 &self,
841 collection: &str,
842 key: &str,
843 since_lsn: u64,
844 max_count: usize,
845 ) -> Vec<crate::replication::cdc::KvWatchEvent> {
846 self.kv_watch_events_since(collection, key, since_lsn, max_count)
847 .into_iter()
848 .map(|event| self.policy_filter_config_watch_event(event))
849 .collect()
850 }
851
852 pub fn config_watch_events_since_prefix(
853 &self,
854 collection: &str,
855 prefix: &str,
856 since_lsn: u64,
857 max_count: usize,
858 ) -> Vec<crate::replication::cdc::KvWatchEvent> {
859 self.kv_watch_events_since_prefix(collection, prefix, since_lsn, max_count)
860 .into_iter()
861 .map(|event| self.policy_filter_config_watch_event(event))
862 .collect()
863 }
864
865 fn policy_filter_config_watch_event(
866 &self,
867 mut event: crate::replication::cdc::KvWatchEvent,
868 ) -> crate::replication::cdc::KvWatchEvent {
869 if self
870 .check_config_capability("config:read", &event.collection, &event.key)
871 .is_err()
872 {
873 event.before = None;
874 event.after = None;
875 }
876 event
877 }
878
879 fn audit_config_resolve(
880 &self,
881 collection: &str,
882 key: &str,
883 secret_ref: Option<&ConfigSecretRef>,
884 outcome: crate::runtime::audit_log::Outcome,
885 reason: &str,
886 ) {
887 let actor = current_auth_identity()
888 .map(|(principal, _)| principal)
889 .unwrap_or_else(|| "anonymous".to_string());
890 let request_id = match current_connection_id() {
891 0 => "embedded".to_string(),
892 id => format!("conn-{id}"),
893 };
894 let mut builder = crate::runtime::audit_log::AuditEvent::builder("config/resolve")
895 .principal(actor.clone())
896 .source(crate::runtime::audit_log::AuditAuthSource::Password)
897 .resource(format!(
898 "config:{}",
899 config_target_resource(collection, key)
900 ))
901 .outcome(outcome)
902 .correlation_id(request_id.clone())
903 .fields([
904 crate::runtime::audit_log::AuditFieldEscaper::field("actor", actor),
905 crate::runtime::audit_log::AuditFieldEscaper::field("collection", collection),
906 crate::runtime::audit_log::AuditFieldEscaper::field("key", key),
907 crate::runtime::audit_log::AuditFieldEscaper::field(
908 "target",
909 config_target_resource(collection, key),
910 ),
911 crate::runtime::audit_log::AuditFieldEscaper::field("reason", reason),
912 crate::runtime::audit_log::AuditFieldEscaper::field("request_id", request_id),
913 crate::runtime::audit_log::AuditFieldEscaper::field(
914 "connection_id",
915 current_connection_id(),
916 ),
917 ]);
918 if let Some(tenant) = current_tenant() {
919 builder = builder.tenant(tenant);
920 }
921 if let Some(secret_ref) = secret_ref {
922 builder = builder.fields([
923 crate::runtime::audit_log::AuditFieldEscaper::field("resolved_store", "vault"),
924 crate::runtime::audit_log::AuditFieldEscaper::field(
925 "resolved_collection",
926 secret_ref.collection.as_str(),
927 ),
928 crate::runtime::audit_log::AuditFieldEscaper::field(
929 "resolved_key",
930 secret_ref.key.as_str(),
931 ),
932 crate::runtime::audit_log::AuditFieldEscaper::field(
933 "resolved_target",
934 format!("{}.{}", secret_ref.collection, secret_ref.key),
935 ),
936 ]);
937 }
938 self.audit_log().record_event(builder.build());
939 }
940}
941
942fn parse_config_secret_ref(value: &Value) -> RedDBResult<ConfigSecretRef> {
943 let Value::Json(bytes) = value else {
944 return Err(RedDBError::InvalidConfig(
945 "CONFIG value is not a SecretRef".to_string(),
946 ));
947 };
948 let json = crate::json::from_slice::<crate::json::Value>(bytes).map_err(|err| {
949 RedDBError::InvalidConfig(format!("CONFIG SecretRef is malformed: {err}"))
950 })?;
951 let Some(object) = json.as_object() else {
952 return Err(RedDBError::InvalidConfig(
953 "CONFIG SecretRef must be an object".to_string(),
954 ));
955 };
956 let get_str = |field: &str| -> RedDBResult<&str> {
957 object
958 .get(field)
959 .and_then(|value| value.as_str())
960 .ok_or_else(|| RedDBError::InvalidConfig(format!("CONFIG SecretRef missing {field}")))
961 };
962 if get_str("type")? != "secret_ref" {
963 return Err(RedDBError::InvalidConfig(
964 "CONFIG value is not a SecretRef".to_string(),
965 ));
966 }
967 if get_str("store")? != "vault" {
968 return Err(RedDBError::InvalidConfig(
969 "CONFIG SecretRef store is unsupported".to_string(),
970 ));
971 }
972 Ok(ConfigSecretRef {
973 collection: get_str("collection")?.to_string(),
974 key: get_str("key")?.to_string(),
975 })
976}
977
/// Builds the resource name used for policy checks and audit records.
///
/// Keys in `red.config` are rendered as `red.config/<key lowercased>`; every
/// other collection is rendered as `<collection>.<key>` with the key unchanged.
fn config_target_resource(collection: &str, key: &str) -> String {
    match collection {
        "red.config" => format!("red.config/{}", key.to_ascii_lowercase()),
        other => format!("{other}.{key}"),
    }
}
985
986fn config_write_output(
987 raw_query: &str,
988 collection: &str,
989 key: &str,
990 version: i64,
991 id: EntityId,
992 value_type: Option<ConfigValueType>,
993 schema_version: Option<i64>,
994 tags: &[String],
995 statement: &'static str,
996 affected_rows: u64,
997) -> RuntimeQueryResult {
998 let mut result = UnifiedResult::with_columns(vec![
999 "ok".into(),
1000 "collection".into(),
1001 "key".into(),
1002 "version".into(),
1003 "value_type".into(),
1004 "schema_version".into(),
1005 "tags".into(),
1006 "id".into(),
1007 ]);
1008 let mut record = UnifiedRecord::new();
1009 record.set("ok", Value::Boolean(true));
1010 record.set("collection", Value::text(collection.to_string()));
1011 record.set("key", Value::text(key.to_string()));
1012 record.set("version", Value::Integer(version));
1013 record.set("value_type", config_value_type_value(value_type));
1014 record.set(
1015 "schema_version",
1016 schema_version.map(Value::Integer).unwrap_or(Value::Null),
1017 );
1018 record.set("tags", config_tags_value(tags));
1019 record.set("id", Value::Integer(id.raw() as i64));
1020 result.push(record);
1021 RuntimeQueryResult {
1022 query: raw_query.to_string(),
1023 mode: crate::storage::query::modes::QueryMode::Sql,
1024 statement,
1025 engine: "config",
1026 result,
1027 affected_rows,
1028 statement_type: if statement == "delete" {
1029 "delete"
1030 } else {
1031 "update"
1032 },
1033 }
1034}
1035
1036fn invalid_config_volatility(operation: &str) -> RedDBError {
1037 RedDBError::InvalidOperation(format!(
1038 "CONFIG does not support KV-only volatility operation {operation}"
1039 ))
1040}
1041
1042fn resolve_config_schema(
1043 latest: Option<&ConfigVersion>,
1044 requested_type: Option<ConfigValueType>,
1045) -> (Option<ConfigValueType>, Option<i64>) {
1046 let previous_type = latest.and_then(|version| version.value_type);
1047 let previous_schema_version = latest.and_then(|version| version.schema_version);
1048 match requested_type {
1049 Some(value_type) if Some(value_type) != previous_type => (
1050 Some(value_type),
1051 Some(previous_schema_version.unwrap_or(0) + 1),
1052 ),
1053 Some(value_type) => (Some(value_type), previous_schema_version.or(Some(1))),
1054 None => (previous_type, previous_schema_version),
1055 }
1056}
1057
1058fn validate_config_value_type(value: &Value, value_type: ConfigValueType) -> RedDBResult<()> {
1059 let valid = match value_type {
1060 ConfigValueType::Bool => matches!(value, Value::Boolean(_)),
1061 ConfigValueType::Int => matches!(
1062 value,
1063 Value::Integer(_) | Value::UnsignedInteger(_) | Value::BigInt(_)
1064 ),
1065 ConfigValueType::String => matches!(value, Value::Text(_)),
1066 ConfigValueType::Url => validate_config_url(value),
1067 ConfigValueType::Object => validate_config_json_shape(value, true),
1068 ConfigValueType::Array => {
1069 matches!(value, Value::Array(_) | Value::Vector(_))
1070 || validate_config_json_shape(value, false)
1071 }
1072 };
1073 if valid {
1074 Ok(())
1075 } else {
1076 Err(RedDBError::InvalidConfig(format!(
1077 "CONFIG value type mismatch: expected {}, got {}",
1078 value_type.as_str(),
1079 config_actual_value_type(value),
1080 )))
1081 }
1082}
1083
1084fn validate_config_url(value: &Value) -> bool {
1085 let url = match value {
1086 Value::Url(value) => value.as_str(),
1087 Value::Text(value) => value.as_ref(),
1088 _ => return false,
1089 };
1090 url.starts_with("http://") || url.starts_with("https://") || url.starts_with("ftp://")
1091}
1092
1093fn validate_config_json_shape(value: &Value, object: bool) -> bool {
1094 let Value::Json(bytes) = value else {
1095 return false;
1096 };
1097 let Ok(json) = crate::json::from_slice::<crate::json::Value>(bytes) else {
1098 return false;
1099 };
1100 matches!(
1101 (object, json),
1102 (true, crate::json::Value::Object(_)) | (false, crate::json::Value::Array(_))
1103 )
1104}
1105
1106fn config_actual_value_type(value: &Value) -> &'static str {
1107 match value {
1108 Value::Null => "null",
1109 Value::Boolean(_) => "bool",
1110 Value::Integer(_) | Value::UnsignedInteger(_) | Value::BigInt(_) => "int",
1111 Value::Text(_) => "string",
1112 Value::Url(_) => "url",
1113 Value::Json(bytes) => match crate::json::from_slice::<crate::json::Value>(bytes) {
1114 Ok(crate::json::Value::Object(_)) => "object",
1115 Ok(crate::json::Value::Array(_)) => "array",
1116 _ => "json",
1117 },
1118 Value::Array(_) | Value::Vector(_) => "array",
1119 _ => "other",
1120 }
1121}
1122
1123fn config_value_type_value(value_type: Option<ConfigValueType>) -> Value {
1124 value_type
1125 .map(|value_type| Value::text(value_type.as_str()))
1126 .unwrap_or(Value::Null)
1127}
1128
1129fn config_value_type_from_value(value: &Value) -> Option<ConfigValueType> {
1130 match value {
1131 Value::Text(value) => ConfigValueType::parse(value.as_ref()),
1132 _ => None,
1133 }
1134}
1135
1136fn config_tags_value(tags: &[String]) -> Value {
1137 if tags.is_empty() {
1138 return Value::Null;
1139 }
1140 Value::Array(tags.iter().map(|tag| Value::text(tag.clone())).collect())
1141}
1142
1143fn config_tags_from_value(value: Option<&Value>) -> Vec<String> {
1144 match value {
1145 Some(Value::Array(values)) => values
1146 .iter()
1147 .filter_map(|value| match value {
1148 Value::Text(tag) => Some(tag.to_string()),
1149 _ => None,
1150 })
1151 .collect(),
1152 Some(Value::Json(bytes)) => crate::json::from_slice::<crate::json::Value>(bytes)
1153 .ok()
1154 .and_then(|value| value.as_array().map(|values| values.to_vec()))
1155 .map(|values| {
1156 values
1157 .into_iter()
1158 .filter_map(|value| value.as_str().map(ToOwned::to_owned))
1159 .collect()
1160 })
1161 .unwrap_or_default(),
1162 _ => Vec::new(),
1163 }
1164}
1165
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Returns 0 when the system clock reads earlier than the epoch. The
/// `u128 -> u64` narrowing is checked and saturates at `u64::MAX` instead of
/// silently wrapping.
fn current_unix_ms() -> u64 {
    let since_epoch = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default();
    u64::try_from(since_epoch.as_millis()).unwrap_or(u64::MAX)
}