1use crate::application::ports::RuntimeEntityPort;
7use crate::storage::unified::{Metadata, MetadataValue};
8
9use super::impl_core::{current_auth_identity, current_connection_id, current_tenant};
10use super::*;
11
/// Name of the default KV collection.
pub const KV_DEFAULT_COLLECTION: &str = "kv_default";
14
/// Builds the auth-store vault KV reference under which a collection-specific master key
/// is looked up: `red.vault.<collection>.master_key`.
fn vault_master_key_ref(collection: &str) -> String {
    let mut reference = String::from("red.vault.");
    reference.push_str(collection);
    reference.push_str(".master_key");
    reference
}
18
19fn keyed_model_name(model: crate::catalog::CollectionModel) -> &'static str {
20 match model {
21 crate::catalog::CollectionModel::Kv => "kv",
22 crate::catalog::CollectionModel::Vault => "vault",
23 crate::catalog::CollectionModel::Config => "config",
24 _ => "collection",
25 }
26}
27
/// One stored version of a vault key, decoded from a keyed row together with the
/// entity's metadata and bookkeeping timestamps.
#[derive(Debug, Clone)]
struct VaultEntry {
    /// Entity id of the row holding this version.
    id: crate::storage::EntityId,
    /// Logical vault key the version belongs to.
    key: String,
    /// Stored value: `append_vault_version` writes a sealed `Value::Secret` for puts and
    /// `Value::Null` for tombstones.
    value: crate::storage::schema::Value,
    /// Entity metadata of this version row (e.g. tags written by `append_vault_version`).
    metadata: Metadata,
    created_at: u64,
    updated_at: u64,
    sequence_id: u64,
    /// Per-key version number; `append_vault_version` assigns latest + 1.
    version: i64,
    /// `true` when this version marks the key as deleted; readers treat it as absent.
    tombstone: bool,
    /// Operation label that produced the version (e.g. `"put"`).
    op: String,
}
41
42impl super::keyed_spine::KeyedVersion for VaultEntry {
43 fn key(&self) -> &str {
44 &self.key
45 }
46
47 fn version(&self) -> i64 {
48 self.version
49 }
50}
51
52impl VaultEntry {
53 fn from_keyed_row(
54 version: super::keyed_spine::KeyedRowVersion,
55 metadata: Metadata,
56 created_at: u64,
57 updated_at: u64,
58 sequence_id: u64,
59 ) -> Self {
60 Self {
61 id: version.id,
62 key: version.key,
63 value: version.value,
64 metadata,
65 created_at,
66 updated_at,
67 sequence_id,
68 version: version.version,
69 tombstone: version.tombstone,
70 op: version.op,
71 }
72 }
73}
74
/// Short-lived handle for key-addressed operations (SET/GET/DEL/INCR/CAS, tag
/// invalidation, vault versions). It holds only a borrow of the runtime.
pub struct KvAtomicOps<'a> {
    runtime: &'a RedDBRuntime,
}
82
83impl<'a> KvAtomicOps<'a> {
84 pub fn new(runtime: &'a RedDBRuntime) -> Self {
85 Self { runtime }
86 }
87
88 pub fn set(
92 &self,
93 model: crate::catalog::CollectionModel,
94 collection: &str,
95 key: &str,
96 value: crate::storage::schema::Value,
97 ttl_ms: Option<u64>,
98 if_not_exists: bool,
99 ) -> RedDBResult<(bool, crate::storage::EntityId)> {
100 self.set_with_tags_for_model(model, collection, key, value, ttl_ms, &[], if_not_exists)
101 }
102
103 pub fn set_with_tags(
104 &self,
105 collection: &str,
106 key: &str,
107 value: crate::storage::schema::Value,
108 ttl_ms: Option<u64>,
109 tags: &[String],
110 if_not_exists: bool,
111 ) -> RedDBResult<(bool, crate::storage::EntityId)> {
112 self.set_with_tags_for_model(
113 crate::catalog::CollectionModel::Kv,
114 collection,
115 key,
116 value,
117 ttl_ms,
118 tags,
119 if_not_exists,
120 )
121 }
122
123 fn set_with_tags_for_model(
124 &self,
125 model: crate::catalog::CollectionModel,
126 collection: &str,
127 key: &str,
128 value: crate::storage::schema::Value,
129 ttl_ms: Option<u64>,
130 tags: &[String],
131 if_not_exists: bool,
132 ) -> RedDBResult<(bool, crate::storage::EntityId)> {
133 self.set_with_tags_and_metadata_for_model(
134 model,
135 collection,
136 key,
137 value,
138 ttl_ms,
139 tags,
140 if_not_exists,
141 Vec::new(),
142 )
143 }
144
145 pub fn set_with_tags_and_metadata(
146 &self,
147 collection: &str,
148 key: &str,
149 value: crate::storage::schema::Value,
150 ttl_ms: Option<u64>,
151 tags: &[String],
152 if_not_exists: bool,
153 metadata: Vec<(String, MetadataValue)>,
154 ) -> RedDBResult<(bool, crate::storage::EntityId)> {
155 self.set_with_tags_and_metadata_for_model(
156 crate::catalog::CollectionModel::Kv,
157 collection,
158 key,
159 value,
160 ttl_ms,
161 tags,
162 if_not_exists,
163 metadata,
164 )
165 }
166
167 fn set_with_tags_and_metadata_for_model(
168 &self,
169 model: crate::catalog::CollectionModel,
170 collection: &str,
171 key: &str,
172 value: crate::storage::schema::Value,
173 ttl_ms: Option<u64>,
174 tags: &[String],
175 if_not_exists: bool,
176 mut metadata: Vec<(String, MetadataValue)>,
177 ) -> RedDBResult<(bool, crate::storage::EntityId)> {
178 self.ensure_keyed_collection(model, collection)?;
179
180 if model == crate::catalog::CollectionModel::Vault {
181 let latest = self.get_vault_entry(collection, key)?;
182 let was_present = latest.as_ref().is_some_and(|entry| !entry.tombstone);
183 if if_not_exists && was_present {
184 return Ok((false, latest.expect("checked present").id));
185 }
186 let entry = self.append_vault_version(collection, key, value, "put", false, tags)?;
187 self.runtime.record_kv_watch_event(
188 if latest.is_some() {
189 crate::replication::cdc::ChangeOperation::Update
190 } else {
191 crate::replication::cdc::ChangeOperation::Insert
192 },
193 collection,
194 key,
195 entry.id.raw(),
196 latest.as_ref().map(vault_entry_metadata_json),
197 Some(vault_entry_metadata_json(&entry)),
198 );
199 return Ok((!was_present, entry.id));
200 }
201
202 let existing = self.get_entry(model, collection, key)?;
203 let was_present = existing.is_some();
204
205 if if_not_exists && was_present {
206 let (_, id) = existing.unwrap();
207 self.runtime.inner.kv_stats.incr_puts();
208 return Ok((false, id));
209 }
210
211 let before = existing
212 .as_ref()
213 .map(|(value, _)| crate::presentation::entity_json::storage_value_to_json(value));
214 let op = if was_present {
215 crate::replication::cdc::ChangeOperation::Update
216 } else {
217 crate::replication::cdc::ChangeOperation::Insert
218 };
219 let after = Some(crate::presentation::entity_json::storage_value_to_json(
220 &value,
221 ));
222
223 if was_present {
225 self.delete(model, collection, key)?;
226 }
227
228 if let Some(ttl_metadata) = ttl_metadata(ttl_ms) {
229 metadata.extend(ttl_metadata.fields);
230 }
231 if let Some(tags_metadata) = kv_tags_metadata(tags) {
232 metadata.push(tags_metadata);
233 }
234
235 let output = self
236 .runtime
237 .create_kv(crate::application::entity::CreateKvInput {
238 collection: collection.to_string(),
239 key: key.to_string(),
240 value,
241 metadata,
242 })?;
243 if model == crate::catalog::CollectionModel::Kv {
244 self.runtime
245 .inner
246 .kv_tag_index
247 .replace(collection, key, output.id, tags);
248 }
249
250 if model == crate::catalog::CollectionModel::Kv {
251 self.runtime
252 .record_kv_watch_event(op, collection, key, output.id.raw(), before, after);
253 }
254
255 if model == crate::catalog::CollectionModel::Kv {
256 self.runtime.inner.kv_stats.incr_puts();
257 }
258 Ok((!was_present, output.id))
259 }
260
261 pub fn get(
263 &self,
264 model: crate::catalog::CollectionModel,
265 collection: &str,
266 key: &str,
267 ) -> RedDBResult<Option<crate::storage::schema::Value>> {
268 let result = self.get_entry(model, collection, key)?;
269 if model == crate::catalog::CollectionModel::Kv {
270 self.runtime.inner.kv_stats.incr_gets();
271 }
272 Ok(result.map(|(v, _)| v))
273 }
274
275 pub fn delete(
277 &self,
278 model: crate::catalog::CollectionModel,
279 collection: &str,
280 key: &str,
281 ) -> RedDBResult<bool> {
282 self.ensure_declared_model(model, collection)?;
283 let found = self.get_entry(model, collection, key)?;
284 if let Some((value, id)) = found {
285 let store = self.runtime.inner.db.store();
286 let deleted = store
287 .delete(collection, id)
288 .map_err(|err| RedDBError::Internal(err.to_string()))?;
289 if deleted {
290 store.context_index().remove_entity(id);
291 if model == crate::catalog::CollectionModel::Kv {
292 self.runtime.inner.kv_tag_index.remove(collection, key);
293 self.runtime.record_kv_watch_event(
294 crate::replication::cdc::ChangeOperation::Delete,
295 collection,
296 key,
297 id.raw(),
298 Some(crate::presentation::entity_json::storage_value_to_json(
299 &value,
300 )),
301 None,
302 );
303 self.runtime.inner.kv_stats.incr_deletes();
304 }
305 }
306 Ok(deleted)
307 } else {
308 Ok(false)
309 }
310 }
311
312 pub fn incr(
317 &self,
318 model: crate::catalog::CollectionModel,
319 collection: &str,
320 key: &str,
321 by: i64,
322 ttl_ms: Option<u64>,
323 ) -> RedDBResult<i64> {
324 if model == crate::catalog::CollectionModel::Vault {
325 return Err(RedDBError::InvalidOperation(
326 "VAULT INCR is not supported for sealed secrets".to_string(),
327 ));
328 }
329 let rmw_lock = self.runtime.inner.rmw_locks.lock_for(collection, key);
330 let _rmw_guard = rmw_lock.lock();
331 self.ensure_kv_collection(collection)?;
332 let existing = self.runtime.get_kv(collection, key)?;
333 let current: i64 = match existing.as_ref() {
334 None => 0,
335 Some((crate::storage::schema::Value::Integer(n), _)) => *n,
336 Some((crate::storage::schema::Value::Float(f), _)) => *f as i64,
337 Some((other, _)) => {
338 return Err(RedDBError::Internal(format!(
339 "INCR on non-integer value: {:?}",
340 other
341 )));
342 }
343 };
344
345 let next = current
346 .checked_add(by)
347 .ok_or_else(|| RedDBError::Internal(format!("INCR overflow: {current} + {by}")))?;
348
349 if existing.is_some() {
351 self.runtime.delete_kv(collection, key)?;
352 }
353
354 let meta_vec: Vec<(String, crate::storage::unified::MetadataValue)> = ttl_metadata(ttl_ms)
355 .map(|m| m.fields.into_iter().collect())
356 .unwrap_or_default();
357
358 let output = self
359 .runtime
360 .create_kv(crate::application::entity::CreateKvInput {
361 collection: collection.to_string(),
362 key: key.to_string(),
363 value: crate::storage::schema::Value::Integer(next),
364 metadata: meta_vec,
365 })?;
366 self.runtime
367 .inner
368 .kv_tag_index
369 .replace(collection, key, output.id, &[]);
370
371 self.runtime.record_kv_watch_event(
372 if existing.is_some() {
373 crate::replication::cdc::ChangeOperation::Update
374 } else {
375 crate::replication::cdc::ChangeOperation::Insert
376 },
377 collection,
378 key,
379 output.id.raw(),
380 existing
381 .as_ref()
382 .map(|(value, _)| crate::presentation::entity_json::storage_value_to_json(value)),
383 Some(crate::presentation::entity_json::storage_value_to_json(
384 &crate::storage::schema::Value::Integer(next),
385 )),
386 );
387
388 self.runtime.inner.kv_stats.incr_incrs();
389 Ok(next)
390 }
391
392 pub fn cas(
400 &self,
401 model: crate::catalog::CollectionModel,
402 collection: &str,
403 key: &str,
404 expected: Option<&crate::storage::schema::Value>,
405 new_value: crate::storage::schema::Value,
406 ttl_ms: Option<u64>,
407 ) -> RedDBResult<(bool, Option<crate::storage::schema::Value>)> {
408 if model == crate::catalog::CollectionModel::Vault {
409 return Err(RedDBError::InvalidOperation(
410 "VAULT CAS is not supported for sealed secrets".to_string(),
411 ));
412 }
413 let rmw_lock = self.runtime.inner.rmw_locks.lock_for(collection, key);
414 let _rmw_guard = rmw_lock.lock();
415 self.ensure_kv_collection(collection)?;
416 let current = self.runtime.get_kv(collection, key)?.map(|(v, _)| v);
417
418 let matches = match (¤t, expected) {
419 (None, None) => true,
420 (Some(cur), Some(exp)) => cur == exp,
421 _ => false,
422 };
423
424 if !matches {
425 self.runtime.inner.kv_stats.incr_cas_conflict();
426 return Ok((false, current));
427 }
428
429 if current.is_some() {
431 self.runtime.delete_kv(collection, key)?;
432 }
433
434 let meta_vec: Vec<(String, crate::storage::unified::MetadataValue)> = ttl_metadata(ttl_ms)
435 .map(|m| m.fields.into_iter().collect())
436 .unwrap_or_default();
437
438 let output = self
439 .runtime
440 .create_kv(crate::application::entity::CreateKvInput {
441 collection: collection.to_string(),
442 key: key.to_string(),
443 value: new_value.clone(),
444 metadata: meta_vec,
445 })?;
446 self.runtime
447 .inner
448 .kv_tag_index
449 .replace(collection, key, output.id, &[]);
450
451 self.runtime.record_kv_watch_event(
452 if current.is_some() {
453 crate::replication::cdc::ChangeOperation::Update
454 } else {
455 crate::replication::cdc::ChangeOperation::Insert
456 },
457 collection,
458 key,
459 output.id.raw(),
460 current
461 .as_ref()
462 .map(crate::presentation::entity_json::storage_value_to_json),
463 Some(crate::presentation::entity_json::storage_value_to_json(
464 &new_value,
465 )),
466 );
467
468 self.runtime.inner.kv_stats.incr_cas_success();
469 Ok((true, current))
470 }
471
472 pub fn invalidate_tags(&self, collection: &str, tags: &[String]) -> RedDBResult<usize> {
473 self.runtime
474 .check_write(crate::runtime::write_gate::WriteKind::Dml)?;
475 self.runtime.check_kv_invalidate_policy(collection)?;
476 self.ensure_kv_collection(collection)?;
477 let entries = self
478 .runtime
479 .inner
480 .kv_tag_index
481 .entries_for_tags(collection, tags);
482 if entries.is_empty() {
483 return Ok(0);
484 }
485
486 let store = self.runtime.inner.db.store();
487 let mut removed = 0usize;
488 for (key, id) in entries {
489 let before = store
490 .get(collection, id)
491 .and_then(|entity| kv_value_from_entity(&entity));
492 let deleted = store
493 .delete(collection, id)
494 .map_err(|err| RedDBError::Internal(err.to_string()))?;
495 if deleted {
496 store.context_index().remove_entity(id);
497 self.runtime.inner.kv_tag_index.remove(collection, &key);
498 self.runtime.record_kv_watch_event(
499 crate::replication::cdc::ChangeOperation::Delete,
500 collection,
501 &key,
502 id.raw(),
503 before
504 .as_ref()
505 .map(crate::presentation::entity_json::storage_value_to_json),
506 None,
507 );
508 removed += 1;
509 }
510 }
511 if removed > 0 {
512 self.runtime.inner.kv_stats.incr_deletes();
513 }
514 Ok(removed)
515 }
516
517 pub fn tags_for_key(&self, collection: &str, key: &str) -> Vec<String> {
518 self.runtime
519 .inner
520 .kv_tag_index
521 .tags_for_key(collection, key)
522 }
523
524 fn ensure_kv_collection(&self, collection: &str) -> RedDBResult<()> {
526 self.ensure_keyed_collection(crate::catalog::CollectionModel::Kv, collection)
527 }
528
529 fn ensure_keyed_collection(
530 &self,
531 model: crate::catalog::CollectionModel,
532 collection: &str,
533 ) -> RedDBResult<()> {
534 let store = self.runtime.inner.db.store();
535 if store.get_collection(collection).is_some() {
536 return self.ensure_declared_model(model, collection);
537 }
538 if model != crate::catalog::CollectionModel::Kv {
539 return Err(RedDBError::NotFound(format!(
540 "{} collection '{collection}' does not exist",
541 keyed_model_name(model)
542 )));
543 }
544 let auto_create = self
546 .runtime
547 .config_bool("red.config.kv.default_collection", true);
548 if !auto_create {
549 return Err(RedDBError::NotFound(format!(
550 "kv collection '{collection}' does not exist and auto-create is disabled \
551 (red.config.kv.default_collection = false)"
552 )));
553 }
554 store
555 .create_collection(collection)
556 .map_err(|err| RedDBError::Internal(err.to_string()))?;
557 self.runtime
558 .inner
559 .db
560 .save_collection_contract(kv_collection_contract(collection))
561 .map_err(|err| RedDBError::Internal(err.to_string()))?;
562 Ok(())
563 }
564
565 fn get_entry(
566 &self,
567 model: crate::catalog::CollectionModel,
568 collection: &str,
569 key: &str,
570 ) -> RedDBResult<Option<(crate::storage::schema::Value, crate::storage::EntityId)>> {
571 let Some(entity) = self.get_entity(model, collection, key)? else {
572 return Ok(None);
573 };
574 Ok(kv_value_from_entity(&entity).map(|value| (value, entity.id)))
575 }
576
577 fn get_entity(
578 &self,
579 model: crate::catalog::CollectionModel,
580 collection: &str,
581 key: &str,
582 ) -> RedDBResult<Option<crate::storage::UnifiedEntity>> {
583 self.ensure_declared_model(model, collection)?;
584 let store = self.runtime.inner.db.store();
585 let Some(manager) = store.get_collection(collection) else {
586 return Ok(None);
587 };
588 let entities = manager.query_all(|_| true);
589 for entity in entities {
590 if let crate::storage::EntityData::Row(ref row) = entity.data {
591 if let Some(ref named) = row.named {
592 if let Some(crate::storage::schema::Value::Text(ref k)) = named.get("key") {
593 if &**k == key {
594 return Ok(Some(entity));
595 }
596 }
597 }
598 }
599 }
600 Ok(None)
601 }
602
603 fn get_vault_entry(&self, collection: &str, key: &str) -> RedDBResult<Option<VaultEntry>> {
604 self.vault_versions(collection, key)
605 .map(super::keyed_spine::latest_version)
606 }
607
608 fn get_vault_entry_version(
609 &self,
610 collection: &str,
611 key: &str,
612 version: i64,
613 ) -> RedDBResult<Option<VaultEntry>> {
614 Ok(self
615 .vault_versions(collection, key)?
616 .into_iter()
617 .find(|entry| entry.version == version))
618 }
619
620 fn vault_versions(&self, collection: &str, key: &str) -> RedDBResult<Vec<VaultEntry>> {
621 self.ensure_declared_model(crate::catalog::CollectionModel::Vault, collection)?;
622 let store = self.runtime.inner.db.store();
623 let Some(manager) = store.get_collection(collection) else {
624 return Ok(Vec::new());
625 };
626 let entities = manager.query_all(|_| true);
627 let mut versions = Vec::new();
628 for entity in entities {
629 let crate::storage::EntityData::Row(ref row) = entity.data else {
630 continue;
631 };
632 let Some(version) =
633 super::keyed_spine::row_version(entity.id, row, entity.sequence_id as i64)
634 else {
635 continue;
636 };
637 if version.key != key {
638 continue;
639 }
640 let metadata = manager.get_metadata(entity.id).unwrap_or_default();
641 versions.push(VaultEntry::from_keyed_row(
642 version,
643 metadata,
644 entity.created_at,
645 entity.updated_at,
646 entity.sequence_id,
647 ));
648 }
649 Ok(versions)
650 }
651
652 fn latest_vault_entries(
653 &self,
654 collection: &str,
655 prefix: Option<&str>,
656 ) -> RedDBResult<Vec<VaultEntry>> {
657 self.ensure_declared_model(crate::catalog::CollectionModel::Vault, collection)?;
658 let store = self.runtime.inner.db.store();
659 let Some(manager) = store.get_collection(collection) else {
660 return Ok(Vec::new());
661 };
662 let mut versions = Vec::new();
663 for entity in manager.query_all(|_| true) {
664 let crate::storage::EntityData::Row(ref row) = entity.data else {
665 continue;
666 };
667 let Some(version) =
668 super::keyed_spine::row_version(entity.id, row, entity.sequence_id as i64)
669 else {
670 continue;
671 };
672 let metadata = manager.get_metadata(entity.id).unwrap_or_default();
673 let entry = VaultEntry::from_keyed_row(
674 version,
675 metadata,
676 entity.created_at,
677 entity.updated_at,
678 entity.sequence_id,
679 );
680 versions.push(entry);
681 }
682 Ok(super::keyed_spine::latest_versions(versions, prefix))
683 }
684
685 fn append_vault_version(
686 &self,
687 collection: &str,
688 key: &str,
689 value: crate::storage::schema::Value,
690 op: &str,
691 tombstone: bool,
692 tags: &[String],
693 ) -> RedDBResult<VaultEntry> {
694 self.ensure_declared_model(crate::catalog::CollectionModel::Vault, collection)?;
695 let version = self
696 .get_vault_entry(collection, key)?
697 .map(|entry| entry.version)
698 .unwrap_or(0)
699 + 1;
700 let stored_value = if tombstone {
701 crate::storage::schema::Value::Null
702 } else {
703 self.runtime.seal_vault_value(collection, value)?
704 };
705 let now = current_unix_ms() as i64;
706 let fields = vec![
707 (
708 "key".to_string(),
709 crate::storage::schema::Value::text(key.to_string()),
710 ),
711 ("value".to_string(), stored_value),
712 (
713 "version".to_string(),
714 crate::storage::schema::Value::Integer(version),
715 ),
716 (
717 "tombstone".to_string(),
718 crate::storage::schema::Value::Boolean(tombstone),
719 ),
720 (
721 "op".to_string(),
722 crate::storage::schema::Value::text(op.to_string()),
723 ),
724 (
725 "created_at_ms".to_string(),
726 crate::storage::schema::Value::Integer(now),
727 ),
728 ];
729 let mut row = crate::storage::RowData::new(Vec::new());
730 row.named = Some(fields.into_iter().collect());
731 let entity = crate::storage::UnifiedEntity::new(
732 crate::storage::EntityId::new(0),
733 crate::storage::EntityKind::TableRow {
734 table: std::sync::Arc::from(collection),
735 row_id: 0,
736 },
737 crate::storage::EntityData::Row(row),
738 );
739 let id = self
740 .runtime
741 .inner
742 .db
743 .store()
744 .insert(collection, entity)
745 .map_err(|err| RedDBError::Internal(err.to_string()))?;
746 if !tags.is_empty() {
747 self.runtime
748 .inner
749 .db
750 .store()
751 .set_metadata(
752 collection,
753 id,
754 Metadata::with_fields(vault_tags_metadata(tags)),
755 )
756 .map_err(|err| RedDBError::Internal(err.to_string()))?;
757 self.runtime
758 .inner
759 .kv_tag_index
760 .replace(collection, key, id, tags);
761 }
762 self.get_vault_entry_version(collection, key, version)?
763 .ok_or_else(|| RedDBError::Internal(format!("vault version {id} was not readable")))
764 }
765
766 fn purge_vault_versions(&self, collection: &str, key: &str) -> RedDBResult<usize> {
767 self.ensure_declared_model(crate::catalog::CollectionModel::Vault, collection)?;
768 let versions = self.vault_versions(collection, key)?;
769 let store = self.runtime.inner.db.store();
770 let mut purged = 0usize;
771 for entry in versions {
772 if store
773 .delete(collection, entry.id)
774 .map_err(|err| RedDBError::Internal(err.to_string()))?
775 {
776 store.context_index().remove_entity(entry.id);
777 purged += 1;
778 }
779 }
780 Ok(purged)
781 }
782
783 fn ensure_declared_model(
784 &self,
785 model: crate::catalog::CollectionModel,
786 collection: &str,
787 ) -> RedDBResult<()> {
788 let Some(contract) = self.runtime.inner.db.collection_contract(collection) else {
789 return Ok(());
790 };
791 if contract.declared_model == model
792 || contract.declared_model == crate::catalog::CollectionModel::Mixed
793 {
794 return Ok(());
795 }
796 Err(RedDBError::InvalidOperation(format!(
797 "collection '{}' is declared as '{}' and does not allow '{}' operations",
798 collection,
799 keyed_model_name(contract.declared_model),
800 keyed_model_name(model)
801 )))
802 }
803}
804
805impl RedDBRuntime {
806 pub(crate) fn seal_vault_value(
807 &self,
808 collection: &str,
809 value: crate::storage::schema::Value,
810 ) -> RedDBResult<crate::storage::schema::Value> {
811 let key = self.vault_encryption_key(collection)?;
812 let plaintext = value.to_bytes();
813 let nonce_bytes = crate::auth::store::random_bytes(12);
814 let mut nonce = [0u8; 12];
815 nonce.copy_from_slice(&nonce_bytes[..12]);
816 let aad = format!("reddb.vault.{collection}");
817 let ciphertext =
818 crate::crypto::aes_gcm::aes256_gcm_encrypt(&key, &nonce, aad.as_bytes(), &plaintext);
819 let mut payload = Vec::with_capacity(12 + ciphertext.len());
820 payload.extend_from_slice(&nonce);
821 payload.extend_from_slice(&ciphertext);
822 Ok(crate::storage::schema::Value::Secret(payload))
823 }
824
825 fn vault_key_available(&self, collection: &str) -> bool {
826 self.vault_encryption_key(collection).is_ok()
827 }
828
829 fn vault_encryption_key(&self, collection: &str) -> RedDBResult<[u8; 32]> {
830 let auth_store = self.inner.auth_store.read().clone().ok_or_else(|| {
831 RedDBError::Query("vault sealed_unavailable: no key provider is configured".to_string())
832 })?;
833 if !auth_store.is_vault_backed() {
834 return Err(RedDBError::Query(
835 "vault sealed_unavailable: key provider is sealed".to_string(),
836 ));
837 }
838
839 if let Some(hex_key) = auth_store.vault_kv_get(&vault_master_key_ref(collection)) {
840 return decode_vault_key(&hex_key);
841 }
842 auth_store.vault_secret_key().ok_or_else(|| {
843 RedDBError::Query("vault sealed_unavailable: cluster vault key is missing".to_string())
844 })
845 }
846
847 fn unseal_vault_value(
848 &self,
849 collection: &str,
850 sealed: &crate::storage::schema::Value,
851 ) -> RedDBResult<crate::storage::schema::Value> {
852 let crate::storage::schema::Value::Secret(payload) = sealed else {
853 return Err(RedDBError::Query(
854 "vault unseal failed: stored value is not sealed".to_string(),
855 ));
856 };
857 if payload.len() < 12 {
858 return Err(RedDBError::Query(
859 "vault unseal failed: sealed payload is truncated".to_string(),
860 ));
861 }
862 let key = self.vault_encryption_key(collection)?;
863 let mut nonce = [0u8; 12];
864 nonce.copy_from_slice(&payload[..12]);
865 let aad = format!("reddb.vault.{collection}");
866 let plaintext = crate::crypto::aes_gcm::aes256_gcm_decrypt(
867 &key,
868 &nonce,
869 aad.as_bytes(),
870 &payload[12..],
871 )
872 .map_err(|_| RedDBError::Query("vault unseal failed: decryption failed".to_string()))?;
873 let (value, consumed) =
874 crate::storage::schema::Value::from_bytes(&plaintext).map_err(|err| {
875 RedDBError::Query(format!("vault unseal failed: bad plaintext value: {err}"))
876 })?;
877 if consumed != plaintext.len() {
878 return Err(RedDBError::Query(
879 "vault unseal failed: trailing plaintext bytes".to_string(),
880 ));
881 }
882 Ok(value)
883 }
884
885 pub(crate) fn peek_vault_unsealed(
891 &self,
892 collection: &str,
893 key: &str,
894 ) -> RedDBResult<Option<crate::storage::schema::Value>> {
895 let ops = KvAtomicOps::new(self);
896 let Some(entry) = ops.get_vault_entry(collection, key)? else {
897 return Ok(None);
898 };
899 if entry.tombstone {
900 return Ok(None);
901 }
902 Ok(self.unseal_vault_value(collection, &entry.value).ok())
903 }
904
905 fn vault_target_resource(collection: &str, key: &str) -> String {
906 if collection == "red.vault" {
907 return format!("red.vault/{}", key.to_ascii_lowercase());
908 }
909 format!("{collection}.{key}")
910 }
911
912 fn current_vault_actor() -> String {
913 current_auth_identity()
914 .map(|(principal, _)| principal)
915 .unwrap_or_else(|| "anonymous".to_string())
916 }
917
918 fn vault_request_id() -> String {
919 let conn_id = current_connection_id();
920 if conn_id == 0 {
921 "embedded".to_string()
922 } else {
923 format!("conn-{conn_id}")
924 }
925 }
926
927 fn check_vault_capability(
928 &self,
929 action: &str,
930 collection: &str,
931 key: &str,
932 ) -> Result<(), String> {
933 let Some(auth_store) = self.inner.auth_store.read().clone() else {
934 return Ok(());
935 };
936 if !auth_store.iam_authorization_enabled() {
937 return Ok(());
938 }
939 let Some((principal, role)) = current_auth_identity() else {
940 return Err(
941 "IAM authorization is enabled; vault capability check requires an authenticated principal"
942 .to_string(),
943 );
944 };
945 let tenant = current_tenant();
946 let principal_id = crate::auth::UserId::from_parts(tenant.as_deref(), &principal);
947 let mut resource = crate::auth::policies::ResourceRef::new(
948 "vault",
949 Self::vault_target_resource(collection, key),
950 );
951 if let Some(ref tenant) = tenant {
952 resource = resource.with_tenant(tenant.clone());
953 }
954 let ctx = crate::auth::policies::EvalContext {
955 principal_tenant: tenant.clone(),
956 current_tenant: tenant,
957 peer_ip: None,
958 mfa_present: false,
959 now_ms: crate::utils::now_unix_millis() as u128,
960 principal_is_admin_role: role == crate::auth::Role::Admin,
961 principal_is_system_owned: auth_store.principal_is_system_owned(&principal_id),
962 principal_is_platform_scoped: principal_id.tenant.is_none(),
963 };
964 if auth_store.check_policy_authz_with_role(&principal_id, action, &resource, &ctx, role) {
965 Ok(())
966 } else {
967 Err(format!(
968 "principal=`{}` action=`{}` resource=`vault:{}` denied by IAM policy",
969 principal,
970 action,
971 Self::vault_target_resource(collection, key)
972 ))
973 }
974 }
975
976 fn check_system_vault_capability(
977 &self,
978 action: &str,
979 collection: &str,
980 key: &str,
981 ) -> Result<(), String> {
982 if collection != "red.vault" {
983 return Ok(());
984 }
985 self.check_vault_capability(action, collection, key)
986 }
987
988 fn audit_vault_unseal(
989 &self,
990 collection: &str,
991 key: &str,
992 outcome: crate::runtime::audit_log::Outcome,
993 reason: &str,
994 entry: Option<&VaultEntry>,
995 ) {
996 let actor = Self::current_vault_actor();
997 let request_id = Self::vault_request_id();
998 let mut builder = crate::runtime::audit_log::AuditEvent::builder("vault/unseal")
999 .principal(actor.clone())
1000 .source(crate::runtime::audit_log::AuditAuthSource::Password)
1001 .resource(format!(
1002 "vault:{}",
1003 Self::vault_target_resource(collection, key)
1004 ))
1005 .outcome(outcome)
1006 .correlation_id(request_id.clone())
1007 .fields([
1008 crate::runtime::audit_log::AuditFieldEscaper::field("actor", actor),
1009 crate::runtime::audit_log::AuditFieldEscaper::field("collection", collection),
1010 crate::runtime::audit_log::AuditFieldEscaper::field("key", key),
1011 crate::runtime::audit_log::AuditFieldEscaper::field(
1012 "target",
1013 Self::vault_target_resource(collection, key),
1014 ),
1015 crate::runtime::audit_log::AuditFieldEscaper::field("reason", reason),
1016 crate::runtime::audit_log::AuditFieldEscaper::field("request_id", request_id),
1017 crate::runtime::audit_log::AuditFieldEscaper::field(
1018 "connection_id",
1019 current_connection_id(),
1020 ),
1021 ]);
1022 if let Some(tenant) = current_tenant() {
1023 builder = builder.tenant(tenant);
1024 }
1025 if let Some(entry) = entry {
1026 builder = builder.fields([
1027 crate::runtime::audit_log::AuditFieldEscaper::field("entity_id", entry.id.raw()),
1028 crate::runtime::audit_log::AuditFieldEscaper::field(
1029 "sequence_id",
1030 entry.sequence_id,
1031 ),
1032 ]);
1033 }
1034 self.audit_log().record_event(builder.build());
1035 }
1036
1037 fn audit_vault_lifecycle(
1038 &self,
1039 operation: &str,
1040 collection: &str,
1041 key: &str,
1042 outcome: crate::runtime::audit_log::Outcome,
1043 reason: &str,
1044 entry: Option<&VaultEntry>,
1045 ) {
1046 let actor = Self::current_vault_actor();
1047 let request_id = Self::vault_request_id();
1048 let mut builder =
1049 crate::runtime::audit_log::AuditEvent::builder(format!("vault/{operation}"))
1050 .principal(actor.clone())
1051 .source(crate::runtime::audit_log::AuditAuthSource::Password)
1052 .resource(format!(
1053 "vault:{}",
1054 Self::vault_target_resource(collection, key)
1055 ))
1056 .outcome(outcome)
1057 .correlation_id(request_id.clone())
1058 .fields([
1059 crate::runtime::audit_log::AuditFieldEscaper::field("actor", actor),
1060 crate::runtime::audit_log::AuditFieldEscaper::field("collection", collection),
1061 crate::runtime::audit_log::AuditFieldEscaper::field("key", key),
1062 crate::runtime::audit_log::AuditFieldEscaper::field(
1063 "target",
1064 Self::vault_target_resource(collection, key),
1065 ),
1066 crate::runtime::audit_log::AuditFieldEscaper::field("reason", reason),
1067 crate::runtime::audit_log::AuditFieldEscaper::field("request_id", request_id),
1068 crate::runtime::audit_log::AuditFieldEscaper::field(
1069 "connection_id",
1070 current_connection_id(),
1071 ),
1072 ]);
1073 if let Some(tenant) = current_tenant() {
1074 builder = builder.tenant(tenant);
1075 }
1076 if let Some(entry) = entry {
1077 builder = builder.fields([
1078 crate::runtime::audit_log::AuditFieldEscaper::field("entity_id", entry.id.raw()),
1079 crate::runtime::audit_log::AuditFieldEscaper::field("version", entry.version),
1080 crate::runtime::audit_log::AuditFieldEscaper::field(
1081 "sequence_id",
1082 entry.sequence_id,
1083 ),
1084 ]);
1085 }
1086 self.audit_log().record_event(builder.build());
1087 }
1088
1089 fn emit_vault_control_event(
1090 &self,
1091 kind: crate::runtime::control_events::EventKind,
1092 outcome: crate::runtime::control_events::Outcome,
1093 action: &'static str,
1094 collection: &str,
1095 key: &str,
1096 reason: &str,
1097 entry: Option<&VaultEntry>,
1098 extra_fields: Vec<(String, crate::runtime::control_events::Sensitivity)>,
1099 ) -> RedDBResult<()> {
1100 use crate::runtime::control_events::{
1101 ActorRef, ControlEvent, ControlEventCtx, ControlEventLedger, Sensitivity,
1102 };
1103 use std::borrow::Cow;
1104
1105 let tenant = current_tenant();
1106 let principal = current_auth_identity().map(|(principal, _)| principal);
1107 let actor_user = principal
1108 .as_ref()
1109 .map(|principal| crate::auth::UserId::from_parts(tenant.as_deref(), principal));
1110 let request_id = Self::vault_request_id();
1111 let actor = actor_user
1112 .as_ref()
1113 .map(ActorRef::User)
1114 .unwrap_or(ActorRef::Anonymous);
1115 let ctx = ControlEventCtx {
1116 actor,
1117 scope: tenant.as_ref().map(|scope| Cow::Borrowed(scope.as_str())),
1118 request_id: Some(Cow::Borrowed(request_id.as_str())),
1119 trace_id: None,
1120 };
1121
1122 let target = Self::vault_target_resource(collection, key);
1123 let mut fields = std::collections::HashMap::new();
1124 fields.insert("path".to_string(), Sensitivity::raw(target.clone()));
1125 fields.insert("collection".to_string(), Sensitivity::raw(collection));
1126 fields.insert("key".to_string(), Sensitivity::raw(key));
1127 fields.insert(
1128 "connection_id".to_string(),
1129 Sensitivity::raw(current_connection_id().to_string()),
1130 );
1131 if let Some(entry) = entry {
1132 fields.insert(
1133 "entity_id".to_string(),
1134 Sensitivity::raw(entry.id.raw().to_string()),
1135 );
1136 fields.insert(
1137 "sequence_id".to_string(),
1138 Sensitivity::raw(entry.sequence_id.to_string()),
1139 );
1140 fields.insert(
1141 "version".to_string(),
1142 Sensitivity::raw(entry.version.to_string()),
1143 );
1144 fields.insert("op".to_string(), Sensitivity::raw(entry.op.clone()));
1145 fields.insert(
1146 "tombstone".to_string(),
1147 Sensitivity::raw(entry.tombstone.to_string()),
1148 );
1149 if !entry.tombstone {
1150 fields.insert(
1151 "fingerprint".to_string(),
1152 Sensitivity::raw(vault_fingerprint(&entry.value)),
1153 );
1154 }
1155 fields.insert(
1156 "tags".to_string(),
1157 Sensitivity::raw(format!("{:?}", vault_tags_value(&entry.metadata))),
1158 );
1159 }
1160 for (key, value) in extra_fields {
1161 fields.insert(key, value);
1162 }
1163
1164 let event = ControlEvent {
1165 kind,
1166 outcome,
1167 action: Cow::Borrowed(action),
1168 resource: Some(format!("vault:{target}")),
1169 reason: Some(reason.to_string()),
1170 matched_policy_id: None,
1171 fields,
1172 };
1173 let ledger = self.inner.control_event_ledger.read();
1174 match ledger.emit(&ctx, event) {
1175 Ok(_) => Ok(()),
1176 Err(err) if self.inner.control_event_config.require_persistence() => {
1177 Err(RedDBError::Internal(err.to_string()))
1178 }
1179 Err(_) => Ok(()),
1180 }
1181 }
1182
1183 pub(crate) fn resolve_vault_secret_value(
1184 &self,
1185 collection: &str,
1186 key: &str,
1187 ) -> RedDBResult<Value> {
1188 let ops = KvAtomicOps::new(self);
1189 let entry = ops.get_vault_entry(collection, key)?;
1190 if let Err(reason) = self.check_vault_capability("vault:read", collection, key) {
1191 self.audit_vault_unseal(
1192 collection,
1193 key,
1194 crate::runtime::audit_log::Outcome::Denied,
1195 &reason,
1196 entry.as_ref(),
1197 );
1198 self.emit_vault_control_event(
1199 crate::runtime::control_events::EventKind::VaultRead,
1200 crate::runtime::control_events::Outcome::Denied,
1201 "vault:read",
1202 collection,
1203 key,
1204 &reason,
1205 entry.as_ref(),
1206 Vec::new(),
1207 )?;
1208 return Err(RedDBError::Query(reason));
1209 }
1210 let Some(entry) = entry else {
1211 let reason = "not_found";
1212 self.audit_vault_unseal(
1213 collection,
1214 key,
1215 crate::runtime::audit_log::Outcome::Denied,
1216 reason,
1217 None,
1218 );
1219 self.emit_vault_control_event(
1220 crate::runtime::control_events::EventKind::VaultRead,
1221 crate::runtime::control_events::Outcome::Denied,
1222 "vault:read",
1223 collection,
1224 key,
1225 reason,
1226 None,
1227 Vec::new(),
1228 )?;
1229 return Err(RedDBError::NotFound(format!(
1230 "vault secret '{}.{}' not found",
1231 collection, key
1232 )));
1233 };
1234 if entry.tombstone {
1235 let reason = "deleted";
1236 self.audit_vault_unseal(
1237 collection,
1238 key,
1239 crate::runtime::audit_log::Outcome::Denied,
1240 reason,
1241 Some(&entry),
1242 );
1243 self.emit_vault_control_event(
1244 crate::runtime::control_events::EventKind::VaultRead,
1245 crate::runtime::control_events::Outcome::Denied,
1246 "vault:read",
1247 collection,
1248 key,
1249 reason,
1250 Some(&entry),
1251 Vec::new(),
1252 )?;
1253 return Err(RedDBError::NotFound(format!(
1254 "vault secret '{}.{}' is deleted",
1255 collection, key
1256 )));
1257 }
1258 match self.unseal_vault_value(collection, &entry.value) {
1259 Ok(value) => {
1260 self.audit_vault_unseal(
1261 collection,
1262 key,
1263 crate::runtime::audit_log::Outcome::Success,
1264 "ok",
1265 Some(&entry),
1266 );
1267 self.emit_vault_control_event(
1268 crate::runtime::control_events::EventKind::VaultRead,
1269 crate::runtime::control_events::Outcome::Allowed,
1270 "vault:read",
1271 collection,
1272 key,
1273 "ok",
1274 Some(&entry),
1275 Vec::new(),
1276 )?;
1277 Ok(value)
1278 }
1279 Err(err) => {
1280 let reason = err.to_string();
1281 self.audit_vault_unseal(
1282 collection,
1283 key,
1284 crate::runtime::audit_log::Outcome::Error,
1285 &reason,
1286 Some(&entry),
1287 );
1288 self.emit_vault_control_event(
1289 crate::runtime::control_events::EventKind::VaultRead,
1290 crate::runtime::control_events::Outcome::Error,
1291 "vault:read",
1292 collection,
1293 key,
1294 &reason,
1295 Some(&entry),
1296 Vec::new(),
1297 )?;
1298 Err(err)
1299 }
1300 }
1301 }
1302
1303 pub fn execute_kv_command(
1305 &self,
1306 raw_query: &str,
1307 cmd: &crate::storage::query::ast::KvCommand,
1308 ) -> RedDBResult<RuntimeQueryResult> {
1309 use crate::storage::query::ast::KvCommand;
1310
1311 let ops = KvAtomicOps::new(self);
1312
1313 match cmd {
1314 KvCommand::Put {
1315 model,
1316 collection,
1317 key,
1318 value,
1319 ttl_ms,
1320 tags,
1321 if_not_exists,
1322 } => {
1323 if *model == crate::catalog::CollectionModel::Vault {
1324 self.check_system_vault_capability("vault:write", collection, key)
1325 .map_err(RedDBError::Query)?;
1326 }
1327 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
1328 let (created, id) = ops.set_with_tags_for_model(
1329 *model,
1330 collection,
1331 key,
1332 value.clone(),
1333 *ttl_ms,
1334 tags,
1335 *if_not_exists,
1336 )?;
1337
1338 let mut result = UnifiedResult::with_columns(vec![
1339 "ok".into(),
1340 "collection".into(),
1341 "key".into(),
1342 "id".into(),
1343 "created".into(),
1344 "tags".into(),
1345 ]);
1346 let mut record = UnifiedRecord::new();
1347 record.set("ok", Value::Boolean(true));
1348 record.set("collection", Value::text(collection.clone()));
1349 record.set("key", Value::text(key.clone()));
1350 record.set("id", Value::Integer(id.raw() as i64));
1351 record.set("created", Value::Boolean(created));
1352 record.set("tags", kv_tags_value(tags));
1353 result.push(record);
1354
1355 Ok(RuntimeQueryResult {
1356 query: raw_query.to_string(),
1357 mode: crate::storage::query::modes::QueryMode::Sql,
1358 statement: if *model == crate::catalog::CollectionModel::Vault {
1359 "vault_put"
1360 } else {
1361 "kv_put"
1362 },
1363 engine: if *model == crate::catalog::CollectionModel::Vault {
1364 "vault"
1365 } else {
1366 "kv"
1367 },
1368 result,
1369 affected_rows: 1,
1370 statement_type: if created { "insert" } else { "update" },
1371 bookmark: None,
1372 })
1373 }
1374 KvCommand::InvalidateTags { collection, tags } => {
1375 let invalidated = ops.invalidate_tags(collection, tags)?;
1376
1377 let mut result = UnifiedResult::with_columns(vec![
1378 "ok".into(),
1379 "collection".into(),
1380 "invalidated".into(),
1381 "tags".into(),
1382 ]);
1383 let mut record = UnifiedRecord::new();
1384 record.set("ok", Value::Boolean(true));
1385 record.set("collection", Value::text(collection.clone()));
1386 record.set("invalidated", Value::Integer(invalidated as i64));
1387 record.set("tags", kv_tags_value(tags));
1388 result.push(record);
1389
1390 Ok(RuntimeQueryResult {
1391 query: raw_query.to_string(),
1392 mode: crate::storage::query::modes::QueryMode::Sql,
1393 statement: "kv_invalidate_tags",
1394 engine: "kv",
1395 result,
1396 affected_rows: invalidated as u64,
1397 statement_type: "delete",
1398 bookmark: None,
1399 })
1400 }
1401
1402 KvCommand::Rotate {
1403 collection,
1404 key,
1405 value,
1406 tags,
1407 } => {
1408 self.check_system_vault_capability("vault:write", collection, key)
1409 .map_err(RedDBError::Query)?;
1410 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
1411 let entry = ops.append_vault_version(
1412 collection,
1413 key,
1414 value.clone(),
1415 "rotate",
1416 false,
1417 tags,
1418 )?;
1419 self.record_kv_watch_event(
1420 crate::replication::cdc::ChangeOperation::Update,
1421 collection,
1422 key,
1423 entry.id.raw(),
1424 None,
1425 Some(vault_entry_metadata_json(&entry)),
1426 );
1427 self.audit_vault_lifecycle(
1428 "rotate",
1429 collection,
1430 key,
1431 crate::runtime::audit_log::Outcome::Success,
1432 "ok",
1433 Some(&entry),
1434 );
1435 self.emit_vault_control_event(
1436 crate::runtime::control_events::EventKind::VaultRotate,
1437 crate::runtime::control_events::Outcome::Allowed,
1438 "vault:rotate",
1439 collection,
1440 key,
1441 "ok",
1442 Some(&entry),
1443 Vec::new(),
1444 )?;
1445 Ok(vault_write_result(
1446 raw_query,
1447 "vault_rotate",
1448 "update",
1449 collection,
1450 key,
1451 &entry,
1452 1,
1453 ))
1454 }
1455
1456 KvCommand::List {
1457 model,
1458 collection,
1459 prefix,
1460 limit,
1461 offset,
1462 } => {
1463 if *model != crate::catalog::CollectionModel::Vault {
1464 return Err(RedDBError::InvalidOperation(
1465 "LIST is not supported through normal KV command execution".to_string(),
1466 ));
1467 }
1468 let mut entries = ops.latest_vault_entries(collection, prefix.as_deref())?;
1469 entries.sort_by(|left, right| left.key.cmp(&right.key));
1470 let mut result = UnifiedResult::with_columns(vec![
1471 "collection".into(),
1472 "key".into(),
1473 "version".into(),
1474 "fingerprint".into(),
1475 "tags".into(),
1476 "created_at".into(),
1477 "updated_at".into(),
1478 "status".into(),
1479 "tombstone".into(),
1480 "op".into(),
1481 ]);
1482 let mut visible = Vec::new();
1483 for entry in entries {
1484 match self.check_vault_capability("vault:read_metadata", collection, &entry.key)
1485 {
1486 Ok(()) => {
1487 self.emit_vault_control_event(
1488 crate::runtime::control_events::EventKind::VaultMetadataRead,
1489 crate::runtime::control_events::Outcome::Allowed,
1490 "vault:read_metadata",
1491 collection,
1492 &entry.key,
1493 "ok",
1494 Some(&entry),
1495 Vec::new(),
1496 )?;
1497 visible.push(entry);
1498 }
1499 Err(reason) => {
1500 self.emit_vault_control_event(
1501 crate::runtime::control_events::EventKind::VaultMetadataRead,
1502 crate::runtime::control_events::Outcome::Denied,
1503 "vault:read_metadata",
1504 collection,
1505 &entry.key,
1506 &reason,
1507 Some(&entry),
1508 Vec::new(),
1509 )?;
1510 }
1511 }
1512 }
1513 for entry in visible
1514 .into_iter()
1515 .skip(*offset)
1516 .take(limit.unwrap_or(usize::MAX))
1517 {
1518 push_vault_metadata_record(&mut result, collection, &entry.key, &entry);
1519 }
1520 Ok(RuntimeQueryResult {
1521 query: raw_query.to_string(),
1522 mode: crate::storage::query::modes::QueryMode::Sql,
1523 statement: "vault_list",
1524 engine: "vault",
1525 result,
1526 affected_rows: 0,
1527 statement_type: "select",
1528 bookmark: None,
1529 })
1530 }
1531
1532 KvCommand::History { collection, key } => {
1533 let latest = ops.get_vault_entry(collection, key)?;
1534 if let Err(reason) =
1535 self.check_vault_capability("vault:read_metadata", collection, key)
1536 {
1537 self.emit_vault_control_event(
1538 crate::runtime::control_events::EventKind::VaultMetadataRead,
1539 crate::runtime::control_events::Outcome::Denied,
1540 "vault:read_metadata",
1541 collection,
1542 key,
1543 &reason,
1544 latest.as_ref(),
1545 Vec::new(),
1546 )?;
1547 return Err(RedDBError::Query(reason));
1548 }
1549 let versions =
1550 super::keyed_spine::history_versions(ops.vault_versions(collection, key)?);
1551 let result = vault_history_result(collection, key, &versions);
1552 self.emit_vault_control_event(
1553 crate::runtime::control_events::EventKind::VaultMetadataRead,
1554 crate::runtime::control_events::Outcome::Allowed,
1555 "vault:read_metadata",
1556 collection,
1557 key,
1558 "ok",
1559 latest.as_ref(),
1560 Vec::new(),
1561 )?;
1562 Ok(RuntimeQueryResult {
1563 query: raw_query.to_string(),
1564 mode: crate::storage::query::modes::QueryMode::Sql,
1565 statement: "vault_history",
1566 engine: "vault",
1567 result,
1568 affected_rows: 0,
1569 statement_type: "select",
1570 bookmark: None,
1571 })
1572 }
1573
1574 KvCommand::Purge { collection, key } => {
1575 let entry = ops.get_vault_entry(collection, key)?;
1576 if let Err(reason) = self.check_vault_capability("vault:purge", collection, key) {
1577 self.audit_vault_lifecycle(
1578 "purge",
1579 collection,
1580 key,
1581 crate::runtime::audit_log::Outcome::Denied,
1582 &reason,
1583 entry.as_ref(),
1584 );
1585 self.emit_vault_control_event(
1586 crate::runtime::control_events::EventKind::VaultPurge,
1587 crate::runtime::control_events::Outcome::Denied,
1588 "vault:purge",
1589 collection,
1590 key,
1591 &reason,
1592 entry.as_ref(),
1593 Vec::new(),
1594 )?;
1595 return Err(RedDBError::Query(reason));
1596 }
1597 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
1598 let purged = ops.purge_vault_versions(collection, key)?;
1599 self.audit_vault_lifecycle(
1600 "purge",
1601 collection,
1602 key,
1603 crate::runtime::audit_log::Outcome::Success,
1604 "ok",
1605 entry.as_ref(),
1606 );
1607 self.emit_vault_control_event(
1608 crate::runtime::control_events::EventKind::VaultPurge,
1609 crate::runtime::control_events::Outcome::Allowed,
1610 "vault:purge",
1611 collection,
1612 key,
1613 "ok",
1614 entry.as_ref(),
1615 vec![(
1616 "purged".to_string(),
1617 crate::runtime::control_events::Sensitivity::raw(purged.to_string()),
1618 )],
1619 )?;
1620 let mut result = UnifiedResult::with_columns(vec![
1621 "ok".into(),
1622 "collection".into(),
1623 "key".into(),
1624 "purged".into(),
1625 ]);
1626 let mut record = UnifiedRecord::new();
1627 record.set("ok", Value::Boolean(true));
1628 record.set("collection", Value::text(collection.clone()));
1629 record.set("key", Value::text(key.clone()));
1630 record.set("purged", Value::Integer(purged as i64));
1631 result.push(record);
1632 Ok(RuntimeQueryResult {
1633 query: raw_query.to_string(),
1634 mode: crate::storage::query::modes::QueryMode::Sql,
1635 statement: "vault_purge",
1636 engine: "vault",
1637 result,
1638 affected_rows: purged as u64,
1639 statement_type: "delete",
1640 bookmark: None,
1641 })
1642 }
1643
1644 KvCommand::Get {
1645 model,
1646 collection,
1647 key,
1648 } => {
1649 if *model == crate::catalog::CollectionModel::Vault {
1650 let entry = ops.get_vault_entry(collection, key)?;
1651 if let Err(reason) =
1652 self.check_vault_capability("vault:read_metadata", collection, key)
1653 {
1654 self.emit_vault_control_event(
1655 crate::runtime::control_events::EventKind::VaultMetadataRead,
1656 crate::runtime::control_events::Outcome::Denied,
1657 "vault:read_metadata",
1658 collection,
1659 key,
1660 &reason,
1661 entry.as_ref(),
1662 Vec::new(),
1663 )?;
1664 return Err(RedDBError::Query(reason));
1665 }
1666 let key_available = self.vault_key_available(collection);
1667 let result =
1668 vault_metadata_result(collection, key, entry.as_ref(), key_available);
1669 self.emit_vault_control_event(
1670 crate::runtime::control_events::EventKind::VaultMetadataRead,
1671 crate::runtime::control_events::Outcome::Allowed,
1672 "vault:read_metadata",
1673 collection,
1674 key,
1675 "ok",
1676 entry.as_ref(),
1677 Vec::new(),
1678 )?;
1679 return Ok(RuntimeQueryResult {
1680 query: raw_query.to_string(),
1681 mode: crate::storage::query::modes::QueryMode::Sql,
1682 statement: "vault_get",
1683 engine: "vault",
1684 result,
1685 affected_rows: 0,
1686 statement_type: "select",
1687 bookmark: None,
1688 });
1689 }
1690
1691 let entity = ops.get_entity(*model, collection, key)?;
1692 let value = entity.as_ref().and_then(kv_value_from_entity);
1693 if *model == crate::catalog::CollectionModel::Kv {
1694 self.inner.kv_stats.incr_gets();
1695 }
1696 let mut result = UnifiedResult::with_columns(vec![
1697 "rid".into(),
1698 "collection".into(),
1699 "kind".into(),
1700 "tenant".into(),
1701 "created_at".into(),
1702 "updated_at".into(),
1703 "key".into(),
1704 "value".into(),
1705 "tags".into(),
1706 ]);
1707 let mut record = UnifiedRecord::new();
1708 if let Some(entity) = entity.as_ref() {
1709 record.set("rid", Value::UnsignedInteger(entity.id.raw()));
1710 record.set("created_at", Value::UnsignedInteger(entity.created_at));
1711 record.set("updated_at", Value::UnsignedInteger(entity.updated_at));
1712 } else {
1713 record.set("rid", Value::Null);
1714 record.set("created_at", Value::Null);
1715 record.set("updated_at", Value::Null);
1716 }
1717 record.set("collection", Value::text(collection.clone()));
1718 record.set("kind", Value::text(keyed_model_name(*model).to_string()));
1719 record.set("tenant", Value::Null);
1720 record.set("key", Value::text(key.clone()));
1721 record.set(
1722 "value",
1723 value.unwrap_or(crate::storage::schema::Value::Null),
1724 );
1725 record.set("tags", kv_tags_value(&ops.tags_for_key(collection, key)));
1726 result.push(record);
1727
1728 Ok(RuntimeQueryResult {
1729 query: raw_query.to_string(),
1730 mode: crate::storage::query::modes::QueryMode::Sql,
1731 statement: "kv_get",
1732 engine: "kv",
1733 result,
1734 affected_rows: 0,
1735 statement_type: "select",
1736 bookmark: None,
1737 })
1738 }
1739 KvCommand::Watch {
1740 model,
1741 collection,
1742 key,
1743 prefix,
1744 from_lsn,
1745 } => {
1746 let watch_key = if *prefix {
1747 format!("{key}.*")
1748 } else {
1749 key.clone()
1750 };
1751 let endpoint = match from_lsn {
1752 Some(lsn) => format!(
1753 "/collections/{collection}/{}/{watch_key}/watch?since_lsn={lsn}",
1754 keyed_model_name(*model)
1755 ),
1756 None => format!(
1757 "/collections/{collection}/{}/{watch_key}/watch",
1758 keyed_model_name(*model)
1759 ),
1760 };
1761 let mut result = UnifiedResult::with_columns(vec![
1762 "collection".into(),
1763 "key".into(),
1764 "prefix".into(),
1765 "from_lsn".into(),
1766 "watch_url".into(),
1767 "streaming".into(),
1768 ]);
1769 let mut record = UnifiedRecord::new();
1770 record.set("collection", Value::text(collection.clone()));
1771 record.set("key", Value::text(watch_key));
1772 record.set("prefix", Value::Boolean(*prefix));
1773 record.set(
1774 "from_lsn",
1775 from_lsn
1776 .map(Value::UnsignedInteger)
1777 .unwrap_or(crate::storage::schema::Value::Null),
1778 );
1779 record.set("watch_url", Value::text(endpoint));
1780 record.set("streaming", Value::Boolean(true));
1781 result.push(record);
1782
1783 Ok(RuntimeQueryResult {
1784 query: raw_query.to_string(),
1785 mode: crate::storage::query::modes::QueryMode::Sql,
1786 statement: "kv_watch",
1787 engine: keyed_model_name(*model),
1788 result,
1789 affected_rows: 0,
1790 statement_type: "stream",
1791 bookmark: None,
1792 })
1793 }
1794
1795 KvCommand::Unseal {
1796 collection,
1797 key,
1798 version,
1799 } => {
1800 let latest = ops.get_vault_entry(collection, key)?;
1801 let entry = match version {
1802 Some(version) => ops.get_vault_entry_version(collection, key, *version)?,
1803 None => latest.clone(),
1804 };
1805 let action = match (version, latest.as_ref()) {
1806 (Some(requested), Some(latest)) if *requested == latest.version => "vault:read",
1807 (Some(_), _) => "vault:unseal_history",
1808 _ => "vault:read",
1809 };
1810 let event_kind = if action == "vault:read" {
1811 crate::runtime::control_events::EventKind::VaultRead
1812 } else {
1813 crate::runtime::control_events::EventKind::VaultUnseal
1814 };
1815 if let Err(reason) = self.check_vault_capability(action, collection, key) {
1816 self.audit_vault_unseal(
1817 collection,
1818 key,
1819 crate::runtime::audit_log::Outcome::Denied,
1820 &reason,
1821 entry.as_ref(),
1822 );
1823 self.emit_vault_control_event(
1824 event_kind,
1825 crate::runtime::control_events::Outcome::Denied,
1826 action,
1827 collection,
1828 key,
1829 &reason,
1830 entry.as_ref(),
1831 Vec::new(),
1832 )?;
1833 return Err(RedDBError::Query(reason));
1834 }
1835 let Some(entry) = entry else {
1836 let reason = "not_found";
1837 self.audit_vault_unseal(
1838 collection,
1839 key,
1840 crate::runtime::audit_log::Outcome::Denied,
1841 reason,
1842 None,
1843 );
1844 self.emit_vault_control_event(
1845 event_kind,
1846 crate::runtime::control_events::Outcome::Denied,
1847 action,
1848 collection,
1849 key,
1850 reason,
1851 None,
1852 Vec::new(),
1853 )?;
1854 return Err(RedDBError::NotFound(format!(
1855 "vault secret '{}.{}' not found",
1856 collection, key
1857 )));
1858 };
1859 if entry.tombstone {
1860 let reason = "deleted";
1861 self.audit_vault_unseal(
1862 collection,
1863 key,
1864 crate::runtime::audit_log::Outcome::Denied,
1865 reason,
1866 Some(&entry),
1867 );
1868 self.emit_vault_control_event(
1869 event_kind,
1870 crate::runtime::control_events::Outcome::Denied,
1871 action,
1872 collection,
1873 key,
1874 reason,
1875 Some(&entry),
1876 Vec::new(),
1877 )?;
1878 return Err(RedDBError::NotFound(format!(
1879 "vault secret '{}.{}' is deleted",
1880 collection, key
1881 )));
1882 }
1883 match self.unseal_vault_value(collection, &entry.value) {
1884 Ok(value) => {
1885 self.audit_vault_unseal(
1886 collection,
1887 key,
1888 crate::runtime::audit_log::Outcome::Success,
1889 "ok",
1890 Some(&entry),
1891 );
1892 self.emit_vault_control_event(
1893 event_kind,
1894 crate::runtime::control_events::Outcome::Allowed,
1895 action,
1896 collection,
1897 key,
1898 "ok",
1899 Some(&entry),
1900 Vec::new(),
1901 )?;
1902 let mut result = UnifiedResult::with_columns(vec![
1903 "collection".into(),
1904 "key".into(),
1905 "value".into(),
1906 ]);
1907 let mut record = UnifiedRecord::new();
1908 record.set("collection", Value::text(collection.clone()));
1909 record.set("key", Value::text(key.clone()));
1910 record.set("value", value);
1911 result.push(record);
1912 Ok(RuntimeQueryResult {
1913 query: raw_query.to_string(),
1914 mode: crate::storage::query::modes::QueryMode::Sql,
1915 statement: "vault_unseal",
1916 engine: "vault",
1917 result,
1918 affected_rows: 0,
1919 statement_type: "select",
1920 bookmark: None,
1921 })
1922 }
1923 Err(err) => {
1924 let reason = err.to_string();
1925 self.audit_vault_unseal(
1926 collection,
1927 key,
1928 crate::runtime::audit_log::Outcome::Error,
1929 &reason,
1930 Some(&entry),
1931 );
1932 self.emit_vault_control_event(
1933 event_kind,
1934 crate::runtime::control_events::Outcome::Error,
1935 action,
1936 collection,
1937 key,
1938 &reason,
1939 Some(&entry),
1940 Vec::new(),
1941 )?;
1942 Err(err)
1943 }
1944 }
1945 }
1946
1947 KvCommand::Incr {
1948 model,
1949 collection,
1950 key,
1951 by,
1952 ttl_ms,
1953 } => {
1954 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
1955 let new_value = ops.incr(*model, collection, key, *by, *ttl_ms)?;
1956
1957 let mut result = UnifiedResult::with_columns(vec![
1958 "ok".into(),
1959 "collection".into(),
1960 "key".into(),
1961 "value".into(),
1962 ]);
1963 let mut record = UnifiedRecord::new();
1964 record.set("ok", Value::Boolean(true));
1965 record.set("collection", Value::text(collection.clone()));
1966 record.set("key", Value::text(key.clone()));
1967 record.set("value", Value::Integer(new_value));
1968 result.push(record);
1969
1970 Ok(RuntimeQueryResult {
1971 query: raw_query.to_string(),
1972 mode: crate::storage::query::modes::QueryMode::Sql,
1973 statement: "kv_incr",
1974 engine: "kv",
1975 result,
1976 affected_rows: 1,
1977 statement_type: "update",
1978 bookmark: None,
1979 })
1980 }
1981
1982 KvCommand::Cas {
1983 model,
1984 collection,
1985 key,
1986 expected,
1987 new_value,
1988 ttl_ms,
1989 } => {
1990 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
1991 let (ok, current) = ops.cas(
1992 *model,
1993 collection,
1994 key,
1995 expected.as_ref(),
1996 new_value.clone(),
1997 *ttl_ms,
1998 )?;
1999
2000 let mut result = UnifiedResult::with_columns(vec![
2001 "ok".into(),
2002 "collection".into(),
2003 "key".into(),
2004 "current".into(),
2005 ]);
2006 let mut record = UnifiedRecord::new();
2007 record.set("ok", Value::Boolean(ok));
2008 record.set("collection", Value::text(collection.clone()));
2009 record.set("key", Value::text(key.clone()));
2010 record.set(
2011 "current",
2012 current.unwrap_or(crate::storage::schema::Value::Null),
2013 );
2014 result.push(record);
2015
2016 Ok(RuntimeQueryResult {
2017 query: raw_query.to_string(),
2018 mode: crate::storage::query::modes::QueryMode::Sql,
2019 statement: "kv_cas",
2020 engine: "kv",
2021 result,
2022 affected_rows: if ok { 1 } else { 0 },
2023 statement_type: "update",
2024 bookmark: None,
2025 })
2026 }
2027
2028 KvCommand::Delete {
2029 model,
2030 collection,
2031 key,
2032 } => {
2033 if *model == crate::catalog::CollectionModel::Vault {
2034 self.check_system_vault_capability("vault:write", collection, key)
2035 .map_err(RedDBError::Query)?;
2036 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2037 let entry = ops.append_vault_version(
2038 collection,
2039 key,
2040 Value::Null,
2041 "delete",
2042 true,
2043 &[],
2044 )?;
2045 self.record_kv_watch_event(
2046 crate::replication::cdc::ChangeOperation::Delete,
2047 collection,
2048 key,
2049 entry.id.raw(),
2050 None,
2051 Some(vault_entry_metadata_json(&entry)),
2052 );
2053 self.audit_vault_lifecycle(
2054 "delete",
2055 collection,
2056 key,
2057 crate::runtime::audit_log::Outcome::Success,
2058 "ok",
2059 Some(&entry),
2060 );
2061 return Ok(vault_write_result(
2062 raw_query,
2063 "vault_delete",
2064 "delete",
2065 collection,
2066 key,
2067 &entry,
2068 1,
2069 ));
2070 }
2071 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2072 let deleted = ops.delete(*model, collection, key)?;
2073
2074 let mut result = UnifiedResult::with_columns(vec![
2075 "ok".into(),
2076 "collection".into(),
2077 "key".into(),
2078 "deleted".into(),
2079 ]);
2080 let mut record = UnifiedRecord::new();
2081 record.set("ok", Value::Boolean(true));
2082 record.set("collection", Value::text(collection.clone()));
2083 record.set("key", Value::text(key.clone()));
2084 record.set("deleted", Value::Boolean(deleted));
2085 result.push(record);
2086
2087 Ok(RuntimeQueryResult {
2088 query: raw_query.to_string(),
2089 mode: crate::storage::query::modes::QueryMode::Sql,
2090 statement: "kv_delete",
2091 engine: "kv",
2092 result,
2093 affected_rows: if deleted { 1 } else { 0 },
2094 statement_type: "delete",
2095 bookmark: None,
2096 })
2097 }
2098 }
2099 }
2100
2101 pub fn vault_watch_events_since(
2102 &self,
2103 collection: &str,
2104 key: &str,
2105 since_lsn: u64,
2106 max_count: usize,
2107 ) -> Vec<crate::replication::cdc::KvWatchEvent> {
2108 self.kv_watch_events_since(collection, key, since_lsn, max_count)
2109 .into_iter()
2110 .filter(|event| {
2111 self.check_vault_capability("vault:read_metadata", &event.collection, &event.key)
2112 .is_ok()
2113 })
2114 .map(vault_filter_watch_event)
2115 .collect()
2116 }
2117
2118 pub fn vault_watch_events_since_prefix(
2119 &self,
2120 collection: &str,
2121 prefix: &str,
2122 since_lsn: u64,
2123 max_count: usize,
2124 ) -> Vec<crate::replication::cdc::KvWatchEvent> {
2125 self.kv_watch_events_since_prefix(collection, prefix, since_lsn, max_count)
2126 .into_iter()
2127 .filter(|event| {
2128 self.check_vault_capability("vault:read_metadata", &event.collection, &event.key)
2129 .is_ok()
2130 })
2131 .map(vault_filter_watch_event)
2132 .collect()
2133 }
2134
2135 fn check_kv_invalidate_policy(&self, collection: &str) -> RedDBResult<()> {
2136 let auth_store = match self.inner.auth_store.read().clone() {
2137 Some(store) => store,
2138 None => return Ok(()),
2139 };
2140 let (username, role) = match crate::runtime::impl_core::current_auth_identity() {
2141 Some(identity) => identity,
2142 None => return Ok(()),
2143 };
2144 if role < crate::auth::Role::Write {
2145 return Err(RedDBError::Query(format!(
2146 "principal=`{username}` role=`{role:?}` cannot invalidate KV tags"
2147 )));
2148 }
2149 if !auth_store.iam_authorization_enabled() {
2150 return Ok(());
2151 }
2152
2153 let tenant = crate::runtime::impl_core::current_tenant();
2154 let principal = crate::auth::UserId::from_parts(tenant.as_deref(), &username);
2155 let mut resource =
2156 crate::auth::policies::ResourceRef::new("kv".to_string(), collection.to_string());
2157 if let Some(tenant) = tenant.clone() {
2158 resource = resource.with_tenant(tenant);
2159 }
2160 let ctx = crate::auth::policies::EvalContext {
2161 principal_tenant: tenant.clone(),
2162 current_tenant: tenant,
2163 peer_ip: None,
2164 mfa_present: false,
2165 now_ms: current_unix_ms(),
2166 principal_is_admin_role: role == crate::auth::Role::Admin,
2167 principal_is_system_owned: auth_store.principal_is_system_owned(&principal),
2168 principal_is_platform_scoped: principal.tenant.is_none(),
2169 };
2170 if auth_store.check_policy_authz_with_role(
2171 &principal,
2172 "kv:invalidate",
2173 &resource,
2174 &ctx,
2175 role,
2176 ) {
2177 Ok(())
2178 } else {
2179 Err(RedDBError::Query(format!(
2180 "principal=`{username}` action=`kv:invalidate` resource=`kv:{collection}` denied by IAM policy"
2181 )))
2182 }
2183 }
2184}
2185
2186fn ttl_metadata(ttl_ms: Option<u64>) -> Option<Metadata> {
2187 let ttl_ms = ttl_ms?;
2188 Some(Metadata::with_fields(
2189 [(
2190 "_ttl_ms".to_string(),
2191 if ttl_ms <= i64::MAX as u64 {
2192 MetadataValue::Int(ttl_ms as i64)
2193 } else {
2194 MetadataValue::Timestamp(ttl_ms)
2195 },
2196 )]
2197 .into_iter()
2198 .collect(),
2199 ))
2200}
2201
2202fn vault_write_result(
2203 raw_query: &str,
2204 statement: &'static str,
2205 statement_type: &'static str,
2206 collection: &str,
2207 key: &str,
2208 entry: &VaultEntry,
2209 affected_rows: u64,
2210) -> RuntimeQueryResult {
2211 let mut result = UnifiedResult::with_columns(vec![
2212 "ok".into(),
2213 "collection".into(),
2214 "key".into(),
2215 "version".into(),
2216 "fingerprint".into(),
2217 "tombstone".into(),
2218 "op".into(),
2219 "id".into(),
2220 ]);
2221 let mut record = UnifiedRecord::new();
2222 record.set("ok", Value::Boolean(true));
2223 record.set("collection", Value::text(collection.to_string()));
2224 record.set("key", Value::text(key.to_string()));
2225 record.set("version", Value::Integer(entry.version));
2226 if entry.tombstone {
2227 record.set("fingerprint", Value::Null);
2228 } else {
2229 record.set("fingerprint", Value::text(vault_fingerprint(&entry.value)));
2230 }
2231 record.set("tombstone", Value::Boolean(entry.tombstone));
2232 record.set("op", Value::text(entry.op.clone()));
2233 record.set("id", Value::Integer(entry.id.raw() as i64));
2234 result.push(record);
2235 RuntimeQueryResult {
2236 query: raw_query.to_string(),
2237 mode: crate::storage::query::modes::QueryMode::Sql,
2238 statement,
2239 engine: "vault",
2240 result,
2241 affected_rows,
2242 statement_type,
2243 bookmark: None,
2244 }
2245}
2246
2247fn vault_history_result(collection: &str, key: &str, versions: &[VaultEntry]) -> UnifiedResult {
2248 let mut result = UnifiedResult::with_columns(vec![
2249 "collection".into(),
2250 "key".into(),
2251 "version".into(),
2252 "fingerprint".into(),
2253 "tags".into(),
2254 "created_at".into(),
2255 "updated_at".into(),
2256 "status".into(),
2257 "tombstone".into(),
2258 "op".into(),
2259 ]);
2260 for entry in versions {
2261 push_vault_metadata_record(&mut result, collection, key, entry);
2262 }
2263 result
2264}
2265
2266fn push_vault_metadata_record(
2267 result: &mut UnifiedResult,
2268 collection: &str,
2269 key: &str,
2270 entry: &VaultEntry,
2271) {
2272 let mut record = UnifiedRecord::new();
2273 record.set("collection", Value::text(collection.to_string()));
2274 record.set("key", Value::text(key.to_string()));
2275 record.set("version", Value::Integer(entry.version));
2276 if entry.tombstone {
2277 record.set("fingerprint", Value::Null);
2278 record.set("status", Value::text("deleted"));
2279 } else {
2280 record.set("fingerprint", Value::text(vault_fingerprint(&entry.value)));
2281 record.set("status", Value::text("sealed"));
2282 }
2283 record.set("tags", vault_tags_value(&entry.metadata));
2284 record.set("created_at", Value::TimestampMs(entry.created_at as i64));
2285 record.set("updated_at", Value::TimestampMs(entry.updated_at as i64));
2286 record.set("tombstone", Value::Boolean(entry.tombstone));
2287 record.set("op", Value::text(entry.op.clone()));
2288 result.push(record);
2289}
2290
2291fn vault_metadata_result(
2292 collection: &str,
2293 key: &str,
2294 entry: Option<&VaultEntry>,
2295 key_available: bool,
2296) -> UnifiedResult {
2297 let mut result = UnifiedResult::with_columns(vec![
2298 "collection".into(),
2299 "key".into(),
2300 "version".into(),
2301 "fingerprint".into(),
2302 "tags".into(),
2303 "created_at".into(),
2304 "updated_at".into(),
2305 "value".into(),
2306 "status".into(),
2307 "tombstone".into(),
2308 "op".into(),
2309 ]);
2310 let mut record = UnifiedRecord::new();
2311 record.set("collection", Value::text(collection.to_string()));
2312 record.set("key", Value::text(key.to_string()));
2313 match entry {
2314 Some(entry) => {
2315 record.set("version", Value::Integer(entry.version));
2316 if entry.tombstone {
2317 record.set("fingerprint", Value::Null);
2318 } else {
2319 record.set("fingerprint", Value::text(vault_fingerprint(&entry.value)));
2320 }
2321 record.set("tags", vault_tags_value(&entry.metadata));
2322 record.set("created_at", Value::TimestampMs(entry.created_at as i64));
2323 record.set("updated_at", Value::TimestampMs(entry.updated_at as i64));
2324 record.set("value", Value::text("***"));
2325 record.set(
2326 "status",
2327 Value::text(if entry.tombstone {
2328 "deleted"
2329 } else if key_available {
2330 "sealed"
2331 } else {
2332 "sealed_unavailable"
2333 }),
2334 );
2335 record.set("tombstone", Value::Boolean(entry.tombstone));
2336 record.set("op", Value::text(entry.op.clone()));
2337 }
2338 None => {
2339 record.set("version", Value::Null);
2340 record.set("fingerprint", Value::Null);
2341 record.set("tags", Value::Array(Vec::new()));
2342 record.set("created_at", Value::Null);
2343 record.set("updated_at", Value::Null);
2344 record.set("value", Value::text(""));
2345 record.set("status", Value::text("missing"));
2346 record.set("tombstone", Value::Boolean(false));
2347 record.set("op", Value::Null);
2348 }
2349 }
2350 result.push(record);
2351 result
2352}
2353
2354fn vault_fingerprint(value: &Value) -> String {
2355 match value {
2356 Value::Secret(payload) => crate::utils::to_hex(&crate::crypto::sha256::sha256(payload)),
2357 other => crate::utils::to_hex(&crate::crypto::sha256::sha256(&other.to_bytes())),
2358 }
2359}
2360
2361fn vault_entry_metadata_json(entry: &VaultEntry) -> crate::json::Value {
2362 let mut object = crate::json::Map::new();
2363 object.insert(
2364 "key".to_string(),
2365 crate::json::Value::String(entry.key.clone()),
2366 );
2367 object.insert(
2368 "version".to_string(),
2369 crate::json::Value::Number(entry.version as f64),
2370 );
2371 object.insert(
2372 "fingerprint".to_string(),
2373 if entry.tombstone {
2374 crate::json::Value::Null
2375 } else {
2376 crate::json::Value::String(vault_fingerprint(&entry.value))
2377 },
2378 );
2379 object.insert("tags".to_string(), vault_tags_json(&entry.metadata));
2380 object.insert(
2381 "actor".to_string(),
2382 crate::json::Value::String(RedDBRuntime::current_vault_actor()),
2383 );
2384 object.insert(
2385 "sequence_id".to_string(),
2386 crate::json::Value::Number(entry.sequence_id as f64),
2387 );
2388 object.insert(
2389 "tombstone".to_string(),
2390 crate::json::Value::Bool(entry.tombstone),
2391 );
2392 object.insert(
2393 "op".to_string(),
2394 crate::json::Value::String(entry.op.clone()),
2395 );
2396 crate::json::Value::Object(object)
2397}
2398
2399fn vault_tags_json(metadata: &Metadata) -> crate::json::Value {
2400 match vault_tags_value(metadata) {
2401 Value::Array(values) => crate::json::Value::Array(
2402 values
2403 .into_iter()
2404 .filter_map(|value| match value {
2405 Value::Text(tag) => Some(crate::json::Value::String(tag.to_string())),
2406 _ => None,
2407 })
2408 .collect(),
2409 ),
2410 _ => crate::json::Value::Array(Vec::new()),
2411 }
2412}
2413
2414fn vault_tags_metadata(tags: &[String]) -> std::collections::HashMap<String, MetadataValue> {
2415 [(
2416 "tags".to_string(),
2417 MetadataValue::Array(
2418 tags.iter()
2419 .map(|tag| MetadataValue::String(tag.clone()))
2420 .collect(),
2421 ),
2422 )]
2423 .into_iter()
2424 .collect()
2425}
2426
2427fn vault_filter_watch_event(
2428 mut event: crate::replication::cdc::KvWatchEvent,
2429) -> crate::replication::cdc::KvWatchEvent {
2430 event.before = event.before.and_then(vault_metadata_json_only);
2431 event.after = event.after.and_then(vault_metadata_json_only);
2432 event
2433}
2434
2435fn vault_metadata_json_only(value: crate::json::Value) -> Option<crate::json::Value> {
2436 let object = value.as_object()?;
2437 let mut out = crate::json::Map::new();
2438 for field in [
2439 "key",
2440 "version",
2441 "fingerprint",
2442 "tags",
2443 "actor",
2444 "sequence_id",
2445 "tombstone",
2446 "op",
2447 ] {
2448 if let Some(value) = object.get(field) {
2449 out.insert(field.to_string(), value.clone());
2450 }
2451 }
2452 Some(crate::json::Value::Object(out))
2453}
2454
2455fn vault_tags_value(metadata: &Metadata) -> Value {
2456 match metadata.get("tags") {
2457 Some(MetadataValue::Array(values)) => Value::Array(
2458 values
2459 .iter()
2460 .filter_map(|value| match value {
2461 MetadataValue::String(tag) => Some(Value::text(tag.clone())),
2462 _ => None,
2463 })
2464 .collect(),
2465 ),
2466 Some(MetadataValue::String(tag)) if !tag.is_empty() => {
2467 Value::Array(vec![Value::text(tag.clone())])
2468 }
2469 _ => Value::Array(Vec::new()),
2470 }
2471}
2472
2473fn decode_vault_key(hex_key: &str) -> RedDBResult<[u8; 32]> {
2474 let bytes = hex::decode(hex_key)
2475 .map_err(|_| RedDBError::Query("vault sealed_unavailable: bad key material".to_string()))?;
2476 let key: [u8; 32] = bytes.try_into().map_err(|_| {
2477 RedDBError::Query("vault sealed_unavailable: bad key material length".to_string())
2478 })?;
2479 Ok(key)
2480}
2481
2482fn kv_tags_metadata(tags: &[String]) -> Option<(String, MetadataValue)> {
2483 if tags.is_empty() {
2484 return None;
2485 }
2486 let values = tags
2487 .iter()
2488 .map(|tag| MetadataValue::String(tag.clone()))
2489 .collect();
2490 Some(("_kv_tags".to_string(), MetadataValue::Array(values)))
2491}
2492
2493fn kv_tags_value(tags: &[String]) -> Value {
2494 let json = crate::json::Value::Array(
2495 tags.iter()
2496 .map(|tag| crate::json::Value::String(tag.clone()))
2497 .collect(),
2498 );
2499 Value::Json(crate::json::to_vec(&json).unwrap_or_default())
2500}
2501
2502fn kv_value_from_entity(entity: &crate::storage::UnifiedEntity) -> Option<Value> {
2503 if let crate::storage::EntityData::Row(ref row) = entity.data {
2504 if let Some(ref named) = row.named {
2505 return named.get("value").cloned();
2506 }
2507 }
2508 None
2509}
2510
2511fn kv_collection_contract(name: &str) -> crate::physical::CollectionContract {
2512 let now = current_unix_ms();
2513 crate::physical::CollectionContract {
2514 name: name.to_string(),
2515 declared_model: crate::catalog::CollectionModel::Kv,
2516 schema_mode: crate::catalog::SchemaMode::Dynamic,
2517 origin: crate::physical::ContractOrigin::Implicit,
2518 version: 1,
2519 created_at_unix_ms: now,
2520 updated_at_unix_ms: now,
2521 default_ttl_ms: None,
2522 vector_dimension: None,
2523 vector_metric: None,
2524 context_index_fields: Vec::new(),
2525 declared_columns: Vec::new(),
2526 table_def: None,
2527 timestamps_enabled: false,
2528 context_index_enabled: false,
2529 metrics_raw_retention_ms: None,
2530 metrics_rollup_policies: Vec::new(),
2531 metrics_tenant_identity: None,
2532 metrics_namespace: None,
2533 append_only: false,
2534 subscriptions: Vec::new(),
2535 analytics_config: Vec::new(),
2536 session_key: None,
2537 session_gap_ms: None,
2538 retention_duration_ms: None,
2539 }
2540}
2541
/// Returns the current wall-clock time in milliseconds since the Unix epoch.
///
/// If the system clock reads earlier than the epoch, this returns `0`
/// instead of failing.
fn current_unix_ms() -> u128 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis(),
        Err(_) => 0,
    }
}
2548
#[cfg(test)]
mod tests {
    // Tests for the KV runtime. They cover:
    // * atomic INCR/CAS semantics, including under concurrent callers;
    // * the KV runtime stats counters;
    // * tag invalidation;
    // * the SQL-facing `KV ...` statements;
    // * watch-stream ordering, prefix filtering, resume, drops, idle timeout
    //   and rollback behaviour.
    use crate::api::RedDBOptions;
    use crate::catalog::CollectionModel;
    use crate::runtime::RedDBRuntime;

