Skip to main content

reddb_server/runtime/
impl_kv.rs

//! KV DSL command execution and the `KvAtomicOps` operations seam.
//!
//! Handles `KV PUT key = value [EXPIRE n] [TAGS [...]] [IF NOT EXISTS]`,
//! `KV GET key`, and `KV DELETE key`, plus atomic INCR / CAS, tag
//! invalidation, and versioned, sealed VAULT entries.
5
6use crate::application::ports::RuntimeEntityPort;
7use crate::storage::unified::{Metadata, MetadataValue};
8
9use super::impl_core::{current_auth_identity, current_connection_id, current_tenant};
10use super::*;
11
12/// Default collection name for bare-key KV operations.
13pub const KV_DEFAULT_COLLECTION: &str = "kv_default";
14
/// Builds the auth-store vault reference under which a collection's own
/// (hex-encoded) master key may be stored: `red.vault.<collection>.master_key`.
fn vault_master_key_ref(collection: &str) -> String {
    let mut reference = String::from("red.vault.");
    reference.push_str(collection);
    reference.push_str(".master_key");
    reference
}
18
19fn keyed_model_name(model: crate::catalog::CollectionModel) -> &'static str {
20    match model {
21        crate::catalog::CollectionModel::Kv => "kv",
22        crate::catalog::CollectionModel::Vault => "vault",
23        crate::catalog::CollectionModel::Config => "config",
24        _ => "collection",
25    }
26}
27
28#[derive(Debug, Clone)]
29struct VaultEntry {
30    id: crate::storage::EntityId,
31    key: String,
32    value: crate::storage::schema::Value,
33    metadata: Metadata,
34    created_at: u64,
35    updated_at: u64,
36    sequence_id: u64,
37    version: i64,
38    tombstone: bool,
39    op: String,
40}
41
42impl super::keyed_spine::KeyedVersion for VaultEntry {
43    fn key(&self) -> &str {
44        &self.key
45    }
46
47    fn version(&self) -> i64 {
48        self.version
49    }
50}
51
52impl VaultEntry {
53    fn from_keyed_row(
54        version: super::keyed_spine::KeyedRowVersion,
55        metadata: Metadata,
56        created_at: u64,
57        updated_at: u64,
58        sequence_id: u64,
59    ) -> Self {
60        Self {
61            id: version.id,
62            key: version.key,
63            value: version.value,
64            metadata,
65            created_at,
66            updated_at,
67            sequence_id,
68            version: version.version,
69            tombstone: version.tombstone,
70            op: version.op,
71        }
72    }
73}
74
75/// Atomic KV operations interface — the seam that transports and drivers depend on.
76///
77/// All three verbs delegate to the runtime's existing `create_kv` / `get_kv` /
78/// `delete_kv` plumbing; this struct adds the auto-create and upsert logic.
79pub struct KvAtomicOps<'a> {
80    runtime: &'a RedDBRuntime,
81}
82
83impl<'a> KvAtomicOps<'a> {
84    pub fn new(runtime: &'a RedDBRuntime) -> Self {
85        Self { runtime }
86    }
87
88    /// Insert or update a KV entry. Auto-creates the collection when needed.
89    ///
90    /// Insert or update a KV entry. Returns `(created: bool, id: EntityId)`.
91    pub fn set(
92        &self,
93        model: crate::catalog::CollectionModel,
94        collection: &str,
95        key: &str,
96        value: crate::storage::schema::Value,
97        ttl_ms: Option<u64>,
98        if_not_exists: bool,
99    ) -> RedDBResult<(bool, crate::storage::EntityId)> {
100        self.set_with_tags_for_model(model, collection, key, value, ttl_ms, &[], if_not_exists)
101    }
102
103    pub fn set_with_tags(
104        &self,
105        collection: &str,
106        key: &str,
107        value: crate::storage::schema::Value,
108        ttl_ms: Option<u64>,
109        tags: &[String],
110        if_not_exists: bool,
111    ) -> RedDBResult<(bool, crate::storage::EntityId)> {
112        self.set_with_tags_for_model(
113            crate::catalog::CollectionModel::Kv,
114            collection,
115            key,
116            value,
117            ttl_ms,
118            tags,
119            if_not_exists,
120        )
121    }
122
123    fn set_with_tags_for_model(
124        &self,
125        model: crate::catalog::CollectionModel,
126        collection: &str,
127        key: &str,
128        value: crate::storage::schema::Value,
129        ttl_ms: Option<u64>,
130        tags: &[String],
131        if_not_exists: bool,
132    ) -> RedDBResult<(bool, crate::storage::EntityId)> {
133        self.set_with_tags_and_metadata_for_model(
134            model,
135            collection,
136            key,
137            value,
138            ttl_ms,
139            tags,
140            if_not_exists,
141            Vec::new(),
142        )
143    }
144
145    pub fn set_with_tags_and_metadata(
146        &self,
147        collection: &str,
148        key: &str,
149        value: crate::storage::schema::Value,
150        ttl_ms: Option<u64>,
151        tags: &[String],
152        if_not_exists: bool,
153        metadata: Vec<(String, MetadataValue)>,
154    ) -> RedDBResult<(bool, crate::storage::EntityId)> {
155        self.set_with_tags_and_metadata_for_model(
156            crate::catalog::CollectionModel::Kv,
157            collection,
158            key,
159            value,
160            ttl_ms,
161            tags,
162            if_not_exists,
163            metadata,
164        )
165    }
166
167    fn set_with_tags_and_metadata_for_model(
168        &self,
169        model: crate::catalog::CollectionModel,
170        collection: &str,
171        key: &str,
172        value: crate::storage::schema::Value,
173        ttl_ms: Option<u64>,
174        tags: &[String],
175        if_not_exists: bool,
176        mut metadata: Vec<(String, MetadataValue)>,
177    ) -> RedDBResult<(bool, crate::storage::EntityId)> {
178        self.ensure_keyed_collection(model, collection)?;
179
180        if model == crate::catalog::CollectionModel::Vault {
181            let latest = self.get_vault_entry(collection, key)?;
182            let was_present = latest.as_ref().is_some_and(|entry| !entry.tombstone);
183            if if_not_exists && was_present {
184                return Ok((false, latest.expect("checked present").id));
185            }
186            let entry = self.append_vault_version(collection, key, value, "put", false, tags)?;
187            self.runtime.record_kv_watch_event(
188                if latest.is_some() {
189                    crate::replication::cdc::ChangeOperation::Update
190                } else {
191                    crate::replication::cdc::ChangeOperation::Insert
192                },
193                collection,
194                key,
195                entry.id.raw(),
196                latest.as_ref().map(vault_entry_metadata_json),
197                Some(vault_entry_metadata_json(&entry)),
198            );
199            return Ok((!was_present, entry.id));
200        }
201
202        let existing = self.get_entry(model, collection, key)?;
203        let was_present = existing.is_some();
204
205        if if_not_exists && was_present {
206            let (_, id) = existing.unwrap();
207            self.runtime.inner.kv_stats.incr_puts();
208            return Ok((false, id));
209        }
210
211        let before = existing
212            .as_ref()
213            .map(|(value, _)| crate::presentation::entity_json::storage_value_to_json(value));
214        let op = if was_present {
215            crate::replication::cdc::ChangeOperation::Update
216        } else {
217            crate::replication::cdc::ChangeOperation::Insert
218        };
219        let after = Some(crate::presentation::entity_json::storage_value_to_json(
220            &value,
221        ));
222
223        // Delete old entry so we can create fresh (handles TTL refresh).
224        if was_present {
225            self.delete(model, collection, key)?;
226        }
227
228        if let Some(ttl_metadata) = ttl_metadata(ttl_ms) {
229            metadata.extend(ttl_metadata.fields);
230        }
231        if let Some(tags_metadata) = kv_tags_metadata(tags) {
232            metadata.push(tags_metadata);
233        }
234
235        let output = self
236            .runtime
237            .create_kv(crate::application::entity::CreateKvInput {
238                collection: collection.to_string(),
239                key: key.to_string(),
240                value,
241                metadata,
242            })?;
243        if model == crate::catalog::CollectionModel::Kv {
244            self.runtime
245                .inner
246                .kv_tag_index
247                .replace(collection, key, output.id, tags);
248        }
249
250        if model == crate::catalog::CollectionModel::Kv {
251            self.runtime
252                .record_kv_watch_event(op, collection, key, output.id.raw(), before, after);
253        }
254
255        if model == crate::catalog::CollectionModel::Kv {
256            self.runtime.inner.kv_stats.incr_puts();
257        }
258        Ok((!was_present, output.id))
259    }
260
261    /// Retrieve a KV value by key. Returns `None` when not found.
262    pub fn get(
263        &self,
264        model: crate::catalog::CollectionModel,
265        collection: &str,
266        key: &str,
267    ) -> RedDBResult<Option<crate::storage::schema::Value>> {
268        let result = self.get_entry(model, collection, key)?;
269        if model == crate::catalog::CollectionModel::Kv {
270            self.runtime.inner.kv_stats.incr_gets();
271        }
272        Ok(result.map(|(v, _)| v))
273    }
274
275    /// Delete a KV entry. Returns `true` if the key existed.
276    pub fn delete(
277        &self,
278        model: crate::catalog::CollectionModel,
279        collection: &str,
280        key: &str,
281    ) -> RedDBResult<bool> {
282        self.ensure_declared_model(model, collection)?;
283        let found = self.get_entry(model, collection, key)?;
284        if let Some((value, id)) = found {
285            let store = self.runtime.inner.db.store();
286            let deleted = store
287                .delete(collection, id)
288                .map_err(|err| RedDBError::Internal(err.to_string()))?;
289            if deleted {
290                store.context_index().remove_entity(id);
291                if model == crate::catalog::CollectionModel::Kv {
292                    self.runtime.inner.kv_tag_index.remove(collection, key);
293                    self.runtime.record_kv_watch_event(
294                        crate::replication::cdc::ChangeOperation::Delete,
295                        collection,
296                        key,
297                        id.raw(),
298                        Some(crate::presentation::entity_json::storage_value_to_json(
299                            &value,
300                        )),
301                        None,
302                    );
303                    self.runtime.inner.kv_stats.incr_deletes();
304                }
305            }
306            Ok(deleted)
307        } else {
308            Ok(false)
309        }
310    }
311
312    /// Atomically increment (or decrement) a counter key. Returns the new value.
313    ///
314    /// - Missing key initialises at `by` (Redis-compat).
315    /// - Non-integer value returns an error before any mutation.
316    pub fn incr(
317        &self,
318        model: crate::catalog::CollectionModel,
319        collection: &str,
320        key: &str,
321        by: i64,
322        ttl_ms: Option<u64>,
323    ) -> RedDBResult<i64> {
324        if model == crate::catalog::CollectionModel::Vault {
325            return Err(RedDBError::InvalidOperation(
326                "VAULT INCR is not supported for sealed secrets".to_string(),
327            ));
328        }
329        let rmw_lock = self.runtime.inner.rmw_locks.lock_for(collection, key);
330        let _rmw_guard = rmw_lock.lock();
331        self.ensure_kv_collection(collection)?;
332        let existing = self.runtime.get_kv(collection, key)?;
333        let current: i64 = match existing.as_ref() {
334            None => 0,
335            Some((crate::storage::schema::Value::Integer(n), _)) => *n,
336            Some((crate::storage::schema::Value::Float(f), _)) => *f as i64,
337            Some((other, _)) => {
338                return Err(RedDBError::Internal(format!(
339                    "INCR on non-integer value: {:?}",
340                    other
341                )));
342            }
343        };
344
345        let next = current
346            .checked_add(by)
347            .ok_or_else(|| RedDBError::Internal(format!("INCR overflow: {current} + {by}")))?;
348
349        // Delete then re-create so TTL is refreshed.
350        if existing.is_some() {
351            self.runtime.delete_kv(collection, key)?;
352        }
353
354        let meta_vec: Vec<(String, crate::storage::unified::MetadataValue)> = ttl_metadata(ttl_ms)
355            .map(|m| m.fields.into_iter().collect())
356            .unwrap_or_default();
357
358        let output = self
359            .runtime
360            .create_kv(crate::application::entity::CreateKvInput {
361                collection: collection.to_string(),
362                key: key.to_string(),
363                value: crate::storage::schema::Value::Integer(next),
364                metadata: meta_vec,
365            })?;
366        self.runtime
367            .inner
368            .kv_tag_index
369            .replace(collection, key, output.id, &[]);
370
371        self.runtime.record_kv_watch_event(
372            if existing.is_some() {
373                crate::replication::cdc::ChangeOperation::Update
374            } else {
375                crate::replication::cdc::ChangeOperation::Insert
376            },
377            collection,
378            key,
379            output.id.raw(),
380            existing
381                .as_ref()
382                .map(|(value, _)| crate::presentation::entity_json::storage_value_to_json(value)),
383            Some(crate::presentation::entity_json::storage_value_to_json(
384                &crate::storage::schema::Value::Integer(next),
385            )),
386        );
387
388        self.runtime.inner.kv_stats.incr_incrs();
389        Ok(next)
390    }
391
392    /// Compare-and-set: atomically swap `key` from `expected` to `new_value`.
393    ///
394    /// Returns `(ok, current)`:
395    /// - `ok = true`  → swap applied; `current` is the value *before* the swap.
396    /// - `ok = false` → swap skipped; `current` holds the actual current value.
397    ///
398    /// `expected = None` means the caller expects the key to be *absent* (create-if-absent).
399    pub fn cas(
400        &self,
401        model: crate::catalog::CollectionModel,
402        collection: &str,
403        key: &str,
404        expected: Option<&crate::storage::schema::Value>,
405        new_value: crate::storage::schema::Value,
406        ttl_ms: Option<u64>,
407    ) -> RedDBResult<(bool, Option<crate::storage::schema::Value>)> {
408        if model == crate::catalog::CollectionModel::Vault {
409            return Err(RedDBError::InvalidOperation(
410                "VAULT CAS is not supported for sealed secrets".to_string(),
411            ));
412        }
413        let rmw_lock = self.runtime.inner.rmw_locks.lock_for(collection, key);
414        let _rmw_guard = rmw_lock.lock();
415        self.ensure_kv_collection(collection)?;
416        let current = self.runtime.get_kv(collection, key)?.map(|(v, _)| v);
417
418        let matches = match (&current, expected) {
419            (None, None) => true,
420            (Some(cur), Some(exp)) => cur == exp,
421            _ => false,
422        };
423
424        if !matches {
425            self.runtime.inner.kv_stats.incr_cas_conflict();
426            return Ok((false, current));
427        }
428
429        // Swap: delete old entry (if present), write new one.
430        if current.is_some() {
431            self.runtime.delete_kv(collection, key)?;
432        }
433
434        let meta_vec: Vec<(String, crate::storage::unified::MetadataValue)> = ttl_metadata(ttl_ms)
435            .map(|m| m.fields.into_iter().collect())
436            .unwrap_or_default();
437
438        let output = self
439            .runtime
440            .create_kv(crate::application::entity::CreateKvInput {
441                collection: collection.to_string(),
442                key: key.to_string(),
443                value: new_value.clone(),
444                metadata: meta_vec,
445            })?;
446        self.runtime
447            .inner
448            .kv_tag_index
449            .replace(collection, key, output.id, &[]);
450
451        self.runtime.record_kv_watch_event(
452            if current.is_some() {
453                crate::replication::cdc::ChangeOperation::Update
454            } else {
455                crate::replication::cdc::ChangeOperation::Insert
456            },
457            collection,
458            key,
459            output.id.raw(),
460            current
461                .as_ref()
462                .map(crate::presentation::entity_json::storage_value_to_json),
463            Some(crate::presentation::entity_json::storage_value_to_json(
464                &new_value,
465            )),
466        );
467
468        self.runtime.inner.kv_stats.incr_cas_success();
469        Ok((true, current))
470    }
471
472    pub fn invalidate_tags(&self, collection: &str, tags: &[String]) -> RedDBResult<usize> {
473        self.runtime
474            .check_write(crate::runtime::write_gate::WriteKind::Dml)?;
475        self.runtime.check_kv_invalidate_policy(collection)?;
476        self.ensure_kv_collection(collection)?;
477        let entries = self
478            .runtime
479            .inner
480            .kv_tag_index
481            .entries_for_tags(collection, tags);
482        if entries.is_empty() {
483            return Ok(0);
484        }
485
486        let store = self.runtime.inner.db.store();
487        let mut removed = 0usize;
488        for (key, id) in entries {
489            let before = store
490                .get(collection, id)
491                .and_then(|entity| kv_value_from_entity(&entity));
492            let deleted = store
493                .delete(collection, id)
494                .map_err(|err| RedDBError::Internal(err.to_string()))?;
495            if deleted {
496                store.context_index().remove_entity(id);
497                self.runtime.inner.kv_tag_index.remove(collection, &key);
498                self.runtime.record_kv_watch_event(
499                    crate::replication::cdc::ChangeOperation::Delete,
500                    collection,
501                    &key,
502                    id.raw(),
503                    before
504                        .as_ref()
505                        .map(crate::presentation::entity_json::storage_value_to_json),
506                    None,
507                );
508                removed += 1;
509            }
510        }
511        if removed > 0 {
512            self.runtime.inner.kv_stats.incr_deletes();
513        }
514        Ok(removed)
515    }
516
517    pub fn tags_for_key(&self, collection: &str, key: &str) -> Vec<String> {
518        self.runtime
519            .inner
520            .kv_tag_index
521            .tags_for_key(collection, key)
522    }
523
524    /// Auto-create a KV collection if it does not exist yet.
525    fn ensure_kv_collection(&self, collection: &str) -> RedDBResult<()> {
526        self.ensure_keyed_collection(crate::catalog::CollectionModel::Kv, collection)
527    }
528
529    fn ensure_keyed_collection(
530        &self,
531        model: crate::catalog::CollectionModel,
532        collection: &str,
533    ) -> RedDBResult<()> {
534        let store = self.runtime.inner.db.store();
535        if store.get_collection(collection).is_some() {
536            return self.ensure_declared_model(model, collection);
537        }
538        if model != crate::catalog::CollectionModel::Kv {
539            return Err(RedDBError::NotFound(format!(
540                "{} collection '{collection}' does not exist",
541                keyed_model_name(model)
542            )));
543        }
544        // Check config gate: red.config.kv.default_collection (default = true).
545        let auto_create = self
546            .runtime
547            .config_bool("red.config.kv.default_collection", true);
548        if !auto_create {
549            return Err(RedDBError::NotFound(format!(
550                "kv collection '{collection}' does not exist and auto-create is disabled \
551                 (red.config.kv.default_collection = false)"
552            )));
553        }
554        store
555            .create_collection(collection)
556            .map_err(|err| RedDBError::Internal(err.to_string()))?;
557        self.runtime
558            .inner
559            .db
560            .save_collection_contract(kv_collection_contract(collection))
561            .map_err(|err| RedDBError::Internal(err.to_string()))?;
562        Ok(())
563    }
564
565    fn get_entry(
566        &self,
567        model: crate::catalog::CollectionModel,
568        collection: &str,
569        key: &str,
570    ) -> RedDBResult<Option<(crate::storage::schema::Value, crate::storage::EntityId)>> {
571        let Some(entity) = self.get_entity(model, collection, key)? else {
572            return Ok(None);
573        };
574        Ok(kv_value_from_entity(&entity).map(|value| (value, entity.id)))
575    }
576
577    fn get_entity(
578        &self,
579        model: crate::catalog::CollectionModel,
580        collection: &str,
581        key: &str,
582    ) -> RedDBResult<Option<crate::storage::UnifiedEntity>> {
583        self.ensure_declared_model(model, collection)?;
584        let store = self.runtime.inner.db.store();
585        let Some(manager) = store.get_collection(collection) else {
586            return Ok(None);
587        };
588        let entities = manager.query_all(|_| true);
589        for entity in entities {
590            if let crate::storage::EntityData::Row(ref row) = entity.data {
591                if let Some(ref named) = row.named {
592                    if let Some(crate::storage::schema::Value::Text(ref k)) = named.get("key") {
593                        if &**k == key {
594                            return Ok(Some(entity));
595                        }
596                    }
597                }
598            }
599        }
600        Ok(None)
601    }
602
603    fn get_vault_entry(&self, collection: &str, key: &str) -> RedDBResult<Option<VaultEntry>> {
604        self.vault_versions(collection, key)
605            .map(super::keyed_spine::latest_version)
606    }
607
608    fn get_vault_entry_version(
609        &self,
610        collection: &str,
611        key: &str,
612        version: i64,
613    ) -> RedDBResult<Option<VaultEntry>> {
614        Ok(self
615            .vault_versions(collection, key)?
616            .into_iter()
617            .find(|entry| entry.version == version))
618    }
619
620    fn vault_versions(&self, collection: &str, key: &str) -> RedDBResult<Vec<VaultEntry>> {
621        self.ensure_declared_model(crate::catalog::CollectionModel::Vault, collection)?;
622        let store = self.runtime.inner.db.store();
623        let Some(manager) = store.get_collection(collection) else {
624            return Ok(Vec::new());
625        };
626        let entities = manager.query_all(|_| true);
627        let mut versions = Vec::new();
628        for entity in entities {
629            let crate::storage::EntityData::Row(ref row) = entity.data else {
630                continue;
631            };
632            let Some(version) =
633                super::keyed_spine::row_version(entity.id, row, entity.sequence_id as i64)
634            else {
635                continue;
636            };
637            if version.key != key {
638                continue;
639            }
640            let metadata = manager.get_metadata(entity.id).unwrap_or_default();
641            versions.push(VaultEntry::from_keyed_row(
642                version,
643                metadata,
644                entity.created_at,
645                entity.updated_at,
646                entity.sequence_id,
647            ));
648        }
649        Ok(versions)
650    }
651
652    fn latest_vault_entries(
653        &self,
654        collection: &str,
655        prefix: Option<&str>,
656    ) -> RedDBResult<Vec<VaultEntry>> {
657        self.ensure_declared_model(crate::catalog::CollectionModel::Vault, collection)?;
658        let store = self.runtime.inner.db.store();
659        let Some(manager) = store.get_collection(collection) else {
660            return Ok(Vec::new());
661        };
662        let mut versions = Vec::new();
663        for entity in manager.query_all(|_| true) {
664            let crate::storage::EntityData::Row(ref row) = entity.data else {
665                continue;
666            };
667            let Some(version) =
668                super::keyed_spine::row_version(entity.id, row, entity.sequence_id as i64)
669            else {
670                continue;
671            };
672            let metadata = manager.get_metadata(entity.id).unwrap_or_default();
673            let entry = VaultEntry::from_keyed_row(
674                version,
675                metadata,
676                entity.created_at,
677                entity.updated_at,
678                entity.sequence_id,
679            );
680            versions.push(entry);
681        }
682        Ok(super::keyed_spine::latest_versions(versions, prefix))
683    }
684
685    fn append_vault_version(
686        &self,
687        collection: &str,
688        key: &str,
689        value: crate::storage::schema::Value,
690        op: &str,
691        tombstone: bool,
692        tags: &[String],
693    ) -> RedDBResult<VaultEntry> {
694        self.ensure_declared_model(crate::catalog::CollectionModel::Vault, collection)?;
695        let version = self
696            .get_vault_entry(collection, key)?
697            .map(|entry| entry.version)
698            .unwrap_or(0)
699            + 1;
700        let stored_value = if tombstone {
701            crate::storage::schema::Value::Null
702        } else {
703            self.runtime.seal_vault_value(collection, value)?
704        };
705        let now = current_unix_ms() as i64;
706        let fields = vec![
707            (
708                "key".to_string(),
709                crate::storage::schema::Value::text(key.to_string()),
710            ),
711            ("value".to_string(), stored_value),
712            (
713                "version".to_string(),
714                crate::storage::schema::Value::Integer(version),
715            ),
716            (
717                "tombstone".to_string(),
718                crate::storage::schema::Value::Boolean(tombstone),
719            ),
720            (
721                "op".to_string(),
722                crate::storage::schema::Value::text(op.to_string()),
723            ),
724            (
725                "created_at_ms".to_string(),
726                crate::storage::schema::Value::Integer(now),
727            ),
728        ];
729        let mut row = crate::storage::RowData::new(Vec::new());
730        row.named = Some(fields.into_iter().collect());
731        let entity = crate::storage::UnifiedEntity::new(
732            crate::storage::EntityId::new(0),
733            crate::storage::EntityKind::TableRow {
734                table: std::sync::Arc::from(collection),
735                row_id: 0,
736            },
737            crate::storage::EntityData::Row(row),
738        );
739        let id = self
740            .runtime
741            .inner
742            .db
743            .store()
744            .insert(collection, entity)
745            .map_err(|err| RedDBError::Internal(err.to_string()))?;
746        if !tags.is_empty() {
747            self.runtime
748                .inner
749                .db
750                .store()
751                .set_metadata(
752                    collection,
753                    id,
754                    Metadata::with_fields(vault_tags_metadata(tags)),
755                )
756                .map_err(|err| RedDBError::Internal(err.to_string()))?;
757            self.runtime
758                .inner
759                .kv_tag_index
760                .replace(collection, key, id, tags);
761        }
762        self.get_vault_entry_version(collection, key, version)?
763            .ok_or_else(|| RedDBError::Internal(format!("vault version {id} was not readable")))
764    }
765
766    fn purge_vault_versions(&self, collection: &str, key: &str) -> RedDBResult<usize> {
767        self.ensure_declared_model(crate::catalog::CollectionModel::Vault, collection)?;
768        let versions = self.vault_versions(collection, key)?;
769        let store = self.runtime.inner.db.store();
770        let mut purged = 0usize;
771        for entry in versions {
772            if store
773                .delete(collection, entry.id)
774                .map_err(|err| RedDBError::Internal(err.to_string()))?
775            {
776                store.context_index().remove_entity(entry.id);
777                purged += 1;
778            }
779        }
780        Ok(purged)
781    }
782
783    fn ensure_declared_model(
784        &self,
785        model: crate::catalog::CollectionModel,
786        collection: &str,
787    ) -> RedDBResult<()> {
788        let Some(contract) = self.runtime.inner.db.collection_contract(collection) else {
789            return Ok(());
790        };
791        if contract.declared_model == model
792            || contract.declared_model == crate::catalog::CollectionModel::Mixed
793        {
794            return Ok(());
795        }
796        Err(RedDBError::InvalidOperation(format!(
797            "collection '{}' is declared as '{}' and does not allow '{}' operations",
798            collection,
799            keyed_model_name(contract.declared_model),
800            keyed_model_name(model)
801        )))
802    }
803}
804
805impl RedDBRuntime {
806    pub(crate) fn seal_vault_value(
807        &self,
808        collection: &str,
809        value: crate::storage::schema::Value,
810    ) -> RedDBResult<crate::storage::schema::Value> {
811        let key = self.vault_encryption_key(collection)?;
812        let plaintext = value.to_bytes();
813        let nonce_bytes = crate::auth::store::random_bytes(12);
814        let mut nonce = [0u8; 12];
815        nonce.copy_from_slice(&nonce_bytes[..12]);
816        let aad = format!("reddb.vault.{collection}");
817        let ciphertext =
818            crate::crypto::aes_gcm::aes256_gcm_encrypt(&key, &nonce, aad.as_bytes(), &plaintext);
819        let mut payload = Vec::with_capacity(12 + ciphertext.len());
820        payload.extend_from_slice(&nonce);
821        payload.extend_from_slice(&ciphertext);
822        Ok(crate::storage::schema::Value::Secret(payload))
823    }
824
825    fn vault_key_available(&self, collection: &str) -> bool {
826        self.vault_encryption_key(collection).is_ok()
827    }
828
829    fn vault_encryption_key(&self, collection: &str) -> RedDBResult<[u8; 32]> {
830        let auth_store = self.inner.auth_store.read().clone().ok_or_else(|| {
831            RedDBError::Query("vault sealed_unavailable: no key provider is configured".to_string())
832        })?;
833        if !auth_store.is_vault_backed() {
834            return Err(RedDBError::Query(
835                "vault sealed_unavailable: key provider is sealed".to_string(),
836            ));
837        }
838
839        if let Some(hex_key) = auth_store.vault_kv_get(&vault_master_key_ref(collection)) {
840            return decode_vault_key(&hex_key);
841        }
842        auth_store.vault_secret_key().ok_or_else(|| {
843            RedDBError::Query("vault sealed_unavailable: cluster vault key is missing".to_string())
844        })
845    }
846
847    fn unseal_vault_value(
848        &self,
849        collection: &str,
850        sealed: &crate::storage::schema::Value,
851    ) -> RedDBResult<crate::storage::schema::Value> {
852        let crate::storage::schema::Value::Secret(payload) = sealed else {
853            return Err(RedDBError::Query(
854                "vault unseal failed: stored value is not sealed".to_string(),
855            ));
856        };
857        if payload.len() < 12 {
858            return Err(RedDBError::Query(
859                "vault unseal failed: sealed payload is truncated".to_string(),
860            ));
861        }
862        let key = self.vault_encryption_key(collection)?;
863        let mut nonce = [0u8; 12];
864        nonce.copy_from_slice(&payload[..12]);
865        let aad = format!("reddb.vault.{collection}");
866        let plaintext = crate::crypto::aes_gcm::aes256_gcm_decrypt(
867            &key,
868            &nonce,
869            aad.as_bytes(),
870            &payload[12..],
871        )
872        .map_err(|_| RedDBError::Query("vault unseal failed: decryption failed".to_string()))?;
873        let (value, consumed) =
874            crate::storage::schema::Value::from_bytes(&plaintext).map_err(|err| {
875                RedDBError::Query(format!("vault unseal failed: bad plaintext value: {err}"))
876            })?;
877        if consumed != plaintext.len() {
878            return Err(RedDBError::Query(
879                "vault unseal failed: trailing plaintext bytes".to_string(),
880            ));
881        }
882        Ok(value)
883    }
884
/// Builds the resource path used for IAM checks, audit records and control events.
///
/// Keys in the system vault (`red.vault`) are addressed as `red.vault/<key>`, with the key
/// ASCII-lowercased. Every other collection uses `<collection>.<key>` verbatim.
fn vault_target_resource(collection: &str, key: &str) -> String {
    match collection {
        "red.vault" => format!("red.vault/{}", key.to_ascii_lowercase()),
        _ => format!("{collection}.{key}"),
    }
}
891
892    fn current_vault_actor() -> String {
893        current_auth_identity()
894            .map(|(principal, _)| principal)
895            .unwrap_or_else(|| "anonymous".to_string())
896    }
897
898    fn vault_request_id() -> String {
899        let conn_id = current_connection_id();
900        if conn_id == 0 {
901            "embedded".to_string()
902        } else {
903            format!("conn-{conn_id}")
904        }
905    }
906
907    fn check_vault_capability(
908        &self,
909        action: &str,
910        collection: &str,
911        key: &str,
912    ) -> Result<(), String> {
913        let Some(auth_store) = self.inner.auth_store.read().clone() else {
914            return Ok(());
915        };
916        if !auth_store.iam_authorization_enabled() {
917            return Ok(());
918        }
919        let Some((principal, role)) = current_auth_identity() else {
920            return Err(
921                "IAM authorization is enabled; vault capability check requires an authenticated principal"
922                    .to_string(),
923            );
924        };
925        let tenant = current_tenant();
926        let principal_id = crate::auth::UserId::from_parts(tenant.as_deref(), &principal);
927        let mut resource = crate::auth::policies::ResourceRef::new(
928            "vault",
929            Self::vault_target_resource(collection, key),
930        );
931        if let Some(ref tenant) = tenant {
932            resource = resource.with_tenant(tenant.clone());
933        }
934        let ctx = crate::auth::policies::EvalContext {
935            principal_tenant: tenant.clone(),
936            current_tenant: tenant,
937            peer_ip: None,
938            mfa_present: false,
939            now_ms: crate::utils::now_unix_millis() as u128,
940            principal_is_admin_role: role == crate::auth::Role::Admin,
941            principal_is_system_owned: auth_store.principal_is_system_owned(&principal_id),
942            principal_is_platform_scoped: principal_id.tenant.is_none(),
943        };
944        if auth_store.check_policy_authz_with_role(&principal_id, action, &resource, &ctx, role) {
945            Ok(())
946        } else {
947            Err(format!(
948                "principal=`{}` action=`{}` resource=`vault:{}` denied by IAM policy",
949                principal,
950                action,
951                Self::vault_target_resource(collection, key)
952            ))
953        }
954    }
955
956    fn check_system_vault_capability(
957        &self,
958        action: &str,
959        collection: &str,
960        key: &str,
961    ) -> Result<(), String> {
962        if collection != "red.vault" {
963            return Ok(());
964        }
965        self.check_vault_capability(action, collection, key)
966    }
967
968    fn audit_vault_unseal(
969        &self,
970        collection: &str,
971        key: &str,
972        outcome: crate::runtime::audit_log::Outcome,
973        reason: &str,
974        entry: Option<&VaultEntry>,
975    ) {
976        let actor = Self::current_vault_actor();
977        let request_id = Self::vault_request_id();
978        let mut builder = crate::runtime::audit_log::AuditEvent::builder("vault/unseal")
979            .principal(actor.clone())
980            .source(crate::runtime::audit_log::AuditAuthSource::Password)
981            .resource(format!(
982                "vault:{}",
983                Self::vault_target_resource(collection, key)
984            ))
985            .outcome(outcome)
986            .correlation_id(request_id.clone())
987            .fields([
988                crate::runtime::audit_log::AuditFieldEscaper::field("actor", actor),
989                crate::runtime::audit_log::AuditFieldEscaper::field("collection", collection),
990                crate::runtime::audit_log::AuditFieldEscaper::field("key", key),
991                crate::runtime::audit_log::AuditFieldEscaper::field(
992                    "target",
993                    Self::vault_target_resource(collection, key),
994                ),
995                crate::runtime::audit_log::AuditFieldEscaper::field("reason", reason),
996                crate::runtime::audit_log::AuditFieldEscaper::field("request_id", request_id),
997                crate::runtime::audit_log::AuditFieldEscaper::field(
998                    "connection_id",
999                    current_connection_id(),
1000                ),
1001            ]);
1002        if let Some(tenant) = current_tenant() {
1003            builder = builder.tenant(tenant);
1004        }
1005        if let Some(entry) = entry {
1006            builder = builder.fields([
1007                crate::runtime::audit_log::AuditFieldEscaper::field("entity_id", entry.id.raw()),
1008                crate::runtime::audit_log::AuditFieldEscaper::field(
1009                    "sequence_id",
1010                    entry.sequence_id,
1011                ),
1012            ]);
1013        }
1014        self.audit_log().record_event(builder.build());
1015    }
1016
1017    fn audit_vault_lifecycle(
1018        &self,
1019        operation: &str,
1020        collection: &str,
1021        key: &str,
1022        outcome: crate::runtime::audit_log::Outcome,
1023        reason: &str,
1024        entry: Option<&VaultEntry>,
1025    ) {
1026        let actor = Self::current_vault_actor();
1027        let request_id = Self::vault_request_id();
1028        let mut builder =
1029            crate::runtime::audit_log::AuditEvent::builder(format!("vault/{operation}"))
1030                .principal(actor.clone())
1031                .source(crate::runtime::audit_log::AuditAuthSource::Password)
1032                .resource(format!(
1033                    "vault:{}",
1034                    Self::vault_target_resource(collection, key)
1035                ))
1036                .outcome(outcome)
1037                .correlation_id(request_id.clone())
1038                .fields([
1039                    crate::runtime::audit_log::AuditFieldEscaper::field("actor", actor),
1040                    crate::runtime::audit_log::AuditFieldEscaper::field("collection", collection),
1041                    crate::runtime::audit_log::AuditFieldEscaper::field("key", key),
1042                    crate::runtime::audit_log::AuditFieldEscaper::field(
1043                        "target",
1044                        Self::vault_target_resource(collection, key),
1045                    ),
1046                    crate::runtime::audit_log::AuditFieldEscaper::field("reason", reason),
1047                    crate::runtime::audit_log::AuditFieldEscaper::field("request_id", request_id),
1048                    crate::runtime::audit_log::AuditFieldEscaper::field(
1049                        "connection_id",
1050                        current_connection_id(),
1051                    ),
1052                ]);
1053        if let Some(tenant) = current_tenant() {
1054            builder = builder.tenant(tenant);
1055        }
1056        if let Some(entry) = entry {
1057            builder = builder.fields([
1058                crate::runtime::audit_log::AuditFieldEscaper::field("entity_id", entry.id.raw()),
1059                crate::runtime::audit_log::AuditFieldEscaper::field("version", entry.version),
1060                crate::runtime::audit_log::AuditFieldEscaper::field(
1061                    "sequence_id",
1062                    entry.sequence_id,
1063                ),
1064            ]);
1065        }
1066        self.audit_log().record_event(builder.build());
1067    }
1068
1069    fn emit_vault_control_event(
1070        &self,
1071        kind: crate::runtime::control_events::EventKind,
1072        outcome: crate::runtime::control_events::Outcome,
1073        action: &'static str,
1074        collection: &str,
1075        key: &str,
1076        reason: &str,
1077        entry: Option<&VaultEntry>,
1078        extra_fields: Vec<(String, crate::runtime::control_events::Sensitivity)>,
1079    ) -> RedDBResult<()> {
1080        use crate::runtime::control_events::{
1081            ActorRef, ControlEvent, ControlEventCtx, ControlEventLedger, Sensitivity,
1082        };
1083        use std::borrow::Cow;
1084
1085        let tenant = current_tenant();
1086        let principal = current_auth_identity().map(|(principal, _)| principal);
1087        let actor_user = principal
1088            .as_ref()
1089            .map(|principal| crate::auth::UserId::from_parts(tenant.as_deref(), principal));
1090        let request_id = Self::vault_request_id();
1091        let actor = actor_user
1092            .as_ref()
1093            .map(ActorRef::User)
1094            .unwrap_or(ActorRef::Anonymous);
1095        let ctx = ControlEventCtx {
1096            actor,
1097            scope: tenant.as_ref().map(|scope| Cow::Borrowed(scope.as_str())),
1098            request_id: Some(Cow::Borrowed(request_id.as_str())),
1099            trace_id: None,
1100        };
1101
1102        let target = Self::vault_target_resource(collection, key);
1103        let mut fields = std::collections::HashMap::new();
1104        fields.insert("path".to_string(), Sensitivity::raw(target.clone()));
1105        fields.insert("collection".to_string(), Sensitivity::raw(collection));
1106        fields.insert("key".to_string(), Sensitivity::raw(key));
1107        fields.insert(
1108            "connection_id".to_string(),
1109            Sensitivity::raw(current_connection_id().to_string()),
1110        );
1111        if let Some(entry) = entry {
1112            fields.insert(
1113                "entity_id".to_string(),
1114                Sensitivity::raw(entry.id.raw().to_string()),
1115            );
1116            fields.insert(
1117                "sequence_id".to_string(),
1118                Sensitivity::raw(entry.sequence_id.to_string()),
1119            );
1120            fields.insert(
1121                "version".to_string(),
1122                Sensitivity::raw(entry.version.to_string()),
1123            );
1124            fields.insert("op".to_string(), Sensitivity::raw(entry.op.clone()));
1125            fields.insert(
1126                "tombstone".to_string(),
1127                Sensitivity::raw(entry.tombstone.to_string()),
1128            );
1129            if !entry.tombstone {
1130                fields.insert(
1131                    "fingerprint".to_string(),
1132                    Sensitivity::raw(vault_fingerprint(&entry.value)),
1133                );
1134            }
1135            fields.insert(
1136                "tags".to_string(),
1137                Sensitivity::raw(format!("{:?}", vault_tags_value(&entry.metadata))),
1138            );
1139        }
1140        for (key, value) in extra_fields {
1141            fields.insert(key, value);
1142        }
1143
1144        let event = ControlEvent {
1145            kind,
1146            outcome,
1147            action: Cow::Borrowed(action),
1148            resource: Some(format!("vault:{target}")),
1149            reason: Some(reason.to_string()),
1150            matched_policy_id: None,
1151            fields,
1152        };
1153        let ledger = self.inner.control_event_ledger.read();
1154        match ledger.emit(&ctx, event) {
1155            Ok(_) => Ok(()),
1156            Err(err) if self.inner.control_event_config.require_persistence() => {
1157                Err(RedDBError::Internal(err.to_string()))
1158            }
1159            Err(_) => Ok(()),
1160        }
1161    }
1162
1163    pub(crate) fn resolve_vault_secret_value(
1164        &self,
1165        collection: &str,
1166        key: &str,
1167    ) -> RedDBResult<Value> {
1168        let ops = KvAtomicOps::new(self);
1169        let entry = ops.get_vault_entry(collection, key)?;
1170        if let Err(reason) = self.check_vault_capability("vault:read", collection, key) {
1171            self.audit_vault_unseal(
1172                collection,
1173                key,
1174                crate::runtime::audit_log::Outcome::Denied,
1175                &reason,
1176                entry.as_ref(),
1177            );
1178            self.emit_vault_control_event(
1179                crate::runtime::control_events::EventKind::VaultRead,
1180                crate::runtime::control_events::Outcome::Denied,
1181                "vault:read",
1182                collection,
1183                key,
1184                &reason,
1185                entry.as_ref(),
1186                Vec::new(),
1187            )?;
1188            return Err(RedDBError::Query(reason));
1189        }
1190        let Some(entry) = entry else {
1191            let reason = "not_found";
1192            self.audit_vault_unseal(
1193                collection,
1194                key,
1195                crate::runtime::audit_log::Outcome::Denied,
1196                reason,
1197                None,
1198            );
1199            self.emit_vault_control_event(
1200                crate::runtime::control_events::EventKind::VaultRead,
1201                crate::runtime::control_events::Outcome::Denied,
1202                "vault:read",
1203                collection,
1204                key,
1205                reason,
1206                None,
1207                Vec::new(),
1208            )?;
1209            return Err(RedDBError::NotFound(format!(
1210                "vault secret '{}.{}' not found",
1211                collection, key
1212            )));
1213        };
1214        if entry.tombstone {
1215            let reason = "deleted";
1216            self.audit_vault_unseal(
1217                collection,
1218                key,
1219                crate::runtime::audit_log::Outcome::Denied,
1220                reason,
1221                Some(&entry),
1222            );
1223            self.emit_vault_control_event(
1224                crate::runtime::control_events::EventKind::VaultRead,
1225                crate::runtime::control_events::Outcome::Denied,
1226                "vault:read",
1227                collection,
1228                key,
1229                reason,
1230                Some(&entry),
1231                Vec::new(),
1232            )?;
1233            return Err(RedDBError::NotFound(format!(
1234                "vault secret '{}.{}' is deleted",
1235                collection, key
1236            )));
1237        }
1238        match self.unseal_vault_value(collection, &entry.value) {
1239            Ok(value) => {
1240                self.audit_vault_unseal(
1241                    collection,
1242                    key,
1243                    crate::runtime::audit_log::Outcome::Success,
1244                    "ok",
1245                    Some(&entry),
1246                );
1247                self.emit_vault_control_event(
1248                    crate::runtime::control_events::EventKind::VaultRead,
1249                    crate::runtime::control_events::Outcome::Allowed,
1250                    "vault:read",
1251                    collection,
1252                    key,
1253                    "ok",
1254                    Some(&entry),
1255                    Vec::new(),
1256                )?;
1257                Ok(value)
1258            }
1259            Err(err) => {
1260                let reason = err.to_string();
1261                self.audit_vault_unseal(
1262                    collection,
1263                    key,
1264                    crate::runtime::audit_log::Outcome::Error,
1265                    &reason,
1266                    Some(&entry),
1267                );
1268                self.emit_vault_control_event(
1269                    crate::runtime::control_events::EventKind::VaultRead,
1270                    crate::runtime::control_events::Outcome::Error,
1271                    "vault:read",
1272                    collection,
1273                    key,
1274                    &reason,
1275                    Some(&entry),
1276                    Vec::new(),
1277                )?;
1278                Err(err)
1279            }
1280        }
1281    }
1282
1283    /// Dispatch a `KV PUT / GET / DELETE` command.
1284    pub fn execute_kv_command(
1285        &self,
1286        raw_query: &str,
1287        cmd: &crate::storage::query::ast::KvCommand,
1288    ) -> RedDBResult<RuntimeQueryResult> {
1289        use crate::storage::query::ast::KvCommand;
1290
1291        let ops = KvAtomicOps::new(self);
1292
1293        match cmd {
1294            KvCommand::Put {
1295                model,
1296                collection,
1297                key,
1298                value,
1299                ttl_ms,
1300                tags,
1301                if_not_exists,
1302            } => {
1303                if *model == crate::catalog::CollectionModel::Vault {
1304                    self.check_system_vault_capability("vault:write", collection, key)
1305                        .map_err(RedDBError::Query)?;
1306                }
1307                self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
1308                let (created, id) = ops.set_with_tags_for_model(
1309                    *model,
1310                    collection,
1311                    key,
1312                    value.clone(),
1313                    *ttl_ms,
1314                    tags,
1315                    *if_not_exists,
1316                )?;
1317
1318                let mut result = UnifiedResult::with_columns(vec![
1319                    "ok".into(),
1320                    "collection".into(),
1321                    "key".into(),
1322                    "id".into(),
1323                    "created".into(),
1324                    "tags".into(),
1325                ]);
1326                let mut record = UnifiedRecord::new();
1327                record.set("ok", Value::Boolean(true));
1328                record.set("collection", Value::text(collection.clone()));
1329                record.set("key", Value::text(key.clone()));
1330                record.set("id", Value::Integer(id.raw() as i64));
1331                record.set("created", Value::Boolean(created));
1332                record.set("tags", kv_tags_value(tags));
1333                result.push(record);
1334
1335                Ok(RuntimeQueryResult {
1336                    query: raw_query.to_string(),
1337                    mode: crate::storage::query::modes::QueryMode::Sql,
1338                    statement: if *model == crate::catalog::CollectionModel::Vault {
1339                        "vault_put"
1340                    } else {
1341                        "kv_put"
1342                    },
1343                    engine: if *model == crate::catalog::CollectionModel::Vault {
1344                        "vault"
1345                    } else {
1346                        "kv"
1347                    },
1348                    result,
1349                    affected_rows: 1,
1350                    statement_type: if created { "insert" } else { "update" },
1351                })
1352            }
1353            KvCommand::InvalidateTags { collection, tags } => {
1354                let invalidated = ops.invalidate_tags(collection, tags)?;
1355
1356                let mut result = UnifiedResult::with_columns(vec![
1357                    "ok".into(),
1358                    "collection".into(),
1359                    "invalidated".into(),
1360                    "tags".into(),
1361                ]);
1362                let mut record = UnifiedRecord::new();
1363                record.set("ok", Value::Boolean(true));
1364                record.set("collection", Value::text(collection.clone()));
1365                record.set("invalidated", Value::Integer(invalidated as i64));
1366                record.set("tags", kv_tags_value(tags));
1367                result.push(record);
1368
1369                Ok(RuntimeQueryResult {
1370                    query: raw_query.to_string(),
1371                    mode: crate::storage::query::modes::QueryMode::Sql,
1372                    statement: "kv_invalidate_tags",
1373                    engine: "kv",
1374                    result,
1375                    affected_rows: invalidated as u64,
1376                    statement_type: "delete",
1377                })
1378            }
1379
1380            KvCommand::Rotate {
1381                collection,
1382                key,
1383                value,
1384                tags,
1385            } => {
1386                self.check_system_vault_capability("vault:write", collection, key)
1387                    .map_err(RedDBError::Query)?;
1388                self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
1389                let entry = ops.append_vault_version(
1390                    collection,
1391                    key,
1392                    value.clone(),
1393                    "rotate",
1394                    false,
1395                    tags,
1396                )?;
1397                self.record_kv_watch_event(
1398                    crate::replication::cdc::ChangeOperation::Update,
1399                    collection,
1400                    key,
1401                    entry.id.raw(),
1402                    None,
1403                    Some(vault_entry_metadata_json(&entry)),
1404                );
1405                self.audit_vault_lifecycle(
1406                    "rotate",
1407                    collection,
1408                    key,
1409                    crate::runtime::audit_log::Outcome::Success,
1410                    "ok",
1411                    Some(&entry),
1412                );
1413                self.emit_vault_control_event(
1414                    crate::runtime::control_events::EventKind::VaultRotate,
1415                    crate::runtime::control_events::Outcome::Allowed,
1416                    "vault:rotate",
1417                    collection,
1418                    key,
1419                    "ok",
1420                    Some(&entry),
1421                    Vec::new(),
1422                )?;
1423                Ok(vault_write_result(
1424                    raw_query,
1425                    "vault_rotate",
1426                    "update",
1427                    collection,
1428                    key,
1429                    &entry,
1430                    1,
1431                ))
1432            }
1433
1434            KvCommand::List {
1435                model,
1436                collection,
1437                prefix,
1438                limit,
1439                offset,
1440            } => {
1441                if *model != crate::catalog::CollectionModel::Vault {
1442                    return Err(RedDBError::InvalidOperation(
1443                        "LIST is not supported through normal KV command execution".to_string(),
1444                    ));
1445                }
1446                let mut entries = ops.latest_vault_entries(collection, prefix.as_deref())?;
1447                entries.sort_by(|left, right| left.key.cmp(&right.key));
1448                let mut result = UnifiedResult::with_columns(vec![
1449                    "collection".into(),
1450                    "key".into(),
1451                    "version".into(),
1452                    "fingerprint".into(),
1453                    "tags".into(),
1454                    "created_at".into(),
1455                    "updated_at".into(),
1456                    "status".into(),
1457                    "tombstone".into(),
1458                    "op".into(),
1459                ]);
1460                let mut visible = Vec::new();
1461                for entry in entries {
1462                    match self.check_vault_capability("vault:read_metadata", collection, &entry.key)
1463                    {
1464                        Ok(()) => {
1465                            self.emit_vault_control_event(
1466                                crate::runtime::control_events::EventKind::VaultMetadataRead,
1467                                crate::runtime::control_events::Outcome::Allowed,
1468                                "vault:read_metadata",
1469                                collection,
1470                                &entry.key,
1471                                "ok",
1472                                Some(&entry),
1473                                Vec::new(),
1474                            )?;
1475                            visible.push(entry);
1476                        }
1477                        Err(reason) => {
1478                            self.emit_vault_control_event(
1479                                crate::runtime::control_events::EventKind::VaultMetadataRead,
1480                                crate::runtime::control_events::Outcome::Denied,
1481                                "vault:read_metadata",
1482                                collection,
1483                                &entry.key,
1484                                &reason,
1485                                Some(&entry),
1486                                Vec::new(),
1487                            )?;
1488                        }
1489                    }
1490                }
1491                for entry in visible
1492                    .into_iter()
1493                    .skip(*offset)
1494                    .take(limit.unwrap_or(usize::MAX))
1495                {
1496                    push_vault_metadata_record(&mut result, collection, &entry.key, &entry);
1497                }
1498                Ok(RuntimeQueryResult {
1499                    query: raw_query.to_string(),
1500                    mode: crate::storage::query::modes::QueryMode::Sql,
1501                    statement: "vault_list",
1502                    engine: "vault",
1503                    result,
1504                    affected_rows: 0,
1505                    statement_type: "select",
1506                })
1507            }
1508
1509            KvCommand::History { collection, key } => {
1510                let latest = ops.get_vault_entry(collection, key)?;
1511                if let Err(reason) =
1512                    self.check_vault_capability("vault:read_metadata", collection, key)
1513                {
1514                    self.emit_vault_control_event(
1515                        crate::runtime::control_events::EventKind::VaultMetadataRead,
1516                        crate::runtime::control_events::Outcome::Denied,
1517                        "vault:read_metadata",
1518                        collection,
1519                        key,
1520                        &reason,
1521                        latest.as_ref(),
1522                        Vec::new(),
1523                    )?;
1524                    return Err(RedDBError::Query(reason));
1525                }
1526                let versions =
1527                    super::keyed_spine::history_versions(ops.vault_versions(collection, key)?);
1528                let result = vault_history_result(collection, key, &versions);
1529                self.emit_vault_control_event(
1530                    crate::runtime::control_events::EventKind::VaultMetadataRead,
1531                    crate::runtime::control_events::Outcome::Allowed,
1532                    "vault:read_metadata",
1533                    collection,
1534                    key,
1535                    "ok",
1536                    latest.as_ref(),
1537                    Vec::new(),
1538                )?;
1539                Ok(RuntimeQueryResult {
1540                    query: raw_query.to_string(),
1541                    mode: crate::storage::query::modes::QueryMode::Sql,
1542                    statement: "vault_history",
1543                    engine: "vault",
1544                    result,
1545                    affected_rows: 0,
1546                    statement_type: "select",
1547                })
1548            }
1549
1550            KvCommand::Purge { collection, key } => {
1551                let entry = ops.get_vault_entry(collection, key)?;
1552                if let Err(reason) = self.check_vault_capability("vault:purge", collection, key) {
1553                    self.audit_vault_lifecycle(
1554                        "purge",
1555                        collection,
1556                        key,
1557                        crate::runtime::audit_log::Outcome::Denied,
1558                        &reason,
1559                        entry.as_ref(),
1560                    );
1561                    self.emit_vault_control_event(
1562                        crate::runtime::control_events::EventKind::VaultPurge,
1563                        crate::runtime::control_events::Outcome::Denied,
1564                        "vault:purge",
1565                        collection,
1566                        key,
1567                        &reason,
1568                        entry.as_ref(),
1569                        Vec::new(),
1570                    )?;
1571                    return Err(RedDBError::Query(reason));
1572                }
1573                self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
1574                let purged = ops.purge_vault_versions(collection, key)?;
1575                self.audit_vault_lifecycle(
1576                    "purge",
1577                    collection,
1578                    key,
1579                    crate::runtime::audit_log::Outcome::Success,
1580                    "ok",
1581                    entry.as_ref(),
1582                );
1583                self.emit_vault_control_event(
1584                    crate::runtime::control_events::EventKind::VaultPurge,
1585                    crate::runtime::control_events::Outcome::Allowed,
1586                    "vault:purge",
1587                    collection,
1588                    key,
1589                    "ok",
1590                    entry.as_ref(),
1591                    vec![(
1592                        "purged".to_string(),
1593                        crate::runtime::control_events::Sensitivity::raw(purged.to_string()),
1594                    )],
1595                )?;
1596                let mut result = UnifiedResult::with_columns(vec![
1597                    "ok".into(),
1598                    "collection".into(),
1599                    "key".into(),
1600                    "purged".into(),
1601                ]);
1602                let mut record = UnifiedRecord::new();
1603                record.set("ok", Value::Boolean(true));
1604                record.set("collection", Value::text(collection.clone()));
1605                record.set("key", Value::text(key.clone()));
1606                record.set("purged", Value::Integer(purged as i64));
1607                result.push(record);
1608                Ok(RuntimeQueryResult {
1609                    query: raw_query.to_string(),
1610                    mode: crate::storage::query::modes::QueryMode::Sql,
1611                    statement: "vault_purge",
1612                    engine: "vault",
1613                    result,
1614                    affected_rows: purged as u64,
1615                    statement_type: "delete",
1616                })
1617            }
1618
1619            KvCommand::Get {
1620                model,
1621                collection,
1622                key,
1623            } => {
1624                if *model == crate::catalog::CollectionModel::Vault {
1625                    let entry = ops.get_vault_entry(collection, key)?;
1626                    if let Err(reason) =
1627                        self.check_vault_capability("vault:read_metadata", collection, key)
1628                    {
1629                        self.emit_vault_control_event(
1630                            crate::runtime::control_events::EventKind::VaultMetadataRead,
1631                            crate::runtime::control_events::Outcome::Denied,
1632                            "vault:read_metadata",
1633                            collection,
1634                            key,
1635                            &reason,
1636                            entry.as_ref(),
1637                            Vec::new(),
1638                        )?;
1639                        return Err(RedDBError::Query(reason));
1640                    }
1641                    let key_available = self.vault_key_available(collection);
1642                    let result =
1643                        vault_metadata_result(collection, key, entry.as_ref(), key_available);
1644                    self.emit_vault_control_event(
1645                        crate::runtime::control_events::EventKind::VaultMetadataRead,
1646                        crate::runtime::control_events::Outcome::Allowed,
1647                        "vault:read_metadata",
1648                        collection,
1649                        key,
1650                        "ok",
1651                        entry.as_ref(),
1652                        Vec::new(),
1653                    )?;
1654                    return Ok(RuntimeQueryResult {
1655                        query: raw_query.to_string(),
1656                        mode: crate::storage::query::modes::QueryMode::Sql,
1657                        statement: "vault_get",
1658                        engine: "vault",
1659                        result,
1660                        affected_rows: 0,
1661                        statement_type: "select",
1662                    });
1663                }
1664
1665                let entity = ops.get_entity(*model, collection, key)?;
1666                let value = entity.as_ref().and_then(kv_value_from_entity);
1667                if *model == crate::catalog::CollectionModel::Kv {
1668                    self.inner.kv_stats.incr_gets();
1669                }
1670                let mut result = UnifiedResult::with_columns(vec![
1671                    "rid".into(),
1672                    "collection".into(),
1673                    "kind".into(),
1674                    "tenant".into(),
1675                    "created_at".into(),
1676                    "updated_at".into(),
1677                    "key".into(),
1678                    "value".into(),
1679                    "tags".into(),
1680                ]);
1681                let mut record = UnifiedRecord::new();
1682                if let Some(entity) = entity.as_ref() {
1683                    record.set("rid", Value::UnsignedInteger(entity.id.raw()));
1684                    record.set("created_at", Value::UnsignedInteger(entity.created_at));
1685                    record.set("updated_at", Value::UnsignedInteger(entity.updated_at));
1686                } else {
1687                    record.set("rid", Value::Null);
1688                    record.set("created_at", Value::Null);
1689                    record.set("updated_at", Value::Null);
1690                }
1691                record.set("collection", Value::text(collection.clone()));
1692                record.set("kind", Value::text(keyed_model_name(*model).to_string()));
1693                record.set("tenant", Value::Null);
1694                record.set("key", Value::text(key.clone()));
1695                record.set(
1696                    "value",
1697                    value.unwrap_or(crate::storage::schema::Value::Null),
1698                );
1699                record.set("tags", kv_tags_value(&ops.tags_for_key(collection, key)));
1700                result.push(record);
1701
1702                Ok(RuntimeQueryResult {
1703                    query: raw_query.to_string(),
1704                    mode: crate::storage::query::modes::QueryMode::Sql,
1705                    statement: "kv_get",
1706                    engine: "kv",
1707                    result,
1708                    affected_rows: 0,
1709                    statement_type: "select",
1710                })
1711            }
1712            KvCommand::Watch {
1713                model,
1714                collection,
1715                key,
1716                prefix,
1717                from_lsn,
1718            } => {
1719                let watch_key = if *prefix {
1720                    format!("{key}.*")
1721                } else {
1722                    key.clone()
1723                };
1724                let endpoint = match from_lsn {
1725                    Some(lsn) => format!(
1726                        "/collections/{collection}/{}/{watch_key}/watch?since_lsn={lsn}",
1727                        keyed_model_name(*model)
1728                    ),
1729                    None => format!(
1730                        "/collections/{collection}/{}/{watch_key}/watch",
1731                        keyed_model_name(*model)
1732                    ),
1733                };
1734                let mut result = UnifiedResult::with_columns(vec![
1735                    "collection".into(),
1736                    "key".into(),
1737                    "prefix".into(),
1738                    "from_lsn".into(),
1739                    "watch_url".into(),
1740                    "streaming".into(),
1741                ]);
1742                let mut record = UnifiedRecord::new();
1743                record.set("collection", Value::text(collection.clone()));
1744                record.set("key", Value::text(watch_key));
1745                record.set("prefix", Value::Boolean(*prefix));
1746                record.set(
1747                    "from_lsn",
1748                    from_lsn
1749                        .map(Value::UnsignedInteger)
1750                        .unwrap_or(crate::storage::schema::Value::Null),
1751                );
1752                record.set("watch_url", Value::text(endpoint));
1753                record.set("streaming", Value::Boolean(true));
1754                result.push(record);
1755
1756                Ok(RuntimeQueryResult {
1757                    query: raw_query.to_string(),
1758                    mode: crate::storage::query::modes::QueryMode::Sql,
1759                    statement: "kv_watch",
1760                    engine: keyed_model_name(*model),
1761                    result,
1762                    affected_rows: 0,
1763                    statement_type: "stream",
1764                })
1765            }
1766
1767            KvCommand::Unseal {
1768                collection,
1769                key,
1770                version,
1771            } => {
1772                let latest = ops.get_vault_entry(collection, key)?;
1773                let entry = match version {
1774                    Some(version) => ops.get_vault_entry_version(collection, key, *version)?,
1775                    None => latest.clone(),
1776                };
1777                let action = match (version, latest.as_ref()) {
1778                    (Some(requested), Some(latest)) if *requested == latest.version => "vault:read",
1779                    (Some(_), _) => "vault:unseal_history",
1780                    _ => "vault:read",
1781                };
1782                let event_kind = if action == "vault:read" {
1783                    crate::runtime::control_events::EventKind::VaultRead
1784                } else {
1785                    crate::runtime::control_events::EventKind::VaultUnseal
1786                };
1787                if let Err(reason) = self.check_vault_capability(action, collection, key) {
1788                    self.audit_vault_unseal(
1789                        collection,
1790                        key,
1791                        crate::runtime::audit_log::Outcome::Denied,
1792                        &reason,
1793                        entry.as_ref(),
1794                    );
1795                    self.emit_vault_control_event(
1796                        event_kind,
1797                        crate::runtime::control_events::Outcome::Denied,
1798                        action,
1799                        collection,
1800                        key,
1801                        &reason,
1802                        entry.as_ref(),
1803                        Vec::new(),
1804                    )?;
1805                    return Err(RedDBError::Query(reason));
1806                }
1807                let Some(entry) = entry else {
1808                    let reason = "not_found";
1809                    self.audit_vault_unseal(
1810                        collection,
1811                        key,
1812                        crate::runtime::audit_log::Outcome::Denied,
1813                        reason,
1814                        None,
1815                    );
1816                    self.emit_vault_control_event(
1817                        event_kind,
1818                        crate::runtime::control_events::Outcome::Denied,
1819                        action,
1820                        collection,
1821                        key,
1822                        reason,
1823                        None,
1824                        Vec::new(),
1825                    )?;
1826                    return Err(RedDBError::NotFound(format!(
1827                        "vault secret '{}.{}' not found",
1828                        collection, key
1829                    )));
1830                };
1831                if entry.tombstone {
1832                    let reason = "deleted";
1833                    self.audit_vault_unseal(
1834                        collection,
1835                        key,
1836                        crate::runtime::audit_log::Outcome::Denied,
1837                        reason,
1838                        Some(&entry),
1839                    );
1840                    self.emit_vault_control_event(
1841                        event_kind,
1842                        crate::runtime::control_events::Outcome::Denied,
1843                        action,
1844                        collection,
1845                        key,
1846                        reason,
1847                        Some(&entry),
1848                        Vec::new(),
1849                    )?;
1850                    return Err(RedDBError::NotFound(format!(
1851                        "vault secret '{}.{}' is deleted",
1852                        collection, key
1853                    )));
1854                }
1855                match self.unseal_vault_value(collection, &entry.value) {
1856                    Ok(value) => {
1857                        self.audit_vault_unseal(
1858                            collection,
1859                            key,
1860                            crate::runtime::audit_log::Outcome::Success,
1861                            "ok",
1862                            Some(&entry),
1863                        );
1864                        self.emit_vault_control_event(
1865                            event_kind,
1866                            crate::runtime::control_events::Outcome::Allowed,
1867                            action,
1868                            collection,
1869                            key,
1870                            "ok",
1871                            Some(&entry),
1872                            Vec::new(),
1873                        )?;
1874                        let mut result = UnifiedResult::with_columns(vec![
1875                            "collection".into(),
1876                            "key".into(),
1877                            "value".into(),
1878                        ]);
1879                        let mut record = UnifiedRecord::new();
1880                        record.set("collection", Value::text(collection.clone()));
1881                        record.set("key", Value::text(key.clone()));
1882                        record.set("value", value);
1883                        result.push(record);
1884                        Ok(RuntimeQueryResult {
1885                            query: raw_query.to_string(),
1886                            mode: crate::storage::query::modes::QueryMode::Sql,
1887                            statement: "vault_unseal",
1888                            engine: "vault",
1889                            result,
1890                            affected_rows: 0,
1891                            statement_type: "select",
1892                        })
1893                    }
1894                    Err(err) => {
1895                        let reason = err.to_string();
1896                        self.audit_vault_unseal(
1897                            collection,
1898                            key,
1899                            crate::runtime::audit_log::Outcome::Error,
1900                            &reason,
1901                            Some(&entry),
1902                        );
1903                        self.emit_vault_control_event(
1904                            event_kind,
1905                            crate::runtime::control_events::Outcome::Error,
1906                            action,
1907                            collection,
1908                            key,
1909                            &reason,
1910                            Some(&entry),
1911                            Vec::new(),
1912                        )?;
1913                        Err(err)
1914                    }
1915                }
1916            }
1917
1918            KvCommand::Incr {
1919                model,
1920                collection,
1921                key,
1922                by,
1923                ttl_ms,
1924            } => {
1925                self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
1926                let new_value = ops.incr(*model, collection, key, *by, *ttl_ms)?;
1927
1928                let mut result = UnifiedResult::with_columns(vec![
1929                    "ok".into(),
1930                    "collection".into(),
1931                    "key".into(),
1932                    "value".into(),
1933                ]);
1934                let mut record = UnifiedRecord::new();
1935                record.set("ok", Value::Boolean(true));
1936                record.set("collection", Value::text(collection.clone()));
1937                record.set("key", Value::text(key.clone()));
1938                record.set("value", Value::Integer(new_value));
1939                result.push(record);
1940
1941                Ok(RuntimeQueryResult {
1942                    query: raw_query.to_string(),
1943                    mode: crate::storage::query::modes::QueryMode::Sql,
1944                    statement: "kv_incr",
1945                    engine: "kv",
1946                    result,
1947                    affected_rows: 1,
1948                    statement_type: "update",
1949                })
1950            }
1951
1952            KvCommand::Cas {
1953                model,
1954                collection,
1955                key,
1956                expected,
1957                new_value,
1958                ttl_ms,
1959            } => {
1960                self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
1961                let (ok, current) = ops.cas(
1962                    *model,
1963                    collection,
1964                    key,
1965                    expected.as_ref(),
1966                    new_value.clone(),
1967                    *ttl_ms,
1968                )?;
1969
1970                let mut result = UnifiedResult::with_columns(vec![
1971                    "ok".into(),
1972                    "collection".into(),
1973                    "key".into(),
1974                    "current".into(),
1975                ]);
1976                let mut record = UnifiedRecord::new();
1977                record.set("ok", Value::Boolean(ok));
1978                record.set("collection", Value::text(collection.clone()));
1979                record.set("key", Value::text(key.clone()));
1980                record.set(
1981                    "current",
1982                    current.unwrap_or(crate::storage::schema::Value::Null),
1983                );
1984                result.push(record);
1985
1986                Ok(RuntimeQueryResult {
1987                    query: raw_query.to_string(),
1988                    mode: crate::storage::query::modes::QueryMode::Sql,
1989                    statement: "kv_cas",
1990                    engine: "kv",
1991                    result,
1992                    affected_rows: if ok { 1 } else { 0 },
1993                    statement_type: "update",
1994                })
1995            }
1996
1997            KvCommand::Delete {
1998                model,
1999                collection,
2000                key,
2001            } => {
2002                if *model == crate::catalog::CollectionModel::Vault {
2003                    self.check_system_vault_capability("vault:write", collection, key)
2004                        .map_err(RedDBError::Query)?;
2005                    self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2006                    let entry = ops.append_vault_version(
2007                        collection,
2008                        key,
2009                        Value::Null,
2010                        "delete",
2011                        true,
2012                        &[],
2013                    )?;
2014                    self.record_kv_watch_event(
2015                        crate::replication::cdc::ChangeOperation::Delete,
2016                        collection,
2017                        key,
2018                        entry.id.raw(),
2019                        None,
2020                        Some(vault_entry_metadata_json(&entry)),
2021                    );
2022                    self.audit_vault_lifecycle(
2023                        "delete",
2024                        collection,
2025                        key,
2026                        crate::runtime::audit_log::Outcome::Success,
2027                        "ok",
2028                        Some(&entry),
2029                    );
2030                    return Ok(vault_write_result(
2031                        raw_query,
2032                        "vault_delete",
2033                        "delete",
2034                        collection,
2035                        key,
2036                        &entry,
2037                        1,
2038                    ));
2039                }
2040                self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2041                let deleted = ops.delete(*model, collection, key)?;
2042
2043                let mut result = UnifiedResult::with_columns(vec![
2044                    "ok".into(),
2045                    "collection".into(),
2046                    "key".into(),
2047                    "deleted".into(),
2048                ]);
2049                let mut record = UnifiedRecord::new();
2050                record.set("ok", Value::Boolean(true));
2051                record.set("collection", Value::text(collection.clone()));
2052                record.set("key", Value::text(key.clone()));
2053                record.set("deleted", Value::Boolean(deleted));
2054                result.push(record);
2055
2056                Ok(RuntimeQueryResult {
2057                    query: raw_query.to_string(),
2058                    mode: crate::storage::query::modes::QueryMode::Sql,
2059                    statement: "kv_delete",
2060                    engine: "kv",
2061                    result,
2062                    affected_rows: if deleted { 1 } else { 0 },
2063                    statement_type: "delete",
2064                })
2065            }
2066        }
2067    }
2068
2069    pub fn vault_watch_events_since(
2070        &self,
2071        collection: &str,
2072        key: &str,
2073        since_lsn: u64,
2074        max_count: usize,
2075    ) -> Vec<crate::replication::cdc::KvWatchEvent> {
2076        self.kv_watch_events_since(collection, key, since_lsn, max_count)
2077            .into_iter()
2078            .filter(|event| {
2079                self.check_vault_capability("vault:read_metadata", &event.collection, &event.key)
2080                    .is_ok()
2081            })
2082            .map(vault_filter_watch_event)
2083            .collect()
2084    }
2085
2086    pub fn vault_watch_events_since_prefix(
2087        &self,
2088        collection: &str,
2089        prefix: &str,
2090        since_lsn: u64,
2091        max_count: usize,
2092    ) -> Vec<crate::replication::cdc::KvWatchEvent> {
2093        self.kv_watch_events_since_prefix(collection, prefix, since_lsn, max_count)
2094            .into_iter()
2095            .filter(|event| {
2096                self.check_vault_capability("vault:read_metadata", &event.collection, &event.key)
2097                    .is_ok()
2098            })
2099            .map(vault_filter_watch_event)
2100            .collect()
2101    }
2102
2103    fn check_kv_invalidate_policy(&self, collection: &str) -> RedDBResult<()> {
2104        let auth_store = match self.inner.auth_store.read().clone() {
2105            Some(store) => store,
2106            None => return Ok(()),
2107        };
2108        let (username, role) = match crate::runtime::impl_core::current_auth_identity() {
2109            Some(identity) => identity,
2110            None => return Ok(()),
2111        };
2112        if role < crate::auth::Role::Write {
2113            return Err(RedDBError::Query(format!(
2114                "principal=`{username}` role=`{role:?}` cannot invalidate KV tags"
2115            )));
2116        }
2117        if !auth_store.iam_authorization_enabled() {
2118            return Ok(());
2119        }
2120
2121        let tenant = crate::runtime::impl_core::current_tenant();
2122        let principal = crate::auth::UserId::from_parts(tenant.as_deref(), &username);
2123        let mut resource =
2124            crate::auth::policies::ResourceRef::new("kv".to_string(), collection.to_string());
2125        if let Some(tenant) = tenant.clone() {
2126            resource = resource.with_tenant(tenant);
2127        }
2128        let ctx = crate::auth::policies::EvalContext {
2129            principal_tenant: tenant.clone(),
2130            current_tenant: tenant,
2131            peer_ip: None,
2132            mfa_present: false,
2133            now_ms: current_unix_ms(),
2134            principal_is_admin_role: role == crate::auth::Role::Admin,
2135            principal_is_system_owned: auth_store.principal_is_system_owned(&principal),
2136            principal_is_platform_scoped: principal.tenant.is_none(),
2137        };
2138        if auth_store.check_policy_authz_with_role(
2139            &principal,
2140            "kv:invalidate",
2141            &resource,
2142            &ctx,
2143            role,
2144        ) {
2145            Ok(())
2146        } else {
2147            Err(RedDBError::Query(format!(
2148                "principal=`{username}` action=`kv:invalidate` resource=`kv:{collection}` denied by IAM policy"
2149            )))
2150        }
2151    }
2152}
2153
2154fn ttl_metadata(ttl_ms: Option<u64>) -> Option<Metadata> {
2155    let ttl_ms = ttl_ms?;
2156    Some(Metadata::with_fields(
2157        [(
2158            "_ttl_ms".to_string(),
2159            if ttl_ms <= i64::MAX as u64 {
2160                MetadataValue::Int(ttl_ms as i64)
2161            } else {
2162                MetadataValue::Timestamp(ttl_ms)
2163            },
2164        )]
2165        .into_iter()
2166        .collect(),
2167    ))
2168}
2169
2170fn vault_write_result(
2171    raw_query: &str,
2172    statement: &'static str,
2173    statement_type: &'static str,
2174    collection: &str,
2175    key: &str,
2176    entry: &VaultEntry,
2177    affected_rows: u64,
2178) -> RuntimeQueryResult {
2179    let mut result = UnifiedResult::with_columns(vec![
2180        "ok".into(),
2181        "collection".into(),
2182        "key".into(),
2183        "version".into(),
2184        "fingerprint".into(),
2185        "tombstone".into(),
2186        "op".into(),
2187        "id".into(),
2188    ]);
2189    let mut record = UnifiedRecord::new();
2190    record.set("ok", Value::Boolean(true));
2191    record.set("collection", Value::text(collection.to_string()));
2192    record.set("key", Value::text(key.to_string()));
2193    record.set("version", Value::Integer(entry.version));
2194    if entry.tombstone {
2195        record.set("fingerprint", Value::Null);
2196    } else {
2197        record.set("fingerprint", Value::text(vault_fingerprint(&entry.value)));
2198    }
2199    record.set("tombstone", Value::Boolean(entry.tombstone));
2200    record.set("op", Value::text(entry.op.clone()));
2201    record.set("id", Value::Integer(entry.id.raw() as i64));
2202    result.push(record);
2203    RuntimeQueryResult {
2204        query: raw_query.to_string(),
2205        mode: crate::storage::query::modes::QueryMode::Sql,
2206        statement,
2207        engine: "vault",
2208        result,
2209        affected_rows,
2210        statement_type,
2211    }
2212}
2213
2214fn vault_history_result(collection: &str, key: &str, versions: &[VaultEntry]) -> UnifiedResult {
2215    let mut result = UnifiedResult::with_columns(vec![
2216        "collection".into(),
2217        "key".into(),
2218        "version".into(),
2219        "fingerprint".into(),
2220        "tags".into(),
2221        "created_at".into(),
2222        "updated_at".into(),
2223        "status".into(),
2224        "tombstone".into(),
2225        "op".into(),
2226    ]);
2227    for entry in versions {
2228        push_vault_metadata_record(&mut result, collection, key, entry);
2229    }
2230    result
2231}
2232
2233fn push_vault_metadata_record(
2234    result: &mut UnifiedResult,
2235    collection: &str,
2236    key: &str,
2237    entry: &VaultEntry,
2238) {
2239    let mut record = UnifiedRecord::new();
2240    record.set("collection", Value::text(collection.to_string()));
2241    record.set("key", Value::text(key.to_string()));
2242    record.set("version", Value::Integer(entry.version));
2243    if entry.tombstone {
2244        record.set("fingerprint", Value::Null);
2245        record.set("status", Value::text("deleted"));
2246    } else {
2247        record.set("fingerprint", Value::text(vault_fingerprint(&entry.value)));
2248        record.set("status", Value::text("sealed"));
2249    }
2250    record.set("tags", vault_tags_value(&entry.metadata));
2251    record.set("created_at", Value::TimestampMs(entry.created_at as i64));
2252    record.set("updated_at", Value::TimestampMs(entry.updated_at as i64));
2253    record.set("tombstone", Value::Boolean(entry.tombstone));
2254    record.set("op", Value::text(entry.op.clone()));
2255    result.push(record);
2256}
2257
2258fn vault_metadata_result(
2259    collection: &str,
2260    key: &str,
2261    entry: Option<&VaultEntry>,
2262    key_available: bool,
2263) -> UnifiedResult {
2264    let mut result = UnifiedResult::with_columns(vec![
2265        "collection".into(),
2266        "key".into(),
2267        "version".into(),
2268        "fingerprint".into(),
2269        "tags".into(),
2270        "created_at".into(),
2271        "updated_at".into(),
2272        "value".into(),
2273        "status".into(),
2274        "tombstone".into(),
2275        "op".into(),
2276    ]);
2277    let mut record = UnifiedRecord::new();
2278    record.set("collection", Value::text(collection.to_string()));
2279    record.set("key", Value::text(key.to_string()));
2280    match entry {
2281        Some(entry) => {
2282            record.set("version", Value::Integer(entry.version));
2283            if entry.tombstone {
2284                record.set("fingerprint", Value::Null);
2285            } else {
2286                record.set("fingerprint", Value::text(vault_fingerprint(&entry.value)));
2287            }
2288            record.set("tags", vault_tags_value(&entry.metadata));
2289            record.set("created_at", Value::TimestampMs(entry.created_at as i64));
2290            record.set("updated_at", Value::TimestampMs(entry.updated_at as i64));
2291            record.set("value", Value::text("***"));
2292            record.set(
2293                "status",
2294                Value::text(if entry.tombstone {
2295                    "deleted"
2296                } else if key_available {
2297                    "sealed"
2298                } else {
2299                    "sealed_unavailable"
2300                }),
2301            );
2302            record.set("tombstone", Value::Boolean(entry.tombstone));
2303            record.set("op", Value::text(entry.op.clone()));
2304        }
2305        None => {
2306            record.set("version", Value::Null);
2307            record.set("fingerprint", Value::Null);
2308            record.set("tags", Value::Array(Vec::new()));
2309            record.set("created_at", Value::Null);
2310            record.set("updated_at", Value::Null);
2311            record.set("value", Value::text(""));
2312            record.set("status", Value::text("missing"));
2313            record.set("tombstone", Value::Boolean(false));
2314            record.set("op", Value::Null);
2315        }
2316    }
2317    result.push(record);
2318    result
2319}
2320
2321fn vault_fingerprint(value: &Value) -> String {
2322    match value {
2323        Value::Secret(payload) => crate::utils::to_hex(&crate::crypto::sha256::sha256(payload)),
2324        other => crate::utils::to_hex(&crate::crypto::sha256::sha256(&other.to_bytes())),
2325    }
2326}
2327
2328fn vault_entry_metadata_json(entry: &VaultEntry) -> crate::json::Value {
2329    let mut object = crate::json::Map::new();
2330    object.insert(
2331        "key".to_string(),
2332        crate::json::Value::String(entry.key.clone()),
2333    );
2334    object.insert(
2335        "version".to_string(),
2336        crate::json::Value::Number(entry.version as f64),
2337    );
2338    object.insert(
2339        "fingerprint".to_string(),
2340        if entry.tombstone {
2341            crate::json::Value::Null
2342        } else {
2343            crate::json::Value::String(vault_fingerprint(&entry.value))
2344        },
2345    );
2346    object.insert("tags".to_string(), vault_tags_json(&entry.metadata));
2347    object.insert(
2348        "actor".to_string(),
2349        crate::json::Value::String(RedDBRuntime::current_vault_actor()),
2350    );
2351    object.insert(
2352        "sequence_id".to_string(),
2353        crate::json::Value::Number(entry.sequence_id as f64),
2354    );
2355    object.insert(
2356        "tombstone".to_string(),
2357        crate::json::Value::Bool(entry.tombstone),
2358    );
2359    object.insert(
2360        "op".to_string(),
2361        crate::json::Value::String(entry.op.clone()),
2362    );
2363    crate::json::Value::Object(object)
2364}
2365
2366fn vault_tags_json(metadata: &Metadata) -> crate::json::Value {
2367    match vault_tags_value(metadata) {
2368        Value::Array(values) => crate::json::Value::Array(
2369            values
2370                .into_iter()
2371                .filter_map(|value| match value {
2372                    Value::Text(tag) => Some(crate::json::Value::String(tag.to_string())),
2373                    _ => None,
2374                })
2375                .collect(),
2376        ),
2377        _ => crate::json::Value::Array(Vec::new()),
2378    }
2379}
2380
2381fn vault_tags_metadata(tags: &[String]) -> std::collections::HashMap<String, MetadataValue> {
2382    [(
2383        "tags".to_string(),
2384        MetadataValue::Array(
2385            tags.iter()
2386                .map(|tag| MetadataValue::String(tag.clone()))
2387                .collect(),
2388        ),
2389    )]
2390    .into_iter()
2391    .collect()
2392}
2393
2394fn vault_filter_watch_event(
2395    mut event: crate::replication::cdc::KvWatchEvent,
2396) -> crate::replication::cdc::KvWatchEvent {
2397    event.before = event.before.and_then(vault_metadata_json_only);
2398    event.after = event.after.and_then(vault_metadata_json_only);
2399    event
2400}
2401
2402fn vault_metadata_json_only(value: crate::json::Value) -> Option<crate::json::Value> {
2403    let object = value.as_object()?;
2404    let mut out = crate::json::Map::new();
2405    for field in [
2406        "key",
2407        "version",
2408        "fingerprint",
2409        "tags",
2410        "actor",
2411        "sequence_id",
2412        "tombstone",
2413        "op",
2414    ] {
2415        if let Some(value) = object.get(field) {
2416            out.insert(field.to_string(), value.clone());
2417        }
2418    }
2419    Some(crate::json::Value::Object(out))
2420}
2421
2422fn vault_tags_value(metadata: &Metadata) -> Value {
2423    match metadata.get("tags") {
2424        Some(MetadataValue::Array(values)) => Value::Array(
2425            values
2426                .iter()
2427                .filter_map(|value| match value {
2428                    MetadataValue::String(tag) => Some(Value::text(tag.clone())),
2429                    _ => None,
2430                })
2431                .collect(),
2432        ),
2433        Some(MetadataValue::String(tag)) if !tag.is_empty() => {
2434            Value::Array(vec![Value::text(tag.clone())])
2435        }
2436        _ => Value::Array(Vec::new()),
2437    }
2438}
2439
2440fn decode_vault_key(hex_key: &str) -> RedDBResult<[u8; 32]> {
2441    let bytes = hex::decode(hex_key)
2442        .map_err(|_| RedDBError::Query("vault sealed_unavailable: bad key material".to_string()))?;
2443    let key: [u8; 32] = bytes.try_into().map_err(|_| {
2444        RedDBError::Query("vault sealed_unavailable: bad key material length".to_string())
2445    })?;
2446    Ok(key)
2447}
2448
2449fn kv_tags_metadata(tags: &[String]) -> Option<(String, MetadataValue)> {
2450    if tags.is_empty() {
2451        return None;
2452    }
2453    let values = tags
2454        .iter()
2455        .map(|tag| MetadataValue::String(tag.clone()))
2456        .collect();
2457    Some(("_kv_tags".to_string(), MetadataValue::Array(values)))
2458}
2459
2460fn kv_tags_value(tags: &[String]) -> Value {
2461    let json = crate::json::Value::Array(
2462        tags.iter()
2463            .map(|tag| crate::json::Value::String(tag.clone()))
2464            .collect(),
2465    );
2466    Value::Json(crate::json::to_vec(&json).unwrap_or_default())
2467}
2468
2469fn kv_value_from_entity(entity: &crate::storage::UnifiedEntity) -> Option<Value> {
2470    if let crate::storage::EntityData::Row(ref row) = entity.data {
2471        if let Some(ref named) = row.named {
2472            return named.get("value").cloned();
2473        }
2474    }
2475    None
2476}
2477
2478fn kv_collection_contract(name: &str) -> crate::physical::CollectionContract {
2479    let now = current_unix_ms();
2480    crate::physical::CollectionContract {
2481        name: name.to_string(),
2482        declared_model: crate::catalog::CollectionModel::Kv,
2483        schema_mode: crate::catalog::SchemaMode::Dynamic,
2484        origin: crate::physical::ContractOrigin::Implicit,
2485        version: 1,
2486        created_at_unix_ms: now,
2487        updated_at_unix_ms: now,
2488        default_ttl_ms: None,
2489        vector_dimension: None,
2490        vector_metric: None,
2491        context_index_fields: Vec::new(),
2492        declared_columns: Vec::new(),
2493        table_def: None,
2494        timestamps_enabled: false,
2495        context_index_enabled: false,
2496        metrics_raw_retention_ms: None,
2497        metrics_rollup_policies: Vec::new(),
2498        metrics_tenant_identity: None,
2499        metrics_namespace: None,
2500        append_only: false,
2501        subscriptions: Vec::new(),
2502        session_key: None,
2503        session_gap_ms: None,
2504        retention_duration_ms: None,
2505    }
2506}
2507
/// Milliseconds elapsed since the Unix epoch, according to the system clock.
///
/// Returns `0` if the clock reports a time before the epoch. The value
/// follows wall-clock adjustments and is therefore not monotonic.
fn current_unix_ms() -> u128 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis(),
        Err(_) => 0,
    }
}
2514
// Tests for `KvAtomicOps` (set/get/delete/incr/cas), the KV DSL
// (`KV PUT/GET/CAS/INCR`, `INVALIDATE TAGS`), KV runtime statistics and KV
// watch streams. Each test builds its own fresh in-memory runtime via `rt()`.
#[cfg(test)]
mod tests {
    use crate::api::RedDBOptions;
    use crate::catalog::CollectionModel;
    use crate::runtime::RedDBRuntime;

