1use std::sync::Arc;
4
5use crate::catalog::{CollectionModel, SchemaMode};
6use crate::physical::{CollectionContract, ContractOrigin};
7use crate::storage::query::ast::ConfigValueType;
8use crate::storage::{EntityData, EntityId, EntityKind, RowData, UnifiedEntity};
9
10use super::impl_core::{current_auth_identity, current_connection_id, current_tenant};
11use super::*;
12
13const CONFIG_HISTORY_LIMIT: usize = 16;
14
15#[derive(Clone)]
16struct ConfigVersion {
17 id: EntityId,
18 key: String,
19 version: i64,
20 value: Value,
21 tombstone: bool,
22 created_at_ms: i64,
23 op: String,
24 value_type: Option<ConfigValueType>,
25 schema_version: Option<i64>,
26 tags: Vec<String>,
27}
28
29impl super::keyed_spine::KeyedVersion for ConfigVersion {
30 fn key(&self) -> &str {
31 &self.key
32 }
33
34 fn version(&self) -> i64 {
35 self.version
36 }
37}
38
39impl ConfigVersion {
40 fn from_keyed_row(version: super::keyed_spine::KeyedRowVersion, row: &RowData) -> Self {
41 Self {
42 id: version.id,
43 key: version.key,
44 version: version.version,
45 value: version.value,
46 tombstone: version.tombstone,
47 created_at_ms: version.created_at_ms,
48 op: version.op,
49 value_type: row
50 .get_field("value_type")
51 .and_then(config_value_type_from_value),
52 schema_version: super::keyed_spine::value_i64(row.get_field("schema_version")),
53 tags: config_tags_from_value(row.get_field("tags")),
54 }
55 }
56}
57
58struct ConfigSecretRef {
59 collection: String,
60 key: String,
61}
62
63impl RedDBRuntime {
64 pub fn execute_config_command(
65 &self,
66 raw_query: &str,
67 cmd: &crate::storage::query::ast::ConfigCommand,
68 ) -> RedDBResult<RuntimeQueryResult> {
69 use crate::storage::query::ast::ConfigCommand;
70
71 match cmd {
72 ConfigCommand::Put {
73 collection,
74 key,
75 value,
76 value_type,
77 tags,
78 } => self.config_write_result(
79 raw_query,
80 collection,
81 key,
82 value.clone(),
83 *value_type,
84 tags,
85 "put",
86 ),
87 ConfigCommand::Rotate {
88 collection,
89 key,
90 value,
91 value_type,
92 tags,
93 } => self.config_write_result(
94 raw_query,
95 collection,
96 key,
97 value.clone(),
98 *value_type,
99 tags,
100 "rotate",
101 ),
102 ConfigCommand::Get { collection, key } => {
103 self.config_get_result(raw_query, collection, key)
104 }
105 ConfigCommand::Resolve { collection, key } => {
106 self.config_resolve_result(raw_query, collection, key)
107 }
108 ConfigCommand::Delete { collection, key } => {
109 self.config_delete_result(raw_query, collection, key)
110 }
111 ConfigCommand::History { collection, key } => {
112 self.config_history_result(raw_query, collection, key)
113 }
114 ConfigCommand::List {
115 collection,
116 prefix,
117 limit,
118 offset,
119 } => self.config_list_result(raw_query, collection, prefix.as_deref(), *limit, *offset),
120 ConfigCommand::Watch {
121 collection,
122 key,
123 prefix,
124 from_lsn,
125 } => self.config_watch_result(raw_query, collection, key, *prefix, *from_lsn),
126 ConfigCommand::InvalidVolatileOperation { operation, .. } => {
127 Err(invalid_config_volatility(operation))
128 }
129 }
130 }
131
132 pub(crate) fn validate_config_command_before_auth(
133 &self,
134 cmd: &crate::storage::query::ast::ConfigCommand,
135 ) -> RedDBResult<()> {
136 use crate::storage::query::ast::ConfigCommand;
137 match cmd {
138 ConfigCommand::InvalidVolatileOperation { operation, .. } => {
139 Err(invalid_config_volatility(operation))
140 }
141 ConfigCommand::Put { collection, .. }
142 | ConfigCommand::Get { collection, .. }
143 | ConfigCommand::Resolve { collection, .. }
144 | ConfigCommand::Rotate { collection, .. }
145 | ConfigCommand::Delete { collection, .. }
146 | ConfigCommand::History { collection, .. }
147 | ConfigCommand::List { collection, .. }
148 | ConfigCommand::Watch { collection, .. } => {
149 let snapshot = self.inner.db.catalog_model_snapshot();
150 let Some(actual_model) = snapshot
151 .collections
152 .iter()
153 .find(|c| c.name == *collection)
154 .map(|c| c.declared_model.unwrap_or(c.model))
155 else {
156 return Ok(());
157 };
158 crate::runtime::ddl::polymorphic_resolver::ensure_model_match(
159 CollectionModel::Config,
160 actual_model,
161 )
162 }
163 }
164 }
165
166 fn config_resolve_result(
167 &self,
168 raw_query: &str,
169 collection: &str,
170 key: &str,
171 ) -> RedDBResult<RuntimeQueryResult> {
172 let latest = self.latest_config_version(collection, key)?;
173 if let Err(reason) = self.check_config_capability("config:read", collection, key) {
174 self.audit_config_resolve(
175 collection,
176 key,
177 None,
178 crate::runtime::audit_log::Outcome::Denied,
179 &reason,
180 );
181 return Err(RedDBError::Query(reason));
182 }
183
184 let Some(version) = latest else {
185 let reason = "not_found";
186 self.audit_config_resolve(
187 collection,
188 key,
189 None,
190 crate::runtime::audit_log::Outcome::Denied,
191 reason,
192 );
193 return Err(RedDBError::NotFound(format!(
194 "config '{}.{}' not found",
195 collection, key
196 )));
197 };
198 if version.tombstone {
199 let reason = "deleted";
200 self.audit_config_resolve(
201 collection,
202 key,
203 None,
204 crate::runtime::audit_log::Outcome::Denied,
205 reason,
206 );
207 return Err(RedDBError::NotFound(format!(
208 "config '{}.{}' is deleted",
209 collection, key
210 )));
211 }
212
213 let secret_ref = parse_config_secret_ref(&version.value).inspect_err(|err| {
214 self.audit_config_resolve(
215 collection,
216 key,
217 None,
218 crate::runtime::audit_log::Outcome::Error,
219 &err.to_string(),
220 );
221 })?;
222
223 match self.resolve_vault_secret_value(&secret_ref.collection, &secret_ref.key) {
224 Ok(value) => {
225 self.audit_config_resolve(
226 collection,
227 key,
228 Some(&secret_ref),
229 crate::runtime::audit_log::Outcome::Success,
230 "ok",
231 );
232 let mut result = UnifiedResult::with_columns(vec![
233 "collection".into(),
234 "key".into(),
235 "value".into(),
236 "resolved_store".into(),
237 "resolved_collection".into(),
238 "resolved_key".into(),
239 ]);
240 let mut record = UnifiedRecord::new();
241 record.set("collection", Value::text(collection.to_string()));
242 record.set("key", Value::text(key.to_string()));
243 record.set("value", value);
244 record.set("resolved_store", Value::text("vault"));
245 record.set("resolved_collection", Value::text(secret_ref.collection));
246 record.set("resolved_key", Value::text(secret_ref.key));
247 result.push(record);
248 Ok(RuntimeQueryResult {
249 query: raw_query.to_string(),
250 mode: crate::storage::query::modes::QueryMode::Sql,
251 statement: "config_resolve",
252 engine: "config",
253 result,
254 affected_rows: 0,
255 statement_type: "select",
256 })
257 }
258 Err(err) => {
259 let reason = err.to_string();
260 let outcome = if reason.contains("denied") {
261 crate::runtime::audit_log::Outcome::Denied
262 } else {
263 crate::runtime::audit_log::Outcome::Error
264 };
265 self.audit_config_resolve(collection, key, Some(&secret_ref), outcome, &reason);
266 Err(err)
267 }
268 }
269 }
270
271 fn config_write_result(
272 &self,
273 raw_query: &str,
274 collection: &str,
275 key: &str,
276 value: Value,
277 requested_type: Option<ConfigValueType>,
278 tags: &[String],
279 op: &str,
280 ) -> RedDBResult<RuntimeQueryResult> {
281 self.check_system_config_capability("config:write", collection, key)
282 .map_err(RedDBError::Query)?;
283 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
284 self.ensure_config_collection(collection)?;
285 let latest = self.latest_config_version(collection, key)?;
286 let version = latest.as_ref().map(|version| version.version).unwrap_or(0) + 1;
287 let (value_type, schema_version) = resolve_config_schema(latest.as_ref(), requested_type);
288 if let Some(value_type) = value_type {
289 validate_config_value_type(&value, value_type)?;
290 }
291 let before = latest.as_ref().and_then(|version| {
292 if version.tombstone {
293 None
294 } else {
295 Some(crate::presentation::entity_json::storage_value_to_json(
296 &version.value,
297 ))
298 }
299 });
300 let after = Some(crate::presentation::entity_json::storage_value_to_json(
301 &value,
302 ));
303 let change_op = if latest.is_some() {
304 crate::replication::cdc::ChangeOperation::Update
305 } else {
306 crate::replication::cdc::ChangeOperation::Insert
307 };
308 let id = self.append_config_version(
309 collection,
310 key,
311 value,
312 version,
313 false,
314 op,
315 value_type,
316 schema_version,
317 tags,
318 )?;
319 self.record_kv_watch_event(change_op, collection, key, id.raw(), before, after);
320 self.prune_config_history(collection, key)?;
321 self.invalidate_result_cache();
322 Ok(config_write_output(
323 raw_query,
324 collection,
325 key,
326 version,
327 id,
328 value_type,
329 schema_version,
330 tags,
331 match op {
332 "rotate" => "config_rotate",
333 _ => "config_put",
334 },
335 1,
336 ))
337 }
338
339 fn config_delete_result(
340 &self,
341 raw_query: &str,
342 collection: &str,
343 key: &str,
344 ) -> RedDBResult<RuntimeQueryResult> {
345 self.check_system_config_capability("config:write", collection, key)
346 .map_err(RedDBError::Query)?;
347 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
348 self.ensure_config_collection(collection)?;
349 let latest = self.latest_config_version(collection, key)?;
350 let version = latest.as_ref().map(|version| version.version).unwrap_or(0) + 1;
351 let value_type = latest.as_ref().and_then(|version| version.value_type);
352 let schema_version = latest.as_ref().and_then(|version| version.schema_version);
353 let id = self.append_config_version(
354 collection,
355 key,
356 Value::Null,
357 version,
358 true,
359 "delete",
360 value_type,
361 schema_version,
362 &[],
363 )?;
364 if let Some(before) = latest.as_ref().and_then(|version| {
365 if version.tombstone {
366 None
367 } else {
368 Some(crate::presentation::entity_json::storage_value_to_json(
369 &version.value,
370 ))
371 }
372 }) {
373 self.record_kv_watch_event(
374 crate::replication::cdc::ChangeOperation::Delete,
375 collection,
376 key,
377 id.raw(),
378 Some(before),
379 None,
380 );
381 }
382 self.prune_config_history(collection, key)?;
383 self.invalidate_result_cache();
384 Ok(config_write_output(
385 raw_query,
386 collection,
387 key,
388 version,
389 id,
390 value_type,
391 schema_version,
392 &[],
393 "delete",
394 1,
395 ))
396 }
397
398 fn config_get_result(
399 &self,
400 raw_query: &str,
401 collection: &str,
402 key: &str,
403 ) -> RedDBResult<RuntimeQueryResult> {
404 self.check_system_config_capability("config:read", collection, key)
405 .map_err(RedDBError::Query)?;
406 let latest = self.latest_config_version(collection, key)?;
407 let mut result = UnifiedResult::with_columns(vec![
408 "collection".into(),
409 "key".into(),
410 "value".into(),
411 "version".into(),
412 "value_type".into(),
413 "schema_version".into(),
414 "tags".into(),
415 "tombstone".into(),
416 ]);
417 let mut record = UnifiedRecord::new();
418 record.set("collection", Value::text(collection.to_string()));
419 record.set("key", Value::text(key.to_string()));
420 if let Some(version) = latest {
421 record.set("value", version.value);
422 record.set("version", Value::Integer(version.version));
423 record.set("value_type", config_value_type_value(version.value_type));
424 record.set(
425 "schema_version",
426 version
427 .schema_version
428 .map(Value::Integer)
429 .unwrap_or(Value::Null),
430 );
431 record.set("tags", config_tags_value(&version.tags));
432 record.set("tombstone", Value::Boolean(version.tombstone));
433 } else {
434 record.set("value", Value::Null);
435 record.set("version", Value::Null);
436 record.set("value_type", Value::Null);
437 record.set("schema_version", Value::Null);
438 record.set("tags", Value::Null);
439 record.set("tombstone", Value::Boolean(false));
440 }
441 result.push(record);
442 Ok(RuntimeQueryResult {
443 query: raw_query.to_string(),
444 mode: crate::storage::query::modes::QueryMode::Sql,
445 statement: "config_get",
446 engine: "config",
447 result,
448 affected_rows: 0,
449 statement_type: "select",
450 })
451 }
452
453 fn config_history_result(
454 &self,
455 raw_query: &str,
456 collection: &str,
457 key: &str,
458 ) -> RedDBResult<RuntimeQueryResult> {
459 self.check_system_config_capability("config:read", collection, key)
460 .map_err(RedDBError::Query)?;
461 let versions = super::keyed_spine::history_versions(self.config_versions(collection, key)?);
462 let mut result = UnifiedResult::with_columns(vec![
463 "collection".into(),
464 "key".into(),
465 "version".into(),
466 "value".into(),
467 "value_type".into(),
468 "schema_version".into(),
469 "tags".into(),
470 "tombstone".into(),
471 "op".into(),
472 "created_at_ms".into(),
473 ]);
474 for version in versions {
475 let mut record = UnifiedRecord::new();
476 record.set("collection", Value::text(collection.to_string()));
477 record.set("key", Value::text(key.to_string()));
478 record.set("version", Value::Integer(version.version));
479 record.set("value", version.value);
480 record.set("value_type", config_value_type_value(version.value_type));
481 record.set(
482 "schema_version",
483 version
484 .schema_version
485 .map(Value::Integer)
486 .unwrap_or(Value::Null),
487 );
488 record.set("tags", Value::Null);
489 record.set("tombstone", Value::Boolean(version.tombstone));
490 record.set("op", Value::text(version.op));
491 record.set("created_at_ms", Value::Integer(version.created_at_ms));
492 result.push(record);
493 }
494 Ok(RuntimeQueryResult {
495 query: raw_query.to_string(),
496 mode: crate::storage::query::modes::QueryMode::Sql,
497 statement: "config_history",
498 engine: "config",
499 result,
500 affected_rows: 0,
501 statement_type: "select",
502 })
503 }
504
505 fn config_list_result(
506 &self,
507 raw_query: &str,
508 collection: &str,
509 prefix: Option<&str>,
510 limit: Option<usize>,
511 offset: usize,
512 ) -> RedDBResult<RuntimeQueryResult> {
513 let mut versions = self.latest_config_versions(collection, prefix)?;
514 versions.sort_by(|left, right| left.key.cmp(&right.key));
515 let mut result = UnifiedResult::with_columns(vec![
516 "collection".into(),
517 "key".into(),
518 "value".into(),
519 "version".into(),
520 "value_type".into(),
521 "schema_version".into(),
522 "tags".into(),
523 "tombstone".into(),
524 "op".into(),
525 "created_at_ms".into(),
526 ]);
527 for version in versions
528 .into_iter()
529 .filter(|version| {
530 self.check_config_capability("config:read", collection, &version.key)
531 .is_ok()
532 })
533 .skip(offset)
534 .take(limit.unwrap_or(usize::MAX))
535 {
536 let mut record = UnifiedRecord::new();
537 record.set("collection", Value::text(collection.to_string()));
538 record.set("key", Value::text(version.key));
539 record.set("value", version.value);
540 record.set("version", Value::Integer(version.version));
541 record.set("value_type", config_value_type_value(version.value_type));
542 record.set(
543 "schema_version",
544 version
545 .schema_version
546 .map(Value::Integer)
547 .unwrap_or(Value::Null),
548 );
549 record.set("tags", config_tags_value(&version.tags));
550 record.set("tombstone", Value::Boolean(version.tombstone));
551 record.set("op", Value::text(version.op));
552 record.set("created_at_ms", Value::Integer(version.created_at_ms));
553 result.push(record);
554 }
555 Ok(RuntimeQueryResult {
556 query: raw_query.to_string(),
557 mode: crate::storage::query::modes::QueryMode::Sql,
558 statement: "config_list",
559 engine: "config",
560 result,
561 affected_rows: 0,
562 statement_type: "select",
563 })
564 }
565
566 fn config_watch_result(
567 &self,
568 raw_query: &str,
569 collection: &str,
570 key: &str,
571 prefix: bool,
572 from_lsn: Option<u64>,
573 ) -> RedDBResult<RuntimeQueryResult> {
574 let watch_key = if prefix {
575 format!("{key}.*")
576 } else {
577 key.to_string()
578 };
579 let endpoint = match from_lsn {
580 Some(lsn) => {
581 format!("/collections/{collection}/config/{watch_key}/watch?since_lsn={lsn}")
582 }
583 None => format!("/collections/{collection}/config/{watch_key}/watch"),
584 };
585 let mut result = UnifiedResult::with_columns(vec![
586 "collection".into(),
587 "key".into(),
588 "prefix".into(),
589 "from_lsn".into(),
590 "watch_url".into(),
591 "streaming".into(),
592 ]);
593 let mut record = UnifiedRecord::new();
594 record.set("collection", Value::text(collection.to_string()));
595 record.set("key", Value::text(watch_key));
596 record.set("prefix", Value::Boolean(prefix));
597 record.set(
598 "from_lsn",
599 from_lsn
600 .map(Value::UnsignedInteger)
601 .unwrap_or(crate::storage::schema::Value::Null),
602 );
603 record.set("watch_url", Value::text(endpoint));
604 record.set("streaming", Value::Boolean(true));
605 result.push(record);
606 Ok(RuntimeQueryResult {
607 query: raw_query.to_string(),
608 mode: crate::storage::query::modes::QueryMode::Sql,
609 statement: "config_watch",
610 engine: "config",
611 result,
612 affected_rows: 0,
613 statement_type: "stream",
614 })
615 }
616
617 fn ensure_config_collection(&self, collection: &str) -> RedDBResult<()> {
618 let store = self.inner.db.store();
619 if store.get_collection(collection).is_none() {
620 store
621 .create_collection(collection)
622 .map_err(|err| RedDBError::Internal(err.to_string()))?;
623 }
624 if let Some(contract) = self.inner.db.collection_contract(collection) {
625 crate::runtime::ddl::polymorphic_resolver::ensure_model_match(
626 CollectionModel::Config,
627 contract.declared_model,
628 )?;
629 return Ok(());
630 }
631 let now = current_unix_ms();
632 self.inner
633 .db
634 .save_collection_contract(CollectionContract {
635 name: collection.to_string(),
636 declared_model: CollectionModel::Config,
637 schema_mode: SchemaMode::Dynamic,
638 origin: ContractOrigin::Explicit,
639 version: 1,
640 created_at_unix_ms: now as u128,
641 updated_at_unix_ms: now as u128,
642 default_ttl_ms: None,
643 vector_dimension: None,
644 vector_metric: None,
645 context_index_fields: Vec::new(),
646 declared_columns: Vec::new(),
647 table_def: None,
648 timestamps_enabled: false,
649 context_index_enabled: false,
650 append_only: false,
651 subscriptions: Vec::new(),
652 })
653 .map(|_| ())
654 .map_err(|err| RedDBError::Internal(err.to_string()))
655 }
656
657 fn append_config_version(
658 &self,
659 collection: &str,
660 key: &str,
661 value: Value,
662 version: i64,
663 tombstone: bool,
664 op: &str,
665 value_type: Option<ConfigValueType>,
666 schema_version: Option<i64>,
667 tags: &[String],
668 ) -> RedDBResult<EntityId> {
669 let now = current_unix_ms() as i64;
670 let fields = vec![
671 ("key".to_string(), Value::text(key.to_string())),
672 ("value".to_string(), value),
673 ("version".to_string(), Value::Integer(version)),
674 (
675 "value_type".to_string(),
676 config_value_type_value(value_type),
677 ),
678 (
679 "schema_version".to_string(),
680 schema_version.map(Value::Integer).unwrap_or(Value::Null),
681 ),
682 ("tombstone".to_string(), Value::Boolean(tombstone)),
683 ("op".to_string(), Value::text(op.to_string())),
684 ("created_at_ms".to_string(), Value::Integer(now)),
685 ("tags".to_string(), config_tags_value(tags)),
686 ];
687 let mut row = RowData::new(Vec::new());
688 row.named = Some(fields.into_iter().collect());
689 let entity = UnifiedEntity::new(
690 EntityId::new(0),
691 EntityKind::TableRow {
692 table: Arc::from(collection),
693 row_id: 0,
694 },
695 EntityData::Row(row),
696 );
697 self.inner
698 .db
699 .store()
700 .insert(collection, entity)
701 .map_err(|err| RedDBError::Internal(err.to_string()))
702 }
703
704 fn latest_config_version(
705 &self,
706 collection: &str,
707 key: &str,
708 ) -> RedDBResult<Option<ConfigVersion>> {
709 Ok(super::keyed_spine::latest_version(
710 self.config_versions(collection, key)?,
711 ))
712 }
713
714 fn config_versions(&self, collection: &str, key: &str) -> RedDBResult<Vec<ConfigVersion>> {
715 let store = self.inner.db.store();
716 let Some(manager) = store.get_collection(collection) else {
717 return Ok(Vec::new());
718 };
719 let mut versions = Vec::new();
720 for entity in manager.query_all(|_| true) {
721 let EntityData::Row(row) = &entity.data else {
722 continue;
723 };
724 let Some(version) = super::keyed_spine::row_version(entity.id, row, 0) else {
725 continue;
726 };
727 if version.key != key {
728 continue;
729 }
730 versions.push(ConfigVersion::from_keyed_row(version, row));
731 }
732 Ok(versions)
733 }
734
735 fn latest_config_versions(
736 &self,
737 collection: &str,
738 prefix: Option<&str>,
739 ) -> RedDBResult<Vec<ConfigVersion>> {
740 let store = self.inner.db.store();
741 let Some(manager) = store.get_collection(collection) else {
742 return Ok(Vec::new());
743 };
744 let mut versions = Vec::new();
745 for entity in manager.query_all(|_| true) {
746 let EntityData::Row(row) = &entity.data else {
747 continue;
748 };
749 let Some(version) = super::keyed_spine::row_version(entity.id, row, 0) else {
750 continue;
751 };
752 versions.push(ConfigVersion::from_keyed_row(version, row));
753 }
754 Ok(super::keyed_spine::latest_versions(versions, prefix))
755 }
756
757 fn prune_config_history(&self, collection: &str, key: &str) -> RedDBResult<()> {
758 let mut versions = self.config_versions(collection, key)?;
759 if versions.len() <= CONFIG_HISTORY_LIMIT {
760 return Ok(());
761 }
762 versions = super::keyed_spine::history_versions(versions);
763 let drop_count = versions.len() - CONFIG_HISTORY_LIMIT;
764 let store = self.inner.db.store();
765 for version in versions.into_iter().take(drop_count) {
766 store
767 .delete(collection, version.id)
768 .map_err(|err| RedDBError::Internal(err.to_string()))?;
769 }
770 Ok(())
771 }
772
773 fn check_config_capability(
774 &self,
775 action: &str,
776 collection: &str,
777 key: &str,
778 ) -> Result<(), String> {
779 let Some(auth_store) = self.inner.auth_store.read().clone() else {
780 return Ok(());
781 };
782 if !auth_store.iam_authorization_enabled() {
783 return Ok(());
784 }
785 let Some((principal, role)) = current_auth_identity() else {
786 return Err(
787 "IAM authorization is enabled; config capability check requires an authenticated principal"
788 .to_string(),
789 );
790 };
791 let tenant = current_tenant();
792 let principal_id = crate::auth::UserId::from_parts(tenant.as_deref(), &principal);
793 let mut resource = crate::auth::policies::ResourceRef::new(
794 "config",
795 config_target_resource(collection, key),
796 );
797 if let Some(ref tenant) = tenant {
798 resource = resource.with_tenant(tenant.clone());
799 }
800 let ctx = crate::auth::policies::EvalContext {
801 principal_tenant: tenant.clone(),
802 current_tenant: tenant,
803 peer_ip: None,
804 mfa_present: false,
805 now_ms: crate::utils::now_unix_millis() as u128,
806 principal_is_admin_role: role == crate::auth::Role::Admin,
807 };
808 if auth_store.check_policy_authz(&principal_id, action, &resource, &ctx) {
809 Ok(())
810 } else {
811 Err(format!(
812 "principal=`{}` action=`{}` resource=`config:{}` denied by IAM policy",
813 principal,
814 action,
815 config_target_resource(collection, key)
816 ))
817 }
818 }
819
820 fn check_system_config_capability(
821 &self,
822 action: &str,
823 collection: &str,
824 key: &str,
825 ) -> Result<(), String> {
826 if collection != "red.config" {
827 return Ok(());
828 }
829 self.check_config_capability(action, collection, key)
830 }
831
832 pub fn config_watch_events_since(
833 &self,
834 collection: &str,
835 key: &str,
836 since_lsn: u64,
837 max_count: usize,
838 ) -> Vec<crate::replication::cdc::KvWatchEvent> {
839 self.kv_watch_events_since(collection, key, since_lsn, max_count)
840 .into_iter()
841 .map(|event| self.policy_filter_config_watch_event(event))
842 .collect()
843 }
844
845 pub fn config_watch_events_since_prefix(
846 &self,
847 collection: &str,
848 prefix: &str,
849 since_lsn: u64,
850 max_count: usize,
851 ) -> Vec<crate::replication::cdc::KvWatchEvent> {
852 self.kv_watch_events_since_prefix(collection, prefix, since_lsn, max_count)
853 .into_iter()
854 .map(|event| self.policy_filter_config_watch_event(event))
855 .collect()
856 }
857
858 fn policy_filter_config_watch_event(
859 &self,
860 mut event: crate::replication::cdc::KvWatchEvent,
861 ) -> crate::replication::cdc::KvWatchEvent {
862 if self
863 .check_config_capability("config:read", &event.collection, &event.key)
864 .is_err()
865 {
866 event.before = None;
867 event.after = None;
868 }
869 event
870 }
871
872 fn audit_config_resolve(
873 &self,
874 collection: &str,
875 key: &str,
876 secret_ref: Option<&ConfigSecretRef>,
877 outcome: crate::runtime::audit_log::Outcome,
878 reason: &str,
879 ) {
880 let actor = current_auth_identity()
881 .map(|(principal, _)| principal)
882 .unwrap_or_else(|| "anonymous".to_string());
883 let request_id = match current_connection_id() {
884 0 => "embedded".to_string(),
885 id => format!("conn-{id}"),
886 };
887 let mut builder = crate::runtime::audit_log::AuditEvent::builder("config/resolve")
888 .principal(actor.clone())
889 .source(crate::runtime::audit_log::AuditAuthSource::Password)
890 .resource(format!(
891 "config:{}",
892 config_target_resource(collection, key)
893 ))
894 .outcome(outcome)
895 .correlation_id(request_id.clone())
896 .fields([
897 crate::runtime::audit_log::AuditFieldEscaper::field("actor", actor),
898 crate::runtime::audit_log::AuditFieldEscaper::field("collection", collection),
899 crate::runtime::audit_log::AuditFieldEscaper::field("key", key),
900 crate::runtime::audit_log::AuditFieldEscaper::field(
901 "target",
902 config_target_resource(collection, key),
903 ),
904 crate::runtime::audit_log::AuditFieldEscaper::field("reason", reason),
905 crate::runtime::audit_log::AuditFieldEscaper::field("request_id", request_id),
906 crate::runtime::audit_log::AuditFieldEscaper::field(
907 "connection_id",
908 current_connection_id(),
909 ),
910 ]);
911 if let Some(tenant) = current_tenant() {
912 builder = builder.tenant(tenant);
913 }
914 if let Some(secret_ref) = secret_ref {
915 builder = builder.fields([
916 crate::runtime::audit_log::AuditFieldEscaper::field("resolved_store", "vault"),
917 crate::runtime::audit_log::AuditFieldEscaper::field(
918 "resolved_collection",
919 secret_ref.collection.as_str(),
920 ),
921 crate::runtime::audit_log::AuditFieldEscaper::field(
922 "resolved_key",
923 secret_ref.key.as_str(),
924 ),
925 crate::runtime::audit_log::AuditFieldEscaper::field(
926 "resolved_target",
927 format!("{}.{}", secret_ref.collection, secret_ref.key),
928 ),
929 ]);
930 }
931 self.audit_log().record_event(builder.build());
932 }
933}
934
935fn parse_config_secret_ref(value: &Value) -> RedDBResult<ConfigSecretRef> {
936 let Value::Json(bytes) = value else {
937 return Err(RedDBError::InvalidConfig(
938 "CONFIG value is not a SecretRef".to_string(),
939 ));
940 };
941 let json = crate::json::from_slice::<crate::json::Value>(bytes).map_err(|err| {
942 RedDBError::InvalidConfig(format!("CONFIG SecretRef is malformed: {err}"))
943 })?;
944 let Some(object) = json.as_object() else {
945 return Err(RedDBError::InvalidConfig(
946 "CONFIG SecretRef must be an object".to_string(),
947 ));
948 };
949 let get_str = |field: &str| -> RedDBResult<&str> {
950 object
951 .get(field)
952 .and_then(|value| value.as_str())
953 .ok_or_else(|| RedDBError::InvalidConfig(format!("CONFIG SecretRef missing {field}")))
954 };
955 if get_str("type")? != "secret_ref" {
956 return Err(RedDBError::InvalidConfig(
957 "CONFIG value is not a SecretRef".to_string(),
958 ));
959 }
960 if get_str("store")? != "vault" {
961 return Err(RedDBError::InvalidConfig(
962 "CONFIG SecretRef store is unsupported".to_string(),
963 ));
964 }
965 Ok(ConfigSecretRef {
966 collection: get_str("collection")?.to_string(),
967 key: get_str("key")?.to_string(),
968 })
969}
970
971fn config_target_resource(collection: &str, key: &str) -> String {
972 if collection == "red.config" {
973 format!("red.config/{}", key.to_ascii_lowercase())
974 } else {
975 format!("{collection}.{key}")
976 }
977}
978
979fn config_write_output(
980 raw_query: &str,
981 collection: &str,
982 key: &str,
983 version: i64,
984 id: EntityId,
985 value_type: Option<ConfigValueType>,
986 schema_version: Option<i64>,
987 tags: &[String],
988 statement: &'static str,
989 affected_rows: u64,
990) -> RuntimeQueryResult {
991 let mut result = UnifiedResult::with_columns(vec![
992 "ok".into(),
993 "collection".into(),
994 "key".into(),
995 "version".into(),
996 "value_type".into(),
997 "schema_version".into(),
998 "tags".into(),
999 "id".into(),
1000 ]);
1001 let mut record = UnifiedRecord::new();
1002 record.set("ok", Value::Boolean(true));
1003 record.set("collection", Value::text(collection.to_string()));
1004 record.set("key", Value::text(key.to_string()));
1005 record.set("version", Value::Integer(version));
1006 record.set("value_type", config_value_type_value(value_type));
1007 record.set(
1008 "schema_version",
1009 schema_version.map(Value::Integer).unwrap_or(Value::Null),
1010 );
1011 record.set("tags", config_tags_value(tags));
1012 record.set("id", Value::Integer(id.raw() as i64));
1013 result.push(record);
1014 RuntimeQueryResult {
1015 query: raw_query.to_string(),
1016 mode: crate::storage::query::modes::QueryMode::Sql,
1017 statement,
1018 engine: "config",
1019 result,
1020 affected_rows,
1021 statement_type: if statement == "delete" {
1022 "delete"
1023 } else {
1024 "update"
1025 },
1026 }
1027}
1028
1029fn invalid_config_volatility(operation: &str) -> RedDBError {
1030 RedDBError::InvalidOperation(format!(
1031 "CONFIG does not support KV-only volatility operation {operation}"
1032 ))
1033}
1034
1035fn resolve_config_schema(
1036 latest: Option<&ConfigVersion>,
1037 requested_type: Option<ConfigValueType>,
1038) -> (Option<ConfigValueType>, Option<i64>) {
1039 let previous_type = latest.and_then(|version| version.value_type);
1040 let previous_schema_version = latest.and_then(|version| version.schema_version);
1041 match requested_type {
1042 Some(value_type) if Some(value_type) != previous_type => (
1043 Some(value_type),
1044 Some(previous_schema_version.unwrap_or(0) + 1),
1045 ),
1046 Some(value_type) => (Some(value_type), previous_schema_version.or(Some(1))),
1047 None => (previous_type, previous_schema_version),
1048 }
1049}
1050
1051fn validate_config_value_type(value: &Value, value_type: ConfigValueType) -> RedDBResult<()> {
1052 let valid = match value_type {
1053 ConfigValueType::Bool => matches!(value, Value::Boolean(_)),
1054 ConfigValueType::Int => matches!(
1055 value,
1056 Value::Integer(_) | Value::UnsignedInteger(_) | Value::BigInt(_)
1057 ),
1058 ConfigValueType::String => matches!(value, Value::Text(_)),
1059 ConfigValueType::Url => validate_config_url(value),
1060 ConfigValueType::Object => validate_config_json_shape(value, true),
1061 ConfigValueType::Array => {
1062 matches!(value, Value::Array(_) | Value::Vector(_))
1063 || validate_config_json_shape(value, false)
1064 }
1065 };
1066 if valid {
1067 Ok(())
1068 } else {
1069 Err(RedDBError::InvalidConfig(format!(
1070 "CONFIG value type mismatch: expected {}, got {}",
1071 value_type.as_str(),
1072 config_actual_value_type(value),
1073 )))
1074 }
1075}
1076
1077fn validate_config_url(value: &Value) -> bool {
1078 let url = match value {
1079 Value::Url(value) => value.as_str(),
1080 Value::Text(value) => value.as_ref(),
1081 _ => return false,
1082 };
1083 url.starts_with("http://") || url.starts_with("https://") || url.starts_with("ftp://")
1084}
1085
1086fn validate_config_json_shape(value: &Value, object: bool) -> bool {
1087 let Value::Json(bytes) = value else {
1088 return false;
1089 };
1090 let Ok(json) = crate::json::from_slice::<crate::json::Value>(bytes) else {
1091 return false;
1092 };
1093 matches!(
1094 (object, json),
1095 (true, crate::json::Value::Object(_)) | (false, crate::json::Value::Array(_))
1096 )
1097}
1098
1099fn config_actual_value_type(value: &Value) -> &'static str {
1100 match value {
1101 Value::Null => "null",
1102 Value::Boolean(_) => "bool",
1103 Value::Integer(_) | Value::UnsignedInteger(_) | Value::BigInt(_) => "int",
1104 Value::Text(_) => "string",
1105 Value::Url(_) => "url",
1106 Value::Json(bytes) => match crate::json::from_slice::<crate::json::Value>(bytes) {
1107 Ok(crate::json::Value::Object(_)) => "object",
1108 Ok(crate::json::Value::Array(_)) => "array",
1109 _ => "json",
1110 },
1111 Value::Array(_) | Value::Vector(_) => "array",
1112 _ => "other",
1113 }
1114}
1115
1116fn config_value_type_value(value_type: Option<ConfigValueType>) -> Value {
1117 value_type
1118 .map(|value_type| Value::text(value_type.as_str()))
1119 .unwrap_or(Value::Null)
1120}
1121
1122fn config_value_type_from_value(value: &Value) -> Option<ConfigValueType> {
1123 match value {
1124 Value::Text(value) => ConfigValueType::parse(value.as_ref()),
1125 _ => None,
1126 }
1127}
1128
1129fn config_tags_value(tags: &[String]) -> Value {
1130 if tags.is_empty() {
1131 return Value::Null;
1132 }
1133 Value::Array(tags.iter().map(|tag| Value::text(tag.clone())).collect())
1134}
1135
1136fn config_tags_from_value(value: Option<&Value>) -> Vec<String> {
1137 match value {
1138 Some(Value::Array(values)) => values
1139 .iter()
1140 .filter_map(|value| match value {
1141 Value::Text(tag) => Some(tag.to_string()),
1142 _ => None,
1143 })
1144 .collect(),
1145 Some(Value::Json(bytes)) => crate::json::from_slice::<crate::json::Value>(bytes)
1146 .ok()
1147 .and_then(|value| value.as_array().map(|values| values.to_vec()))
1148 .map(|values| {
1149 values
1150 .into_iter()
1151 .filter_map(|value| value.as_str().map(ToOwned::to_owned))
1152 .collect()
1153 })
1154 .unwrap_or_default(),
1155 _ => Vec::new(),
1156 }
1157}
1158
1159fn current_unix_ms() -> u64 {
1160 std::time::SystemTime::now()
1161 .duration_since(std::time::UNIX_EPOCH)
1162 .unwrap_or_default()
1163 .as_millis() as u64
1164}