1use crate::application::ports::RuntimeEntityPort;
7use crate::storage::unified::{Metadata, MetadataValue};
8
9use super::impl_core::{current_auth_identity, current_connection_id, current_tenant};
10use super::*;
11
/// Name of the default KV collection (`kv_default`).
///
/// NOTE(review): not referenced in this part of the file; presumably used by
/// callers that write without naming a collection — confirm.
pub const KV_DEFAULT_COLLECTION: &str = "kv_default";
14
/// Builds the key-provider reference under which a per-collection vault master
/// key is looked up: `red.vault.<collection>.master_key`.
fn vault_master_key_ref(collection: &str) -> String {
    let mut reference = String::from("red.vault.");
    reference.push_str(collection);
    reference.push_str(".master_key");
    reference
}
18
19fn keyed_model_name(model: crate::catalog::CollectionModel) -> &'static str {
20 match model {
21 crate::catalog::CollectionModel::Kv => "kv",
22 crate::catalog::CollectionModel::Vault => "vault",
23 crate::catalog::CollectionModel::Config => "config",
24 _ => "collection",
25 }
26}
27
/// One stored version of a vault key: the decoded keyed row plus the
/// entity-level bookkeeping read alongside it.
#[derive(Debug, Clone)]
struct VaultEntry {
    /// Entity id of the row holding this version.
    id: crate::storage::EntityId,
    /// Vault key this version belongs to.
    key: String,
    /// Stored value. `append_vault_version` writes a sealed `Value::Secret` for live
    /// versions and `Value::Null` for tombstones.
    value: crate::storage::schema::Value,
    /// Entity metadata (tag fields are written here on append); defaults to empty
    /// when the store has none for this entity.
    metadata: Metadata,
    /// Entity creation timestamp. NOTE(review): units not visible here — confirm.
    created_at: u64,
    /// Entity last-update timestamp (same units as `created_at`).
    updated_at: u64,
    /// Sequence id copied from the entity.
    sequence_id: u64,
    /// Per-key version number; each append uses the latest version + 1.
    version: i64,
    /// `true` when this version marks the key as having no live value.
    tombstone: bool,
    /// Operation label stored with the version (for example `"put"`).
    op: String,
}
41
42impl super::keyed_spine::KeyedVersion for VaultEntry {
43 fn key(&self) -> &str {
44 &self.key
45 }
46
47 fn version(&self) -> i64 {
48 self.version
49 }
50}
51
52impl VaultEntry {
53 fn from_keyed_row(
54 version: super::keyed_spine::KeyedRowVersion,
55 metadata: Metadata,
56 created_at: u64,
57 updated_at: u64,
58 sequence_id: u64,
59 ) -> Self {
60 Self {
61 id: version.id,
62 key: version.key,
63 value: version.value,
64 metadata,
65 created_at,
66 updated_at,
67 sequence_id,
68 version: version.version,
69 tombstone: version.tombstone,
70 op: version.op,
71 }
72 }
73}
74
/// Keyed-collection (KV / vault / config) reads and writes executed against a
/// borrowed [`RedDBRuntime`].
pub struct KvAtomicOps<'a> {
    /// Runtime whose store, tag index, statistics and per-key locks the
    /// operations use.
    runtime: &'a RedDBRuntime,
}
82
83impl<'a> KvAtomicOps<'a> {
84 pub fn new(runtime: &'a RedDBRuntime) -> Self {
85 Self { runtime }
86 }
87
88 pub fn set(
92 &self,
93 model: crate::catalog::CollectionModel,
94 collection: &str,
95 key: &str,
96 value: crate::storage::schema::Value,
97 ttl_ms: Option<u64>,
98 if_not_exists: bool,
99 ) -> RedDBResult<(bool, crate::storage::EntityId)> {
100 self.set_with_tags_for_model(model, collection, key, value, ttl_ms, &[], if_not_exists)
101 }
102
103 pub fn set_with_tags(
104 &self,
105 collection: &str,
106 key: &str,
107 value: crate::storage::schema::Value,
108 ttl_ms: Option<u64>,
109 tags: &[String],
110 if_not_exists: bool,
111 ) -> RedDBResult<(bool, crate::storage::EntityId)> {
112 self.set_with_tags_for_model(
113 crate::catalog::CollectionModel::Kv,
114 collection,
115 key,
116 value,
117 ttl_ms,
118 tags,
119 if_not_exists,
120 )
121 }
122
123 fn set_with_tags_for_model(
124 &self,
125 model: crate::catalog::CollectionModel,
126 collection: &str,
127 key: &str,
128 value: crate::storage::schema::Value,
129 ttl_ms: Option<u64>,
130 tags: &[String],
131 if_not_exists: bool,
132 ) -> RedDBResult<(bool, crate::storage::EntityId)> {
133 self.set_with_tags_and_metadata_for_model(
134 model,
135 collection,
136 key,
137 value,
138 ttl_ms,
139 tags,
140 if_not_exists,
141 Vec::new(),
142 )
143 }
144
145 pub fn set_with_tags_and_metadata(
146 &self,
147 collection: &str,
148 key: &str,
149 value: crate::storage::schema::Value,
150 ttl_ms: Option<u64>,
151 tags: &[String],
152 if_not_exists: bool,
153 metadata: Vec<(String, MetadataValue)>,
154 ) -> RedDBResult<(bool, crate::storage::EntityId)> {
155 self.set_with_tags_and_metadata_for_model(
156 crate::catalog::CollectionModel::Kv,
157 collection,
158 key,
159 value,
160 ttl_ms,
161 tags,
162 if_not_exists,
163 metadata,
164 )
165 }
166
167 fn set_with_tags_and_metadata_for_model(
168 &self,
169 model: crate::catalog::CollectionModel,
170 collection: &str,
171 key: &str,
172 value: crate::storage::schema::Value,
173 ttl_ms: Option<u64>,
174 tags: &[String],
175 if_not_exists: bool,
176 mut metadata: Vec<(String, MetadataValue)>,
177 ) -> RedDBResult<(bool, crate::storage::EntityId)> {
178 self.ensure_keyed_collection(model, collection)?;
179
180 if model == crate::catalog::CollectionModel::Vault {
181 let latest = self.get_vault_entry(collection, key)?;
182 let was_present = latest.as_ref().is_some_and(|entry| !entry.tombstone);
183 if if_not_exists && was_present {
184 return Ok((false, latest.expect("checked present").id));
185 }
186 let entry = self.append_vault_version(collection, key, value, "put", false, tags)?;
187 self.runtime.record_kv_watch_event(
188 if latest.is_some() {
189 crate::replication::cdc::ChangeOperation::Update
190 } else {
191 crate::replication::cdc::ChangeOperation::Insert
192 },
193 collection,
194 key,
195 entry.id.raw(),
196 latest.as_ref().map(vault_entry_metadata_json),
197 Some(vault_entry_metadata_json(&entry)),
198 );
199 return Ok((!was_present, entry.id));
200 }
201
202 let existing = self.get_entry(model, collection, key)?;
203 let was_present = existing.is_some();
204
205 if if_not_exists && was_present {
206 let (_, id) = existing.unwrap();
207 self.runtime.inner.kv_stats.incr_puts();
208 return Ok((false, id));
209 }
210
211 let before = existing
212 .as_ref()
213 .map(|(value, _)| crate::presentation::entity_json::storage_value_to_json(value));
214 let op = if was_present {
215 crate::replication::cdc::ChangeOperation::Update
216 } else {
217 crate::replication::cdc::ChangeOperation::Insert
218 };
219 let after = Some(crate::presentation::entity_json::storage_value_to_json(
220 &value,
221 ));
222
223 if was_present {
225 self.delete(model, collection, key)?;
226 }
227
228 if let Some(ttl_metadata) = ttl_metadata(ttl_ms) {
229 metadata.extend(ttl_metadata.fields);
230 }
231 if let Some(tags_metadata) = kv_tags_metadata(tags) {
232 metadata.push(tags_metadata);
233 }
234
235 let output = self
236 .runtime
237 .create_kv(crate::application::entity::CreateKvInput {
238 collection: collection.to_string(),
239 key: key.to_string(),
240 value,
241 metadata,
242 })?;
243 if model == crate::catalog::CollectionModel::Kv {
244 self.runtime
245 .inner
246 .kv_tag_index
247 .replace(collection, key, output.id, tags);
248 }
249
250 if model == crate::catalog::CollectionModel::Kv {
251 self.runtime
252 .record_kv_watch_event(op, collection, key, output.id.raw(), before, after);
253 }
254
255 if model == crate::catalog::CollectionModel::Kv {
256 self.runtime.inner.kv_stats.incr_puts();
257 }
258 Ok((!was_present, output.id))
259 }
260
261 pub fn get(
263 &self,
264 model: crate::catalog::CollectionModel,
265 collection: &str,
266 key: &str,
267 ) -> RedDBResult<Option<crate::storage::schema::Value>> {
268 let result = self.get_entry(model, collection, key)?;
269 if model == crate::catalog::CollectionModel::Kv {
270 self.runtime.inner.kv_stats.incr_gets();
271 }
272 Ok(result.map(|(v, _)| v))
273 }
274
275 pub fn delete(
277 &self,
278 model: crate::catalog::CollectionModel,
279 collection: &str,
280 key: &str,
281 ) -> RedDBResult<bool> {
282 self.ensure_declared_model(model, collection)?;
283 let found = self.get_entry(model, collection, key)?;
284 if let Some((value, id)) = found {
285 let store = self.runtime.inner.db.store();
286 let deleted = store
287 .delete(collection, id)
288 .map_err(|err| RedDBError::Internal(err.to_string()))?;
289 if deleted {
290 store.context_index().remove_entity(id);
291 if model == crate::catalog::CollectionModel::Kv {
292 self.runtime.inner.kv_tag_index.remove(collection, key);
293 self.runtime.record_kv_watch_event(
294 crate::replication::cdc::ChangeOperation::Delete,
295 collection,
296 key,
297 id.raw(),
298 Some(crate::presentation::entity_json::storage_value_to_json(
299 &value,
300 )),
301 None,
302 );
303 self.runtime.inner.kv_stats.incr_deletes();
304 }
305 }
306 Ok(deleted)
307 } else {
308 Ok(false)
309 }
310 }
311
312 pub fn incr(
317 &self,
318 model: crate::catalog::CollectionModel,
319 collection: &str,
320 key: &str,
321 by: i64,
322 ttl_ms: Option<u64>,
323 ) -> RedDBResult<i64> {
324 if model == crate::catalog::CollectionModel::Vault {
325 return Err(RedDBError::InvalidOperation(
326 "VAULT INCR is not supported for sealed secrets".to_string(),
327 ));
328 }
329 let rmw_lock = self.runtime.inner.rmw_locks.lock_for(collection, key);
330 let _rmw_guard = rmw_lock.lock();
331 self.ensure_kv_collection(collection)?;
332 let existing = self.runtime.get_kv(collection, key)?;
333 let current: i64 = match existing.as_ref() {
334 None => 0,
335 Some((crate::storage::schema::Value::Integer(n), _)) => *n,
336 Some((crate::storage::schema::Value::Float(f), _)) => *f as i64,
337 Some((other, _)) => {
338 return Err(RedDBError::Internal(format!(
339 "INCR on non-integer value: {:?}",
340 other
341 )));
342 }
343 };
344
345 let next = current
346 .checked_add(by)
347 .ok_or_else(|| RedDBError::Internal(format!("INCR overflow: {current} + {by}")))?;
348
349 if existing.is_some() {
351 self.runtime.delete_kv(collection, key)?;
352 }
353
354 let meta_vec: Vec<(String, crate::storage::unified::MetadataValue)> = ttl_metadata(ttl_ms)
355 .map(|m| m.fields.into_iter().collect())
356 .unwrap_or_default();
357
358 let output = self
359 .runtime
360 .create_kv(crate::application::entity::CreateKvInput {
361 collection: collection.to_string(),
362 key: key.to_string(),
363 value: crate::storage::schema::Value::Integer(next),
364 metadata: meta_vec,
365 })?;
366 self.runtime
367 .inner
368 .kv_tag_index
369 .replace(collection, key, output.id, &[]);
370
371 self.runtime.record_kv_watch_event(
372 if existing.is_some() {
373 crate::replication::cdc::ChangeOperation::Update
374 } else {
375 crate::replication::cdc::ChangeOperation::Insert
376 },
377 collection,
378 key,
379 output.id.raw(),
380 existing
381 .as_ref()
382 .map(|(value, _)| crate::presentation::entity_json::storage_value_to_json(value)),
383 Some(crate::presentation::entity_json::storage_value_to_json(
384 &crate::storage::schema::Value::Integer(next),
385 )),
386 );
387
388 self.runtime.inner.kv_stats.incr_incrs();
389 Ok(next)
390 }
391
392 pub fn cas(
400 &self,
401 model: crate::catalog::CollectionModel,
402 collection: &str,
403 key: &str,
404 expected: Option<&crate::storage::schema::Value>,
405 new_value: crate::storage::schema::Value,
406 ttl_ms: Option<u64>,
407 ) -> RedDBResult<(bool, Option<crate::storage::schema::Value>)> {
408 if model == crate::catalog::CollectionModel::Vault {
409 return Err(RedDBError::InvalidOperation(
410 "VAULT CAS is not supported for sealed secrets".to_string(),
411 ));
412 }
413 let rmw_lock = self.runtime.inner.rmw_locks.lock_for(collection, key);
414 let _rmw_guard = rmw_lock.lock();
415 self.ensure_kv_collection(collection)?;
416 let current = self.runtime.get_kv(collection, key)?.map(|(v, _)| v);
417
418 let matches = match (¤t, expected) {
419 (None, None) => true,
420 (Some(cur), Some(exp)) => cur == exp,
421 _ => false,
422 };
423
424 if !matches {
425 self.runtime.inner.kv_stats.incr_cas_conflict();
426 return Ok((false, current));
427 }
428
429 if current.is_some() {
431 self.runtime.delete_kv(collection, key)?;
432 }
433
434 let meta_vec: Vec<(String, crate::storage::unified::MetadataValue)> = ttl_metadata(ttl_ms)
435 .map(|m| m.fields.into_iter().collect())
436 .unwrap_or_default();
437
438 let output = self
439 .runtime
440 .create_kv(crate::application::entity::CreateKvInput {
441 collection: collection.to_string(),
442 key: key.to_string(),
443 value: new_value.clone(),
444 metadata: meta_vec,
445 })?;
446 self.runtime
447 .inner
448 .kv_tag_index
449 .replace(collection, key, output.id, &[]);
450
451 self.runtime.record_kv_watch_event(
452 if current.is_some() {
453 crate::replication::cdc::ChangeOperation::Update
454 } else {
455 crate::replication::cdc::ChangeOperation::Insert
456 },
457 collection,
458 key,
459 output.id.raw(),
460 current
461 .as_ref()
462 .map(crate::presentation::entity_json::storage_value_to_json),
463 Some(crate::presentation::entity_json::storage_value_to_json(
464 &new_value,
465 )),
466 );
467
468 self.runtime.inner.kv_stats.incr_cas_success();
469 Ok((true, current))
470 }
471
472 pub fn invalidate_tags(&self, collection: &str, tags: &[String]) -> RedDBResult<usize> {
473 self.runtime
474 .check_write(crate::runtime::write_gate::WriteKind::Dml)?;
475 self.runtime.check_kv_invalidate_policy(collection)?;
476 self.ensure_kv_collection(collection)?;
477 let entries = self
478 .runtime
479 .inner
480 .kv_tag_index
481 .entries_for_tags(collection, tags);
482 if entries.is_empty() {
483 return Ok(0);
484 }
485
486 let store = self.runtime.inner.db.store();
487 let mut removed = 0usize;
488 for (key, id) in entries {
489 let before = store
490 .get(collection, id)
491 .and_then(|entity| kv_value_from_entity(&entity));
492 let deleted = store
493 .delete(collection, id)
494 .map_err(|err| RedDBError::Internal(err.to_string()))?;
495 if deleted {
496 store.context_index().remove_entity(id);
497 self.runtime.inner.kv_tag_index.remove(collection, &key);
498 self.runtime.record_kv_watch_event(
499 crate::replication::cdc::ChangeOperation::Delete,
500 collection,
501 &key,
502 id.raw(),
503 before
504 .as_ref()
505 .map(crate::presentation::entity_json::storage_value_to_json),
506 None,
507 );
508 removed += 1;
509 }
510 }
511 if removed > 0 {
512 self.runtime.inner.kv_stats.incr_deletes();
513 }
514 Ok(removed)
515 }
516
517 pub fn tags_for_key(&self, collection: &str, key: &str) -> Vec<String> {
518 self.runtime
519 .inner
520 .kv_tag_index
521 .tags_for_key(collection, key)
522 }
523
524 fn ensure_kv_collection(&self, collection: &str) -> RedDBResult<()> {
526 self.ensure_keyed_collection(crate::catalog::CollectionModel::Kv, collection)
527 }
528
529 fn ensure_keyed_collection(
530 &self,
531 model: crate::catalog::CollectionModel,
532 collection: &str,
533 ) -> RedDBResult<()> {
534 let store = self.runtime.inner.db.store();
535 if store.get_collection(collection).is_some() {
536 return self.ensure_declared_model(model, collection);
537 }
538 if model != crate::catalog::CollectionModel::Kv {
539 return Err(RedDBError::NotFound(format!(
540 "{} collection '{collection}' does not exist",
541 keyed_model_name(model)
542 )));
543 }
544 let auto_create = self
546 .runtime
547 .config_bool("red.config.kv.default_collection", true);
548 if !auto_create {
549 return Err(RedDBError::NotFound(format!(
550 "kv collection '{collection}' does not exist and auto-create is disabled \
551 (red.config.kv.default_collection = false)"
552 )));
553 }
554 store
555 .create_collection(collection)
556 .map_err(|err| RedDBError::Internal(err.to_string()))?;
557 self.runtime
558 .inner
559 .db
560 .save_collection_contract(kv_collection_contract(collection))
561 .map_err(|err| RedDBError::Internal(err.to_string()))?;
562 Ok(())
563 }
564
565 fn get_entry(
566 &self,
567 model: crate::catalog::CollectionModel,
568 collection: &str,
569 key: &str,
570 ) -> RedDBResult<Option<(crate::storage::schema::Value, crate::storage::EntityId)>> {
571 let Some(entity) = self.get_entity(model, collection, key)? else {
572 return Ok(None);
573 };
574 Ok(kv_value_from_entity(&entity).map(|value| (value, entity.id)))
575 }
576
577 fn get_entity(
578 &self,
579 model: crate::catalog::CollectionModel,
580 collection: &str,
581 key: &str,
582 ) -> RedDBResult<Option<crate::storage::UnifiedEntity>> {
583 self.ensure_declared_model(model, collection)?;
584 let store = self.runtime.inner.db.store();
585 let Some(manager) = store.get_collection(collection) else {
586 return Ok(None);
587 };
588 let entities = manager.query_all(|_| true);
589 for entity in entities {
590 if let crate::storage::EntityData::Row(ref row) = entity.data {
591 if let Some(ref named) = row.named {
592 if let Some(crate::storage::schema::Value::Text(ref k)) = named.get("key") {
593 if &**k == key {
594 return Ok(Some(entity));
595 }
596 }
597 }
598 }
599 }
600 Ok(None)
601 }
602
603 fn latest_kv_entries(
604 &self,
605 collection: &str,
606 prefix: Option<&str>,
607 ) -> RedDBResult<Vec<(String, crate::storage::UnifiedEntity)>> {
608 self.ensure_declared_model(crate::catalog::CollectionModel::Kv, collection)?;
609 let store = self.runtime.inner.db.store();
610 let Some(manager) = store.get_collection(collection) else {
611 return Ok(Vec::new());
612 };
613 let mut entries: std::collections::BTreeMap<String, crate::storage::UnifiedEntity> =
614 std::collections::BTreeMap::new();
615 for entity in manager.query_all(|_| true) {
616 let key = match &entity.data {
617 crate::storage::EntityData::Row(row) => row
618 .named
619 .as_ref()
620 .and_then(|named| named.get("key"))
621 .and_then(|value| match value {
622 crate::storage::schema::Value::Text(key) => Some(key.to_string()),
623 _ => None,
624 }),
625 _ => None,
626 };
627 let Some(key) = key else {
628 continue;
629 };
630 if prefix.is_some_and(|prefix| !key.starts_with(prefix)) {
631 continue;
632 }
633 let should_replace = match entries.get(&key) {
634 Some(existing) => entity.id.raw() >= existing.id.raw(),
635 None => true,
636 };
637 if should_replace {
638 entries.insert(key, entity);
639 }
640 }
641 Ok(entries.into_iter().collect())
642 }
643
644 fn get_vault_entry(&self, collection: &str, key: &str) -> RedDBResult<Option<VaultEntry>> {
645 self.vault_versions(collection, key)
646 .map(super::keyed_spine::latest_version)
647 }
648
649 fn get_vault_entry_version(
650 &self,
651 collection: &str,
652 key: &str,
653 version: i64,
654 ) -> RedDBResult<Option<VaultEntry>> {
655 Ok(self
656 .vault_versions(collection, key)?
657 .into_iter()
658 .find(|entry| entry.version == version))
659 }
660
661 fn vault_versions(&self, collection: &str, key: &str) -> RedDBResult<Vec<VaultEntry>> {
662 self.ensure_declared_model(crate::catalog::CollectionModel::Vault, collection)?;
663 let store = self.runtime.inner.db.store();
664 let Some(manager) = store.get_collection(collection) else {
665 return Ok(Vec::new());
666 };
667 let entities = manager.query_all(|_| true);
668 let mut versions = Vec::new();
669 for entity in entities {
670 let crate::storage::EntityData::Row(ref row) = entity.data else {
671 continue;
672 };
673 let Some(version) =
674 super::keyed_spine::row_version(entity.id, row, entity.sequence_id as i64)
675 else {
676 continue;
677 };
678 if version.key != key {
679 continue;
680 }
681 let metadata = manager.get_metadata(entity.id).unwrap_or_default();
682 versions.push(VaultEntry::from_keyed_row(
683 version,
684 metadata,
685 entity.created_at,
686 entity.updated_at,
687 entity.sequence_id,
688 ));
689 }
690 Ok(versions)
691 }
692
693 fn latest_vault_entries(
694 &self,
695 collection: &str,
696 prefix: Option<&str>,
697 ) -> RedDBResult<Vec<VaultEntry>> {
698 self.ensure_declared_model(crate::catalog::CollectionModel::Vault, collection)?;
699 let store = self.runtime.inner.db.store();
700 let Some(manager) = store.get_collection(collection) else {
701 return Ok(Vec::new());
702 };
703 let mut versions = Vec::new();
704 for entity in manager.query_all(|_| true) {
705 let crate::storage::EntityData::Row(ref row) = entity.data else {
706 continue;
707 };
708 let Some(version) =
709 super::keyed_spine::row_version(entity.id, row, entity.sequence_id as i64)
710 else {
711 continue;
712 };
713 let metadata = manager.get_metadata(entity.id).unwrap_or_default();
714 let entry = VaultEntry::from_keyed_row(
715 version,
716 metadata,
717 entity.created_at,
718 entity.updated_at,
719 entity.sequence_id,
720 );
721 versions.push(entry);
722 }
723 Ok(super::keyed_spine::latest_versions(versions, prefix))
724 }
725
726 fn append_vault_version(
727 &self,
728 collection: &str,
729 key: &str,
730 value: crate::storage::schema::Value,
731 op: &str,
732 tombstone: bool,
733 tags: &[String],
734 ) -> RedDBResult<VaultEntry> {
735 self.ensure_declared_model(crate::catalog::CollectionModel::Vault, collection)?;
736 let version = self
737 .get_vault_entry(collection, key)?
738 .map(|entry| entry.version)
739 .unwrap_or(0)
740 + 1;
741 let stored_value = if tombstone {
742 crate::storage::schema::Value::Null
743 } else {
744 self.runtime.seal_vault_value(collection, value)?
745 };
746 let now = current_unix_ms() as i64;
747 let fields = vec![
748 (
749 "key".to_string(),
750 crate::storage::schema::Value::text(key.to_string()),
751 ),
752 ("value".to_string(), stored_value),
753 (
754 "version".to_string(),
755 crate::storage::schema::Value::Integer(version),
756 ),
757 (
758 "tombstone".to_string(),
759 crate::storage::schema::Value::Boolean(tombstone),
760 ),
761 (
762 "op".to_string(),
763 crate::storage::schema::Value::text(op.to_string()),
764 ),
765 (
766 "created_at_ms".to_string(),
767 crate::storage::schema::Value::Integer(now),
768 ),
769 ];
770 let mut row = crate::storage::RowData::new(Vec::new());
771 row.named = Some(fields.into_iter().collect());
772 let entity = crate::storage::UnifiedEntity::new(
773 crate::storage::EntityId::new(0),
774 crate::storage::EntityKind::TableRow {
775 table: std::sync::Arc::from(collection),
776 row_id: 0,
777 },
778 crate::storage::EntityData::Row(row),
779 );
780 let id = self
781 .runtime
782 .inner
783 .db
784 .store()
785 .insert(collection, entity)
786 .map_err(|err| RedDBError::Internal(err.to_string()))?;
787 if !tags.is_empty() {
788 self.runtime
789 .inner
790 .db
791 .store()
792 .set_metadata(
793 collection,
794 id,
795 Metadata::with_fields(vault_tags_metadata(tags)),
796 )
797 .map_err(|err| RedDBError::Internal(err.to_string()))?;
798 self.runtime
799 .inner
800 .kv_tag_index
801 .replace(collection, key, id, tags);
802 }
803 self.get_vault_entry_version(collection, key, version)?
804 .ok_or_else(|| RedDBError::Internal(format!("vault version {id} was not readable")))
805 }
806
807 fn purge_vault_versions(&self, collection: &str, key: &str) -> RedDBResult<usize> {
808 self.ensure_declared_model(crate::catalog::CollectionModel::Vault, collection)?;
809 let versions = self.vault_versions(collection, key)?;
810 let store = self.runtime.inner.db.store();
811 let mut purged = 0usize;
812 for entry in versions {
813 if store
814 .delete(collection, entry.id)
815 .map_err(|err| RedDBError::Internal(err.to_string()))?
816 {
817 store.context_index().remove_entity(entry.id);
818 purged += 1;
819 }
820 }
821 Ok(purged)
822 }
823
824 fn ensure_declared_model(
825 &self,
826 model: crate::catalog::CollectionModel,
827 collection: &str,
828 ) -> RedDBResult<()> {
829 let Some(contract) = self.runtime.inner.db.collection_contract(collection) else {
830 return Ok(());
831 };
832 if contract.declared_model == model
833 || contract.declared_model == crate::catalog::CollectionModel::Mixed
834 {
835 return Ok(());
836 }
837 Err(RedDBError::InvalidOperation(format!(
838 "collection '{}' is declared as '{}' and does not allow '{}' operations",
839 collection,
840 keyed_model_name(contract.declared_model),
841 keyed_model_name(model)
842 )))
843 }
844}
845
846impl RedDBRuntime {
847 pub(crate) fn seal_vault_value(
848 &self,
849 collection: &str,
850 value: crate::storage::schema::Value,
851 ) -> RedDBResult<crate::storage::schema::Value> {
852 let key = self.vault_encryption_key(collection)?;
853 let plaintext = value.to_bytes();
854 let nonce_bytes = crate::auth::store::random_bytes(12);
855 let mut nonce = [0u8; 12];
856 nonce.copy_from_slice(&nonce_bytes[..12]);
857 let aad = format!("reddb.vault.{collection}");
858 let ciphertext =
859 crate::crypto::aes_gcm::aes256_gcm_encrypt(&key, &nonce, aad.as_bytes(), &plaintext);
860 let mut payload = Vec::with_capacity(12 + ciphertext.len());
861 payload.extend_from_slice(&nonce);
862 payload.extend_from_slice(&ciphertext);
863 Ok(crate::storage::schema::Value::Secret(payload))
864 }
865
866 fn vault_key_available(&self, collection: &str) -> bool {
867 self.vault_encryption_key(collection).is_ok()
868 }
869
870 fn vault_encryption_key(&self, collection: &str) -> RedDBResult<[u8; 32]> {
871 let auth_store = self.inner.auth_store.read().clone().ok_or_else(|| {
872 RedDBError::Query("vault sealed_unavailable: no key provider is configured".to_string())
873 })?;
874 if !auth_store.is_vault_backed() {
875 return Err(RedDBError::Query(
876 "vault sealed_unavailable: key provider is sealed".to_string(),
877 ));
878 }
879
880 if let Some(hex_key) = auth_store.vault_kv_get(&vault_master_key_ref(collection)) {
881 return decode_vault_key(&hex_key);
882 }
883 auth_store.vault_secret_key().ok_or_else(|| {
884 RedDBError::Query("vault sealed_unavailable: cluster vault key is missing".to_string())
885 })
886 }
887
888 fn unseal_vault_value(
889 &self,
890 collection: &str,
891 sealed: &crate::storage::schema::Value,
892 ) -> RedDBResult<crate::storage::schema::Value> {
893 let crate::storage::schema::Value::Secret(payload) = sealed else {
894 return Err(RedDBError::Query(
895 "vault unseal failed: stored value is not sealed".to_string(),
896 ));
897 };
898 if payload.len() < 12 {
899 return Err(RedDBError::Query(
900 "vault unseal failed: sealed payload is truncated".to_string(),
901 ));
902 }
903 let key = self.vault_encryption_key(collection)?;
904 let mut nonce = [0u8; 12];
905 nonce.copy_from_slice(&payload[..12]);
906 let aad = format!("reddb.vault.{collection}");
907 let plaintext = crate::crypto::aes_gcm::aes256_gcm_decrypt(
908 &key,
909 &nonce,
910 aad.as_bytes(),
911 &payload[12..],
912 )
913 .map_err(|_| RedDBError::Query("vault unseal failed: decryption failed".to_string()))?;
914 let (value, consumed) =
915 crate::storage::schema::Value::from_bytes(&plaintext).map_err(|err| {
916 RedDBError::Query(format!("vault unseal failed: bad plaintext value: {err}"))
917 })?;
918 if consumed != plaintext.len() {
919 return Err(RedDBError::Query(
920 "vault unseal failed: trailing plaintext bytes".to_string(),
921 ));
922 }
923 Ok(value)
924 }
925
926 pub(crate) fn peek_vault_unsealed(
932 &self,
933 collection: &str,
934 key: &str,
935 ) -> RedDBResult<Option<crate::storage::schema::Value>> {
936 let ops = KvAtomicOps::new(self);
937 let Some(entry) = ops.get_vault_entry(collection, key)? else {
938 return Ok(None);
939 };
940 if entry.tombstone {
941 return Ok(None);
942 }
943 Ok(self.unseal_vault_value(collection, &entry.value).ok())
944 }
945
946 fn vault_target_resource(collection: &str, key: &str) -> String {
947 if collection == "red.vault" {
948 return format!("red.vault/{}", key.to_ascii_lowercase());
949 }
950 format!("{collection}.{key}")
951 }
952
953 fn current_vault_actor() -> String {
954 current_auth_identity()
955 .map(|(principal, _)| principal)
956 .unwrap_or_else(|| "anonymous".to_string())
957 }
958
959 fn vault_request_id() -> String {
960 let conn_id = current_connection_id();
961 if conn_id == 0 {
962 "embedded".to_string()
963 } else {
964 format!("conn-{conn_id}")
965 }
966 }
967
968 fn check_vault_capability(
969 &self,
970 action: &str,
971 collection: &str,
972 key: &str,
973 ) -> Result<(), String> {
974 let Some(auth_store) = self.inner.auth_store.read().clone() else {
975 return Ok(());
976 };
977 if !auth_store.iam_authorization_enabled() {
978 return Ok(());
979 }
980 let Some((principal, role)) = current_auth_identity() else {
981 return Err(
982 "IAM authorization is enabled; vault capability check requires an authenticated principal"
983 .to_string(),
984 );
985 };
986 let tenant = current_tenant();
987 let principal_id = crate::auth::UserId::from_parts(tenant.as_deref(), &principal);
988 let mut resource = crate::auth::policies::ResourceRef::new(
989 "vault",
990 Self::vault_target_resource(collection, key),
991 );
992 if let Some(ref tenant) = tenant {
993 resource = resource.with_tenant(tenant.clone());
994 }
995 let ctx = crate::auth::policies::EvalContext {
996 principal_tenant: tenant.clone(),
997 current_tenant: tenant,
998 peer_ip: None,
999 mfa_present: false,
1000 now_ms: crate::utils::now_unix_millis() as u128,
1001 principal_is_admin_role: role == crate::auth::Role::Admin,
1002 principal_is_system_owned: auth_store.principal_is_system_owned(&principal_id),
1003 principal_is_platform_scoped: principal_id.tenant.is_none(),
1004 };
1005 if auth_store.check_policy_authz_with_role(&principal_id, action, &resource, &ctx, role) {
1006 Ok(())
1007 } else {
1008 Err(format!(
1009 "principal=`{}` action=`{}` resource=`vault:{}` denied by IAM policy",
1010 principal,
1011 action,
1012 Self::vault_target_resource(collection, key)
1013 ))
1014 }
1015 }
1016
1017 fn check_system_vault_capability(
1018 &self,
1019 action: &str,
1020 collection: &str,
1021 key: &str,
1022 ) -> Result<(), String> {
1023 if collection != "red.vault" {
1024 return Ok(());
1025 }
1026 self.check_vault_capability(action, collection, key)
1027 }
1028
1029 fn audit_vault_unseal(
1030 &self,
1031 collection: &str,
1032 key: &str,
1033 outcome: crate::runtime::audit_log::Outcome,
1034 reason: &str,
1035 entry: Option<&VaultEntry>,
1036 ) {
1037 let actor = Self::current_vault_actor();
1038 let request_id = Self::vault_request_id();
1039 let mut builder = crate::runtime::audit_log::AuditEvent::builder("vault/unseal")
1040 .principal(actor.clone())
1041 .source(crate::runtime::audit_log::AuditAuthSource::Password)
1042 .resource(format!(
1043 "vault:{}",
1044 Self::vault_target_resource(collection, key)
1045 ))
1046 .outcome(outcome)
1047 .correlation_id(request_id.clone())
1048 .fields([
1049 crate::runtime::audit_log::AuditFieldEscaper::field("actor", actor),
1050 crate::runtime::audit_log::AuditFieldEscaper::field("collection", collection),
1051 crate::runtime::audit_log::AuditFieldEscaper::field("key", key),
1052 crate::runtime::audit_log::AuditFieldEscaper::field(
1053 "target",
1054 Self::vault_target_resource(collection, key),
1055 ),
1056 crate::runtime::audit_log::AuditFieldEscaper::field("reason", reason),
1057 crate::runtime::audit_log::AuditFieldEscaper::field("request_id", request_id),
1058 crate::runtime::audit_log::AuditFieldEscaper::field(
1059 "connection_id",
1060 current_connection_id(),
1061 ),
1062 ]);
1063 if let Some(tenant) = current_tenant() {
1064 builder = builder.tenant(tenant);
1065 }
1066 if let Some(entry) = entry {
1067 builder = builder.fields([
1068 crate::runtime::audit_log::AuditFieldEscaper::field("entity_id", entry.id.raw()),
1069 crate::runtime::audit_log::AuditFieldEscaper::field(
1070 "sequence_id",
1071 entry.sequence_id,
1072 ),
1073 ]);
1074 }
1075 self.audit_log().record_event(builder.build());
1076 }
1077
1078 fn audit_vault_lifecycle(
1079 &self,
1080 operation: &str,
1081 collection: &str,
1082 key: &str,
1083 outcome: crate::runtime::audit_log::Outcome,
1084 reason: &str,
1085 entry: Option<&VaultEntry>,
1086 ) {
1087 let actor = Self::current_vault_actor();
1088 let request_id = Self::vault_request_id();
1089 let mut builder =
1090 crate::runtime::audit_log::AuditEvent::builder(format!("vault/{operation}"))
1091 .principal(actor.clone())
1092 .source(crate::runtime::audit_log::AuditAuthSource::Password)
1093 .resource(format!(
1094 "vault:{}",
1095 Self::vault_target_resource(collection, key)
1096 ))
1097 .outcome(outcome)
1098 .correlation_id(request_id.clone())
1099 .fields([
1100 crate::runtime::audit_log::AuditFieldEscaper::field("actor", actor),
1101 crate::runtime::audit_log::AuditFieldEscaper::field("collection", collection),
1102 crate::runtime::audit_log::AuditFieldEscaper::field("key", key),
1103 crate::runtime::audit_log::AuditFieldEscaper::field(
1104 "target",
1105 Self::vault_target_resource(collection, key),
1106 ),
1107 crate::runtime::audit_log::AuditFieldEscaper::field("reason", reason),
1108 crate::runtime::audit_log::AuditFieldEscaper::field("request_id", request_id),
1109 crate::runtime::audit_log::AuditFieldEscaper::field(
1110 "connection_id",
1111 current_connection_id(),
1112 ),
1113 ]);
1114 if let Some(tenant) = current_tenant() {
1115 builder = builder.tenant(tenant);
1116 }
1117 if let Some(entry) = entry {
1118 builder = builder.fields([
1119 crate::runtime::audit_log::AuditFieldEscaper::field("entity_id", entry.id.raw()),
1120 crate::runtime::audit_log::AuditFieldEscaper::field("version", entry.version),
1121 crate::runtime::audit_log::AuditFieldEscaper::field(
1122 "sequence_id",
1123 entry.sequence_id,
1124 ),
1125 ]);
1126 }
1127 self.audit_log().record_event(builder.build());
1128 }
1129
1130 fn emit_vault_control_event(
1131 &self,
1132 kind: crate::runtime::control_events::EventKind,
1133 outcome: crate::runtime::control_events::Outcome,
1134 action: &'static str,
1135 collection: &str,
1136 key: &str,
1137 reason: &str,
1138 entry: Option<&VaultEntry>,
1139 extra_fields: Vec<(String, crate::runtime::control_events::Sensitivity)>,
1140 ) -> RedDBResult<()> {
1141 use crate::runtime::control_events::{
1142 ActorRef, ControlEvent, ControlEventCtx, ControlEventLedger, Sensitivity,
1143 };
1144 use std::borrow::Cow;
1145
1146 let tenant = current_tenant();
1147 let principal = current_auth_identity().map(|(principal, _)| principal);
1148 let actor_user = principal
1149 .as_ref()
1150 .map(|principal| crate::auth::UserId::from_parts(tenant.as_deref(), principal));
1151 let request_id = Self::vault_request_id();
1152 let actor = actor_user
1153 .as_ref()
1154 .map(ActorRef::User)
1155 .unwrap_or(ActorRef::Anonymous);
1156 let ctx = ControlEventCtx {
1157 actor,
1158 scope: tenant.as_ref().map(|scope| Cow::Borrowed(scope.as_str())),
1159 request_id: Some(Cow::Borrowed(request_id.as_str())),
1160 trace_id: None,
1161 };
1162
1163 let target = Self::vault_target_resource(collection, key);
1164 let mut fields = std::collections::HashMap::new();
1165 fields.insert("path".to_string(), Sensitivity::raw(target.clone()));
1166 fields.insert("collection".to_string(), Sensitivity::raw(collection));
1167 fields.insert("key".to_string(), Sensitivity::raw(key));
1168 fields.insert(
1169 "connection_id".to_string(),
1170 Sensitivity::raw(current_connection_id().to_string()),
1171 );
1172 if let Some(entry) = entry {
1173 fields.insert(
1174 "entity_id".to_string(),
1175 Sensitivity::raw(entry.id.raw().to_string()),
1176 );
1177 fields.insert(
1178 "sequence_id".to_string(),
1179 Sensitivity::raw(entry.sequence_id.to_string()),
1180 );
1181 fields.insert(
1182 "version".to_string(),
1183 Sensitivity::raw(entry.version.to_string()),
1184 );
1185 fields.insert("op".to_string(), Sensitivity::raw(entry.op.clone()));
1186 fields.insert(
1187 "tombstone".to_string(),
1188 Sensitivity::raw(entry.tombstone.to_string()),
1189 );
1190 if !entry.tombstone {
1191 fields.insert(
1192 "fingerprint".to_string(),
1193 Sensitivity::raw(vault_fingerprint(&entry.value)),
1194 );
1195 }
1196 fields.insert(
1197 "tags".to_string(),
1198 Sensitivity::raw(format!("{:?}", vault_tags_value(&entry.metadata))),
1199 );
1200 }
1201 for (key, value) in extra_fields {
1202 fields.insert(key, value);
1203 }
1204
1205 let event = ControlEvent {
1206 kind,
1207 outcome,
1208 action: Cow::Borrowed(action),
1209 resource: Some(format!("vault:{target}")),
1210 reason: Some(reason.to_string()),
1211 matched_policy_id: None,
1212 fields,
1213 };
1214 let ledger = self.inner.control_event_ledger.read();
1215 match ledger.emit(&ctx, event) {
1216 Ok(_) => Ok(()),
1217 Err(err) if self.inner.control_event_config.require_persistence() => {
1218 Err(RedDBError::Internal(err.to_string()))
1219 }
1220 Err(_) => Ok(()),
1221 }
1222 }
1223
1224 pub(crate) fn resolve_vault_secret_value(
1225 &self,
1226 collection: &str,
1227 key: &str,
1228 ) -> RedDBResult<Value> {
1229 let ops = KvAtomicOps::new(self);
1230 let entry = ops.get_vault_entry(collection, key)?;
1231 if let Err(reason) = self.check_vault_capability("vault:read", collection, key) {
1232 self.audit_vault_unseal(
1233 collection,
1234 key,
1235 crate::runtime::audit_log::Outcome::Denied,
1236 &reason,
1237 entry.as_ref(),
1238 );
1239 self.emit_vault_control_event(
1240 crate::runtime::control_events::EventKind::VaultRead,
1241 crate::runtime::control_events::Outcome::Denied,
1242 "vault:read",
1243 collection,
1244 key,
1245 &reason,
1246 entry.as_ref(),
1247 Vec::new(),
1248 )?;
1249 return Err(RedDBError::Query(reason));
1250 }
1251 let Some(entry) = entry else {
1252 let reason = "not_found";
1253 self.audit_vault_unseal(
1254 collection,
1255 key,
1256 crate::runtime::audit_log::Outcome::Denied,
1257 reason,
1258 None,
1259 );
1260 self.emit_vault_control_event(
1261 crate::runtime::control_events::EventKind::VaultRead,
1262 crate::runtime::control_events::Outcome::Denied,
1263 "vault:read",
1264 collection,
1265 key,
1266 reason,
1267 None,
1268 Vec::new(),
1269 )?;
1270 return Err(RedDBError::NotFound(format!(
1271 "vault secret '{}.{}' not found",
1272 collection, key
1273 )));
1274 };
1275 if entry.tombstone {
1276 let reason = "deleted";
1277 self.audit_vault_unseal(
1278 collection,
1279 key,
1280 crate::runtime::audit_log::Outcome::Denied,
1281 reason,
1282 Some(&entry),
1283 );
1284 self.emit_vault_control_event(
1285 crate::runtime::control_events::EventKind::VaultRead,
1286 crate::runtime::control_events::Outcome::Denied,
1287 "vault:read",
1288 collection,
1289 key,
1290 reason,
1291 Some(&entry),
1292 Vec::new(),
1293 )?;
1294 return Err(RedDBError::NotFound(format!(
1295 "vault secret '{}.{}' is deleted",
1296 collection, key
1297 )));
1298 }
1299 match self.unseal_vault_value(collection, &entry.value) {
1300 Ok(value) => {
1301 self.audit_vault_unseal(
1302 collection,
1303 key,
1304 crate::runtime::audit_log::Outcome::Success,
1305 "ok",
1306 Some(&entry),
1307 );
1308 self.emit_vault_control_event(
1309 crate::runtime::control_events::EventKind::VaultRead,
1310 crate::runtime::control_events::Outcome::Allowed,
1311 "vault:read",
1312 collection,
1313 key,
1314 "ok",
1315 Some(&entry),
1316 Vec::new(),
1317 )?;
1318 Ok(value)
1319 }
1320 Err(err) => {
1321 let reason = err.to_string();
1322 self.audit_vault_unseal(
1323 collection,
1324 key,
1325 crate::runtime::audit_log::Outcome::Error,
1326 &reason,
1327 Some(&entry),
1328 );
1329 self.emit_vault_control_event(
1330 crate::runtime::control_events::EventKind::VaultRead,
1331 crate::runtime::control_events::Outcome::Error,
1332 "vault:read",
1333 collection,
1334 key,
1335 &reason,
1336 Some(&entry),
1337 Vec::new(),
1338 )?;
1339 Err(err)
1340 }
1341 }
1342 }
1343
1344 pub fn execute_kv_command(
1346 &self,
1347 raw_query: &str,
1348 cmd: &crate::storage::query::ast::KvCommand,
1349 ) -> RedDBResult<RuntimeQueryResult> {
1350 use crate::storage::query::ast::KvCommand;
1351
1352 let ops = KvAtomicOps::new(self);
1353
1354 match cmd {
1355 KvCommand::Put {
1356 model,
1357 collection,
1358 key,
1359 value,
1360 ttl_ms,
1361 tags,
1362 if_not_exists,
1363 } => {
1364 if *model == crate::catalog::CollectionModel::Vault {
1365 self.check_system_vault_capability("vault:write", collection, key)
1366 .map_err(RedDBError::Query)?;
1367 }
1368 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
1369 let (created, id) = ops.set_with_tags_for_model(
1370 *model,
1371 collection,
1372 key,
1373 value.clone(),
1374 *ttl_ms,
1375 tags,
1376 *if_not_exists,
1377 )?;
1378
1379 let mut result = UnifiedResult::with_columns(vec![
1380 "ok".into(),
1381 "collection".into(),
1382 "key".into(),
1383 "id".into(),
1384 "created".into(),
1385 "tags".into(),
1386 ]);
1387 let mut record = UnifiedRecord::new();
1388 record.set("ok", Value::Boolean(true));
1389 record.set("collection", Value::text(collection.clone()));
1390 record.set("key", Value::text(key.clone()));
1391 record.set("id", Value::Integer(id.raw() as i64));
1392 record.set("created", Value::Boolean(created));
1393 record.set("tags", kv_tags_value(tags));
1394 result.push(record);
1395
1396 Ok(RuntimeQueryResult {
1397 query: raw_query.to_string(),
1398 mode: crate::storage::query::modes::QueryMode::Sql,
1399 statement: if *model == crate::catalog::CollectionModel::Vault {
1400 "vault_put"
1401 } else {
1402 "kv_put"
1403 },
1404 engine: if *model == crate::catalog::CollectionModel::Vault {
1405 "vault"
1406 } else {
1407 "kv"
1408 },
1409 result,
1410 affected_rows: 1,
1411 statement_type: if created { "insert" } else { "update" },
1412 bookmark: None,
1413 })
1414 }
1415 KvCommand::InvalidateTags { collection, tags } => {
1416 let invalidated = ops.invalidate_tags(collection, tags)?;
1417
1418 let mut result = UnifiedResult::with_columns(vec![
1419 "ok".into(),
1420 "collection".into(),
1421 "invalidated".into(),
1422 "tags".into(),
1423 ]);
1424 let mut record = UnifiedRecord::new();
1425 record.set("ok", Value::Boolean(true));
1426 record.set("collection", Value::text(collection.clone()));
1427 record.set("invalidated", Value::Integer(invalidated as i64));
1428 record.set("tags", kv_tags_value(tags));
1429 result.push(record);
1430
1431 Ok(RuntimeQueryResult {
1432 query: raw_query.to_string(),
1433 mode: crate::storage::query::modes::QueryMode::Sql,
1434 statement: "kv_invalidate_tags",
1435 engine: "kv",
1436 result,
1437 affected_rows: invalidated as u64,
1438 statement_type: "delete",
1439 bookmark: None,
1440 })
1441 }
1442
1443 KvCommand::Rotate {
1444 collection,
1445 key,
1446 value,
1447 tags,
1448 } => {
1449 self.check_system_vault_capability("vault:write", collection, key)
1450 .map_err(RedDBError::Query)?;
1451 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
1452 let entry = ops.append_vault_version(
1453 collection,
1454 key,
1455 value.clone(),
1456 "rotate",
1457 false,
1458 tags,
1459 )?;
1460 self.record_kv_watch_event(
1461 crate::replication::cdc::ChangeOperation::Update,
1462 collection,
1463 key,
1464 entry.id.raw(),
1465 None,
1466 Some(vault_entry_metadata_json(&entry)),
1467 );
1468 self.audit_vault_lifecycle(
1469 "rotate",
1470 collection,
1471 key,
1472 crate::runtime::audit_log::Outcome::Success,
1473 "ok",
1474 Some(&entry),
1475 );
1476 self.emit_vault_control_event(
1477 crate::runtime::control_events::EventKind::VaultRotate,
1478 crate::runtime::control_events::Outcome::Allowed,
1479 "vault:rotate",
1480 collection,
1481 key,
1482 "ok",
1483 Some(&entry),
1484 Vec::new(),
1485 )?;
1486 Ok(vault_write_result(
1487 raw_query,
1488 "vault_rotate",
1489 "update",
1490 collection,
1491 key,
1492 &entry,
1493 1,
1494 ))
1495 }
1496
1497 KvCommand::List {
1498 model,
1499 collection,
1500 prefix,
1501 limit,
1502 offset,
1503 as_json,
1504 } => {
1505 if *model == crate::catalog::CollectionModel::Vault {
1506 if *as_json {
1507 return Err(RedDBError::Query(
1508 "LIST VAULT AS JSON is not supported; vault list only exposes metadata"
1509 .to_string(),
1510 ));
1511 }
1512 let mut entries = ops.latest_vault_entries(collection, prefix.as_deref())?;
1513 entries.sort_by(|left, right| left.key.cmp(&right.key));
1514 let mut result = UnifiedResult::with_columns(vec![
1515 "collection".into(),
1516 "key".into(),
1517 "version".into(),
1518 "fingerprint".into(),
1519 "tags".into(),
1520 "created_at".into(),
1521 "updated_at".into(),
1522 "status".into(),
1523 "tombstone".into(),
1524 "op".into(),
1525 ]);
1526 let mut visible = Vec::new();
1527 for entry in entries {
1528 match self.check_vault_capability(
1529 "vault:read_metadata",
1530 collection,
1531 &entry.key,
1532 ) {
1533 Ok(()) => {
1534 self.emit_vault_control_event(
1535 crate::runtime::control_events::EventKind::VaultMetadataRead,
1536 crate::runtime::control_events::Outcome::Allowed,
1537 "vault:read_metadata",
1538 collection,
1539 &entry.key,
1540 "ok",
1541 Some(&entry),
1542 Vec::new(),
1543 )?;
1544 visible.push(entry);
1545 }
1546 Err(reason) => {
1547 self.emit_vault_control_event(
1548 crate::runtime::control_events::EventKind::VaultMetadataRead,
1549 crate::runtime::control_events::Outcome::Denied,
1550 "vault:read_metadata",
1551 collection,
1552 &entry.key,
1553 &reason,
1554 Some(&entry),
1555 Vec::new(),
1556 )?;
1557 }
1558 }
1559 }
1560 for entry in visible
1561 .into_iter()
1562 .skip(*offset)
1563 .take(limit.unwrap_or(usize::MAX))
1564 {
1565 push_vault_metadata_record(&mut result, collection, &entry.key, &entry);
1566 }
1567 Ok(RuntimeQueryResult {
1568 query: raw_query.to_string(),
1569 mode: crate::storage::query::modes::QueryMode::Sql,
1570 statement: "vault_list",
1571 engine: "vault",
1572 result,
1573 affected_rows: 0,
1574 statement_type: "select",
1575 bookmark: None,
1576 })
1577 } else {
1578 let mut result = UnifiedResult::with_columns(vec![
1579 "rid".into(),
1580 "collection".into(),
1581 "kind".into(),
1582 "tenant".into(),
1583 "created_at".into(),
1584 "updated_at".into(),
1585 "key".into(),
1586 "value".into(),
1587 "tags".into(),
1588 ]);
1589 let entries = ops.latest_kv_entries(collection, prefix.as_deref())?;
1590 if *as_json {
1591 let mut tree = crate::json::Value::Object(crate::json::Map::new());
1592 for (key, entity) in entries
1593 .into_iter()
1594 .skip(*offset)
1595 .take(limit.unwrap_or(usize::MAX))
1596 {
1597 let Some(value) = kv_value_from_entity(&entity) else {
1598 continue;
1599 };
1600 let relative = match prefix {
1601 Some(pfx) if key == *pfx => "",
1602 Some(pfx) => key
1603 .strip_prefix(pfx.as_str())
1604 .map(|tail| tail.strip_prefix('.').unwrap_or(tail))
1605 .unwrap_or(key.as_str()),
1606 None => key.as_str(),
1607 };
1608 insert_kv_json_path(
1609 &mut tree,
1610 relative,
1611 crate::presentation::entity_json::storage_value_to_json(&value),
1612 );
1613 }
1614 Ok(kv_list_json_result(raw_query, collection, prefix, tree))
1615 } else {
1616 for (key, entity) in entries
1617 .into_iter()
1618 .skip(*offset)
1619 .take(limit.unwrap_or(usize::MAX))
1620 {
1621 let mut record = UnifiedRecord::new();
1622 record.set("rid", Value::UnsignedInteger(entity.id.raw()));
1623 record.set("collection", Value::text(collection.clone()));
1624 record.set("kind", Value::text("kv"));
1625 record.set("tenant", Value::Null);
1626 record.set("created_at", Value::UnsignedInteger(entity.created_at));
1627 record.set("updated_at", Value::UnsignedInteger(entity.updated_at));
1628 record.set("key", Value::text(key.clone()));
1629 record.set(
1630 "value",
1631 kv_value_from_entity(&entity)
1632 .unwrap_or(crate::storage::schema::Value::Null),
1633 );
1634 record.set("tags", kv_tags_value(&ops.tags_for_key(collection, &key)));
1635 result.push(record);
1636 }
1637 Ok(RuntimeQueryResult {
1638 query: raw_query.to_string(),
1639 mode: crate::storage::query::modes::QueryMode::Sql,
1640 statement: "kv_list",
1641 engine: "kv",
1642 result,
1643 affected_rows: 0,
1644 statement_type: "select",
1645 bookmark: None,
1646 })
1647 }
1648 }
1649 }
1650
1651 KvCommand::History { collection, key } => {
1652 let latest = ops.get_vault_entry(collection, key)?;
1653 if let Err(reason) =
1654 self.check_vault_capability("vault:read_metadata", collection, key)
1655 {
1656 self.emit_vault_control_event(
1657 crate::runtime::control_events::EventKind::VaultMetadataRead,
1658 crate::runtime::control_events::Outcome::Denied,
1659 "vault:read_metadata",
1660 collection,
1661 key,
1662 &reason,
1663 latest.as_ref(),
1664 Vec::new(),
1665 )?;
1666 return Err(RedDBError::Query(reason));
1667 }
1668 let versions =
1669 super::keyed_spine::history_versions(ops.vault_versions(collection, key)?);
1670 let result = vault_history_result(collection, key, &versions);
1671 self.emit_vault_control_event(
1672 crate::runtime::control_events::EventKind::VaultMetadataRead,
1673 crate::runtime::control_events::Outcome::Allowed,
1674 "vault:read_metadata",
1675 collection,
1676 key,
1677 "ok",
1678 latest.as_ref(),
1679 Vec::new(),
1680 )?;
1681 Ok(RuntimeQueryResult {
1682 query: raw_query.to_string(),
1683 mode: crate::storage::query::modes::QueryMode::Sql,
1684 statement: "vault_history",
1685 engine: "vault",
1686 result,
1687 affected_rows: 0,
1688 statement_type: "select",
1689 bookmark: None,
1690 })
1691 }
1692
1693 KvCommand::Purge { collection, key } => {
1694 let entry = ops.get_vault_entry(collection, key)?;
1695 if let Err(reason) = self.check_vault_capability("vault:purge", collection, key) {
1696 self.audit_vault_lifecycle(
1697 "purge",
1698 collection,
1699 key,
1700 crate::runtime::audit_log::Outcome::Denied,
1701 &reason,
1702 entry.as_ref(),
1703 );
1704 self.emit_vault_control_event(
1705 crate::runtime::control_events::EventKind::VaultPurge,
1706 crate::runtime::control_events::Outcome::Denied,
1707 "vault:purge",
1708 collection,
1709 key,
1710 &reason,
1711 entry.as_ref(),
1712 Vec::new(),
1713 )?;
1714 return Err(RedDBError::Query(reason));
1715 }
1716 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
1717 let purged = ops.purge_vault_versions(collection, key)?;
1718 self.audit_vault_lifecycle(
1719 "purge",
1720 collection,
1721 key,
1722 crate::runtime::audit_log::Outcome::Success,
1723 "ok",
1724 entry.as_ref(),
1725 );
1726 self.emit_vault_control_event(
1727 crate::runtime::control_events::EventKind::VaultPurge,
1728 crate::runtime::control_events::Outcome::Allowed,
1729 "vault:purge",
1730 collection,
1731 key,
1732 "ok",
1733 entry.as_ref(),
1734 vec![(
1735 "purged".to_string(),
1736 crate::runtime::control_events::Sensitivity::raw(purged.to_string()),
1737 )],
1738 )?;
1739 let mut result = UnifiedResult::with_columns(vec![
1740 "ok".into(),
1741 "collection".into(),
1742 "key".into(),
1743 "purged".into(),
1744 ]);
1745 let mut record = UnifiedRecord::new();
1746 record.set("ok", Value::Boolean(true));
1747 record.set("collection", Value::text(collection.clone()));
1748 record.set("key", Value::text(key.clone()));
1749 record.set("purged", Value::Integer(purged as i64));
1750 result.push(record);
1751 Ok(RuntimeQueryResult {
1752 query: raw_query.to_string(),
1753 mode: crate::storage::query::modes::QueryMode::Sql,
1754 statement: "vault_purge",
1755 engine: "vault",
1756 result,
1757 affected_rows: purged as u64,
1758 statement_type: "delete",
1759 bookmark: None,
1760 })
1761 }
1762
1763 KvCommand::Get {
1764 model,
1765 collection,
1766 key,
1767 } => {
1768 if *model == crate::catalog::CollectionModel::Vault {
1769 let entry = ops.get_vault_entry(collection, key)?;
1770 if let Err(reason) =
1771 self.check_vault_capability("vault:read_metadata", collection, key)
1772 {
1773 self.emit_vault_control_event(
1774 crate::runtime::control_events::EventKind::VaultMetadataRead,
1775 crate::runtime::control_events::Outcome::Denied,
1776 "vault:read_metadata",
1777 collection,
1778 key,
1779 &reason,
1780 entry.as_ref(),
1781 Vec::new(),
1782 )?;
1783 return Err(RedDBError::Query(reason));
1784 }
1785 let key_available = self.vault_key_available(collection);
1786 let result =
1787 vault_metadata_result(collection, key, entry.as_ref(), key_available);
1788 self.emit_vault_control_event(
1789 crate::runtime::control_events::EventKind::VaultMetadataRead,
1790 crate::runtime::control_events::Outcome::Allowed,
1791 "vault:read_metadata",
1792 collection,
1793 key,
1794 "ok",
1795 entry.as_ref(),
1796 Vec::new(),
1797 )?;
1798 return Ok(RuntimeQueryResult {
1799 query: raw_query.to_string(),
1800 mode: crate::storage::query::modes::QueryMode::Sql,
1801 statement: "vault_get",
1802 engine: "vault",
1803 result,
1804 affected_rows: 0,
1805 statement_type: "select",
1806 bookmark: None,
1807 });
1808 }
1809
1810 let entity = ops.get_entity(*model, collection, key)?;
1811 let value = entity.as_ref().and_then(kv_value_from_entity);
1812 if *model == crate::catalog::CollectionModel::Kv {
1813 self.inner.kv_stats.incr_gets();
1814 }
1815 let mut result = UnifiedResult::with_columns(vec![
1816 "rid".into(),
1817 "collection".into(),
1818 "kind".into(),
1819 "tenant".into(),
1820 "created_at".into(),
1821 "updated_at".into(),
1822 "key".into(),
1823 "value".into(),
1824 "tags".into(),
1825 ]);
1826 let mut record = UnifiedRecord::new();
1827 if let Some(entity) = entity.as_ref() {
1828 record.set("rid", Value::UnsignedInteger(entity.id.raw()));
1829 record.set("created_at", Value::UnsignedInteger(entity.created_at));
1830 record.set("updated_at", Value::UnsignedInteger(entity.updated_at));
1831 } else {
1832 record.set("rid", Value::Null);
1833 record.set("created_at", Value::Null);
1834 record.set("updated_at", Value::Null);
1835 }
1836 record.set("collection", Value::text(collection.clone()));
1837 record.set("kind", Value::text(keyed_model_name(*model).to_string()));
1838 record.set("tenant", Value::Null);
1839 record.set("key", Value::text(key.clone()));
1840 record.set(
1841 "value",
1842 value.unwrap_or(crate::storage::schema::Value::Null),
1843 );
1844 record.set("tags", kv_tags_value(&ops.tags_for_key(collection, key)));
1845 result.push(record);
1846
1847 Ok(RuntimeQueryResult {
1848 query: raw_query.to_string(),
1849 mode: crate::storage::query::modes::QueryMode::Sql,
1850 statement: "kv_get",
1851 engine: "kv",
1852 result,
1853 affected_rows: 0,
1854 statement_type: "select",
1855 bookmark: None,
1856 })
1857 }
1858 KvCommand::Watch {
1859 model,
1860 collection,
1861 key,
1862 prefix,
1863 from_lsn,
1864 } => {
1865 let watch_key = if *prefix {
1866 format!("{key}.*")
1867 } else {
1868 key.clone()
1869 };
1870 let endpoint = match from_lsn {
1871 Some(lsn) => format!(
1872 "/collections/{collection}/{}/{watch_key}/watch?since_lsn={lsn}",
1873 keyed_model_name(*model)
1874 ),
1875 None => format!(
1876 "/collections/{collection}/{}/{watch_key}/watch",
1877 keyed_model_name(*model)
1878 ),
1879 };
1880 let mut result = UnifiedResult::with_columns(vec![
1881 "collection".into(),
1882 "key".into(),
1883 "prefix".into(),
1884 "from_lsn".into(),
1885 "watch_url".into(),
1886 "streaming".into(),
1887 ]);
1888 let mut record = UnifiedRecord::new();
1889 record.set("collection", Value::text(collection.clone()));
1890 record.set("key", Value::text(watch_key));
1891 record.set("prefix", Value::Boolean(*prefix));
1892 record.set(
1893 "from_lsn",
1894 from_lsn
1895 .map(Value::UnsignedInteger)
1896 .unwrap_or(crate::storage::schema::Value::Null),
1897 );
1898 record.set("watch_url", Value::text(endpoint));
1899 record.set("streaming", Value::Boolean(true));
1900 result.push(record);
1901
1902 Ok(RuntimeQueryResult {
1903 query: raw_query.to_string(),
1904 mode: crate::storage::query::modes::QueryMode::Sql,
1905 statement: "kv_watch",
1906 engine: keyed_model_name(*model),
1907 result,
1908 affected_rows: 0,
1909 statement_type: "stream",
1910 bookmark: None,
1911 })
1912 }
1913
1914 KvCommand::Unseal {
1915 collection,
1916 key,
1917 version,
1918 } => {
1919 let latest = ops.get_vault_entry(collection, key)?;
1920 let entry = match version {
1921 Some(version) => ops.get_vault_entry_version(collection, key, *version)?,
1922 None => latest.clone(),
1923 };
1924 let action = match (version, latest.as_ref()) {
1925 (Some(requested), Some(latest)) if *requested == latest.version => "vault:read",
1926 (Some(_), _) => "vault:unseal_history",
1927 _ => "vault:read",
1928 };
1929 let event_kind = if action == "vault:read" {
1930 crate::runtime::control_events::EventKind::VaultRead
1931 } else {
1932 crate::runtime::control_events::EventKind::VaultUnseal
1933 };
1934 if let Err(reason) = self.check_vault_capability(action, collection, key) {
1935 self.audit_vault_unseal(
1936 collection,
1937 key,
1938 crate::runtime::audit_log::Outcome::Denied,
1939 &reason,
1940 entry.as_ref(),
1941 );
1942 self.emit_vault_control_event(
1943 event_kind,
1944 crate::runtime::control_events::Outcome::Denied,
1945 action,
1946 collection,
1947 key,
1948 &reason,
1949 entry.as_ref(),
1950 Vec::new(),
1951 )?;
1952 return Err(RedDBError::Query(reason));
1953 }
1954 let Some(entry) = entry else {
1955 let reason = "not_found";
1956 self.audit_vault_unseal(
1957 collection,
1958 key,
1959 crate::runtime::audit_log::Outcome::Denied,
1960 reason,
1961 None,
1962 );
1963 self.emit_vault_control_event(
1964 event_kind,
1965 crate::runtime::control_events::Outcome::Denied,
1966 action,
1967 collection,
1968 key,
1969 reason,
1970 None,
1971 Vec::new(),
1972 )?;
1973 return Err(RedDBError::NotFound(format!(
1974 "vault secret '{}.{}' not found",
1975 collection, key
1976 )));
1977 };
1978 if entry.tombstone {
1979 let reason = "deleted";
1980 self.audit_vault_unseal(
1981 collection,
1982 key,
1983 crate::runtime::audit_log::Outcome::Denied,
1984 reason,
1985 Some(&entry),
1986 );
1987 self.emit_vault_control_event(
1988 event_kind,
1989 crate::runtime::control_events::Outcome::Denied,
1990 action,
1991 collection,
1992 key,
1993 reason,
1994 Some(&entry),
1995 Vec::new(),
1996 )?;
1997 return Err(RedDBError::NotFound(format!(
1998 "vault secret '{}.{}' is deleted",
1999 collection, key
2000 )));
2001 }
2002 match self.unseal_vault_value(collection, &entry.value) {
2003 Ok(value) => {
2004 self.audit_vault_unseal(
2005 collection,
2006 key,
2007 crate::runtime::audit_log::Outcome::Success,
2008 "ok",
2009 Some(&entry),
2010 );
2011 self.emit_vault_control_event(
2012 event_kind,
2013 crate::runtime::control_events::Outcome::Allowed,
2014 action,
2015 collection,
2016 key,
2017 "ok",
2018 Some(&entry),
2019 Vec::new(),
2020 )?;
2021 let mut result = UnifiedResult::with_columns(vec![
2022 "collection".into(),
2023 "key".into(),
2024 "value".into(),
2025 ]);
2026 let mut record = UnifiedRecord::new();
2027 record.set("collection", Value::text(collection.clone()));
2028 record.set("key", Value::text(key.clone()));
2029 record.set("value", value);
2030 result.push(record);
2031 Ok(RuntimeQueryResult {
2032 query: raw_query.to_string(),
2033 mode: crate::storage::query::modes::QueryMode::Sql,
2034 statement: "vault_unseal",
2035 engine: "vault",
2036 result,
2037 affected_rows: 0,
2038 statement_type: "select",
2039 bookmark: None,
2040 })
2041 }
2042 Err(err) => {
2043 let reason = err.to_string();
2044 self.audit_vault_unseal(
2045 collection,
2046 key,
2047 crate::runtime::audit_log::Outcome::Error,
2048 &reason,
2049 Some(&entry),
2050 );
2051 self.emit_vault_control_event(
2052 event_kind,
2053 crate::runtime::control_events::Outcome::Error,
2054 action,
2055 collection,
2056 key,
2057 &reason,
2058 Some(&entry),
2059 Vec::new(),
2060 )?;
2061 Err(err)
2062 }
2063 }
2064 }
2065
2066 KvCommand::Incr {
2067 model,
2068 collection,
2069 key,
2070 by,
2071 ttl_ms,
2072 } => {
2073 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2074 let new_value = ops.incr(*model, collection, key, *by, *ttl_ms)?;
2075
2076 let mut result = UnifiedResult::with_columns(vec![
2077 "ok".into(),
2078 "collection".into(),
2079 "key".into(),
2080 "value".into(),
2081 ]);
2082 let mut record = UnifiedRecord::new();
2083 record.set("ok", Value::Boolean(true));
2084 record.set("collection", Value::text(collection.clone()));
2085 record.set("key", Value::text(key.clone()));
2086 record.set("value", Value::Integer(new_value));
2087 result.push(record);
2088
2089 Ok(RuntimeQueryResult {
2090 query: raw_query.to_string(),
2091 mode: crate::storage::query::modes::QueryMode::Sql,
2092 statement: "kv_incr",
2093 engine: "kv",
2094 result,
2095 affected_rows: 1,
2096 statement_type: "update",
2097 bookmark: None,
2098 })
2099 }
2100
2101 KvCommand::Cas {
2102 model,
2103 collection,
2104 key,
2105 expected,
2106 new_value,
2107 ttl_ms,
2108 } => {
2109 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2110 let (ok, current) = ops.cas(
2111 *model,
2112 collection,
2113 key,
2114 expected.as_ref(),
2115 new_value.clone(),
2116 *ttl_ms,
2117 )?;
2118
2119 let mut result = UnifiedResult::with_columns(vec![
2120 "ok".into(),
2121 "collection".into(),
2122 "key".into(),
2123 "current".into(),
2124 ]);
2125 let mut record = UnifiedRecord::new();
2126 record.set("ok", Value::Boolean(ok));
2127 record.set("collection", Value::text(collection.clone()));
2128 record.set("key", Value::text(key.clone()));
2129 record.set(
2130 "current",
2131 current.unwrap_or(crate::storage::schema::Value::Null),
2132 );
2133 result.push(record);
2134
2135 Ok(RuntimeQueryResult {
2136 query: raw_query.to_string(),
2137 mode: crate::storage::query::modes::QueryMode::Sql,
2138 statement: "kv_cas",
2139 engine: "kv",
2140 result,
2141 affected_rows: if ok { 1 } else { 0 },
2142 statement_type: "update",
2143 bookmark: None,
2144 })
2145 }
2146
2147 KvCommand::Delete {
2148 model,
2149 collection,
2150 key,
2151 } => {
2152 if *model == crate::catalog::CollectionModel::Vault {
2153 self.check_system_vault_capability("vault:write", collection, key)
2154 .map_err(RedDBError::Query)?;
2155 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2156 let entry = ops.append_vault_version(
2157 collection,
2158 key,
2159 Value::Null,
2160 "delete",
2161 true,
2162 &[],
2163 )?;
2164 self.record_kv_watch_event(
2165 crate::replication::cdc::ChangeOperation::Delete,
2166 collection,
2167 key,
2168 entry.id.raw(),
2169 None,
2170 Some(vault_entry_metadata_json(&entry)),
2171 );
2172 self.audit_vault_lifecycle(
2173 "delete",
2174 collection,
2175 key,
2176 crate::runtime::audit_log::Outcome::Success,
2177 "ok",
2178 Some(&entry),
2179 );
2180 return Ok(vault_write_result(
2181 raw_query,
2182 "vault_delete",
2183 "delete",
2184 collection,
2185 key,
2186 &entry,
2187 1,
2188 ));
2189 }
2190 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2191 let deleted = ops.delete(*model, collection, key)?;
2192
2193 let mut result = UnifiedResult::with_columns(vec![
2194 "ok".into(),
2195 "collection".into(),
2196 "key".into(),
2197 "deleted".into(),
2198 ]);
2199 let mut record = UnifiedRecord::new();
2200 record.set("ok", Value::Boolean(true));
2201 record.set("collection", Value::text(collection.clone()));
2202 record.set("key", Value::text(key.clone()));
2203 record.set("deleted", Value::Boolean(deleted));
2204 result.push(record);
2205
2206 Ok(RuntimeQueryResult {
2207 query: raw_query.to_string(),
2208 mode: crate::storage::query::modes::QueryMode::Sql,
2209 statement: "kv_delete",
2210 engine: "kv",
2211 result,
2212 affected_rows: if deleted { 1 } else { 0 },
2213 statement_type: "delete",
2214 bookmark: None,
2215 })
2216 }
2217 }
2218 }
2219
2220 pub fn vault_watch_events_since(
2221 &self,
2222 collection: &str,
2223 key: &str,
2224 since_lsn: u64,
2225 max_count: usize,
2226 ) -> Vec<crate::replication::cdc::KvWatchEvent> {
2227 self.kv_watch_events_since(collection, key, since_lsn, max_count)
2228 .into_iter()
2229 .filter(|event| {
2230 self.check_vault_capability("vault:read_metadata", &event.collection, &event.key)
2231 .is_ok()
2232 })
2233 .map(vault_filter_watch_event)
2234 .collect()
2235 }
2236
2237 pub fn vault_watch_events_since_prefix(
2238 &self,
2239 collection: &str,
2240 prefix: &str,
2241 since_lsn: u64,
2242 max_count: usize,
2243 ) -> Vec<crate::replication::cdc::KvWatchEvent> {
2244 self.kv_watch_events_since_prefix(collection, prefix, since_lsn, max_count)
2245 .into_iter()
2246 .filter(|event| {
2247 self.check_vault_capability("vault:read_metadata", &event.collection, &event.key)
2248 .is_ok()
2249 })
2250 .map(vault_filter_watch_event)
2251 .collect()
2252 }
2253
2254 fn check_kv_invalidate_policy(&self, collection: &str) -> RedDBResult<()> {
2255 let auth_store = match self.inner.auth_store.read().clone() {
2256 Some(store) => store,
2257 None => return Ok(()),
2258 };
2259 let (username, role) = match crate::runtime::impl_core::current_auth_identity() {
2260 Some(identity) => identity,
2261 None => return Ok(()),
2262 };
2263 if role < crate::auth::Role::Write {
2264 return Err(RedDBError::Query(format!(
2265 "principal=`{username}` role=`{role:?}` cannot invalidate KV tags"
2266 )));
2267 }
2268 if !auth_store.iam_authorization_enabled() {
2269 return Ok(());
2270 }
2271
2272 let tenant = crate::runtime::impl_core::current_tenant();
2273 let principal = crate::auth::UserId::from_parts(tenant.as_deref(), &username);
2274 let mut resource =
2275 crate::auth::policies::ResourceRef::new("kv".to_string(), collection.to_string());
2276 if let Some(tenant) = tenant.clone() {
2277 resource = resource.with_tenant(tenant);
2278 }
2279 let ctx = crate::auth::policies::EvalContext {
2280 principal_tenant: tenant.clone(),
2281 current_tenant: tenant,
2282 peer_ip: None,
2283 mfa_present: false,
2284 now_ms: current_unix_ms(),
2285 principal_is_admin_role: role == crate::auth::Role::Admin,
2286 principal_is_system_owned: auth_store.principal_is_system_owned(&principal),
2287 principal_is_platform_scoped: principal.tenant.is_none(),
2288 };
2289 if auth_store.check_policy_authz_with_role(
2290 &principal,
2291 "kv:invalidate",
2292 &resource,
2293 &ctx,
2294 role,
2295 ) {
2296 Ok(())
2297 } else {
2298 Err(RedDBError::Query(format!(
2299 "principal=`{username}` action=`kv:invalidate` resource=`kv:{collection}` denied by IAM policy"
2300 )))
2301 }
2302 }
2303}
2304
2305fn ttl_metadata(ttl_ms: Option<u64>) -> Option<Metadata> {
2306 let ttl_ms = ttl_ms?;
2307 Some(Metadata::with_fields(
2308 [(
2309 "_ttl_ms".to_string(),
2310 if ttl_ms <= i64::MAX as u64 {
2311 MetadataValue::Int(ttl_ms as i64)
2312 } else {
2313 MetadataValue::Timestamp(ttl_ms)
2314 },
2315 )]
2316 .into_iter()
2317 .collect(),
2318 ))
2319}
2320
2321fn vault_write_result(
2322 raw_query: &str,
2323 statement: &'static str,
2324 statement_type: &'static str,
2325 collection: &str,
2326 key: &str,
2327 entry: &VaultEntry,
2328 affected_rows: u64,
2329) -> RuntimeQueryResult {
2330 let mut result = UnifiedResult::with_columns(vec![
2331 "ok".into(),
2332 "collection".into(),
2333 "key".into(),
2334 "version".into(),
2335 "fingerprint".into(),
2336 "tombstone".into(),
2337 "op".into(),
2338 "id".into(),
2339 ]);
2340 let mut record = UnifiedRecord::new();
2341 record.set("ok", Value::Boolean(true));
2342 record.set("collection", Value::text(collection.to_string()));
2343 record.set("key", Value::text(key.to_string()));
2344 record.set("version", Value::Integer(entry.version));
2345 if entry.tombstone {
2346 record.set("fingerprint", Value::Null);
2347 } else {
2348 record.set("fingerprint", Value::text(vault_fingerprint(&entry.value)));
2349 }
2350 record.set("tombstone", Value::Boolean(entry.tombstone));
2351 record.set("op", Value::text(entry.op.clone()));
2352 record.set("id", Value::Integer(entry.id.raw() as i64));
2353 result.push(record);
2354 RuntimeQueryResult {
2355 query: raw_query.to_string(),
2356 mode: crate::storage::query::modes::QueryMode::Sql,
2357 statement,
2358 engine: "vault",
2359 result,
2360 affected_rows,
2361 statement_type,
2362 bookmark: None,
2363 }
2364}
2365
2366fn vault_history_result(collection: &str, key: &str, versions: &[VaultEntry]) -> UnifiedResult {
2367 let mut result = UnifiedResult::with_columns(vec![
2368 "collection".into(),
2369 "key".into(),
2370 "version".into(),
2371 "fingerprint".into(),
2372 "tags".into(),
2373 "created_at".into(),
2374 "updated_at".into(),
2375 "status".into(),
2376 "tombstone".into(),
2377 "op".into(),
2378 ]);
2379 for entry in versions {
2380 push_vault_metadata_record(&mut result, collection, key, entry);
2381 }
2382 result
2383}
2384
2385fn push_vault_metadata_record(
2386 result: &mut UnifiedResult,
2387 collection: &str,
2388 key: &str,
2389 entry: &VaultEntry,
2390) {
2391 let mut record = UnifiedRecord::new();
2392 record.set("collection", Value::text(collection.to_string()));
2393 record.set("key", Value::text(key.to_string()));
2394 record.set("version", Value::Integer(entry.version));
2395 if entry.tombstone {
2396 record.set("fingerprint", Value::Null);
2397 record.set("status", Value::text("deleted"));
2398 } else {
2399 record.set("fingerprint", Value::text(vault_fingerprint(&entry.value)));
2400 record.set("status", Value::text("sealed"));
2401 }
2402 record.set("tags", vault_tags_value(&entry.metadata));
2403 record.set("created_at", Value::TimestampMs(entry.created_at as i64));
2404 record.set("updated_at", Value::TimestampMs(entry.updated_at as i64));
2405 record.set("tombstone", Value::Boolean(entry.tombstone));
2406 record.set("op", Value::text(entry.op.clone()));
2407 result.push(record);
2408}
2409
2410fn vault_metadata_result(
2411 collection: &str,
2412 key: &str,
2413 entry: Option<&VaultEntry>,
2414 key_available: bool,
2415) -> UnifiedResult {
2416 let mut result = UnifiedResult::with_columns(vec![
2417 "collection".into(),
2418 "key".into(),
2419 "version".into(),
2420 "fingerprint".into(),
2421 "tags".into(),
2422 "created_at".into(),
2423 "updated_at".into(),
2424 "value".into(),
2425 "status".into(),
2426 "tombstone".into(),
2427 "op".into(),
2428 ]);
2429 let mut record = UnifiedRecord::new();
2430 record.set("collection", Value::text(collection.to_string()));
2431 record.set("key", Value::text(key.to_string()));
2432 match entry {
2433 Some(entry) => {
2434 record.set("version", Value::Integer(entry.version));
2435 if entry.tombstone {
2436 record.set("fingerprint", Value::Null);
2437 } else {
2438 record.set("fingerprint", Value::text(vault_fingerprint(&entry.value)));
2439 }
2440 record.set("tags", vault_tags_value(&entry.metadata));
2441 record.set("created_at", Value::TimestampMs(entry.created_at as i64));
2442 record.set("updated_at", Value::TimestampMs(entry.updated_at as i64));
2443 record.set("value", Value::text("***"));
2444 record.set(
2445 "status",
2446 Value::text(if entry.tombstone {
2447 "deleted"
2448 } else if key_available {
2449 "sealed"
2450 } else {
2451 "sealed_unavailable"
2452 }),
2453 );
2454 record.set("tombstone", Value::Boolean(entry.tombstone));
2455 record.set("op", Value::text(entry.op.clone()));
2456 }
2457 None => {
2458 record.set("version", Value::Null);
2459 record.set("fingerprint", Value::Null);
2460 record.set("tags", Value::Array(Vec::new()));
2461 record.set("created_at", Value::Null);
2462 record.set("updated_at", Value::Null);
2463 record.set("value", Value::text(""));
2464 record.set("status", Value::text("missing"));
2465 record.set("tombstone", Value::Boolean(false));
2466 record.set("op", Value::Null);
2467 }
2468 }
2469 result.push(record);
2470 result
2471}
2472
2473fn vault_fingerprint(value: &Value) -> String {
2474 match value {
2475 Value::Secret(payload) => crate::utils::to_hex(&crate::crypto::sha256::sha256(payload)),
2476 other => crate::utils::to_hex(&crate::crypto::sha256::sha256(&other.to_bytes())),
2477 }
2478}
2479
2480fn vault_entry_metadata_json(entry: &VaultEntry) -> crate::json::Value {
2481 let mut object = crate::json::Map::new();
2482 object.insert(
2483 "key".to_string(),
2484 crate::json::Value::String(entry.key.clone()),
2485 );
2486 object.insert(
2487 "version".to_string(),
2488 crate::json::Value::Number(entry.version as f64),
2489 );
2490 object.insert(
2491 "fingerprint".to_string(),
2492 if entry.tombstone {
2493 crate::json::Value::Null
2494 } else {
2495 crate::json::Value::String(vault_fingerprint(&entry.value))
2496 },
2497 );
2498 object.insert("tags".to_string(), vault_tags_json(&entry.metadata));
2499 object.insert(
2500 "actor".to_string(),
2501 crate::json::Value::String(RedDBRuntime::current_vault_actor()),
2502 );
2503 object.insert(
2504 "sequence_id".to_string(),
2505 crate::json::Value::Number(entry.sequence_id as f64),
2506 );
2507 object.insert(
2508 "tombstone".to_string(),
2509 crate::json::Value::Bool(entry.tombstone),
2510 );
2511 object.insert(
2512 "op".to_string(),
2513 crate::json::Value::String(entry.op.clone()),
2514 );
2515 crate::json::Value::Object(object)
2516}
2517
2518fn vault_tags_json(metadata: &Metadata) -> crate::json::Value {
2519 match vault_tags_value(metadata) {
2520 Value::Array(values) => crate::json::Value::Array(
2521 values
2522 .into_iter()
2523 .filter_map(|value| match value {
2524 Value::Text(tag) => Some(crate::json::Value::String(tag.to_string())),
2525 _ => None,
2526 })
2527 .collect(),
2528 ),
2529 _ => crate::json::Value::Array(Vec::new()),
2530 }
2531}
2532
2533fn vault_tags_metadata(tags: &[String]) -> std::collections::HashMap<String, MetadataValue> {
2534 [(
2535 "tags".to_string(),
2536 MetadataValue::Array(
2537 tags.iter()
2538 .map(|tag| MetadataValue::String(tag.clone()))
2539 .collect(),
2540 ),
2541 )]
2542 .into_iter()
2543 .collect()
2544}
2545
2546fn vault_filter_watch_event(
2547 mut event: crate::replication::cdc::KvWatchEvent,
2548) -> crate::replication::cdc::KvWatchEvent {
2549 event.before = event.before.and_then(vault_metadata_json_only);
2550 event.after = event.after.and_then(vault_metadata_json_only);
2551 event
2552}
2553
2554fn vault_metadata_json_only(value: crate::json::Value) -> Option<crate::json::Value> {
2555 let object = value.as_object()?;
2556 let mut out = crate::json::Map::new();
2557 for field in [
2558 "key",
2559 "version",
2560 "fingerprint",
2561 "tags",
2562 "actor",
2563 "sequence_id",
2564 "tombstone",
2565 "op",
2566 ] {
2567 if let Some(value) = object.get(field) {
2568 out.insert(field.to_string(), value.clone());
2569 }
2570 }
2571 Some(crate::json::Value::Object(out))
2572}
2573
2574fn vault_tags_value(metadata: &Metadata) -> Value {
2575 match metadata.get("tags") {
2576 Some(MetadataValue::Array(values)) => Value::Array(
2577 values
2578 .iter()
2579 .filter_map(|value| match value {
2580 MetadataValue::String(tag) => Some(Value::text(tag.clone())),
2581 _ => None,
2582 })
2583 .collect(),
2584 ),
2585 Some(MetadataValue::String(tag)) if !tag.is_empty() => {
2586 Value::Array(vec![Value::text(tag.clone())])
2587 }
2588 _ => Value::Array(Vec::new()),
2589 }
2590}
2591
2592fn decode_vault_key(hex_key: &str) -> RedDBResult<[u8; 32]> {
2593 let bytes = hex::decode(hex_key)
2594 .map_err(|_| RedDBError::Query("vault sealed_unavailable: bad key material".to_string()))?;
2595 let key: [u8; 32] = bytes.try_into().map_err(|_| {
2596 RedDBError::Query("vault sealed_unavailable: bad key material length".to_string())
2597 })?;
2598 Ok(key)
2599}
2600
2601fn kv_tags_metadata(tags: &[String]) -> Option<(String, MetadataValue)> {
2602 if tags.is_empty() {
2603 return None;
2604 }
2605 let values = tags
2606 .iter()
2607 .map(|tag| MetadataValue::String(tag.clone()))
2608 .collect();
2609 Some(("_kv_tags".to_string(), MetadataValue::Array(values)))
2610}
2611
2612fn kv_tags_value(tags: &[String]) -> Value {
2613 let json = crate::json::Value::Array(
2614 tags.iter()
2615 .map(|tag| crate::json::Value::String(tag.clone()))
2616 .collect(),
2617 );
2618 Value::Json(crate::json::to_vec(&json).unwrap_or_default())
2619}
2620
2621fn kv_value_from_entity(entity: &crate::storage::UnifiedEntity) -> Option<Value> {
2622 if let crate::storage::EntityData::Row(ref row) = entity.data {
2623 if let Some(ref named) = row.named {
2624 return named.get("value").cloned();
2625 }
2626 }
2627 None
2628}
2629
2630fn insert_kv_json_path(root: &mut crate::json::Value, path: &str, value: crate::json::Value) {
2631 let segments: Vec<&str> = path
2632 .split('.')
2633 .filter(|segment| !segment.is_empty())
2634 .collect();
2635 insert_kv_json_segments(root, &segments, value);
2636}
2637
2638fn insert_kv_json_segments(
2639 root: &mut crate::json::Value,
2640 segments: &[&str],
2641 value: crate::json::Value,
2642) {
2643 if segments.is_empty() {
2644 *root = value;
2645 return;
2646 }
2647
2648 if !matches!(root, crate::json::Value::Object(_)) {
2649 *root = crate::json::Value::Object(crate::json::Map::new());
2650 }
2651
2652 let crate::json::Value::Object(map) = root else {
2653 return;
2654 };
2655 if segments.len() == 1 {
2656 map.insert(segments[0].to_string(), value);
2657 return;
2658 }
2659 let entry = map
2660 .entry(segments[0].to_string())
2661 .or_insert_with(|| crate::json::Value::Object(crate::json::Map::new()));
2662 insert_kv_json_segments(entry, &segments[1..], value);
2663}
2664
2665fn kv_list_json_result(
2666 raw_query: &str,
2667 collection: &str,
2668 prefix: &Option<String>,
2669 value: crate::json::Value,
2670) -> RuntimeQueryResult {
2671 let mut result =
2672 UnifiedResult::with_columns(vec!["collection".into(), "prefix".into(), "value".into()]);
2673 let mut record = UnifiedRecord::new();
2674 record.set("collection", Value::text(collection.to_string()));
2675 record.set(
2676 "prefix",
2677 prefix
2678 .as_ref()
2679 .map(|prefix| Value::text(prefix.clone()))
2680 .unwrap_or(Value::Null),
2681 );
2682 record.set("value", Value::Json(value.to_string_compact().into_bytes()));
2683 result.push(record);
2684 RuntimeQueryResult {
2685 query: raw_query.to_string(),
2686 mode: crate::storage::query::modes::QueryMode::Sql,
2687 statement: "kv_list_json",
2688 engine: "kv",
2689 result,
2690 affected_rows: 0,
2691 statement_type: "select",
2692 bookmark: None,
2693 }
2694}
2695
2696fn kv_collection_contract(name: &str) -> crate::physical::CollectionContract {
2697 let now = current_unix_ms();
2698 crate::physical::CollectionContract {
2699 name: name.to_string(),
2700 declared_model: crate::catalog::CollectionModel::Kv,
2701 schema_mode: crate::catalog::SchemaMode::Dynamic,
2702 origin: crate::physical::ContractOrigin::Implicit,
2703 version: 1,
2704 created_at_unix_ms: now,
2705 updated_at_unix_ms: now,
2706 default_ttl_ms: None,
2707 vector_dimension: None,
2708 vector_metric: None,
2709 context_index_fields: Vec::new(),
2710 declared_columns: Vec::new(),
2711 table_def: None,
2712 timestamps_enabled: false,
2713 context_index_enabled: false,
2714 metrics_raw_retention_ms: None,
2715 metrics_rollup_policies: Vec::new(),
2716 metrics_tenant_identity: None,
2717 metrics_namespace: None,
2718 append_only: false,
2719 subscriptions: Vec::new(),
2720 analytics_config: Vec::new(),
2721 session_key: None,
2722 session_gap_ms: None,
2723 retention_duration_ms: None,
2724 analytical_storage: None,
2725 }
2726}
2727
/// Milliseconds since the Unix epoch according to the system clock.
///
/// A clock set before the epoch yields `0` instead of failing.
fn current_unix_ms() -> u128 {
    use std::time::{SystemTime, UNIX_EPOCH};

    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    since_epoch.as_millis()
}
2734
#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::{Arc, Barrier};

    use super::{KvAtomicOps, KV_DEFAULT_COLLECTION as KV};
    use crate::api::RedDBOptions;
    use crate::catalog::CollectionModel;
    use crate::replication::cdc::ChangeOperation;
    use crate::runtime::RedDBRuntime;
    use crate::storage::schema::Value;

    /// Fresh, isolated in-memory runtime for a single test.
    fn rt() -> RedDBRuntime {
        RedDBRuntime::with_options(RedDBOptions::in_memory()).expect("in-memory runtime")
    }

    /// Plain `SET key = value` in the default KV collection (no TTL, overwrite allowed).
    fn put(ops: &KvAtomicOps<'_>, key: &str, value: Value) {
        ops.set(CollectionModel::Kv, KV, key, value, None, false)
            .unwrap();
    }

    /// Runs `query` and returns a copy of `column` from its first result row.
    fn first_row_value(runtime: &RedDBRuntime, query: &str, column: &str) -> Option<Value> {
        runtime.execute_query(query).unwrap().result.records[0]
            .get(column)
            .cloned()
    }

    #[test]
    fn incr_missing_key_initialises_at_by() {
        let runtime = rt();
        let ops = KvAtomicOps::new(&runtime);
        let value = ops
            .incr(CollectionModel::Kv, KV, "missing", 5, None)
            .unwrap();
        assert_eq!(value, 5);
    }

    #[test]
    fn kv_runtime_stats_count_public_ops() {
        let runtime = rt();
        let ops = KvAtomicOps::new(&runtime);

        put(&ops, "profile", Value::text("alice"));
        ops.get(CollectionModel::Kv, KV, "profile").unwrap();
        ops.delete(CollectionModel::Kv, KV, "profile").unwrap();
        ops.incr(CollectionModel::Kv, KV, "hits", 1, None).unwrap();
        // Key is absent again, so expecting NULL succeeds...
        ops.cas(
            CollectionModel::Kv,
            KV,
            "profile",
            None,
            Value::text("created"),
            None,
        )
        .unwrap();
        // ...and a stale expectation conflicts.
        let stale = Value::text("different");
        ops.cas(
            CollectionModel::Kv,
            KV,
            "profile",
            Some(&stale),
            Value::text("ignored"),
            None,
        )
        .unwrap();

        let kv = runtime.stats().kv;
        assert_eq!(kv.puts, 1);
        assert_eq!(kv.gets, 1);
        assert_eq!(kv.deletes, 1);
        assert_eq!(kv.incrs, 1);
        assert_eq!(kv.cas_success, 1);
        assert_eq!(kv.cas_conflict, 1);
    }

    #[test]
    fn kv_invalidate_tags_removes_matching_entries_only() {
        let runtime = rt();
        runtime
            .execute_query("KV PUT sessions.blob = 'payload' TAGS [user:42, org:7]")
            .unwrap();

        // A tag the entry does not carry leaves it untouched.
        let miss = runtime
            .execute_query("INVALIDATE TAGS [org:99] FROM sessions")
            .unwrap();
        assert_eq!(miss.affected_rows, 0);
        let survivor = runtime.execute_query("KV GET sessions.blob").unwrap();
        assert!(matches!(
            survivor.result.records[0].get("value"),
            Some(Value::Text(value)) if &**value == "payload"
        ));

        // A matching tag evicts it; GET then reports NULL.
        let hit = runtime
            .execute_query("INVALIDATE TAGS [user:42] FROM sessions")
            .unwrap();
        assert_eq!(hit.affected_rows, 1);
        let evicted = runtime.execute_query("KV GET sessions.blob").unwrap();
        assert!(matches!(
            evicted.result.records[0].get("value"),
            Some(Value::Null)
        ));
    }

    #[test]
    fn kv_runtime_stats_count_watch_streams_and_events() {
        let runtime = rt();
        let ops = KvAtomicOps::new(&runtime);
        assert_eq!(runtime.stats().kv.watch_streams_active, 0);

        {
            let mut watch = runtime.kv_watch_subscribe(KV, "watched", None);
            assert_eq!(runtime.stats().kv.watch_streams_active, 1);

            put(&ops, "watched", Value::Integer(1));
            let event = watch.poll_next().expect("watch event");
            assert_eq!(event.key, "watched");
            assert_eq!(runtime.stats().kv.watch_events_emitted, 1);

            watch.record_drop_count(3);
            assert_eq!(runtime.stats().kv.watch_drops, 3);
        }

        // Leaving the scope drops the stream, which must unregister it.
        assert_eq!(runtime.stats().kv.watch_streams_active, 0);
    }

    #[test]
    fn incr_existing_integer_accumulates() {
        let runtime = rt();
        let ops = KvAtomicOps::new(&runtime);
        let mut last = 0;
        for _ in 0..3 {
            last = ops.incr(CollectionModel::Kv, KV, "ctr", 1, None).unwrap();
        }
        assert_eq!(last, 3);
    }

    #[test]
    fn decr_via_negative_by() {
        let runtime = rt();
        let ops = KvAtomicOps::new(&runtime);
        ops.incr(CollectionModel::Kv, KV, "stock", 10, None).unwrap();
        let remaining = ops
            .incr(CollectionModel::Kv, KV, "stock", -3, None)
            .unwrap();
        assert_eq!(remaining, 7);
    }

    #[test]
    fn concurrent_incr_single_key_is_atomic() {
        const THREADS: usize = 8;
        const ITERS: usize = 1000;

        let runtime = Arc::new(rt());
        let barrier = Arc::new(Barrier::new(THREADS));

        let workers: Vec<_> = (0..THREADS)
            .map(|_| {
                let runtime = Arc::clone(&runtime);
                let barrier = Arc::clone(&barrier);
                std::thread::spawn(move || {
                    let ops = KvAtomicOps::new(&runtime);
                    barrier.wait();
                    for _ in 0..ITERS {
                        ops.incr(CollectionModel::Kv, KV, "counter", 1, None)
                            .unwrap();
                    }
                })
            })
            .collect();
        for worker in workers {
            worker.join().expect("worker should finish");
        }

        let ops = KvAtomicOps::new(&runtime);
        let expected = Value::Integer((THREADS * ITERS) as i64);
        assert_eq!(
            ops.get(CollectionModel::Kv, KV, "counter").unwrap(),
            Some(expected)
        );
    }

    #[test]
    fn incr_on_string_value_returns_error() {
        let runtime = rt();
        let ops = KvAtomicOps::new(&runtime);
        put(&ops, "name", Value::text("alice"));
        let err = ops
            .incr(CollectionModel::Kv, KV, "name", 1, None)
            .unwrap_err();
        assert!(err.to_string().contains("non-integer"));
    }

    #[test]
    fn cas_matching_value_succeeds() {
        let runtime = rt();
        let ops = KvAtomicOps::new(&runtime);
        put(&ops, "lock", Value::text("free"));

        let expected = Value::text("free");
        let (ok, prev) = ops
            .cas(
                CollectionModel::Kv,
                KV,
                "lock",
                Some(&expected),
                Value::text("held"),
                None,
            )
            .unwrap();
        assert!(ok);
        assert_eq!(prev, Some(Value::text("free")));
        assert_eq!(
            ops.get(CollectionModel::Kv, KV, "lock").unwrap(),
            Some(Value::text("held"))
        );
    }

    #[test]
    fn concurrent_cas_allows_one_success_per_round() {
        const THREADS: usize = 8;
        const ROUNDS: usize = 100;

        let runtime = Arc::new(rt());
        let ops = KvAtomicOps::new(&runtime);
        put(&ops, "cas_counter", Value::Integer(0));

        let start_round = Arc::new(Barrier::new(THREADS));
        let finish_round = Arc::new(Barrier::new(THREADS));
        let successes = Arc::new(AtomicUsize::new(0));

        let workers: Vec<_> = (0..THREADS)
            .map(|_| {
                let runtime = Arc::clone(&runtime);
                let start_round = Arc::clone(&start_round);
                let finish_round = Arc::clone(&finish_round);
                let successes = Arc::clone(&successes);
                std::thread::spawn(move || {
                    let ops = KvAtomicOps::new(&runtime);
                    for round in 0..ROUNDS {
                        start_round.wait();
                        // Every worker races the same `round -> round + 1` transition.
                        let expected = Value::Integer(round as i64);
                        let (won, _) = ops
                            .cas(
                                CollectionModel::Kv,
                                KV,
                                "cas_counter",
                                Some(&expected),
                                Value::Integer((round + 1) as i64),
                                None,
                            )
                            .unwrap();
                        if won {
                            successes.fetch_add(1, Ordering::SeqCst);
                        }
                        finish_round.wait();
                    }
                })
            })
            .collect();
        for worker in workers {
            worker.join().expect("worker should finish");
        }

        assert_eq!(successes.load(Ordering::SeqCst), ROUNDS);
        assert_eq!(
            ops.get(CollectionModel::Kv, KV, "cas_counter").unwrap(),
            Some(Value::Integer(ROUNDS as i64))
        );
    }

    #[test]
    fn cas_mismatching_value_fails() {
        let runtime = rt();
        let ops = KvAtomicOps::new(&runtime);
        put(&ops, "lock", Value::text("free"));

        let wrong_guess = Value::text("held");
        let (ok, current) = ops
            .cas(
                CollectionModel::Kv,
                KV,
                "lock",
                Some(&wrong_guess),
                Value::text("worker-7"),
                None,
            )
            .unwrap();
        assert!(!ok);
        assert_eq!(current, Some(Value::text("free")));
        assert_eq!(
            ops.get(CollectionModel::Kv, KV, "lock").unwrap(),
            Some(Value::text("free"))
        );
    }

    #[test]
    fn cas_expect_null_on_missing_key_creates() {
        let runtime = rt();
        let ops = KvAtomicOps::new(&runtime);
        let (ok, prev) = ops
            .cas(
                CollectionModel::Kv,
                KV,
                "new_key",
                None,
                Value::text("created"),
                None,
            )
            .unwrap();
        assert!(ok);
        assert_eq!(prev, None);
        assert_eq!(
            ops.get(CollectionModel::Kv, KV, "new_key").unwrap(),
            Some(Value::text("created"))
        );
    }

    #[test]
    fn cas_expect_null_on_existing_key_fails() {
        let runtime = rt();
        let ops = KvAtomicOps::new(&runtime);
        put(&ops, "taken", Value::text("worker-1"));

        let (ok, current) = ops
            .cas(
                CollectionModel::Kv,
                KV,
                "taken",
                None,
                Value::text("worker-2"),
                None,
            )
            .unwrap();
        assert!(!ok);
        assert_eq!(current, Some(Value::text("worker-1")));
    }

    #[test]
    fn cas_via_sql_roundtrip() {
        let runtime = rt();
        runtime.execute_query("KV PUT lock = 'free'").unwrap();

        let cas = "KV CAS lock EXPECT 'free' SET 'held'";
        // First attempt sees 'free' and swaps; the retry sees 'held' and fails.
        assert_eq!(
            first_row_value(&runtime, cas, "ok"),
            Some(Value::Boolean(true))
        );
        assert_eq!(
            first_row_value(&runtime, cas, "ok"),
            Some(Value::Boolean(false))
        );
    }

    #[test]
    fn cas_expect_null_via_sql() {
        let runtime = rt();
        assert_eq!(
            first_row_value(&runtime, "KV CAS singleton EXPECT NULL SET 'first'", "ok"),
            Some(Value::Boolean(true))
        );
        assert_eq!(
            first_row_value(&runtime, "KV CAS singleton EXPECT NULL SET 'second'", "ok"),
            Some(Value::Boolean(false))
        );
    }

    #[test]
    fn incr_via_sql_roundtrip() {
        let runtime = rt();
        assert_eq!(
            first_row_value(&runtime, "KV INCR hits", "value"),
            Some(Value::Integer(1))
        );
        assert_eq!(
            first_row_value(&runtime, "KV INCR hits BY 4", "value"),
            Some(Value::Integer(5))
        );
    }

    #[test]
    fn concurrent_self_referential_update_is_atomic() {
        const THREADS: usize = 8;
        const ITERS: usize = 100;

        let runtime = Arc::new(rt());
        for setup in [
            "CREATE TABLE counters (id INT, n INT)",
            "INSERT INTO counters (id, n) VALUES (1, 0)",
        ] {
            runtime.execute_query(setup).unwrap();
        }

        let barrier = Arc::new(Barrier::new(THREADS));
        let workers: Vec<_> = (0..THREADS)
            .map(|_| {
                let runtime = Arc::clone(&runtime);
                let barrier = Arc::clone(&barrier);
                std::thread::spawn(move || {
                    barrier.wait();
                    for _ in 0..ITERS {
                        runtime
                            .execute_query("UPDATE counters SET n = n + 1 WHERE id = 1")
                            .unwrap();
                    }
                })
            })
            .collect();
        for worker in workers {
            worker.join().expect("worker should finish");
        }

        assert_eq!(
            first_row_value(&runtime, "SELECT n FROM counters WHERE id = 1", "n"),
            Some(Value::Integer((THREADS * ITERS) as i64))
        );
    }

    #[test]
    fn watch_stream_delivers_key_events_in_lsn_order() {
        let runtime = rt();
        let ops = KvAtomicOps::new(&runtime);
        let mut stream = runtime.kv_watch_subscribe(KV, "seq", None);

        put(&ops, "seq", Value::Integer(1));
        ops.incr(CollectionModel::Kv, KV, "seq", 1, None).unwrap();
        ops.delete(CollectionModel::Kv, KV, "seq").unwrap();
        put(&ops, "seq", Value::Integer(9));

        // Poll until four events arrive or the stream runs dry.
        let events: Vec<_> = std::iter::from_fn(|| stream.poll_next()).take(4).collect();

        let expected_ops = [
            ChangeOperation::Insert,
            ChangeOperation::Update,
            ChangeOperation::Delete,
            ChangeOperation::Insert,
        ];
        assert_eq!(events.len(), expected_ops.len());
        for (event, op) in events.iter().zip(expected_ops.iter()) {
            assert_eq!(&event.op, op);
        }
        assert!(events.windows(2).all(|pair| pair[0].lsn < pair[1].lsn));
    }

    #[test]
    fn watch_prefix_stream_delivers_matching_events_only() {
        let runtime = rt();
        let ops = KvAtomicOps::new(&runtime);
        let mut stream = runtime.kv_watch_subscribe_prefix(KV, "acct:", None);

        for (key, n) in [("acct:1", 1), ("session:1", 2), ("acct:2", 3)] {
            put(&ops, key, Value::Integer(n));
        }

        let first = stream.poll_next().expect("first prefix event");
        let second = stream.poll_next().expect("second prefix event");
        assert_eq!(first.key, "acct:1");
        assert_eq!(second.key, "acct:2");
        assert!(stream.poll_next().is_none());
    }

    #[test]
    fn watch_stream_resume_from_lsn_delivers_missed_events_without_duplicates() {
        let runtime = rt();
        let ops = KvAtomicOps::new(&runtime);
        let mut stream = runtime.kv_watch_subscribe(KV, "resume", None);

        let mut last_seen_lsn = 0;
        for value in 0..5 {
            put(&ops, "resume", Value::Integer(value));
            last_seen_lsn = stream.poll_next().expect("initial event").lsn;
        }
        drop(stream);

        // These writes happen while nobody is subscribed.
        for value in 5..55 {
            put(&ops, "resume", Value::Integer(value));
        }

        let expected_lsns: Vec<u64> = runtime
            .kv_watch_events_since(KV, "resume", last_seen_lsn, 200)
            .into_iter()
            .map(|event| event.lsn)
            .collect();
        assert!(!expected_lsns.is_empty());

        let mut resumed = runtime.kv_watch_subscribe(KV, "resume", Some(last_seen_lsn));
        let lsns: Vec<u64> = std::iter::from_fn(|| resumed.poll_next())
            .map(|event| event.lsn)
            .take(expected_lsns.len())
            .collect();

        assert_eq!(lsns, expected_lsns);
        assert!(lsns.iter().all(|lsn| *lsn > last_seen_lsn));
        assert!(lsns.windows(2).all(|pair| pair[0] < pair[1]));
        assert!(resumed.poll_next().is_none());
    }

    #[test]
    fn watch_stream_slow_consumer_drops_oldest_buffered_events() {
        let runtime = rt();
        let ops = KvAtomicOps::new(&runtime);
        let mut stream = runtime.kv_watch_subscribe(KV, "slow", None);

        for value in 0..1_200 {
            put(&ops, "slow", Value::Integer(value));
        }

        let event = stream.poll_next().expect("tail event after drops");
        assert!(event.lsn > 1);
        assert!(event.dropped_event_count > 0);
        assert_eq!(stream.dropped_event_count(), event.dropped_event_count);
        assert_eq!(runtime.stats().kv.watch_drops, event.dropped_event_count);
    }

    #[test]
    fn watch_stream_idle_timeout_closes_subscription() {
        let runtime = rt();
        runtime
            .execute_query("SET CONFIG red.config.kv.watch.idle_timeout_ms = 1")
            .unwrap();

        let mut stream = runtime.kv_watch_subscribe(KV, "idle", None);
        assert_eq!(runtime.stats().kv.watch_streams_active, 1);
        std::thread::sleep(std::time::Duration::from_millis(5));

        assert!(stream.poll_next().is_none());
        assert_eq!(runtime.stats().kv.watch_streams_active, 0);
    }

    #[test]
    fn watch_stream_does_not_emit_rolled_back_put() {
        let runtime = rt();
        let mut stream = runtime.kv_watch_subscribe(KV, "rollback_key", None);

        for statement in ["BEGIN", "KV PUT rollback_key = 'dirty'", "ROLLBACK"] {
            runtime.execute_query(statement).unwrap();
        }

        assert!(stream.poll_next().is_none());
    }
}
3438}