    /// Fresh in-memory runtime; panics if it cannot be constructed.
    fn rt() -> RedDBRuntime {
        RedDBRuntime::with_options(RedDBOptions::in_memory()).expect("in-memory runtime")
    }

    /// `incr` on an absent key starts from zero, so the result equals `by`.
    #[test]
    fn incr_missing_key_initialises_at_by() {
        let r = rt();
        let ops = super::KvAtomicOps::new(&r);
        let v = ops
            .incr(CollectionModel::Kv, "kv_default", "missing", 5, None)
            .unwrap();
        assert_eq!(v, 5);
    }

    /// Each public `KvAtomicOps` call bumps its own counter in `stats().kv`.
    /// `puts == 1` also shows that neither `incr` nor a successful `cas` is
    /// counted as a put.
    #[test]
    fn kv_runtime_stats_count_public_ops() {
        let r = rt();
        let ops = super::KvAtomicOps::new(&r);

        ops.set(
            CollectionModel::Kv,
            "kv_default",
            "profile",
            crate::storage::schema::Value::text("alice"),
            None,
            false,
        )
        .unwrap();
        ops.get(CollectionModel::Kv, "kv_default", "profile")
            .unwrap();
        ops.delete(CollectionModel::Kv, "kv_default", "profile")
            .unwrap();
        ops.incr(CollectionModel::Kv, "kv_default", "hits", 1, None)
            .unwrap();
        // "profile" was just deleted, so expecting absence (None) succeeds:
        // counted as `cas_success`.
        ops.cas(
            CollectionModel::Kv,
            "kv_default",
            "profile",
            None,
            crate::storage::schema::Value::text("created"),
            None,
        )
        .unwrap();
        // Current value is "created", not "different": counted as `cas_conflict`.
        ops.cas(
            CollectionModel::Kv,
            "kv_default",
            "profile",
            Some(&crate::storage::schema::Value::text("different")),
            crate::storage::schema::Value::text("ignored"),
            None,
        )
        .unwrap();

        let stats = r.stats().kv;
        assert_eq!(stats.puts, 1);
        assert_eq!(stats.gets, 1);
        assert_eq!(stats.deletes, 1);
        assert_eq!(stats.incrs, 1);
        assert_eq!(stats.cas_success, 1);
        assert_eq!(stats.cas_conflict, 1);
    }