    // Fresh in-memory runtime for each test, so tests never share state.
    fn rt() -> RedDBRuntime {
        RedDBRuntime::with_options(RedDBOptions::in_memory()).expect("in-memory runtime")
    }

    // INCR on a key that does not exist yet returns `by` itself.
    #[test]
    fn incr_missing_key_initialises_at_by() {
        let r = rt();
        let ops = super::KvAtomicOps::new(&r);
        let v = ops
            .incr(CollectionModel::Kv, "kv_default", "missing", 5, None)
            .unwrap();
        assert_eq!(v, 5);
    }

    // Each public KvAtomicOps call bumps exactly one KV stats counter.
    // A CAS that creates a missing key counts as a success; a CAS with a
    // mismatched expectation counts as a conflict.
    #[test]
    fn kv_runtime_stats_count_public_ops() {
        let r = rt();
        let ops = super::KvAtomicOps::new(&r);

        ops.set(
            CollectionModel::Kv,
            "kv_default",
            "profile",
            crate::storage::schema::Value::text("alice"),
            None,
            false,
        )
        .unwrap();
        ops.get(CollectionModel::Kv, "kv_default", "profile")
            .unwrap();
        ops.delete(CollectionModel::Kv, "kv_default", "profile")
            .unwrap();
        ops.incr(CollectionModel::Kv, "kv_default", "hits", 1, None)
            .unwrap();
        // "profile" was deleted above, so CAS with expectation `None` succeeds.
        ops.cas(
            CollectionModel::Kv,
            "kv_default",
            "profile",
            None,
            crate::storage::schema::Value::text("created"),
            None,
        )
        .unwrap();
        // Current value is "created", so expecting "different" is a conflict.
        ops.cas(
            CollectionModel::Kv,
            "kv_default",
            "profile",
            Some(&crate::storage::schema::Value::text("different")),
            crate::storage::schema::Value::text("ignored"),
            None,
        )
        .unwrap();

        let stats = r.stats().kv;
        assert_eq!(stats.puts, 1);
        assert_eq!(stats.gets, 1);
        assert_eq!(stats.deletes, 1);
        assert_eq!(stats.incrs, 1);
        assert_eq!(stats.cas_success, 1);
        assert_eq!(stats.cas_conflict, 1);
    }

