1use crate::application::ports::RuntimeEntityPort;
7use crate::storage::unified::{Metadata, MetadataValue};
8
9use super::impl_core::{current_auth_identity, current_connection_id, current_tenant};
10use super::*;
11
/// Collection name `"kv_default"`.
///
/// NOTE(review): presumably the collection used when a KV operation does not
/// name one explicitly — confirm against callers, none are visible here.
pub const KV_DEFAULT_COLLECTION: &str = "kv_default";
14
/// Builds the lookup name of a collection's dedicated vault master key:
/// `red.vault.<collection>.master_key`.
fn vault_master_key_ref(collection: &str) -> String {
    ["red.vault.", collection, ".master_key"].concat()
}
18
19fn keyed_model_name(model: crate::catalog::CollectionModel) -> &'static str {
20 match model {
21 crate::catalog::CollectionModel::Kv => "kv",
22 crate::catalog::CollectionModel::Vault => "vault",
23 crate::catalog::CollectionModel::Config => "config",
24 _ => "collection",
25 }
26}
27
/// One stored version of a vault key: the keyed-row fields plus the
/// bookkeeping values of the storage entity that holds it.
#[derive(Debug, Clone)]
struct VaultEntry {
    /// Identifier of the storage entity holding this version.
    id: crate::storage::EntityId,
    /// Vault key this version belongs to.
    key: String,
    /// Value as stored in the row (not unsealed by this type).
    value: crate::storage::schema::Value,
    /// Entity metadata looked up from the collection manager.
    metadata: Metadata,
    /// Copied from the entity's `created_at`.
    created_at: u64,
    /// Copied from the entity's `updated_at`.
    updated_at: u64,
    /// Copied from the entity's `sequence_id`.
    sequence_id: u64,
    /// Version number of this entry for its key.
    version: i64,
    /// `true` when this version marks the key as deleted.
    tombstone: bool,
    /// Name of the operation that produced this version (e.g. `"put"`).
    op: String,
}
41
42impl super::keyed_spine::KeyedVersion for VaultEntry {
43 fn key(&self) -> &str {
44 &self.key
45 }
46
47 fn version(&self) -> i64 {
48 self.version
49 }
50}
51
52impl VaultEntry {
53 fn from_keyed_row(
54 version: super::keyed_spine::KeyedRowVersion,
55 metadata: Metadata,
56 created_at: u64,
57 updated_at: u64,
58 sequence_id: u64,
59 ) -> Self {
60 Self {
61 id: version.id,
62 key: version.key,
63 value: version.value,
64 metadata,
65 created_at,
66 updated_at,
67 sequence_id,
68 version: version.version,
69 tombstone: version.tombstone,
70 op: version.op,
71 }
72 }
73}
74
/// Borrowed handle over a [`RedDBRuntime`] that exposes the keyed
/// operations (set / get / delete / incr / cas / tag invalidation)
/// implemented in this file.
pub struct KvAtomicOps<'a> {
    /// Runtime every operation is executed against.
    runtime: &'a RedDBRuntime,
}
82
83impl<'a> KvAtomicOps<'a> {
84 pub fn new(runtime: &'a RedDBRuntime) -> Self {
85 Self { runtime }
86 }
87
88 pub fn set(
92 &self,
93 model: crate::catalog::CollectionModel,
94 collection: &str,
95 key: &str,
96 value: crate::storage::schema::Value,
97 ttl_ms: Option<u64>,
98 if_not_exists: bool,
99 ) -> RedDBResult<(bool, crate::storage::EntityId)> {
100 self.set_with_tags_for_model(model, collection, key, value, ttl_ms, &[], if_not_exists)
101 }
102
103 pub fn set_with_tags(
104 &self,
105 collection: &str,
106 key: &str,
107 value: crate::storage::schema::Value,
108 ttl_ms: Option<u64>,
109 tags: &[String],
110 if_not_exists: bool,
111 ) -> RedDBResult<(bool, crate::storage::EntityId)> {
112 self.set_with_tags_for_model(
113 crate::catalog::CollectionModel::Kv,
114 collection,
115 key,
116 value,
117 ttl_ms,
118 tags,
119 if_not_exists,
120 )
121 }
122
123 fn set_with_tags_for_model(
124 &self,
125 model: crate::catalog::CollectionModel,
126 collection: &str,
127 key: &str,
128 value: crate::storage::schema::Value,
129 ttl_ms: Option<u64>,
130 tags: &[String],
131 if_not_exists: bool,
132 ) -> RedDBResult<(bool, crate::storage::EntityId)> {
133 self.set_with_tags_and_metadata_for_model(
134 model,
135 collection,
136 key,
137 value,
138 ttl_ms,
139 tags,
140 if_not_exists,
141 Vec::new(),
142 )
143 }
144
145 pub fn set_with_tags_and_metadata(
146 &self,
147 collection: &str,
148 key: &str,
149 value: crate::storage::schema::Value,
150 ttl_ms: Option<u64>,
151 tags: &[String],
152 if_not_exists: bool,
153 metadata: Vec<(String, MetadataValue)>,
154 ) -> RedDBResult<(bool, crate::storage::EntityId)> {
155 self.set_with_tags_and_metadata_for_model(
156 crate::catalog::CollectionModel::Kv,
157 collection,
158 key,
159 value,
160 ttl_ms,
161 tags,
162 if_not_exists,
163 metadata,
164 )
165 }
166
167 fn set_with_tags_and_metadata_for_model(
168 &self,
169 model: crate::catalog::CollectionModel,
170 collection: &str,
171 key: &str,
172 value: crate::storage::schema::Value,
173 ttl_ms: Option<u64>,
174 tags: &[String],
175 if_not_exists: bool,
176 mut metadata: Vec<(String, MetadataValue)>,
177 ) -> RedDBResult<(bool, crate::storage::EntityId)> {
178 self.ensure_keyed_collection(model, collection)?;
179
180 if model == crate::catalog::CollectionModel::Vault {
181 let latest = self.get_vault_entry(collection, key)?;
182 let was_present = latest.as_ref().is_some_and(|entry| !entry.tombstone);
183 if if_not_exists && was_present {
184 return Ok((false, latest.expect("checked present").id));
185 }
186 let entry = self.append_vault_version(collection, key, value, "put", false, tags)?;
187 self.runtime.record_kv_watch_event(
188 if latest.is_some() {
189 crate::replication::cdc::ChangeOperation::Update
190 } else {
191 crate::replication::cdc::ChangeOperation::Insert
192 },
193 collection,
194 key,
195 entry.id.raw(),
196 latest.as_ref().map(vault_entry_metadata_json),
197 Some(vault_entry_metadata_json(&entry)),
198 );
199 return Ok((!was_present, entry.id));
200 }
201
202 let existing = self.get_entry(model, collection, key)?;
203 let was_present = existing.is_some();
204
205 if if_not_exists && was_present {
206 let (_, id) = existing.unwrap();
207 self.runtime.inner.kv_stats.incr_puts();
208 return Ok((false, id));
209 }
210
211 let before = existing
212 .as_ref()
213 .map(|(value, _)| crate::presentation::entity_json::storage_value_to_json(value));
214 let op = if was_present {
215 crate::replication::cdc::ChangeOperation::Update
216 } else {
217 crate::replication::cdc::ChangeOperation::Insert
218 };
219 let after = Some(crate::presentation::entity_json::storage_value_to_json(
220 &value,
221 ));
222
223 if was_present {
225 self.delete(model, collection, key)?;
226 }
227
228 if let Some(ttl_metadata) = ttl_metadata(ttl_ms) {
229 metadata.extend(ttl_metadata.fields);
230 }
231 if let Some(tags_metadata) = kv_tags_metadata(tags) {
232 metadata.push(tags_metadata);
233 }
234
235 let output = self
236 .runtime
237 .create_kv(crate::application::entity::CreateKvInput {
238 collection: collection.to_string(),
239 key: key.to_string(),
240 value,
241 metadata,
242 })?;
243 if model == crate::catalog::CollectionModel::Kv {
244 self.runtime
245 .inner
246 .kv_tag_index
247 .replace(collection, key, output.id, tags);
248 }
249
250 if model == crate::catalog::CollectionModel::Kv {
251 self.runtime
252 .record_kv_watch_event(op, collection, key, output.id.raw(), before, after);
253 }
254
255 if model == crate::catalog::CollectionModel::Kv {
256 self.runtime.inner.kv_stats.incr_puts();
257 }
258 Ok((!was_present, output.id))
259 }
260
261 pub fn get(
263 &self,
264 model: crate::catalog::CollectionModel,
265 collection: &str,
266 key: &str,
267 ) -> RedDBResult<Option<crate::storage::schema::Value>> {
268 let result = self.get_entry(model, collection, key)?;
269 if model == crate::catalog::CollectionModel::Kv {
270 self.runtime.inner.kv_stats.incr_gets();
271 }
272 Ok(result.map(|(v, _)| v))
273 }
274
275 pub fn delete(
277 &self,
278 model: crate::catalog::CollectionModel,
279 collection: &str,
280 key: &str,
281 ) -> RedDBResult<bool> {
282 self.ensure_declared_model(model, collection)?;
283 let found = self.get_entry(model, collection, key)?;
284 if let Some((value, id)) = found {
285 let store = self.runtime.inner.db.store();
286 let deleted = store
287 .delete(collection, id)
288 .map_err(|err| RedDBError::Internal(err.to_string()))?;
289 if deleted {
290 store.context_index().remove_entity(id);
291 if model == crate::catalog::CollectionModel::Kv {
292 self.runtime.inner.kv_tag_index.remove(collection, key);
293 self.runtime.record_kv_watch_event(
294 crate::replication::cdc::ChangeOperation::Delete,
295 collection,
296 key,
297 id.raw(),
298 Some(crate::presentation::entity_json::storage_value_to_json(
299 &value,
300 )),
301 None,
302 );
303 self.runtime.inner.kv_stats.incr_deletes();
304 }
305 }
306 Ok(deleted)
307 } else {
308 Ok(false)
309 }
310 }
311
312 pub fn incr(
317 &self,
318 model: crate::catalog::CollectionModel,
319 collection: &str,
320 key: &str,
321 by: i64,
322 ttl_ms: Option<u64>,
323 ) -> RedDBResult<i64> {
324 if model == crate::catalog::CollectionModel::Vault {
325 return Err(RedDBError::InvalidOperation(
326 "VAULT INCR is not supported for sealed secrets".to_string(),
327 ));
328 }
329 let rmw_lock = self.runtime.inner.rmw_locks.lock_for(collection, key);
330 let _rmw_guard = rmw_lock.lock();
331 self.ensure_kv_collection(collection)?;
332 let existing = self.runtime.get_kv(collection, key)?;
333 let current: i64 = match existing.as_ref() {
334 None => 0,
335 Some((crate::storage::schema::Value::Integer(n), _)) => *n,
336 Some((crate::storage::schema::Value::Float(f), _)) => *f as i64,
337 Some((other, _)) => {
338 return Err(RedDBError::Internal(format!(
339 "INCR on non-integer value: {:?}",
340 other
341 )));
342 }
343 };
344
345 let next = current
346 .checked_add(by)
347 .ok_or_else(|| RedDBError::Internal(format!("INCR overflow: {current} + {by}")))?;
348
349 if existing.is_some() {
351 self.runtime.delete_kv(collection, key)?;
352 }
353
354 let meta_vec: Vec<(String, crate::storage::unified::MetadataValue)> = ttl_metadata(ttl_ms)
355 .map(|m| m.fields.into_iter().collect())
356 .unwrap_or_default();
357
358 let output = self
359 .runtime
360 .create_kv(crate::application::entity::CreateKvInput {
361 collection: collection.to_string(),
362 key: key.to_string(),
363 value: crate::storage::schema::Value::Integer(next),
364 metadata: meta_vec,
365 })?;
366 self.runtime
367 .inner
368 .kv_tag_index
369 .replace(collection, key, output.id, &[]);
370
371 self.runtime.record_kv_watch_event(
372 if existing.is_some() {
373 crate::replication::cdc::ChangeOperation::Update
374 } else {
375 crate::replication::cdc::ChangeOperation::Insert
376 },
377 collection,
378 key,
379 output.id.raw(),
380 existing
381 .as_ref()
382 .map(|(value, _)| crate::presentation::entity_json::storage_value_to_json(value)),
383 Some(crate::presentation::entity_json::storage_value_to_json(
384 &crate::storage::schema::Value::Integer(next),
385 )),
386 );
387
388 self.runtime.inner.kv_stats.incr_incrs();
389 Ok(next)
390 }
391
392 pub fn cas(
400 &self,
401 model: crate::catalog::CollectionModel,
402 collection: &str,
403 key: &str,
404 expected: Option<&crate::storage::schema::Value>,
405 new_value: crate::storage::schema::Value,
406 ttl_ms: Option<u64>,
407 ) -> RedDBResult<(bool, Option<crate::storage::schema::Value>)> {
408 if model == crate::catalog::CollectionModel::Vault {
409 return Err(RedDBError::InvalidOperation(
410 "VAULT CAS is not supported for sealed secrets".to_string(),
411 ));
412 }
413 let rmw_lock = self.runtime.inner.rmw_locks.lock_for(collection, key);
414 let _rmw_guard = rmw_lock.lock();
415 self.ensure_kv_collection(collection)?;
416 let current = self.runtime.get_kv(collection, key)?.map(|(v, _)| v);
417
418 let matches = match (¤t, expected) {
419 (None, None) => true,
420 (Some(cur), Some(exp)) => cur == exp,
421 _ => false,
422 };
423
424 if !matches {
425 self.runtime.inner.kv_stats.incr_cas_conflict();
426 return Ok((false, current));
427 }
428
429 if current.is_some() {
431 self.runtime.delete_kv(collection, key)?;
432 }
433
434 let meta_vec: Vec<(String, crate::storage::unified::MetadataValue)> = ttl_metadata(ttl_ms)
435 .map(|m| m.fields.into_iter().collect())
436 .unwrap_or_default();
437
438 let output = self
439 .runtime
440 .create_kv(crate::application::entity::CreateKvInput {
441 collection: collection.to_string(),
442 key: key.to_string(),
443 value: new_value.clone(),
444 metadata: meta_vec,
445 })?;
446 self.runtime
447 .inner
448 .kv_tag_index
449 .replace(collection, key, output.id, &[]);
450
451 self.runtime.record_kv_watch_event(
452 if current.is_some() {
453 crate::replication::cdc::ChangeOperation::Update
454 } else {
455 crate::replication::cdc::ChangeOperation::Insert
456 },
457 collection,
458 key,
459 output.id.raw(),
460 current
461 .as_ref()
462 .map(crate::presentation::entity_json::storage_value_to_json),
463 Some(crate::presentation::entity_json::storage_value_to_json(
464 &new_value,
465 )),
466 );
467
468 self.runtime.inner.kv_stats.incr_cas_success();
469 Ok((true, current))
470 }
471
472 pub fn invalidate_tags(&self, collection: &str, tags: &[String]) -> RedDBResult<usize> {
473 self.runtime
474 .check_write(crate::runtime::write_gate::WriteKind::Dml)?;
475 self.runtime.check_kv_invalidate_policy(collection)?;
476 self.ensure_kv_collection(collection)?;
477 let entries = self
478 .runtime
479 .inner
480 .kv_tag_index
481 .entries_for_tags(collection, tags);
482 if entries.is_empty() {
483 return Ok(0);
484 }
485
486 let store = self.runtime.inner.db.store();
487 let mut removed = 0usize;
488 for (key, id) in entries {
489 let before = store
490 .get(collection, id)
491 .and_then(|entity| kv_value_from_entity(&entity));
492 let deleted = store
493 .delete(collection, id)
494 .map_err(|err| RedDBError::Internal(err.to_string()))?;
495 if deleted {
496 store.context_index().remove_entity(id);
497 self.runtime.inner.kv_tag_index.remove(collection, &key);
498 self.runtime.record_kv_watch_event(
499 crate::replication::cdc::ChangeOperation::Delete,
500 collection,
501 &key,
502 id.raw(),
503 before
504 .as_ref()
505 .map(crate::presentation::entity_json::storage_value_to_json),
506 None,
507 );
508 removed += 1;
509 }
510 }
511 if removed > 0 {
512 self.runtime.inner.kv_stats.incr_deletes();
513 }
514 Ok(removed)
515 }
516
517 pub fn tags_for_key(&self, collection: &str, key: &str) -> Vec<String> {
518 self.runtime
519 .inner
520 .kv_tag_index
521 .tags_for_key(collection, key)
522 }
523
524 fn ensure_kv_collection(&self, collection: &str) -> RedDBResult<()> {
526 self.ensure_keyed_collection(crate::catalog::CollectionModel::Kv, collection)
527 }
528
529 fn ensure_keyed_collection(
530 &self,
531 model: crate::catalog::CollectionModel,
532 collection: &str,
533 ) -> RedDBResult<()> {
534 let store = self.runtime.inner.db.store();
535 if store.get_collection(collection).is_some() {
536 return self.ensure_declared_model(model, collection);
537 }
538 if model != crate::catalog::CollectionModel::Kv {
539 return Err(RedDBError::NotFound(format!(
540 "{} collection '{collection}' does not exist",
541 keyed_model_name(model)
542 )));
543 }
544 let auto_create = self
546 .runtime
547 .config_bool("red.config.kv.default_collection", true);
548 if !auto_create {
549 return Err(RedDBError::NotFound(format!(
550 "kv collection '{collection}' does not exist and auto-create is disabled \
551 (red.config.kv.default_collection = false)"
552 )));
553 }
554 store
555 .create_collection(collection)
556 .map_err(|err| RedDBError::Internal(err.to_string()))?;
557 self.runtime
558 .inner
559 .db
560 .save_collection_contract(kv_collection_contract(collection))
561 .map_err(|err| RedDBError::Internal(err.to_string()))?;
562 Ok(())
563 }
564
565 fn get_entry(
566 &self,
567 model: crate::catalog::CollectionModel,
568 collection: &str,
569 key: &str,
570 ) -> RedDBResult<Option<(crate::storage::schema::Value, crate::storage::EntityId)>> {
571 let Some(entity) = self.get_entity(model, collection, key)? else {
572 return Ok(None);
573 };
574 Ok(kv_value_from_entity(&entity).map(|value| (value, entity.id)))
575 }
576
577 fn get_entity(
578 &self,
579 model: crate::catalog::CollectionModel,
580 collection: &str,
581 key: &str,
582 ) -> RedDBResult<Option<crate::storage::UnifiedEntity>> {
583 self.ensure_declared_model(model, collection)?;
584 let store = self.runtime.inner.db.store();
585 let Some(manager) = store.get_collection(collection) else {
586 return Ok(None);
587 };
588 let entities = manager.query_all(|_| true);
589 for entity in entities {
590 if let crate::storage::EntityData::Row(ref row) = entity.data {
591 if let Some(ref named) = row.named {
592 if let Some(crate::storage::schema::Value::Text(ref k)) = named.get("key") {
593 if &**k == key {
594 return Ok(Some(entity));
595 }
596 }
597 }
598 }
599 }
600 Ok(None)
601 }
602
603 fn get_vault_entry(&self, collection: &str, key: &str) -> RedDBResult<Option<VaultEntry>> {
604 self.vault_versions(collection, key)
605 .map(super::keyed_spine::latest_version)
606 }
607
608 fn get_vault_entry_version(
609 &self,
610 collection: &str,
611 key: &str,
612 version: i64,
613 ) -> RedDBResult<Option<VaultEntry>> {
614 Ok(self
615 .vault_versions(collection, key)?
616 .into_iter()
617 .find(|entry| entry.version == version))
618 }
619
620 fn vault_versions(&self, collection: &str, key: &str) -> RedDBResult<Vec<VaultEntry>> {
621 self.ensure_declared_model(crate::catalog::CollectionModel::Vault, collection)?;
622 let store = self.runtime.inner.db.store();
623 let Some(manager) = store.get_collection(collection) else {
624 return Ok(Vec::new());
625 };
626 let entities = manager.query_all(|_| true);
627 let mut versions = Vec::new();
628 for entity in entities {
629 let crate::storage::EntityData::Row(ref row) = entity.data else {
630 continue;
631 };
632 let Some(version) =
633 super::keyed_spine::row_version(entity.id, row, entity.sequence_id as i64)
634 else {
635 continue;
636 };
637 if version.key != key {
638 continue;
639 }
640 let metadata = manager.get_metadata(entity.id).unwrap_or_default();
641 versions.push(VaultEntry::from_keyed_row(
642 version,
643 metadata,
644 entity.created_at,
645 entity.updated_at,
646 entity.sequence_id,
647 ));
648 }
649 Ok(versions)
650 }
651
652 fn latest_vault_entries(
653 &self,
654 collection: &str,
655 prefix: Option<&str>,
656 ) -> RedDBResult<Vec<VaultEntry>> {
657 self.ensure_declared_model(crate::catalog::CollectionModel::Vault, collection)?;
658 let store = self.runtime.inner.db.store();
659 let Some(manager) = store.get_collection(collection) else {
660 return Ok(Vec::new());
661 };
662 let mut versions = Vec::new();
663 for entity in manager.query_all(|_| true) {
664 let crate::storage::EntityData::Row(ref row) = entity.data else {
665 continue;
666 };
667 let Some(version) =
668 super::keyed_spine::row_version(entity.id, row, entity.sequence_id as i64)
669 else {
670 continue;
671 };
672 let metadata = manager.get_metadata(entity.id).unwrap_or_default();
673 let entry = VaultEntry::from_keyed_row(
674 version,
675 metadata,
676 entity.created_at,
677 entity.updated_at,
678 entity.sequence_id,
679 );
680 versions.push(entry);
681 }
682 Ok(super::keyed_spine::latest_versions(versions, prefix))
683 }
684
685 fn append_vault_version(
686 &self,
687 collection: &str,
688 key: &str,
689 value: crate::storage::schema::Value,
690 op: &str,
691 tombstone: bool,
692 tags: &[String],
693 ) -> RedDBResult<VaultEntry> {
694 self.ensure_declared_model(crate::catalog::CollectionModel::Vault, collection)?;
695 let version = self
696 .get_vault_entry(collection, key)?
697 .map(|entry| entry.version)
698 .unwrap_or(0)
699 + 1;
700 let stored_value = if tombstone {
701 crate::storage::schema::Value::Null
702 } else {
703 self.runtime.seal_vault_value(collection, value)?
704 };
705 let now = current_unix_ms() as i64;
706 let fields = vec![
707 (
708 "key".to_string(),
709 crate::storage::schema::Value::text(key.to_string()),
710 ),
711 ("value".to_string(), stored_value),
712 (
713 "version".to_string(),
714 crate::storage::schema::Value::Integer(version),
715 ),
716 (
717 "tombstone".to_string(),
718 crate::storage::schema::Value::Boolean(tombstone),
719 ),
720 (
721 "op".to_string(),
722 crate::storage::schema::Value::text(op.to_string()),
723 ),
724 (
725 "created_at_ms".to_string(),
726 crate::storage::schema::Value::Integer(now),
727 ),
728 ];
729 let mut row = crate::storage::RowData::new(Vec::new());
730 row.named = Some(fields.into_iter().collect());
731 let entity = crate::storage::UnifiedEntity::new(
732 crate::storage::EntityId::new(0),
733 crate::storage::EntityKind::TableRow {
734 table: std::sync::Arc::from(collection),
735 row_id: 0,
736 },
737 crate::storage::EntityData::Row(row),
738 );
739 let id = self
740 .runtime
741 .inner
742 .db
743 .store()
744 .insert(collection, entity)
745 .map_err(|err| RedDBError::Internal(err.to_string()))?;
746 if !tags.is_empty() {
747 self.runtime
748 .inner
749 .db
750 .store()
751 .set_metadata(
752 collection,
753 id,
754 Metadata::with_fields(vault_tags_metadata(tags)),
755 )
756 .map_err(|err| RedDBError::Internal(err.to_string()))?;
757 self.runtime
758 .inner
759 .kv_tag_index
760 .replace(collection, key, id, tags);
761 }
762 self.get_vault_entry_version(collection, key, version)?
763 .ok_or_else(|| RedDBError::Internal(format!("vault version {id} was not readable")))
764 }
765
766 fn purge_vault_versions(&self, collection: &str, key: &str) -> RedDBResult<usize> {
767 self.ensure_declared_model(crate::catalog::CollectionModel::Vault, collection)?;
768 let versions = self.vault_versions(collection, key)?;
769 let store = self.runtime.inner.db.store();
770 let mut purged = 0usize;
771 for entry in versions {
772 if store
773 .delete(collection, entry.id)
774 .map_err(|err| RedDBError::Internal(err.to_string()))?
775 {
776 store.context_index().remove_entity(entry.id);
777 purged += 1;
778 }
779 }
780 Ok(purged)
781 }
782
783 fn ensure_declared_model(
784 &self,
785 model: crate::catalog::CollectionModel,
786 collection: &str,
787 ) -> RedDBResult<()> {
788 let Some(contract) = self.runtime.inner.db.collection_contract(collection) else {
789 return Ok(());
790 };
791 if contract.declared_model == model
792 || contract.declared_model == crate::catalog::CollectionModel::Mixed
793 {
794 return Ok(());
795 }
796 Err(RedDBError::InvalidOperation(format!(
797 "collection '{}' is declared as '{}' and does not allow '{}' operations",
798 collection,
799 keyed_model_name(contract.declared_model),
800 keyed_model_name(model)
801 )))
802 }
803}
804
805impl RedDBRuntime {
806 pub(crate) fn seal_vault_value(
807 &self,
808 collection: &str,
809 value: crate::storage::schema::Value,
810 ) -> RedDBResult<crate::storage::schema::Value> {
811 let key = self.vault_encryption_key(collection)?;
812 let plaintext = value.to_bytes();
813 let nonce_bytes = crate::auth::store::random_bytes(12);
814 let mut nonce = [0u8; 12];
815 nonce.copy_from_slice(&nonce_bytes[..12]);
816 let aad = format!("reddb.vault.{collection}");
817 let ciphertext =
818 crate::crypto::aes_gcm::aes256_gcm_encrypt(&key, &nonce, aad.as_bytes(), &plaintext);
819 let mut payload = Vec::with_capacity(12 + ciphertext.len());
820 payload.extend_from_slice(&nonce);
821 payload.extend_from_slice(&ciphertext);
822 Ok(crate::storage::schema::Value::Secret(payload))
823 }
824
825 fn vault_key_available(&self, collection: &str) -> bool {
826 self.vault_encryption_key(collection).is_ok()
827 }
828
829 fn vault_encryption_key(&self, collection: &str) -> RedDBResult<[u8; 32]> {
830 let auth_store = self.inner.auth_store.read().clone().ok_or_else(|| {
831 RedDBError::Query("vault sealed_unavailable: no key provider is configured".to_string())
832 })?;
833 if !auth_store.is_vault_backed() {
834 return Err(RedDBError::Query(
835 "vault sealed_unavailable: key provider is sealed".to_string(),
836 ));
837 }
838
839 if let Some(hex_key) = auth_store.vault_kv_get(&vault_master_key_ref(collection)) {
840 return decode_vault_key(&hex_key);
841 }
842 auth_store.vault_secret_key().ok_or_else(|| {
843 RedDBError::Query("vault sealed_unavailable: cluster vault key is missing".to_string())
844 })
845 }
846
847 fn unseal_vault_value(
848 &self,
849 collection: &str,
850 sealed: &crate::storage::schema::Value,
851 ) -> RedDBResult<crate::storage::schema::Value> {
852 let crate::storage::schema::Value::Secret(payload) = sealed else {
853 return Err(RedDBError::Query(
854 "vault unseal failed: stored value is not sealed".to_string(),
855 ));
856 };
857 if payload.len() < 12 {
858 return Err(RedDBError::Query(
859 "vault unseal failed: sealed payload is truncated".to_string(),
860 ));
861 }
862 let key = self.vault_encryption_key(collection)?;
863 let mut nonce = [0u8; 12];
864 nonce.copy_from_slice(&payload[..12]);
865 let aad = format!("reddb.vault.{collection}");
866 let plaintext = crate::crypto::aes_gcm::aes256_gcm_decrypt(
867 &key,
868 &nonce,
869 aad.as_bytes(),
870 &payload[12..],
871 )
872 .map_err(|_| RedDBError::Query("vault unseal failed: decryption failed".to_string()))?;
873 let (value, consumed) =
874 crate::storage::schema::Value::from_bytes(&plaintext).map_err(|err| {
875 RedDBError::Query(format!("vault unseal failed: bad plaintext value: {err}"))
876 })?;
877 if consumed != plaintext.len() {
878 return Err(RedDBError::Query(
879 "vault unseal failed: trailing plaintext bytes".to_string(),
880 ));
881 }
882 Ok(value)
883 }
884
885 fn vault_target_resource(collection: &str, key: &str) -> String {
886 if collection == "red.vault" {
887 return format!("red.vault/{}", key.to_ascii_lowercase());
888 }
889 format!("{collection}.{key}")
890 }
891
892 fn current_vault_actor() -> String {
893 current_auth_identity()
894 .map(|(principal, _)| principal)
895 .unwrap_or_else(|| "anonymous".to_string())
896 }
897
898 fn vault_request_id() -> String {
899 let conn_id = current_connection_id();
900 if conn_id == 0 {
901 "embedded".to_string()
902 } else {
903 format!("conn-{conn_id}")
904 }
905 }
906
907 fn check_vault_capability(
908 &self,
909 action: &str,
910 collection: &str,
911 key: &str,
912 ) -> Result<(), String> {
913 let Some(auth_store) = self.inner.auth_store.read().clone() else {
914 return Ok(());
915 };
916 if !auth_store.iam_authorization_enabled() {
917 return Ok(());
918 }
919 let Some((principal, role)) = current_auth_identity() else {
920 return Err(
921 "IAM authorization is enabled; vault capability check requires an authenticated principal"
922 .to_string(),
923 );
924 };
925 let tenant = current_tenant();
926 let principal_id = crate::auth::UserId::from_parts(tenant.as_deref(), &principal);
927 let mut resource = crate::auth::policies::ResourceRef::new(
928 "vault",
929 Self::vault_target_resource(collection, key),
930 );
931 if let Some(ref tenant) = tenant {
932 resource = resource.with_tenant(tenant.clone());
933 }
934 let ctx = crate::auth::policies::EvalContext {
935 principal_tenant: tenant.clone(),
936 current_tenant: tenant,
937 peer_ip: None,
938 mfa_present: false,
939 now_ms: crate::utils::now_unix_millis() as u128,
940 principal_is_admin_role: role == crate::auth::Role::Admin,
941 principal_is_system_owned: auth_store.principal_is_system_owned(&principal_id),
942 principal_is_platform_scoped: principal_id.tenant.is_none(),
943 };
944 if auth_store.check_policy_authz_with_role(&principal_id, action, &resource, &ctx, role) {
945 Ok(())
946 } else {
947 Err(format!(
948 "principal=`{}` action=`{}` resource=`vault:{}` denied by IAM policy",
949 principal,
950 action,
951 Self::vault_target_resource(collection, key)
952 ))
953 }
954 }
955
956 fn check_system_vault_capability(
957 &self,
958 action: &str,
959 collection: &str,
960 key: &str,
961 ) -> Result<(), String> {
962 if collection != "red.vault" {
963 return Ok(());
964 }
965 self.check_vault_capability(action, collection, key)
966 }
967
968 fn audit_vault_unseal(
969 &self,
970 collection: &str,
971 key: &str,
972 outcome: crate::runtime::audit_log::Outcome,
973 reason: &str,
974 entry: Option<&VaultEntry>,
975 ) {
976 let actor = Self::current_vault_actor();
977 let request_id = Self::vault_request_id();
978 let mut builder = crate::runtime::audit_log::AuditEvent::builder("vault/unseal")
979 .principal(actor.clone())
980 .source(crate::runtime::audit_log::AuditAuthSource::Password)
981 .resource(format!(
982 "vault:{}",
983 Self::vault_target_resource(collection, key)
984 ))
985 .outcome(outcome)
986 .correlation_id(request_id.clone())
987 .fields([
988 crate::runtime::audit_log::AuditFieldEscaper::field("actor", actor),
989 crate::runtime::audit_log::AuditFieldEscaper::field("collection", collection),
990 crate::runtime::audit_log::AuditFieldEscaper::field("key", key),
991 crate::runtime::audit_log::AuditFieldEscaper::field(
992 "target",
993 Self::vault_target_resource(collection, key),
994 ),
995 crate::runtime::audit_log::AuditFieldEscaper::field("reason", reason),
996 crate::runtime::audit_log::AuditFieldEscaper::field("request_id", request_id),
997 crate::runtime::audit_log::AuditFieldEscaper::field(
998 "connection_id",
999 current_connection_id(),
1000 ),
1001 ]);
1002 if let Some(tenant) = current_tenant() {
1003 builder = builder.tenant(tenant);
1004 }
1005 if let Some(entry) = entry {
1006 builder = builder.fields([
1007 crate::runtime::audit_log::AuditFieldEscaper::field("entity_id", entry.id.raw()),
1008 crate::runtime::audit_log::AuditFieldEscaper::field(
1009 "sequence_id",
1010 entry.sequence_id,
1011 ),
1012 ]);
1013 }
1014 self.audit_log().record_event(builder.build());
1015 }
1016
1017 fn audit_vault_lifecycle(
1018 &self,
1019 operation: &str,
1020 collection: &str,
1021 key: &str,
1022 outcome: crate::runtime::audit_log::Outcome,
1023 reason: &str,
1024 entry: Option<&VaultEntry>,
1025 ) {
1026 let actor = Self::current_vault_actor();
1027 let request_id = Self::vault_request_id();
1028 let mut builder =
1029 crate::runtime::audit_log::AuditEvent::builder(format!("vault/{operation}"))
1030 .principal(actor.clone())
1031 .source(crate::runtime::audit_log::AuditAuthSource::Password)
1032 .resource(format!(
1033 "vault:{}",
1034 Self::vault_target_resource(collection, key)
1035 ))
1036 .outcome(outcome)
1037 .correlation_id(request_id.clone())
1038 .fields([
1039 crate::runtime::audit_log::AuditFieldEscaper::field("actor", actor),
1040 crate::runtime::audit_log::AuditFieldEscaper::field("collection", collection),
1041 crate::runtime::audit_log::AuditFieldEscaper::field("key", key),
1042 crate::runtime::audit_log::AuditFieldEscaper::field(
1043 "target",
1044 Self::vault_target_resource(collection, key),
1045 ),
1046 crate::runtime::audit_log::AuditFieldEscaper::field("reason", reason),
1047 crate::runtime::audit_log::AuditFieldEscaper::field("request_id", request_id),
1048 crate::runtime::audit_log::AuditFieldEscaper::field(
1049 "connection_id",
1050 current_connection_id(),
1051 ),
1052 ]);
1053 if let Some(tenant) = current_tenant() {
1054 builder = builder.tenant(tenant);
1055 }
1056 if let Some(entry) = entry {
1057 builder = builder.fields([
1058 crate::runtime::audit_log::AuditFieldEscaper::field("entity_id", entry.id.raw()),
1059 crate::runtime::audit_log::AuditFieldEscaper::field("version", entry.version),
1060 crate::runtime::audit_log::AuditFieldEscaper::field(
1061 "sequence_id",
1062 entry.sequence_id,
1063 ),
1064 ]);
1065 }
1066 self.audit_log().record_event(builder.build());
1067 }
1068
1069 fn emit_vault_control_event(
1070 &self,
1071 kind: crate::runtime::control_events::EventKind,
1072 outcome: crate::runtime::control_events::Outcome,
1073 action: &'static str,
1074 collection: &str,
1075 key: &str,
1076 reason: &str,
1077 entry: Option<&VaultEntry>,
1078 extra_fields: Vec<(String, crate::runtime::control_events::Sensitivity)>,
1079 ) -> RedDBResult<()> {
1080 use crate::runtime::control_events::{
1081 ActorRef, ControlEvent, ControlEventCtx, ControlEventLedger, Sensitivity,
1082 };
1083 use std::borrow::Cow;
1084
1085 let tenant = current_tenant();
1086 let principal = current_auth_identity().map(|(principal, _)| principal);
1087 let actor_user = principal
1088 .as_ref()
1089 .map(|principal| crate::auth::UserId::from_parts(tenant.as_deref(), principal));
1090 let request_id = Self::vault_request_id();
1091 let actor = actor_user
1092 .as_ref()
1093 .map(ActorRef::User)
1094 .unwrap_or(ActorRef::Anonymous);
1095 let ctx = ControlEventCtx {
1096 actor,
1097 scope: tenant.as_ref().map(|scope| Cow::Borrowed(scope.as_str())),
1098 request_id: Some(Cow::Borrowed(request_id.as_str())),
1099 trace_id: None,
1100 };
1101
1102 let target = Self::vault_target_resource(collection, key);
1103 let mut fields = std::collections::HashMap::new();
1104 fields.insert("path".to_string(), Sensitivity::raw(target.clone()));
1105 fields.insert("collection".to_string(), Sensitivity::raw(collection));
1106 fields.insert("key".to_string(), Sensitivity::raw(key));
1107 fields.insert(
1108 "connection_id".to_string(),
1109 Sensitivity::raw(current_connection_id().to_string()),
1110 );
1111 if let Some(entry) = entry {
1112 fields.insert(
1113 "entity_id".to_string(),
1114 Sensitivity::raw(entry.id.raw().to_string()),
1115 );
1116 fields.insert(
1117 "sequence_id".to_string(),
1118 Sensitivity::raw(entry.sequence_id.to_string()),
1119 );
1120 fields.insert(
1121 "version".to_string(),
1122 Sensitivity::raw(entry.version.to_string()),
1123 );
1124 fields.insert("op".to_string(), Sensitivity::raw(entry.op.clone()));
1125 fields.insert(
1126 "tombstone".to_string(),
1127 Sensitivity::raw(entry.tombstone.to_string()),
1128 );
1129 if !entry.tombstone {
1130 fields.insert(
1131 "fingerprint".to_string(),
1132 Sensitivity::raw(vault_fingerprint(&entry.value)),
1133 );
1134 }
1135 fields.insert(
1136 "tags".to_string(),
1137 Sensitivity::raw(format!("{:?}", vault_tags_value(&entry.metadata))),
1138 );
1139 }
1140 for (key, value) in extra_fields {
1141 fields.insert(key, value);
1142 }
1143
1144 let event = ControlEvent {
1145 kind,
1146 outcome,
1147 action: Cow::Borrowed(action),
1148 resource: Some(format!("vault:{target}")),
1149 reason: Some(reason.to_string()),
1150 matched_policy_id: None,
1151 fields,
1152 };
1153 let ledger = self.inner.control_event_ledger.read();
1154 match ledger.emit(&ctx, event) {
1155 Ok(_) => Ok(()),
1156 Err(err) if self.inner.control_event_config.require_persistence() => {
1157 Err(RedDBError::Internal(err.to_string()))
1158 }
1159 Err(_) => Ok(()),
1160 }
1161 }
1162
1163 pub(crate) fn resolve_vault_secret_value(
1164 &self,
1165 collection: &str,
1166 key: &str,
1167 ) -> RedDBResult<Value> {
1168 let ops = KvAtomicOps::new(self);
1169 let entry = ops.get_vault_entry(collection, key)?;
1170 if let Err(reason) = self.check_vault_capability("vault:read", collection, key) {
1171 self.audit_vault_unseal(
1172 collection,
1173 key,
1174 crate::runtime::audit_log::Outcome::Denied,
1175 &reason,
1176 entry.as_ref(),
1177 );
1178 self.emit_vault_control_event(
1179 crate::runtime::control_events::EventKind::VaultRead,
1180 crate::runtime::control_events::Outcome::Denied,
1181 "vault:read",
1182 collection,
1183 key,
1184 &reason,
1185 entry.as_ref(),
1186 Vec::new(),
1187 )?;
1188 return Err(RedDBError::Query(reason));
1189 }
1190 let Some(entry) = entry else {
1191 let reason = "not_found";
1192 self.audit_vault_unseal(
1193 collection,
1194 key,
1195 crate::runtime::audit_log::Outcome::Denied,
1196 reason,
1197 None,
1198 );
1199 self.emit_vault_control_event(
1200 crate::runtime::control_events::EventKind::VaultRead,
1201 crate::runtime::control_events::Outcome::Denied,
1202 "vault:read",
1203 collection,
1204 key,
1205 reason,
1206 None,
1207 Vec::new(),
1208 )?;
1209 return Err(RedDBError::NotFound(format!(
1210 "vault secret '{}.{}' not found",
1211 collection, key
1212 )));
1213 };
1214 if entry.tombstone {
1215 let reason = "deleted";
1216 self.audit_vault_unseal(
1217 collection,
1218 key,
1219 crate::runtime::audit_log::Outcome::Denied,
1220 reason,
1221 Some(&entry),
1222 );
1223 self.emit_vault_control_event(
1224 crate::runtime::control_events::EventKind::VaultRead,
1225 crate::runtime::control_events::Outcome::Denied,
1226 "vault:read",
1227 collection,
1228 key,
1229 reason,
1230 Some(&entry),
1231 Vec::new(),
1232 )?;
1233 return Err(RedDBError::NotFound(format!(
1234 "vault secret '{}.{}' is deleted",
1235 collection, key
1236 )));
1237 }
1238 match self.unseal_vault_value(collection, &entry.value) {
1239 Ok(value) => {
1240 self.audit_vault_unseal(
1241 collection,
1242 key,
1243 crate::runtime::audit_log::Outcome::Success,
1244 "ok",
1245 Some(&entry),
1246 );
1247 self.emit_vault_control_event(
1248 crate::runtime::control_events::EventKind::VaultRead,
1249 crate::runtime::control_events::Outcome::Allowed,
1250 "vault:read",
1251 collection,
1252 key,
1253 "ok",
1254 Some(&entry),
1255 Vec::new(),
1256 )?;
1257 Ok(value)
1258 }
1259 Err(err) => {
1260 let reason = err.to_string();
1261 self.audit_vault_unseal(
1262 collection,
1263 key,
1264 crate::runtime::audit_log::Outcome::Error,
1265 &reason,
1266 Some(&entry),
1267 );
1268 self.emit_vault_control_event(
1269 crate::runtime::control_events::EventKind::VaultRead,
1270 crate::runtime::control_events::Outcome::Error,
1271 "vault:read",
1272 collection,
1273 key,
1274 &reason,
1275 Some(&entry),
1276 Vec::new(),
1277 )?;
1278 Err(err)
1279 }
1280 }
1281 }
1282
/// Executes one parsed keyed-collection command (`KvCommand`) and returns a
/// `RuntimeQueryResult` describing the outcome.
///
/// `raw_query` is echoed into the result's `query` field. Each match arm
/// handles one command variant: it performs the capability / write-gate
/// checks visible in that arm, delegates the storage work to
/// `KvAtomicOps`, and builds a single-row (or, for `List`/`History`,
/// multi-row) `UnifiedResult`.
///
/// Vault-specific arms additionally record audit entries and emit control
/// events; a control-event emission error is propagated with `?`.
///
/// # Errors
///
/// Propagates errors from the capability checks (mapped to
/// `RedDBError::Query`), from `check_write`, from the `KvAtomicOps`
/// operations, and from control-event emission. `List` on a non-vault
/// model returns `RedDBError::InvalidOperation`. `Unseal` returns
/// `RedDBError::NotFound` for a missing or tombstoned secret.
pub fn execute_kv_command(
    &self,
    raw_query: &str,
    cmd: &crate::storage::query::ast::KvCommand,
) -> RedDBResult<RuntimeQueryResult> {
    use crate::storage::query::ast::KvCommand;

    let ops = KvAtomicOps::new(self);

    match cmd {
        // PUT: store a value (optionally with TTL and tags) under a key.
        KvCommand::Put {
            model,
            collection,
            key,
            value,
            ttl_ms,
            tags,
            if_not_exists,
        } => {
            // Vault writes need the `vault:write` capability in addition
            // to the generic write gate.
            if *model == crate::catalog::CollectionModel::Vault {
                self.check_system_vault_capability("vault:write", collection, key)
                    .map_err(RedDBError::Query)?;
            }
            self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
            let (created, id) = ops.set_with_tags_for_model(
                *model,
                collection,
                key,
                value.clone(),
                *ttl_ms,
                tags,
                *if_not_exists,
            )?;

            let mut result = UnifiedResult::with_columns(vec![
                "ok".into(),
                "collection".into(),
                "key".into(),
                "id".into(),
                "created".into(),
                "tags".into(),
            ]);
            let mut record = UnifiedRecord::new();
            record.set("ok", Value::Boolean(true));
            record.set("collection", Value::text(collection.clone()));
            record.set("key", Value::text(key.clone()));
            record.set("id", Value::Integer(id.raw() as i64));
            record.set("created", Value::Boolean(created));
            record.set("tags", kv_tags_value(tags));
            result.push(record);

            Ok(RuntimeQueryResult {
                query: raw_query.to_string(),
                mode: crate::storage::query::modes::QueryMode::Sql,
                // Statement and engine labels distinguish vault from
                // every other model.
                statement: if *model == crate::catalog::CollectionModel::Vault {
                    "vault_put"
                } else {
                    "kv_put"
                },
                engine: if *model == crate::catalog::CollectionModel::Vault {
                    "vault"
                } else {
                    "kv"
                },
                result,
                affected_rows: 1,
                // `created` tells whether the key was newly inserted.
                statement_type: if created { "insert" } else { "update" },
            })
        }
        // INVALIDATE TAGS: drop every entry in the collection carrying
        // any of the given tags.
        // NOTE(review): unlike the other mutating arms, no `check_write`
        // call is visible here; presumably `invalidate_tags` enforces its
        // own gating — confirm.
        KvCommand::InvalidateTags { collection, tags } => {
            let invalidated = ops.invalidate_tags(collection, tags)?;

            let mut result = UnifiedResult::with_columns(vec![
                "ok".into(),
                "collection".into(),
                "invalidated".into(),
                "tags".into(),
            ]);
            let mut record = UnifiedRecord::new();
            record.set("ok", Value::Boolean(true));
            record.set("collection", Value::text(collection.clone()));
            record.set("invalidated", Value::Integer(invalidated as i64));
            record.set("tags", kv_tags_value(tags));
            result.push(record);

            Ok(RuntimeQueryResult {
                query: raw_query.to_string(),
                mode: crate::storage::query::modes::QueryMode::Sql,
                statement: "kv_invalidate_tags",
                engine: "kv",
                result,
                affected_rows: invalidated as u64,
                statement_type: "delete",
            })
        }

        // ROTATE: append a new, non-tombstone vault version with op
        // "rotate", then publish a watch event, an audit record and a
        // control event for it.
        KvCommand::Rotate {
            collection,
            key,
            value,
            tags,
        } => {
            self.check_system_vault_capability("vault:write", collection, key)
                .map_err(RedDBError::Query)?;
            self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
            let entry = ops.append_vault_version(
                collection,
                key,
                value.clone(),
                "rotate",
                false,
                tags,
            )?;
            // The watch event carries entry metadata only (built by
            // `vault_entry_metadata_json`).
            self.record_kv_watch_event(
                crate::replication::cdc::ChangeOperation::Update,
                collection,
                key,
                entry.id.raw(),
                None,
                Some(vault_entry_metadata_json(&entry)),
            );
            self.audit_vault_lifecycle(
                "rotate",
                collection,
                key,
                crate::runtime::audit_log::Outcome::Success,
                "ok",
                Some(&entry),
            );
            self.emit_vault_control_event(
                crate::runtime::control_events::EventKind::VaultRotate,
                crate::runtime::control_events::Outcome::Allowed,
                "vault:rotate",
                collection,
                key,
                "ok",
                Some(&entry),
                Vec::new(),
            )?;
            Ok(vault_write_result(
                raw_query,
                "vault_rotate",
                "update",
                collection,
                key,
                &entry,
                1,
            ))
        }

        // LIST: only implemented for vault collections; returns metadata
        // rows for the latest version of each visible key.
        KvCommand::List {
            model,
            collection,
            prefix,
            limit,
            offset,
        } => {
            if *model != crate::catalog::CollectionModel::Vault {
                return Err(RedDBError::InvalidOperation(
                    "LIST is not supported through normal KV command execution".to_string(),
                ));
            }
            let mut entries = ops.latest_vault_entries(collection, prefix.as_deref())?;
            // Sort by key so the offset/limit window below is stable.
            entries.sort_by(|left, right| left.key.cmp(&right.key));
            let mut result = UnifiedResult::with_columns(vec![
                "collection".into(),
                "key".into(),
                "version".into(),
                "fingerprint".into(),
                "tags".into(),
                "created_at".into(),
                "updated_at".into(),
                "status".into(),
                "tombstone".into(),
                "op".into(),
            ]);
            // Per-key authorization: entries the caller may not read are
            // silently omitted from the listing. A control event is
            // emitted for every entry (allowed or denied), before
            // pagination is applied.
            let mut visible = Vec::new();
            for entry in entries {
                match self.check_vault_capability("vault:read_metadata", collection, &entry.key)
                {
                    Ok(()) => {
                        self.emit_vault_control_event(
                            crate::runtime::control_events::EventKind::VaultMetadataRead,
                            crate::runtime::control_events::Outcome::Allowed,
                            "vault:read_metadata",
                            collection,
                            &entry.key,
                            "ok",
                            Some(&entry),
                            Vec::new(),
                        )?;
                        visible.push(entry);
                    }
                    Err(reason) => {
                        self.emit_vault_control_event(
                            crate::runtime::control_events::EventKind::VaultMetadataRead,
                            crate::runtime::control_events::Outcome::Denied,
                            "vault:read_metadata",
                            collection,
                            &entry.key,
                            &reason,
                            Some(&entry),
                            Vec::new(),
                        )?;
                    }
                }
            }
            // Pagination counts only the entries that passed the
            // capability check; a missing limit means "no limit".
            for entry in visible
                .into_iter()
                .skip(*offset)
                .take(limit.unwrap_or(usize::MAX))
            {
                push_vault_metadata_record(&mut result, collection, &entry.key, &entry);
            }
            Ok(RuntimeQueryResult {
                query: raw_query.to_string(),
                mode: crate::storage::query::modes::QueryMode::Sql,
                statement: "vault_list",
                engine: "vault",
                result,
                affected_rows: 0,
                statement_type: "select",
            })
        }

        // HISTORY: metadata rows for the versions of one vault key.
        KvCommand::History { collection, key } => {
            // The latest entry is fetched first so both the denied and the
            // allowed control events can reference it.
            let latest = ops.get_vault_entry(collection, key)?;
            if let Err(reason) =
                self.check_vault_capability("vault:read_metadata", collection, key)
            {
                self.emit_vault_control_event(
                    crate::runtime::control_events::EventKind::VaultMetadataRead,
                    crate::runtime::control_events::Outcome::Denied,
                    "vault:read_metadata",
                    collection,
                    key,
                    &reason,
                    latest.as_ref(),
                    Vec::new(),
                )?;
                return Err(RedDBError::Query(reason));
            }
            let versions =
                super::keyed_spine::history_versions(ops.vault_versions(collection, key)?);
            let result = vault_history_result(collection, key, &versions);
            self.emit_vault_control_event(
                crate::runtime::control_events::EventKind::VaultMetadataRead,
                crate::runtime::control_events::Outcome::Allowed,
                "vault:read_metadata",
                collection,
                key,
                "ok",
                latest.as_ref(),
                Vec::new(),
            )?;
            Ok(RuntimeQueryResult {
                query: raw_query.to_string(),
                mode: crate::storage::query::modes::QueryMode::Sql,
                statement: "vault_history",
                engine: "vault",
                result,
                affected_rows: 0,
                statement_type: "select",
            })
        }

        // PURGE: remove the stored versions of a vault key and report how
        // many were purged.
        KvCommand::Purge { collection, key } => {
            // Fetched before the capability check so the denial can be
            // audited with the entry attached.
            let entry = ops.get_vault_entry(collection, key)?;
            if let Err(reason) = self.check_vault_capability("vault:purge", collection, key) {
                self.audit_vault_lifecycle(
                    "purge",
                    collection,
                    key,
                    crate::runtime::audit_log::Outcome::Denied,
                    &reason,
                    entry.as_ref(),
                );
                self.emit_vault_control_event(
                    crate::runtime::control_events::EventKind::VaultPurge,
                    crate::runtime::control_events::Outcome::Denied,
                    "vault:purge",
                    collection,
                    key,
                    &reason,
                    entry.as_ref(),
                    Vec::new(),
                )?;
                return Err(RedDBError::Query(reason));
            }
            self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
            let purged = ops.purge_vault_versions(collection, key)?;
            // Audit and event reference the entry as it was before the
            // purge.
            self.audit_vault_lifecycle(
                "purge",
                collection,
                key,
                crate::runtime::audit_log::Outcome::Success,
                "ok",
                entry.as_ref(),
            );
            self.emit_vault_control_event(
                crate::runtime::control_events::EventKind::VaultPurge,
                crate::runtime::control_events::Outcome::Allowed,
                "vault:purge",
                collection,
                key,
                "ok",
                entry.as_ref(),
                // The purge count travels as an extra event field.
                vec![(
                    "purged".to_string(),
                    crate::runtime::control_events::Sensitivity::raw(purged.to_string()),
                )],
            )?;
            let mut result = UnifiedResult::with_columns(vec![
                "ok".into(),
                "collection".into(),
                "key".into(),
                "purged".into(),
            ]);
            let mut record = UnifiedRecord::new();
            record.set("ok", Value::Boolean(true));
            record.set("collection", Value::text(collection.clone()));
            record.set("key", Value::text(key.clone()));
            record.set("purged", Value::Integer(purged as i64));
            result.push(record);
            Ok(RuntimeQueryResult {
                query: raw_query.to_string(),
                mode: crate::storage::query::modes::QueryMode::Sql,
                statement: "vault_purge",
                engine: "vault",
                result,
                affected_rows: purged as u64,
                statement_type: "delete",
            })
        }

        // GET: for vault collections this returns metadata only (built
        // by `vault_metadata_result`); for other models it returns the
        // stored value.
        KvCommand::Get {
            model,
            collection,
            key,
        } => {
            if *model == crate::catalog::CollectionModel::Vault {
                let entry = ops.get_vault_entry(collection, key)?;
                if let Err(reason) =
                    self.check_vault_capability("vault:read_metadata", collection, key)
                {
                    self.emit_vault_control_event(
                        crate::runtime::control_events::EventKind::VaultMetadataRead,
                        crate::runtime::control_events::Outcome::Denied,
                        "vault:read_metadata",
                        collection,
                        key,
                        &reason,
                        entry.as_ref(),
                        Vec::new(),
                    )?;
                    return Err(RedDBError::Query(reason));
                }
                // Whether the vault key is available only affects the
                // reported status string.
                let key_available = self.vault_key_available(collection);
                let result =
                    vault_metadata_result(collection, key, entry.as_ref(), key_available);
                self.emit_vault_control_event(
                    crate::runtime::control_events::EventKind::VaultMetadataRead,
                    crate::runtime::control_events::Outcome::Allowed,
                    "vault:read_metadata",
                    collection,
                    key,
                    "ok",
                    entry.as_ref(),
                    Vec::new(),
                )?;
                return Ok(RuntimeQueryResult {
                    query: raw_query.to_string(),
                    mode: crate::storage::query::modes::QueryMode::Sql,
                    statement: "vault_get",
                    engine: "vault",
                    result,
                    affected_rows: 0,
                    statement_type: "select",
                });
            }

            // Non-vault path: a missing key still yields one row, with
            // null id, timestamps and value.
            let entity = ops.get_entity(*model, collection, key)?;
            let value = entity.as_ref().and_then(kv_value_from_entity);
            // Only plain KV reads are counted in the KV get statistics.
            if *model == crate::catalog::CollectionModel::Kv {
                self.inner.kv_stats.incr_gets();
            }
            let mut result = UnifiedResult::with_columns(vec![
                "rid".into(),
                "collection".into(),
                "kind".into(),
                "tenant".into(),
                "created_at".into(),
                "updated_at".into(),
                "key".into(),
                "value".into(),
                "tags".into(),
            ]);
            let mut record = UnifiedRecord::new();
            if let Some(entity) = entity.as_ref() {
                record.set("rid", Value::UnsignedInteger(entity.id.raw()));
                record.set("created_at", Value::UnsignedInteger(entity.created_at));
                record.set("updated_at", Value::UnsignedInteger(entity.updated_at));
            } else {
                record.set("rid", Value::Null);
                record.set("created_at", Value::Null);
                record.set("updated_at", Value::Null);
            }
            record.set("collection", Value::text(collection.clone()));
            record.set("kind", Value::text(keyed_model_name(*model).to_string()));
            record.set("tenant", Value::Null);
            record.set("key", Value::text(key.clone()));
            record.set(
                "value",
                value.unwrap_or(crate::storage::schema::Value::Null),
            );
            record.set("tags", kv_tags_value(&ops.tags_for_key(collection, key)));
            result.push(record);

            // NOTE(review): statement/engine are hard-coded to the kv
            // labels here even when `model` is not `Kv`, while the `kind`
            // column uses `keyed_model_name` — confirm this is intended.
            Ok(RuntimeQueryResult {
                query: raw_query.to_string(),
                mode: crate::storage::query::modes::QueryMode::Sql,
                statement: "kv_get",
                engine: "kv",
                result,
                affected_rows: 0,
                statement_type: "select",
            })
        }
        // WATCH: does not stream anything itself; it returns a row that
        // describes the watch endpoint URL for the key (or key prefix).
        KvCommand::Watch {
            model,
            collection,
            key,
            prefix,
            from_lsn,
        } => {
            // Prefix watches are expressed as "<key>.*".
            let watch_key = if *prefix {
                format!("{key}.*")
            } else {
                key.clone()
            };
            // `since_lsn` is appended only when a starting LSN was given.
            let endpoint = match from_lsn {
                Some(lsn) => format!(
                    "/collections/{collection}/{}/{watch_key}/watch?since_lsn={lsn}",
                    keyed_model_name(*model)
                ),
                None => format!(
                    "/collections/{collection}/{}/{watch_key}/watch",
                    keyed_model_name(*model)
                ),
            };
            let mut result = UnifiedResult::with_columns(vec![
                "collection".into(),
                "key".into(),
                "prefix".into(),
                "from_lsn".into(),
                "watch_url".into(),
                "streaming".into(),
            ]);
            let mut record = UnifiedRecord::new();
            record.set("collection", Value::text(collection.clone()));
            record.set("key", Value::text(watch_key));
            record.set("prefix", Value::Boolean(*prefix));
            record.set(
                "from_lsn",
                from_lsn
                    .map(Value::UnsignedInteger)
                    .unwrap_or(crate::storage::schema::Value::Null),
            );
            record.set("watch_url", Value::text(endpoint));
            record.set("streaming", Value::Boolean(true));
            result.push(record);

            Ok(RuntimeQueryResult {
                query: raw_query.to_string(),
                mode: crate::storage::query::modes::QueryMode::Sql,
                statement: "kv_watch",
                engine: keyed_model_name(*model),
                result,
                affected_rows: 0,
                statement_type: "stream",
            })
        }

        // UNSEAL: return the unsealed value of a vault secret, either the
        // latest version or an explicitly requested one.
        KvCommand::Unseal {
            collection,
            key,
            version,
        } => {
            let latest = ops.get_vault_entry(collection, key)?;
            let entry = match version {
                Some(version) => ops.get_vault_entry_version(collection, key, *version)?,
                None => latest.clone(),
            };
            // Reading the latest version (implicitly, or by naming its
            // version number) needs `vault:read`; any other explicit
            // version needs `vault:unseal_history`.
            let action = match (version, latest.as_ref()) {
                (Some(requested), Some(latest)) if *requested == latest.version => "vault:read",
                (Some(_), _) => "vault:unseal_history",
                _ => "vault:read",
            };
            // The control-event kind follows the chosen action.
            let event_kind = if action == "vault:read" {
                crate::runtime::control_events::EventKind::VaultRead
            } else {
                crate::runtime::control_events::EventKind::VaultUnseal
            };
            if let Err(reason) = self.check_vault_capability(action, collection, key) {
                self.audit_vault_unseal(
                    collection,
                    key,
                    crate::runtime::audit_log::Outcome::Denied,
                    &reason,
                    entry.as_ref(),
                );
                self.emit_vault_control_event(
                    event_kind,
                    crate::runtime::control_events::Outcome::Denied,
                    action,
                    collection,
                    key,
                    &reason,
                    entry.as_ref(),
                    Vec::new(),
                )?;
                return Err(RedDBError::Query(reason));
            }
            // A missing entry is audited as a denial with reason
            // "not_found".
            let Some(entry) = entry else {
                let reason = "not_found";
                self.audit_vault_unseal(
                    collection,
                    key,
                    crate::runtime::audit_log::Outcome::Denied,
                    reason,
                    None,
                );
                self.emit_vault_control_event(
                    event_kind,
                    crate::runtime::control_events::Outcome::Denied,
                    action,
                    collection,
                    key,
                    reason,
                    None,
                    Vec::new(),
                )?;
                return Err(RedDBError::NotFound(format!(
                    "vault secret '{}.{}' not found",
                    collection, key
                )));
            };
            // Tombstoned versions cannot be unsealed.
            if entry.tombstone {
                let reason = "deleted";
                self.audit_vault_unseal(
                    collection,
                    key,
                    crate::runtime::audit_log::Outcome::Denied,
                    reason,
                    Some(&entry),
                );
                self.emit_vault_control_event(
                    event_kind,
                    crate::runtime::control_events::Outcome::Denied,
                    action,
                    collection,
                    key,
                    reason,
                    Some(&entry),
                    Vec::new(),
                )?;
                return Err(RedDBError::NotFound(format!(
                    "vault secret '{}.{}' is deleted",
                    collection, key
                )));
            }
            match self.unseal_vault_value(collection, &entry.value) {
                Ok(value) => {
                    self.audit_vault_unseal(
                        collection,
                        key,
                        crate::runtime::audit_log::Outcome::Success,
                        "ok",
                        Some(&entry),
                    );
                    self.emit_vault_control_event(
                        event_kind,
                        crate::runtime::control_events::Outcome::Allowed,
                        action,
                        collection,
                        key,
                        "ok",
                        Some(&entry),
                        Vec::new(),
                    )?;
                    let mut result = UnifiedResult::with_columns(vec![
                        "collection".into(),
                        "key".into(),
                        "value".into(),
                    ]);
                    let mut record = UnifiedRecord::new();
                    record.set("collection", Value::text(collection.clone()));
                    record.set("key", Value::text(key.clone()));
                    record.set("value", value);
                    result.push(record);
                    Ok(RuntimeQueryResult {
                        query: raw_query.to_string(),
                        mode: crate::storage::query::modes::QueryMode::Sql,
                        statement: "vault_unseal",
                        engine: "vault",
                        result,
                        affected_rows: 0,
                        statement_type: "select",
                    })
                }
                Err(err) => {
                    // Unseal failures are audited with the error text,
                    // then the original error is returned.
                    let reason = err.to_string();
                    self.audit_vault_unseal(
                        collection,
                        key,
                        crate::runtime::audit_log::Outcome::Error,
                        &reason,
                        Some(&entry),
                    );
                    self.emit_vault_control_event(
                        event_kind,
                        crate::runtime::control_events::Outcome::Error,
                        action,
                        collection,
                        key,
                        &reason,
                        Some(&entry),
                        Vec::new(),
                    )?;
                    Err(err)
                }
            }
        }

        // INCR: add `by` to the integer stored at the key and return the
        // new value.
        KvCommand::Incr {
            model,
            collection,
            key,
            by,
            ttl_ms,
        } => {
            self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
            let new_value = ops.incr(*model, collection, key, *by, *ttl_ms)?;

            let mut result = UnifiedResult::with_columns(vec![
                "ok".into(),
                "collection".into(),
                "key".into(),
                "value".into(),
            ]);
            let mut record = UnifiedRecord::new();
            record.set("ok", Value::Boolean(true));
            record.set("collection", Value::text(collection.clone()));
            record.set("key", Value::text(key.clone()));
            record.set("value", Value::Integer(new_value));
            result.push(record);

            Ok(RuntimeQueryResult {
                query: raw_query.to_string(),
                mode: crate::storage::query::modes::QueryMode::Sql,
                statement: "kv_incr",
                engine: "kv",
                result,
                affected_rows: 1,
                statement_type: "update",
            })
        }

        // CAS: compare-and-set. `ok` reports whether the swap happened;
        // `current` is whatever value `cas` reported back (null if none).
        KvCommand::Cas {
            model,
            collection,
            key,
            expected,
            new_value,
            ttl_ms,
        } => {
            self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
            let (ok, current) = ops.cas(
                *model,
                collection,
                key,
                expected.as_ref(),
                new_value.clone(),
                *ttl_ms,
            )?;

            let mut result = UnifiedResult::with_columns(vec![
                "ok".into(),
                "collection".into(),
                "key".into(),
                "current".into(),
            ]);
            let mut record = UnifiedRecord::new();
            record.set("ok", Value::Boolean(ok));
            record.set("collection", Value::text(collection.clone()));
            record.set("key", Value::text(key.clone()));
            record.set(
                "current",
                current.unwrap_or(crate::storage::schema::Value::Null),
            );
            result.push(record);

            Ok(RuntimeQueryResult {
                query: raw_query.to_string(),
                mode: crate::storage::query::modes::QueryMode::Sql,
                statement: "kv_cas",
                engine: "kv",
                result,
                // A failed comparison changes nothing.
                affected_rows: if ok { 1 } else { 0 },
                statement_type: "update",
            })
        }

        // DELETE: vault deletes append a tombstone version; other models
        // remove the key through `ops.delete`.
        KvCommand::Delete {
            model,
            collection,
            key,
        } => {
            if *model == crate::catalog::CollectionModel::Vault {
                self.check_system_vault_capability("vault:write", collection, key)
                    .map_err(RedDBError::Query)?;
                self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
                // Tombstone: null value, op "delete", tombstone flag set,
                // no tags.
                let entry = ops.append_vault_version(
                    collection,
                    key,
                    Value::Null,
                    "delete",
                    true,
                    &[],
                )?;
                self.record_kv_watch_event(
                    crate::replication::cdc::ChangeOperation::Delete,
                    collection,
                    key,
                    entry.id.raw(),
                    None,
                    Some(vault_entry_metadata_json(&entry)),
                );
                // NOTE(review): unlike Rotate and Purge, this path records
                // an audit entry but emits no control event — confirm
                // that is intentional.
                self.audit_vault_lifecycle(
                    "delete",
                    collection,
                    key,
                    crate::runtime::audit_log::Outcome::Success,
                    "ok",
                    Some(&entry),
                );
                return Ok(vault_write_result(
                    raw_query,
                    "vault_delete",
                    "delete",
                    collection,
                    key,
                    &entry,
                    1,
                ));
            }
            self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
            let deleted = ops.delete(*model, collection, key)?;

            let mut result = UnifiedResult::with_columns(vec![
                "ok".into(),
                "collection".into(),
                "key".into(),
                "deleted".into(),
            ]);
            let mut record = UnifiedRecord::new();
            // `ok` is always true here; `deleted` says whether a key was
            // actually removed.
            record.set("ok", Value::Boolean(true));
            record.set("collection", Value::text(collection.clone()));
            record.set("key", Value::text(key.clone()));
            record.set("deleted", Value::Boolean(deleted));
            result.push(record);

            Ok(RuntimeQueryResult {
                query: raw_query.to_string(),
                mode: crate::storage::query::modes::QueryMode::Sql,
                statement: "kv_delete",
                engine: "kv",
                result,
                affected_rows: if deleted { 1 } else { 0 },
                statement_type: "delete",
            })
        }
    }
}
2068
2069 pub fn vault_watch_events_since(
2070 &self,
2071 collection: &str,
2072 key: &str,
2073 since_lsn: u64,
2074 max_count: usize,
2075 ) -> Vec<crate::replication::cdc::KvWatchEvent> {
2076 self.kv_watch_events_since(collection, key, since_lsn, max_count)
2077 .into_iter()
2078 .filter(|event| {
2079 self.check_vault_capability("vault:read_metadata", &event.collection, &event.key)
2080 .is_ok()
2081 })
2082 .map(vault_filter_watch_event)
2083 .collect()
2084 }
2085
2086 pub fn vault_watch_events_since_prefix(
2087 &self,
2088 collection: &str,
2089 prefix: &str,
2090 since_lsn: u64,
2091 max_count: usize,
2092 ) -> Vec<crate::replication::cdc::KvWatchEvent> {
2093 self.kv_watch_events_since_prefix(collection, prefix, since_lsn, max_count)
2094 .into_iter()
2095 .filter(|event| {
2096 self.check_vault_capability("vault:read_metadata", &event.collection, &event.key)
2097 .is_ok()
2098 })
2099 .map(vault_filter_watch_event)
2100 .collect()
2101 }
2102
2103 fn check_kv_invalidate_policy(&self, collection: &str) -> RedDBResult<()> {
2104 let auth_store = match self.inner.auth_store.read().clone() {
2105 Some(store) => store,
2106 None => return Ok(()),
2107 };
2108 let (username, role) = match crate::runtime::impl_core::current_auth_identity() {
2109 Some(identity) => identity,
2110 None => return Ok(()),
2111 };
2112 if role < crate::auth::Role::Write {
2113 return Err(RedDBError::Query(format!(
2114 "principal=`{username}` role=`{role:?}` cannot invalidate KV tags"
2115 )));
2116 }
2117 if !auth_store.iam_authorization_enabled() {
2118 return Ok(());
2119 }
2120
2121 let tenant = crate::runtime::impl_core::current_tenant();
2122 let principal = crate::auth::UserId::from_parts(tenant.as_deref(), &username);
2123 let mut resource =
2124 crate::auth::policies::ResourceRef::new("kv".to_string(), collection.to_string());
2125 if let Some(tenant) = tenant.clone() {
2126 resource = resource.with_tenant(tenant);
2127 }
2128 let ctx = crate::auth::policies::EvalContext {
2129 principal_tenant: tenant.clone(),
2130 current_tenant: tenant,
2131 peer_ip: None,
2132 mfa_present: false,
2133 now_ms: current_unix_ms(),
2134 principal_is_admin_role: role == crate::auth::Role::Admin,
2135 principal_is_system_owned: auth_store.principal_is_system_owned(&principal),
2136 principal_is_platform_scoped: principal.tenant.is_none(),
2137 };
2138 if auth_store.check_policy_authz_with_role(
2139 &principal,
2140 "kv:invalidate",
2141 &resource,
2142 &ctx,
2143 role,
2144 ) {
2145 Ok(())
2146 } else {
2147 Err(RedDBError::Query(format!(
2148 "principal=`{username}` action=`kv:invalidate` resource=`kv:{collection}` denied by IAM policy"
2149 )))
2150 }
2151 }
2152}
2153
2154fn ttl_metadata(ttl_ms: Option<u64>) -> Option<Metadata> {
2155 let ttl_ms = ttl_ms?;
2156 Some(Metadata::with_fields(
2157 [(
2158 "_ttl_ms".to_string(),
2159 if ttl_ms <= i64::MAX as u64 {
2160 MetadataValue::Int(ttl_ms as i64)
2161 } else {
2162 MetadataValue::Timestamp(ttl_ms)
2163 },
2164 )]
2165 .into_iter()
2166 .collect(),
2167 ))
2168}
2169
2170fn vault_write_result(
2171 raw_query: &str,
2172 statement: &'static str,
2173 statement_type: &'static str,
2174 collection: &str,
2175 key: &str,
2176 entry: &VaultEntry,
2177 affected_rows: u64,
2178) -> RuntimeQueryResult {
2179 let mut result = UnifiedResult::with_columns(vec![
2180 "ok".into(),
2181 "collection".into(),
2182 "key".into(),
2183 "version".into(),
2184 "fingerprint".into(),
2185 "tombstone".into(),
2186 "op".into(),
2187 "id".into(),
2188 ]);
2189 let mut record = UnifiedRecord::new();
2190 record.set("ok", Value::Boolean(true));
2191 record.set("collection", Value::text(collection.to_string()));
2192 record.set("key", Value::text(key.to_string()));
2193 record.set("version", Value::Integer(entry.version));
2194 if entry.tombstone {
2195 record.set("fingerprint", Value::Null);
2196 } else {
2197 record.set("fingerprint", Value::text(vault_fingerprint(&entry.value)));
2198 }
2199 record.set("tombstone", Value::Boolean(entry.tombstone));
2200 record.set("op", Value::text(entry.op.clone()));
2201 record.set("id", Value::Integer(entry.id.raw() as i64));
2202 result.push(record);
2203 RuntimeQueryResult {
2204 query: raw_query.to_string(),
2205 mode: crate::storage::query::modes::QueryMode::Sql,
2206 statement,
2207 engine: "vault",
2208 result,
2209 affected_rows,
2210 statement_type,
2211 }
2212}
2213
2214fn vault_history_result(collection: &str, key: &str, versions: &[VaultEntry]) -> UnifiedResult {
2215 let mut result = UnifiedResult::with_columns(vec![
2216 "collection".into(),
2217 "key".into(),
2218 "version".into(),
2219 "fingerprint".into(),
2220 "tags".into(),
2221 "created_at".into(),
2222 "updated_at".into(),
2223 "status".into(),
2224 "tombstone".into(),
2225 "op".into(),
2226 ]);
2227 for entry in versions {
2228 push_vault_metadata_record(&mut result, collection, key, entry);
2229 }
2230 result
2231}
2232
2233fn push_vault_metadata_record(
2234 result: &mut UnifiedResult,
2235 collection: &str,
2236 key: &str,
2237 entry: &VaultEntry,
2238) {
2239 let mut record = UnifiedRecord::new();
2240 record.set("collection", Value::text(collection.to_string()));
2241 record.set("key", Value::text(key.to_string()));
2242 record.set("version", Value::Integer(entry.version));
2243 if entry.tombstone {
2244 record.set("fingerprint", Value::Null);
2245 record.set("status", Value::text("deleted"));
2246 } else {
2247 record.set("fingerprint", Value::text(vault_fingerprint(&entry.value)));
2248 record.set("status", Value::text("sealed"));
2249 }
2250 record.set("tags", vault_tags_value(&entry.metadata));
2251 record.set("created_at", Value::TimestampMs(entry.created_at as i64));
2252 record.set("updated_at", Value::TimestampMs(entry.updated_at as i64));
2253 record.set("tombstone", Value::Boolean(entry.tombstone));
2254 record.set("op", Value::text(entry.op.clone()));
2255 result.push(record);
2256}
2257
2258fn vault_metadata_result(
2259 collection: &str,
2260 key: &str,
2261 entry: Option<&VaultEntry>,
2262 key_available: bool,
2263) -> UnifiedResult {
2264 let mut result = UnifiedResult::with_columns(vec![
2265 "collection".into(),
2266 "key".into(),
2267 "version".into(),
2268 "fingerprint".into(),
2269 "tags".into(),
2270 "created_at".into(),
2271 "updated_at".into(),
2272 "value".into(),
2273 "status".into(),
2274 "tombstone".into(),
2275 "op".into(),
2276 ]);
2277 let mut record = UnifiedRecord::new();
2278 record.set("collection", Value::text(collection.to_string()));
2279 record.set("key", Value::text(key.to_string()));
2280 match entry {
2281 Some(entry) => {
2282 record.set("version", Value::Integer(entry.version));
2283 if entry.tombstone {
2284 record.set("fingerprint", Value::Null);
2285 } else {
2286 record.set("fingerprint", Value::text(vault_fingerprint(&entry.value)));
2287 }
2288 record.set("tags", vault_tags_value(&entry.metadata));
2289 record.set("created_at", Value::TimestampMs(entry.created_at as i64));
2290 record.set("updated_at", Value::TimestampMs(entry.updated_at as i64));
2291 record.set("value", Value::text("***"));
2292 record.set(
2293 "status",
2294 Value::text(if entry.tombstone {
2295 "deleted"
2296 } else if key_available {
2297 "sealed"
2298 } else {
2299 "sealed_unavailable"
2300 }),
2301 );
2302 record.set("tombstone", Value::Boolean(entry.tombstone));
2303 record.set("op", Value::text(entry.op.clone()));
2304 }
2305 None => {
2306 record.set("version", Value::Null);
2307 record.set("fingerprint", Value::Null);
2308 record.set("tags", Value::Array(Vec::new()));
2309 record.set("created_at", Value::Null);
2310 record.set("updated_at", Value::Null);
2311 record.set("value", Value::text(""));
2312 record.set("status", Value::text("missing"));
2313 record.set("tombstone", Value::Boolean(false));
2314 record.set("op", Value::Null);
2315 }
2316 }
2317 result.push(record);
2318 result
2319}
2320
2321fn vault_fingerprint(value: &Value) -> String {
2322 match value {
2323 Value::Secret(payload) => crate::utils::to_hex(&crate::crypto::sha256::sha256(payload)),
2324 other => crate::utils::to_hex(&crate::crypto::sha256::sha256(&other.to_bytes())),
2325 }
2326}
2327
2328fn vault_entry_metadata_json(entry: &VaultEntry) -> crate::json::Value {
2329 let mut object = crate::json::Map::new();
2330 object.insert(
2331 "key".to_string(),
2332 crate::json::Value::String(entry.key.clone()),
2333 );
2334 object.insert(
2335 "version".to_string(),
2336 crate::json::Value::Number(entry.version as f64),
2337 );
2338 object.insert(
2339 "fingerprint".to_string(),
2340 if entry.tombstone {
2341 crate::json::Value::Null
2342 } else {
2343 crate::json::Value::String(vault_fingerprint(&entry.value))
2344 },
2345 );
2346 object.insert("tags".to_string(), vault_tags_json(&entry.metadata));
2347 object.insert(
2348 "actor".to_string(),
2349 crate::json::Value::String(RedDBRuntime::current_vault_actor()),
2350 );
2351 object.insert(
2352 "sequence_id".to_string(),
2353 crate::json::Value::Number(entry.sequence_id as f64),
2354 );
2355 object.insert(
2356 "tombstone".to_string(),
2357 crate::json::Value::Bool(entry.tombstone),
2358 );
2359 object.insert(
2360 "op".to_string(),
2361 crate::json::Value::String(entry.op.clone()),
2362 );
2363 crate::json::Value::Object(object)
2364}
2365
2366fn vault_tags_json(metadata: &Metadata) -> crate::json::Value {
2367 match vault_tags_value(metadata) {
2368 Value::Array(values) => crate::json::Value::Array(
2369 values
2370 .into_iter()
2371 .filter_map(|value| match value {
2372 Value::Text(tag) => Some(crate::json::Value::String(tag.to_string())),
2373 _ => None,
2374 })
2375 .collect(),
2376 ),
2377 _ => crate::json::Value::Array(Vec::new()),
2378 }
2379}
2380
2381fn vault_tags_metadata(tags: &[String]) -> std::collections::HashMap<String, MetadataValue> {
2382 [(
2383 "tags".to_string(),
2384 MetadataValue::Array(
2385 tags.iter()
2386 .map(|tag| MetadataValue::String(tag.clone()))
2387 .collect(),
2388 ),
2389 )]
2390 .into_iter()
2391 .collect()
2392}
2393
2394fn vault_filter_watch_event(
2395 mut event: crate::replication::cdc::KvWatchEvent,
2396) -> crate::replication::cdc::KvWatchEvent {
2397 event.before = event.before.and_then(vault_metadata_json_only);
2398 event.after = event.after.and_then(vault_metadata_json_only);
2399 event
2400}
2401
2402fn vault_metadata_json_only(value: crate::json::Value) -> Option<crate::json::Value> {
2403 let object = value.as_object()?;
2404 let mut out = crate::json::Map::new();
2405 for field in [
2406 "key",
2407 "version",
2408 "fingerprint",
2409 "tags",
2410 "actor",
2411 "sequence_id",
2412 "tombstone",
2413 "op",
2414 ] {
2415 if let Some(value) = object.get(field) {
2416 out.insert(field.to_string(), value.clone());
2417 }
2418 }
2419 Some(crate::json::Value::Object(out))
2420}
2421
2422fn vault_tags_value(metadata: &Metadata) -> Value {
2423 match metadata.get("tags") {
2424 Some(MetadataValue::Array(values)) => Value::Array(
2425 values
2426 .iter()
2427 .filter_map(|value| match value {
2428 MetadataValue::String(tag) => Some(Value::text(tag.clone())),
2429 _ => None,
2430 })
2431 .collect(),
2432 ),
2433 Some(MetadataValue::String(tag)) if !tag.is_empty() => {
2434 Value::Array(vec![Value::text(tag.clone())])
2435 }
2436 _ => Value::Array(Vec::new()),
2437 }
2438}
2439
2440fn decode_vault_key(hex_key: &str) -> RedDBResult<[u8; 32]> {
2441 let bytes = hex::decode(hex_key)
2442 .map_err(|_| RedDBError::Query("vault sealed_unavailable: bad key material".to_string()))?;
2443 let key: [u8; 32] = bytes.try_into().map_err(|_| {
2444 RedDBError::Query("vault sealed_unavailable: bad key material length".to_string())
2445 })?;
2446 Ok(key)
2447}
2448
2449fn kv_tags_metadata(tags: &[String]) -> Option<(String, MetadataValue)> {
2450 if tags.is_empty() {
2451 return None;
2452 }
2453 let values = tags
2454 .iter()
2455 .map(|tag| MetadataValue::String(tag.clone()))
2456 .collect();
2457 Some(("_kv_tags".to_string(), MetadataValue::Array(values)))
2458}
2459
2460fn kv_tags_value(tags: &[String]) -> Value {
2461 let json = crate::json::Value::Array(
2462 tags.iter()
2463 .map(|tag| crate::json::Value::String(tag.clone()))
2464 .collect(),
2465 );
2466 Value::Json(crate::json::to_vec(&json).unwrap_or_default())
2467}
2468
2469fn kv_value_from_entity(entity: &crate::storage::UnifiedEntity) -> Option<Value> {
2470 if let crate::storage::EntityData::Row(ref row) = entity.data {
2471 if let Some(ref named) = row.named {
2472 return named.get("value").cloned();
2473 }
2474 }
2475 None
2476}
2477
2478fn kv_collection_contract(name: &str) -> crate::physical::CollectionContract {
2479 let now = current_unix_ms();
2480 crate::physical::CollectionContract {
2481 name: name.to_string(),
2482 declared_model: crate::catalog::CollectionModel::Kv,
2483 schema_mode: crate::catalog::SchemaMode::Dynamic,
2484 origin: crate::physical::ContractOrigin::Implicit,
2485 version: 1,
2486 created_at_unix_ms: now,
2487 updated_at_unix_ms: now,
2488 default_ttl_ms: None,
2489 vector_dimension: None,
2490 vector_metric: None,
2491 context_index_fields: Vec::new(),
2492 declared_columns: Vec::new(),
2493 table_def: None,
2494 timestamps_enabled: false,
2495 context_index_enabled: false,
2496 metrics_raw_retention_ms: None,
2497 metrics_rollup_policies: Vec::new(),
2498 metrics_tenant_identity: None,
2499 metrics_namespace: None,
2500 append_only: false,
2501 subscriptions: Vec::new(),
2502 session_key: None,
2503 session_gap_ms: None,
2504 retention_duration_ms: None,
2505 }
2506}
2507
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// If the system clock reports a time before the epoch, `0` is returned
/// instead of an error.
fn current_unix_ms() -> u128 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis(),
        Err(_) => 0,
    }
}
2514
#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::{Arc, Barrier};

    use crate::api::RedDBOptions;
    use crate::catalog::CollectionModel;
    use crate::replication::cdc::ChangeOperation;
    use crate::runtime::RedDBRuntime;
    use crate::storage::schema::Value;

    /// Collection targeted by every direct `KvAtomicOps` call in this module.
    const KV: &str = "kv_default";

    /// Builds a fresh in-memory runtime for a single test.
    fn rt() -> RedDBRuntime {
        RedDBRuntime::with_options(RedDBOptions::in_memory()).expect("in-memory runtime")
    }

    /// Writes `value` under `key` in [`KV`] with no TTL and without the
    /// if-not-exists guard, panicking if the write fails.
    #[track_caller]
    fn put(ops: &super::KvAtomicOps<'_>, key: &str, value: Value) {
        ops.set(CollectionModel::Kv, KV, key, value, None, false)
            .unwrap();
    }

    #[test]
    fn incr_missing_key_initialises_at_by() {
        let runtime = rt();
        let kv = super::KvAtomicOps::new(&runtime);

        let created = kv
            .incr(CollectionModel::Kv, KV, "missing", 5, None)
            .unwrap();

        assert_eq!(created, 5);
    }

    #[test]
    fn kv_runtime_stats_count_public_ops() {
        let runtime = rt();
        let kv = super::KvAtomicOps::new(&runtime);

        put(&kv, "profile", Value::text("alice"));
        kv.get(CollectionModel::Kv, KV, "profile").unwrap();
        kv.delete(CollectionModel::Kv, KV, "profile").unwrap();
        kv.incr(CollectionModel::Kv, KV, "hits", 1, None).unwrap();

        // First CAS expects the key to be absent; second expects a value the
        // key does not hold. The stats below expect one success, one conflict.
        kv.cas(
            CollectionModel::Kv,
            KV,
            "profile",
            None,
            Value::text("created"),
            None,
        )
        .unwrap();
        let stale = Value::text("different");
        kv.cas(
            CollectionModel::Kv,
            KV,
            "profile",
            Some(&stale),
            Value::text("ignored"),
            None,
        )
        .unwrap();

        let stats = runtime.stats().kv;
        assert_eq!(stats.puts, 1);
        assert_eq!(stats.gets, 1);
        assert_eq!(stats.deletes, 1);
        assert_eq!(stats.incrs, 1);
        assert_eq!(stats.cas_success, 1);
        assert_eq!(stats.cas_conflict, 1);
    }

    #[test]
    fn kv_invalidate_tags_removes_matching_entries_only() {
        let runtime = rt();

        runtime
            .execute_query("KV PUT sessions.blob = 'payload' TAGS [user:42, org:7]")
            .unwrap();

        // A tag the entry does not carry leaves it in place.
        let miss = runtime
            .execute_query("INVALIDATE TAGS [org:99] FROM sessions")
            .unwrap();
        assert_eq!(miss.affected_rows, 0);
        let untouched = runtime.execute_query("KV GET sessions.blob").unwrap();
        assert!(matches!(
            untouched.result.records[0].get("value"),
            Some(Value::Text(value)) if &**value == "payload"
        ));

        // A tag the entry does carry removes it.
        let hit = runtime
            .execute_query("INVALIDATE TAGS [user:42] FROM sessions")
            .unwrap();
        assert_eq!(hit.affected_rows, 1);
        let cleared = runtime.execute_query("KV GET sessions.blob").unwrap();
        assert!(matches!(
            cleared.result.records[0].get("value"),
            Some(Value::Null)
        ));
    }

    #[test]
    fn kv_runtime_stats_count_watch_streams_and_events() {
        let runtime = rt();
        let kv = super::KvAtomicOps::new(&runtime);
        assert_eq!(runtime.stats().kv.watch_streams_active, 0);

        let mut stream = runtime.kv_watch_subscribe(KV, "watched", None);
        assert_eq!(runtime.stats().kv.watch_streams_active, 1);

        put(&kv, "watched", Value::Integer(1));
        let event = stream.poll_next().expect("watch event");
        assert_eq!(event.key, "watched");
        assert_eq!(runtime.stats().kv.watch_events_emitted, 1);

        stream.record_drop_count(3);
        assert_eq!(runtime.stats().kv.watch_drops, 3);

        // Dropping the stream must bring the active-stream gauge back to zero.
        drop(stream);
        assert_eq!(runtime.stats().kv.watch_streams_active, 0);
    }

    #[test]
    fn incr_existing_integer_accumulates() {
        let runtime = rt();
        let kv = super::KvAtomicOps::new(&runtime);

        for _ in 0..2 {
            kv.incr(CollectionModel::Kv, KV, "ctr", 1, None).unwrap();
        }
        let total = kv.incr(CollectionModel::Kv, KV, "ctr", 1, None).unwrap();

        assert_eq!(total, 3);
    }

    #[test]
    fn decr_via_negative_by() {
        let runtime = rt();
        let kv = super::KvAtomicOps::new(&runtime);

        kv.incr(CollectionModel::Kv, KV, "stock", 10, None).unwrap();
        let remaining = kv.incr(CollectionModel::Kv, KV, "stock", -3, None).unwrap();

        assert_eq!(remaining, 7);
    }

    #[test]
    fn concurrent_incr_single_key_is_atomic() {
        const THREADS: usize = 8;
        const ITERS: usize = 1000;

        let runtime = Arc::new(rt());
        let barrier = Arc::new(Barrier::new(THREADS));

        let workers: Vec<_> = (0..THREADS)
            .map(|_| {
                let runtime = Arc::clone(&runtime);
                let barrier = Arc::clone(&barrier);
                std::thread::spawn(move || {
                    let kv = super::KvAtomicOps::new(&runtime);
                    // Line every worker up so the increments overlap.
                    barrier.wait();
                    for _ in 0..ITERS {
                        kv.incr(CollectionModel::Kv, KV, "counter", 1, None)
                            .unwrap();
                    }
                })
            })
            .collect();

        for worker in workers {
            worker.join().expect("worker should finish");
        }

        let kv = super::KvAtomicOps::new(&runtime);
        assert_eq!(
            kv.get(CollectionModel::Kv, KV, "counter").unwrap(),
            Some(Value::Integer((THREADS * ITERS) as i64))
        );
    }

    #[test]
    fn incr_on_string_value_returns_error() {
        let runtime = rt();
        let kv = super::KvAtomicOps::new(&runtime);
        put(&kv, "name", Value::text("alice"));

        let failure = kv
            .incr(CollectionModel::Kv, KV, "name", 1, None)
            .unwrap_err();

        assert!(failure.to_string().contains("non-integer"));
    }

    #[test]
    fn cas_matching_value_succeeds() {
        let runtime = rt();
        let kv = super::KvAtomicOps::new(&runtime);
        put(&kv, "lock", Value::text("free"));

        let expected = Value::text("free");
        let (swapped, previous) = kv
            .cas(
                CollectionModel::Kv,
                KV,
                "lock",
                Some(&expected),
                Value::text("held"),
                None,
            )
            .unwrap();

        assert!(swapped);
        assert_eq!(previous, Some(Value::text("free")));
        assert_eq!(
            kv.get(CollectionModel::Kv, KV, "lock").unwrap(),
            Some(Value::text("held"))
        );
    }

    #[test]
    fn concurrent_cas_allows_one_success_per_round() {
        const THREADS: usize = 8;
        const ROUNDS: usize = 100;

        let runtime = Arc::new(rt());
        let kv = super::KvAtomicOps::new(&runtime);
        put(&kv, "cas_counter", Value::Integer(0));

        let start_round = Arc::new(Barrier::new(THREADS));
        let finish_round = Arc::new(Barrier::new(THREADS));
        let successes = Arc::new(AtomicUsize::new(0));

        let workers: Vec<_> = (0..THREADS)
            .map(|_| {
                let runtime = Arc::clone(&runtime);
                let start_round = Arc::clone(&start_round);
                let finish_round = Arc::clone(&finish_round);
                let successes = Arc::clone(&successes);
                std::thread::spawn(move || {
                    let kv = super::KvAtomicOps::new(&runtime);
                    for round in 0..ROUNDS {
                        // All workers race to move the counter from `round`
                        // to `round + 1`, then wait for the round to finish.
                        start_round.wait();
                        let expected = Value::Integer(round as i64);
                        let (won, _) = kv
                            .cas(
                                CollectionModel::Kv,
                                KV,
                                "cas_counter",
                                Some(&expected),
                                Value::Integer((round + 1) as i64),
                                None,
                            )
                            .unwrap();
                        if won {
                            successes.fetch_add(1, Ordering::SeqCst);
                        }
                        finish_round.wait();
                    }
                })
            })
            .collect();

        for worker in workers {
            worker.join().expect("worker should finish");
        }

        assert_eq!(successes.load(Ordering::SeqCst), ROUNDS);
        assert_eq!(
            kv.get(CollectionModel::Kv, KV, "cas_counter").unwrap(),
            Some(Value::Integer(ROUNDS as i64))
        );
    }

    #[test]
    fn cas_mismatching_value_fails() {
        let runtime = rt();
        let kv = super::KvAtomicOps::new(&runtime);
        put(&kv, "lock", Value::text("free"));

        let expected = Value::text("held");
        let (swapped, current) = kv
            .cas(
                CollectionModel::Kv,
                KV,
                "lock",
                Some(&expected),
                Value::text("worker-7"),
                None,
            )
            .unwrap();

        assert!(!swapped);
        assert_eq!(current, Some(Value::text("free")));
        assert_eq!(
            kv.get(CollectionModel::Kv, KV, "lock").unwrap(),
            Some(Value::text("free"))
        );
    }

    #[test]
    fn cas_expect_null_on_missing_key_creates() {
        let runtime = rt();
        let kv = super::KvAtomicOps::new(&runtime);

        let (swapped, previous) = kv
            .cas(
                CollectionModel::Kv,
                KV,
                "new_key",
                None,
                Value::text("created"),
                None,
            )
            .unwrap();

        assert!(swapped);
        assert_eq!(previous, None);
        assert_eq!(
            kv.get(CollectionModel::Kv, KV, "new_key").unwrap(),
            Some(Value::text("created"))
        );
    }

    #[test]
    fn cas_expect_null_on_existing_key_fails() {
        let runtime = rt();
        let kv = super::KvAtomicOps::new(&runtime);
        put(&kv, "taken", Value::text("worker-1"));

        let (swapped, current) = kv
            .cas(
                CollectionModel::Kv,
                KV,
                "taken",
                None,
                Value::text("worker-2"),
                None,
            )
            .unwrap();

        assert!(!swapped);
        assert_eq!(current, Some(Value::text("worker-1")));
    }

    #[test]
    fn cas_via_sql_roundtrip() {
        let runtime = rt();
        runtime.execute_query("KV PUT lock = 'free'").unwrap();

        // The same statement is run twice: the first run is expected to report
        // `ok = true`, the second `ok = false`.
        for expected_ok in [true, false] {
            let outcome = runtime
                .execute_query("KV CAS lock EXPECT 'free' SET 'held'")
                .unwrap();
            assert_eq!(
                outcome.result.records[0].get("ok"),
                Some(&Value::Boolean(expected_ok))
            );
        }
    }

    #[test]
    fn cas_expect_null_via_sql() {
        let runtime = rt();
        let attempts = [
            ("KV CAS singleton EXPECT NULL SET 'first'", true),
            ("KV CAS singleton EXPECT NULL SET 'second'", false),
        ];

        for (statement, expected_ok) in attempts {
            let outcome = runtime.execute_query(statement).unwrap();
            assert_eq!(
                outcome.result.records[0].get("ok"),
                Some(&Value::Boolean(expected_ok))
            );
        }
    }

    #[test]
    fn incr_via_sql_roundtrip() {
        let runtime = rt();
        let steps = [("KV INCR hits", 1), ("KV INCR hits BY 4", 5)];

        for (statement, expected_total) in steps {
            let outcome = runtime.execute_query(statement).unwrap();
            assert_eq!(
                outcome.result.records[0].get("value"),
                Some(&Value::Integer(expected_total))
            );
        }
    }

    #[test]
    fn concurrent_self_referential_update_is_atomic() {
        const THREADS: usize = 8;
        const ITERS: usize = 100;

        let runtime = Arc::new(rt());
        for setup in [
            "CREATE TABLE counters (id INT, n INT)",
            "INSERT INTO counters (id, n) VALUES (1, 0)",
        ] {
            runtime.execute_query(setup).unwrap();
        }

        let barrier = Arc::new(Barrier::new(THREADS));
        let workers: Vec<_> = (0..THREADS)
            .map(|_| {
                let runtime = Arc::clone(&runtime);
                let barrier = Arc::clone(&barrier);
                std::thread::spawn(move || {
                    barrier.wait();
                    for _ in 0..ITERS {
                        runtime
                            .execute_query("UPDATE counters SET n = n + 1 WHERE id = 1")
                            .unwrap();
                    }
                })
            })
            .collect();

        for worker in workers {
            worker.join().expect("worker should finish");
        }

        let selected = runtime
            .execute_query("SELECT n FROM counters WHERE id = 1")
            .unwrap();
        assert_eq!(
            selected.result.records[0].get("n"),
            Some(&Value::Integer((THREADS * ITERS) as i64))
        );
    }

    #[test]
    fn watch_stream_delivers_key_events_in_lsn_order() {
        let runtime = rt();
        let kv = super::KvAtomicOps::new(&runtime);
        let mut stream = runtime.kv_watch_subscribe(KV, "seq", None);

        put(&kv, "seq", Value::Integer(1));
        kv.incr(CollectionModel::Kv, KV, "seq", 1, None).unwrap();
        kv.delete(CollectionModel::Kv, KV, "seq").unwrap();
        put(&kv, "seq", Value::Integer(9));

        // Poll until the stream runs dry or four events have been collected.
        let events: Vec<_> = std::iter::from_fn(|| stream.poll_next())
            .take(4)
            .collect();
        assert_eq!(events.len(), 4);

        let expected_ops = [
            ChangeOperation::Insert,
            ChangeOperation::Update,
            ChangeOperation::Delete,
            ChangeOperation::Insert,
        ];
        for (event, expected) in events.iter().zip(expected_ops.iter()) {
            assert_eq!(&event.op, expected);
        }
        assert!(events.windows(2).all(|pair| pair[0].lsn < pair[1].lsn));
    }

    #[test]
    fn watch_prefix_stream_delivers_matching_events_only() {
        let runtime = rt();
        let kv = super::KvAtomicOps::new(&runtime);
        let mut stream = runtime.kv_watch_subscribe_prefix(KV, "acct:", None);

        for (key, value) in [("acct:1", 1), ("session:1", 2), ("acct:2", 3)] {
            put(&kv, key, Value::Integer(value));
        }

        let first = stream.poll_next().expect("first prefix event");
        let second = stream.poll_next().expect("second prefix event");
        assert_eq!(first.key, "acct:1");
        assert_eq!(second.key, "acct:2");
        assert!(stream.poll_next().is_none());
    }

    #[test]
    fn watch_stream_resume_from_lsn_delivers_missed_events_without_duplicates() {
        let runtime = rt();
        let kv = super::KvAtomicOps::new(&runtime);
        let mut stream = runtime.kv_watch_subscribe(KV, "resume", None);

        // Consume five events live, remembering the LSN of the last one.
        let mut last_seen_lsn = 0;
        for value in 0..5 {
            put(&kv, "resume", Value::Integer(value));
            last_seen_lsn = stream.poll_next().expect("initial event").lsn;
        }
        drop(stream);

        // Fifty more writes happen while nobody is subscribed.
        for value in 5..55 {
            put(&kv, "resume", Value::Integer(value));
        }

        let mut resumed = runtime.kv_watch_subscribe(KV, "resume", Some(last_seen_lsn));
        let lsns: Vec<_> = std::iter::from_fn(|| resumed.poll_next())
            .map(|event| event.lsn)
            .take(50)
            .collect();

        assert_eq!(lsns.len(), 50);
        assert!(lsns.iter().all(|lsn| *lsn > last_seen_lsn));
        assert!(lsns.windows(2).all(|pair| pair[0] < pair[1]));
        assert!(resumed.poll_next().is_none());
    }

    #[test]
    fn watch_stream_slow_consumer_drops_oldest_buffered_events() {
        let runtime = rt();
        let kv = super::KvAtomicOps::new(&runtime);
        let mut stream = runtime.kv_watch_subscribe(KV, "slow", None);

        // Write 10,000 times without polling; the assertions below expect the
        // stream to have dropped events by the time it is first polled.
        for value in 0..10_000 {
            put(&kv, "slow", Value::Integer(value));
        }

        let event = stream.poll_next().expect("tail event after drops");
        assert!(event.lsn > 1);
        assert!(event.dropped_event_count > 0);
        assert_eq!(stream.dropped_event_count(), event.dropped_event_count);
        assert_eq!(runtime.stats().kv.watch_drops, event.dropped_event_count);
    }

    #[test]
    fn watch_stream_idle_timeout_closes_subscription() {
        let runtime = rt();
        runtime
            .execute_query("SET CONFIG red.config.kv.watch.idle_timeout_ms = 1")
            .unwrap();

        let mut stream = runtime.kv_watch_subscribe(KV, "idle", None);
        assert_eq!(runtime.stats().kv.watch_streams_active, 1);

        // Sleep past the 1 ms idle timeout configured above.
        std::thread::sleep(std::time::Duration::from_millis(5));

        assert!(stream.poll_next().is_none());
        assert_eq!(runtime.stats().kv.watch_streams_active, 0);
    }

    #[test]
    fn watch_stream_does_not_emit_rolled_back_put() {
        let runtime = rt();
        let mut stream = runtime.kv_watch_subscribe(KV, "rollback_key", None);

        for statement in ["BEGIN", "KV PUT rollback_key = 'dirty'", "ROLLBACK"] {
            runtime.execute_query(statement).unwrap();
        }

        assert!(stream.poll_next().is_none());
    }
}