    /// `INVALIDATE TAGS` removes only entries carrying a matching tag.
    /// A non-matching tag affects 0 rows and leaves the value readable.
    /// After a matching invalidation, `KV GET` still returns a record, but its
    /// `value` is Null. (`sessions.blob` appears to address key `blob` in
    /// collection `sessions` — inferred from `FROM sessions`.)
    #[test]
    fn kv_invalidate_tags_removes_matching_entries_only() {
        let r = rt();

        r.execute_query("KV PUT sessions.blob = 'payload' TAGS [user:42, org:7]")
            .unwrap();

        let miss = r
            .execute_query("INVALIDATE TAGS [org:99] FROM sessions")
            .unwrap();
        assert_eq!(miss.affected_rows, 0);
        assert!(matches!(
            r.execute_query("KV GET sessions.blob")
                .unwrap()
                .result
                .records[0]
                .get("value"),
            Some(crate::storage::schema::Value::Text(value)) if &**value == "payload"
        ));

        let hit = r
            .execute_query("INVALIDATE TAGS [user:42] FROM sessions")
            .unwrap();
        assert_eq!(hit.affected_rows, 1);
        assert!(matches!(
            r.execute_query("KV GET sessions.blob")
                .unwrap()
                .result
                .records[0]
                .get("value"),
            Some(crate::storage::schema::Value::Null)
        ));
    }