    // INVALIDATE TAGS only removes entries carrying one of the listed tags.
    // A miss leaves the value readable; a hit makes KV GET return NULL.
    #[test]
    fn kv_invalidate_tags_removes_matching_entries_only() {
        let r = rt();

        r.execute_query("KV PUT sessions.blob = 'payload' TAGS [user:42, org:7]")
            .unwrap();

        let miss = r
            .execute_query("INVALIDATE TAGS [org:99] FROM sessions")
            .unwrap();
        assert_eq!(miss.affected_rows, 0);
        assert!(matches!(
            r.execute_query("KV GET sessions.blob")
                .unwrap()
                .result
                .records[0]
                .get("value"),
            Some(crate::storage::schema::Value::Text(value)) if &**value == "payload"
        ));

        let hit = r
            .execute_query("INVALIDATE TAGS [user:42] FROM sessions")
            .unwrap();
        assert_eq!(hit.affected_rows, 1);
        assert!(matches!(
            r.execute_query("KV GET sessions.blob")
                .unwrap()
                .result
                .records[0]
                .get("value"),
            Some(crate::storage::schema::Value::Null)
        ));
    }

    // Watch-stream lifecycle shows up in KV stats:
    // * the active-stream count follows stream creation and drop;
    // * emitted events are counted;
    // * explicitly recorded drops are counted.
    #[test]
    fn kv_runtime_stats_count_watch_streams_and_events() {
        let r = rt();
        let ops = super::KvAtomicOps::new(&r);
        assert_eq!(r.stats().kv.watch_streams_active, 0);

        {
            let mut stream = r.kv_watch_subscribe("kv_default", "watched", None);
            assert_eq!(r.stats().kv.watch_streams_active, 1);

            ops.set(
                CollectionModel::Kv,
                "kv_default",
                "watched",
                crate::storage::schema::Value::Integer(1),
                None,
                false,
            )
            .unwrap();
            let event = stream.poll_next().expect("watch event");
            assert_eq!(event.key, "watched");
            assert_eq!(r.stats().kv.watch_events_emitted, 1);

            stream.record_drop_count(3);
            assert_eq!(r.stats().kv.watch_drops, 3);
        }

        // `stream` went out of scope above; the active count must return to 0.
        assert_eq!(r.stats().kv.watch_streams_active, 0);
    }

