1use crate::application::ports::RuntimeEntityPort;
7use crate::storage::unified::{Metadata, MetadataValue};
8
9use super::impl_core::{current_auth_identity, current_connection_id, current_tenant};
10use super::*;
11
/// Collection name `kv_default`.
///
/// NOTE(review): presumably the collection used when a KV command does not
/// name one — confirm against the callers, which are outside this file.
pub const KV_DEFAULT_COLLECTION: &'static str = "kv_default";
14
/// Builds the key-store reference under which the per-collection vault
/// master key is looked up: `red.vault.<collection>.master_key`.
fn vault_master_key_ref(collection: &str) -> String {
    ["red.vault.", collection, ".master_key"].concat()
}
18
19fn keyed_model_name(model: crate::catalog::CollectionModel) -> &'static str {
20 match model {
21 crate::catalog::CollectionModel::Kv => "kv",
22 crate::catalog::CollectionModel::Vault => "vault",
23 crate::catalog::CollectionModel::Config => "config",
24 _ => "collection",
25 }
26}
27
28#[derive(Debug, Clone)]
29struct VaultEntry {
30 id: crate::storage::EntityId,
31 key: String,
32 value: crate::storage::schema::Value,
33 metadata: Metadata,
34 created_at: u64,
35 updated_at: u64,
36 sequence_id: u64,
37 version: i64,
38 tombstone: bool,
39 op: String,
40}
41
42impl super::keyed_spine::KeyedVersion for VaultEntry {
43 fn key(&self) -> &str {
44 &self.key
45 }
46
47 fn version(&self) -> i64 {
48 self.version
49 }
50}
51
52impl VaultEntry {
53 fn from_keyed_row(
54 version: super::keyed_spine::KeyedRowVersion,
55 metadata: Metadata,
56 created_at: u64,
57 updated_at: u64,
58 sequence_id: u64,
59 ) -> Self {
60 Self {
61 id: version.id,
62 key: version.key,
63 value: version.value,
64 metadata,
65 created_at,
66 updated_at,
67 sequence_id,
68 version: version.version,
69 tombstone: version.tombstone,
70 op: version.op,
71 }
72 }
73}
74
75pub struct KvAtomicOps<'a> {
80 runtime: &'a RedDBRuntime,
81}
82
83impl<'a> KvAtomicOps<'a> {
84 pub fn new(runtime: &'a RedDBRuntime) -> Self {
85 Self { runtime }
86 }
87
88 pub fn set(
92 &self,
93 model: crate::catalog::CollectionModel,
94 collection: &str,
95 key: &str,
96 value: crate::storage::schema::Value,
97 ttl_ms: Option<u64>,
98 if_not_exists: bool,
99 ) -> RedDBResult<(bool, crate::storage::EntityId)> {
100 self.set_with_tags_for_model(model, collection, key, value, ttl_ms, &[], if_not_exists)
101 }
102
103 pub fn set_with_tags(
104 &self,
105 collection: &str,
106 key: &str,
107 value: crate::storage::schema::Value,
108 ttl_ms: Option<u64>,
109 tags: &[String],
110 if_not_exists: bool,
111 ) -> RedDBResult<(bool, crate::storage::EntityId)> {
112 self.set_with_tags_for_model(
113 crate::catalog::CollectionModel::Kv,
114 collection,
115 key,
116 value,
117 ttl_ms,
118 tags,
119 if_not_exists,
120 )
121 }
122
123 fn set_with_tags_for_model(
124 &self,
125 model: crate::catalog::CollectionModel,
126 collection: &str,
127 key: &str,
128 value: crate::storage::schema::Value,
129 ttl_ms: Option<u64>,
130 tags: &[String],
131 if_not_exists: bool,
132 ) -> RedDBResult<(bool, crate::storage::EntityId)> {
133 self.set_with_tags_and_metadata_for_model(
134 model,
135 collection,
136 key,
137 value,
138 ttl_ms,
139 tags,
140 if_not_exists,
141 Vec::new(),
142 )
143 }
144
145 pub fn set_with_tags_and_metadata(
146 &self,
147 collection: &str,
148 key: &str,
149 value: crate::storage::schema::Value,
150 ttl_ms: Option<u64>,
151 tags: &[String],
152 if_not_exists: bool,
153 metadata: Vec<(String, MetadataValue)>,
154 ) -> RedDBResult<(bool, crate::storage::EntityId)> {
155 self.set_with_tags_and_metadata_for_model(
156 crate::catalog::CollectionModel::Kv,
157 collection,
158 key,
159 value,
160 ttl_ms,
161 tags,
162 if_not_exists,
163 metadata,
164 )
165 }
166
167 fn set_with_tags_and_metadata_for_model(
168 &self,
169 model: crate::catalog::CollectionModel,
170 collection: &str,
171 key: &str,
172 value: crate::storage::schema::Value,
173 ttl_ms: Option<u64>,
174 tags: &[String],
175 if_not_exists: bool,
176 mut metadata: Vec<(String, MetadataValue)>,
177 ) -> RedDBResult<(bool, crate::storage::EntityId)> {
178 self.ensure_keyed_collection(model, collection)?;
179
180 if model == crate::catalog::CollectionModel::Vault {
181 let latest = self.get_vault_entry(collection, key)?;
182 let was_present = latest.as_ref().is_some_and(|entry| !entry.tombstone);
183 if if_not_exists && was_present {
184 return Ok((false, latest.expect("checked present").id));
185 }
186 let entry = self.append_vault_version(collection, key, value, "put", false, tags)?;
187 self.runtime.record_kv_watch_event(
188 if latest.is_some() {
189 crate::replication::cdc::ChangeOperation::Update
190 } else {
191 crate::replication::cdc::ChangeOperation::Insert
192 },
193 collection,
194 key,
195 entry.id.raw(),
196 latest.as_ref().map(vault_entry_metadata_json),
197 Some(vault_entry_metadata_json(&entry)),
198 );
199 return Ok((!was_present, entry.id));
200 }
201
202 let existing = self.get_entry(model, collection, key)?;
203 let was_present = existing.is_some();
204
205 if if_not_exists && was_present {
206 let (_, id) = existing.unwrap();
207 self.runtime.inner.kv_stats.incr_puts();
208 return Ok((false, id));
209 }
210
211 let before = existing
212 .as_ref()
213 .map(|(value, _)| crate::presentation::entity_json::storage_value_to_json(value));
214 let op = if was_present {
215 crate::replication::cdc::ChangeOperation::Update
216 } else {
217 crate::replication::cdc::ChangeOperation::Insert
218 };
219 let after = Some(crate::presentation::entity_json::storage_value_to_json(
220 &value,
221 ));
222
223 if was_present {
225 self.delete(model, collection, key)?;
226 }
227
228 if let Some(ttl_metadata) = ttl_metadata(ttl_ms) {
229 metadata.extend(ttl_metadata.fields);
230 }
231 if let Some(tags_metadata) = kv_tags_metadata(tags) {
232 metadata.push(tags_metadata);
233 }
234
235 let output = self
236 .runtime
237 .create_kv(crate::application::entity::CreateKvInput {
238 collection: collection.to_string(),
239 key: key.to_string(),
240 value,
241 metadata,
242 })?;
243 if model == crate::catalog::CollectionModel::Kv {
244 self.runtime
245 .inner
246 .kv_tag_index
247 .replace(collection, key, output.id, tags);
248 }
249
250 if model == crate::catalog::CollectionModel::Kv {
251 self.runtime
252 .record_kv_watch_event(op, collection, key, output.id.raw(), before, after);
253 }
254
255 if model == crate::catalog::CollectionModel::Kv {
256 self.runtime.inner.kv_stats.incr_puts();
257 }
258 Ok((!was_present, output.id))
259 }
260
261 pub fn get(
263 &self,
264 model: crate::catalog::CollectionModel,
265 collection: &str,
266 key: &str,
267 ) -> RedDBResult<Option<crate::storage::schema::Value>> {
268 let result = self.get_entry(model, collection, key)?;
269 if model == crate::catalog::CollectionModel::Kv {
270 self.runtime.inner.kv_stats.incr_gets();
271 }
272 Ok(result.map(|(v, _)| v))
273 }
274
275 pub fn delete(
277 &self,
278 model: crate::catalog::CollectionModel,
279 collection: &str,
280 key: &str,
281 ) -> RedDBResult<bool> {
282 self.ensure_declared_model(model, collection)?;
283 let found = self.get_entry(model, collection, key)?;
284 if let Some((value, id)) = found {
285 let store = self.runtime.inner.db.store();
286 let deleted = store
287 .delete(collection, id)
288 .map_err(|err| RedDBError::Internal(err.to_string()))?;
289 if deleted {
290 store.context_index().remove_entity(id);
291 if model == crate::catalog::CollectionModel::Kv {
292 self.runtime.inner.kv_tag_index.remove(collection, key);
293 self.runtime.record_kv_watch_event(
294 crate::replication::cdc::ChangeOperation::Delete,
295 collection,
296 key,
297 id.raw(),
298 Some(crate::presentation::entity_json::storage_value_to_json(
299 &value,
300 )),
301 None,
302 );
303 self.runtime.inner.kv_stats.incr_deletes();
304 }
305 }
306 Ok(deleted)
307 } else {
308 Ok(false)
309 }
310 }
311
312 pub fn incr(
317 &self,
318 model: crate::catalog::CollectionModel,
319 collection: &str,
320 key: &str,
321 by: i64,
322 ttl_ms: Option<u64>,
323 ) -> RedDBResult<i64> {
324 if model == crate::catalog::CollectionModel::Vault {
325 return Err(RedDBError::InvalidOperation(
326 "VAULT INCR is not supported for sealed secrets".to_string(),
327 ));
328 }
329 self.ensure_kv_collection(collection)?;
330
331 let existing = self.runtime.get_kv(collection, key)?;
332 let current: i64 = match existing.as_ref() {
333 None => 0,
334 Some((crate::storage::schema::Value::Integer(n), _)) => *n,
335 Some((crate::storage::schema::Value::Float(f), _)) => *f as i64,
336 Some((other, _)) => {
337 return Err(RedDBError::Internal(format!(
338 "INCR on non-integer value: {:?}",
339 other
340 )));
341 }
342 };
343
344 let next = current
345 .checked_add(by)
346 .ok_or_else(|| RedDBError::Internal(format!("INCR overflow: {current} + {by}")))?;
347
348 if existing.is_some() {
350 self.runtime.delete_kv(collection, key)?;
351 }
352
353 let meta_vec: Vec<(String, crate::storage::unified::MetadataValue)> = ttl_metadata(ttl_ms)
354 .map(|m| m.fields.into_iter().collect())
355 .unwrap_or_default();
356
357 let output = self
358 .runtime
359 .create_kv(crate::application::entity::CreateKvInput {
360 collection: collection.to_string(),
361 key: key.to_string(),
362 value: crate::storage::schema::Value::Integer(next),
363 metadata: meta_vec,
364 })?;
365 self.runtime
366 .inner
367 .kv_tag_index
368 .replace(collection, key, output.id, &[]);
369
370 self.runtime.record_kv_watch_event(
371 if existing.is_some() {
372 crate::replication::cdc::ChangeOperation::Update
373 } else {
374 crate::replication::cdc::ChangeOperation::Insert
375 },
376 collection,
377 key,
378 output.id.raw(),
379 existing
380 .as_ref()
381 .map(|(value, _)| crate::presentation::entity_json::storage_value_to_json(value)),
382 Some(crate::presentation::entity_json::storage_value_to_json(
383 &crate::storage::schema::Value::Integer(next),
384 )),
385 );
386
387 self.runtime.inner.kv_stats.incr_incrs();
388 Ok(next)
389 }
390
391 pub fn cas(
399 &self,
400 model: crate::catalog::CollectionModel,
401 collection: &str,
402 key: &str,
403 expected: Option<&crate::storage::schema::Value>,
404 new_value: crate::storage::schema::Value,
405 ttl_ms: Option<u64>,
406 ) -> RedDBResult<(bool, Option<crate::storage::schema::Value>)> {
407 if model == crate::catalog::CollectionModel::Vault {
408 return Err(RedDBError::InvalidOperation(
409 "VAULT CAS is not supported for sealed secrets".to_string(),
410 ));
411 }
412 self.ensure_kv_collection(collection)?;
413
414 let current = self.runtime.get_kv(collection, key)?.map(|(v, _)| v);
415
416 let matches = match (¤t, expected) {
417 (None, None) => true,
418 (Some(cur), Some(exp)) => cur == exp,
419 _ => false,
420 };
421
422 if !matches {
423 self.runtime.inner.kv_stats.incr_cas_conflict();
424 return Ok((false, current));
425 }
426
427 if current.is_some() {
429 self.runtime.delete_kv(collection, key)?;
430 }
431
432 let meta_vec: Vec<(String, crate::storage::unified::MetadataValue)> = ttl_metadata(ttl_ms)
433 .map(|m| m.fields.into_iter().collect())
434 .unwrap_or_default();
435
436 let output = self
437 .runtime
438 .create_kv(crate::application::entity::CreateKvInput {
439 collection: collection.to_string(),
440 key: key.to_string(),
441 value: new_value.clone(),
442 metadata: meta_vec,
443 })?;
444 self.runtime
445 .inner
446 .kv_tag_index
447 .replace(collection, key, output.id, &[]);
448
449 self.runtime.record_kv_watch_event(
450 if current.is_some() {
451 crate::replication::cdc::ChangeOperation::Update
452 } else {
453 crate::replication::cdc::ChangeOperation::Insert
454 },
455 collection,
456 key,
457 output.id.raw(),
458 current
459 .as_ref()
460 .map(crate::presentation::entity_json::storage_value_to_json),
461 Some(crate::presentation::entity_json::storage_value_to_json(
462 &new_value,
463 )),
464 );
465
466 self.runtime.inner.kv_stats.incr_cas_success();
467 Ok((true, current))
468 }
469
470 pub fn invalidate_tags(&self, collection: &str, tags: &[String]) -> RedDBResult<usize> {
471 self.runtime
472 .check_write(crate::runtime::write_gate::WriteKind::Dml)?;
473 self.runtime.check_kv_invalidate_policy(collection)?;
474 self.ensure_kv_collection(collection)?;
475 let entries = self
476 .runtime
477 .inner
478 .kv_tag_index
479 .entries_for_tags(collection, tags);
480 if entries.is_empty() {
481 return Ok(0);
482 }
483
484 let store = self.runtime.inner.db.store();
485 let mut removed = 0usize;
486 for (key, id) in entries {
487 let before = store
488 .get(collection, id)
489 .and_then(|entity| kv_value_from_entity(&entity));
490 let deleted = store
491 .delete(collection, id)
492 .map_err(|err| RedDBError::Internal(err.to_string()))?;
493 if deleted {
494 store.context_index().remove_entity(id);
495 self.runtime.inner.kv_tag_index.remove(collection, &key);
496 self.runtime.record_kv_watch_event(
497 crate::replication::cdc::ChangeOperation::Delete,
498 collection,
499 &key,
500 id.raw(),
501 before
502 .as_ref()
503 .map(crate::presentation::entity_json::storage_value_to_json),
504 None,
505 );
506 removed += 1;
507 }
508 }
509 if removed > 0 {
510 self.runtime.inner.kv_stats.incr_deletes();
511 }
512 Ok(removed)
513 }
514
515 pub fn tags_for_key(&self, collection: &str, key: &str) -> Vec<String> {
516 self.runtime
517 .inner
518 .kv_tag_index
519 .tags_for_key(collection, key)
520 }
521
522 fn ensure_kv_collection(&self, collection: &str) -> RedDBResult<()> {
524 self.ensure_keyed_collection(crate::catalog::CollectionModel::Kv, collection)
525 }
526
527 fn ensure_keyed_collection(
528 &self,
529 model: crate::catalog::CollectionModel,
530 collection: &str,
531 ) -> RedDBResult<()> {
532 let store = self.runtime.inner.db.store();
533 if store.get_collection(collection).is_some() {
534 return self.ensure_declared_model(model, collection);
535 }
536 if model != crate::catalog::CollectionModel::Kv {
537 return Err(RedDBError::NotFound(format!(
538 "{} collection '{collection}' does not exist",
539 keyed_model_name(model)
540 )));
541 }
542 let auto_create = self
544 .runtime
545 .config_bool("red.config.kv.default_collection", true);
546 if !auto_create {
547 return Err(RedDBError::NotFound(format!(
548 "kv collection '{collection}' does not exist and auto-create is disabled \
549 (red.config.kv.default_collection = false)"
550 )));
551 }
552 store
553 .create_collection(collection)
554 .map_err(|err| RedDBError::Internal(err.to_string()))?;
555 self.runtime
556 .inner
557 .db
558 .save_collection_contract(kv_collection_contract(collection))
559 .map_err(|err| RedDBError::Internal(err.to_string()))?;
560 Ok(())
561 }
562
563 fn get_entry(
564 &self,
565 model: crate::catalog::CollectionModel,
566 collection: &str,
567 key: &str,
568 ) -> RedDBResult<Option<(crate::storage::schema::Value, crate::storage::EntityId)>> {
569 self.ensure_declared_model(model, collection)?;
570 let store = self.runtime.inner.db.store();
571 let Some(manager) = store.get_collection(collection) else {
572 return Ok(None);
573 };
574 let entities = manager.query_all(|_| true);
575 for entity in entities {
576 if let crate::storage::EntityData::Row(ref row) = entity.data {
577 if let Some(ref named) = row.named {
578 if let Some(crate::storage::schema::Value::Text(ref k)) = named.get("key") {
579 if &**k == key {
580 let value = named
581 .get("value")
582 .cloned()
583 .unwrap_or(crate::storage::schema::Value::Null);
584 return Ok(Some((value, entity.id)));
585 }
586 }
587 }
588 }
589 }
590 Ok(None)
591 }
592
593 fn get_vault_entry(&self, collection: &str, key: &str) -> RedDBResult<Option<VaultEntry>> {
594 self.vault_versions(collection, key)
595 .map(super::keyed_spine::latest_version)
596 }
597
598 fn get_vault_entry_version(
599 &self,
600 collection: &str,
601 key: &str,
602 version: i64,
603 ) -> RedDBResult<Option<VaultEntry>> {
604 Ok(self
605 .vault_versions(collection, key)?
606 .into_iter()
607 .find(|entry| entry.version == version))
608 }
609
610 fn vault_versions(&self, collection: &str, key: &str) -> RedDBResult<Vec<VaultEntry>> {
611 self.ensure_declared_model(crate::catalog::CollectionModel::Vault, collection)?;
612 let store = self.runtime.inner.db.store();
613 let Some(manager) = store.get_collection(collection) else {
614 return Ok(Vec::new());
615 };
616 let entities = manager.query_all(|_| true);
617 let mut versions = Vec::new();
618 for entity in entities {
619 let crate::storage::EntityData::Row(ref row) = entity.data else {
620 continue;
621 };
622 let Some(version) =
623 super::keyed_spine::row_version(entity.id, row, entity.sequence_id as i64)
624 else {
625 continue;
626 };
627 if version.key != key {
628 continue;
629 }
630 let metadata = manager.get_metadata(entity.id).unwrap_or_default();
631 versions.push(VaultEntry::from_keyed_row(
632 version,
633 metadata,
634 entity.created_at,
635 entity.updated_at,
636 entity.sequence_id,
637 ));
638 }
639 Ok(versions)
640 }
641
642 fn latest_vault_entries(
643 &self,
644 collection: &str,
645 prefix: Option<&str>,
646 ) -> RedDBResult<Vec<VaultEntry>> {
647 self.ensure_declared_model(crate::catalog::CollectionModel::Vault, collection)?;
648 let store = self.runtime.inner.db.store();
649 let Some(manager) = store.get_collection(collection) else {
650 return Ok(Vec::new());
651 };
652 let mut versions = Vec::new();
653 for entity in manager.query_all(|_| true) {
654 let crate::storage::EntityData::Row(ref row) = entity.data else {
655 continue;
656 };
657 let Some(version) =
658 super::keyed_spine::row_version(entity.id, row, entity.sequence_id as i64)
659 else {
660 continue;
661 };
662 let metadata = manager.get_metadata(entity.id).unwrap_or_default();
663 let entry = VaultEntry::from_keyed_row(
664 version,
665 metadata,
666 entity.created_at,
667 entity.updated_at,
668 entity.sequence_id,
669 );
670 versions.push(entry);
671 }
672 Ok(super::keyed_spine::latest_versions(versions, prefix))
673 }
674
675 fn append_vault_version(
676 &self,
677 collection: &str,
678 key: &str,
679 value: crate::storage::schema::Value,
680 op: &str,
681 tombstone: bool,
682 tags: &[String],
683 ) -> RedDBResult<VaultEntry> {
684 self.ensure_declared_model(crate::catalog::CollectionModel::Vault, collection)?;
685 let version = self
686 .get_vault_entry(collection, key)?
687 .map(|entry| entry.version)
688 .unwrap_or(0)
689 + 1;
690 let stored_value = if tombstone {
691 crate::storage::schema::Value::Null
692 } else {
693 self.runtime.seal_vault_value(collection, value)?
694 };
695 let now = current_unix_ms() as i64;
696 let fields = vec![
697 (
698 "key".to_string(),
699 crate::storage::schema::Value::text(key.to_string()),
700 ),
701 ("value".to_string(), stored_value),
702 (
703 "version".to_string(),
704 crate::storage::schema::Value::Integer(version),
705 ),
706 (
707 "tombstone".to_string(),
708 crate::storage::schema::Value::Boolean(tombstone),
709 ),
710 (
711 "op".to_string(),
712 crate::storage::schema::Value::text(op.to_string()),
713 ),
714 (
715 "created_at_ms".to_string(),
716 crate::storage::schema::Value::Integer(now),
717 ),
718 ];
719 let mut row = crate::storage::RowData::new(Vec::new());
720 row.named = Some(fields.into_iter().collect());
721 let entity = crate::storage::UnifiedEntity::new(
722 crate::storage::EntityId::new(0),
723 crate::storage::EntityKind::TableRow {
724 table: std::sync::Arc::from(collection),
725 row_id: 0,
726 },
727 crate::storage::EntityData::Row(row),
728 );
729 let id = self
730 .runtime
731 .inner
732 .db
733 .store()
734 .insert(collection, entity)
735 .map_err(|err| RedDBError::Internal(err.to_string()))?;
736 if !tags.is_empty() {
737 self.runtime
738 .inner
739 .db
740 .store()
741 .set_metadata(
742 collection,
743 id,
744 Metadata::with_fields(vault_tags_metadata(tags)),
745 )
746 .map_err(|err| RedDBError::Internal(err.to_string()))?;
747 self.runtime
748 .inner
749 .kv_tag_index
750 .replace(collection, key, id, tags);
751 }
752 self.get_vault_entry_version(collection, key, version)?
753 .ok_or_else(|| RedDBError::Internal(format!("vault version {id} was not readable")))
754 }
755
756 fn purge_vault_versions(&self, collection: &str, key: &str) -> RedDBResult<usize> {
757 self.ensure_declared_model(crate::catalog::CollectionModel::Vault, collection)?;
758 let versions = self.vault_versions(collection, key)?;
759 let store = self.runtime.inner.db.store();
760 let mut purged = 0usize;
761 for entry in versions {
762 if store
763 .delete(collection, entry.id)
764 .map_err(|err| RedDBError::Internal(err.to_string()))?
765 {
766 store.context_index().remove_entity(entry.id);
767 purged += 1;
768 }
769 }
770 Ok(purged)
771 }
772
773 fn ensure_declared_model(
774 &self,
775 model: crate::catalog::CollectionModel,
776 collection: &str,
777 ) -> RedDBResult<()> {
778 let Some(contract) = self.runtime.inner.db.collection_contract(collection) else {
779 return Ok(());
780 };
781 if contract.declared_model == model
782 || contract.declared_model == crate::catalog::CollectionModel::Mixed
783 {
784 return Ok(());
785 }
786 Err(RedDBError::InvalidOperation(format!(
787 "collection '{}' is declared as '{}' and does not allow '{}' operations",
788 collection,
789 keyed_model_name(contract.declared_model),
790 keyed_model_name(model)
791 )))
792 }
793}
794
795impl RedDBRuntime {
796 pub(crate) fn seal_vault_value(
797 &self,
798 collection: &str,
799 value: crate::storage::schema::Value,
800 ) -> RedDBResult<crate::storage::schema::Value> {
801 let key = self.vault_encryption_key(collection)?;
802 let plaintext = value.to_bytes();
803 let nonce_bytes = crate::auth::store::random_bytes(12);
804 let mut nonce = [0u8; 12];
805 nonce.copy_from_slice(&nonce_bytes[..12]);
806 let aad = format!("reddb.vault.{collection}");
807 let ciphertext =
808 crate::crypto::aes_gcm::aes256_gcm_encrypt(&key, &nonce, aad.as_bytes(), &plaintext);
809 let mut payload = Vec::with_capacity(12 + ciphertext.len());
810 payload.extend_from_slice(&nonce);
811 payload.extend_from_slice(&ciphertext);
812 Ok(crate::storage::schema::Value::Secret(payload))
813 }
814
815 fn vault_key_available(&self, collection: &str) -> bool {
816 self.vault_encryption_key(collection).is_ok()
817 }
818
819 fn vault_encryption_key(&self, collection: &str) -> RedDBResult<[u8; 32]> {
820 let auth_store = self.inner.auth_store.read().clone().ok_or_else(|| {
821 RedDBError::Query("vault sealed_unavailable: no key provider is configured".to_string())
822 })?;
823 if !auth_store.is_vault_backed() {
824 return Err(RedDBError::Query(
825 "vault sealed_unavailable: key provider is sealed".to_string(),
826 ));
827 }
828
829 if let Some(hex_key) = auth_store.vault_kv_get(&vault_master_key_ref(collection)) {
830 return decode_vault_key(&hex_key);
831 }
832 auth_store.vault_secret_key().ok_or_else(|| {
833 RedDBError::Query("vault sealed_unavailable: cluster vault key is missing".to_string())
834 })
835 }
836
837 fn unseal_vault_value(
838 &self,
839 collection: &str,
840 sealed: &crate::storage::schema::Value,
841 ) -> RedDBResult<crate::storage::schema::Value> {
842 let crate::storage::schema::Value::Secret(payload) = sealed else {
843 return Err(RedDBError::Query(
844 "vault unseal failed: stored value is not sealed".to_string(),
845 ));
846 };
847 if payload.len() < 12 {
848 return Err(RedDBError::Query(
849 "vault unseal failed: sealed payload is truncated".to_string(),
850 ));
851 }
852 let key = self.vault_encryption_key(collection)?;
853 let mut nonce = [0u8; 12];
854 nonce.copy_from_slice(&payload[..12]);
855 let aad = format!("reddb.vault.{collection}");
856 let plaintext = crate::crypto::aes_gcm::aes256_gcm_decrypt(
857 &key,
858 &nonce,
859 aad.as_bytes(),
860 &payload[12..],
861 )
862 .map_err(|_| RedDBError::Query("vault unseal failed: decryption failed".to_string()))?;
863 let (value, consumed) =
864 crate::storage::schema::Value::from_bytes(&plaintext).map_err(|err| {
865 RedDBError::Query(format!("vault unseal failed: bad plaintext value: {err}"))
866 })?;
867 if consumed != plaintext.len() {
868 return Err(RedDBError::Query(
869 "vault unseal failed: trailing plaintext bytes".to_string(),
870 ));
871 }
872 Ok(value)
873 }
874
/// Builds the resource name used for policy checks and audit records.
///
/// Keys of the system collection `red.vault` are rendered as
/// `red.vault/<key lowercased (ASCII)>`; every other collection yields
/// `<collection>.<key>` with the key unchanged.
fn vault_target_resource(collection: &str, key: &str) -> String {
    match collection {
        "red.vault" => format!("red.vault/{}", key.to_ascii_lowercase()),
        _ => format!("{collection}.{key}"),
    }
}
881
882 fn current_vault_actor() -> String {
883 current_auth_identity()
884 .map(|(principal, _)| principal)
885 .unwrap_or_else(|| "anonymous".to_string())
886 }
887
888 fn vault_request_id() -> String {
889 let conn_id = current_connection_id();
890 if conn_id == 0 {
891 "embedded".to_string()
892 } else {
893 format!("conn-{conn_id}")
894 }
895 }
896
897 fn check_vault_capability(
898 &self,
899 action: &str,
900 collection: &str,
901 key: &str,
902 ) -> Result<(), String> {
903 let Some(auth_store) = self.inner.auth_store.read().clone() else {
904 return Ok(());
905 };
906 if !auth_store.iam_authorization_enabled() {
907 return Ok(());
908 }
909 let Some((principal, role)) = current_auth_identity() else {
910 return Err(
911 "IAM authorization is enabled; vault capability check requires an authenticated principal"
912 .to_string(),
913 );
914 };
915 let tenant = current_tenant();
916 let principal_id = crate::auth::UserId::from_parts(tenant.as_deref(), &principal);
917 let mut resource = crate::auth::policies::ResourceRef::new(
918 "vault",
919 Self::vault_target_resource(collection, key),
920 );
921 if let Some(ref tenant) = tenant {
922 resource = resource.with_tenant(tenant.clone());
923 }
924 let ctx = crate::auth::policies::EvalContext {
925 principal_tenant: tenant.clone(),
926 current_tenant: tenant,
927 peer_ip: None,
928 mfa_present: false,
929 now_ms: crate::utils::now_unix_millis() as u128,
930 principal_is_admin_role: role == crate::auth::Role::Admin,
931 };
932 if auth_store.check_policy_authz(&principal_id, action, &resource, &ctx) {
933 Ok(())
934 } else {
935 Err(format!(
936 "principal=`{}` action=`{}` resource=`vault:{}` denied by IAM policy",
937 principal,
938 action,
939 Self::vault_target_resource(collection, key)
940 ))
941 }
942 }
943
944 fn check_system_vault_capability(
945 &self,
946 action: &str,
947 collection: &str,
948 key: &str,
949 ) -> Result<(), String> {
950 if collection != "red.vault" {
951 return Ok(());
952 }
953 self.check_vault_capability(action, collection, key)
954 }
955
956 fn audit_vault_unseal(
957 &self,
958 collection: &str,
959 key: &str,
960 outcome: crate::runtime::audit_log::Outcome,
961 reason: &str,
962 entry: Option<&VaultEntry>,
963 ) {
964 let actor = Self::current_vault_actor();
965 let request_id = Self::vault_request_id();
966 let mut builder = crate::runtime::audit_log::AuditEvent::builder("vault/unseal")
967 .principal(actor.clone())
968 .source(crate::runtime::audit_log::AuditAuthSource::Password)
969 .resource(format!(
970 "vault:{}",
971 Self::vault_target_resource(collection, key)
972 ))
973 .outcome(outcome)
974 .correlation_id(request_id.clone())
975 .fields([
976 crate::runtime::audit_log::AuditFieldEscaper::field("actor", actor),
977 crate::runtime::audit_log::AuditFieldEscaper::field("collection", collection),
978 crate::runtime::audit_log::AuditFieldEscaper::field("key", key),
979 crate::runtime::audit_log::AuditFieldEscaper::field(
980 "target",
981 Self::vault_target_resource(collection, key),
982 ),
983 crate::runtime::audit_log::AuditFieldEscaper::field("reason", reason),
984 crate::runtime::audit_log::AuditFieldEscaper::field("request_id", request_id),
985 crate::runtime::audit_log::AuditFieldEscaper::field(
986 "connection_id",
987 current_connection_id(),
988 ),
989 ]);
990 if let Some(tenant) = current_tenant() {
991 builder = builder.tenant(tenant);
992 }
993 if let Some(entry) = entry {
994 builder = builder.fields([
995 crate::runtime::audit_log::AuditFieldEscaper::field("entity_id", entry.id.raw()),
996 crate::runtime::audit_log::AuditFieldEscaper::field(
997 "sequence_id",
998 entry.sequence_id,
999 ),
1000 ]);
1001 }
1002 self.audit_log().record_event(builder.build());
1003 }
1004
1005 fn audit_vault_lifecycle(
1006 &self,
1007 operation: &str,
1008 collection: &str,
1009 key: &str,
1010 outcome: crate::runtime::audit_log::Outcome,
1011 reason: &str,
1012 entry: Option<&VaultEntry>,
1013 ) {
1014 let actor = Self::current_vault_actor();
1015 let request_id = Self::vault_request_id();
1016 let mut builder =
1017 crate::runtime::audit_log::AuditEvent::builder(format!("vault/{operation}"))
1018 .principal(actor.clone())
1019 .source(crate::runtime::audit_log::AuditAuthSource::Password)
1020 .resource(format!(
1021 "vault:{}",
1022 Self::vault_target_resource(collection, key)
1023 ))
1024 .outcome(outcome)
1025 .correlation_id(request_id.clone())
1026 .fields([
1027 crate::runtime::audit_log::AuditFieldEscaper::field("actor", actor),
1028 crate::runtime::audit_log::AuditFieldEscaper::field("collection", collection),
1029 crate::runtime::audit_log::AuditFieldEscaper::field("key", key),
1030 crate::runtime::audit_log::AuditFieldEscaper::field(
1031 "target",
1032 Self::vault_target_resource(collection, key),
1033 ),
1034 crate::runtime::audit_log::AuditFieldEscaper::field("reason", reason),
1035 crate::runtime::audit_log::AuditFieldEscaper::field("request_id", request_id),
1036 crate::runtime::audit_log::AuditFieldEscaper::field(
1037 "connection_id",
1038 current_connection_id(),
1039 ),
1040 ]);
1041 if let Some(tenant) = current_tenant() {
1042 builder = builder.tenant(tenant);
1043 }
1044 if let Some(entry) = entry {
1045 builder = builder.fields([
1046 crate::runtime::audit_log::AuditFieldEscaper::field("entity_id", entry.id.raw()),
1047 crate::runtime::audit_log::AuditFieldEscaper::field("version", entry.version),
1048 crate::runtime::audit_log::AuditFieldEscaper::field(
1049 "sequence_id",
1050 entry.sequence_id,
1051 ),
1052 ]);
1053 }
1054 self.audit_log().record_event(builder.build());
1055 }
1056
1057 pub(crate) fn resolve_vault_secret_value(
1058 &self,
1059 collection: &str,
1060 key: &str,
1061 ) -> RedDBResult<Value> {
1062 let ops = KvAtomicOps::new(self);
1063 let entry = ops.get_vault_entry(collection, key)?;
1064 if let Err(reason) = self.check_vault_capability("vault:unseal", collection, key) {
1065 self.audit_vault_unseal(
1066 collection,
1067 key,
1068 crate::runtime::audit_log::Outcome::Denied,
1069 &reason,
1070 entry.as_ref(),
1071 );
1072 return Err(RedDBError::Query(reason));
1073 }
1074 let Some(entry) = entry else {
1075 let reason = "not_found";
1076 self.audit_vault_unseal(
1077 collection,
1078 key,
1079 crate::runtime::audit_log::Outcome::Denied,
1080 reason,
1081 None,
1082 );
1083 return Err(RedDBError::NotFound(format!(
1084 "vault secret '{}.{}' not found",
1085 collection, key
1086 )));
1087 };
1088 if entry.tombstone {
1089 let reason = "deleted";
1090 self.audit_vault_unseal(
1091 collection,
1092 key,
1093 crate::runtime::audit_log::Outcome::Denied,
1094 reason,
1095 Some(&entry),
1096 );
1097 return Err(RedDBError::NotFound(format!(
1098 "vault secret '{}.{}' is deleted",
1099 collection, key
1100 )));
1101 }
1102 match self.unseal_vault_value(collection, &entry.value) {
1103 Ok(value) => {
1104 self.audit_vault_unseal(
1105 collection,
1106 key,
1107 crate::runtime::audit_log::Outcome::Success,
1108 "ok",
1109 Some(&entry),
1110 );
1111 Ok(value)
1112 }
1113 Err(err) => {
1114 let reason = err.to_string();
1115 self.audit_vault_unseal(
1116 collection,
1117 key,
1118 crate::runtime::audit_log::Outcome::Error,
1119 &reason,
1120 Some(&entry),
1121 );
1122 Err(err)
1123 }
1124 }
1125 }
1126
1127 pub fn execute_kv_command(
1129 &self,
1130 raw_query: &str,
1131 cmd: &crate::storage::query::ast::KvCommand,
1132 ) -> RedDBResult<RuntimeQueryResult> {
1133 use crate::storage::query::ast::KvCommand;
1134
1135 let ops = KvAtomicOps::new(self);
1136
1137 match cmd {
1138 KvCommand::Put {
1139 model,
1140 collection,
1141 key,
1142 value,
1143 ttl_ms,
1144 tags,
1145 if_not_exists,
1146 } => {
1147 if *model == crate::catalog::CollectionModel::Vault {
1148 self.check_system_vault_capability("vault:write", collection, key)
1149 .map_err(RedDBError::Query)?;
1150 }
1151 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
1152 let (created, id) = ops.set_with_tags_for_model(
1153 *model,
1154 collection,
1155 key,
1156 value.clone(),
1157 *ttl_ms,
1158 tags,
1159 *if_not_exists,
1160 )?;
1161
1162 let mut result = UnifiedResult::with_columns(vec![
1163 "ok".into(),
1164 "collection".into(),
1165 "key".into(),
1166 "id".into(),
1167 "created".into(),
1168 "tags".into(),
1169 ]);
1170 let mut record = UnifiedRecord::new();
1171 record.set("ok", Value::Boolean(true));
1172 record.set("collection", Value::text(collection.clone()));
1173 record.set("key", Value::text(key.clone()));
1174 record.set("id", Value::Integer(id.raw() as i64));
1175 record.set("created", Value::Boolean(created));
1176 record.set("tags", kv_tags_value(tags));
1177 result.push(record);
1178
1179 Ok(RuntimeQueryResult {
1180 query: raw_query.to_string(),
1181 mode: crate::storage::query::modes::QueryMode::Sql,
1182 statement: if *model == crate::catalog::CollectionModel::Vault {
1183 "vault_put"
1184 } else {
1185 "kv_put"
1186 },
1187 engine: if *model == crate::catalog::CollectionModel::Vault {
1188 "vault"
1189 } else {
1190 "kv"
1191 },
1192 result,
1193 affected_rows: 1,
1194 statement_type: if created { "insert" } else { "update" },
1195 })
1196 }
1197 KvCommand::InvalidateTags { collection, tags } => {
1198 let invalidated = ops.invalidate_tags(collection, tags)?;
1199
1200 let mut result = UnifiedResult::with_columns(vec![
1201 "ok".into(),
1202 "collection".into(),
1203 "invalidated".into(),
1204 "tags".into(),
1205 ]);
1206 let mut record = UnifiedRecord::new();
1207 record.set("ok", Value::Boolean(true));
1208 record.set("collection", Value::text(collection.clone()));
1209 record.set("invalidated", Value::Integer(invalidated as i64));
1210 record.set("tags", kv_tags_value(tags));
1211 result.push(record);
1212
1213 Ok(RuntimeQueryResult {
1214 query: raw_query.to_string(),
1215 mode: crate::storage::query::modes::QueryMode::Sql,
1216 statement: "kv_invalidate_tags",
1217 engine: "kv",
1218 result,
1219 affected_rows: invalidated as u64,
1220 statement_type: "delete",
1221 })
1222 }
1223
1224 KvCommand::Rotate {
1225 collection,
1226 key,
1227 value,
1228 tags,
1229 } => {
1230 self.check_system_vault_capability("vault:write", collection, key)
1231 .map_err(RedDBError::Query)?;
1232 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
1233 let entry = ops.append_vault_version(
1234 collection,
1235 key,
1236 value.clone(),
1237 "rotate",
1238 false,
1239 tags,
1240 )?;
1241 self.record_kv_watch_event(
1242 crate::replication::cdc::ChangeOperation::Update,
1243 collection,
1244 key,
1245 entry.id.raw(),
1246 None,
1247 Some(vault_entry_metadata_json(&entry)),
1248 );
1249 self.audit_vault_lifecycle(
1250 "rotate",
1251 collection,
1252 key,
1253 crate::runtime::audit_log::Outcome::Success,
1254 "ok",
1255 Some(&entry),
1256 );
1257 Ok(vault_write_result(
1258 raw_query,
1259 "vault_rotate",
1260 "update",
1261 collection,
1262 key,
1263 &entry,
1264 1,
1265 ))
1266 }
1267
1268 KvCommand::List {
1269 model,
1270 collection,
1271 prefix,
1272 limit,
1273 offset,
1274 } => {
1275 if *model != crate::catalog::CollectionModel::Vault {
1276 return Err(RedDBError::InvalidOperation(
1277 "LIST is not supported through normal KV command execution".to_string(),
1278 ));
1279 }
1280 let mut entries = ops.latest_vault_entries(collection, prefix.as_deref())?;
1281 entries.sort_by(|left, right| left.key.cmp(&right.key));
1282 let mut result = UnifiedResult::with_columns(vec![
1283 "collection".into(),
1284 "key".into(),
1285 "version".into(),
1286 "fingerprint".into(),
1287 "tags".into(),
1288 "created_at".into(),
1289 "updated_at".into(),
1290 "status".into(),
1291 "tombstone".into(),
1292 "op".into(),
1293 ]);
1294 for entry in entries
1295 .into_iter()
1296 .filter(|entry| {
1297 self.check_vault_capability("vault:read_metadata", collection, &entry.key)
1298 .is_ok()
1299 })
1300 .skip(*offset)
1301 .take(limit.unwrap_or(usize::MAX))
1302 {
1303 push_vault_metadata_record(&mut result, collection, &entry.key, &entry);
1304 }
1305 Ok(RuntimeQueryResult {
1306 query: raw_query.to_string(),
1307 mode: crate::storage::query::modes::QueryMode::Sql,
1308 statement: "vault_list",
1309 engine: "vault",
1310 result,
1311 affected_rows: 0,
1312 statement_type: "select",
1313 })
1314 }
1315
1316 KvCommand::History { collection, key } => {
1317 self.check_vault_capability("vault:read_metadata", collection, key)
1318 .map_err(RedDBError::Query)?;
1319 let versions =
1320 super::keyed_spine::history_versions(ops.vault_versions(collection, key)?);
1321 let result = vault_history_result(collection, key, &versions);
1322 Ok(RuntimeQueryResult {
1323 query: raw_query.to_string(),
1324 mode: crate::storage::query::modes::QueryMode::Sql,
1325 statement: "vault_history",
1326 engine: "vault",
1327 result,
1328 affected_rows: 0,
1329 statement_type: "select",
1330 })
1331 }
1332
1333 KvCommand::Purge { collection, key } => {
1334 let entry = ops.get_vault_entry(collection, key)?;
1335 if let Err(reason) = self.check_vault_capability("vault:purge", collection, key) {
1336 self.audit_vault_lifecycle(
1337 "purge",
1338 collection,
1339 key,
1340 crate::runtime::audit_log::Outcome::Denied,
1341 &reason,
1342 entry.as_ref(),
1343 );
1344 return Err(RedDBError::Query(reason));
1345 }
1346 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
1347 let purged = ops.purge_vault_versions(collection, key)?;
1348 self.audit_vault_lifecycle(
1349 "purge",
1350 collection,
1351 key,
1352 crate::runtime::audit_log::Outcome::Success,
1353 "ok",
1354 entry.as_ref(),
1355 );
1356 let mut result = UnifiedResult::with_columns(vec![
1357 "ok".into(),
1358 "collection".into(),
1359 "key".into(),
1360 "purged".into(),
1361 ]);
1362 let mut record = UnifiedRecord::new();
1363 record.set("ok", Value::Boolean(true));
1364 record.set("collection", Value::text(collection.clone()));
1365 record.set("key", Value::text(key.clone()));
1366 record.set("purged", Value::Integer(purged as i64));
1367 result.push(record);
1368 Ok(RuntimeQueryResult {
1369 query: raw_query.to_string(),
1370 mode: crate::storage::query::modes::QueryMode::Sql,
1371 statement: "vault_purge",
1372 engine: "vault",
1373 result,
1374 affected_rows: purged as u64,
1375 statement_type: "delete",
1376 })
1377 }
1378
1379 KvCommand::Get {
1380 model,
1381 collection,
1382 key,
1383 } => {
1384 if *model == crate::catalog::CollectionModel::Vault {
1385 self.check_vault_capability("vault:read_metadata", collection, key)
1386 .map_err(RedDBError::Query)?;
1387 let entry = ops.get_vault_entry(collection, key)?;
1388 let key_available = self.vault_key_available(collection);
1389 let result =
1390 vault_metadata_result(collection, key, entry.as_ref(), key_available);
1391 return Ok(RuntimeQueryResult {
1392 query: raw_query.to_string(),
1393 mode: crate::storage::query::modes::QueryMode::Sql,
1394 statement: "vault_get",
1395 engine: "vault",
1396 result,
1397 affected_rows: 0,
1398 statement_type: "select",
1399 });
1400 }
1401
1402 let value = ops.get(*model, collection, key)?;
1403 let mut result = UnifiedResult::with_columns(vec![
1404 "collection".into(),
1405 "key".into(),
1406 "value".into(),
1407 "tags".into(),
1408 ]);
1409 let mut record = UnifiedRecord::new();
1410 record.set("collection", Value::text(collection.clone()));
1411 record.set("key", Value::text(key.clone()));
1412 record.set(
1413 "value",
1414 value.unwrap_or(crate::storage::schema::Value::Null),
1415 );
1416 record.set("tags", kv_tags_value(&ops.tags_for_key(collection, key)));
1417 result.push(record);
1418
1419 Ok(RuntimeQueryResult {
1420 query: raw_query.to_string(),
1421 mode: crate::storage::query::modes::QueryMode::Sql,
1422 statement: "kv_get",
1423 engine: "kv",
1424 result,
1425 affected_rows: 0,
1426 statement_type: "select",
1427 })
1428 }
1429 KvCommand::Watch {
1430 model,
1431 collection,
1432 key,
1433 prefix,
1434 from_lsn,
1435 } => {
1436 let watch_key = if *prefix {
1437 format!("{key}.*")
1438 } else {
1439 key.clone()
1440 };
1441 let endpoint = match from_lsn {
1442 Some(lsn) => format!(
1443 "/collections/{collection}/{}/{watch_key}/watch?since_lsn={lsn}",
1444 keyed_model_name(*model)
1445 ),
1446 None => format!(
1447 "/collections/{collection}/{}/{watch_key}/watch",
1448 keyed_model_name(*model)
1449 ),
1450 };
1451 let mut result = UnifiedResult::with_columns(vec![
1452 "collection".into(),
1453 "key".into(),
1454 "prefix".into(),
1455 "from_lsn".into(),
1456 "watch_url".into(),
1457 "streaming".into(),
1458 ]);
1459 let mut record = UnifiedRecord::new();
1460 record.set("collection", Value::text(collection.clone()));
1461 record.set("key", Value::text(watch_key));
1462 record.set("prefix", Value::Boolean(*prefix));
1463 record.set(
1464 "from_lsn",
1465 from_lsn
1466 .map(Value::UnsignedInteger)
1467 .unwrap_or(crate::storage::schema::Value::Null),
1468 );
1469 record.set("watch_url", Value::text(endpoint));
1470 record.set("streaming", Value::Boolean(true));
1471 result.push(record);
1472
1473 Ok(RuntimeQueryResult {
1474 query: raw_query.to_string(),
1475 mode: crate::storage::query::modes::QueryMode::Sql,
1476 statement: "kv_watch",
1477 engine: keyed_model_name(*model),
1478 result,
1479 affected_rows: 0,
1480 statement_type: "stream",
1481 })
1482 }
1483
1484 KvCommand::Unseal {
1485 collection,
1486 key,
1487 version,
1488 } => {
1489 let latest = ops.get_vault_entry(collection, key)?;
1490 let entry = match version {
1491 Some(version) => ops.get_vault_entry_version(collection, key, *version)?,
1492 None => latest.clone(),
1493 };
1494 let action = match (version, latest.as_ref()) {
1495 (Some(requested), Some(latest)) if *requested == latest.version => {
1496 "vault:unseal"
1497 }
1498 (Some(_), _) => "vault:unseal_history",
1499 _ => "vault:unseal",
1500 };
1501 if let Err(reason) = self.check_vault_capability(action, collection, key) {
1502 self.audit_vault_unseal(
1503 collection,
1504 key,
1505 crate::runtime::audit_log::Outcome::Denied,
1506 &reason,
1507 entry.as_ref(),
1508 );
1509 return Err(RedDBError::Query(reason));
1510 }
1511 let Some(entry) = entry else {
1512 let reason = "not_found";
1513 self.audit_vault_unseal(
1514 collection,
1515 key,
1516 crate::runtime::audit_log::Outcome::Denied,
1517 reason,
1518 None,
1519 );
1520 return Err(RedDBError::NotFound(format!(
1521 "vault secret '{}.{}' not found",
1522 collection, key
1523 )));
1524 };
1525 if entry.tombstone {
1526 let reason = "deleted";
1527 self.audit_vault_unseal(
1528 collection,
1529 key,
1530 crate::runtime::audit_log::Outcome::Denied,
1531 reason,
1532 Some(&entry),
1533 );
1534 return Err(RedDBError::NotFound(format!(
1535 "vault secret '{}.{}' is deleted",
1536 collection, key
1537 )));
1538 }
1539 match self.unseal_vault_value(collection, &entry.value) {
1540 Ok(value) => {
1541 self.audit_vault_unseal(
1542 collection,
1543 key,
1544 crate::runtime::audit_log::Outcome::Success,
1545 "ok",
1546 Some(&entry),
1547 );
1548 let mut result = UnifiedResult::with_columns(vec![
1549 "collection".into(),
1550 "key".into(),
1551 "value".into(),
1552 ]);
1553 let mut record = UnifiedRecord::new();
1554 record.set("collection", Value::text(collection.clone()));
1555 record.set("key", Value::text(key.clone()));
1556 record.set("value", value);
1557 result.push(record);
1558 Ok(RuntimeQueryResult {
1559 query: raw_query.to_string(),
1560 mode: crate::storage::query::modes::QueryMode::Sql,
1561 statement: "vault_unseal",
1562 engine: "vault",
1563 result,
1564 affected_rows: 0,
1565 statement_type: "select",
1566 })
1567 }
1568 Err(err) => {
1569 let reason = err.to_string();
1570 self.audit_vault_unseal(
1571 collection,
1572 key,
1573 crate::runtime::audit_log::Outcome::Error,
1574 &reason,
1575 Some(&entry),
1576 );
1577 Err(err)
1578 }
1579 }
1580 }
1581
1582 KvCommand::Incr {
1583 model,
1584 collection,
1585 key,
1586 by,
1587 ttl_ms,
1588 } => {
1589 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
1590 let new_value = ops.incr(*model, collection, key, *by, *ttl_ms)?;
1591
1592 let mut result = UnifiedResult::with_columns(vec![
1593 "ok".into(),
1594 "collection".into(),
1595 "key".into(),
1596 "value".into(),
1597 ]);
1598 let mut record = UnifiedRecord::new();
1599 record.set("ok", Value::Boolean(true));
1600 record.set("collection", Value::text(collection.clone()));
1601 record.set("key", Value::text(key.clone()));
1602 record.set("value", Value::Integer(new_value));
1603 result.push(record);
1604
1605 Ok(RuntimeQueryResult {
1606 query: raw_query.to_string(),
1607 mode: crate::storage::query::modes::QueryMode::Sql,
1608 statement: "kv_incr",
1609 engine: "kv",
1610 result,
1611 affected_rows: 1,
1612 statement_type: "update",
1613 })
1614 }
1615
1616 KvCommand::Cas {
1617 model,
1618 collection,
1619 key,
1620 expected,
1621 new_value,
1622 ttl_ms,
1623 } => {
1624 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
1625 let (ok, current) = ops.cas(
1626 *model,
1627 collection,
1628 key,
1629 expected.as_ref(),
1630 new_value.clone(),
1631 *ttl_ms,
1632 )?;
1633
1634 let mut result = UnifiedResult::with_columns(vec![
1635 "ok".into(),
1636 "collection".into(),
1637 "key".into(),
1638 "current".into(),
1639 ]);
1640 let mut record = UnifiedRecord::new();
1641 record.set("ok", Value::Boolean(ok));
1642 record.set("collection", Value::text(collection.clone()));
1643 record.set("key", Value::text(key.clone()));
1644 record.set(
1645 "current",
1646 current.unwrap_or(crate::storage::schema::Value::Null),
1647 );
1648 result.push(record);
1649
1650 Ok(RuntimeQueryResult {
1651 query: raw_query.to_string(),
1652 mode: crate::storage::query::modes::QueryMode::Sql,
1653 statement: "kv_cas",
1654 engine: "kv",
1655 result,
1656 affected_rows: if ok { 1 } else { 0 },
1657 statement_type: "update",
1658 })
1659 }
1660
1661 KvCommand::Delete {
1662 model,
1663 collection,
1664 key,
1665 } => {
1666 if *model == crate::catalog::CollectionModel::Vault {
1667 self.check_system_vault_capability("vault:write", collection, key)
1668 .map_err(RedDBError::Query)?;
1669 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
1670 let entry = ops.append_vault_version(
1671 collection,
1672 key,
1673 Value::Null,
1674 "delete",
1675 true,
1676 &[],
1677 )?;
1678 self.record_kv_watch_event(
1679 crate::replication::cdc::ChangeOperation::Delete,
1680 collection,
1681 key,
1682 entry.id.raw(),
1683 None,
1684 Some(vault_entry_metadata_json(&entry)),
1685 );
1686 self.audit_vault_lifecycle(
1687 "delete",
1688 collection,
1689 key,
1690 crate::runtime::audit_log::Outcome::Success,
1691 "ok",
1692 Some(&entry),
1693 );
1694 return Ok(vault_write_result(
1695 raw_query,
1696 "vault_delete",
1697 "delete",
1698 collection,
1699 key,
1700 &entry,
1701 1,
1702 ));
1703 }
1704 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
1705 let deleted = ops.delete(*model, collection, key)?;
1706
1707 let mut result = UnifiedResult::with_columns(vec![
1708 "ok".into(),
1709 "collection".into(),
1710 "key".into(),
1711 "deleted".into(),
1712 ]);
1713 let mut record = UnifiedRecord::new();
1714 record.set("ok", Value::Boolean(true));
1715 record.set("collection", Value::text(collection.clone()));
1716 record.set("key", Value::text(key.clone()));
1717 record.set("deleted", Value::Boolean(deleted));
1718 result.push(record);
1719
1720 Ok(RuntimeQueryResult {
1721 query: raw_query.to_string(),
1722 mode: crate::storage::query::modes::QueryMode::Sql,
1723 statement: "kv_delete",
1724 engine: "kv",
1725 result,
1726 affected_rows: if deleted { 1 } else { 0 },
1727 statement_type: "delete",
1728 })
1729 }
1730 }
1731 }
1732
1733 pub fn vault_watch_events_since(
1734 &self,
1735 collection: &str,
1736 key: &str,
1737 since_lsn: u64,
1738 max_count: usize,
1739 ) -> Vec<crate::replication::cdc::KvWatchEvent> {
1740 self.kv_watch_events_since(collection, key, since_lsn, max_count)
1741 .into_iter()
1742 .filter(|event| {
1743 self.check_vault_capability("vault:read_metadata", &event.collection, &event.key)
1744 .is_ok()
1745 })
1746 .map(vault_filter_watch_event)
1747 .collect()
1748 }
1749
1750 pub fn vault_watch_events_since_prefix(
1751 &self,
1752 collection: &str,
1753 prefix: &str,
1754 since_lsn: u64,
1755 max_count: usize,
1756 ) -> Vec<crate::replication::cdc::KvWatchEvent> {
1757 self.kv_watch_events_since_prefix(collection, prefix, since_lsn, max_count)
1758 .into_iter()
1759 .filter(|event| {
1760 self.check_vault_capability("vault:read_metadata", &event.collection, &event.key)
1761 .is_ok()
1762 })
1763 .map(vault_filter_watch_event)
1764 .collect()
1765 }
1766
1767 fn check_kv_invalidate_policy(&self, collection: &str) -> RedDBResult<()> {
1768 let auth_store = match self.inner.auth_store.read().clone() {
1769 Some(store) => store,
1770 None => return Ok(()),
1771 };
1772 let (username, role) = match crate::runtime::impl_core::current_auth_identity() {
1773 Some(identity) => identity,
1774 None => return Ok(()),
1775 };
1776 if role < crate::auth::Role::Write {
1777 return Err(RedDBError::Query(format!(
1778 "principal=`{username}` role=`{role:?}` cannot invalidate KV tags"
1779 )));
1780 }
1781 if !auth_store.iam_authorization_enabled() {
1782 return Ok(());
1783 }
1784
1785 let tenant = crate::runtime::impl_core::current_tenant();
1786 let principal = crate::auth::UserId::from_parts(tenant.as_deref(), &username);
1787 let mut resource =
1788 crate::auth::policies::ResourceRef::new("kv".to_string(), collection.to_string());
1789 if let Some(tenant) = tenant.clone() {
1790 resource = resource.with_tenant(tenant);
1791 }
1792 let ctx = crate::auth::policies::EvalContext {
1793 principal_tenant: tenant.clone(),
1794 current_tenant: tenant,
1795 peer_ip: None,
1796 mfa_present: false,
1797 now_ms: current_unix_ms(),
1798 principal_is_admin_role: role == crate::auth::Role::Admin,
1799 };
1800 if auth_store.check_policy_authz(&principal, "kv:invalidate", &resource, &ctx) {
1801 Ok(())
1802 } else {
1803 Err(RedDBError::Query(format!(
1804 "principal=`{username}` action=`kv:invalidate` resource=`kv:{collection}` denied by IAM policy"
1805 )))
1806 }
1807 }
1808}
1809
1810fn ttl_metadata(ttl_ms: Option<u64>) -> Option<Metadata> {
1811 let ttl_ms = ttl_ms?;
1812 Some(Metadata::with_fields(
1813 [(
1814 "_ttl_ms".to_string(),
1815 if ttl_ms <= i64::MAX as u64 {
1816 MetadataValue::Int(ttl_ms as i64)
1817 } else {
1818 MetadataValue::Timestamp(ttl_ms)
1819 },
1820 )]
1821 .into_iter()
1822 .collect(),
1823 ))
1824}
1825
1826fn vault_write_result(
1827 raw_query: &str,
1828 statement: &'static str,
1829 statement_type: &'static str,
1830 collection: &str,
1831 key: &str,
1832 entry: &VaultEntry,
1833 affected_rows: u64,
1834) -> RuntimeQueryResult {
1835 let mut result = UnifiedResult::with_columns(vec![
1836 "ok".into(),
1837 "collection".into(),
1838 "key".into(),
1839 "version".into(),
1840 "fingerprint".into(),
1841 "tombstone".into(),
1842 "op".into(),
1843 "id".into(),
1844 ]);
1845 let mut record = UnifiedRecord::new();
1846 record.set("ok", Value::Boolean(true));
1847 record.set("collection", Value::text(collection.to_string()));
1848 record.set("key", Value::text(key.to_string()));
1849 record.set("version", Value::Integer(entry.version));
1850 if entry.tombstone {
1851 record.set("fingerprint", Value::Null);
1852 } else {
1853 record.set("fingerprint", Value::text(vault_fingerprint(&entry.value)));
1854 }
1855 record.set("tombstone", Value::Boolean(entry.tombstone));
1856 record.set("op", Value::text(entry.op.clone()));
1857 record.set("id", Value::Integer(entry.id.raw() as i64));
1858 result.push(record);
1859 RuntimeQueryResult {
1860 query: raw_query.to_string(),
1861 mode: crate::storage::query::modes::QueryMode::Sql,
1862 statement,
1863 engine: "vault",
1864 result,
1865 affected_rows,
1866 statement_type,
1867 }
1868}
1869
1870fn vault_history_result(collection: &str, key: &str, versions: &[VaultEntry]) -> UnifiedResult {
1871 let mut result = UnifiedResult::with_columns(vec![
1872 "collection".into(),
1873 "key".into(),
1874 "version".into(),
1875 "fingerprint".into(),
1876 "tags".into(),
1877 "created_at".into(),
1878 "updated_at".into(),
1879 "status".into(),
1880 "tombstone".into(),
1881 "op".into(),
1882 ]);
1883 for entry in versions {
1884 push_vault_metadata_record(&mut result, collection, key, entry);
1885 }
1886 result
1887}
1888
1889fn push_vault_metadata_record(
1890 result: &mut UnifiedResult,
1891 collection: &str,
1892 key: &str,
1893 entry: &VaultEntry,
1894) {
1895 let mut record = UnifiedRecord::new();
1896 record.set("collection", Value::text(collection.to_string()));
1897 record.set("key", Value::text(key.to_string()));
1898 record.set("version", Value::Integer(entry.version));
1899 if entry.tombstone {
1900 record.set("fingerprint", Value::Null);
1901 record.set("status", Value::text("deleted"));
1902 } else {
1903 record.set("fingerprint", Value::text(vault_fingerprint(&entry.value)));
1904 record.set("status", Value::text("sealed"));
1905 }
1906 record.set("tags", vault_tags_value(&entry.metadata));
1907 record.set("created_at", Value::TimestampMs(entry.created_at as i64));
1908 record.set("updated_at", Value::TimestampMs(entry.updated_at as i64));
1909 record.set("tombstone", Value::Boolean(entry.tombstone));
1910 record.set("op", Value::text(entry.op.clone()));
1911 result.push(record);
1912}
1913
1914fn vault_metadata_result(
1915 collection: &str,
1916 key: &str,
1917 entry: Option<&VaultEntry>,
1918 key_available: bool,
1919) -> UnifiedResult {
1920 let mut result = UnifiedResult::with_columns(vec![
1921 "collection".into(),
1922 "key".into(),
1923 "version".into(),
1924 "fingerprint".into(),
1925 "tags".into(),
1926 "created_at".into(),
1927 "updated_at".into(),
1928 "value".into(),
1929 "status".into(),
1930 "tombstone".into(),
1931 "op".into(),
1932 ]);
1933 let mut record = UnifiedRecord::new();
1934 record.set("collection", Value::text(collection.to_string()));
1935 record.set("key", Value::text(key.to_string()));
1936 match entry {
1937 Some(entry) => {
1938 record.set("version", Value::Integer(entry.version));
1939 if entry.tombstone {
1940 record.set("fingerprint", Value::Null);
1941 } else {
1942 record.set("fingerprint", Value::text(vault_fingerprint(&entry.value)));
1943 }
1944 record.set("tags", vault_tags_value(&entry.metadata));
1945 record.set("created_at", Value::TimestampMs(entry.created_at as i64));
1946 record.set("updated_at", Value::TimestampMs(entry.updated_at as i64));
1947 record.set("value", Value::text("***"));
1948 record.set(
1949 "status",
1950 Value::text(if entry.tombstone {
1951 "deleted"
1952 } else if key_available {
1953 "sealed"
1954 } else {
1955 "sealed_unavailable"
1956 }),
1957 );
1958 record.set("tombstone", Value::Boolean(entry.tombstone));
1959 record.set("op", Value::text(entry.op.clone()));
1960 }
1961 None => {
1962 record.set("version", Value::Null);
1963 record.set("fingerprint", Value::Null);
1964 record.set("tags", Value::Array(Vec::new()));
1965 record.set("created_at", Value::Null);
1966 record.set("updated_at", Value::Null);
1967 record.set("value", Value::text(""));
1968 record.set("status", Value::text("missing"));
1969 record.set("tombstone", Value::Boolean(false));
1970 record.set("op", Value::Null);
1971 }
1972 }
1973 result.push(record);
1974 result
1975}
1976
1977fn vault_fingerprint(value: &Value) -> String {
1978 match value {
1979 Value::Secret(payload) => crate::utils::to_hex(&crate::crypto::sha256::sha256(payload)),
1980 other => crate::utils::to_hex(&crate::crypto::sha256::sha256(&other.to_bytes())),
1981 }
1982}
1983
1984fn vault_entry_metadata_json(entry: &VaultEntry) -> crate::json::Value {
1985 let mut object = crate::json::Map::new();
1986 object.insert(
1987 "key".to_string(),
1988 crate::json::Value::String(entry.key.clone()),
1989 );
1990 object.insert(
1991 "version".to_string(),
1992 crate::json::Value::Number(entry.version as f64),
1993 );
1994 object.insert(
1995 "fingerprint".to_string(),
1996 if entry.tombstone {
1997 crate::json::Value::Null
1998 } else {
1999 crate::json::Value::String(vault_fingerprint(&entry.value))
2000 },
2001 );
2002 object.insert("tags".to_string(), vault_tags_json(&entry.metadata));
2003 object.insert(
2004 "actor".to_string(),
2005 crate::json::Value::String(RedDBRuntime::current_vault_actor()),
2006 );
2007 object.insert(
2008 "sequence_id".to_string(),
2009 crate::json::Value::Number(entry.sequence_id as f64),
2010 );
2011 object.insert(
2012 "tombstone".to_string(),
2013 crate::json::Value::Bool(entry.tombstone),
2014 );
2015 object.insert(
2016 "op".to_string(),
2017 crate::json::Value::String(entry.op.clone()),
2018 );
2019 crate::json::Value::Object(object)
2020}
2021
2022fn vault_tags_json(metadata: &Metadata) -> crate::json::Value {
2023 match vault_tags_value(metadata) {
2024 Value::Array(values) => crate::json::Value::Array(
2025 values
2026 .into_iter()
2027 .filter_map(|value| match value {
2028 Value::Text(tag) => Some(crate::json::Value::String(tag.to_string())),
2029 _ => None,
2030 })
2031 .collect(),
2032 ),
2033 _ => crate::json::Value::Array(Vec::new()),
2034 }
2035}
2036
2037fn vault_tags_metadata(tags: &[String]) -> std::collections::HashMap<String, MetadataValue> {
2038 [(
2039 "tags".to_string(),
2040 MetadataValue::Array(
2041 tags.iter()
2042 .map(|tag| MetadataValue::String(tag.clone()))
2043 .collect(),
2044 ),
2045 )]
2046 .into_iter()
2047 .collect()
2048}
2049
2050fn vault_filter_watch_event(
2051 mut event: crate::replication::cdc::KvWatchEvent,
2052) -> crate::replication::cdc::KvWatchEvent {
2053 event.before = event.before.and_then(vault_metadata_json_only);
2054 event.after = event.after.and_then(vault_metadata_json_only);
2055 event
2056}
2057
2058fn vault_metadata_json_only(value: crate::json::Value) -> Option<crate::json::Value> {
2059 let object = value.as_object()?;
2060 let mut out = crate::json::Map::new();
2061 for field in [
2062 "key",
2063 "version",
2064 "fingerprint",
2065 "tags",
2066 "actor",
2067 "sequence_id",
2068 "tombstone",
2069 "op",
2070 ] {
2071 if let Some(value) = object.get(field) {
2072 out.insert(field.to_string(), value.clone());
2073 }
2074 }
2075 Some(crate::json::Value::Object(out))
2076}
2077
2078fn vault_tags_value(metadata: &Metadata) -> Value {
2079 match metadata.get("tags") {
2080 Some(MetadataValue::Array(values)) => Value::Array(
2081 values
2082 .iter()
2083 .filter_map(|value| match value {
2084 MetadataValue::String(tag) => Some(Value::text(tag.clone())),
2085 _ => None,
2086 })
2087 .collect(),
2088 ),
2089 Some(MetadataValue::String(tag)) if !tag.is_empty() => {
2090 Value::Array(vec![Value::text(tag.clone())])
2091 }
2092 _ => Value::Array(Vec::new()),
2093 }
2094}
2095
2096fn decode_vault_key(hex_key: &str) -> RedDBResult<[u8; 32]> {
2097 let bytes = hex::decode(hex_key)
2098 .map_err(|_| RedDBError::Query("vault sealed_unavailable: bad key material".to_string()))?;
2099 let key: [u8; 32] = bytes.try_into().map_err(|_| {
2100 RedDBError::Query("vault sealed_unavailable: bad key material length".to_string())
2101 })?;
2102 Ok(key)
2103}
2104
2105fn kv_tags_metadata(tags: &[String]) -> Option<(String, MetadataValue)> {
2106 if tags.is_empty() {
2107 return None;
2108 }
2109 let values = tags
2110 .iter()
2111 .map(|tag| MetadataValue::String(tag.clone()))
2112 .collect();
2113 Some(("_kv_tags".to_string(), MetadataValue::Array(values)))
2114}
2115
2116fn kv_tags_value(tags: &[String]) -> Value {
2117 let json = crate::json::Value::Array(
2118 tags.iter()
2119 .map(|tag| crate::json::Value::String(tag.clone()))
2120 .collect(),
2121 );
2122 Value::Json(crate::json::to_vec(&json).unwrap_or_default())
2123}
2124
2125fn kv_value_from_entity(entity: &crate::storage::UnifiedEntity) -> Option<Value> {
2126 if let crate::storage::EntityData::Row(ref row) = entity.data {
2127 if let Some(ref named) = row.named {
2128 return named.get("value").cloned();
2129 }
2130 }
2131 None
2132}
2133
2134fn kv_collection_contract(name: &str) -> crate::physical::CollectionContract {
2135 let now = current_unix_ms();
2136 crate::physical::CollectionContract {
2137 name: name.to_string(),
2138 declared_model: crate::catalog::CollectionModel::Kv,
2139 schema_mode: crate::catalog::SchemaMode::Dynamic,
2140 origin: crate::physical::ContractOrigin::Implicit,
2141 version: 1,
2142 created_at_unix_ms: now,
2143 updated_at_unix_ms: now,
2144 default_ttl_ms: None,
2145 context_index_fields: Vec::new(),
2146 declared_columns: Vec::new(),
2147 table_def: None,
2148 timestamps_enabled: false,
2149 context_index_enabled: false,
2150 append_only: false,
2151 subscriptions: Vec::new(),
2152 }
2153}
2154
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// If the system clock reports a time before the epoch, `0` is returned
/// instead of failing.
fn current_unix_ms() -> u128 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis(),
        Err(_) => 0,
    }
}
2161
2162#[cfg(test)]
2163mod tests {
2164 use crate::api::RedDBOptions;
2165 use crate::catalog::CollectionModel;
2166 use crate::runtime::RedDBRuntime;
2167
2168 fn rt() -> RedDBRuntime {
2169 RedDBRuntime::with_options(RedDBOptions::in_memory()).expect("in-memory runtime")
2170 }
2171
/// Incrementing a key that does not exist yet yields the increment itself.
#[test]
fn incr_missing_key_initialises_at_by() {
    let runtime = rt();
    let kv = super::KvAtomicOps::new(&runtime);

    let result = kv.incr(CollectionModel::Kv, "kv_default", "missing", 5, None);

    assert_eq!(result.unwrap(), 5);
}
2181
/// Each public KV operation bumps exactly its own runtime counter; the two
/// CAS calls are counted as one success and one conflict.
#[test]
fn kv_runtime_stats_count_public_ops() {
    use crate::storage::schema::Value;

    const COLLECTION: &str = "kv_default";
    let runtime = rt();
    let kv = super::KvAtomicOps::new(&runtime);

    // One put, one get, one delete, one incr.
    kv.set(
        CollectionModel::Kv,
        COLLECTION,
        "profile",
        Value::text("alice"),
        None,
        false,
    )
    .unwrap();
    kv.get(CollectionModel::Kv, COLLECTION, "profile").unwrap();
    kv.delete(CollectionModel::Kv, COLLECTION, "profile")
        .unwrap();
    kv.incr(CollectionModel::Kv, COLLECTION, "hits", 1, None)
        .unwrap();

    // CAS expecting "absent" on the deleted key.
    kv.cas(
        CollectionModel::Kv,
        COLLECTION,
        "profile",
        None,
        Value::text("created"),
        None,
    )
    .unwrap();
    // CAS with an expectation that does not match the stored value.
    let stale = Value::text("different");
    kv.cas(
        CollectionModel::Kv,
        COLLECTION,
        "profile",
        Some(&stale),
        Value::text("ignored"),
        None,
    )
    .unwrap();

    let counters = runtime.stats().kv;
    assert_eq!(counters.puts, 1);
    assert_eq!(counters.gets, 1);
    assert_eq!(counters.deletes, 1);
    assert_eq!(counters.incrs, 1);
    assert_eq!(counters.cas_success, 1);
    assert_eq!(counters.cas_conflict, 1);
}
2229
/// Invalidating a tag the entry does not carry leaves it untouched, while
/// invalidating one of its tags removes it.
#[test]
fn kv_invalidate_tags_removes_matching_entries_only() {
    use crate::storage::schema::Value;

    let runtime = rt();
    runtime
        .execute_query("KV PUT sessions.blob = 'payload' TAGS [user:42, org:7]")
        .unwrap();

    // Unrelated tag: nothing is invalidated and the value is still readable.
    let unrelated = runtime
        .execute_query("INVALIDATE TAGS [org:99] FROM sessions")
        .unwrap();
    assert_eq!(unrelated.affected_rows, 0);
    let still_there = runtime.execute_query("KV GET sessions.blob").unwrap();
    assert!(matches!(
        still_there.result.records[0].get("value"),
        Some(Value::Text(value)) if &**value == "payload"
    ));

    // Matching tag: the entry is invalidated and reads back as NULL.
    let matching = runtime
        .execute_query("INVALIDATE TAGS [user:42] FROM sessions")
        .unwrap();
    assert_eq!(matching.affected_rows, 1);
    let gone = runtime.execute_query("KV GET sessions.blob").unwrap();
    assert!(matches!(
        gone.result.records[0].get("value"),
        Some(Value::Null)
    ));
}
2263
/// Watch statistics follow the stream lifecycle: the active-stream gauge
/// rises on subscribe and falls when the stream is dropped, and emitted
/// events and recorded drops are counted.
#[test]
fn kv_runtime_stats_count_watch_streams_and_events() {
    let runtime = rt();
    let kv = super::KvAtomicOps::new(&runtime);
    assert_eq!(runtime.stats().kv.watch_streams_active, 0);

    // Inner scope so the subscription is dropped before the final check.
    {
        let mut subscription = runtime.kv_watch_subscribe("kv_default", "watched", None);
        assert_eq!(runtime.stats().kv.watch_streams_active, 1);

        let written = kv.set(
            CollectionModel::Kv,
            "kv_default",
            "watched",
            crate::storage::schema::Value::Integer(1),
            None,
            false,
        );
        written.unwrap();

        let delivered = subscription.poll_next().expect("watch event");
        assert_eq!(delivered.key, "watched");
        assert_eq!(runtime.stats().kv.watch_events_emitted, 1);

        subscription.record_drop_count(3);
        assert_eq!(runtime.stats().kv.watch_drops, 3);
    }

    assert_eq!(runtime.stats().kv.watch_streams_active, 0);
}
2293
/// Three successive `incr` calls by 1 on the same key must end at 3.
#[test]
fn incr_existing_integer_accumulates() {
    let r = rt();
    let ops = super::KvAtomicOps::new(&r);

    // Apply the same increment three times and keep only the last returned value.
    let final_value = (0..3)
        .map(|_| {
            ops.incr(CollectionModel::Kv, "kv_default", "ctr", 1, None)
                .unwrap()
        })
        .last();

    assert_eq!(final_value, Some(3));
}
2307
/// A negative `by` passed to `incr` acts as a decrement: 10 then -3 yields 7.
#[test]
fn decr_via_negative_by() {
    let r = rt();
    let ops = super::KvAtomicOps::new(&r);
    let (collection, key) = ("kv_default", "stock");

    // Seed the counter with its starting amount.
    ops.incr(CollectionModel::Kv, collection, key, 10, None)
        .unwrap();

    let remaining = ops
        .incr(CollectionModel::Kv, collection, key, -3, None)
        .unwrap();
    assert_eq!(remaining, 7);
}
2319
/// `incr` on a key holding a text value must fail with a message
/// containing "non-integer".
#[test]
fn incr_on_string_value_returns_error() {
    type Value = crate::storage::schema::Value;

    let r = rt();
    let ops = super::KvAtomicOps::new(&r);

    let stored = Value::text("alice");
    ops.set(CollectionModel::Kv, "kv_default", "name", stored, None, false)
        .unwrap();

    let message = ops
        .incr(CollectionModel::Kv, "kv_default", "name", 1, None)
        .unwrap_err()
        .to_string();
    assert!(message.contains("non-integer"));
}
2338
/// A CAS whose expected value equals the stored value must succeed, hand back
/// the previous value, and leave the new value readable through `get`.
#[test]
fn cas_matching_value_succeeds() {
    type Value = crate::storage::schema::Value;

    let r = rt();
    let ops = super::KvAtomicOps::new(&r);
    let (free, held) = (Value::text("free"), Value::text("held"));

    ops.set(
        CollectionModel::Kv,
        "kv_default",
        "lock",
        free.clone(),
        None,
        false,
    )
    .unwrap();

    let outcome = ops
        .cas(
            CollectionModel::Kv,
            "kv_default",
            "lock",
            Some(&free),
            held.clone(),
            None,
        )
        .unwrap();
    assert!(outcome.0);
    assert_eq!(outcome.1, Some(free));

    // The swap must be visible to a plain read afterwards.
    let stored = ops.get(CollectionModel::Kv, "kv_default", "lock").unwrap();
    assert_eq!(stored, Some(held));
}
2372
/// A CAS whose expected value differs from the stored value must report
/// failure, return the current value, and leave the stored value unchanged.
#[test]
fn cas_mismatching_value_fails() {
    type Value = crate::storage::schema::Value;

    let r = rt();
    let ops = super::KvAtomicOps::new(&r);
    let free = Value::text("free");
    let wrong_expectation = Value::text("held");

    ops.set(
        CollectionModel::Kv,
        "kv_default",
        "lock",
        free.clone(),
        None,
        false,
    )
    .unwrap();

    let outcome = ops
        .cas(
            CollectionModel::Kv,
            "kv_default",
            "lock",
            Some(&wrong_expectation),
            Value::text("worker-7"),
            None,
        )
        .unwrap();
    assert!(!outcome.0);
    assert_eq!(outcome.1, Some(free.clone()));

    // The rejected swap must not have altered what a read returns.
    let stored = ops.get(CollectionModel::Kv, "kv_default", "lock").unwrap();
    assert_eq!(stored, Some(free));
}
2404
/// A CAS with no expected value (`None`) on a key that was never written must
/// succeed, report no previous value, and store the new value.
#[test]
fn cas_expect_null_on_missing_key_creates() {
    type Value = crate::storage::schema::Value;

    let r = rt();
    let ops = super::KvAtomicOps::new(&r);
    let created = Value::text("created");

    let outcome = ops
        .cas(
            CollectionModel::Kv,
            "kv_default",
            "new_key",
            None,
            created.clone(),
            None,
        )
        .unwrap();
    assert!(outcome.0);
    assert_eq!(outcome.1, None);

    let stored = ops
        .get(CollectionModel::Kv, "kv_default", "new_key")
        .unwrap();
    assert_eq!(stored, Some(created));
}
2427
/// A CAS with no expected value (`None`) on a key that already holds a value
/// must report failure and return the value currently stored.
#[test]
fn cas_expect_null_on_existing_key_fails() {
    type Value = crate::storage::schema::Value;

    let r = rt();
    let ops = super::KvAtomicOps::new(&r);
    let holder = Value::text("worker-1");

    ops.set(
        CollectionModel::Kv,
        "kv_default",
        "taken",
        holder.clone(),
        None,
        false,
    )
    .unwrap();

    let outcome = ops
        .cas(
            CollectionModel::Kv,
            "kv_default",
            "taken",
            None,
            Value::text("worker-2"),
            None,
        )
        .unwrap();
    assert!(!outcome.0);
    assert_eq!(outcome.1, Some(holder));
}
2457
/// Running the same `KV CAS ... EXPECT 'free' SET 'held'` statement twice must
/// report `ok = true` the first time and `ok = false` the second time.
#[test]
fn cas_via_sql_roundtrip() {
    type Value = crate::storage::schema::Value;

    let r = rt();
    r.execute_query("KV PUT lock = 'free'").unwrap();

    // Same statement, two attempts: the expected `ok` flag flips between them.
    for expected_ok in [true, false] {
        let response = r
            .execute_query("KV CAS lock EXPECT 'free' SET 'held'")
            .unwrap();
        let record = &response.result.records[0];
        assert_eq!(record.get("ok"), Some(&Value::Boolean(expected_ok)));
    }
}
2482
/// `KV CAS ... EXPECT NULL` must report `ok = true` on a key that was never
/// written and `ok = false` on the following attempt against the same key.
#[test]
fn cas_expect_null_via_sql() {
    type Value = crate::storage::schema::Value;

    let r = rt();

    // (statement, expected `ok` flag), executed in order.
    let attempts = [
        ("KV CAS singleton EXPECT NULL SET 'first'", true),
        ("KV CAS singleton EXPECT NULL SET 'second'", false),
    ];
    for (statement, expected_ok) in attempts {
        let response = r.execute_query(statement).unwrap();
        let record = &response.result.records[0];
        assert_eq!(record.get("ok"), Some(&Value::Boolean(expected_ok)));
    }
}
2504
/// `KV INCR` without `BY` must yield 1 on a fresh key, and a following
/// `KV INCR ... BY 4` must yield 5.
#[test]
fn incr_via_sql_roundtrip() {
    type Value = crate::storage::schema::Value;

    let r = rt();

    // (statement, expected counter value after it runs), executed in order.
    let steps = [("KV INCR hits", 1), ("KV INCR hits BY 4", 5)];
    for (statement, expected_total) in steps {
        let response = r.execute_query(statement).unwrap();
        let record = &response.result.records[0];
        assert_eq!(record.get("value"), Some(&Value::Integer(expected_total)));
    }
}
2521
/// A key-level watch must surface set / incr / delete / set as
/// Insert / Update / Delete / Insert events with strictly increasing LSNs.
#[test]
fn watch_stream_delivers_key_events_in_lsn_order() {
    type Value = crate::storage::schema::Value;
    type Op = crate::replication::cdc::ChangeOperation;

    let r = rt();
    let ops = super::KvAtomicOps::new(&r);
    let mut stream = r.kv_watch_subscribe("kv_default", "seq", None);

    // Helper for the two plain writes that bracket the incr and delete.
    let put = |number| {
        ops.set(
            CollectionModel::Kv,
            "kv_default",
            "seq",
            Value::Integer(number),
            None,
            false,
        )
        .unwrap();
    };

    put(1);
    ops.incr(CollectionModel::Kv, "kv_default", "seq", 1, None)
        .unwrap();
    ops.delete(CollectionModel::Kv, "kv_default", "seq")
        .unwrap();
    put(9);

    // Poll until the stream runs dry or four events have been gathered.
    let events: Vec<_> = std::iter::from_fn(|| stream.poll_next())
        .take(4)
        .collect();
    assert_eq!(events.len(), 4);

    let expected_ops = [Op::Insert, Op::Update, Op::Delete, Op::Insert];
    for (event, expected) in events.iter().zip(expected_ops.iter()) {
        assert_eq!(&event.op, expected);
    }

    // Each event must carry a larger LSN than the one before it.
    let strictly_increasing = events
        .iter()
        .zip(events.iter().skip(1))
        .all(|(earlier, later)| earlier.lsn < later.lsn);
    assert!(strictly_increasing);
}
2578
/// A prefix watch on `acct:` must deliver only the writes whose key starts
/// with that prefix, in write order, and nothing for the `session:1` key.
#[test]
fn watch_prefix_stream_delivers_matching_events_only() {
    type Value = crate::storage::schema::Value;

    let r = rt();
    let ops = super::KvAtomicOps::new(&r);
    let mut stream = r.kv_watch_subscribe_prefix("kv_default", "acct:", None);

    // Two matching keys with one non-matching key written between them.
    let writes = [("acct:1", 1), ("session:1", 2), ("acct:2", 3)];
    for (key, number) in writes {
        ops.set(
            CollectionModel::Kv,
            "kv_default",
            key,
            Value::Integer(number),
            None,
            false,
        )
        .unwrap();
    }

    let first = stream.poll_next().expect("first prefix event");
    let second = stream.poll_next().expect("second prefix event");
    assert_eq!(first.key, "acct:1");
    assert_eq!(second.key, "acct:2");

    // No third event: the `session:1` write must not have been delivered.
    assert!(stream.poll_next().is_none());
}
2619
/// Re-subscribing with the last seen LSN must deliver exactly the 50 writes
/// made while no stream was open, each with a strictly increasing LSN above
/// the resume point, and nothing more.
#[test]
fn watch_stream_resume_from_lsn_delivers_missed_events_without_duplicates() {
    type Value = crate::storage::schema::Value;

    let r = rt();
    let ops = super::KvAtomicOps::new(&r);
    let write = |number| {
        ops.set(
            CollectionModel::Kv,
            "kv_default",
            "resume",
            Value::Integer(number),
            None,
            false,
        )
        .unwrap();
    };

    // Phase 1: consume five events live and remember the LSN of the last one.
    let mut stream = r.kv_watch_subscribe("kv_default", "resume", None);
    let mut last_seen_lsn = 0;
    for number in 0..5 {
        write(number);
        let event = stream.poll_next().expect("initial event");
        last_seen_lsn = event.lsn;
    }
    drop(stream);

    // Phase 2: fifty more writes land while no subscription exists.
    for number in 5..55 {
        write(number);
    }

    // Phase 3: resume from the remembered LSN and gather at most fifty events.
    let mut resumed = r.kv_watch_subscribe("kv_default", "resume", Some(last_seen_lsn));
    let lsns: Vec<_> = std::iter::from_fn(|| resumed.poll_next())
        .take(50)
        .map(|event| event.lsn)
        .collect();

    assert_eq!(lsns.len(), 50);
    assert!(lsns.iter().all(|&lsn| lsn > last_seen_lsn));
    assert!(lsns
        .iter()
        .zip(lsns.iter().skip(1))
        .all(|(earlier, later)| earlier < later));
    assert!(resumed.poll_next().is_none());
}
2667
/// After 10,000 writes with no polling, the first polled event must have an
/// LSN above 1 and a non-zero drop count that agrees with both the stream's
/// own counter and the runtime's `watch_drops` stat.
#[test]
fn watch_stream_slow_consumer_drops_oldest_buffered_events() {
    type Value = crate::storage::schema::Value;

    let r = rt();
    let ops = super::KvAtomicOps::new(&r);
    let mut stream = r.kv_watch_subscribe("kv_default", "slow", None);

    // Write repeatedly without polling the stream in between.
    (0..10_000).for_each(|number| {
        ops.set(
            CollectionModel::Kv,
            "kv_default",
            "slow",
            Value::Integer(number),
            None,
            false,
        )
        .unwrap();
    });

    let event = stream.poll_next().expect("tail event after drops");
    let reported_drops = event.dropped_event_count;
    assert!(event.lsn > 1);
    assert!(reported_drops > 0);
    assert_eq!(stream.dropped_event_count(), reported_drops);
    assert_eq!(r.stats().kv.watch_drops, reported_drops);
}
2692
/// With the watch idle timeout configured to 1 ms, a subscription left
/// unpolled for 5 ms must yield nothing on the next poll and no longer count
/// as an active stream.
#[test]
fn watch_stream_idle_timeout_closes_subscription() {
    let r = rt();
    r.execute_query("SET CONFIG red.config.kv.watch.idle_timeout_ms = 1")
        .unwrap();

    let mut stream = r.kv_watch_subscribe("kv_default", "idle", None);
    let active_before = r.stats().kv.watch_streams_active;
    assert_eq!(active_before, 1);

    // Wait longer than the configured timeout before touching the stream.
    let idle_period = std::time::Duration::from_millis(5);
    std::thread::sleep(idle_period);

    let polled = stream.poll_next();
    assert!(polled.is_none());
    let active_after = r.stats().kv.watch_streams_active;
    assert_eq!(active_after, 0);
}
2706
/// A `KV PUT` issued between `BEGIN` and `ROLLBACK` must not produce a watch
/// event for the key.
#[test]
fn watch_stream_does_not_emit_rolled_back_put() {
    let r = rt();
    let mut stream = r.kv_watch_subscribe("kv_default", "rollback_key", None);

    // Open a transaction, write inside it, then abandon it.
    let statements = ["BEGIN", "KV PUT rollback_key = 'dirty'", "ROLLBACK"];
    for statement in statements {
        r.execute_query(statement).unwrap();
    }

    let polled = stream.poll_next();
    assert!(polled.is_none());
}
2718}