1use crate::application::ports::RuntimeEntityPort;
7use crate::storage::unified::{Metadata, MetadataValue};
8
9use super::impl_core::{current_auth_identity, current_connection_id, current_tenant};
10use super::*;
11
/// Collection name `kv_default`.
/// NOTE(review): presumably the collection used when a KV command names no
/// collection — confirm against callers.
pub const KV_DEFAULT_COLLECTION: &str = "kv_default";
14
/// Builds the key-store reference `red.vault.<collection>.master_key` under
/// which a collection-specific vault master key is looked up.
fn vault_master_key_ref(collection: &str) -> String {
    ["red.vault.", collection, ".master_key"].concat()
}
18
19fn keyed_model_name(model: crate::catalog::CollectionModel) -> &'static str {
20 match model {
21 crate::catalog::CollectionModel::Kv => "kv",
22 crate::catalog::CollectionModel::Vault => "vault",
23 crate::catalog::CollectionModel::Config => "config",
24 _ => "collection",
25 }
26}
27
/// One stored version of a vault key: the fields parsed from the keyed row
/// plus the entity-level bookkeeping supplied to `VaultEntry::from_keyed_row`.
#[derive(Debug, Clone)]
struct VaultEntry {
    /// Entity id of the row that holds this version.
    id: crate::storage::EntityId,
    /// Vault key this version belongs to.
    key: String,
    /// Stored value as read from the row (not unsealed here).
    value: crate::storage::schema::Value,
    /// Metadata attached to the entity (empty default when none is stored).
    metadata: Metadata,
    /// Entity creation timestamp, copied from the entity.
    created_at: u64,
    /// Entity last-update timestamp, copied from the entity.
    updated_at: u64,
    /// Entity sequence id, copied from the entity.
    sequence_id: u64,
    /// Version number of this entry for `key`.
    version: i64,
    /// `true` when this version marks the key as deleted.
    tombstone: bool,
    /// Operation label recorded with the version (e.g. `"put"`).
    op: String,
}
41
42impl super::keyed_spine::KeyedVersion for VaultEntry {
43 fn key(&self) -> &str {
44 &self.key
45 }
46
47 fn version(&self) -> i64 {
48 self.version
49 }
50}
51
52impl VaultEntry {
53 fn from_keyed_row(
54 version: super::keyed_spine::KeyedRowVersion,
55 metadata: Metadata,
56 created_at: u64,
57 updated_at: u64,
58 sequence_id: u64,
59 ) -> Self {
60 Self {
61 id: version.id,
62 key: version.key,
63 value: version.value,
64 metadata,
65 created_at,
66 updated_at,
67 sequence_id,
68 version: version.version,
69 tombstone: version.tombstone,
70 op: version.op,
71 }
72 }
73}
74
/// Borrowed handle over a `RedDBRuntime` exposing the keyed-collection
/// operations (KV, vault, config) implemented in this file.
pub struct KvAtomicOps<'a> {
    /// Runtime every operation is executed against.
    runtime: &'a RedDBRuntime,
}
82
83impl<'a> KvAtomicOps<'a> {
84 pub fn new(runtime: &'a RedDBRuntime) -> Self {
85 Self { runtime }
86 }
87
88 pub fn set(
92 &self,
93 model: crate::catalog::CollectionModel,
94 collection: &str,
95 key: &str,
96 value: crate::storage::schema::Value,
97 ttl_ms: Option<u64>,
98 if_not_exists: bool,
99 ) -> RedDBResult<(bool, crate::storage::EntityId)> {
100 self.set_with_tags_for_model(model, collection, key, value, ttl_ms, &[], if_not_exists)
101 }
102
103 pub fn set_with_tags(
104 &self,
105 collection: &str,
106 key: &str,
107 value: crate::storage::schema::Value,
108 ttl_ms: Option<u64>,
109 tags: &[String],
110 if_not_exists: bool,
111 ) -> RedDBResult<(bool, crate::storage::EntityId)> {
112 self.set_with_tags_for_model(
113 crate::catalog::CollectionModel::Kv,
114 collection,
115 key,
116 value,
117 ttl_ms,
118 tags,
119 if_not_exists,
120 )
121 }
122
123 fn set_with_tags_for_model(
124 &self,
125 model: crate::catalog::CollectionModel,
126 collection: &str,
127 key: &str,
128 value: crate::storage::schema::Value,
129 ttl_ms: Option<u64>,
130 tags: &[String],
131 if_not_exists: bool,
132 ) -> RedDBResult<(bool, crate::storage::EntityId)> {
133 self.set_with_tags_and_metadata_for_model(
134 model,
135 collection,
136 key,
137 value,
138 ttl_ms,
139 tags,
140 if_not_exists,
141 Vec::new(),
142 )
143 }
144
145 pub fn set_with_tags_and_metadata(
146 &self,
147 collection: &str,
148 key: &str,
149 value: crate::storage::schema::Value,
150 ttl_ms: Option<u64>,
151 tags: &[String],
152 if_not_exists: bool,
153 metadata: Vec<(String, MetadataValue)>,
154 ) -> RedDBResult<(bool, crate::storage::EntityId)> {
155 self.set_with_tags_and_metadata_for_model(
156 crate::catalog::CollectionModel::Kv,
157 collection,
158 key,
159 value,
160 ttl_ms,
161 tags,
162 if_not_exists,
163 metadata,
164 )
165 }
166
167 fn set_with_tags_and_metadata_for_model(
168 &self,
169 model: crate::catalog::CollectionModel,
170 collection: &str,
171 key: &str,
172 value: crate::storage::schema::Value,
173 ttl_ms: Option<u64>,
174 tags: &[String],
175 if_not_exists: bool,
176 mut metadata: Vec<(String, MetadataValue)>,
177 ) -> RedDBResult<(bool, crate::storage::EntityId)> {
178 self.ensure_keyed_collection(model, collection)?;
179
180 if model == crate::catalog::CollectionModel::Vault {
181 let latest = self.get_vault_entry(collection, key)?;
182 let was_present = latest.as_ref().is_some_and(|entry| !entry.tombstone);
183 if if_not_exists && was_present {
184 return Ok((false, latest.expect("checked present").id));
185 }
186 let entry = self.append_vault_version(collection, key, value, "put", false, tags)?;
187 self.runtime.record_kv_watch_event(
188 if latest.is_some() {
189 crate::replication::cdc::ChangeOperation::Update
190 } else {
191 crate::replication::cdc::ChangeOperation::Insert
192 },
193 collection,
194 key,
195 entry.id.raw(),
196 latest.as_ref().map(vault_entry_metadata_json),
197 Some(vault_entry_metadata_json(&entry)),
198 );
199 return Ok((!was_present, entry.id));
200 }
201
202 let existing = self.get_entry(model, collection, key)?;
203 let was_present = existing.is_some();
204
205 if if_not_exists && was_present {
206 let (_, id) = existing.unwrap();
207 self.runtime.inner.kv_stats.incr_puts();
208 return Ok((false, id));
209 }
210
211 let before = existing
212 .as_ref()
213 .map(|(value, _)| crate::presentation::entity_json::storage_value_to_json(value));
214 let op = if was_present {
215 crate::replication::cdc::ChangeOperation::Update
216 } else {
217 crate::replication::cdc::ChangeOperation::Insert
218 };
219 let after = Some(crate::presentation::entity_json::storage_value_to_json(
220 &value,
221 ));
222
223 let versioned = self.is_versioned_collection(model, collection);
229 let prior_versioned_entity = if versioned && was_present {
234 self.get_entity(model, collection, key)?
235 } else {
236 None
237 };
238 if was_present && !versioned {
239 self.delete(model, collection, key)?;
240 }
241
242 if let Some(ttl_metadata) = ttl_metadata(ttl_ms) {
243 metadata.extend(ttl_metadata.fields);
244 }
245 if let Some(tags_metadata) = kv_tags_metadata(tags) {
246 metadata.push(tags_metadata);
247 }
248
249 let output = self
250 .runtime
251 .create_kv(crate::application::entity::CreateKvInput {
252 collection: collection.to_string(),
253 key: key.to_string(),
254 value,
255 metadata,
256 })?;
257
258 if versioned {
269 let version_xid = match self.runtime.current_xid() {
270 Some(xid) => xid,
271 None => {
272 let mgr = self.runtime.snapshot_manager();
273 let xid = mgr.begin();
274 mgr.commit(xid);
275 xid
276 }
277 };
278 if let Some(new_entity) = output.entity.clone() {
279 self.restamp_xmin(collection, new_entity, version_xid)?;
280 }
281 if let Some(prior) = prior_versioned_entity {
282 let previous_xmax = prior.xmax;
292 let old_id = prior.id;
293 self.tombstone_version(collection, prior, version_xid)?;
294 if self.runtime.current_xid().is_some() {
295 self.runtime.record_pending_versioned_update(
296 crate::runtime::impl_core::current_connection_id(),
297 collection,
298 old_id,
299 output.id,
300 version_xid,
301 previous_xmax,
302 );
303 }
304 }
305 }
306 if model == crate::catalog::CollectionModel::Kv {
307 self.runtime
308 .inner
309 .kv_tag_index
310 .replace(collection, key, output.id, tags);
311 }
312
313 if model == crate::catalog::CollectionModel::Kv {
314 self.runtime
315 .record_kv_watch_event(op, collection, key, output.id.raw(), before, after);
316 }
317
318 if model == crate::catalog::CollectionModel::Kv {
319 self.runtime.inner.kv_stats.incr_puts();
320 }
321 Ok((!was_present, output.id))
322 }
323
324 pub fn get(
326 &self,
327 model: crate::catalog::CollectionModel,
328 collection: &str,
329 key: &str,
330 ) -> RedDBResult<Option<crate::storage::schema::Value>> {
331 let result = self.get_entry(model, collection, key)?;
332 if model == crate::catalog::CollectionModel::Kv {
333 self.runtime.inner.kv_stats.incr_gets();
334 }
335 Ok(result.map(|(v, _)| v))
336 }
337
338 pub fn delete(
340 &self,
341 model: crate::catalog::CollectionModel,
342 collection: &str,
343 key: &str,
344 ) -> RedDBResult<bool> {
345 self.ensure_declared_model(model, collection)?;
346
347 if self.is_versioned_collection(model, collection) {
351 let Some(entity) = self.get_entity(model, collection, key)? else {
352 return Ok(false);
353 };
354 let id = entity.id;
355 let value = kv_value_from_entity(&entity);
356 let xid = self.runtime.current_xid().unwrap_or_else(|| {
357 let mgr = self.runtime.snapshot_manager();
358 let xid = mgr.begin();
359 mgr.commit(xid);
360 xid
361 });
362 let previous_xmax = entity.xmax;
371 self.tombstone_version(collection, entity, xid)?;
372 if self.runtime.current_xid().is_some() {
373 self.runtime.record_pending_tombstone(
374 crate::runtime::impl_core::current_connection_id(),
375 collection,
376 id,
377 xid,
378 previous_xmax,
379 );
380 }
381 self.runtime.inner.kv_tag_index.remove(collection, key);
382 self.runtime.record_kv_watch_event(
383 crate::replication::cdc::ChangeOperation::Delete,
384 collection,
385 key,
386 id.raw(),
387 value
388 .as_ref()
389 .map(crate::presentation::entity_json::storage_value_to_json),
390 None,
391 );
392 self.runtime.inner.kv_stats.incr_deletes();
393 return Ok(true);
394 }
395
396 let found = self.get_entry(model, collection, key)?;
397 if let Some((value, id)) = found {
398 let store = self.runtime.inner.db.store();
399 let deleted = store
400 .delete(collection, id)
401 .map_err(|err| RedDBError::Internal(err.to_string()))?;
402 if deleted {
403 store.context_index().remove_entity(id);
404 if model == crate::catalog::CollectionModel::Kv {
405 self.runtime.inner.kv_tag_index.remove(collection, key);
406 self.runtime.record_kv_watch_event(
407 crate::replication::cdc::ChangeOperation::Delete,
408 collection,
409 key,
410 id.raw(),
411 Some(crate::presentation::entity_json::storage_value_to_json(
412 &value,
413 )),
414 None,
415 );
416 self.runtime.inner.kv_stats.incr_deletes();
417 }
418 }
419 Ok(deleted)
420 } else {
421 Ok(false)
422 }
423 }
424
425 fn tombstone_version(
431 &self,
432 collection: &str,
433 mut entity: crate::storage::UnifiedEntity,
434 xid: crate::storage::transaction::snapshot::Xid,
435 ) -> RedDBResult<()> {
436 let store = self.runtime.inner.db.store();
437 let Some(manager) = store.get_collection(collection) else {
438 return Ok(());
439 };
440 let id = entity.id;
441 entity.set_xmax(xid);
442 manager
443 .update(entity.clone())
444 .map_err(|err| RedDBError::Internal(format!("{err:?}")))?;
445 store
446 .persist_entities_to_pager(collection, std::slice::from_ref(&entity))
447 .map_err(|err| RedDBError::Internal(err.to_string()))?;
448 store.context_index().remove_entity(id);
451 Ok(())
452 }
453
454 fn restamp_xmin(
460 &self,
461 collection: &str,
462 mut entity: crate::storage::UnifiedEntity,
463 xid: crate::storage::transaction::snapshot::Xid,
464 ) -> RedDBResult<()> {
465 let store = self.runtime.inner.db.store();
466 let Some(manager) = store.get_collection(collection) else {
467 return Ok(());
468 };
469 entity.set_xmin(xid);
470 manager
471 .update(entity.clone())
472 .map_err(|err| RedDBError::Internal(format!("{err:?}")))?;
473 store
474 .persist_entities_to_pager(collection, std::slice::from_ref(&entity))
475 .map_err(|err| RedDBError::Internal(err.to_string()))?;
476 Ok(())
477 }
478
479 pub fn incr(
484 &self,
485 model: crate::catalog::CollectionModel,
486 collection: &str,
487 key: &str,
488 by: i64,
489 ttl_ms: Option<u64>,
490 ) -> RedDBResult<i64> {
491 if model == crate::catalog::CollectionModel::Vault {
492 return Err(RedDBError::InvalidOperation(
493 "VAULT INCR is not supported for sealed secrets".to_string(),
494 ));
495 }
496 let rmw_lock = self.runtime.inner.rmw_locks.lock_for(collection, key);
497 let _rmw_guard = rmw_lock.lock();
498 self.ensure_kv_collection(collection)?;
499 let existing = self.runtime.get_kv(collection, key)?;
500 let current: i64 = match existing.as_ref() {
501 None => 0,
502 Some((crate::storage::schema::Value::Integer(n), _)) => *n,
503 Some((crate::storage::schema::Value::Float(f), _)) => *f as i64,
504 Some((other, _)) => {
505 return Err(RedDBError::Internal(format!(
506 "INCR on non-integer value: {:?}",
507 other
508 )));
509 }
510 };
511
512 let next = current
513 .checked_add(by)
514 .ok_or_else(|| RedDBError::Internal(format!("INCR overflow: {current} + {by}")))?;
515
516 if existing.is_some() {
518 self.runtime.delete_kv(collection, key)?;
519 }
520
521 let meta_vec: Vec<(String, crate::storage::unified::MetadataValue)> = ttl_metadata(ttl_ms)
522 .map(|m| m.fields.into_iter().collect())
523 .unwrap_or_default();
524
525 let output = self
526 .runtime
527 .create_kv(crate::application::entity::CreateKvInput {
528 collection: collection.to_string(),
529 key: key.to_string(),
530 value: crate::storage::schema::Value::Integer(next),
531 metadata: meta_vec,
532 })?;
533 self.runtime
534 .inner
535 .kv_tag_index
536 .replace(collection, key, output.id, &[]);
537
538 self.runtime.record_kv_watch_event(
539 if existing.is_some() {
540 crate::replication::cdc::ChangeOperation::Update
541 } else {
542 crate::replication::cdc::ChangeOperation::Insert
543 },
544 collection,
545 key,
546 output.id.raw(),
547 existing
548 .as_ref()
549 .map(|(value, _)| crate::presentation::entity_json::storage_value_to_json(value)),
550 Some(crate::presentation::entity_json::storage_value_to_json(
551 &crate::storage::schema::Value::Integer(next),
552 )),
553 );
554
555 self.runtime.inner.kv_stats.incr_incrs();
556 Ok(next)
557 }
558
559 pub fn cas(
567 &self,
568 model: crate::catalog::CollectionModel,
569 collection: &str,
570 key: &str,
571 expected: Option<&crate::storage::schema::Value>,
572 new_value: crate::storage::schema::Value,
573 ttl_ms: Option<u64>,
574 ) -> RedDBResult<(bool, Option<crate::storage::schema::Value>)> {
575 if model == crate::catalog::CollectionModel::Vault {
576 return Err(RedDBError::InvalidOperation(
577 "VAULT CAS is not supported for sealed secrets".to_string(),
578 ));
579 }
580 let rmw_lock = self.runtime.inner.rmw_locks.lock_for(collection, key);
581 let _rmw_guard = rmw_lock.lock();
582 self.ensure_kv_collection(collection)?;
583 let current = self.runtime.get_kv(collection, key)?.map(|(v, _)| v);
584
585 let matches = match (¤t, expected) {
586 (None, None) => true,
587 (Some(cur), Some(exp)) => cur == exp,
588 _ => false,
589 };
590
591 if !matches {
592 self.runtime.inner.kv_stats.incr_cas_conflict();
593 return Ok((false, current));
594 }
595
596 if current.is_some() {
598 self.runtime.delete_kv(collection, key)?;
599 }
600
601 let meta_vec: Vec<(String, crate::storage::unified::MetadataValue)> = ttl_metadata(ttl_ms)
602 .map(|m| m.fields.into_iter().collect())
603 .unwrap_or_default();
604
605 let output = self
606 .runtime
607 .create_kv(crate::application::entity::CreateKvInput {
608 collection: collection.to_string(),
609 key: key.to_string(),
610 value: new_value.clone(),
611 metadata: meta_vec,
612 })?;
613 self.runtime
614 .inner
615 .kv_tag_index
616 .replace(collection, key, output.id, &[]);
617
618 self.runtime.record_kv_watch_event(
619 if current.is_some() {
620 crate::replication::cdc::ChangeOperation::Update
621 } else {
622 crate::replication::cdc::ChangeOperation::Insert
623 },
624 collection,
625 key,
626 output.id.raw(),
627 current
628 .as_ref()
629 .map(crate::presentation::entity_json::storage_value_to_json),
630 Some(crate::presentation::entity_json::storage_value_to_json(
631 &new_value,
632 )),
633 );
634
635 self.runtime.inner.kv_stats.incr_cas_success();
636 Ok((true, current))
637 }
638
639 pub fn invalidate_tags(&self, collection: &str, tags: &[String]) -> RedDBResult<usize> {
640 self.runtime
641 .check_write(crate::runtime::write_gate::WriteKind::Dml)?;
642 self.runtime.check_kv_invalidate_policy(collection)?;
643 self.ensure_kv_collection(collection)?;
644 let entries = self
645 .runtime
646 .inner
647 .kv_tag_index
648 .entries_for_tags(collection, tags);
649 if entries.is_empty() {
650 return Ok(0);
651 }
652
653 let store = self.runtime.inner.db.store();
654 let mut removed = 0usize;
655 for (key, id) in entries {
656 let before = store
657 .get(collection, id)
658 .and_then(|entity| kv_value_from_entity(&entity));
659 let deleted = store
660 .delete(collection, id)
661 .map_err(|err| RedDBError::Internal(err.to_string()))?;
662 if deleted {
663 store.context_index().remove_entity(id);
664 self.runtime.inner.kv_tag_index.remove(collection, &key);
665 self.runtime.record_kv_watch_event(
666 crate::replication::cdc::ChangeOperation::Delete,
667 collection,
668 &key,
669 id.raw(),
670 before
671 .as_ref()
672 .map(crate::presentation::entity_json::storage_value_to_json),
673 None,
674 );
675 removed += 1;
676 }
677 }
678 if removed > 0 {
679 self.runtime.inner.kv_stats.incr_deletes();
680 }
681 Ok(removed)
682 }
683
684 pub fn tags_for_key(&self, collection: &str, key: &str) -> Vec<String> {
685 self.runtime
686 .inner
687 .kv_tag_index
688 .tags_for_key(collection, key)
689 }
690
691 fn ensure_kv_collection(&self, collection: &str) -> RedDBResult<()> {
693 self.ensure_keyed_collection(crate::catalog::CollectionModel::Kv, collection)
694 }
695
696 fn ensure_keyed_collection(
697 &self,
698 model: crate::catalog::CollectionModel,
699 collection: &str,
700 ) -> RedDBResult<()> {
701 let store = self.runtime.inner.db.store();
702 if store.get_collection(collection).is_some() {
703 return self.ensure_declared_model(model, collection);
704 }
705 if model != crate::catalog::CollectionModel::Kv {
706 return Err(RedDBError::NotFound(format!(
707 "{} collection '{collection}' does not exist",
708 keyed_model_name(model)
709 )));
710 }
711 let auto_create = self
713 .runtime
714 .config_bool("red.config.kv.default_collection", true);
715 if !auto_create {
716 return Err(RedDBError::NotFound(format!(
717 "kv collection '{collection}' does not exist and auto-create is disabled \
718 (red.config.kv.default_collection = false)"
719 )));
720 }
721 store
722 .create_collection(collection)
723 .map_err(|err| RedDBError::Internal(err.to_string()))?;
724 self.runtime
725 .inner
726 .db
727 .save_collection_contract(kv_collection_contract(collection))
728 .map_err(|err| RedDBError::Internal(err.to_string()))?;
729 Ok(())
730 }
731
732 fn get_entry(
733 &self,
734 model: crate::catalog::CollectionModel,
735 collection: &str,
736 key: &str,
737 ) -> RedDBResult<Option<(crate::storage::schema::Value, crate::storage::EntityId)>> {
738 let Some(entity) = self.get_entity(model, collection, key)? else {
739 return Ok(None);
740 };
741 Ok(kv_value_from_entity(&entity).map(|value| (value, entity.id)))
742 }
743
744 fn is_versioned_collection(
751 &self,
752 model: crate::catalog::CollectionModel,
753 collection: &str,
754 ) -> bool {
755 if model != crate::catalog::CollectionModel::Kv {
758 return false;
759 }
760 self.runtime.vcs_is_versioned(collection).unwrap_or(false)
761 }
762
763 fn get_entity(
764 &self,
765 model: crate::catalog::CollectionModel,
766 collection: &str,
767 key: &str,
768 ) -> RedDBResult<Option<crate::storage::UnifiedEntity>> {
769 self.ensure_declared_model(model, collection)?;
770 let store = self.runtime.inner.db.store();
771 let Some(manager) = store.get_collection(collection) else {
772 return Ok(None);
773 };
774
775 if self.is_versioned_collection(model, collection) {
780 let resolver =
781 crate::runtime::table_row_mvcc_resolver::TableRowMvccReadResolver::current_statement();
782 let mut best: Option<crate::storage::UnifiedEntity> = None;
783 for entity in manager.query_all(|_| true) {
784 if !kv_entity_has_key(&entity, key) {
785 continue;
786 }
787 if resolver.resolve_read_candidate(&entity).is_none() {
788 continue;
789 }
790 let better = match &best {
791 Some(current) => entity.xmin >= current.xmin,
792 None => true,
793 };
794 if better {
795 best = Some(entity);
796 }
797 }
798 return Ok(best);
799 }
800
801 let entities = manager.query_all(|_| true);
802 for entity in entities {
803 if kv_entity_has_key(&entity, key) {
804 return Ok(Some(entity));
805 }
806 }
807 Ok(None)
808 }
809
810 fn latest_kv_entries(
811 &self,
812 collection: &str,
813 prefix: Option<&str>,
814 ) -> RedDBResult<Vec<(String, crate::storage::UnifiedEntity)>> {
815 self.ensure_declared_model(crate::catalog::CollectionModel::Kv, collection)?;
816 let store = self.runtime.inner.db.store();
817 let Some(manager) = store.get_collection(collection) else {
818 return Ok(Vec::new());
819 };
820 let versioned =
825 self.is_versioned_collection(crate::catalog::CollectionModel::Kv, collection);
826 let resolver = versioned.then(|| {
827 crate::runtime::table_row_mvcc_resolver::TableRowMvccReadResolver::current_statement()
828 });
829
830 let mut entries: std::collections::BTreeMap<String, crate::storage::UnifiedEntity> =
831 std::collections::BTreeMap::new();
832 for entity in manager.query_all(|_| true) {
833 let key = match &entity.data {
834 crate::storage::EntityData::Row(row) => row
835 .named
836 .as_ref()
837 .and_then(|named| named.get("key"))
838 .and_then(|value| match value {
839 crate::storage::schema::Value::Text(key) => Some(key.to_string()),
840 _ => None,
841 }),
842 _ => None,
843 };
844 let Some(key) = key else {
845 continue;
846 };
847 if prefix.is_some_and(|prefix| !key.starts_with(prefix)) {
848 continue;
849 }
850 if let Some(resolver) = resolver.as_ref() {
851 if resolver.resolve_read_candidate(&entity).is_none() {
854 continue;
855 }
856 }
857 let should_replace = match entries.get(&key) {
858 Some(existing) if versioned => entity.xmin >= existing.xmin,
859 Some(existing) => entity.id.raw() >= existing.id.raw(),
860 None => true,
861 };
862 if should_replace {
863 entries.insert(key, entity);
864 }
865 }
866 Ok(entries.into_iter().collect())
867 }
868
869 fn get_vault_entry(&self, collection: &str, key: &str) -> RedDBResult<Option<VaultEntry>> {
870 self.vault_versions(collection, key)
871 .map(super::keyed_spine::latest_version)
872 }
873
874 fn get_vault_entry_version(
875 &self,
876 collection: &str,
877 key: &str,
878 version: i64,
879 ) -> RedDBResult<Option<VaultEntry>> {
880 Ok(self
881 .vault_versions(collection, key)?
882 .into_iter()
883 .find(|entry| entry.version == version))
884 }
885
886 fn vault_versions(&self, collection: &str, key: &str) -> RedDBResult<Vec<VaultEntry>> {
887 self.ensure_declared_model(crate::catalog::CollectionModel::Vault, collection)?;
888 let store = self.runtime.inner.db.store();
889 let Some(manager) = store.get_collection(collection) else {
890 return Ok(Vec::new());
891 };
892 let entities = manager.query_all(|_| true);
893 let mut versions = Vec::new();
894 for entity in entities {
895 let crate::storage::EntityData::Row(ref row) = entity.data else {
896 continue;
897 };
898 let Some(version) =
899 super::keyed_spine::row_version(entity.id, row, entity.sequence_id as i64)
900 else {
901 continue;
902 };
903 if version.key != key {
904 continue;
905 }
906 let metadata = manager.get_metadata(entity.id).unwrap_or_default();
907 versions.push(VaultEntry::from_keyed_row(
908 version,
909 metadata,
910 entity.created_at,
911 entity.updated_at,
912 entity.sequence_id,
913 ));
914 }
915 Ok(versions)
916 }
917
918 fn latest_vault_entries(
919 &self,
920 collection: &str,
921 prefix: Option<&str>,
922 ) -> RedDBResult<Vec<VaultEntry>> {
923 self.ensure_declared_model(crate::catalog::CollectionModel::Vault, collection)?;
924 let store = self.runtime.inner.db.store();
925 let Some(manager) = store.get_collection(collection) else {
926 return Ok(Vec::new());
927 };
928 let mut versions = Vec::new();
929 for entity in manager.query_all(|_| true) {
930 let crate::storage::EntityData::Row(ref row) = entity.data else {
931 continue;
932 };
933 let Some(version) =
934 super::keyed_spine::row_version(entity.id, row, entity.sequence_id as i64)
935 else {
936 continue;
937 };
938 let metadata = manager.get_metadata(entity.id).unwrap_or_default();
939 let entry = VaultEntry::from_keyed_row(
940 version,
941 metadata,
942 entity.created_at,
943 entity.updated_at,
944 entity.sequence_id,
945 );
946 versions.push(entry);
947 }
948 Ok(super::keyed_spine::latest_versions(versions, prefix))
949 }
950
951 fn append_vault_version(
952 &self,
953 collection: &str,
954 key: &str,
955 value: crate::storage::schema::Value,
956 op: &str,
957 tombstone: bool,
958 tags: &[String],
959 ) -> RedDBResult<VaultEntry> {
960 self.ensure_declared_model(crate::catalog::CollectionModel::Vault, collection)?;
961 let version = self
962 .get_vault_entry(collection, key)?
963 .map(|entry| entry.version)
964 .unwrap_or(0)
965 + 1;
966 let stored_value = if tombstone {
967 crate::storage::schema::Value::Null
968 } else {
969 self.runtime.seal_vault_value(collection, value)?
970 };
971 let now = current_unix_ms() as i64;
972 let fields = vec![
973 (
974 "key".to_string(),
975 crate::storage::schema::Value::text(key.to_string()),
976 ),
977 ("value".to_string(), stored_value),
978 (
979 "version".to_string(),
980 crate::storage::schema::Value::Integer(version),
981 ),
982 (
983 "tombstone".to_string(),
984 crate::storage::schema::Value::Boolean(tombstone),
985 ),
986 (
987 "op".to_string(),
988 crate::storage::schema::Value::text(op.to_string()),
989 ),
990 (
991 "created_at_ms".to_string(),
992 crate::storage::schema::Value::Integer(now),
993 ),
994 ];
995 let mut row = crate::storage::RowData::new(Vec::new());
996 row.named = Some(fields.into_iter().collect());
997 let entity = crate::storage::UnifiedEntity::new(
998 crate::storage::EntityId::new(0),
999 crate::storage::EntityKind::TableRow {
1000 table: std::sync::Arc::from(collection),
1001 row_id: 0,
1002 },
1003 crate::storage::EntityData::Row(row),
1004 );
1005 let id = self
1006 .runtime
1007 .inner
1008 .db
1009 .store()
1010 .insert(collection, entity)
1011 .map_err(|err| RedDBError::Internal(err.to_string()))?;
1012 if !tags.is_empty() {
1013 self.runtime
1014 .inner
1015 .db
1016 .store()
1017 .set_metadata(
1018 collection,
1019 id,
1020 Metadata::with_fields(vault_tags_metadata(tags)),
1021 )
1022 .map_err(|err| RedDBError::Internal(err.to_string()))?;
1023 self.runtime
1024 .inner
1025 .kv_tag_index
1026 .replace(collection, key, id, tags);
1027 }
1028 self.get_vault_entry_version(collection, key, version)?
1029 .ok_or_else(|| RedDBError::Internal(format!("vault version {id} was not readable")))
1030 }
1031
1032 fn purge_vault_versions(&self, collection: &str, key: &str) -> RedDBResult<usize> {
1033 self.ensure_declared_model(crate::catalog::CollectionModel::Vault, collection)?;
1034 let versions = self.vault_versions(collection, key)?;
1035 let store = self.runtime.inner.db.store();
1036 let mut purged = 0usize;
1037 for entry in versions {
1038 if store
1039 .delete(collection, entry.id)
1040 .map_err(|err| RedDBError::Internal(err.to_string()))?
1041 {
1042 store.context_index().remove_entity(entry.id);
1043 purged += 1;
1044 }
1045 }
1046 Ok(purged)
1047 }
1048
1049 fn ensure_declared_model(
1050 &self,
1051 model: crate::catalog::CollectionModel,
1052 collection: &str,
1053 ) -> RedDBResult<()> {
1054 let Some(contract) = self.runtime.inner.db.collection_contract(collection) else {
1055 return Ok(());
1056 };
1057 if contract.declared_model == model
1058 || contract.declared_model == crate::catalog::CollectionModel::Mixed
1059 {
1060 return Ok(());
1061 }
1062 Err(RedDBError::InvalidOperation(format!(
1063 "collection '{}' is declared as '{}' and does not allow '{}' operations",
1064 collection,
1065 keyed_model_name(contract.declared_model),
1066 keyed_model_name(model)
1067 )))
1068 }
1069}
1070
1071impl RedDBRuntime {
1072 pub(crate) fn seal_vault_value(
1073 &self,
1074 collection: &str,
1075 value: crate::storage::schema::Value,
1076 ) -> RedDBResult<crate::storage::schema::Value> {
1077 let key = self.vault_encryption_key(collection)?;
1078 let plaintext = value.to_bytes();
1079 let nonce_bytes = crate::auth::store::random_bytes(12);
1080 let mut nonce = [0u8; 12];
1081 nonce.copy_from_slice(&nonce_bytes[..12]);
1082 let aad = format!("reddb.vault.{collection}");
1083 let ciphertext =
1084 crate::crypto::aes_gcm::aes256_gcm_encrypt(&key, &nonce, aad.as_bytes(), &plaintext);
1085 let mut payload = Vec::with_capacity(12 + ciphertext.len());
1086 payload.extend_from_slice(&nonce);
1087 payload.extend_from_slice(&ciphertext);
1088 Ok(crate::storage::schema::Value::Secret(payload))
1089 }
1090
1091 fn vault_key_available(&self, collection: &str) -> bool {
1092 self.vault_encryption_key(collection).is_ok()
1093 }
1094
1095 fn vault_encryption_key(&self, collection: &str) -> RedDBResult<[u8; 32]> {
1096 let auth_store = self.inner.auth_store.read().clone().ok_or_else(|| {
1097 RedDBError::Query("vault sealed_unavailable: no key provider is configured".to_string())
1098 })?;
1099 if !auth_store.is_vault_backed() {
1100 return Err(RedDBError::Query(
1101 "vault sealed_unavailable: key provider is sealed".to_string(),
1102 ));
1103 }
1104
1105 if let Some(hex_key) = auth_store.vault_kv_get(&vault_master_key_ref(collection)) {
1106 return decode_vault_key(&hex_key);
1107 }
1108 auth_store.vault_secret_key().ok_or_else(|| {
1109 RedDBError::Query("vault sealed_unavailable: cluster vault key is missing".to_string())
1110 })
1111 }
1112
1113 fn unseal_vault_value(
1114 &self,
1115 collection: &str,
1116 sealed: &crate::storage::schema::Value,
1117 ) -> RedDBResult<crate::storage::schema::Value> {
1118 let crate::storage::schema::Value::Secret(payload) = sealed else {
1119 return Err(RedDBError::Query(
1120 "vault unseal failed: stored value is not sealed".to_string(),
1121 ));
1122 };
1123 if payload.len() < 12 {
1124 return Err(RedDBError::Query(
1125 "vault unseal failed: sealed payload is truncated".to_string(),
1126 ));
1127 }
1128 let key = self.vault_encryption_key(collection)?;
1129 let mut nonce = [0u8; 12];
1130 nonce.copy_from_slice(&payload[..12]);
1131 let aad = format!("reddb.vault.{collection}");
1132 let plaintext = crate::crypto::aes_gcm::aes256_gcm_decrypt(
1133 &key,
1134 &nonce,
1135 aad.as_bytes(),
1136 &payload[12..],
1137 )
1138 .map_err(|_| RedDBError::Query("vault unseal failed: decryption failed".to_string()))?;
1139 let (value, consumed) =
1140 crate::storage::schema::Value::from_bytes(&plaintext).map_err(|err| {
1141 RedDBError::Query(format!("vault unseal failed: bad plaintext value: {err}"))
1142 })?;
1143 if consumed != plaintext.len() {
1144 return Err(RedDBError::Query(
1145 "vault unseal failed: trailing plaintext bytes".to_string(),
1146 ));
1147 }
1148 Ok(value)
1149 }
1150
1151 pub(crate) fn peek_vault_unsealed(
1157 &self,
1158 collection: &str,
1159 key: &str,
1160 ) -> RedDBResult<Option<crate::storage::schema::Value>> {
1161 let ops = KvAtomicOps::new(self);
1162 let Some(entry) = ops.get_vault_entry(collection, key)? else {
1163 return Ok(None);
1164 };
1165 if entry.tombstone {
1166 return Ok(None);
1167 }
1168 Ok(self.unseal_vault_value(collection, &entry.value).ok())
1169 }
1170
1171 fn vault_target_resource(collection: &str, key: &str) -> String {
1172 if collection == "red.vault" {
1173 return format!("red.vault/{}", key.to_ascii_lowercase());
1174 }
1175 format!("{collection}.{key}")
1176 }
1177
1178 fn current_vault_actor() -> String {
1179 current_auth_identity()
1180 .map(|(principal, _)| principal)
1181 .unwrap_or_else(|| "anonymous".to_string())
1182 }
1183
1184 fn vault_request_id() -> String {
1185 let conn_id = current_connection_id();
1186 if conn_id == 0 {
1187 "embedded".to_string()
1188 } else {
1189 format!("conn-{conn_id}")
1190 }
1191 }
1192
1193 fn check_vault_capability(
1194 &self,
1195 action: &str,
1196 collection: &str,
1197 key: &str,
1198 ) -> Result<(), String> {
1199 let Some(auth_store) = self.inner.auth_store.read().clone() else {
1200 return Ok(());
1201 };
1202 if !auth_store.iam_authorization_enabled() {
1203 return Ok(());
1204 }
1205 let Some((principal, role)) = current_auth_identity() else {
1206 return Err(
1207 "IAM authorization is enabled; vault capability check requires an authenticated principal"
1208 .to_string(),
1209 );
1210 };
1211 let tenant = current_tenant();
1212 let principal_id = crate::auth::UserId::from_parts(tenant.as_deref(), &principal);
1213 let mut resource = crate::auth::policies::ResourceRef::new(
1214 "vault",
1215 Self::vault_target_resource(collection, key),
1216 );
1217 if let Some(ref tenant) = tenant {
1218 resource = resource.with_tenant(tenant.clone());
1219 }
1220 let ctx = crate::auth::policies::EvalContext {
1221 principal_tenant: tenant.clone(),
1222 current_tenant: tenant,
1223 peer_ip: None,
1224 mfa_present: false,
1225 now_ms: crate::utils::now_unix_millis() as u128,
1226 principal_is_admin_role: role == crate::auth::Role::Admin,
1227 principal_is_platform_scoped: principal_id.tenant.is_none(),
1228 };
1229 if auth_store.check_policy_authz_with_role(&principal_id, action, &resource, &ctx, role) {
1230 Ok(())
1231 } else {
1232 Err(format!(
1233 "principal=`{}` action=`{}` resource=`vault:{}` denied by IAM policy",
1234 principal,
1235 action,
1236 Self::vault_target_resource(collection, key)
1237 ))
1238 }
1239 }
1240
1241 fn check_system_vault_capability(
1242 &self,
1243 action: &str,
1244 collection: &str,
1245 key: &str,
1246 ) -> Result<(), String> {
1247 if collection != "red.vault" {
1248 return Ok(());
1249 }
1250 self.check_vault_capability(action, collection, key)
1251 }
1252
1253 fn audit_vault_unseal(
1254 &self,
1255 collection: &str,
1256 key: &str,
1257 outcome: crate::runtime::audit_log::Outcome,
1258 reason: &str,
1259 entry: Option<&VaultEntry>,
1260 ) {
1261 let actor = Self::current_vault_actor();
1262 let request_id = Self::vault_request_id();
1263 let mut builder = crate::runtime::audit_log::AuditEvent::builder("vault/unseal")
1264 .principal(actor.clone())
1265 .source(crate::runtime::audit_log::AuditAuthSource::Password)
1266 .resource(format!(
1267 "vault:{}",
1268 Self::vault_target_resource(collection, key)
1269 ))
1270 .outcome(outcome)
1271 .correlation_id(request_id.clone())
1272 .fields([
1273 crate::runtime::audit_log::AuditFieldEscaper::field("actor", actor),
1274 crate::runtime::audit_log::AuditFieldEscaper::field("collection", collection),
1275 crate::runtime::audit_log::AuditFieldEscaper::field("key", key),
1276 crate::runtime::audit_log::AuditFieldEscaper::field(
1277 "target",
1278 Self::vault_target_resource(collection, key),
1279 ),
1280 crate::runtime::audit_log::AuditFieldEscaper::field("reason", reason),
1281 crate::runtime::audit_log::AuditFieldEscaper::field("request_id", request_id),
1282 crate::runtime::audit_log::AuditFieldEscaper::field(
1283 "connection_id",
1284 current_connection_id(),
1285 ),
1286 ]);
1287 if let Some(tenant) = current_tenant() {
1288 builder = builder.tenant(tenant);
1289 }
1290 if let Some(entry) = entry {
1291 builder = builder.fields([
1292 crate::runtime::audit_log::AuditFieldEscaper::field("entity_id", entry.id.raw()),
1293 crate::runtime::audit_log::AuditFieldEscaper::field(
1294 "sequence_id",
1295 entry.sequence_id,
1296 ),
1297 ]);
1298 }
1299 self.audit_log().record_event(builder.build());
1300 }
1301
1302 fn audit_vault_lifecycle(
1303 &self,
1304 operation: &str,
1305 collection: &str,
1306 key: &str,
1307 outcome: crate::runtime::audit_log::Outcome,
1308 reason: &str,
1309 entry: Option<&VaultEntry>,
1310 ) {
1311 let actor = Self::current_vault_actor();
1312 let request_id = Self::vault_request_id();
1313 let mut builder =
1314 crate::runtime::audit_log::AuditEvent::builder(format!("vault/{operation}"))
1315 .principal(actor.clone())
1316 .source(crate::runtime::audit_log::AuditAuthSource::Password)
1317 .resource(format!(
1318 "vault:{}",
1319 Self::vault_target_resource(collection, key)
1320 ))
1321 .outcome(outcome)
1322 .correlation_id(request_id.clone())
1323 .fields([
1324 crate::runtime::audit_log::AuditFieldEscaper::field("actor", actor),
1325 crate::runtime::audit_log::AuditFieldEscaper::field("collection", collection),
1326 crate::runtime::audit_log::AuditFieldEscaper::field("key", key),
1327 crate::runtime::audit_log::AuditFieldEscaper::field(
1328 "target",
1329 Self::vault_target_resource(collection, key),
1330 ),
1331 crate::runtime::audit_log::AuditFieldEscaper::field("reason", reason),
1332 crate::runtime::audit_log::AuditFieldEscaper::field("request_id", request_id),
1333 crate::runtime::audit_log::AuditFieldEscaper::field(
1334 "connection_id",
1335 current_connection_id(),
1336 ),
1337 ]);
1338 if let Some(tenant) = current_tenant() {
1339 builder = builder.tenant(tenant);
1340 }
1341 if let Some(entry) = entry {
1342 builder = builder.fields([
1343 crate::runtime::audit_log::AuditFieldEscaper::field("entity_id", entry.id.raw()),
1344 crate::runtime::audit_log::AuditFieldEscaper::field("version", entry.version),
1345 crate::runtime::audit_log::AuditFieldEscaper::field(
1346 "sequence_id",
1347 entry.sequence_id,
1348 ),
1349 ]);
1350 }
1351 self.audit_log().record_event(builder.build());
1352 }
1353
1354 fn emit_vault_control_event(
1355 &self,
1356 kind: crate::runtime::control_events::EventKind,
1357 outcome: crate::runtime::control_events::Outcome,
1358 action: &'static str,
1359 collection: &str,
1360 key: &str,
1361 reason: &str,
1362 entry: Option<&VaultEntry>,
1363 extra_fields: Vec<(String, crate::runtime::control_events::Sensitivity)>,
1364 ) -> RedDBResult<()> {
1365 use crate::runtime::control_events::{
1366 ActorRef, ControlEvent, ControlEventCtx, ControlEventLedger, Sensitivity,
1367 };
1368 use std::borrow::Cow;
1369
1370 let tenant = current_tenant();
1371 let principal = current_auth_identity().map(|(principal, _)| principal);
1372 let actor_user = principal
1373 .as_ref()
1374 .map(|principal| crate::auth::UserId::from_parts(tenant.as_deref(), principal));
1375 let request_id = Self::vault_request_id();
1376 let actor = actor_user
1377 .as_ref()
1378 .map(ActorRef::User)
1379 .unwrap_or(ActorRef::Anonymous);
1380 let ctx = ControlEventCtx {
1381 actor,
1382 scope: tenant.as_ref().map(|scope| Cow::Borrowed(scope.as_str())),
1383 request_id: Some(Cow::Borrowed(request_id.as_str())),
1384 trace_id: None,
1385 };
1386
1387 let target = Self::vault_target_resource(collection, key);
1388 let mut fields = std::collections::HashMap::new();
1389 fields.insert("path".to_string(), Sensitivity::raw(target.clone()));
1390 fields.insert("collection".to_string(), Sensitivity::raw(collection));
1391 fields.insert("key".to_string(), Sensitivity::raw(key));
1392 fields.insert(
1393 "connection_id".to_string(),
1394 Sensitivity::raw(current_connection_id().to_string()),
1395 );
1396 if let Some(entry) = entry {
1397 fields.insert(
1398 "entity_id".to_string(),
1399 Sensitivity::raw(entry.id.raw().to_string()),
1400 );
1401 fields.insert(
1402 "sequence_id".to_string(),
1403 Sensitivity::raw(entry.sequence_id.to_string()),
1404 );
1405 fields.insert(
1406 "version".to_string(),
1407 Sensitivity::raw(entry.version.to_string()),
1408 );
1409 fields.insert("op".to_string(), Sensitivity::raw(entry.op.clone()));
1410 fields.insert(
1411 "tombstone".to_string(),
1412 Sensitivity::raw(entry.tombstone.to_string()),
1413 );
1414 if !entry.tombstone {
1415 fields.insert(
1416 "fingerprint".to_string(),
1417 Sensitivity::raw(vault_fingerprint(&entry.value)),
1418 );
1419 }
1420 fields.insert(
1421 "tags".to_string(),
1422 Sensitivity::raw(format!("{:?}", vault_tags_value(&entry.metadata))),
1423 );
1424 }
1425 for (key, value) in extra_fields {
1426 fields.insert(key, value);
1427 }
1428
1429 let event = ControlEvent {
1430 kind,
1431 outcome,
1432 action: Cow::Borrowed(action),
1433 resource: Some(format!("vault:{target}")),
1434 reason: Some(reason.to_string()),
1435 matched_policy_id: None,
1436 fields,
1437 };
1438 let ledger = self.inner.control_event_ledger.read();
1439 match ledger.emit(&ctx, event) {
1440 Ok(_) => Ok(()),
1441 Err(err) if self.inner.control_event_config.require_persistence() => {
1442 Err(RedDBError::Internal(err.to_string()))
1443 }
1444 Err(_) => Ok(()),
1445 }
1446 }
1447
1448 pub(crate) fn resolve_vault_secret_value(
1449 &self,
1450 collection: &str,
1451 key: &str,
1452 ) -> RedDBResult<Value> {
1453 let ops = KvAtomicOps::new(self);
1454 let entry = ops.get_vault_entry(collection, key)?;
1455 if let Err(reason) = self.check_vault_capability("vault:read", collection, key) {
1456 self.audit_vault_unseal(
1457 collection,
1458 key,
1459 crate::runtime::audit_log::Outcome::Denied,
1460 &reason,
1461 entry.as_ref(),
1462 );
1463 self.emit_vault_control_event(
1464 crate::runtime::control_events::EventKind::VaultRead,
1465 crate::runtime::control_events::Outcome::Denied,
1466 "vault:read",
1467 collection,
1468 key,
1469 &reason,
1470 entry.as_ref(),
1471 Vec::new(),
1472 )?;
1473 return Err(RedDBError::Query(reason));
1474 }
1475 let Some(entry) = entry else {
1476 let reason = "not_found";
1477 self.audit_vault_unseal(
1478 collection,
1479 key,
1480 crate::runtime::audit_log::Outcome::Denied,
1481 reason,
1482 None,
1483 );
1484 self.emit_vault_control_event(
1485 crate::runtime::control_events::EventKind::VaultRead,
1486 crate::runtime::control_events::Outcome::Denied,
1487 "vault:read",
1488 collection,
1489 key,
1490 reason,
1491 None,
1492 Vec::new(),
1493 )?;
1494 return Err(RedDBError::NotFound(format!(
1495 "vault secret '{}.{}' not found",
1496 collection, key
1497 )));
1498 };
1499 if entry.tombstone {
1500 let reason = "deleted";
1501 self.audit_vault_unseal(
1502 collection,
1503 key,
1504 crate::runtime::audit_log::Outcome::Denied,
1505 reason,
1506 Some(&entry),
1507 );
1508 self.emit_vault_control_event(
1509 crate::runtime::control_events::EventKind::VaultRead,
1510 crate::runtime::control_events::Outcome::Denied,
1511 "vault:read",
1512 collection,
1513 key,
1514 reason,
1515 Some(&entry),
1516 Vec::new(),
1517 )?;
1518 return Err(RedDBError::NotFound(format!(
1519 "vault secret '{}.{}' is deleted",
1520 collection, key
1521 )));
1522 }
1523 match self.unseal_vault_value(collection, &entry.value) {
1524 Ok(value) => {
1525 self.audit_vault_unseal(
1526 collection,
1527 key,
1528 crate::runtime::audit_log::Outcome::Success,
1529 "ok",
1530 Some(&entry),
1531 );
1532 self.emit_vault_control_event(
1533 crate::runtime::control_events::EventKind::VaultRead,
1534 crate::runtime::control_events::Outcome::Allowed,
1535 "vault:read",
1536 collection,
1537 key,
1538 "ok",
1539 Some(&entry),
1540 Vec::new(),
1541 )?;
1542 Ok(value)
1543 }
1544 Err(err) => {
1545 let reason = err.to_string();
1546 self.audit_vault_unseal(
1547 collection,
1548 key,
1549 crate::runtime::audit_log::Outcome::Error,
1550 &reason,
1551 Some(&entry),
1552 );
1553 self.emit_vault_control_event(
1554 crate::runtime::control_events::EventKind::VaultRead,
1555 crate::runtime::control_events::Outcome::Error,
1556 "vault:read",
1557 collection,
1558 key,
1559 &reason,
1560 Some(&entry),
1561 Vec::new(),
1562 )?;
1563 Err(err)
1564 }
1565 }
1566 }
1567
1568 pub fn execute_kv_command(
1570 &self,
1571 raw_query: &str,
1572 cmd: &crate::storage::query::ast::KvCommand,
1573 ) -> RedDBResult<RuntimeQueryResult> {
1574 use crate::storage::query::ast::KvCommand;
1575
1576 let ops = KvAtomicOps::new(self);
1577
1578 match cmd {
1579 KvCommand::Put {
1580 model,
1581 collection,
1582 key,
1583 value,
1584 ttl_ms,
1585 tags,
1586 if_not_exists,
1587 } => {
1588 if *model == crate::catalog::CollectionModel::Vault {
1589 self.check_system_vault_capability("vault:write", collection, key)
1590 .map_err(RedDBError::Query)?;
1591 }
1592 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
1593 let (created, id) = ops.set_with_tags_for_model(
1594 *model,
1595 collection,
1596 key,
1597 value.clone(),
1598 *ttl_ms,
1599 tags,
1600 *if_not_exists,
1601 )?;
1602
1603 let mut result = UnifiedResult::with_columns(vec![
1604 "ok".into(),
1605 "collection".into(),
1606 "key".into(),
1607 "id".into(),
1608 "created".into(),
1609 "tags".into(),
1610 ]);
1611 let mut record = UnifiedRecord::new();
1612 record.set("ok", Value::Boolean(true));
1613 record.set("collection", Value::text(collection.clone()));
1614 record.set("key", Value::text(key.clone()));
1615 record.set("id", Value::Integer(id.raw() as i64));
1616 record.set("created", Value::Boolean(created));
1617 record.set("tags", kv_tags_value(tags));
1618 result.push(record);
1619
1620 Ok(RuntimeQueryResult {
1621 query: raw_query.to_string(),
1622 mode: crate::storage::query::modes::QueryMode::Sql,
1623 statement: if *model == crate::catalog::CollectionModel::Vault {
1624 "vault_put"
1625 } else {
1626 "kv_put"
1627 },
1628 engine: if *model == crate::catalog::CollectionModel::Vault {
1629 "vault"
1630 } else {
1631 "kv"
1632 },
1633 result,
1634 affected_rows: 1,
1635 statement_type: if created { "insert" } else { "update" },
1636 bookmark: None,
1637 })
1638 }
1639 KvCommand::InvalidateTags { collection, tags } => {
1640 let invalidated = ops.invalidate_tags(collection, tags)?;
1641
1642 let mut result = UnifiedResult::with_columns(vec![
1643 "ok".into(),
1644 "collection".into(),
1645 "invalidated".into(),
1646 "tags".into(),
1647 ]);
1648 let mut record = UnifiedRecord::new();
1649 record.set("ok", Value::Boolean(true));
1650 record.set("collection", Value::text(collection.clone()));
1651 record.set("invalidated", Value::Integer(invalidated as i64));
1652 record.set("tags", kv_tags_value(tags));
1653 result.push(record);
1654
1655 Ok(RuntimeQueryResult {
1656 query: raw_query.to_string(),
1657 mode: crate::storage::query::modes::QueryMode::Sql,
1658 statement: "kv_invalidate_tags",
1659 engine: "kv",
1660 result,
1661 affected_rows: invalidated as u64,
1662 statement_type: "delete",
1663 bookmark: None,
1664 })
1665 }
1666
1667 KvCommand::Rotate {
1668 collection,
1669 key,
1670 value,
1671 tags,
1672 } => {
1673 self.check_system_vault_capability("vault:write", collection, key)
1674 .map_err(RedDBError::Query)?;
1675 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
1676 let entry = ops.append_vault_version(
1677 collection,
1678 key,
1679 value.clone(),
1680 "rotate",
1681 false,
1682 tags,
1683 )?;
1684 self.record_kv_watch_event(
1685 crate::replication::cdc::ChangeOperation::Update,
1686 collection,
1687 key,
1688 entry.id.raw(),
1689 None,
1690 Some(vault_entry_metadata_json(&entry)),
1691 );
1692 self.audit_vault_lifecycle(
1693 "rotate",
1694 collection,
1695 key,
1696 crate::runtime::audit_log::Outcome::Success,
1697 "ok",
1698 Some(&entry),
1699 );
1700 self.emit_vault_control_event(
1701 crate::runtime::control_events::EventKind::VaultRotate,
1702 crate::runtime::control_events::Outcome::Allowed,
1703 "vault:rotate",
1704 collection,
1705 key,
1706 "ok",
1707 Some(&entry),
1708 Vec::new(),
1709 )?;
1710 Ok(vault_write_result(
1711 raw_query,
1712 "vault_rotate",
1713 "update",
1714 collection,
1715 key,
1716 &entry,
1717 1,
1718 ))
1719 }
1720
1721 KvCommand::List {
1722 model,
1723 collection,
1724 prefix,
1725 limit,
1726 offset,
1727 as_json,
1728 } => {
1729 if *model == crate::catalog::CollectionModel::Vault {
1730 if *as_json {
1731 return Err(RedDBError::Query(
1732 "LIST VAULT AS JSON is not supported; vault list only exposes metadata"
1733 .to_string(),
1734 ));
1735 }
1736 let mut entries = ops.latest_vault_entries(collection, prefix.as_deref())?;
1737 entries.sort_by(|left, right| left.key.cmp(&right.key));
1738 let mut result = UnifiedResult::with_columns(vec![
1739 "collection".into(),
1740 "key".into(),
1741 "version".into(),
1742 "fingerprint".into(),
1743 "tags".into(),
1744 "created_at".into(),
1745 "updated_at".into(),
1746 "status".into(),
1747 "tombstone".into(),
1748 "op".into(),
1749 ]);
1750 let mut visible = Vec::new();
1751 for entry in entries {
1752 match self.check_vault_capability(
1753 "vault:read_metadata",
1754 collection,
1755 &entry.key,
1756 ) {
1757 Ok(()) => {
1758 self.emit_vault_control_event(
1759 crate::runtime::control_events::EventKind::VaultMetadataRead,
1760 crate::runtime::control_events::Outcome::Allowed,
1761 "vault:read_metadata",
1762 collection,
1763 &entry.key,
1764 "ok",
1765 Some(&entry),
1766 Vec::new(),
1767 )?;
1768 visible.push(entry);
1769 }
1770 Err(reason) => {
1771 self.emit_vault_control_event(
1772 crate::runtime::control_events::EventKind::VaultMetadataRead,
1773 crate::runtime::control_events::Outcome::Denied,
1774 "vault:read_metadata",
1775 collection,
1776 &entry.key,
1777 &reason,
1778 Some(&entry),
1779 Vec::new(),
1780 )?;
1781 }
1782 }
1783 }
1784 for entry in visible
1785 .into_iter()
1786 .skip(*offset)
1787 .take(limit.unwrap_or(usize::MAX))
1788 {
1789 push_vault_metadata_record(&mut result, collection, &entry.key, &entry);
1790 }
1791 Ok(RuntimeQueryResult {
1792 query: raw_query.to_string(),
1793 mode: crate::storage::query::modes::QueryMode::Sql,
1794 statement: "vault_list",
1795 engine: "vault",
1796 result,
1797 affected_rows: 0,
1798 statement_type: "select",
1799 bookmark: None,
1800 })
1801 } else {
1802 let mut result = UnifiedResult::with_columns(vec![
1803 "rid".into(),
1804 "collection".into(),
1805 "kind".into(),
1806 "tenant".into(),
1807 "created_at".into(),
1808 "updated_at".into(),
1809 "key".into(),
1810 "value".into(),
1811 "tags".into(),
1812 ]);
1813 let entries = ops.latest_kv_entries(collection, prefix.as_deref())?;
1814 if *as_json {
1815 let mut tree = crate::json::Value::Object(crate::json::Map::new());
1816 for (key, entity) in entries
1817 .into_iter()
1818 .skip(*offset)
1819 .take(limit.unwrap_or(usize::MAX))
1820 {
1821 let Some(value) = kv_value_from_entity(&entity) else {
1822 continue;
1823 };
1824 let relative = match prefix {
1825 Some(pfx) if key == *pfx => "",
1826 Some(pfx) => key
1827 .strip_prefix(pfx.as_str())
1828 .map(|tail| tail.strip_prefix('.').unwrap_or(tail))
1829 .unwrap_or(key.as_str()),
1830 None => key.as_str(),
1831 };
1832 insert_kv_json_path(
1833 &mut tree,
1834 relative,
1835 crate::presentation::entity_json::storage_value_to_json(&value),
1836 );
1837 }
1838 Ok(kv_list_json_result(raw_query, collection, prefix, tree))
1839 } else {
1840 for (key, entity) in entries
1841 .into_iter()
1842 .skip(*offset)
1843 .take(limit.unwrap_or(usize::MAX))
1844 {
1845 let mut record = UnifiedRecord::new();
1846 record.set("rid", Value::UnsignedInteger(entity.id.raw()));
1847 record.set("collection", Value::text(collection.clone()));
1848 record.set("kind", Value::text("kv"));
1849 record.set("tenant", Value::Null);
1850 record.set("created_at", Value::UnsignedInteger(entity.created_at));
1851 record.set("updated_at", Value::UnsignedInteger(entity.updated_at));
1852 record.set("key", Value::text(key.clone()));
1853 record.set(
1854 "value",
1855 kv_value_from_entity(&entity)
1856 .unwrap_or(crate::storage::schema::Value::Null),
1857 );
1858 record.set("tags", kv_tags_value(&ops.tags_for_key(collection, &key)));
1859 result.push(record);
1860 }
1861 Ok(RuntimeQueryResult {
1862 query: raw_query.to_string(),
1863 mode: crate::storage::query::modes::QueryMode::Sql,
1864 statement: "kv_list",
1865 engine: "kv",
1866 result,
1867 affected_rows: 0,
1868 statement_type: "select",
1869 bookmark: None,
1870 })
1871 }
1872 }
1873 }
1874
1875 KvCommand::History { collection, key } => {
1876 let latest = ops.get_vault_entry(collection, key)?;
1877 if let Err(reason) =
1878 self.check_vault_capability("vault:read_metadata", collection, key)
1879 {
1880 self.emit_vault_control_event(
1881 crate::runtime::control_events::EventKind::VaultMetadataRead,
1882 crate::runtime::control_events::Outcome::Denied,
1883 "vault:read_metadata",
1884 collection,
1885 key,
1886 &reason,
1887 latest.as_ref(),
1888 Vec::new(),
1889 )?;
1890 return Err(RedDBError::Query(reason));
1891 }
1892 let versions =
1893 super::keyed_spine::history_versions(ops.vault_versions(collection, key)?);
1894 let result = vault_history_result(collection, key, &versions);
1895 self.emit_vault_control_event(
1896 crate::runtime::control_events::EventKind::VaultMetadataRead,
1897 crate::runtime::control_events::Outcome::Allowed,
1898 "vault:read_metadata",
1899 collection,
1900 key,
1901 "ok",
1902 latest.as_ref(),
1903 Vec::new(),
1904 )?;
1905 Ok(RuntimeQueryResult {
1906 query: raw_query.to_string(),
1907 mode: crate::storage::query::modes::QueryMode::Sql,
1908 statement: "vault_history",
1909 engine: "vault",
1910 result,
1911 affected_rows: 0,
1912 statement_type: "select",
1913 bookmark: None,
1914 })
1915 }
1916
1917 KvCommand::Purge { collection, key } => {
1918 let entry = ops.get_vault_entry(collection, key)?;
1919 if let Err(reason) = self.check_vault_capability("vault:purge", collection, key) {
1920 self.audit_vault_lifecycle(
1921 "purge",
1922 collection,
1923 key,
1924 crate::runtime::audit_log::Outcome::Denied,
1925 &reason,
1926 entry.as_ref(),
1927 );
1928 self.emit_vault_control_event(
1929 crate::runtime::control_events::EventKind::VaultPurge,
1930 crate::runtime::control_events::Outcome::Denied,
1931 "vault:purge",
1932 collection,
1933 key,
1934 &reason,
1935 entry.as_ref(),
1936 Vec::new(),
1937 )?;
1938 return Err(RedDBError::Query(reason));
1939 }
1940 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
1941 let purged = ops.purge_vault_versions(collection, key)?;
1942 self.audit_vault_lifecycle(
1943 "purge",
1944 collection,
1945 key,
1946 crate::runtime::audit_log::Outcome::Success,
1947 "ok",
1948 entry.as_ref(),
1949 );
1950 self.emit_vault_control_event(
1951 crate::runtime::control_events::EventKind::VaultPurge,
1952 crate::runtime::control_events::Outcome::Allowed,
1953 "vault:purge",
1954 collection,
1955 key,
1956 "ok",
1957 entry.as_ref(),
1958 vec![(
1959 "purged".to_string(),
1960 crate::runtime::control_events::Sensitivity::raw(purged.to_string()),
1961 )],
1962 )?;
1963 let mut result = UnifiedResult::with_columns(vec![
1964 "ok".into(),
1965 "collection".into(),
1966 "key".into(),
1967 "purged".into(),
1968 ]);
1969 let mut record = UnifiedRecord::new();
1970 record.set("ok", Value::Boolean(true));
1971 record.set("collection", Value::text(collection.clone()));
1972 record.set("key", Value::text(key.clone()));
1973 record.set("purged", Value::Integer(purged as i64));
1974 result.push(record);
1975 Ok(RuntimeQueryResult {
1976 query: raw_query.to_string(),
1977 mode: crate::storage::query::modes::QueryMode::Sql,
1978 statement: "vault_purge",
1979 engine: "vault",
1980 result,
1981 affected_rows: purged as u64,
1982 statement_type: "delete",
1983 bookmark: None,
1984 })
1985 }
1986
1987 KvCommand::Get {
1988 model,
1989 collection,
1990 key,
1991 } => {
1992 if *model == crate::catalog::CollectionModel::Vault {
1993 let entry = ops.get_vault_entry(collection, key)?;
1994 if let Err(reason) =
1995 self.check_vault_capability("vault:read_metadata", collection, key)
1996 {
1997 self.emit_vault_control_event(
1998 crate::runtime::control_events::EventKind::VaultMetadataRead,
1999 crate::runtime::control_events::Outcome::Denied,
2000 "vault:read_metadata",
2001 collection,
2002 key,
2003 &reason,
2004 entry.as_ref(),
2005 Vec::new(),
2006 )?;
2007 return Err(RedDBError::Query(reason));
2008 }
2009 let key_available = self.vault_key_available(collection);
2010 let result =
2011 vault_metadata_result(collection, key, entry.as_ref(), key_available);
2012 self.emit_vault_control_event(
2013 crate::runtime::control_events::EventKind::VaultMetadataRead,
2014 crate::runtime::control_events::Outcome::Allowed,
2015 "vault:read_metadata",
2016 collection,
2017 key,
2018 "ok",
2019 entry.as_ref(),
2020 Vec::new(),
2021 )?;
2022 return Ok(RuntimeQueryResult {
2023 query: raw_query.to_string(),
2024 mode: crate::storage::query::modes::QueryMode::Sql,
2025 statement: "vault_get",
2026 engine: "vault",
2027 result,
2028 affected_rows: 0,
2029 statement_type: "select",
2030 bookmark: None,
2031 });
2032 }
2033
2034 let entity = ops.get_entity(*model, collection, key)?;
2035 let value = entity.as_ref().and_then(kv_value_from_entity);
2036 if *model == crate::catalog::CollectionModel::Kv {
2037 self.inner.kv_stats.incr_gets();
2038 }
2039 let mut result = UnifiedResult::with_columns(vec![
2040 "rid".into(),
2041 "collection".into(),
2042 "kind".into(),
2043 "tenant".into(),
2044 "created_at".into(),
2045 "updated_at".into(),
2046 "key".into(),
2047 "value".into(),
2048 "tags".into(),
2049 ]);
2050 let mut record = UnifiedRecord::new();
2051 if let Some(entity) = entity.as_ref() {
2052 record.set("rid", Value::UnsignedInteger(entity.id.raw()));
2053 record.set("created_at", Value::UnsignedInteger(entity.created_at));
2054 record.set("updated_at", Value::UnsignedInteger(entity.updated_at));
2055 } else {
2056 record.set("rid", Value::Null);
2057 record.set("created_at", Value::Null);
2058 record.set("updated_at", Value::Null);
2059 }
2060 record.set("collection", Value::text(collection.clone()));
2061 record.set("kind", Value::text(keyed_model_name(*model).to_string()));
2062 record.set("tenant", Value::Null);
2063 record.set("key", Value::text(key.clone()));
2064 record.set(
2065 "value",
2066 value.unwrap_or(crate::storage::schema::Value::Null),
2067 );
2068 record.set("tags", kv_tags_value(&ops.tags_for_key(collection, key)));
2069 result.push(record);
2070
2071 Ok(RuntimeQueryResult {
2072 query: raw_query.to_string(),
2073 mode: crate::storage::query::modes::QueryMode::Sql,
2074 statement: "kv_get",
2075 engine: "kv",
2076 result,
2077 affected_rows: 0,
2078 statement_type: "select",
2079 bookmark: None,
2080 })
2081 }
2082 KvCommand::Watch {
2083 model,
2084 collection,
2085 key,
2086 prefix,
2087 from_lsn,
2088 } => {
2089 let watch_key = if *prefix {
2090 format!("{key}.*")
2091 } else {
2092 key.clone()
2093 };
2094 let endpoint = match from_lsn {
2095 Some(lsn) => format!(
2096 "/collections/{collection}/{}/{watch_key}/watch?since_lsn={lsn}",
2097 keyed_model_name(*model)
2098 ),
2099 None => format!(
2100 "/collections/{collection}/{}/{watch_key}/watch",
2101 keyed_model_name(*model)
2102 ),
2103 };
2104 let mut result = UnifiedResult::with_columns(vec![
2105 "collection".into(),
2106 "key".into(),
2107 "prefix".into(),
2108 "from_lsn".into(),
2109 "watch_url".into(),
2110 "streaming".into(),
2111 ]);
2112 let mut record = UnifiedRecord::new();
2113 record.set("collection", Value::text(collection.clone()));
2114 record.set("key", Value::text(watch_key));
2115 record.set("prefix", Value::Boolean(*prefix));
2116 record.set(
2117 "from_lsn",
2118 from_lsn
2119 .map(Value::UnsignedInteger)
2120 .unwrap_or(crate::storage::schema::Value::Null),
2121 );
2122 record.set("watch_url", Value::text(endpoint));
2123 record.set("streaming", Value::Boolean(true));
2124 result.push(record);
2125
2126 Ok(RuntimeQueryResult {
2127 query: raw_query.to_string(),
2128 mode: crate::storage::query::modes::QueryMode::Sql,
2129 statement: "kv_watch",
2130 engine: keyed_model_name(*model),
2131 result,
2132 affected_rows: 0,
2133 statement_type: "stream",
2134 bookmark: None,
2135 })
2136 }
2137
2138 KvCommand::Unseal {
2139 collection,
2140 key,
2141 version,
2142 } => {
2143 let latest = ops.get_vault_entry(collection, key)?;
2144 let entry = match version {
2145 Some(version) => ops.get_vault_entry_version(collection, key, *version)?,
2146 None => latest.clone(),
2147 };
2148 let action = match (version, latest.as_ref()) {
2149 (Some(requested), Some(latest)) if *requested == latest.version => "vault:read",
2150 (Some(_), _) => "vault:unseal_history",
2151 _ => "vault:read",
2152 };
2153 let event_kind = if action == "vault:read" {
2154 crate::runtime::control_events::EventKind::VaultRead
2155 } else {
2156 crate::runtime::control_events::EventKind::VaultUnseal
2157 };
2158 if let Err(reason) = self.check_vault_capability(action, collection, key) {
2159 self.audit_vault_unseal(
2160 collection,
2161 key,
2162 crate::runtime::audit_log::Outcome::Denied,
2163 &reason,
2164 entry.as_ref(),
2165 );
2166 self.emit_vault_control_event(
2167 event_kind,
2168 crate::runtime::control_events::Outcome::Denied,
2169 action,
2170 collection,
2171 key,
2172 &reason,
2173 entry.as_ref(),
2174 Vec::new(),
2175 )?;
2176 return Err(RedDBError::Query(reason));
2177 }
2178 let Some(entry) = entry else {
2179 let reason = "not_found";
2180 self.audit_vault_unseal(
2181 collection,
2182 key,
2183 crate::runtime::audit_log::Outcome::Denied,
2184 reason,
2185 None,
2186 );
2187 self.emit_vault_control_event(
2188 event_kind,
2189 crate::runtime::control_events::Outcome::Denied,
2190 action,
2191 collection,
2192 key,
2193 reason,
2194 None,
2195 Vec::new(),
2196 )?;
2197 return Err(RedDBError::NotFound(format!(
2198 "vault secret '{}.{}' not found",
2199 collection, key
2200 )));
2201 };
2202 if entry.tombstone {
2203 let reason = "deleted";
2204 self.audit_vault_unseal(
2205 collection,
2206 key,
2207 crate::runtime::audit_log::Outcome::Denied,
2208 reason,
2209 Some(&entry),
2210 );
2211 self.emit_vault_control_event(
2212 event_kind,
2213 crate::runtime::control_events::Outcome::Denied,
2214 action,
2215 collection,
2216 key,
2217 reason,
2218 Some(&entry),
2219 Vec::new(),
2220 )?;
2221 return Err(RedDBError::NotFound(format!(
2222 "vault secret '{}.{}' is deleted",
2223 collection, key
2224 )));
2225 }
2226 match self.unseal_vault_value(collection, &entry.value) {
2227 Ok(value) => {
2228 self.audit_vault_unseal(
2229 collection,
2230 key,
2231 crate::runtime::audit_log::Outcome::Success,
2232 "ok",
2233 Some(&entry),
2234 );
2235 self.emit_vault_control_event(
2236 event_kind,
2237 crate::runtime::control_events::Outcome::Allowed,
2238 action,
2239 collection,
2240 key,
2241 "ok",
2242 Some(&entry),
2243 Vec::new(),
2244 )?;
2245 let mut result = UnifiedResult::with_columns(vec![
2246 "collection".into(),
2247 "key".into(),
2248 "value".into(),
2249 ]);
2250 let mut record = UnifiedRecord::new();
2251 record.set("collection", Value::text(collection.clone()));
2252 record.set("key", Value::text(key.clone()));
2253 record.set("value", value);
2254 result.push(record);
2255 Ok(RuntimeQueryResult {
2256 query: raw_query.to_string(),
2257 mode: crate::storage::query::modes::QueryMode::Sql,
2258 statement: "vault_unseal",
2259 engine: "vault",
2260 result,
2261 affected_rows: 0,
2262 statement_type: "select",
2263 bookmark: None,
2264 })
2265 }
2266 Err(err) => {
2267 let reason = err.to_string();
2268 self.audit_vault_unseal(
2269 collection,
2270 key,
2271 crate::runtime::audit_log::Outcome::Error,
2272 &reason,
2273 Some(&entry),
2274 );
2275 self.emit_vault_control_event(
2276 event_kind,
2277 crate::runtime::control_events::Outcome::Error,
2278 action,
2279 collection,
2280 key,
2281 &reason,
2282 Some(&entry),
2283 Vec::new(),
2284 )?;
2285 Err(err)
2286 }
2287 }
2288 }
2289
2290 KvCommand::Incr {
2291 model,
2292 collection,
2293 key,
2294 by,
2295 ttl_ms,
2296 } => {
2297 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2298 let new_value = ops.incr(*model, collection, key, *by, *ttl_ms)?;
2299
2300 let mut result = UnifiedResult::with_columns(vec![
2301 "ok".into(),
2302 "collection".into(),
2303 "key".into(),
2304 "value".into(),
2305 ]);
2306 let mut record = UnifiedRecord::new();
2307 record.set("ok", Value::Boolean(true));
2308 record.set("collection", Value::text(collection.clone()));
2309 record.set("key", Value::text(key.clone()));
2310 record.set("value", Value::Integer(new_value));
2311 result.push(record);
2312
2313 Ok(RuntimeQueryResult {
2314 query: raw_query.to_string(),
2315 mode: crate::storage::query::modes::QueryMode::Sql,
2316 statement: "kv_incr",
2317 engine: "kv",
2318 result,
2319 affected_rows: 1,
2320 statement_type: "update",
2321 bookmark: None,
2322 })
2323 }
2324
2325 KvCommand::Cas {
2326 model,
2327 collection,
2328 key,
2329 expected,
2330 new_value,
2331 ttl_ms,
2332 } => {
2333 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2334 let (ok, current) = ops.cas(
2335 *model,
2336 collection,
2337 key,
2338 expected.as_ref(),
2339 new_value.clone(),
2340 *ttl_ms,
2341 )?;
2342
2343 let mut result = UnifiedResult::with_columns(vec![
2344 "ok".into(),
2345 "collection".into(),
2346 "key".into(),
2347 "current".into(),
2348 ]);
2349 let mut record = UnifiedRecord::new();
2350 record.set("ok", Value::Boolean(ok));
2351 record.set("collection", Value::text(collection.clone()));
2352 record.set("key", Value::text(key.clone()));
2353 record.set(
2354 "current",
2355 current.unwrap_or(crate::storage::schema::Value::Null),
2356 );
2357 result.push(record);
2358
2359 Ok(RuntimeQueryResult {
2360 query: raw_query.to_string(),
2361 mode: crate::storage::query::modes::QueryMode::Sql,
2362 statement: "kv_cas",
2363 engine: "kv",
2364 result,
2365 affected_rows: if ok { 1 } else { 0 },
2366 statement_type: "update",
2367 bookmark: None,
2368 })
2369 }
2370
2371 KvCommand::Delete {
2372 model,
2373 collection,
2374 key,
2375 } => {
2376 if *model == crate::catalog::CollectionModel::Vault {
2377 self.check_system_vault_capability("vault:write", collection, key)
2378 .map_err(RedDBError::Query)?;
2379 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2380 let entry = ops.append_vault_version(
2381 collection,
2382 key,
2383 Value::Null,
2384 "delete",
2385 true,
2386 &[],
2387 )?;
2388 self.record_kv_watch_event(
2389 crate::replication::cdc::ChangeOperation::Delete,
2390 collection,
2391 key,
2392 entry.id.raw(),
2393 None,
2394 Some(vault_entry_metadata_json(&entry)),
2395 );
2396 self.audit_vault_lifecycle(
2397 "delete",
2398 collection,
2399 key,
2400 crate::runtime::audit_log::Outcome::Success,
2401 "ok",
2402 Some(&entry),
2403 );
2404 return Ok(vault_write_result(
2405 raw_query,
2406 "vault_delete",
2407 "delete",
2408 collection,
2409 key,
2410 &entry,
2411 1,
2412 ));
2413 }
2414 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2415 let deleted = ops.delete(*model, collection, key)?;
2416
2417 let mut result = UnifiedResult::with_columns(vec![
2418 "ok".into(),
2419 "collection".into(),
2420 "key".into(),
2421 "deleted".into(),
2422 ]);
2423 let mut record = UnifiedRecord::new();
2424 record.set("ok", Value::Boolean(true));
2425 record.set("collection", Value::text(collection.clone()));
2426 record.set("key", Value::text(key.clone()));
2427 record.set("deleted", Value::Boolean(deleted));
2428 result.push(record);
2429
2430 Ok(RuntimeQueryResult {
2431 query: raw_query.to_string(),
2432 mode: crate::storage::query::modes::QueryMode::Sql,
2433 statement: "kv_delete",
2434 engine: "kv",
2435 result,
2436 affected_rows: if deleted { 1 } else { 0 },
2437 statement_type: "delete",
2438 bookmark: None,
2439 })
2440 }
2441 }
2442 }
2443
2444 pub fn vault_watch_events_since(
2445 &self,
2446 collection: &str,
2447 key: &str,
2448 since_lsn: u64,
2449 max_count: usize,
2450 ) -> Vec<crate::replication::cdc::KvWatchEvent> {
2451 self.kv_watch_events_since(collection, key, since_lsn, max_count)
2452 .into_iter()
2453 .filter(|event| {
2454 self.check_vault_capability("vault:read_metadata", &event.collection, &event.key)
2455 .is_ok()
2456 })
2457 .map(vault_filter_watch_event)
2458 .collect()
2459 }
2460
2461 pub fn vault_watch_events_since_prefix(
2462 &self,
2463 collection: &str,
2464 prefix: &str,
2465 since_lsn: u64,
2466 max_count: usize,
2467 ) -> Vec<crate::replication::cdc::KvWatchEvent> {
2468 self.kv_watch_events_since_prefix(collection, prefix, since_lsn, max_count)
2469 .into_iter()
2470 .filter(|event| {
2471 self.check_vault_capability("vault:read_metadata", &event.collection, &event.key)
2472 .is_ok()
2473 })
2474 .map(vault_filter_watch_event)
2475 .collect()
2476 }
2477
2478 fn check_kv_invalidate_policy(&self, collection: &str) -> RedDBResult<()> {
2479 let auth_store = match self.inner.auth_store.read().clone() {
2480 Some(store) => store,
2481 None => return Ok(()),
2482 };
2483 let (username, role) = match crate::runtime::impl_core::current_auth_identity() {
2484 Some(identity) => identity,
2485 None => return Ok(()),
2486 };
2487 if role < crate::auth::Role::Write {
2488 return Err(RedDBError::Query(format!(
2489 "principal=`{username}` role=`{role:?}` cannot invalidate KV tags"
2490 )));
2491 }
2492 if !auth_store.iam_authorization_enabled() {
2493 return Ok(());
2494 }
2495
2496 let tenant = crate::runtime::impl_core::current_tenant();
2497 let principal = crate::auth::UserId::from_parts(tenant.as_deref(), &username);
2498 let mut resource =
2499 crate::auth::policies::ResourceRef::new("kv".to_string(), collection.to_string());
2500 if let Some(tenant) = tenant.clone() {
2501 resource = resource.with_tenant(tenant);
2502 }
2503 let ctx = crate::auth::policies::EvalContext {
2504 principal_tenant: tenant.clone(),
2505 current_tenant: tenant,
2506 peer_ip: None,
2507 mfa_present: false,
2508 now_ms: current_unix_ms(),
2509 principal_is_admin_role: role == crate::auth::Role::Admin,
2510 principal_is_platform_scoped: principal.tenant.is_none(),
2511 };
2512 if auth_store.check_policy_authz_with_role(
2513 &principal,
2514 "kv:invalidate",
2515 &resource,
2516 &ctx,
2517 role,
2518 ) {
2519 Ok(())
2520 } else {
2521 Err(RedDBError::Query(format!(
2522 "principal=`{username}` action=`kv:invalidate` resource=`kv:{collection}` denied by IAM policy"
2523 )))
2524 }
2525 }
2526}
2527
2528fn ttl_metadata(ttl_ms: Option<u64>) -> Option<Metadata> {
2529 let ttl_ms = ttl_ms?;
2530 Some(Metadata::with_fields(
2531 [(
2532 "_ttl_ms".to_string(),
2533 if ttl_ms <= i64::MAX as u64 {
2534 MetadataValue::Int(ttl_ms as i64)
2535 } else {
2536 MetadataValue::Timestamp(ttl_ms)
2537 },
2538 )]
2539 .into_iter()
2540 .collect(),
2541 ))
2542}
2543
2544fn vault_write_result(
2545 raw_query: &str,
2546 statement: &'static str,
2547 statement_type: &'static str,
2548 collection: &str,
2549 key: &str,
2550 entry: &VaultEntry,
2551 affected_rows: u64,
2552) -> RuntimeQueryResult {
2553 let mut result = UnifiedResult::with_columns(vec![
2554 "ok".into(),
2555 "collection".into(),
2556 "key".into(),
2557 "version".into(),
2558 "fingerprint".into(),
2559 "tombstone".into(),
2560 "op".into(),
2561 "id".into(),
2562 ]);
2563 let mut record = UnifiedRecord::new();
2564 record.set("ok", Value::Boolean(true));
2565 record.set("collection", Value::text(collection.to_string()));
2566 record.set("key", Value::text(key.to_string()));
2567 record.set("version", Value::Integer(entry.version));
2568 if entry.tombstone {
2569 record.set("fingerprint", Value::Null);
2570 } else {
2571 record.set("fingerprint", Value::text(vault_fingerprint(&entry.value)));
2572 }
2573 record.set("tombstone", Value::Boolean(entry.tombstone));
2574 record.set("op", Value::text(entry.op.clone()));
2575 record.set("id", Value::Integer(entry.id.raw() as i64));
2576 result.push(record);
2577 RuntimeQueryResult {
2578 query: raw_query.to_string(),
2579 mode: crate::storage::query::modes::QueryMode::Sql,
2580 statement,
2581 engine: "vault",
2582 result,
2583 affected_rows,
2584 statement_type,
2585 bookmark: None,
2586 }
2587}
2588
2589fn vault_history_result(collection: &str, key: &str, versions: &[VaultEntry]) -> UnifiedResult {
2590 let mut result = UnifiedResult::with_columns(vec![
2591 "collection".into(),
2592 "key".into(),
2593 "version".into(),
2594 "fingerprint".into(),
2595 "tags".into(),
2596 "created_at".into(),
2597 "updated_at".into(),
2598 "status".into(),
2599 "tombstone".into(),
2600 "op".into(),
2601 ]);
2602 for entry in versions {
2603 push_vault_metadata_record(&mut result, collection, key, entry);
2604 }
2605 result
2606}
2607
2608fn push_vault_metadata_record(
2609 result: &mut UnifiedResult,
2610 collection: &str,
2611 key: &str,
2612 entry: &VaultEntry,
2613) {
2614 let mut record = UnifiedRecord::new();
2615 record.set("collection", Value::text(collection.to_string()));
2616 record.set("key", Value::text(key.to_string()));
2617 record.set("version", Value::Integer(entry.version));
2618 if entry.tombstone {
2619 record.set("fingerprint", Value::Null);
2620 record.set("status", Value::text("deleted"));
2621 } else {
2622 record.set("fingerprint", Value::text(vault_fingerprint(&entry.value)));
2623 record.set("status", Value::text("sealed"));
2624 }
2625 record.set("tags", vault_tags_value(&entry.metadata));
2626 record.set("created_at", Value::TimestampMs(entry.created_at as i64));
2627 record.set("updated_at", Value::TimestampMs(entry.updated_at as i64));
2628 record.set("tombstone", Value::Boolean(entry.tombstone));
2629 record.set("op", Value::text(entry.op.clone()));
2630 result.push(record);
2631}
2632
2633fn vault_metadata_result(
2634 collection: &str,
2635 key: &str,
2636 entry: Option<&VaultEntry>,
2637 key_available: bool,
2638) -> UnifiedResult {
2639 let mut result = UnifiedResult::with_columns(vec![
2640 "collection".into(),
2641 "key".into(),
2642 "version".into(),
2643 "fingerprint".into(),
2644 "tags".into(),
2645 "created_at".into(),
2646 "updated_at".into(),
2647 "value".into(),
2648 "status".into(),
2649 "tombstone".into(),
2650 "op".into(),
2651 ]);
2652 let mut record = UnifiedRecord::new();
2653 record.set("collection", Value::text(collection.to_string()));
2654 record.set("key", Value::text(key.to_string()));
2655 match entry {
2656 Some(entry) => {
2657 record.set("version", Value::Integer(entry.version));
2658 if entry.tombstone {
2659 record.set("fingerprint", Value::Null);
2660 } else {
2661 record.set("fingerprint", Value::text(vault_fingerprint(&entry.value)));
2662 }
2663 record.set("tags", vault_tags_value(&entry.metadata));
2664 record.set("created_at", Value::TimestampMs(entry.created_at as i64));
2665 record.set("updated_at", Value::TimestampMs(entry.updated_at as i64));
2666 record.set("value", Value::text("***"));
2667 record.set(
2668 "status",
2669 Value::text(if entry.tombstone {
2670 "deleted"
2671 } else if key_available {
2672 "sealed"
2673 } else {
2674 "sealed_unavailable"
2675 }),
2676 );
2677 record.set("tombstone", Value::Boolean(entry.tombstone));
2678 record.set("op", Value::text(entry.op.clone()));
2679 }
2680 None => {
2681 record.set("version", Value::Null);
2682 record.set("fingerprint", Value::Null);
2683 record.set("tags", Value::Array(Vec::new()));
2684 record.set("created_at", Value::Null);
2685 record.set("updated_at", Value::Null);
2686 record.set("value", Value::text(""));
2687 record.set("status", Value::text("missing"));
2688 record.set("tombstone", Value::Boolean(false));
2689 record.set("op", Value::Null);
2690 }
2691 }
2692 result.push(record);
2693 result
2694}
2695
2696fn vault_fingerprint(value: &Value) -> String {
2697 match value {
2698 Value::Secret(payload) => crate::utils::to_hex(&crate::crypto::sha256::sha256(payload)),
2699 other => crate::utils::to_hex(&crate::crypto::sha256::sha256(&other.to_bytes())),
2700 }
2701}
2702
2703fn vault_entry_metadata_json(entry: &VaultEntry) -> crate::json::Value {
2704 let mut object = crate::json::Map::new();
2705 object.insert(
2706 "key".to_string(),
2707 crate::json::Value::String(entry.key.clone()),
2708 );
2709 object.insert(
2710 "version".to_string(),
2711 crate::json::Value::Number(entry.version as f64),
2712 );
2713 object.insert(
2714 "fingerprint".to_string(),
2715 if entry.tombstone {
2716 crate::json::Value::Null
2717 } else {
2718 crate::json::Value::String(vault_fingerprint(&entry.value))
2719 },
2720 );
2721 object.insert("tags".to_string(), vault_tags_json(&entry.metadata));
2722 object.insert(
2723 "actor".to_string(),
2724 crate::json::Value::String(RedDBRuntime::current_vault_actor()),
2725 );
2726 object.insert(
2727 "sequence_id".to_string(),
2728 crate::json::Value::Number(entry.sequence_id as f64),
2729 );
2730 object.insert(
2731 "tombstone".to_string(),
2732 crate::json::Value::Bool(entry.tombstone),
2733 );
2734 object.insert(
2735 "op".to_string(),
2736 crate::json::Value::String(entry.op.clone()),
2737 );
2738 crate::json::Value::Object(object)
2739}
2740
2741fn vault_tags_json(metadata: &Metadata) -> crate::json::Value {
2742 match vault_tags_value(metadata) {
2743 Value::Array(values) => crate::json::Value::Array(
2744 values
2745 .into_iter()
2746 .filter_map(|value| match value {
2747 Value::Text(tag) => Some(crate::json::Value::String(tag.to_string())),
2748 _ => None,
2749 })
2750 .collect(),
2751 ),
2752 _ => crate::json::Value::Array(Vec::new()),
2753 }
2754}
2755
2756fn vault_tags_metadata(tags: &[String]) -> std::collections::HashMap<String, MetadataValue> {
2757 [(
2758 "tags".to_string(),
2759 MetadataValue::Array(
2760 tags.iter()
2761 .map(|tag| MetadataValue::String(tag.clone()))
2762 .collect(),
2763 ),
2764 )]
2765 .into_iter()
2766 .collect()
2767}
2768
2769fn vault_filter_watch_event(
2770 mut event: crate::replication::cdc::KvWatchEvent,
2771) -> crate::replication::cdc::KvWatchEvent {
2772 event.before = event.before.and_then(vault_metadata_json_only);
2773 event.after = event.after.and_then(vault_metadata_json_only);
2774 event
2775}
2776
2777fn vault_metadata_json_only(value: crate::json::Value) -> Option<crate::json::Value> {
2778 let object = value.as_object()?;
2779 let mut out = crate::json::Map::new();
2780 for field in [
2781 "key",
2782 "version",
2783 "fingerprint",
2784 "tags",
2785 "actor",
2786 "sequence_id",
2787 "tombstone",
2788 "op",
2789 ] {
2790 if let Some(value) = object.get(field) {
2791 out.insert(field.to_string(), value.clone());
2792 }
2793 }
2794 Some(crate::json::Value::Object(out))
2795}
2796
2797fn vault_tags_value(metadata: &Metadata) -> Value {
2798 match metadata.get("tags") {
2799 Some(MetadataValue::Array(values)) => Value::Array(
2800 values
2801 .iter()
2802 .filter_map(|value| match value {
2803 MetadataValue::String(tag) => Some(Value::text(tag.clone())),
2804 _ => None,
2805 })
2806 .collect(),
2807 ),
2808 Some(MetadataValue::String(tag)) if !tag.is_empty() => {
2809 Value::Array(vec![Value::text(tag.clone())])
2810 }
2811 _ => Value::Array(Vec::new()),
2812 }
2813}
2814
2815fn decode_vault_key(hex_key: &str) -> RedDBResult<[u8; 32]> {
2816 let bytes = hex::decode(hex_key)
2817 .map_err(|_| RedDBError::Query("vault sealed_unavailable: bad key material".to_string()))?;
2818 let key: [u8; 32] = bytes.try_into().map_err(|_| {
2819 RedDBError::Query("vault sealed_unavailable: bad key material length".to_string())
2820 })?;
2821 Ok(key)
2822}
2823
2824fn kv_tags_metadata(tags: &[String]) -> Option<(String, MetadataValue)> {
2825 if tags.is_empty() {
2826 return None;
2827 }
2828 let values = tags
2829 .iter()
2830 .map(|tag| MetadataValue::String(tag.clone()))
2831 .collect();
2832 Some(("_kv_tags".to_string(), MetadataValue::Array(values)))
2833}
2834
2835fn kv_tags_value(tags: &[String]) -> Value {
2836 let json = crate::json::Value::Array(
2837 tags.iter()
2838 .map(|tag| crate::json::Value::String(tag.clone()))
2839 .collect(),
2840 );
2841 Value::Json(crate::json::to_vec(&json).unwrap_or_default())
2842}
2843
2844fn kv_value_from_entity(entity: &crate::storage::UnifiedEntity) -> Option<Value> {
2845 if let crate::storage::EntityData::Row(ref row) = entity.data {
2846 if let Some(ref named) = row.named {
2847 return named.get("value").cloned();
2848 }
2849 }
2850 None
2851}
2852
2853fn kv_entity_has_key(entity: &crate::storage::UnifiedEntity, key: &str) -> bool {
2855 if let crate::storage::EntityData::Row(ref row) = entity.data {
2856 if let Some(ref named) = row.named {
2857 if let Some(crate::storage::schema::Value::Text(ref k)) = named.get("key") {
2858 return &**k == key;
2859 }
2860 }
2861 }
2862 false
2863}
2864
2865fn insert_kv_json_path(root: &mut crate::json::Value, path: &str, value: crate::json::Value) {
2866 let segments: Vec<&str> = path
2867 .split('.')
2868 .filter(|segment| !segment.is_empty())
2869 .collect();
2870 insert_kv_json_segments(root, &segments, value);
2871}
2872
2873fn insert_kv_json_segments(
2874 root: &mut crate::json::Value,
2875 segments: &[&str],
2876 value: crate::json::Value,
2877) {
2878 if segments.is_empty() {
2879 *root = value;
2880 return;
2881 }
2882
2883 if !matches!(root, crate::json::Value::Object(_)) {
2884 *root = crate::json::Value::Object(crate::json::Map::new());
2885 }
2886
2887 let crate::json::Value::Object(map) = root else {
2888 return;
2889 };
2890 if segments.len() == 1 {
2891 map.insert(segments[0].to_string(), value);
2892 return;
2893 }
2894 let entry = map
2895 .entry(segments[0].to_string())
2896 .or_insert_with(|| crate::json::Value::Object(crate::json::Map::new()));
2897 insert_kv_json_segments(entry, &segments[1..], value);
2898}
2899
2900fn kv_list_json_result(
2901 raw_query: &str,
2902 collection: &str,
2903 prefix: &Option<String>,
2904 value: crate::json::Value,
2905) -> RuntimeQueryResult {
2906 let mut result =
2907 UnifiedResult::with_columns(vec!["collection".into(), "prefix".into(), "value".into()]);
2908 let mut record = UnifiedRecord::new();
2909 record.set("collection", Value::text(collection.to_string()));
2910 record.set(
2911 "prefix",
2912 prefix
2913 .as_ref()
2914 .map(|prefix| Value::text(prefix.clone()))
2915 .unwrap_or(Value::Null),
2916 );
2917 record.set("value", Value::Json(value.to_string_compact().into_bytes()));
2918 result.push(record);
2919 RuntimeQueryResult {
2920 query: raw_query.to_string(),
2921 mode: crate::storage::query::modes::QueryMode::Sql,
2922 statement: "kv_list_json",
2923 engine: "kv",
2924 result,
2925 affected_rows: 0,
2926 statement_type: "select",
2927 bookmark: None,
2928 }
2929}
2930
2931fn kv_collection_contract(name: &str) -> crate::physical::CollectionContract {
2932 let now = current_unix_ms();
2933 crate::physical::CollectionContract {
2934 name: name.to_string(),
2935 declared_model: crate::catalog::CollectionModel::Kv,
2936 schema_mode: crate::catalog::SchemaMode::Dynamic,
2937 origin: crate::physical::ContractOrigin::Implicit,
2938 version: 1,
2939 created_at_unix_ms: now,
2940 updated_at_unix_ms: now,
2941 default_ttl_ms: None,
2942 vector_dimension: None,
2943 vector_metric: None,
2944 context_index_fields: Vec::new(),
2945 declared_columns: Vec::new(),
2946 table_def: None,
2947 timestamps_enabled: false,
2948 context_index_enabled: false,
2949 metrics_raw_retention_ms: None,
2950 metrics_rollup_policies: Vec::new(),
2951 metrics_tenant_identity: None,
2952 metrics_namespace: None,
2953 append_only: false,
2954 subscriptions: Vec::new(),
2955 analytics_config: Vec::new(),
2956 session_key: None,
2957 session_gap_ms: None,
2958 retention_duration_ms: None,
2959 analytical_storage: None,
2960
2961 ai_policy: None,
2962 }
2963}
2964
/// Returns the wall-clock time as milliseconds since the Unix epoch.
///
/// Yields 0 if the system clock reports a time before the epoch.
fn current_unix_ms() -> u128 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis(),
        Err(_) => 0,
    }
}
2971
2972#[cfg(test)]
2973mod tests {
2974 use crate::api::RedDBOptions;
2975 use crate::catalog::CollectionModel;
2976 use crate::runtime::RedDBRuntime;
2977
2978 fn rt() -> RedDBRuntime {
2979 RedDBRuntime::with_options(RedDBOptions::in_memory()).expect("in-memory runtime")
2980 }
2981
2982 #[test]
2983 fn incr_missing_key_initialises_at_by() {
2984 let r = rt();
2985 let ops = super::KvAtomicOps::new(&r);
2986 let v = ops
2987 .incr(CollectionModel::Kv, "kv_default", "missing", 5, None)
2988 .unwrap();
2989 assert_eq!(v, 5);
2990 }
2991
2992 #[test]
2993 fn kv_runtime_stats_count_public_ops() {
2994 let r = rt();
2995 let ops = super::KvAtomicOps::new(&r);
2996
2997 ops.set(
2998 CollectionModel::Kv,
2999 "kv_default",
3000 "profile",
3001 crate::storage::schema::Value::text("alice"),
3002 None,
3003 false,
3004 )
3005 .unwrap();
3006 ops.get(CollectionModel::Kv, "kv_default", "profile")
3007 .unwrap();
3008 ops.delete(CollectionModel::Kv, "kv_default", "profile")
3009 .unwrap();
3010 ops.incr(CollectionModel::Kv, "kv_default", "hits", 1, None)
3011 .unwrap();
3012 ops.cas(
3013 CollectionModel::Kv,
3014 "kv_default",
3015 "profile",
3016 None,
3017 crate::storage::schema::Value::text("created"),
3018 None,
3019 )
3020 .unwrap();
3021 ops.cas(
3022 CollectionModel::Kv,
3023 "kv_default",
3024 "profile",
3025 Some(&crate::storage::schema::Value::text("different")),
3026 crate::storage::schema::Value::text("ignored"),
3027 None,
3028 )
3029 .unwrap();
3030
3031 let stats = r.stats().kv;
3032 assert_eq!(stats.puts, 1);
3033 assert_eq!(stats.gets, 1);
3034 assert_eq!(stats.deletes, 1);
3035 assert_eq!(stats.incrs, 1);
3036 assert_eq!(stats.cas_success, 1);
3037 assert_eq!(stats.cas_conflict, 1);
3038 }
3039
3040 #[test]
3041 fn kv_invalidate_tags_removes_matching_entries_only() {
3042 let r = rt();
3043
3044 r.execute_query("KV PUT sessions.blob = 'payload' TAGS [user:42, org:7]")
3045 .unwrap();
3046
3047 let miss = r
3048 .execute_query("INVALIDATE TAGS [org:99] FROM sessions")
3049 .unwrap();
3050 assert_eq!(miss.affected_rows, 0);
3051 assert!(matches!(
3052 r.execute_query("KV GET sessions.blob")
3053 .unwrap()
3054 .result
3055 .records[0]
3056 .get("value"),
3057 Some(crate::storage::schema::Value::Text(value)) if &**value == "payload"
3058 ));
3059
3060 let hit = r
3061 .execute_query("INVALIDATE TAGS [user:42] FROM sessions")
3062 .unwrap();
3063 assert_eq!(hit.affected_rows, 1);
3064 assert!(matches!(
3065 r.execute_query("KV GET sessions.blob")
3066 .unwrap()
3067 .result
3068 .records[0]
3069 .get("value"),
3070 Some(crate::storage::schema::Value::Null)
3071 ));
3072 }
3073
3074 #[test]
3075 fn kv_runtime_stats_count_watch_streams_and_events() {
3076 let r = rt();
3077 let ops = super::KvAtomicOps::new(&r);
3078 assert_eq!(r.stats().kv.watch_streams_active, 0);
3079
3080 {
3081 let mut stream = r.kv_watch_subscribe("kv_default", "watched", None);
3082 assert_eq!(r.stats().kv.watch_streams_active, 1);
3083
3084 ops.set(
3085 CollectionModel::Kv,
3086 "kv_default",
3087 "watched",
3088 crate::storage::schema::Value::Integer(1),
3089 None,
3090 false,
3091 )
3092 .unwrap();
3093 let event = stream.poll_next().expect("watch event");
3094 assert_eq!(event.key, "watched");
3095 assert_eq!(r.stats().kv.watch_events_emitted, 1);
3096
3097 stream.record_drop_count(3);
3098 assert_eq!(r.stats().kv.watch_drops, 3);
3099 }
3100
3101 assert_eq!(r.stats().kv.watch_streams_active, 0);
3102 }
3103
3104 #[test]
3105 fn incr_existing_integer_accumulates() {
3106 let r = rt();
3107 let ops = super::KvAtomicOps::new(&r);
3108 ops.incr(CollectionModel::Kv, "kv_default", "ctr", 1, None)
3109 .unwrap();
3110 ops.incr(CollectionModel::Kv, "kv_default", "ctr", 1, None)
3111 .unwrap();
3112 let v = ops
3113 .incr(CollectionModel::Kv, "kv_default", "ctr", 1, None)
3114 .unwrap();
3115 assert_eq!(v, 3);
3116 }
3117
3118 #[test]
3119 fn decr_via_negative_by() {
3120 let r = rt();
3121 let ops = super::KvAtomicOps::new(&r);
3122 ops.incr(CollectionModel::Kv, "kv_default", "stock", 10, None)
3123 .unwrap();
3124 let v = ops
3125 .incr(CollectionModel::Kv, "kv_default", "stock", -3, None)
3126 .unwrap();
3127 assert_eq!(v, 7);
3128 }
3129
3130 #[test]
3131 fn concurrent_incr_single_key_is_atomic() {
3132 const THREADS: usize = 8;
3133 const ITERS: usize = 1000;
3134
3135 let runtime = std::sync::Arc::new(rt());
3136 let barrier = std::sync::Arc::new(std::sync::Barrier::new(THREADS));
3137 let mut handles = Vec::new();
3138
3139 for _ in 0..THREADS {
3140 let runtime = std::sync::Arc::clone(&runtime);
3141 let barrier = std::sync::Arc::clone(&barrier);
3142 handles.push(std::thread::spawn(move || {
3143 let ops = super::KvAtomicOps::new(&runtime);
3144 barrier.wait();
3145 for _ in 0..ITERS {
3146 ops.incr(CollectionModel::Kv, "kv_default", "counter", 1, None)
3147 .unwrap();
3148 }
3149 }));
3150 }
3151
3152 for handle in handles {
3153 handle.join().expect("worker should finish");
3154 }
3155
3156 let ops = super::KvAtomicOps::new(&runtime);
3157 assert_eq!(
3158 ops.get(CollectionModel::Kv, "kv_default", "counter")
3159 .unwrap(),
3160 Some(crate::storage::schema::Value::Integer(
3161 (THREADS * ITERS) as i64
3162 ))
3163 );
3164 }
3165
3166 #[test]
3167 fn incr_on_string_value_returns_error() {
3168 let r = rt();
3169 let ops = super::KvAtomicOps::new(&r);
3170 ops.set(
3171 CollectionModel::Kv,
3172 "kv_default",
3173 "name",
3174 crate::storage::schema::Value::text("alice"),
3175 None,
3176 false,
3177 )
3178 .unwrap();
3179 let err = ops
3180 .incr(CollectionModel::Kv, "kv_default", "name", 1, None)
3181 .unwrap_err();
3182 assert!(err.to_string().contains("non-integer"));
3183 }
3184
3185 #[test]
3188 fn cas_matching_value_succeeds() {
3189 let r = rt();
3190 let ops = super::KvAtomicOps::new(&r);
3191 ops.set(
3192 CollectionModel::Kv,
3193 "kv_default",
3194 "lock",
3195 crate::storage::schema::Value::text("free"),
3196 None,
3197 false,
3198 )
3199 .unwrap();
3200 let (ok, prev) = ops
3201 .cas(
3202 CollectionModel::Kv,
3203 "kv_default",
3204 "lock",
3205 Some(&crate::storage::schema::Value::text("free")),
3206 crate::storage::schema::Value::text("held"),
3207 None,
3208 )
3209 .unwrap();
3210 assert!(ok);
3211 assert_eq!(prev, Some(crate::storage::schema::Value::text("free")));
3212 assert_eq!(
3214 ops.get(CollectionModel::Kv, "kv_default", "lock").unwrap(),
3215 Some(crate::storage::schema::Value::text("held"))
3216 );
3217 }
3218
3219 #[test]
3220 fn concurrent_cas_allows_one_success_per_round() {
3221 const THREADS: usize = 8;
3222 const ROUNDS: usize = 100;
3223
3224 let runtime = std::sync::Arc::new(rt());
3225 let ops = super::KvAtomicOps::new(&runtime);
3226 ops.set(
3227 CollectionModel::Kv,
3228 "kv_default",
3229 "cas_counter",
3230 crate::storage::schema::Value::Integer(0),
3231 None,
3232 false,
3233 )
3234 .unwrap();
3235
3236 let start_round = std::sync::Arc::new(std::sync::Barrier::new(THREADS));
3237 let finish_round = std::sync::Arc::new(std::sync::Barrier::new(THREADS));
3238 let successes = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
3239 let mut handles = Vec::new();
3240
3241 for _ in 0..THREADS {
3242 let runtime = std::sync::Arc::clone(&runtime);
3243 let start_round = std::sync::Arc::clone(&start_round);
3244 let finish_round = std::sync::Arc::clone(&finish_round);
3245 let successes = std::sync::Arc::clone(&successes);
3246 handles.push(std::thread::spawn(move || {
3247 let ops = super::KvAtomicOps::new(&runtime);
3248 for round in 0..ROUNDS {
3249 start_round.wait();
3250 let (ok, _) = ops
3251 .cas(
3252 CollectionModel::Kv,
3253 "kv_default",
3254 "cas_counter",
3255 Some(&crate::storage::schema::Value::Integer(round as i64)),
3256 crate::storage::schema::Value::Integer((round + 1) as i64),
3257 None,
3258 )
3259 .unwrap();
3260 if ok {
3261 successes.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
3262 }
3263 finish_round.wait();
3264 }
3265 }));
3266 }
3267
3268 for handle in handles {
3269 handle.join().expect("worker should finish");
3270 }
3271
3272 assert_eq!(successes.load(std::sync::atomic::Ordering::SeqCst), ROUNDS);
3273 assert_eq!(
3274 ops.get(CollectionModel::Kv, "kv_default", "cas_counter")
3275 .unwrap(),
3276 Some(crate::storage::schema::Value::Integer(ROUNDS as i64))
3277 );
3278 }
3279
3280 #[test]
3281 fn cas_mismatching_value_fails() {
3282 let r = rt();
3283 let ops = super::KvAtomicOps::new(&r);
3284 ops.set(
3285 CollectionModel::Kv,
3286 "kv_default",
3287 "lock",
3288 crate::storage::schema::Value::text("free"),
3289 None,
3290 false,
3291 )
3292 .unwrap();
3293 let (ok, current) = ops
3294 .cas(
3295 CollectionModel::Kv,
3296 "kv_default",
3297 "lock",
3298 Some(&crate::storage::schema::Value::text("held")),
3299 crate::storage::schema::Value::text("worker-7"),
3300 None,
3301 )
3302 .unwrap();
3303 assert!(!ok);
3304 assert_eq!(current, Some(crate::storage::schema::Value::text("free")));
3305 assert_eq!(
3307 ops.get(CollectionModel::Kv, "kv_default", "lock").unwrap(),
3308 Some(crate::storage::schema::Value::text("free"))
3309 );
3310 }
3311
3312 #[test]
3313 fn cas_expect_null_on_missing_key_creates() {
3314 let r = rt();
3315 let ops = super::KvAtomicOps::new(&r);
3316 let (ok, prev) = ops
3317 .cas(
3318 CollectionModel::Kv,
3319 "kv_default",
3320 "new_key",
3321 None,
3322 crate::storage::schema::Value::text("created"),
3323 None,
3324 )
3325 .unwrap();
3326 assert!(ok);
3327 assert_eq!(prev, None);
3328 assert_eq!(
3329 ops.get(CollectionModel::Kv, "kv_default", "new_key")
3330 .unwrap(),
3331 Some(crate::storage::schema::Value::text("created"))
3332 );
3333 }
3334
3335 #[test]
3336 fn cas_expect_null_on_existing_key_fails() {
3337 let r = rt();
3338 let ops = super::KvAtomicOps::new(&r);
3339 ops.set(
3340 CollectionModel::Kv,
3341 "kv_default",
3342 "taken",
3343 crate::storage::schema::Value::text("worker-1"),
3344 None,
3345 false,
3346 )
3347 .unwrap();
3348 let (ok, current) = ops
3349 .cas(
3350 CollectionModel::Kv,
3351 "kv_default",
3352 "taken",
3353 None,
3354 crate::storage::schema::Value::text("worker-2"),
3355 None,
3356 )
3357 .unwrap();
3358 assert!(!ok);
3359 assert_eq!(
3360 current,
3361 Some(crate::storage::schema::Value::text("worker-1"))
3362 );
3363 }
3364
3365 #[test]
3366 fn cas_via_sql_roundtrip() {
3367 let r = rt();
3368 r.execute_query("KV PUT lock = 'free'").unwrap();
3370 let res = r
3372 .execute_query("KV CAS lock EXPECT 'free' SET 'held'")
3373 .unwrap();
3374 let row = &res.result.records[0];
3375 assert_eq!(
3376 row.get("ok"),
3377 Some(&crate::storage::schema::Value::Boolean(true))
3378 );
3379 let res2 = r
3381 .execute_query("KV CAS lock EXPECT 'free' SET 'held'")
3382 .unwrap();
3383 let row2 = &res2.result.records[0];
3384 assert_eq!(
3385 row2.get("ok"),
3386 Some(&crate::storage::schema::Value::Boolean(false))
3387 );
3388 }
3389
3390 #[test]
3391 fn cas_expect_null_via_sql() {
3392 let r = rt();
3393 let res = r
3394 .execute_query("KV CAS singleton EXPECT NULL SET 'first'")
3395 .unwrap();
3396 let row = &res.result.records[0];
3397 assert_eq!(
3398 row.get("ok"),
3399 Some(&crate::storage::schema::Value::Boolean(true))
3400 );
3401 let res2 = r
3403 .execute_query("KV CAS singleton EXPECT NULL SET 'second'")
3404 .unwrap();
3405 let row2 = &res2.result.records[0];
3406 assert_eq!(
3407 row2.get("ok"),
3408 Some(&crate::storage::schema::Value::Boolean(false))
3409 );
3410 }
3411
3412 #[test]
3413 fn incr_via_sql_roundtrip() {
3414 let r = rt();
3415 let res = r.execute_query("KV INCR hits").unwrap();
3416 let row = &res.result.records[0];
3417 assert_eq!(
3418 row.get("value"),
3419 Some(&crate::storage::schema::Value::Integer(1))
3420 );
3421 let res2 = r.execute_query("KV INCR hits BY 4").unwrap();
3422 let row2 = &res2.result.records[0];
3423 assert_eq!(
3424 row2.get("value"),
3425 Some(&crate::storage::schema::Value::Integer(5))
3426 );
3427 }
3428
3429 #[test]
3430 fn concurrent_self_referential_update_is_atomic() {
3431 const THREADS: usize = 8;
3432 const ITERS: usize = 100;
3433
3434 let runtime = std::sync::Arc::new(rt());
3435 runtime
3436 .execute_query("CREATE TABLE counters (id INT, n INT)")
3437 .unwrap();
3438 runtime
3439 .execute_query("INSERT INTO counters (id, n) VALUES (1, 0)")
3440 .unwrap();
3441
3442 let barrier = std::sync::Arc::new(std::sync::Barrier::new(THREADS));
3443 let mut handles = Vec::new();
3444 for _ in 0..THREADS {
3445 let runtime = std::sync::Arc::clone(&runtime);
3446 let barrier = std::sync::Arc::clone(&barrier);
3447 handles.push(std::thread::spawn(move || {
3448 barrier.wait();
3449 for _ in 0..ITERS {
3450 runtime
3451 .execute_query("UPDATE counters SET n = n + 1 WHERE id = 1")
3452 .unwrap();
3453 }
3454 }));
3455 }
3456
3457 for handle in handles {
3458 handle.join().expect("worker should finish");
3459 }
3460
3461 let selected = runtime
3462 .execute_query("SELECT n FROM counters WHERE id = 1")
3463 .unwrap();
3464 assert_eq!(
3465 selected.result.records[0].get("n"),
3466 Some(&crate::storage::schema::Value::Integer(
3467 (THREADS * ITERS) as i64
3468 ))
3469 );
3470 }
3471
3472 #[test]
3473 fn watch_stream_delivers_key_events_in_lsn_order() {
3474 let r = rt();
3475 let ops = super::KvAtomicOps::new(&r);
3476 let mut stream = r.kv_watch_subscribe("kv_default", "seq", None);
3477
3478 ops.set(
3479 CollectionModel::Kv,
3480 "kv_default",
3481 "seq",
3482 crate::storage::schema::Value::Integer(1),
3483 None,
3484 false,
3485 )
3486 .unwrap();
3487 ops.incr(CollectionModel::Kv, "kv_default", "seq", 1, None)
3488 .unwrap();
3489 ops.delete(CollectionModel::Kv, "kv_default", "seq")
3490 .unwrap();
3491 ops.set(
3492 CollectionModel::Kv,
3493 "kv_default",
3494 "seq",
3495 crate::storage::schema::Value::Integer(9),
3496 None,
3497 false,
3498 )
3499 .unwrap();
3500
3501 let mut events = Vec::new();
3502 while let Some(event) = stream.poll_next() {
3503 events.push(event);
3504 if events.len() == 4 {
3505 break;
3506 }
3507 }
3508
3509 assert_eq!(events.len(), 4);
3510 assert_eq!(
3511 events[0].op,
3512 crate::replication::cdc::ChangeOperation::Insert
3513 );
3514 assert_eq!(
3515 events[1].op,
3516 crate::replication::cdc::ChangeOperation::Update
3517 );
3518 assert_eq!(
3519 events[2].op,
3520 crate::replication::cdc::ChangeOperation::Delete
3521 );
3522 assert_eq!(
3523 events[3].op,
3524 crate::replication::cdc::ChangeOperation::Insert
3525 );
3526 assert!(events.windows(2).all(|pair| pair[0].lsn < pair[1].lsn));
3527 }
3528
3529 #[test]
3530 fn watch_prefix_stream_delivers_matching_events_only() {
3531 let r = rt();
3532 let ops = super::KvAtomicOps::new(&r);
3533 let mut stream = r.kv_watch_subscribe_prefix("kv_default", "acct:", None);
3534
3535 ops.set(
3536 CollectionModel::Kv,
3537 "kv_default",
3538 "acct:1",
3539 crate::storage::schema::Value::Integer(1),
3540 None,
3541 false,
3542 )
3543 .unwrap();
3544 ops.set(
3545 CollectionModel::Kv,
3546 "kv_default",
3547 "session:1",
3548 crate::storage::schema::Value::Integer(2),
3549 None,
3550 false,
3551 )
3552 .unwrap();
3553 ops.set(
3554 CollectionModel::Kv,
3555 "kv_default",
3556 "acct:2",
3557 crate::storage::schema::Value::Integer(3),
3558 None,
3559 false,
3560 )
3561 .unwrap();
3562
3563 let first = stream.poll_next().expect("first prefix event");
3564 let second = stream.poll_next().expect("second prefix event");
3565 assert_eq!(first.key, "acct:1");
3566 assert_eq!(second.key, "acct:2");
3567 assert!(stream.poll_next().is_none());
3568 }
3569
3570 #[test]
3571 fn watch_stream_resume_from_lsn_delivers_missed_events_without_duplicates() {
3572 let r = rt();
3573 let ops = super::KvAtomicOps::new(&r);
3574 let mut stream = r.kv_watch_subscribe("kv_default", "resume", None);
3575
3576 let mut last_seen_lsn = 0;
3577 for value in 0..5 {
3578 ops.set(
3579 CollectionModel::Kv,
3580 "kv_default",
3581 "resume",
3582 crate::storage::schema::Value::Integer(value),
3583 None,
3584 false,
3585 )
3586 .unwrap();
3587 last_seen_lsn = stream.poll_next().expect("initial event").lsn;
3588 }
3589 drop(stream);
3590
3591 for value in 5..55 {
3592 ops.set(
3593 CollectionModel::Kv,
3594 "kv_default",
3595 "resume",
3596 crate::storage::schema::Value::Integer(value),
3597 None,
3598 false,
3599 )
3600 .unwrap();
3601 }
3602
3603 let expected_lsns: Vec<u64> = r
3604 .kv_watch_events_since("kv_default", "resume", last_seen_lsn, 200)
3605 .into_iter()
3606 .map(|event| event.lsn)
3607 .collect();
3608 assert!(!expected_lsns.is_empty());
3609
3610 let mut resumed = r.kv_watch_subscribe("kv_default", "resume", Some(last_seen_lsn));
3611 let mut lsns = Vec::new();
3612 while let Some(event) = resumed.poll_next() {
3613 lsns.push(event.lsn);
3614 if lsns.len() == expected_lsns.len() {
3615 break;
3616 }
3617 }
3618
3619 assert_eq!(lsns, expected_lsns);
3620 assert!(lsns.iter().all(|lsn| *lsn > last_seen_lsn));
3621 assert!(lsns.windows(2).all(|pair| pair[0] < pair[1]));
3622 assert!(resumed.poll_next().is_none());
3623 }
3624
3625 #[test]
3626 fn watch_stream_slow_consumer_drops_oldest_buffered_events() {
3627 let r = rt();
3628 let ops = super::KvAtomicOps::new(&r);
3629 let mut stream = r.kv_watch_subscribe("kv_default", "slow", None);
3630
3631 for value in 0..1_200 {
3632 ops.set(
3633 CollectionModel::Kv,
3634 "kv_default",
3635 "slow",
3636 crate::storage::schema::Value::Integer(value),
3637 None,
3638 false,
3639 )
3640 .unwrap();
3641 }
3642
3643 let event = stream.poll_next().expect("tail event after drops");
3644 assert!(event.lsn > 1);
3645 assert!(event.dropped_event_count > 0);
3646 assert_eq!(stream.dropped_event_count(), event.dropped_event_count);
3647 assert_eq!(r.stats().kv.watch_drops, event.dropped_event_count);
3648 }
3649
3650 #[test]
3651 fn watch_stream_idle_timeout_closes_subscription() {
3652 let r = rt();
3653 r.execute_query("SET CONFIG red.config.kv.watch.idle_timeout_ms = 1")
3654 .unwrap();
3655
3656 let mut stream = r.kv_watch_subscribe("kv_default", "idle", None);
3657 assert_eq!(r.stats().kv.watch_streams_active, 1);
3658 std::thread::sleep(std::time::Duration::from_millis(5));
3659
3660 assert!(stream.poll_next().is_none());
3661 assert_eq!(r.stats().kv.watch_streams_active, 0);
3662 }
3663
3664 #[test]
3665 fn watch_stream_does_not_emit_rolled_back_put() {
3666 let r = rt();
3667 let mut stream = r.kv_watch_subscribe("kv_default", "rollback_key", None);
3668
3669 r.execute_query("BEGIN").unwrap();
3670 r.execute_query("KV PUT rollback_key = 'dirty'").unwrap();
3671 r.execute_query("ROLLBACK").unwrap();
3672
3673 assert!(stream.poll_next().is_none());
3674 }
3675}