    // Repeated INCR BY 1 accumulates on the stored integer.
    #[test]
    fn incr_existing_integer_accumulates() {
        let r = rt();
        let ops = super::KvAtomicOps::new(&r);
        ops.incr(CollectionModel::Kv, "kv_default", "ctr", 1, None)
            .unwrap();
        ops.incr(CollectionModel::Kv, "kv_default", "ctr", 1, None)
            .unwrap();
        let v = ops
            .incr(CollectionModel::Kv, "kv_default", "ctr", 1, None)
            .unwrap();
        assert_eq!(v, 3);
    }

    // A negative `by` decrements.
    #[test]
    fn decr_via_negative_by() {
        let r = rt();
        let ops = super::KvAtomicOps::new(&r);
        ops.incr(CollectionModel::Kv, "kv_default", "stock", 10, None)
            .unwrap();
        let v = ops
            .incr(CollectionModel::Kv, "kv_default", "stock", -3, None)
            .unwrap();
        assert_eq!(v, 7);
    }

    // THREADS workers each INCR the same key ITERS times. With no lost
    // updates, the final value is exactly THREADS * ITERS.
    #[test]
    fn concurrent_incr_single_key_is_atomic() {
        const THREADS: usize = 8;
        const ITERS: usize = 1000;

        let runtime = std::sync::Arc::new(rt());
        let barrier = std::sync::Arc::new(std::sync::Barrier::new(THREADS));
        let mut handles = Vec::new();

        for _ in 0..THREADS {
            let runtime = std::sync::Arc::clone(&runtime);
            let barrier = std::sync::Arc::clone(&barrier);
            handles.push(std::thread::spawn(move || {
                let ops = super::KvAtomicOps::new(&runtime);
                // Release all workers together to maximise contention.
                barrier.wait();
                for _ in 0..ITERS {
                    ops.incr(CollectionModel::Kv, "kv_default", "counter", 1, None)
                        .unwrap();
                }
            }));
        }

        for handle in handles {
            handle.join().expect("worker should finish");
        }

        let ops = super::KvAtomicOps::new(&runtime);
        assert_eq!(
            ops.get(CollectionModel::Kv, "kv_default", "counter")
                .unwrap(),
            Some(crate::storage::schema::Value::Integer(
                (THREADS * ITERS) as i64
            ))
        );
    }