    /// `watch_streams_active` tracks the subscription's lifetime: it is 1
    /// while `stream` is alive and back to 0 once the inner scope drops it.
    /// The test also checks that delivered events feed `watch_events_emitted`
    /// and that `record_drop_count` feeds `watch_drops`.
    #[test]
    fn kv_runtime_stats_count_watch_streams_and_events() {
        let r = rt();
        let ops = super::KvAtomicOps::new(&r);
        assert_eq!(r.stats().kv.watch_streams_active, 0);

        {
            let mut stream = r.kv_watch_subscribe("kv_default", "watched", None);
            assert_eq!(r.stats().kv.watch_streams_active, 1);

            ops.set(
                CollectionModel::Kv,
                "kv_default",
                "watched",
                crate::storage::schema::Value::Integer(1),
                None,
                false,
            )
            .unwrap();
            let event = stream.poll_next().expect("watch event");
            assert_eq!(event.key, "watched");
            assert_eq!(r.stats().kv.watch_events_emitted, 1);

            stream.record_drop_count(3);
            assert_eq!(r.stats().kv.watch_drops, 3);
        }

        assert_eq!(r.stats().kv.watch_streams_active, 0);
    }

    /// Repeated `incr` calls on the same key accumulate.
    #[test]
    fn incr_existing_integer_accumulates() {
        let r = rt();
        let ops = super::KvAtomicOps::new(&r);
        ops.incr(CollectionModel::Kv, "kv_default", "ctr", 1, None)
            .unwrap();
        ops.incr(CollectionModel::Kv, "kv_default", "ctr", 1, None)
            .unwrap();
        let v = ops
            .incr(CollectionModel::Kv, "kv_default", "ctr", 1, None)
            .unwrap();
        assert_eq!(v, 3);
    }

