1use crate::application::ports::RuntimeEntityPort;
7use crate::storage::unified::{Metadata, MetadataValue};
8
9use super::impl_core::{current_auth_identity, current_connection_id, current_tenant};
10use super::*;
11
/// Collection name `"kv_default"`, exported as the default KV collection identifier.
pub const KV_DEFAULT_COLLECTION: &str = "kv_default";
14
/// Builds the lookup name `red.vault.<collection>.master_key` under which a
/// collection-specific vault master key is looked up.
fn vault_master_key_ref(collection: &str) -> String {
    ["red.vault.", collection, ".master_key"].concat()
}
18
19fn keyed_model_name(model: crate::catalog::CollectionModel) -> &'static str {
20 match model {
21 crate::catalog::CollectionModel::Kv => "kv",
22 crate::catalog::CollectionModel::Vault => "vault",
23 crate::catalog::CollectionModel::Config => "config",
24 _ => "collection",
25 }
26}
27
28#[derive(Debug, Clone)]
29struct VaultEntry {
30 id: crate::storage::EntityId,
31 key: String,
32 value: crate::storage::schema::Value,
33 metadata: Metadata,
34 created_at: u64,
35 updated_at: u64,
36 sequence_id: u64,
37 version: i64,
38 tombstone: bool,
39 op: String,
40}
41
42impl super::keyed_spine::KeyedVersion for VaultEntry {
43 fn key(&self) -> &str {
44 &self.key
45 }
46
47 fn version(&self) -> i64 {
48 self.version
49 }
50}
51
52impl VaultEntry {
53 fn from_keyed_row(
54 version: super::keyed_spine::KeyedRowVersion,
55 metadata: Metadata,
56 created_at: u64,
57 updated_at: u64,
58 sequence_id: u64,
59 ) -> Self {
60 Self {
61 id: version.id,
62 key: version.key,
63 value: version.value,
64 metadata,
65 created_at,
66 updated_at,
67 sequence_id,
68 version: version.version,
69 tombstone: version.tombstone,
70 op: version.op,
71 }
72 }
73}
74
75pub struct KvAtomicOps<'a> {
80 runtime: &'a RedDBRuntime,
81}
82
83impl<'a> KvAtomicOps<'a> {
84 pub fn new(runtime: &'a RedDBRuntime) -> Self {
85 Self { runtime }
86 }
87
88 pub fn set(
92 &self,
93 model: crate::catalog::CollectionModel,
94 collection: &str,
95 key: &str,
96 value: crate::storage::schema::Value,
97 ttl_ms: Option<u64>,
98 if_not_exists: bool,
99 ) -> RedDBResult<(bool, crate::storage::EntityId)> {
100 self.set_with_tags_for_model(model, collection, key, value, ttl_ms, &[], if_not_exists)
101 }
102
103 pub fn set_with_tags(
104 &self,
105 collection: &str,
106 key: &str,
107 value: crate::storage::schema::Value,
108 ttl_ms: Option<u64>,
109 tags: &[String],
110 if_not_exists: bool,
111 ) -> RedDBResult<(bool, crate::storage::EntityId)> {
112 self.set_with_tags_for_model(
113 crate::catalog::CollectionModel::Kv,
114 collection,
115 key,
116 value,
117 ttl_ms,
118 tags,
119 if_not_exists,
120 )
121 }
122
123 fn set_with_tags_for_model(
124 &self,
125 model: crate::catalog::CollectionModel,
126 collection: &str,
127 key: &str,
128 value: crate::storage::schema::Value,
129 ttl_ms: Option<u64>,
130 tags: &[String],
131 if_not_exists: bool,
132 ) -> RedDBResult<(bool, crate::storage::EntityId)> {
133 self.set_with_tags_and_metadata_for_model(
134 model,
135 collection,
136 key,
137 value,
138 ttl_ms,
139 tags,
140 if_not_exists,
141 Vec::new(),
142 )
143 }
144
145 pub fn set_with_tags_and_metadata(
146 &self,
147 collection: &str,
148 key: &str,
149 value: crate::storage::schema::Value,
150 ttl_ms: Option<u64>,
151 tags: &[String],
152 if_not_exists: bool,
153 metadata: Vec<(String, MetadataValue)>,
154 ) -> RedDBResult<(bool, crate::storage::EntityId)> {
155 self.set_with_tags_and_metadata_for_model(
156 crate::catalog::CollectionModel::Kv,
157 collection,
158 key,
159 value,
160 ttl_ms,
161 tags,
162 if_not_exists,
163 metadata,
164 )
165 }
166
167 fn set_with_tags_and_metadata_for_model(
168 &self,
169 model: crate::catalog::CollectionModel,
170 collection: &str,
171 key: &str,
172 value: crate::storage::schema::Value,
173 ttl_ms: Option<u64>,
174 tags: &[String],
175 if_not_exists: bool,
176 mut metadata: Vec<(String, MetadataValue)>,
177 ) -> RedDBResult<(bool, crate::storage::EntityId)> {
178 self.ensure_keyed_collection(model, collection)?;
179
180 if model == crate::catalog::CollectionModel::Vault {
181 let latest = self.get_vault_entry(collection, key)?;
182 let was_present = latest.as_ref().is_some_and(|entry| !entry.tombstone);
183 if if_not_exists && was_present {
184 return Ok((false, latest.expect("checked present").id));
185 }
186 let entry = self.append_vault_version(collection, key, value, "put", false, tags)?;
187 self.runtime.record_kv_watch_event(
188 if latest.is_some() {
189 crate::replication::cdc::ChangeOperation::Update
190 } else {
191 crate::replication::cdc::ChangeOperation::Insert
192 },
193 collection,
194 key,
195 entry.id.raw(),
196 latest.as_ref().map(vault_entry_metadata_json),
197 Some(vault_entry_metadata_json(&entry)),
198 );
199 return Ok((!was_present, entry.id));
200 }
201
202 let existing = self.get_entry(model, collection, key)?;
203 let was_present = existing.is_some();
204
205 if if_not_exists && was_present {
206 let (_, id) = existing.unwrap();
207 self.runtime.inner.kv_stats.incr_puts();
208 return Ok((false, id));
209 }
210
211 let before = existing
212 .as_ref()
213 .map(|(value, _)| crate::presentation::entity_json::storage_value_to_json(value));
214 let op = if was_present {
215 crate::replication::cdc::ChangeOperation::Update
216 } else {
217 crate::replication::cdc::ChangeOperation::Insert
218 };
219 let after = Some(crate::presentation::entity_json::storage_value_to_json(
220 &value,
221 ));
222
223 if was_present {
225 self.delete(model, collection, key)?;
226 }
227
228 if let Some(ttl_metadata) = ttl_metadata(ttl_ms) {
229 metadata.extend(ttl_metadata.fields);
230 }
231 if let Some(tags_metadata) = kv_tags_metadata(tags) {
232 metadata.push(tags_metadata);
233 }
234
235 let output = self
236 .runtime
237 .create_kv(crate::application::entity::CreateKvInput {
238 collection: collection.to_string(),
239 key: key.to_string(),
240 value,
241 metadata,
242 })?;
243 if model == crate::catalog::CollectionModel::Kv {
244 self.runtime
245 .inner
246 .kv_tag_index
247 .replace(collection, key, output.id, tags);
248 }
249
250 if model == crate::catalog::CollectionModel::Kv {
251 self.runtime
252 .record_kv_watch_event(op, collection, key, output.id.raw(), before, after);
253 }
254
255 if model == crate::catalog::CollectionModel::Kv {
256 self.runtime.inner.kv_stats.incr_puts();
257 }
258 Ok((!was_present, output.id))
259 }
260
261 pub fn get(
263 &self,
264 model: crate::catalog::CollectionModel,
265 collection: &str,
266 key: &str,
267 ) -> RedDBResult<Option<crate::storage::schema::Value>> {
268 let result = self.get_entry(model, collection, key)?;
269 if model == crate::catalog::CollectionModel::Kv {
270 self.runtime.inner.kv_stats.incr_gets();
271 }
272 Ok(result.map(|(v, _)| v))
273 }
274
275 pub fn delete(
277 &self,
278 model: crate::catalog::CollectionModel,
279 collection: &str,
280 key: &str,
281 ) -> RedDBResult<bool> {
282 self.ensure_declared_model(model, collection)?;
283 let found = self.get_entry(model, collection, key)?;
284 if let Some((value, id)) = found {
285 let store = self.runtime.inner.db.store();
286 let deleted = store
287 .delete(collection, id)
288 .map_err(|err| RedDBError::Internal(err.to_string()))?;
289 if deleted {
290 store.context_index().remove_entity(id);
291 if model == crate::catalog::CollectionModel::Kv {
292 self.runtime.inner.kv_tag_index.remove(collection, key);
293 self.runtime.record_kv_watch_event(
294 crate::replication::cdc::ChangeOperation::Delete,
295 collection,
296 key,
297 id.raw(),
298 Some(crate::presentation::entity_json::storage_value_to_json(
299 &value,
300 )),
301 None,
302 );
303 self.runtime.inner.kv_stats.incr_deletes();
304 }
305 }
306 Ok(deleted)
307 } else {
308 Ok(false)
309 }
310 }
311
312 pub fn incr(
317 &self,
318 model: crate::catalog::CollectionModel,
319 collection: &str,
320 key: &str,
321 by: i64,
322 ttl_ms: Option<u64>,
323 ) -> RedDBResult<i64> {
324 if model == crate::catalog::CollectionModel::Vault {
325 return Err(RedDBError::InvalidOperation(
326 "VAULT INCR is not supported for sealed secrets".to_string(),
327 ));
328 }
329 let rmw_lock = self.runtime.inner.rmw_locks.lock_for(collection, key);
330 let _rmw_guard = rmw_lock.lock();
331 self.ensure_kv_collection(collection)?;
332 let existing = self.runtime.get_kv(collection, key)?;
333 let current: i64 = match existing.as_ref() {
334 None => 0,
335 Some((crate::storage::schema::Value::Integer(n), _)) => *n,
336 Some((crate::storage::schema::Value::Float(f), _)) => *f as i64,
337 Some((other, _)) => {
338 return Err(RedDBError::Internal(format!(
339 "INCR on non-integer value: {:?}",
340 other
341 )));
342 }
343 };
344
345 let next = current
346 .checked_add(by)
347 .ok_or_else(|| RedDBError::Internal(format!("INCR overflow: {current} + {by}")))?;
348
349 if existing.is_some() {
351 self.runtime.delete_kv(collection, key)?;
352 }
353
354 let meta_vec: Vec<(String, crate::storage::unified::MetadataValue)> = ttl_metadata(ttl_ms)
355 .map(|m| m.fields.into_iter().collect())
356 .unwrap_or_default();
357
358 let output = self
359 .runtime
360 .create_kv(crate::application::entity::CreateKvInput {
361 collection: collection.to_string(),
362 key: key.to_string(),
363 value: crate::storage::schema::Value::Integer(next),
364 metadata: meta_vec,
365 })?;
366 self.runtime
367 .inner
368 .kv_tag_index
369 .replace(collection, key, output.id, &[]);
370
371 self.runtime.record_kv_watch_event(
372 if existing.is_some() {
373 crate::replication::cdc::ChangeOperation::Update
374 } else {
375 crate::replication::cdc::ChangeOperation::Insert
376 },
377 collection,
378 key,
379 output.id.raw(),
380 existing
381 .as_ref()
382 .map(|(value, _)| crate::presentation::entity_json::storage_value_to_json(value)),
383 Some(crate::presentation::entity_json::storage_value_to_json(
384 &crate::storage::schema::Value::Integer(next),
385 )),
386 );
387
388 self.runtime.inner.kv_stats.incr_incrs();
389 Ok(next)
390 }
391
392 pub fn cas(
400 &self,
401 model: crate::catalog::CollectionModel,
402 collection: &str,
403 key: &str,
404 expected: Option<&crate::storage::schema::Value>,
405 new_value: crate::storage::schema::Value,
406 ttl_ms: Option<u64>,
407 ) -> RedDBResult<(bool, Option<crate::storage::schema::Value>)> {
408 if model == crate::catalog::CollectionModel::Vault {
409 return Err(RedDBError::InvalidOperation(
410 "VAULT CAS is not supported for sealed secrets".to_string(),
411 ));
412 }
413 let rmw_lock = self.runtime.inner.rmw_locks.lock_for(collection, key);
414 let _rmw_guard = rmw_lock.lock();
415 self.ensure_kv_collection(collection)?;
416 let current = self.runtime.get_kv(collection, key)?.map(|(v, _)| v);
417
418 let matches = match (¤t, expected) {
419 (None, None) => true,
420 (Some(cur), Some(exp)) => cur == exp,
421 _ => false,
422 };
423
424 if !matches {
425 self.runtime.inner.kv_stats.incr_cas_conflict();
426 return Ok((false, current));
427 }
428
429 if current.is_some() {
431 self.runtime.delete_kv(collection, key)?;
432 }
433
434 let meta_vec: Vec<(String, crate::storage::unified::MetadataValue)> = ttl_metadata(ttl_ms)
435 .map(|m| m.fields.into_iter().collect())
436 .unwrap_or_default();
437
438 let output = self
439 .runtime
440 .create_kv(crate::application::entity::CreateKvInput {
441 collection: collection.to_string(),
442 key: key.to_string(),
443 value: new_value.clone(),
444 metadata: meta_vec,
445 })?;
446 self.runtime
447 .inner
448 .kv_tag_index
449 .replace(collection, key, output.id, &[]);
450
451 self.runtime.record_kv_watch_event(
452 if current.is_some() {
453 crate::replication::cdc::ChangeOperation::Update
454 } else {
455 crate::replication::cdc::ChangeOperation::Insert
456 },
457 collection,
458 key,
459 output.id.raw(),
460 current
461 .as_ref()
462 .map(crate::presentation::entity_json::storage_value_to_json),
463 Some(crate::presentation::entity_json::storage_value_to_json(
464 &new_value,
465 )),
466 );
467
468 self.runtime.inner.kv_stats.incr_cas_success();
469 Ok((true, current))
470 }
471
472 pub fn invalidate_tags(&self, collection: &str, tags: &[String]) -> RedDBResult<usize> {
473 self.runtime
474 .check_write(crate::runtime::write_gate::WriteKind::Dml)?;
475 self.runtime.check_kv_invalidate_policy(collection)?;
476 self.ensure_kv_collection(collection)?;
477 let entries = self
478 .runtime
479 .inner
480 .kv_tag_index
481 .entries_for_tags(collection, tags);
482 if entries.is_empty() {
483 return Ok(0);
484 }
485
486 let store = self.runtime.inner.db.store();
487 let mut removed = 0usize;
488 for (key, id) in entries {
489 let before = store
490 .get(collection, id)
491 .and_then(|entity| kv_value_from_entity(&entity));
492 let deleted = store
493 .delete(collection, id)
494 .map_err(|err| RedDBError::Internal(err.to_string()))?;
495 if deleted {
496 store.context_index().remove_entity(id);
497 self.runtime.inner.kv_tag_index.remove(collection, &key);
498 self.runtime.record_kv_watch_event(
499 crate::replication::cdc::ChangeOperation::Delete,
500 collection,
501 &key,
502 id.raw(),
503 before
504 .as_ref()
505 .map(crate::presentation::entity_json::storage_value_to_json),
506 None,
507 );
508 removed += 1;
509 }
510 }
511 if removed > 0 {
512 self.runtime.inner.kv_stats.incr_deletes();
513 }
514 Ok(removed)
515 }
516
517 pub fn tags_for_key(&self, collection: &str, key: &str) -> Vec<String> {
518 self.runtime
519 .inner
520 .kv_tag_index
521 .tags_for_key(collection, key)
522 }
523
524 fn ensure_kv_collection(&self, collection: &str) -> RedDBResult<()> {
526 self.ensure_keyed_collection(crate::catalog::CollectionModel::Kv, collection)
527 }
528
529 fn ensure_keyed_collection(
530 &self,
531 model: crate::catalog::CollectionModel,
532 collection: &str,
533 ) -> RedDBResult<()> {
534 let store = self.runtime.inner.db.store();
535 if store.get_collection(collection).is_some() {
536 return self.ensure_declared_model(model, collection);
537 }
538 if model != crate::catalog::CollectionModel::Kv {
539 return Err(RedDBError::NotFound(format!(
540 "{} collection '{collection}' does not exist",
541 keyed_model_name(model)
542 )));
543 }
544 let auto_create = self
546 .runtime
547 .config_bool("red.config.kv.default_collection", true);
548 if !auto_create {
549 return Err(RedDBError::NotFound(format!(
550 "kv collection '{collection}' does not exist and auto-create is disabled \
551 (red.config.kv.default_collection = false)"
552 )));
553 }
554 store
555 .create_collection(collection)
556 .map_err(|err| RedDBError::Internal(err.to_string()))?;
557 self.runtime
558 .inner
559 .db
560 .save_collection_contract(kv_collection_contract(collection))
561 .map_err(|err| RedDBError::Internal(err.to_string()))?;
562 Ok(())
563 }
564
565 fn get_entry(
566 &self,
567 model: crate::catalog::CollectionModel,
568 collection: &str,
569 key: &str,
570 ) -> RedDBResult<Option<(crate::storage::schema::Value, crate::storage::EntityId)>> {
571 let Some(entity) = self.get_entity(model, collection, key)? else {
572 return Ok(None);
573 };
574 Ok(kv_value_from_entity(&entity).map(|value| (value, entity.id)))
575 }
576
577 fn get_entity(
578 &self,
579 model: crate::catalog::CollectionModel,
580 collection: &str,
581 key: &str,
582 ) -> RedDBResult<Option<crate::storage::UnifiedEntity>> {
583 self.ensure_declared_model(model, collection)?;
584 let store = self.runtime.inner.db.store();
585 let Some(manager) = store.get_collection(collection) else {
586 return Ok(None);
587 };
588 let entities = manager.query_all(|_| true);
589 for entity in entities {
590 if let crate::storage::EntityData::Row(ref row) = entity.data {
591 if let Some(ref named) = row.named {
592 if let Some(crate::storage::schema::Value::Text(ref k)) = named.get("key") {
593 if &**k == key {
594 return Ok(Some(entity));
595 }
596 }
597 }
598 }
599 }
600 Ok(None)
601 }
602
603 fn get_vault_entry(&self, collection: &str, key: &str) -> RedDBResult<Option<VaultEntry>> {
604 self.vault_versions(collection, key)
605 .map(super::keyed_spine::latest_version)
606 }
607
608 fn get_vault_entry_version(
609 &self,
610 collection: &str,
611 key: &str,
612 version: i64,
613 ) -> RedDBResult<Option<VaultEntry>> {
614 Ok(self
615 .vault_versions(collection, key)?
616 .into_iter()
617 .find(|entry| entry.version == version))
618 }
619
620 fn vault_versions(&self, collection: &str, key: &str) -> RedDBResult<Vec<VaultEntry>> {
621 self.ensure_declared_model(crate::catalog::CollectionModel::Vault, collection)?;
622 let store = self.runtime.inner.db.store();
623 let Some(manager) = store.get_collection(collection) else {
624 return Ok(Vec::new());
625 };
626 let entities = manager.query_all(|_| true);
627 let mut versions = Vec::new();
628 for entity in entities {
629 let crate::storage::EntityData::Row(ref row) = entity.data else {
630 continue;
631 };
632 let Some(version) =
633 super::keyed_spine::row_version(entity.id, row, entity.sequence_id as i64)
634 else {
635 continue;
636 };
637 if version.key != key {
638 continue;
639 }
640 let metadata = manager.get_metadata(entity.id).unwrap_or_default();
641 versions.push(VaultEntry::from_keyed_row(
642 version,
643 metadata,
644 entity.created_at,
645 entity.updated_at,
646 entity.sequence_id,
647 ));
648 }
649 Ok(versions)
650 }
651
652 fn latest_vault_entries(
653 &self,
654 collection: &str,
655 prefix: Option<&str>,
656 ) -> RedDBResult<Vec<VaultEntry>> {
657 self.ensure_declared_model(crate::catalog::CollectionModel::Vault, collection)?;
658 let store = self.runtime.inner.db.store();
659 let Some(manager) = store.get_collection(collection) else {
660 return Ok(Vec::new());
661 };
662 let mut versions = Vec::new();
663 for entity in manager.query_all(|_| true) {
664 let crate::storage::EntityData::Row(ref row) = entity.data else {
665 continue;
666 };
667 let Some(version) =
668 super::keyed_spine::row_version(entity.id, row, entity.sequence_id as i64)
669 else {
670 continue;
671 };
672 let metadata = manager.get_metadata(entity.id).unwrap_or_default();
673 let entry = VaultEntry::from_keyed_row(
674 version,
675 metadata,
676 entity.created_at,
677 entity.updated_at,
678 entity.sequence_id,
679 );
680 versions.push(entry);
681 }
682 Ok(super::keyed_spine::latest_versions(versions, prefix))
683 }
684
685 fn append_vault_version(
686 &self,
687 collection: &str,
688 key: &str,
689 value: crate::storage::schema::Value,
690 op: &str,
691 tombstone: bool,
692 tags: &[String],
693 ) -> RedDBResult<VaultEntry> {
694 self.ensure_declared_model(crate::catalog::CollectionModel::Vault, collection)?;
695 let version = self
696 .get_vault_entry(collection, key)?
697 .map(|entry| entry.version)
698 .unwrap_or(0)
699 + 1;
700 let stored_value = if tombstone {
701 crate::storage::schema::Value::Null
702 } else {
703 self.runtime.seal_vault_value(collection, value)?
704 };
705 let now = current_unix_ms() as i64;
706 let fields = vec![
707 (
708 "key".to_string(),
709 crate::storage::schema::Value::text(key.to_string()),
710 ),
711 ("value".to_string(), stored_value),
712 (
713 "version".to_string(),
714 crate::storage::schema::Value::Integer(version),
715 ),
716 (
717 "tombstone".to_string(),
718 crate::storage::schema::Value::Boolean(tombstone),
719 ),
720 (
721 "op".to_string(),
722 crate::storage::schema::Value::text(op.to_string()),
723 ),
724 (
725 "created_at_ms".to_string(),
726 crate::storage::schema::Value::Integer(now),
727 ),
728 ];
729 let mut row = crate::storage::RowData::new(Vec::new());
730 row.named = Some(fields.into_iter().collect());
731 let entity = crate::storage::UnifiedEntity::new(
732 crate::storage::EntityId::new(0),
733 crate::storage::EntityKind::TableRow {
734 table: std::sync::Arc::from(collection),
735 row_id: 0,
736 },
737 crate::storage::EntityData::Row(row),
738 );
739 let id = self
740 .runtime
741 .inner
742 .db
743 .store()
744 .insert(collection, entity)
745 .map_err(|err| RedDBError::Internal(err.to_string()))?;
746 if !tags.is_empty() {
747 self.runtime
748 .inner
749 .db
750 .store()
751 .set_metadata(
752 collection,
753 id,
754 Metadata::with_fields(vault_tags_metadata(tags)),
755 )
756 .map_err(|err| RedDBError::Internal(err.to_string()))?;
757 self.runtime
758 .inner
759 .kv_tag_index
760 .replace(collection, key, id, tags);
761 }
762 self.get_vault_entry_version(collection, key, version)?
763 .ok_or_else(|| RedDBError::Internal(format!("vault version {id} was not readable")))
764 }
765
766 fn purge_vault_versions(&self, collection: &str, key: &str) -> RedDBResult<usize> {
767 self.ensure_declared_model(crate::catalog::CollectionModel::Vault, collection)?;
768 let versions = self.vault_versions(collection, key)?;
769 let store = self.runtime.inner.db.store();
770 let mut purged = 0usize;
771 for entry in versions {
772 if store
773 .delete(collection, entry.id)
774 .map_err(|err| RedDBError::Internal(err.to_string()))?
775 {
776 store.context_index().remove_entity(entry.id);
777 purged += 1;
778 }
779 }
780 Ok(purged)
781 }
782
783 fn ensure_declared_model(
784 &self,
785 model: crate::catalog::CollectionModel,
786 collection: &str,
787 ) -> RedDBResult<()> {
788 let Some(contract) = self.runtime.inner.db.collection_contract(collection) else {
789 return Ok(());
790 };
791 if contract.declared_model == model
792 || contract.declared_model == crate::catalog::CollectionModel::Mixed
793 {
794 return Ok(());
795 }
796 Err(RedDBError::InvalidOperation(format!(
797 "collection '{}' is declared as '{}' and does not allow '{}' operations",
798 collection,
799 keyed_model_name(contract.declared_model),
800 keyed_model_name(model)
801 )))
802 }
803}
804
805impl RedDBRuntime {
806 pub(crate) fn seal_vault_value(
807 &self,
808 collection: &str,
809 value: crate::storage::schema::Value,
810 ) -> RedDBResult<crate::storage::schema::Value> {
811 let key = self.vault_encryption_key(collection)?;
812 let plaintext = value.to_bytes();
813 let nonce_bytes = crate::auth::store::random_bytes(12);
814 let mut nonce = [0u8; 12];
815 nonce.copy_from_slice(&nonce_bytes[..12]);
816 let aad = format!("reddb.vault.{collection}");
817 let ciphertext =
818 crate::crypto::aes_gcm::aes256_gcm_encrypt(&key, &nonce, aad.as_bytes(), &plaintext);
819 let mut payload = Vec::with_capacity(12 + ciphertext.len());
820 payload.extend_from_slice(&nonce);
821 payload.extend_from_slice(&ciphertext);
822 Ok(crate::storage::schema::Value::Secret(payload))
823 }
824
825 fn vault_key_available(&self, collection: &str) -> bool {
826 self.vault_encryption_key(collection).is_ok()
827 }
828
829 fn vault_encryption_key(&self, collection: &str) -> RedDBResult<[u8; 32]> {
830 let auth_store = self.inner.auth_store.read().clone().ok_or_else(|| {
831 RedDBError::Query("vault sealed_unavailable: no key provider is configured".to_string())
832 })?;
833 if !auth_store.is_vault_backed() {
834 return Err(RedDBError::Query(
835 "vault sealed_unavailable: key provider is sealed".to_string(),
836 ));
837 }
838
839 if let Some(hex_key) = auth_store.vault_kv_get(&vault_master_key_ref(collection)) {
840 return decode_vault_key(&hex_key);
841 }
842 auth_store.vault_secret_key().ok_or_else(|| {
843 RedDBError::Query("vault sealed_unavailable: cluster vault key is missing".to_string())
844 })
845 }
846
847 fn unseal_vault_value(
848 &self,
849 collection: &str,
850 sealed: &crate::storage::schema::Value,
851 ) -> RedDBResult<crate::storage::schema::Value> {
852 let crate::storage::schema::Value::Secret(payload) = sealed else {
853 return Err(RedDBError::Query(
854 "vault unseal failed: stored value is not sealed".to_string(),
855 ));
856 };
857 if payload.len() < 12 {
858 return Err(RedDBError::Query(
859 "vault unseal failed: sealed payload is truncated".to_string(),
860 ));
861 }
862 let key = self.vault_encryption_key(collection)?;
863 let mut nonce = [0u8; 12];
864 nonce.copy_from_slice(&payload[..12]);
865 let aad = format!("reddb.vault.{collection}");
866 let plaintext = crate::crypto::aes_gcm::aes256_gcm_decrypt(
867 &key,
868 &nonce,
869 aad.as_bytes(),
870 &payload[12..],
871 )
872 .map_err(|_| RedDBError::Query("vault unseal failed: decryption failed".to_string()))?;
873 let (value, consumed) =
874 crate::storage::schema::Value::from_bytes(&plaintext).map_err(|err| {
875 RedDBError::Query(format!("vault unseal failed: bad plaintext value: {err}"))
876 })?;
877 if consumed != plaintext.len() {
878 return Err(RedDBError::Query(
879 "vault unseal failed: trailing plaintext bytes".to_string(),
880 ));
881 }
882 Ok(value)
883 }
884
/// Builds the resource name used for vault policy checks and audit records.
///
/// Keys of the `red.vault` collection become `red.vault/<key lower-cased>`;
/// any other collection yields `<collection>.<key>` with the key unchanged.
fn vault_target_resource(collection: &str, key: &str) -> String {
    match collection {
        "red.vault" => format!("red.vault/{}", key.to_ascii_lowercase()),
        other => format!("{other}.{key}"),
    }
}
891
892 fn current_vault_actor() -> String {
893 current_auth_identity()
894 .map(|(principal, _)| principal)
895 .unwrap_or_else(|| "anonymous".to_string())
896 }
897
898 fn vault_request_id() -> String {
899 let conn_id = current_connection_id();
900 if conn_id == 0 {
901 "embedded".to_string()
902 } else {
903 format!("conn-{conn_id}")
904 }
905 }
906
907 fn check_vault_capability(
908 &self,
909 action: &str,
910 collection: &str,
911 key: &str,
912 ) -> Result<(), String> {
913 let Some(auth_store) = self.inner.auth_store.read().clone() else {
914 return Ok(());
915 };
916 if !auth_store.iam_authorization_enabled() {
917 return Ok(());
918 }
919 let Some((principal, role)) = current_auth_identity() else {
920 return Err(
921 "IAM authorization is enabled; vault capability check requires an authenticated principal"
922 .to_string(),
923 );
924 };
925 let tenant = current_tenant();
926 let principal_id = crate::auth::UserId::from_parts(tenant.as_deref(), &principal);
927 let mut resource = crate::auth::policies::ResourceRef::new(
928 "vault",
929 Self::vault_target_resource(collection, key),
930 );
931 if let Some(ref tenant) = tenant {
932 resource = resource.with_tenant(tenant.clone());
933 }
934 let ctx = crate::auth::policies::EvalContext {
935 principal_tenant: tenant.clone(),
936 current_tenant: tenant,
937 peer_ip: None,
938 mfa_present: false,
939 now_ms: crate::utils::now_unix_millis() as u128,
940 principal_is_admin_role: role == crate::auth::Role::Admin,
941 principal_is_system_owned: auth_store.principal_is_system_owned(&principal_id),
942 principal_is_platform_scoped: principal_id.tenant.is_none(),
943 };
944 if auth_store.check_policy_authz(&principal_id, action, &resource, &ctx) {
945 Ok(())
946 } else {
947 Err(format!(
948 "principal=`{}` action=`{}` resource=`vault:{}` denied by IAM policy",
949 principal,
950 action,
951 Self::vault_target_resource(collection, key)
952 ))
953 }
954 }
955
956 fn check_system_vault_capability(
957 &self,
958 action: &str,
959 collection: &str,
960 key: &str,
961 ) -> Result<(), String> {
962 if collection != "red.vault" {
963 return Ok(());
964 }
965 self.check_vault_capability(action, collection, key)
966 }
967
968 fn audit_vault_unseal(
969 &self,
970 collection: &str,
971 key: &str,
972 outcome: crate::runtime::audit_log::Outcome,
973 reason: &str,
974 entry: Option<&VaultEntry>,
975 ) {
976 let actor = Self::current_vault_actor();
977 let request_id = Self::vault_request_id();
978 let mut builder = crate::runtime::audit_log::AuditEvent::builder("vault/unseal")
979 .principal(actor.clone())
980 .source(crate::runtime::audit_log::AuditAuthSource::Password)
981 .resource(format!(
982 "vault:{}",
983 Self::vault_target_resource(collection, key)
984 ))
985 .outcome(outcome)
986 .correlation_id(request_id.clone())
987 .fields([
988 crate::runtime::audit_log::AuditFieldEscaper::field("actor", actor),
989 crate::runtime::audit_log::AuditFieldEscaper::field("collection", collection),
990 crate::runtime::audit_log::AuditFieldEscaper::field("key", key),
991 crate::runtime::audit_log::AuditFieldEscaper::field(
992 "target",
993 Self::vault_target_resource(collection, key),
994 ),
995 crate::runtime::audit_log::AuditFieldEscaper::field("reason", reason),
996 crate::runtime::audit_log::AuditFieldEscaper::field("request_id", request_id),
997 crate::runtime::audit_log::AuditFieldEscaper::field(
998 "connection_id",
999 current_connection_id(),
1000 ),
1001 ]);
1002 if let Some(tenant) = current_tenant() {
1003 builder = builder.tenant(tenant);
1004 }
1005 if let Some(entry) = entry {
1006 builder = builder.fields([
1007 crate::runtime::audit_log::AuditFieldEscaper::field("entity_id", entry.id.raw()),
1008 crate::runtime::audit_log::AuditFieldEscaper::field(
1009 "sequence_id",
1010 entry.sequence_id,
1011 ),
1012 ]);
1013 }
1014 self.audit_log().record_event(builder.build());
1015 }
1016
1017 fn audit_vault_lifecycle(
1018 &self,
1019 operation: &str,
1020 collection: &str,
1021 key: &str,
1022 outcome: crate::runtime::audit_log::Outcome,
1023 reason: &str,
1024 entry: Option<&VaultEntry>,
1025 ) {
1026 let actor = Self::current_vault_actor();
1027 let request_id = Self::vault_request_id();
1028 let mut builder =
1029 crate::runtime::audit_log::AuditEvent::builder(format!("vault/{operation}"))
1030 .principal(actor.clone())
1031 .source(crate::runtime::audit_log::AuditAuthSource::Password)
1032 .resource(format!(
1033 "vault:{}",
1034 Self::vault_target_resource(collection, key)
1035 ))
1036 .outcome(outcome)
1037 .correlation_id(request_id.clone())
1038 .fields([
1039 crate::runtime::audit_log::AuditFieldEscaper::field("actor", actor),
1040 crate::runtime::audit_log::AuditFieldEscaper::field("collection", collection),
1041 crate::runtime::audit_log::AuditFieldEscaper::field("key", key),
1042 crate::runtime::audit_log::AuditFieldEscaper::field(
1043 "target",
1044 Self::vault_target_resource(collection, key),
1045 ),
1046 crate::runtime::audit_log::AuditFieldEscaper::field("reason", reason),
1047 crate::runtime::audit_log::AuditFieldEscaper::field("request_id", request_id),
1048 crate::runtime::audit_log::AuditFieldEscaper::field(
1049 "connection_id",
1050 current_connection_id(),
1051 ),
1052 ]);
1053 if let Some(tenant) = current_tenant() {
1054 builder = builder.tenant(tenant);
1055 }
1056 if let Some(entry) = entry {
1057 builder = builder.fields([
1058 crate::runtime::audit_log::AuditFieldEscaper::field("entity_id", entry.id.raw()),
1059 crate::runtime::audit_log::AuditFieldEscaper::field("version", entry.version),
1060 crate::runtime::audit_log::AuditFieldEscaper::field(
1061 "sequence_id",
1062 entry.sequence_id,
1063 ),
1064 ]);
1065 }
1066 self.audit_log().record_event(builder.build());
1067 }
1068
1069 pub(crate) fn resolve_vault_secret_value(
1070 &self,
1071 collection: &str,
1072 key: &str,
1073 ) -> RedDBResult<Value> {
1074 let ops = KvAtomicOps::new(self);
1075 let entry = ops.get_vault_entry(collection, key)?;
1076 if let Err(reason) = self.check_vault_capability("vault:unseal", collection, key) {
1077 self.audit_vault_unseal(
1078 collection,
1079 key,
1080 crate::runtime::audit_log::Outcome::Denied,
1081 &reason,
1082 entry.as_ref(),
1083 );
1084 return Err(RedDBError::Query(reason));
1085 }
1086 let Some(entry) = entry else {
1087 let reason = "not_found";
1088 self.audit_vault_unseal(
1089 collection,
1090 key,
1091 crate::runtime::audit_log::Outcome::Denied,
1092 reason,
1093 None,
1094 );
1095 return Err(RedDBError::NotFound(format!(
1096 "vault secret '{}.{}' not found",
1097 collection, key
1098 )));
1099 };
1100 if entry.tombstone {
1101 let reason = "deleted";
1102 self.audit_vault_unseal(
1103 collection,
1104 key,
1105 crate::runtime::audit_log::Outcome::Denied,
1106 reason,
1107 Some(&entry),
1108 );
1109 return Err(RedDBError::NotFound(format!(
1110 "vault secret '{}.{}' is deleted",
1111 collection, key
1112 )));
1113 }
1114 match self.unseal_vault_value(collection, &entry.value) {
1115 Ok(value) => {
1116 self.audit_vault_unseal(
1117 collection,
1118 key,
1119 crate::runtime::audit_log::Outcome::Success,
1120 "ok",
1121 Some(&entry),
1122 );
1123 Ok(value)
1124 }
1125 Err(err) => {
1126 let reason = err.to_string();
1127 self.audit_vault_unseal(
1128 collection,
1129 key,
1130 crate::runtime::audit_log::Outcome::Error,
1131 &reason,
1132 Some(&entry),
1133 );
1134 Err(err)
1135 }
1136 }
1137 }
1138
/// Executes one parsed keyed-collection command and packages the outcome as a
/// `RuntimeQueryResult`.
///
/// `raw_query` is copied verbatim into the `query` field of the result and
/// every arm reports `QueryMode::Sql`. The arms for `Put`, `Rotate`, `Purge`,
/// `Incr`, `Cas` and `Delete` call `check_write(WriteKind::Dml)` before they
/// touch storage. Vault arms run a capability check first; the `Unseal` and
/// `Purge` arms additionally record an audit event when that check denies the
/// request.
///
/// # Errors
/// Capability denials are returned as `RedDBError::Query`. Errors from the
/// write gate and from the `KvAtomicOps` calls are propagated unchanged.
/// `List` on a non-vault model returns `RedDBError::InvalidOperation`, and
/// `Unseal` returns `RedDBError::NotFound` for a missing or tombstoned entry.
pub fn execute_kv_command(
    &self,
    raw_query: &str,
    cmd: &crate::storage::query::ast::KvCommand,
) -> RedDBResult<RuntimeQueryResult> {
    use crate::storage::query::ast::KvCommand;

    let ops = KvAtomicOps::new(self);

    match cmd {
        KvCommand::Put {
            model,
            collection,
            key,
            value,
            ttl_ms,
            tags,
            if_not_exists,
        } => {
            // Vault writes need the system-level `vault:write` capability
            // before the write gate is consulted.
            if *model == crate::catalog::CollectionModel::Vault {
                self.check_system_vault_capability("vault:write", collection, key)
                    .map_err(RedDBError::Query)?;
            }
            self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
            let (created, id) = ops.set_with_tags_for_model(
                *model,
                collection,
                key,
                value.clone(),
                *ttl_ms,
                tags,
                *if_not_exists,
            )?;

            let mut result = UnifiedResult::with_columns(vec![
                "ok".into(),
                "collection".into(),
                "key".into(),
                "id".into(),
                "created".into(),
                "tags".into(),
            ]);
            let mut record = UnifiedRecord::new();
            record.set("ok", Value::Boolean(true));
            record.set("collection", Value::text(collection.clone()));
            record.set("key", Value::text(key.clone()));
            record.set("id", Value::Integer(id.raw() as i64));
            record.set("created", Value::Boolean(created));
            // Echoes the tags supplied with the command.
            record.set("tags", kv_tags_value(tags));
            result.push(record);

            Ok(RuntimeQueryResult {
                query: raw_query.to_string(),
                mode: crate::storage::query::modes::QueryMode::Sql,
                statement: if *model == crate::catalog::CollectionModel::Vault {
                    "vault_put"
                } else {
                    "kv_put"
                },
                engine: if *model == crate::catalog::CollectionModel::Vault {
                    "vault"
                } else {
                    "kv"
                },
                result,
                affected_rows: 1,
                // `created` decides whether this is reported as an insert or an update.
                statement_type: if created { "insert" } else { "update" },
            })
        }
        KvCommand::InvalidateTags { collection, tags } => {
            // The count returned by `invalidate_tags` is reported both as a
            // column and as `affected_rows`.
            let invalidated = ops.invalidate_tags(collection, tags)?;

            let mut result = UnifiedResult::with_columns(vec![
                "ok".into(),
                "collection".into(),
                "invalidated".into(),
                "tags".into(),
            ]);
            let mut record = UnifiedRecord::new();
            record.set("ok", Value::Boolean(true));
            record.set("collection", Value::text(collection.clone()));
            record.set("invalidated", Value::Integer(invalidated as i64));
            record.set("tags", kv_tags_value(tags));
            result.push(record);

            Ok(RuntimeQueryResult {
                query: raw_query.to_string(),
                mode: crate::storage::query::modes::QueryMode::Sql,
                statement: "kv_invalidate_tags",
                engine: "kv",
                result,
                affected_rows: invalidated as u64,
                statement_type: "delete",
            })
        }

        KvCommand::Rotate {
            collection,
            key,
            value,
            tags,
        } => {
            self.check_system_vault_capability("vault:write", collection, key)
                .map_err(RedDBError::Query)?;
            self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
            // NOTE(review): the boolean argument is `false` here and `true` in
            // the vault delete arm, so it is presumably the tombstone flag;
            // confirm against `append_vault_version`.
            let entry = ops.append_vault_version(
                collection,
                key,
                value.clone(),
                "rotate",
                false,
                tags,
            )?;
            // The watch event carries the entry's metadata JSON as its "after"
            // payload and no "before" payload.
            self.record_kv_watch_event(
                crate::replication::cdc::ChangeOperation::Update,
                collection,
                key,
                entry.id.raw(),
                None,
                Some(vault_entry_metadata_json(&entry)),
            );
            self.audit_vault_lifecycle(
                "rotate",
                collection,
                key,
                crate::runtime::audit_log::Outcome::Success,
                "ok",
                Some(&entry),
            );
            Ok(vault_write_result(
                raw_query,
                "vault_rotate",
                "update",
                collection,
                key,
                &entry,
                1,
            ))
        }

        KvCommand::List {
            model,
            collection,
            prefix,
            limit,
            offset,
        } => {
            // Only the vault model can be listed through this path.
            if *model != crate::catalog::CollectionModel::Vault {
                return Err(RedDBError::InvalidOperation(
                    "LIST is not supported through normal KV command execution".to_string(),
                ));
            }
            let mut entries = ops.latest_vault_entries(collection, prefix.as_deref())?;
            entries.sort_by(|left, right| left.key.cmp(&right.key));
            let mut result = UnifiedResult::with_columns(vec![
                "collection".into(),
                "key".into(),
                "version".into(),
                "fingerprint".into(),
                "tags".into(),
                "created_at".into(),
                "updated_at".into(),
                "status".into(),
                "tombstone".into(),
                "op".into(),
            ]);
            // Entries that fail the `vault:read_metadata` check are dropped
            // before offset and limit are applied, so paging counts only the
            // entries that pass the check.
            for entry in entries
                .into_iter()
                .filter(|entry| {
                    self.check_vault_capability("vault:read_metadata", collection, &entry.key)
                        .is_ok()
                })
                .skip(*offset)
                .take(limit.unwrap_or(usize::MAX))
            {
                push_vault_metadata_record(&mut result, collection, &entry.key, &entry);
            }
            Ok(RuntimeQueryResult {
                query: raw_query.to_string(),
                mode: crate::storage::query::modes::QueryMode::Sql,
                statement: "vault_list",
                engine: "vault",
                result,
                affected_rows: 0,
                statement_type: "select",
            })
        }

        KvCommand::History { collection, key } => {
            self.check_vault_capability("vault:read_metadata", collection, key)
                .map_err(RedDBError::Query)?;
            let versions =
                super::keyed_spine::history_versions(ops.vault_versions(collection, key)?);
            let result = vault_history_result(collection, key, &versions);
            Ok(RuntimeQueryResult {
                query: raw_query.to_string(),
                mode: crate::storage::query::modes::QueryMode::Sql,
                statement: "vault_history",
                engine: "vault",
                result,
                affected_rows: 0,
                statement_type: "select",
            })
        }

        KvCommand::Purge { collection, key } => {
            // The entry is loaded before the capability check so that a denial
            // can be audited together with it.
            let entry = ops.get_vault_entry(collection, key)?;
            if let Err(reason) = self.check_vault_capability("vault:purge", collection, key) {
                self.audit_vault_lifecycle(
                    "purge",
                    collection,
                    key,
                    crate::runtime::audit_log::Outcome::Denied,
                    &reason,
                    entry.as_ref(),
                );
                return Err(RedDBError::Query(reason));
            }
            self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
            let purged = ops.purge_vault_versions(collection, key)?;
            self.audit_vault_lifecycle(
                "purge",
                collection,
                key,
                crate::runtime::audit_log::Outcome::Success,
                "ok",
                entry.as_ref(),
            );
            let mut result = UnifiedResult::with_columns(vec![
                "ok".into(),
                "collection".into(),
                "key".into(),
                "purged".into(),
            ]);
            let mut record = UnifiedRecord::new();
            record.set("ok", Value::Boolean(true));
            record.set("collection", Value::text(collection.clone()));
            record.set("key", Value::text(key.clone()));
            record.set("purged", Value::Integer(purged as i64));
            result.push(record);
            Ok(RuntimeQueryResult {
                query: raw_query.to_string(),
                mode: crate::storage::query::modes::QueryMode::Sql,
                statement: "vault_purge",
                engine: "vault",
                result,
                affected_rows: purged as u64,
                statement_type: "delete",
            })
        }

        KvCommand::Get {
            model,
            collection,
            key,
        } => {
            // A vault GET returns the metadata row built by
            // `vault_metadata_result`; the stored value is never unsealed here.
            if *model == crate::catalog::CollectionModel::Vault {
                self.check_vault_capability("vault:read_metadata", collection, key)
                    .map_err(RedDBError::Query)?;
                let entry = ops.get_vault_entry(collection, key)?;
                let key_available = self.vault_key_available(collection);
                let result =
                    vault_metadata_result(collection, key, entry.as_ref(), key_available);
                return Ok(RuntimeQueryResult {
                    query: raw_query.to_string(),
                    mode: crate::storage::query::modes::QueryMode::Sql,
                    statement: "vault_get",
                    engine: "vault",
                    result,
                    affected_rows: 0,
                    statement_type: "select",
                });
            }

            let entity = ops.get_entity(*model, collection, key)?;
            let value = entity.as_ref().and_then(kv_value_from_entity);
            // Only the Kv model increments the get counter.
            if *model == crate::catalog::CollectionModel::Kv {
                self.inner.kv_stats.incr_gets();
            }
            let mut result = UnifiedResult::with_columns(vec![
                "rid".into(),
                "collection".into(),
                "kind".into(),
                "tenant".into(),
                "created_at".into(),
                "updated_at".into(),
                "key".into(),
                "value".into(),
                "tags".into(),
            ]);
            let mut record = UnifiedRecord::new();
            // A missing entity still yields one row, with Null id, timestamps
            // and value, rather than an error.
            if let Some(entity) = entity.as_ref() {
                record.set("rid", Value::UnsignedInteger(entity.id.raw()));
                record.set("created_at", Value::UnsignedInteger(entity.created_at));
                record.set("updated_at", Value::UnsignedInteger(entity.updated_at));
            } else {
                record.set("rid", Value::Null);
                record.set("created_at", Value::Null);
                record.set("updated_at", Value::Null);
            }
            record.set("collection", Value::text(collection.clone()));
            record.set("kind", Value::text(keyed_model_name(*model).to_string()));
            record.set("tenant", Value::Null);
            record.set("key", Value::text(key.clone()));
            record.set(
                "value",
                value.unwrap_or(crate::storage::schema::Value::Null),
            );
            record.set("tags", kv_tags_value(&ops.tags_for_key(collection, key)));
            result.push(record);

            Ok(RuntimeQueryResult {
                query: raw_query.to_string(),
                mode: crate::storage::query::modes::QueryMode::Sql,
                statement: "kv_get",
                engine: "kv",
                result,
                affected_rows: 0,
                statement_type: "select",
            })
        }
        KvCommand::Watch {
            model,
            collection,
            key,
            prefix,
            from_lsn,
        } => {
            // No events are read in this arm. It only describes where the
            // watch can be opened: a prefix watch appends `.*` to the key, and
            // `from_lsn` becomes the `since_lsn` query parameter.
            let watch_key = if *prefix {
                format!("{key}.*")
            } else {
                key.clone()
            };
            let endpoint = match from_lsn {
                Some(lsn) => format!(
                    "/collections/{collection}/{}/{watch_key}/watch?since_lsn={lsn}",
                    keyed_model_name(*model)
                ),
                None => format!(
                    "/collections/{collection}/{}/{watch_key}/watch",
                    keyed_model_name(*model)
                ),
            };
            let mut result = UnifiedResult::with_columns(vec![
                "collection".into(),
                "key".into(),
                "prefix".into(),
                "from_lsn".into(),
                "watch_url".into(),
                "streaming".into(),
            ]);
            let mut record = UnifiedRecord::new();
            record.set("collection", Value::text(collection.clone()));
            record.set("key", Value::text(watch_key));
            record.set("prefix", Value::Boolean(*prefix));
            record.set(
                "from_lsn",
                from_lsn
                    .map(Value::UnsignedInteger)
                    .unwrap_or(crate::storage::schema::Value::Null),
            );
            record.set("watch_url", Value::text(endpoint));
            record.set("streaming", Value::Boolean(true));
            result.push(record);

            Ok(RuntimeQueryResult {
                query: raw_query.to_string(),
                mode: crate::storage::query::modes::QueryMode::Sql,
                statement: "kv_watch",
                engine: keyed_model_name(*model),
                result,
                affected_rows: 0,
                statement_type: "stream",
            })
        }

        KvCommand::Unseal {
            collection,
            key,
            version,
        } => {
            let latest = ops.get_vault_entry(collection, key)?;
            let entry = match version {
                Some(version) => ops.get_vault_entry_version(collection, key, *version)?,
                None => latest.clone(),
            };
            // Unsealing the latest version, whether requested explicitly or by
            // default, needs `vault:unseal`. Any other explicit version needs
            // `vault:unseal_history`.
            let action = match (version, latest.as_ref()) {
                (Some(requested), Some(latest)) if *requested == latest.version => {
                    "vault:unseal"
                }
                (Some(_), _) => "vault:unseal_history",
                _ => "vault:unseal",
            };
            if let Err(reason) = self.check_vault_capability(action, collection, key) {
                self.audit_vault_unseal(
                    collection,
                    key,
                    crate::runtime::audit_log::Outcome::Denied,
                    &reason,
                    entry.as_ref(),
                );
                return Err(RedDBError::Query(reason));
            }
            let Some(entry) = entry else {
                let reason = "not_found";
                self.audit_vault_unseal(
                    collection,
                    key,
                    crate::runtime::audit_log::Outcome::Denied,
                    reason,
                    None,
                );
                return Err(RedDBError::NotFound(format!(
                    "vault secret '{}.{}' not found",
                    collection, key
                )));
            };
            if entry.tombstone {
                let reason = "deleted";
                self.audit_vault_unseal(
                    collection,
                    key,
                    crate::runtime::audit_log::Outcome::Denied,
                    reason,
                    Some(&entry),
                );
                return Err(RedDBError::NotFound(format!(
                    "vault secret '{}.{}' is deleted",
                    collection, key
                )));
            }
            // Both the success and the failure of the unseal itself are audited.
            match self.unseal_vault_value(collection, &entry.value) {
                Ok(value) => {
                    self.audit_vault_unseal(
                        collection,
                        key,
                        crate::runtime::audit_log::Outcome::Success,
                        "ok",
                        Some(&entry),
                    );
                    let mut result = UnifiedResult::with_columns(vec![
                        "collection".into(),
                        "key".into(),
                        "value".into(),
                    ]);
                    let mut record = UnifiedRecord::new();
                    record.set("collection", Value::text(collection.clone()));
                    record.set("key", Value::text(key.clone()));
                    record.set("value", value);
                    result.push(record);
                    Ok(RuntimeQueryResult {
                        query: raw_query.to_string(),
                        mode: crate::storage::query::modes::QueryMode::Sql,
                        statement: "vault_unseal",
                        engine: "vault",
                        result,
                        affected_rows: 0,
                        statement_type: "select",
                    })
                }
                Err(err) => {
                    let reason = err.to_string();
                    self.audit_vault_unseal(
                        collection,
                        key,
                        crate::runtime::audit_log::Outcome::Error,
                        &reason,
                        Some(&entry),
                    );
                    Err(err)
                }
            }
        }

        KvCommand::Incr {
            model,
            collection,
            key,
            by,
            ttl_ms,
        } => {
            self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
            let new_value = ops.incr(*model, collection, key, *by, *ttl_ms)?;

            let mut result = UnifiedResult::with_columns(vec![
                "ok".into(),
                "collection".into(),
                "key".into(),
                "value".into(),
            ]);
            let mut record = UnifiedRecord::new();
            record.set("ok", Value::Boolean(true));
            record.set("collection", Value::text(collection.clone()));
            record.set("key", Value::text(key.clone()));
            record.set("value", Value::Integer(new_value));
            result.push(record);

            Ok(RuntimeQueryResult {
                query: raw_query.to_string(),
                mode: crate::storage::query::modes::QueryMode::Sql,
                statement: "kv_incr",
                engine: "kv",
                result,
                affected_rows: 1,
                statement_type: "update",
            })
        }

        KvCommand::Cas {
            model,
            collection,
            key,
            expected,
            new_value,
            ttl_ms,
        } => {
            self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
            let (ok, current) = ops.cas(
                *model,
                collection,
                key,
                expected.as_ref(),
                new_value.clone(),
                *ttl_ms,
            )?;

            let mut result = UnifiedResult::with_columns(vec![
                "ok".into(),
                "collection".into(),
                "key".into(),
                "current".into(),
            ]);
            let mut record = UnifiedRecord::new();
            // The `ok` column carries the flag returned by `cas`, not a constant.
            record.set("ok", Value::Boolean(ok));
            record.set("collection", Value::text(collection.clone()));
            record.set("key", Value::text(key.clone()));
            record.set(
                "current",
                current.unwrap_or(crate::storage::schema::Value::Null),
            );
            result.push(record);

            Ok(RuntimeQueryResult {
                query: raw_query.to_string(),
                mode: crate::storage::query::modes::QueryMode::Sql,
                statement: "kv_cas",
                engine: "kv",
                result,
                affected_rows: if ok { 1 } else { 0 },
                statement_type: "update",
            })
        }

        KvCommand::Delete {
            model,
            collection,
            key,
        } => {
            // A vault delete goes through `append_vault_version` with a Null
            // value and op "delete"; it does not call `ops.delete`.
            if *model == crate::catalog::CollectionModel::Vault {
                self.check_system_vault_capability("vault:write", collection, key)
                    .map_err(RedDBError::Query)?;
                self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
                let entry = ops.append_vault_version(
                    collection,
                    key,
                    Value::Null,
                    "delete",
                    true,
                    &[],
                )?;
                self.record_kv_watch_event(
                    crate::replication::cdc::ChangeOperation::Delete,
                    collection,
                    key,
                    entry.id.raw(),
                    None,
                    Some(vault_entry_metadata_json(&entry)),
                );
                self.audit_vault_lifecycle(
                    "delete",
                    collection,
                    key,
                    crate::runtime::audit_log::Outcome::Success,
                    "ok",
                    Some(&entry),
                );
                return Ok(vault_write_result(
                    raw_query,
                    "vault_delete",
                    "delete",
                    collection,
                    key,
                    &entry,
                    1,
                ));
            }
            self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
            let deleted = ops.delete(*model, collection, key)?;

            let mut result = UnifiedResult::with_columns(vec![
                "ok".into(),
                "collection".into(),
                "key".into(),
                "deleted".into(),
            ]);
            let mut record = UnifiedRecord::new();
            record.set("ok", Value::Boolean(true));
            record.set("collection", Value::text(collection.clone()));
            record.set("key", Value::text(key.clone()));
            record.set("deleted", Value::Boolean(deleted));
            result.push(record);

            Ok(RuntimeQueryResult {
                query: raw_query.to_string(),
                mode: crate::storage::query::modes::QueryMode::Sql,
                statement: "kv_delete",
                engine: "kv",
                result,
                affected_rows: if deleted { 1 } else { 0 },
                statement_type: "delete",
            })
        }
    }
}
1764
1765 pub fn vault_watch_events_since(
1766 &self,
1767 collection: &str,
1768 key: &str,
1769 since_lsn: u64,
1770 max_count: usize,
1771 ) -> Vec<crate::replication::cdc::KvWatchEvent> {
1772 self.kv_watch_events_since(collection, key, since_lsn, max_count)
1773 .into_iter()
1774 .filter(|event| {
1775 self.check_vault_capability("vault:read_metadata", &event.collection, &event.key)
1776 .is_ok()
1777 })
1778 .map(vault_filter_watch_event)
1779 .collect()
1780 }
1781
1782 pub fn vault_watch_events_since_prefix(
1783 &self,
1784 collection: &str,
1785 prefix: &str,
1786 since_lsn: u64,
1787 max_count: usize,
1788 ) -> Vec<crate::replication::cdc::KvWatchEvent> {
1789 self.kv_watch_events_since_prefix(collection, prefix, since_lsn, max_count)
1790 .into_iter()
1791 .filter(|event| {
1792 self.check_vault_capability("vault:read_metadata", &event.collection, &event.key)
1793 .is_ok()
1794 })
1795 .map(vault_filter_watch_event)
1796 .collect()
1797 }
1798
1799 fn check_kv_invalidate_policy(&self, collection: &str) -> RedDBResult<()> {
1800 let auth_store = match self.inner.auth_store.read().clone() {
1801 Some(store) => store,
1802 None => return Ok(()),
1803 };
1804 let (username, role) = match crate::runtime::impl_core::current_auth_identity() {
1805 Some(identity) => identity,
1806 None => return Ok(()),
1807 };
1808 if role < crate::auth::Role::Write {
1809 return Err(RedDBError::Query(format!(
1810 "principal=`{username}` role=`{role:?}` cannot invalidate KV tags"
1811 )));
1812 }
1813 if !auth_store.iam_authorization_enabled() {
1814 return Ok(());
1815 }
1816
1817 let tenant = crate::runtime::impl_core::current_tenant();
1818 let principal = crate::auth::UserId::from_parts(tenant.as_deref(), &username);
1819 let mut resource =
1820 crate::auth::policies::ResourceRef::new("kv".to_string(), collection.to_string());
1821 if let Some(tenant) = tenant.clone() {
1822 resource = resource.with_tenant(tenant);
1823 }
1824 let ctx = crate::auth::policies::EvalContext {
1825 principal_tenant: tenant.clone(),
1826 current_tenant: tenant,
1827 peer_ip: None,
1828 mfa_present: false,
1829 now_ms: current_unix_ms(),
1830 principal_is_admin_role: role == crate::auth::Role::Admin,
1831 principal_is_system_owned: auth_store.principal_is_system_owned(&principal),
1832 principal_is_platform_scoped: principal.tenant.is_none(),
1833 };
1834 if auth_store.check_policy_authz(&principal, "kv:invalidate", &resource, &ctx) {
1835 Ok(())
1836 } else {
1837 Err(RedDBError::Query(format!(
1838 "principal=`{username}` action=`kv:invalidate` resource=`kv:{collection}` denied by IAM policy"
1839 )))
1840 }
1841 }
1842}
1843
1844fn ttl_metadata(ttl_ms: Option<u64>) -> Option<Metadata> {
1845 let ttl_ms = ttl_ms?;
1846 Some(Metadata::with_fields(
1847 [(
1848 "_ttl_ms".to_string(),
1849 if ttl_ms <= i64::MAX as u64 {
1850 MetadataValue::Int(ttl_ms as i64)
1851 } else {
1852 MetadataValue::Timestamp(ttl_ms)
1853 },
1854 )]
1855 .into_iter()
1856 .collect(),
1857 ))
1858}
1859
1860fn vault_write_result(
1861 raw_query: &str,
1862 statement: &'static str,
1863 statement_type: &'static str,
1864 collection: &str,
1865 key: &str,
1866 entry: &VaultEntry,
1867 affected_rows: u64,
1868) -> RuntimeQueryResult {
1869 let mut result = UnifiedResult::with_columns(vec![
1870 "ok".into(),
1871 "collection".into(),
1872 "key".into(),
1873 "version".into(),
1874 "fingerprint".into(),
1875 "tombstone".into(),
1876 "op".into(),
1877 "id".into(),
1878 ]);
1879 let mut record = UnifiedRecord::new();
1880 record.set("ok", Value::Boolean(true));
1881 record.set("collection", Value::text(collection.to_string()));
1882 record.set("key", Value::text(key.to_string()));
1883 record.set("version", Value::Integer(entry.version));
1884 if entry.tombstone {
1885 record.set("fingerprint", Value::Null);
1886 } else {
1887 record.set("fingerprint", Value::text(vault_fingerprint(&entry.value)));
1888 }
1889 record.set("tombstone", Value::Boolean(entry.tombstone));
1890 record.set("op", Value::text(entry.op.clone()));
1891 record.set("id", Value::Integer(entry.id.raw() as i64));
1892 result.push(record);
1893 RuntimeQueryResult {
1894 query: raw_query.to_string(),
1895 mode: crate::storage::query::modes::QueryMode::Sql,
1896 statement,
1897 engine: "vault",
1898 result,
1899 affected_rows,
1900 statement_type,
1901 }
1902}
1903
1904fn vault_history_result(collection: &str, key: &str, versions: &[VaultEntry]) -> UnifiedResult {
1905 let mut result = UnifiedResult::with_columns(vec![
1906 "collection".into(),
1907 "key".into(),
1908 "version".into(),
1909 "fingerprint".into(),
1910 "tags".into(),
1911 "created_at".into(),
1912 "updated_at".into(),
1913 "status".into(),
1914 "tombstone".into(),
1915 "op".into(),
1916 ]);
1917 for entry in versions {
1918 push_vault_metadata_record(&mut result, collection, key, entry);
1919 }
1920 result
1921}
1922
1923fn push_vault_metadata_record(
1924 result: &mut UnifiedResult,
1925 collection: &str,
1926 key: &str,
1927 entry: &VaultEntry,
1928) {
1929 let mut record = UnifiedRecord::new();
1930 record.set("collection", Value::text(collection.to_string()));
1931 record.set("key", Value::text(key.to_string()));
1932 record.set("version", Value::Integer(entry.version));
1933 if entry.tombstone {
1934 record.set("fingerprint", Value::Null);
1935 record.set("status", Value::text("deleted"));
1936 } else {
1937 record.set("fingerprint", Value::text(vault_fingerprint(&entry.value)));
1938 record.set("status", Value::text("sealed"));
1939 }
1940 record.set("tags", vault_tags_value(&entry.metadata));
1941 record.set("created_at", Value::TimestampMs(entry.created_at as i64));
1942 record.set("updated_at", Value::TimestampMs(entry.updated_at as i64));
1943 record.set("tombstone", Value::Boolean(entry.tombstone));
1944 record.set("op", Value::text(entry.op.clone()));
1945 result.push(record);
1946}
1947
1948fn vault_metadata_result(
1949 collection: &str,
1950 key: &str,
1951 entry: Option<&VaultEntry>,
1952 key_available: bool,
1953) -> UnifiedResult {
1954 let mut result = UnifiedResult::with_columns(vec![
1955 "collection".into(),
1956 "key".into(),
1957 "version".into(),
1958 "fingerprint".into(),
1959 "tags".into(),
1960 "created_at".into(),
1961 "updated_at".into(),
1962 "value".into(),
1963 "status".into(),
1964 "tombstone".into(),
1965 "op".into(),
1966 ]);
1967 let mut record = UnifiedRecord::new();
1968 record.set("collection", Value::text(collection.to_string()));
1969 record.set("key", Value::text(key.to_string()));
1970 match entry {
1971 Some(entry) => {
1972 record.set("version", Value::Integer(entry.version));
1973 if entry.tombstone {
1974 record.set("fingerprint", Value::Null);
1975 } else {
1976 record.set("fingerprint", Value::text(vault_fingerprint(&entry.value)));
1977 }
1978 record.set("tags", vault_tags_value(&entry.metadata));
1979 record.set("created_at", Value::TimestampMs(entry.created_at as i64));
1980 record.set("updated_at", Value::TimestampMs(entry.updated_at as i64));
1981 record.set("value", Value::text("***"));
1982 record.set(
1983 "status",
1984 Value::text(if entry.tombstone {
1985 "deleted"
1986 } else if key_available {
1987 "sealed"
1988 } else {
1989 "sealed_unavailable"
1990 }),
1991 );
1992 record.set("tombstone", Value::Boolean(entry.tombstone));
1993 record.set("op", Value::text(entry.op.clone()));
1994 }
1995 None => {
1996 record.set("version", Value::Null);
1997 record.set("fingerprint", Value::Null);
1998 record.set("tags", Value::Array(Vec::new()));
1999 record.set("created_at", Value::Null);
2000 record.set("updated_at", Value::Null);
2001 record.set("value", Value::text(""));
2002 record.set("status", Value::text("missing"));
2003 record.set("tombstone", Value::Boolean(false));
2004 record.set("op", Value::Null);
2005 }
2006 }
2007 result.push(record);
2008 result
2009}
2010
2011fn vault_fingerprint(value: &Value) -> String {
2012 match value {
2013 Value::Secret(payload) => crate::utils::to_hex(&crate::crypto::sha256::sha256(payload)),
2014 other => crate::utils::to_hex(&crate::crypto::sha256::sha256(&other.to_bytes())),
2015 }
2016}
2017
2018fn vault_entry_metadata_json(entry: &VaultEntry) -> crate::json::Value {
2019 let mut object = crate::json::Map::new();
2020 object.insert(
2021 "key".to_string(),
2022 crate::json::Value::String(entry.key.clone()),
2023 );
2024 object.insert(
2025 "version".to_string(),
2026 crate::json::Value::Number(entry.version as f64),
2027 );
2028 object.insert(
2029 "fingerprint".to_string(),
2030 if entry.tombstone {
2031 crate::json::Value::Null
2032 } else {
2033 crate::json::Value::String(vault_fingerprint(&entry.value))
2034 },
2035 );
2036 object.insert("tags".to_string(), vault_tags_json(&entry.metadata));
2037 object.insert(
2038 "actor".to_string(),
2039 crate::json::Value::String(RedDBRuntime::current_vault_actor()),
2040 );
2041 object.insert(
2042 "sequence_id".to_string(),
2043 crate::json::Value::Number(entry.sequence_id as f64),
2044 );
2045 object.insert(
2046 "tombstone".to_string(),
2047 crate::json::Value::Bool(entry.tombstone),
2048 );
2049 object.insert(
2050 "op".to_string(),
2051 crate::json::Value::String(entry.op.clone()),
2052 );
2053 crate::json::Value::Object(object)
2054}
2055
2056fn vault_tags_json(metadata: &Metadata) -> crate::json::Value {
2057 match vault_tags_value(metadata) {
2058 Value::Array(values) => crate::json::Value::Array(
2059 values
2060 .into_iter()
2061 .filter_map(|value| match value {
2062 Value::Text(tag) => Some(crate::json::Value::String(tag.to_string())),
2063 _ => None,
2064 })
2065 .collect(),
2066 ),
2067 _ => crate::json::Value::Array(Vec::new()),
2068 }
2069}
2070
2071fn vault_tags_metadata(tags: &[String]) -> std::collections::HashMap<String, MetadataValue> {
2072 [(
2073 "tags".to_string(),
2074 MetadataValue::Array(
2075 tags.iter()
2076 .map(|tag| MetadataValue::String(tag.clone()))
2077 .collect(),
2078 ),
2079 )]
2080 .into_iter()
2081 .collect()
2082}
2083
2084fn vault_filter_watch_event(
2085 mut event: crate::replication::cdc::KvWatchEvent,
2086) -> crate::replication::cdc::KvWatchEvent {
2087 event.before = event.before.and_then(vault_metadata_json_only);
2088 event.after = event.after.and_then(vault_metadata_json_only);
2089 event
2090}
2091
2092fn vault_metadata_json_only(value: crate::json::Value) -> Option<crate::json::Value> {
2093 let object = value.as_object()?;
2094 let mut out = crate::json::Map::new();
2095 for field in [
2096 "key",
2097 "version",
2098 "fingerprint",
2099 "tags",
2100 "actor",
2101 "sequence_id",
2102 "tombstone",
2103 "op",
2104 ] {
2105 if let Some(value) = object.get(field) {
2106 out.insert(field.to_string(), value.clone());
2107 }
2108 }
2109 Some(crate::json::Value::Object(out))
2110}
2111
2112fn vault_tags_value(metadata: &Metadata) -> Value {
2113 match metadata.get("tags") {
2114 Some(MetadataValue::Array(values)) => Value::Array(
2115 values
2116 .iter()
2117 .filter_map(|value| match value {
2118 MetadataValue::String(tag) => Some(Value::text(tag.clone())),
2119 _ => None,
2120 })
2121 .collect(),
2122 ),
2123 Some(MetadataValue::String(tag)) if !tag.is_empty() => {
2124 Value::Array(vec![Value::text(tag.clone())])
2125 }
2126 _ => Value::Array(Vec::new()),
2127 }
2128}
2129
2130fn decode_vault_key(hex_key: &str) -> RedDBResult<[u8; 32]> {
2131 let bytes = hex::decode(hex_key)
2132 .map_err(|_| RedDBError::Query("vault sealed_unavailable: bad key material".to_string()))?;
2133 let key: [u8; 32] = bytes.try_into().map_err(|_| {
2134 RedDBError::Query("vault sealed_unavailable: bad key material length".to_string())
2135 })?;
2136 Ok(key)
2137}
2138
2139fn kv_tags_metadata(tags: &[String]) -> Option<(String, MetadataValue)> {
2140 if tags.is_empty() {
2141 return None;
2142 }
2143 let values = tags
2144 .iter()
2145 .map(|tag| MetadataValue::String(tag.clone()))
2146 .collect();
2147 Some(("_kv_tags".to_string(), MetadataValue::Array(values)))
2148}
2149
2150fn kv_tags_value(tags: &[String]) -> Value {
2151 let json = crate::json::Value::Array(
2152 tags.iter()
2153 .map(|tag| crate::json::Value::String(tag.clone()))
2154 .collect(),
2155 );
2156 Value::Json(crate::json::to_vec(&json).unwrap_or_default())
2157}
2158
2159fn kv_value_from_entity(entity: &crate::storage::UnifiedEntity) -> Option<Value> {
2160 if let crate::storage::EntityData::Row(ref row) = entity.data {
2161 if let Some(ref named) = row.named {
2162 return named.get("value").cloned();
2163 }
2164 }
2165 None
2166}
2167
2168fn kv_collection_contract(name: &str) -> crate::physical::CollectionContract {
2169 let now = current_unix_ms();
2170 crate::physical::CollectionContract {
2171 name: name.to_string(),
2172 declared_model: crate::catalog::CollectionModel::Kv,
2173 schema_mode: crate::catalog::SchemaMode::Dynamic,
2174 origin: crate::physical::ContractOrigin::Implicit,
2175 version: 1,
2176 created_at_unix_ms: now,
2177 updated_at_unix_ms: now,
2178 default_ttl_ms: None,
2179 vector_dimension: None,
2180 vector_metric: None,
2181 context_index_fields: Vec::new(),
2182 declared_columns: Vec::new(),
2183 table_def: None,
2184 timestamps_enabled: false,
2185 context_index_enabled: false,
2186 metrics_raw_retention_ms: None,
2187 metrics_rollup_policies: Vec::new(),
2188 metrics_tenant_identity: None,
2189 metrics_namespace: None,
2190 append_only: false,
2191 subscriptions: Vec::new(),
2192 session_key: None,
2193 session_gap_ms: None,
2194 retention_duration_ms: None,
2195 }
2196}
2197
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// If the system clock reports a time before the epoch, the result is `0`
/// instead of an error.
fn current_unix_ms() -> u128 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis(),
        Err(_) => 0,
    }
}
2204
2205#[cfg(test)]
2206mod tests {
2207 use crate::api::RedDBOptions;
2208 use crate::catalog::CollectionModel;
2209 use crate::runtime::RedDBRuntime;
2210
2211 fn rt() -> RedDBRuntime {
2212 RedDBRuntime::with_options(RedDBOptions::in_memory()).expect("in-memory runtime")
2213 }
2214
/// Incrementing a key that does not exist yet returns the increment itself.
#[test]
fn incr_missing_key_initialises_at_by() {
    let runtime = rt();
    let first = super::KvAtomicOps::new(&runtime)
        .incr(CollectionModel::Kv, "kv_default", "missing", 5, None)
        .unwrap();
    assert_eq!(first, 5);
}
2224
/// Runs one set, get, delete and incr plus one successful and one
/// conflicting CAS, then checks that each KV counter moved by exactly one.
#[test]
fn kv_runtime_stats_count_public_ops() {
    use crate::storage::schema::Value;

    let runtime = rt();
    let kv = super::KvAtomicOps::new(&runtime);
    let (model, collection) = (CollectionModel::Kv, "kv_default");

    kv.set(model, collection, "profile", Value::text("alice"), None, false)
        .unwrap();
    kv.get(model, collection, "profile").unwrap();
    kv.delete(model, collection, "profile").unwrap();
    kv.incr(model, collection, "hits", 1, None).unwrap();

    // "profile" was deleted above, so a CAS that expects no value succeeds.
    kv.cas(model, collection, "profile", None, Value::text("created"), None)
        .unwrap();
    // The key now holds "created", so expecting "different" is a conflict.
    let stale = Value::text("different");
    kv.cas(
        model,
        collection,
        "profile",
        Some(&stale),
        Value::text("ignored"),
        None,
    )
    .unwrap();

    let counters = runtime.stats().kv;
    assert_eq!(counters.puts, 1);
    assert_eq!(counters.gets, 1);
    assert_eq!(counters.deletes, 1);
    assert_eq!(counters.incrs, 1);
    assert_eq!(counters.cas_success, 1);
    assert_eq!(counters.cas_conflict, 1);
}
2272
2273 #[test]
2274 fn kv_invalidate_tags_removes_matching_entries_only() {
2275 let r = rt();
2276
2277 r.execute_query("KV PUT sessions.blob = 'payload' TAGS [user:42, org:7]")
2278 .unwrap();
2279
2280 let miss = r
2281 .execute_query("INVALIDATE TAGS [org:99] FROM sessions")
2282 .unwrap();
2283 assert_eq!(miss.affected_rows, 0);
2284 assert!(matches!(
2285 r.execute_query("KV GET sessions.blob")
2286 .unwrap()
2287 .result
2288 .records[0]
2289 .get("value"),
2290 Some(crate::storage::schema::Value::Text(value)) if &**value == "payload"
2291 ));
2292
2293 let hit = r
2294 .execute_query("INVALIDATE TAGS [user:42] FROM sessions")
2295 .unwrap();
2296 assert_eq!(hit.affected_rows, 1);
2297 assert!(matches!(
2298 r.execute_query("KV GET sessions.blob")
2299 .unwrap()
2300 .result
2301 .records[0]
2302 .get("value"),
2303 Some(crate::storage::schema::Value::Null)
2304 ));
2305 }
2306
2307 #[test]
2308 fn kv_runtime_stats_count_watch_streams_and_events() {
2309 let r = rt();
2310 let ops = super::KvAtomicOps::new(&r);
2311 assert_eq!(r.stats().kv.watch_streams_active, 0);
2312
2313 {
2314 let mut stream = r.kv_watch_subscribe("kv_default", "watched", None);
2315 assert_eq!(r.stats().kv.watch_streams_active, 1);
2316
2317 ops.set(
2318 CollectionModel::Kv,
2319 "kv_default",
2320 "watched",
2321 crate::storage::schema::Value::Integer(1),
2322 None,
2323 false,
2324 )
2325 .unwrap();
2326 let event = stream.poll_next().expect("watch event");
2327 assert_eq!(event.key, "watched");
2328 assert_eq!(r.stats().kv.watch_events_emitted, 1);
2329
2330 stream.record_drop_count(3);
2331 assert_eq!(r.stats().kv.watch_drops, 3);
2332 }
2333
2334 assert_eq!(r.stats().kv.watch_streams_active, 0);
2335 }
2336
2337 #[test]
2338 fn incr_existing_integer_accumulates() {
2339 let r = rt();
2340 let ops = super::KvAtomicOps::new(&r);
2341 ops.incr(CollectionModel::Kv, "kv_default", "ctr", 1, None)
2342 .unwrap();
2343 ops.incr(CollectionModel::Kv, "kv_default", "ctr", 1, None)
2344 .unwrap();
2345 let v = ops
2346 .incr(CollectionModel::Kv, "kv_default", "ctr", 1, None)
2347 .unwrap();
2348 assert_eq!(v, 3);
2349 }
2350
2351 #[test]
2352 fn decr_via_negative_by() {
2353 let r = rt();
2354 let ops = super::KvAtomicOps::new(&r);
2355 ops.incr(CollectionModel::Kv, "kv_default", "stock", 10, None)
2356 .unwrap();
2357 let v = ops
2358 .incr(CollectionModel::Kv, "kv_default", "stock", -3, None)
2359 .unwrap();
2360 assert_eq!(v, 7);
2361 }
2362
2363 #[test]
2364 fn concurrent_incr_single_key_is_atomic() {
2365 const THREADS: usize = 8;
2366 const ITERS: usize = 1000;
2367
2368 let runtime = std::sync::Arc::new(rt());
2369 let barrier = std::sync::Arc::new(std::sync::Barrier::new(THREADS));
2370 let mut handles = Vec::new();
2371
2372 for _ in 0..THREADS {
2373 let runtime = std::sync::Arc::clone(&runtime);
2374 let barrier = std::sync::Arc::clone(&barrier);
2375 handles.push(std::thread::spawn(move || {
2376 let ops = super::KvAtomicOps::new(&runtime);
2377 barrier.wait();
2378 for _ in 0..ITERS {
2379 ops.incr(CollectionModel::Kv, "kv_default", "counter", 1, None)
2380 .unwrap();
2381 }
2382 }));
2383 }
2384
2385 for handle in handles {
2386 handle.join().expect("worker should finish");
2387 }
2388
2389 let ops = super::KvAtomicOps::new(&runtime);
2390 assert_eq!(
2391 ops.get(CollectionModel::Kv, "kv_default", "counter")
2392 .unwrap(),
2393 Some(crate::storage::schema::Value::Integer(
2394 (THREADS * ITERS) as i64
2395 ))
2396 );
2397 }
2398
2399 #[test]
2400 fn incr_on_string_value_returns_error() {
2401 let r = rt();
2402 let ops = super::KvAtomicOps::new(&r);
2403 ops.set(
2404 CollectionModel::Kv,
2405 "kv_default",
2406 "name",
2407 crate::storage::schema::Value::text("alice"),
2408 None,
2409 false,
2410 )
2411 .unwrap();
2412 let err = ops
2413 .incr(CollectionModel::Kv, "kv_default", "name", 1, None)
2414 .unwrap_err();
2415 assert!(err.to_string().contains("non-integer"));
2416 }
2417
2418 #[test]
2421 fn cas_matching_value_succeeds() {
2422 let r = rt();
2423 let ops = super::KvAtomicOps::new(&r);
2424 ops.set(
2425 CollectionModel::Kv,
2426 "kv_default",
2427 "lock",
2428 crate::storage::schema::Value::text("free"),
2429 None,
2430 false,
2431 )
2432 .unwrap();
2433 let (ok, prev) = ops
2434 .cas(
2435 CollectionModel::Kv,
2436 "kv_default",
2437 "lock",
2438 Some(&crate::storage::schema::Value::text("free")),
2439 crate::storage::schema::Value::text("held"),
2440 None,
2441 )
2442 .unwrap();
2443 assert!(ok);
2444 assert_eq!(prev, Some(crate::storage::schema::Value::text("free")));
2445 assert_eq!(
2447 ops.get(CollectionModel::Kv, "kv_default", "lock").unwrap(),
2448 Some(crate::storage::schema::Value::text("held"))
2449 );
2450 }
2451
2452 #[test]
2453 fn concurrent_cas_allows_one_success_per_round() {
2454 const THREADS: usize = 8;
2455 const ROUNDS: usize = 100;
2456
2457 let runtime = std::sync::Arc::new(rt());
2458 let ops = super::KvAtomicOps::new(&runtime);
2459 ops.set(
2460 CollectionModel::Kv,
2461 "kv_default",
2462 "cas_counter",
2463 crate::storage::schema::Value::Integer(0),
2464 None,
2465 false,
2466 )
2467 .unwrap();
2468
2469 let start_round = std::sync::Arc::new(std::sync::Barrier::new(THREADS));
2470 let finish_round = std::sync::Arc::new(std::sync::Barrier::new(THREADS));
2471 let successes = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
2472 let mut handles = Vec::new();
2473
2474 for _ in 0..THREADS {
2475 let runtime = std::sync::Arc::clone(&runtime);
2476 let start_round = std::sync::Arc::clone(&start_round);
2477 let finish_round = std::sync::Arc::clone(&finish_round);
2478 let successes = std::sync::Arc::clone(&successes);
2479 handles.push(std::thread::spawn(move || {
2480 let ops = super::KvAtomicOps::new(&runtime);
2481 for round in 0..ROUNDS {
2482 start_round.wait();
2483 let (ok, _) = ops
2484 .cas(
2485 CollectionModel::Kv,
2486 "kv_default",
2487 "cas_counter",
2488 Some(&crate::storage::schema::Value::Integer(round as i64)),
2489 crate::storage::schema::Value::Integer((round + 1) as i64),
2490 None,
2491 )
2492 .unwrap();
2493 if ok {
2494 successes.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
2495 }
2496 finish_round.wait();
2497 }
2498 }));
2499 }
2500
2501 for handle in handles {
2502 handle.join().expect("worker should finish");
2503 }
2504
2505 assert_eq!(successes.load(std::sync::atomic::Ordering::SeqCst), ROUNDS);
2506 assert_eq!(
2507 ops.get(CollectionModel::Kv, "kv_default", "cas_counter")
2508 .unwrap(),
2509 Some(crate::storage::schema::Value::Integer(ROUNDS as i64))
2510 );
2511 }
2512
2513 #[test]
2514 fn cas_mismatching_value_fails() {
2515 let r = rt();
2516 let ops = super::KvAtomicOps::new(&r);
2517 ops.set(
2518 CollectionModel::Kv,
2519 "kv_default",
2520 "lock",
2521 crate::storage::schema::Value::text("free"),
2522 None,
2523 false,
2524 )
2525 .unwrap();
2526 let (ok, current) = ops
2527 .cas(
2528 CollectionModel::Kv,
2529 "kv_default",
2530 "lock",
2531 Some(&crate::storage::schema::Value::text("held")),
2532 crate::storage::schema::Value::text("worker-7"),
2533 None,
2534 )
2535 .unwrap();
2536 assert!(!ok);
2537 assert_eq!(current, Some(crate::storage::schema::Value::text("free")));
2538 assert_eq!(
2540 ops.get(CollectionModel::Kv, "kv_default", "lock").unwrap(),
2541 Some(crate::storage::schema::Value::text("free"))
2542 );
2543 }
2544
2545 #[test]
2546 fn cas_expect_null_on_missing_key_creates() {
2547 let r = rt();
2548 let ops = super::KvAtomicOps::new(&r);
2549 let (ok, prev) = ops
2550 .cas(
2551 CollectionModel::Kv,
2552 "kv_default",
2553 "new_key",
2554 None,
2555 crate::storage::schema::Value::text("created"),
2556 None,
2557 )
2558 .unwrap();
2559 assert!(ok);
2560 assert_eq!(prev, None);
2561 assert_eq!(
2562 ops.get(CollectionModel::Kv, "kv_default", "new_key")
2563 .unwrap(),
2564 Some(crate::storage::schema::Value::text("created"))
2565 );
2566 }
2567
2568 #[test]
2569 fn cas_expect_null_on_existing_key_fails() {
2570 let r = rt();
2571 let ops = super::KvAtomicOps::new(&r);
2572 ops.set(
2573 CollectionModel::Kv,
2574 "kv_default",
2575 "taken",
2576 crate::storage::schema::Value::text("worker-1"),
2577 None,
2578 false,
2579 )
2580 .unwrap();
2581 let (ok, current) = ops
2582 .cas(
2583 CollectionModel::Kv,
2584 "kv_default",
2585 "taken",
2586 None,
2587 crate::storage::schema::Value::text("worker-2"),
2588 None,
2589 )
2590 .unwrap();
2591 assert!(!ok);
2592 assert_eq!(
2593 current,
2594 Some(crate::storage::schema::Value::text("worker-1"))
2595 );
2596 }
2597
2598 #[test]
2599 fn cas_via_sql_roundtrip() {
2600 let r = rt();
2601 r.execute_query("KV PUT lock = 'free'").unwrap();
2603 let res = r
2605 .execute_query("KV CAS lock EXPECT 'free' SET 'held'")
2606 .unwrap();
2607 let row = &res.result.records[0];
2608 assert_eq!(
2609 row.get("ok"),
2610 Some(&crate::storage::schema::Value::Boolean(true))
2611 );
2612 let res2 = r
2614 .execute_query("KV CAS lock EXPECT 'free' SET 'held'")
2615 .unwrap();
2616 let row2 = &res2.result.records[0];
2617 assert_eq!(
2618 row2.get("ok"),
2619 Some(&crate::storage::schema::Value::Boolean(false))
2620 );
2621 }
2622
2623 #[test]
2624 fn cas_expect_null_via_sql() {
2625 let r = rt();
2626 let res = r
2627 .execute_query("KV CAS singleton EXPECT NULL SET 'first'")
2628 .unwrap();
2629 let row = &res.result.records[0];
2630 assert_eq!(
2631 row.get("ok"),
2632 Some(&crate::storage::schema::Value::Boolean(true))
2633 );
2634 let res2 = r
2636 .execute_query("KV CAS singleton EXPECT NULL SET 'second'")
2637 .unwrap();
2638 let row2 = &res2.result.records[0];
2639 assert_eq!(
2640 row2.get("ok"),
2641 Some(&crate::storage::schema::Value::Boolean(false))
2642 );
2643 }
2644
2645 #[test]
2646 fn incr_via_sql_roundtrip() {
2647 let r = rt();
2648 let res = r.execute_query("KV INCR hits").unwrap();
2649 let row = &res.result.records[0];
2650 assert_eq!(
2651 row.get("value"),
2652 Some(&crate::storage::schema::Value::Integer(1))
2653 );
2654 let res2 = r.execute_query("KV INCR hits BY 4").unwrap();
2655 let row2 = &res2.result.records[0];
2656 assert_eq!(
2657 row2.get("value"),
2658 Some(&crate::storage::schema::Value::Integer(5))
2659 );
2660 }
2661
2662 #[test]
2663 fn concurrent_self_referential_update_is_atomic() {
2664 const THREADS: usize = 8;
2665 const ITERS: usize = 100;
2666
2667 let runtime = std::sync::Arc::new(rt());
2668 runtime
2669 .execute_query("CREATE TABLE counters (id INT, n INT)")
2670 .unwrap();
2671 runtime
2672 .execute_query("INSERT INTO counters (id, n) VALUES (1, 0)")
2673 .unwrap();
2674
2675 let barrier = std::sync::Arc::new(std::sync::Barrier::new(THREADS));
2676 let mut handles = Vec::new();
2677 for _ in 0..THREADS {
2678 let runtime = std::sync::Arc::clone(&runtime);
2679 let barrier = std::sync::Arc::clone(&barrier);
2680 handles.push(std::thread::spawn(move || {
2681 barrier.wait();
2682 for _ in 0..ITERS {
2683 runtime
2684 .execute_query("UPDATE counters SET n = n + 1 WHERE id = 1")
2685 .unwrap();
2686 }
2687 }));
2688 }
2689
2690 for handle in handles {
2691 handle.join().expect("worker should finish");
2692 }
2693
2694 let selected = runtime
2695 .execute_query("SELECT n FROM counters WHERE id = 1")
2696 .unwrap();
2697 assert_eq!(
2698 selected.result.records[0].get("n"),
2699 Some(&crate::storage::schema::Value::Integer(
2700 (THREADS * ITERS) as i64
2701 ))
2702 );
2703 }
2704
2705 #[test]
2706 fn watch_stream_delivers_key_events_in_lsn_order() {
2707 let r = rt();
2708 let ops = super::KvAtomicOps::new(&r);
2709 let mut stream = r.kv_watch_subscribe("kv_default", "seq", None);
2710
2711 ops.set(
2712 CollectionModel::Kv,
2713 "kv_default",
2714 "seq",
2715 crate::storage::schema::Value::Integer(1),
2716 None,
2717 false,
2718 )
2719 .unwrap();
2720 ops.incr(CollectionModel::Kv, "kv_default", "seq", 1, None)
2721 .unwrap();
2722 ops.delete(CollectionModel::Kv, "kv_default", "seq")
2723 .unwrap();
2724 ops.set(
2725 CollectionModel::Kv,
2726 "kv_default",
2727 "seq",
2728 crate::storage::schema::Value::Integer(9),
2729 None,
2730 false,
2731 )
2732 .unwrap();
2733
2734 let mut events = Vec::new();
2735 while let Some(event) = stream.poll_next() {
2736 events.push(event);
2737 if events.len() == 4 {
2738 break;
2739 }
2740 }
2741
2742 assert_eq!(events.len(), 4);
2743 assert_eq!(
2744 events[0].op,
2745 crate::replication::cdc::ChangeOperation::Insert
2746 );
2747 assert_eq!(
2748 events[1].op,
2749 crate::replication::cdc::ChangeOperation::Update
2750 );
2751 assert_eq!(
2752 events[2].op,
2753 crate::replication::cdc::ChangeOperation::Delete
2754 );
2755 assert_eq!(
2756 events[3].op,
2757 crate::replication::cdc::ChangeOperation::Insert
2758 );
2759 assert!(events.windows(2).all(|pair| pair[0].lsn < pair[1].lsn));
2760 }
2761
2762 #[test]
2763 fn watch_prefix_stream_delivers_matching_events_only() {
2764 let r = rt();
2765 let ops = super::KvAtomicOps::new(&r);
2766 let mut stream = r.kv_watch_subscribe_prefix("kv_default", "acct:", None);
2767
2768 ops.set(
2769 CollectionModel::Kv,
2770 "kv_default",
2771 "acct:1",
2772 crate::storage::schema::Value::Integer(1),
2773 None,
2774 false,
2775 )
2776 .unwrap();
2777 ops.set(
2778 CollectionModel::Kv,
2779 "kv_default",
2780 "session:1",
2781 crate::storage::schema::Value::Integer(2),
2782 None,
2783 false,
2784 )
2785 .unwrap();
2786 ops.set(
2787 CollectionModel::Kv,
2788 "kv_default",
2789 "acct:2",
2790 crate::storage::schema::Value::Integer(3),
2791 None,
2792 false,
2793 )
2794 .unwrap();
2795
2796 let first = stream.poll_next().expect("first prefix event");
2797 let second = stream.poll_next().expect("second prefix event");
2798 assert_eq!(first.key, "acct:1");
2799 assert_eq!(second.key, "acct:2");
2800 assert!(stream.poll_next().is_none());
2801 }
2802
2803 #[test]
2804 fn watch_stream_resume_from_lsn_delivers_missed_events_without_duplicates() {
2805 let r = rt();
2806 let ops = super::KvAtomicOps::new(&r);
2807 let mut stream = r.kv_watch_subscribe("kv_default", "resume", None);
2808
2809 let mut last_seen_lsn = 0;
2810 for value in 0..5 {
2811 ops.set(
2812 CollectionModel::Kv,
2813 "kv_default",
2814 "resume",
2815 crate::storage::schema::Value::Integer(value),
2816 None,
2817 false,
2818 )
2819 .unwrap();
2820 last_seen_lsn = stream.poll_next().expect("initial event").lsn;
2821 }
2822 drop(stream);
2823
2824 for value in 5..55 {
2825 ops.set(
2826 CollectionModel::Kv,
2827 "kv_default",
2828 "resume",
2829 crate::storage::schema::Value::Integer(value),
2830 None,
2831 false,
2832 )
2833 .unwrap();
2834 }
2835
2836 let mut resumed = r.kv_watch_subscribe("kv_default", "resume", Some(last_seen_lsn));
2837 let mut lsns = Vec::new();
2838 while let Some(event) = resumed.poll_next() {
2839 lsns.push(event.lsn);
2840 if lsns.len() == 50 {
2841 break;
2842 }
2843 }
2844
2845 assert_eq!(lsns.len(), 50);
2846 assert!(lsns.iter().all(|lsn| *lsn > last_seen_lsn));
2847 assert!(lsns.windows(2).all(|pair| pair[0] < pair[1]));
2848 assert!(resumed.poll_next().is_none());
2849 }
2850
2851 #[test]
2852 fn watch_stream_slow_consumer_drops_oldest_buffered_events() {
2853 let r = rt();
2854 let ops = super::KvAtomicOps::new(&r);
2855 let mut stream = r.kv_watch_subscribe("kv_default", "slow", None);
2856
2857 for value in 0..10_000 {
2858 ops.set(
2859 CollectionModel::Kv,
2860 "kv_default",
2861 "slow",
2862 crate::storage::schema::Value::Integer(value),
2863 None,
2864 false,
2865 )
2866 .unwrap();
2867 }
2868
2869 let event = stream.poll_next().expect("tail event after drops");
2870 assert!(event.lsn > 1);
2871 assert!(event.dropped_event_count > 0);
2872 assert_eq!(stream.dropped_event_count(), event.dropped_event_count);
2873 assert_eq!(r.stats().kv.watch_drops, event.dropped_event_count);
2874 }
2875
2876 #[test]
2877 fn watch_stream_idle_timeout_closes_subscription() {
2878 let r = rt();
2879 r.execute_query("SET CONFIG red.config.kv.watch.idle_timeout_ms = 1")
2880 .unwrap();
2881
2882 let mut stream = r.kv_watch_subscribe("kv_default", "idle", None);
2883 assert_eq!(r.stats().kv.watch_streams_active, 1);
2884 std::thread::sleep(std::time::Duration::from_millis(5));
2885
2886 assert!(stream.poll_next().is_none());
2887 assert_eq!(r.stats().kv.watch_streams_active, 0);
2888 }
2889
2890 #[test]
2891 fn watch_stream_does_not_emit_rolled_back_put() {
2892 let r = rt();
2893 let mut stream = r.kv_watch_subscribe("kv_default", "rollback_key", None);
2894
2895 r.execute_query("BEGIN").unwrap();
2896 r.execute_query("KV PUT rollback_key = 'dirty'").unwrap();
2897 r.execute_query("ROLLBACK").unwrap();
2898
2899 assert!(stream.poll_next().is_none());
2900 }
2901}