    // INCR on a text value is rejected with a "non-integer" error.
    #[test]
    fn incr_on_string_value_returns_error() {
        let r = rt();
        let ops = super::KvAtomicOps::new(&r);
        ops.set(
            CollectionModel::Kv,
            "kv_default",
            "name",
            crate::storage::schema::Value::text("alice"),
            None,
            false,
        )
        .unwrap();
        let err = ops
            .incr(CollectionModel::Kv, "kv_default", "name", 1, None)
            .unwrap_err();
        assert!(err.to_string().contains("non-integer"));
    }

    // CAS with the current value succeeds, returns the previous value and
    // installs the new one.
    #[test]
    fn cas_matching_value_succeeds() {
        let r = rt();
        let ops = super::KvAtomicOps::new(&r);
        ops.set(
            CollectionModel::Kv,
            "kv_default",
            "lock",
            crate::storage::schema::Value::text("free"),
            None,
            false,
        )
        .unwrap();
        let (ok, prev) = ops
            .cas(
                CollectionModel::Kv,
                "kv_default",
                "lock",
                Some(&crate::storage::schema::Value::text("free")),
                crate::storage::schema::Value::text("held"),
                None,
            )
            .unwrap();
        assert!(ok);
        assert_eq!(prev, Some(crate::storage::schema::Value::text("free")));
        assert_eq!(
            ops.get(CollectionModel::Kv, "kv_default", "lock").unwrap(),
            Some(crate::storage::schema::Value::text("held"))
        );
    }