    /// A negative `by` decrements.
    #[test]
    fn decr_via_negative_by() {
        let r = rt();
        let ops = super::KvAtomicOps::new(&r);
        ops.incr(CollectionModel::Kv, "kv_default", "stock", 10, None)
            .unwrap();
        let v = ops
            .incr(CollectionModel::Kv, "kv_default", "stock", -3, None)
            .unwrap();
        assert_eq!(v, 7);
    }

    /// THREADS x ITERS concurrent increments of one key must not lose updates.
    /// The barrier releases all workers together to maximise contention.
    #[test]
    fn concurrent_incr_single_key_is_atomic() {
        const THREADS: usize = 8;
        const ITERS: usize = 1000;

        let runtime = std::sync::Arc::new(rt());
        let barrier = std::sync::Arc::new(std::sync::Barrier::new(THREADS));
        let mut handles = Vec::new();

        for _ in 0..THREADS {
            let runtime = std::sync::Arc::clone(&runtime);
            let barrier = std::sync::Arc::clone(&barrier);
            handles.push(std::thread::spawn(move || {
                let ops = super::KvAtomicOps::new(&runtime);
                barrier.wait();
                for _ in 0..ITERS {
                    ops.incr(CollectionModel::Kv, "kv_default", "counter", 1, None)
                        .unwrap();
                }
            }));
        }

        for handle in handles {
            handle.join().expect("worker should finish");
        }

        let ops = super::KvAtomicOps::new(&runtime);
        assert_eq!(
            ops.get(CollectionModel::Kv, "kv_default", "counter")
                .unwrap(),
            Some(crate::storage::schema::Value::Integer(
                (THREADS * ITERS) as i64
            ))
        );
    }