    // In each round, every thread races CAS(round -> round + 1). The two
    // barriers keep all threads on the same round, so exactly one CAS may
    // win per round and the counter ends at ROUNDS.
    #[test]
    fn concurrent_cas_allows_one_success_per_round() {
        const THREADS: usize = 8;
        const ROUNDS: usize = 100;

        let runtime = std::sync::Arc::new(rt());
        let ops = super::KvAtomicOps::new(&runtime);
        ops.set(
            CollectionModel::Kv,
            "kv_default",
            "cas_counter",
            crate::storage::schema::Value::Integer(0),
            None,
            false,
        )
        .unwrap();

        let start_round = std::sync::Arc::new(std::sync::Barrier::new(THREADS));
        let finish_round = std::sync::Arc::new(std::sync::Barrier::new(THREADS));
        let successes = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
        let mut handles = Vec::new();

        for _ in 0..THREADS {
            let runtime = std::sync::Arc::clone(&runtime);
            let start_round = std::sync::Arc::clone(&start_round);
            let finish_round = std::sync::Arc::clone(&finish_round);
            let successes = std::sync::Arc::clone(&successes);
            handles.push(std::thread::spawn(move || {
                let ops = super::KvAtomicOps::new(&runtime);
                for round in 0..ROUNDS {
                    start_round.wait();
                    let (ok, _) = ops
                        .cas(
                            CollectionModel::Kv,
                            "kv_default",
                            "cas_counter",
                            Some(&crate::storage::schema::Value::Integer(round as i64)),
                            crate::storage::schema::Value::Integer((round + 1) as i64),
                            None,
                        )
                        .unwrap();
                    if ok {
                        successes.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
                    }
                    // No thread starts the next round until all have attempted this one.
                    finish_round.wait();
                }
            }));
        }

        for handle in handles {
            handle.join().expect("worker should finish");
        }

        assert_eq!(successes.load(std::sync::atomic::Ordering::SeqCst), ROUNDS);
        assert_eq!(
            ops.get(CollectionModel::Kv, "kv_default", "cas_counter")
                .unwrap(),
            Some(crate::storage::schema::Value::Integer(ROUNDS as i64))
        );
    }

    // CAS with a stale expectation fails, returns the actual current value
    // and leaves the stored value untouched.
    #[test]
    fn cas_mismatching_value_fails() {
        let r = rt();
        let ops = super::KvAtomicOps::new(&r);
        ops.set(
            CollectionModel::Kv,
            "kv_default",
            "lock",
            crate::storage::schema::Value::text("free"),
            None,
            false,
        )
        .unwrap();
        let (ok, current) = ops
            .cas(
                CollectionModel::Kv,
                "kv_default",
                "lock",
                Some(&crate::storage::schema::Value::text("held")),
                crate::storage::schema::Value::text("worker-7"),
                None,
            )
            .unwrap();
        assert!(!ok);
        assert_eq!(current, Some(crate::storage::schema::Value::text("free")));
        assert_eq!(
            ops.get(CollectionModel::Kv, "kv_default", "lock").unwrap(),
            Some(crate::storage::schema::Value::text("free"))
        );
    }

    // An expectation of `None` means "key must be absent"; on a missing key
    // the CAS creates it.
    #[test]
    fn cas_expect_null_on_missing_key_creates() {
        let r = rt();
        let ops = super::KvAtomicOps::new(&r);
        let (ok, prev) = ops
            .cas(
                CollectionModel::Kv,
                "kv_default",
                "new_key",
                None,
                crate::storage::schema::Value::text("created"),
                None,
            )
            .unwrap();
        assert!(ok);
        assert_eq!(prev, None);
        assert_eq!(
            ops.get(CollectionModel::Kv, "kv_default", "new_key")
                .unwrap(),
            Some(crate::storage::schema::Value::text("created"))
        );
    }