    /// `incr` on a text value fails with an error mentioning "non-integer".
    #[test]
    fn incr_on_string_value_returns_error() {
        let r = rt();
        let ops = super::KvAtomicOps::new(&r);
        ops.set(
            CollectionModel::Kv,
            "kv_default",
            "name",
            crate::storage::schema::Value::text("alice"),
            None,
            false,
        )
        .unwrap();
        let err = ops
            .incr(CollectionModel::Kv, "kv_default", "name", 1, None)
            .unwrap_err();
        assert!(err.to_string().contains("non-integer"));
    }

    // --- CAS tests ---
    // `cas` returns `(ok, value)`. On success `value` is the previous value; on
    // conflict it is the current value. An `expected` of `None` means "the key
    // must be absent".

    /// A matching `expected` swaps the value and reports the previous one.
    #[test]
    fn cas_matching_value_succeeds() {
        let r = rt();
        let ops = super::KvAtomicOps::new(&r);
        ops.set(
            CollectionModel::Kv,
            "kv_default",
            "lock",
            crate::storage::schema::Value::text("free"),
            None,
            false,
        )
        .unwrap();
        let (ok, prev) = ops
            .cas(
                CollectionModel::Kv,
                "kv_default",
                "lock",
                Some(&crate::storage::schema::Value::text("free")),
                crate::storage::schema::Value::text("held"),
                None,
            )
            .unwrap();
        assert!(ok);
        assert_eq!(prev, Some(crate::storage::schema::Value::text("free")));
        // Value actually changed.
        assert_eq!(
            ops.get(CollectionModel::Kv, "kv_default", "lock").unwrap(),
            Some(crate::storage::schema::Value::text("held"))
        );
    }

    /// In each round every thread races to CAS `round -> round + 1`, and
    /// exactly one may win. `start_round` lines the threads up. `finish_round`
    /// stops any thread from entering round r+1 before all have tried round r.
    #[test]
    fn concurrent_cas_allows_one_success_per_round() {
        const THREADS: usize = 8;
        const ROUNDS: usize = 100;

        let runtime = std::sync::Arc::new(rt());
        let ops = super::KvAtomicOps::new(&runtime);
        ops.set(
            CollectionModel::Kv,
            "kv_default",
            "cas_counter",
            crate::storage::schema::Value::Integer(0),
            None,
            false,
        )
        .unwrap();

        let start_round = std::sync::Arc::new(std::sync::Barrier::new(THREADS));
        let finish_round = std::sync::Arc::new(std::sync::Barrier::new(THREADS));
        let successes = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
        let mut handles = Vec::new();

        for _ in 0..THREADS {
            let runtime = std::sync::Arc::clone(&runtime);
            let start_round = std::sync::Arc::clone(&start_round);
            let finish_round = std::sync::Arc::clone(&finish_round);
            let successes = std::sync::Arc::clone(&successes);
            handles.push(std::thread::spawn(move || {
                let ops = super::KvAtomicOps::new(&runtime);
                for round in 0..ROUNDS {
                    start_round.wait();
                    let (ok, _) = ops
                        .cas(
                            CollectionModel::Kv,
                            "kv_default",
                            "cas_counter",
                            Some(&crate::storage::schema::Value::Integer(round as i64)),
                            crate::storage::schema::Value::Integer((round + 1) as i64),
                            None,
                        )
                        .unwrap();
                    if ok {
                        successes.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
                    }
                    finish_round.wait();
                }
            }));
        }

        for handle in handles {
            handle.join().expect("worker should finish");
        }

        assert_eq!(successes.load(std::sync::atomic::Ordering::SeqCst), ROUNDS);
        assert_eq!(
            ops.get(CollectionModel::Kv, "kv_default", "cas_counter")
                .unwrap(),
            Some(crate::storage::schema::Value::Integer(ROUNDS as i64))
        );
    }