    // An expectation of `None` on an existing key fails and reports the
    // current value.
    #[test]
    fn cas_expect_null_on_existing_key_fails() {
        let r = rt();
        let ops = super::KvAtomicOps::new(&r);
        ops.set(
            CollectionModel::Kv,
            "kv_default",
            "taken",
            crate::storage::schema::Value::text("worker-1"),
            None,
            false,
        )
        .unwrap();
        let (ok, current) = ops
            .cas(
                CollectionModel::Kv,
                "kv_default",
                "taken",
                None,
                crate::storage::schema::Value::text("worker-2"),
                None,
            )
            .unwrap();
        assert!(!ok);
        assert_eq!(
            current,
            Some(crate::storage::schema::Value::text("worker-1"))
        );
    }

    // SQL `KV CAS ... EXPECT ... SET ...` reports success in the `ok` column.
    // Repeating the same CAS fails because the value is now 'held'.
    #[test]
    fn cas_via_sql_roundtrip() {
        let r = rt();
        r.execute_query("KV PUT lock = 'free'").unwrap();
        let res = r
            .execute_query("KV CAS lock EXPECT 'free' SET 'held'")
            .unwrap();
        let row = &res.result.records[0];
        assert_eq!(
            row.get("ok"),
            Some(&crate::storage::schema::Value::Boolean(true))
        );
        let res2 = r
            .execute_query("KV CAS lock EXPECT 'free' SET 'held'")
            .unwrap();
        let row2 = &res2.result.records[0];
        assert_eq!(
            row2.get("ok"),
            Some(&crate::storage::schema::Value::Boolean(false))
        );
    }

    // SQL `EXPECT NULL` creates a missing key once; a second attempt fails.
    #[test]
    fn cas_expect_null_via_sql() {
        let r = rt();
        let res = r
            .execute_query("KV CAS singleton EXPECT NULL SET 'first'")
            .unwrap();
        let row = &res.result.records[0];
        assert_eq!(
            row.get("ok"),
            Some(&crate::storage::schema::Value::Boolean(true))
        );
        let res2 = r
            .execute_query("KV CAS singleton EXPECT NULL SET 'second'")
            .unwrap();
        let row2 = &res2.result.records[0];
        assert_eq!(
            row2.get("ok"),
            Some(&crate::storage::schema::Value::Boolean(false))
        );
    }

    // SQL `KV INCR key` without BY yields 1 on a fresh key; `BY 4` then
    // yields 5.
    #[test]
    fn incr_via_sql_roundtrip() {
        let r = rt();
        let res = r.execute_query("KV INCR hits").unwrap();
        let row = &res.result.records[0];
        assert_eq!(
            row.get("value"),
            Some(&crate::storage::schema::Value::Integer(1))
        );
        let res2 = r.execute_query("KV INCR hits BY 4").unwrap();
        let row2 = &res2.result.records[0];
        assert_eq!(
            row2.get("value"),
            Some(&crate::storage::schema::Value::Integer(5))
        );
    }

    // Concurrent `UPDATE ... SET n = n + 1` on one table row must not lose
    // updates; the final value is THREADS * ITERS.
    #[test]
    fn concurrent_self_referential_update_is_atomic() {
        const THREADS: usize = 8;
        const ITERS: usize = 100;

        let runtime = std::sync::Arc::new(rt());
        runtime
            .execute_query("CREATE TABLE counters (id INT, n INT)")
            .unwrap();
        runtime
            .execute_query("INSERT INTO counters (id, n) VALUES (1, 0)")
            .unwrap();

        let barrier = std::sync::Arc::new(std::sync::Barrier::new(THREADS));
        let mut handles = Vec::new();
        for _ in 0..THREADS {
            let runtime = std::sync::Arc::clone(&runtime);
            let barrier = std::sync::Arc::clone(&barrier);
            handles.push(std::thread::spawn(move || {
                barrier.wait();
                for _ in 0..ITERS {
                    runtime
                        .execute_query("UPDATE counters SET n = n + 1 WHERE id = 1")
                        .unwrap();
                }
            }));
        }

        for handle in handles {
            handle.join().expect("worker should finish");
        }

        let selected = runtime
            .execute_query("SELECT n FROM counters WHERE id = 1")
            .unwrap();
        assert_eq!(
            selected.result.records[0].get("n"),
            Some(&crate::storage::schema::Value::Integer(
                (THREADS * ITERS) as i64
            ))
        );
    }

    // A key watch sees set / incr / delete / set, in that order:
    // Insert, Update, Delete, then Insert again (the key was gone).
    // LSNs must be strictly increasing.
    #[test]
    fn watch_stream_delivers_key_events_in_lsn_order() {
        let r = rt();
        let ops = super::KvAtomicOps::new(&r);
        let mut stream = r.kv_watch_subscribe("kv_default", "seq", None);

        ops.set(
            CollectionModel::Kv,
            "kv_default",
            "seq",
            crate::storage::schema::Value::Integer(1),
            None,
            false,
        )
        .unwrap();
        ops.incr(CollectionModel::Kv, "kv_default", "seq", 1, None)
            .unwrap();
        ops.delete(CollectionModel::Kv, "kv_default", "seq")
            .unwrap();
        ops.set(
            CollectionModel::Kv,
            "kv_default",
            "seq",
            crate::storage::schema::Value::Integer(9),
            None,
            false,
        )
        .unwrap();

        let mut events = Vec::new();
        while let Some(event) = stream.poll_next() {
            events.push(event);
            if events.len() == 4 {
                break;
            }
        }

        assert_eq!(events.len(), 4);
        assert_eq!(
            events[0].op,
            crate::replication::cdc::ChangeOperation::Insert
        );
        assert_eq!(
            events[1].op,
            crate::replication::cdc::ChangeOperation::Update
        );
        assert_eq!(
            events[2].op,
            crate::replication::cdc::ChangeOperation::Delete
        );
        assert_eq!(
            events[3].op,
            crate::replication::cdc::ChangeOperation::Insert
        );
        assert!(events.windows(2).all(|pair| pair[0].lsn < pair[1].lsn));
    }

    // A prefix watch on "acct:" delivers only the matching keys, in write
    // order, and skips "session:1".
    #[test]
    fn watch_prefix_stream_delivers_matching_events_only() {
        let r = rt();
        let ops = super::KvAtomicOps::new(&r);
        let mut stream = r.kv_watch_subscribe_prefix("kv_default", "acct:", None);

        ops.set(
            CollectionModel::Kv,
            "kv_default",
            "acct:1",
            crate::storage::schema::Value::Integer(1),
            None,
            false,
        )
        .unwrap();
        ops.set(
            CollectionModel::Kv,
            "kv_default",
            "session:1",
            crate::storage::schema::Value::Integer(2),
            None,
            false,
        )
        .unwrap();
        ops.set(
            CollectionModel::Kv,
            "kv_default",
            "acct:2",
            crate::storage::schema::Value::Integer(3),
            None,
            false,
        )
        .unwrap();

        let first = stream.poll_next().expect("first prefix event");
        let second = stream.poll_next().expect("second prefix event");
        assert_eq!(first.key, "acct:1");
        assert_eq!(second.key, "acct:2");
        assert!(stream.poll_next().is_none());
    }

    // After dropping a stream, resubscribing from the last seen LSN replays
    // exactly the 50 writes made in between. Replay is in increasing LSN
    // order, with no duplicates of already-seen events and nothing extra.
    #[test]
    fn watch_stream_resume_from_lsn_delivers_missed_events_without_duplicates() {
        let r = rt();
        let ops = super::KvAtomicOps::new(&r);
        let mut stream = r.kv_watch_subscribe("kv_default", "resume", None);

        let mut last_seen_lsn = 0;
        for value in 0..5 {
            ops.set(
                CollectionModel::Kv,
                "kv_default",
                "resume",
                crate::storage::schema::Value::Integer(value),
                None,
                false,
            )
            .unwrap();
            last_seen_lsn = stream.poll_next().expect("initial event").lsn;
        }
        drop(stream);

        // These 50 writes happen while no stream is subscribed.
        for value in 5..55 {
            ops.set(
                CollectionModel::Kv,
                "kv_default",
                "resume",
                crate::storage::schema::Value::Integer(value),
                None,
                false,
            )
            .unwrap();
        }

        let mut resumed = r.kv_watch_subscribe("kv_default", "resume", Some(last_seen_lsn));
        let mut lsns = Vec::new();
        while let Some(event) = resumed.poll_next() {
            lsns.push(event.lsn);
            if lsns.len() == 50 {
                break;
            }
        }

        assert_eq!(lsns.len(), 50);
        assert!(lsns.iter().all(|lsn| *lsn > last_seen_lsn));
        assert!(lsns.windows(2).all(|pair| pair[0] < pair[1]));
        assert!(resumed.poll_next().is_none());
    }

    // 10_000 writes without polling overflow the stream's buffer. The first
    // event polled afterwards is not the first write (lsn > 1). It carries a
    // non-zero dropped-event count that matches both the stream's own
    // counter and the runtime stats.
    #[test]
    fn watch_stream_slow_consumer_drops_oldest_buffered_events() {
        let r = rt();
        let ops = super::KvAtomicOps::new(&r);
        let mut stream = r.kv_watch_subscribe("kv_default", "slow", None);

        for value in 0..10_000 {
            ops.set(
                CollectionModel::Kv,
                "kv_default",
                "slow",
                crate::storage::schema::Value::Integer(value),
                None,
                false,
            )
            .unwrap();
        }

        let event = stream.poll_next().expect("tail event after drops");
        assert!(event.lsn > 1);
        assert!(event.dropped_event_count > 0);
        assert_eq!(stream.dropped_event_count(), event.dropped_event_count);
        assert_eq!(r.stats().kv.watch_drops, event.dropped_event_count);
    }

    // With `red.config.kv.watch.idle_timeout_ms` set to 1 ms, an idle stream
    // is closed on its next poll. Closing decrements the active-stream count.
    #[test]
    fn watch_stream_idle_timeout_closes_subscription() {
        let r = rt();
        r.execute_query("SET CONFIG red.config.kv.watch.idle_timeout_ms = 1")
            .unwrap();

        let mut stream = r.kv_watch_subscribe("kv_default", "idle", None);
        assert_eq!(r.stats().kv.watch_streams_active, 1);
        std::thread::sleep(std::time::Duration::from_millis(5));

        assert!(stream.poll_next().is_none());
        assert_eq!(r.stats().kv.watch_streams_active, 0);
    }

    // A KV PUT inside a transaction that is rolled back must not reach
    // watchers.
    #[test]
    fn watch_stream_does_not_emit_rolled_back_put() {
        let r = rt();
        let mut stream = r.kv_watch_subscribe("kv_default", "rollback_key", None);

        r.execute_query("BEGIN").unwrap();
        r.execute_query("KV PUT rollback_key = 'dirty'").unwrap();
        r.execute_query("ROLLBACK").unwrap();

        assert!(stream.poll_next().is_none());
    }
}