    /// A mismatched `expected` leaves the value untouched and reports the
    /// current value.
    #[test]
    fn cas_mismatching_value_fails() {
        let r = rt();
        let ops = super::KvAtomicOps::new(&r);
        ops.set(
            CollectionModel::Kv,
            "kv_default",
            "lock",
            crate::storage::schema::Value::text("free"),
            None,
            false,
        )
        .unwrap();
        let (ok, current) = ops
            .cas(
                CollectionModel::Kv,
                "kv_default",
                "lock",
                Some(&crate::storage::schema::Value::text("held")),
                crate::storage::schema::Value::text("worker-7"),
                None,
            )
            .unwrap();
        assert!(!ok);
        assert_eq!(current, Some(crate::storage::schema::Value::text("free")));
        // Value unchanged.
        assert_eq!(
            ops.get(CollectionModel::Kv, "kv_default", "lock").unwrap(),
            Some(crate::storage::schema::Value::text("free"))
        );
    }

    /// Expecting absence on a missing key creates it.
    #[test]
    fn cas_expect_null_on_missing_key_creates() {
        let r = rt();
        let ops = super::KvAtomicOps::new(&r);
        let (ok, prev) = ops
            .cas(
                CollectionModel::Kv,
                "kv_default",
                "new_key",
                None,
                crate::storage::schema::Value::text("created"),
                None,
            )
            .unwrap();
        assert!(ok);
        assert_eq!(prev, None);
        assert_eq!(
            ops.get(CollectionModel::Kv, "kv_default", "new_key")
                .unwrap(),
            Some(crate::storage::schema::Value::text("created"))
        );
    }

    /// Expecting absence on an existing key fails and reports the current value.
    #[test]
    fn cas_expect_null_on_existing_key_fails() {
        let r = rt();
        let ops = super::KvAtomicOps::new(&r);
        ops.set(
            CollectionModel::Kv,
            "kv_default",
            "taken",
            crate::storage::schema::Value::text("worker-1"),
            None,
            false,
        )
        .unwrap();
        let (ok, current) = ops
            .cas(
                CollectionModel::Kv,
                "kv_default",
                "taken",
                None,
                crate::storage::schema::Value::text("worker-2"),
                None,
            )
            .unwrap();
        assert!(!ok);
        assert_eq!(
            current,
            Some(crate::storage::schema::Value::text("worker-1"))
        );
    }

    /// `KV CAS key EXPECT v SET w` reports its outcome in the `ok` column.
    #[test]
    fn cas_via_sql_roundtrip() {
        let r = rt();
        // Seed value.
        r.execute_query("KV PUT lock = 'free'").unwrap();
        // CAS: free → held — should succeed.
        let res = r
            .execute_query("KV CAS lock EXPECT 'free' SET 'held'")
            .unwrap();
        let row = &res.result.records[0];
        assert_eq!(
            row.get("ok"),
            Some(&crate::storage::schema::Value::Boolean(true))
        );
        // CAS: free → held again — should fail (value is now 'held').
        let res2 = r
            .execute_query("KV CAS lock EXPECT 'free' SET 'held'")
            .unwrap();
        let row2 = &res2.result.records[0];
        assert_eq!(
            row2.get("ok"),
            Some(&crate::storage::schema::Value::Boolean(false))
        );
    }

    /// `EXPECT NULL` in the DSL only succeeds while the key is absent.
    #[test]
    fn cas_expect_null_via_sql() {
        let r = rt();
        let res = r
            .execute_query("KV CAS singleton EXPECT NULL SET 'first'")
            .unwrap();
        let row = &res.result.records[0];
        assert_eq!(
            row.get("ok"),
            Some(&crate::storage::schema::Value::Boolean(true))
        );
        // Second call must fail.
        let res2 = r
            .execute_query("KV CAS singleton EXPECT NULL SET 'second'")
            .unwrap();
        let row2 = &res2.result.records[0];
        assert_eq!(
            row2.get("ok"),
            Some(&crate::storage::schema::Value::Boolean(false))
        );
    }

    /// `KV INCR key` without `BY` adds 1; `BY n` adds n. The new value is
    /// returned in the `value` column.
    #[test]
    fn incr_via_sql_roundtrip() {
        let r = rt();
        let res = r.execute_query("KV INCR hits").unwrap();
        let row = &res.result.records[0];
        assert_eq!(
            row.get("value"),
            Some(&crate::storage::schema::Value::Integer(1))
        );
        let res2 = r.execute_query("KV INCR hits BY 4").unwrap();
        let row2 = &res2.result.records[0];
        assert_eq!(
            row2.get("value"),
            Some(&crate::storage::schema::Value::Integer(5))
        );
    }

    /// Concurrent SQL `UPDATE ... SET n = n + 1` on one row must not lose
    /// updates. This exercises the SQL table path rather than the KV ops.
    #[test]
    fn concurrent_self_referential_update_is_atomic() {
        const THREADS: usize = 8;
        const ITERS: usize = 100;

        let runtime = std::sync::Arc::new(rt());
        runtime
            .execute_query("CREATE TABLE counters (id INT, n INT)")
            .unwrap();
        runtime
            .execute_query("INSERT INTO counters (id, n) VALUES (1, 0)")
            .unwrap();

        let barrier = std::sync::Arc::new(std::sync::Barrier::new(THREADS));
        let mut handles = Vec::new();
        for _ in 0..THREADS {
            let runtime = std::sync::Arc::clone(&runtime);
            let barrier = std::sync::Arc::clone(&barrier);
            handles.push(std::thread::spawn(move || {
                barrier.wait();
                for _ in 0..ITERS {
                    runtime
                        .execute_query("UPDATE counters SET n = n + 1 WHERE id = 1")
                        .unwrap();
                }
            }));
        }

        for handle in handles {
            handle.join().expect("worker should finish");
        }

        let selected = runtime
            .execute_query("SELECT n FROM counters WHERE id = 1")
            .unwrap();
        assert_eq!(
            selected.result.records[0].get("n"),
            Some(&crate::storage::schema::Value::Integer(
                (THREADS * ITERS) as i64
            ))
        );
    }

    /// A key watch sees its operations in order, with strictly increasing
    /// LSNs:
    /// - set on a new key: Insert
    /// - incr: Update
    /// - delete: Delete
    /// - set after delete: Insert
    #[test]
    fn watch_stream_delivers_key_events_in_lsn_order() {
        let r = rt();
        let ops = super::KvAtomicOps::new(&r);
        let mut stream = r.kv_watch_subscribe("kv_default", "seq", None);

        ops.set(
            CollectionModel::Kv,
            "kv_default",
            "seq",
            crate::storage::schema::Value::Integer(1),
            None,
            false,
        )
        .unwrap();
        ops.incr(CollectionModel::Kv, "kv_default", "seq", 1, None)
            .unwrap();
        ops.delete(CollectionModel::Kv, "kv_default", "seq")
            .unwrap();
        ops.set(
            CollectionModel::Kv,
            "kv_default",
            "seq",
            crate::storage::schema::Value::Integer(9),
            None,
            false,
        )
        .unwrap();

        let mut events = Vec::new();
        while let Some(event) = stream.poll_next() {
            events.push(event);
            if events.len() == 4 {
                break;
            }
        }

        assert_eq!(events.len(), 4);
        assert_eq!(
            events[0].op,
            crate::replication::cdc::ChangeOperation::Insert
        );
        assert_eq!(
            events[1].op,
            crate::replication::cdc::ChangeOperation::Update
        );
        assert_eq!(
            events[2].op,
            crate::replication::cdc::ChangeOperation::Delete
        );
        assert_eq!(
            events[3].op,
            crate::replication::cdc::ChangeOperation::Insert
        );
        assert!(events.windows(2).all(|pair| pair[0].lsn < pair[1].lsn));
    }

    /// A prefix watch on "acct:" delivers only keys with that prefix, in write
    /// order, and nothing else.
    #[test]
    fn watch_prefix_stream_delivers_matching_events_only() {
        let r = rt();
        let ops = super::KvAtomicOps::new(&r);
        let mut stream = r.kv_watch_subscribe_prefix("kv_default", "acct:", None);

        ops.set(
            CollectionModel::Kv,
            "kv_default",
            "acct:1",
            crate::storage::schema::Value::Integer(1),
            None,
            false,
        )
        .unwrap();
        ops.set(
            CollectionModel::Kv,
            "kv_default",
            "session:1",
            crate::storage::schema::Value::Integer(2),
            None,
            false,
        )
        .unwrap();
        ops.set(
            CollectionModel::Kv,
            "kv_default",
            "acct:2",
            crate::storage::schema::Value::Integer(3),
            None,
            false,
        )
        .unwrap();

        let first = stream.poll_next().expect("first prefix event");
        let second = stream.poll_next().expect("second prefix event");
        assert_eq!(first.key, "acct:1");
        assert_eq!(second.key, "acct:2");
        assert!(stream.poll_next().is_none());
    }

    /// Resubscribing from the last seen LSN replays exactly the 50 writes made
    /// while disconnected. All replayed LSNs are greater than the resume point
    /// and strictly increasing, and the stream is then empty, so there are no
    /// duplicates.
    #[test]
    fn watch_stream_resume_from_lsn_delivers_missed_events_without_duplicates() {
        let r = rt();
        let ops = super::KvAtomicOps::new(&r);
        let mut stream = r.kv_watch_subscribe("kv_default", "resume", None);

        let mut last_seen_lsn = 0;
        for value in 0..5 {
            ops.set(
                CollectionModel::Kv,
                "kv_default",
                "resume",
                crate::storage::schema::Value::Integer(value),
                None,
                false,
            )
            .unwrap();
            last_seen_lsn = stream.poll_next().expect("initial event").lsn;
        }
        drop(stream);

        // Writes made while no subscriber is attached.
        for value in 5..55 {
            ops.set(
                CollectionModel::Kv,
                "kv_default",
                "resume",
                crate::storage::schema::Value::Integer(value),
                None,
                false,
            )
            .unwrap();
        }

        let mut resumed = r.kv_watch_subscribe("kv_default", "resume", Some(last_seen_lsn));
        let mut lsns = Vec::new();
        while let Some(event) = resumed.poll_next() {
            lsns.push(event.lsn);
            if lsns.len() == 50 {
                break;
            }
        }

        assert_eq!(lsns.len(), 50);
        assert!(lsns.iter().all(|lsn| *lsn > last_seen_lsn));
        assert!(lsns.windows(2).all(|pair| pair[0] < pair[1]));
        assert!(resumed.poll_next().is_none());
    }

    /// 10,000 unconsumed writes overflow the stream. NOTE(review): this
    /// presumes a bounded per-stream buffer smaller than 10,000 — confirm.
    /// The first event polled afterwards is not LSN 1 and carries a non-zero
    /// `dropped_event_count`. That count agrees with the stream's own counter
    /// and with `stats().kv.watch_drops`.
    #[test]
    fn watch_stream_slow_consumer_drops_oldest_buffered_events() {
        let r = rt();
        let ops = super::KvAtomicOps::new(&r);
        let mut stream = r.kv_watch_subscribe("kv_default", "slow", None);

        for value in 0..10_000 {
            ops.set(
                CollectionModel::Kv,
                "kv_default",
                "slow",
                crate::storage::schema::Value::Integer(value),
                None,
                false,
            )
            .unwrap();
        }

        let event = stream.poll_next().expect("tail event after drops");
        assert!(event.lsn > 1);
        assert!(event.dropped_event_count > 0);
        assert_eq!(stream.dropped_event_count(), event.dropped_event_count);
        assert_eq!(r.stats().kv.watch_drops, event.dropped_event_count);
    }

    /// With `red.config.kv.watch.idle_timeout_ms = 1`, a stream that stays
    /// idle for 5 ms yields `None` on poll, and the active-stream gauge drops
    /// to 0.
    #[test]
    fn watch_stream_idle_timeout_closes_subscription() {
        let r = rt();
        r.execute_query("SET CONFIG red.config.kv.watch.idle_timeout_ms = 1")
            .unwrap();

        let mut stream = r.kv_watch_subscribe("kv_default", "idle", None);
        assert_eq!(r.stats().kv.watch_streams_active, 1);
        std::thread::sleep(std::time::Duration::from_millis(5));

        assert!(stream.poll_next().is_none());
        assert_eq!(r.stats().kv.watch_streams_active, 0);
    }

    /// A `KV PUT` inside a rolled-back transaction is not delivered to watchers.
    #[test]
    fn watch_stream_does_not_emit_rolled_back_put() {
        let r = rt();
        let mut stream = r.kv_watch_subscribe("kv_default", "rollback_key", None);

        r.execute_query("BEGIN").unwrap();
        r.execute_query("KV PUT rollback_key = 'dirty'").unwrap();
        r.execute_query("ROLLBACK").unwrap();

        assert!(stream.poll_next().is_none());
    }
}