Skip to main content

reddb_server/runtime/
impl_kv.rs

1//! KV DSL command execution and KvAtomicOps module.
2//!
3//! Handles `KV PUT key = value [EXPIRE n] [TAGS [...]] [IF NOT EXISTS]`,
4//! `KV GET key`, and `KV DELETE key`.
5
6use crate::application::ports::RuntimeEntityPort;
7use crate::storage::unified::{Metadata, MetadataValue};
8
9use super::impl_core::{current_auth_identity, current_connection_id, current_tenant};
10use super::*;
11
/// Collection that KV commands target when they name a bare key without an
/// explicit collection.
pub const KV_DEFAULT_COLLECTION: &str = "kv_default";
14
/// Builds the auth-store vault lookup key that holds the per-collection master
/// key for a vault collection: `red.vault.<collection>.master_key`.
fn vault_master_key_ref(collection: &str) -> String {
    let mut reference = String::from("red.vault.");
    reference.push_str(collection);
    reference.push_str(".master_key");
    reference
}
18
19fn keyed_model_name(model: crate::catalog::CollectionModel) -> &'static str {
20    match model {
21        crate::catalog::CollectionModel::Kv => "kv",
22        crate::catalog::CollectionModel::Vault => "vault",
23        crate::catalog::CollectionModel::Config => "config",
24        _ => "collection",
25    }
26}
27
28#[derive(Debug, Clone)]
29struct VaultEntry {
30    id: crate::storage::EntityId,
31    key: String,
32    value: crate::storage::schema::Value,
33    metadata: Metadata,
34    created_at: u64,
35    updated_at: u64,
36    sequence_id: u64,
37    version: i64,
38    tombstone: bool,
39    op: String,
40}
41
42impl super::keyed_spine::KeyedVersion for VaultEntry {
43    fn key(&self) -> &str {
44        &self.key
45    }
46
47    fn version(&self) -> i64 {
48        self.version
49    }
50}
51
52impl VaultEntry {
53    fn from_keyed_row(
54        version: super::keyed_spine::KeyedRowVersion,
55        metadata: Metadata,
56        created_at: u64,
57        updated_at: u64,
58        sequence_id: u64,
59    ) -> Self {
60        Self {
61            id: version.id,
62            key: version.key,
63            value: version.value,
64            metadata,
65            created_at,
66            updated_at,
67            sequence_id,
68            version: version.version,
69            tombstone: version.tombstone,
70            op: version.op,
71        }
72    }
73}
74
75/// Atomic KV operations interface — the seam that transports and drivers depend on.
76///
77/// All three verbs delegate to the runtime's existing `create_kv` / `get_kv` /
78/// `delete_kv` plumbing; this struct adds the auto-create and upsert logic.
79pub struct KvAtomicOps<'a> {
80    runtime: &'a RedDBRuntime,
81}
82
83impl<'a> KvAtomicOps<'a> {
84    pub fn new(runtime: &'a RedDBRuntime) -> Self {
85        Self { runtime }
86    }
87
88    /// Insert or update a KV entry. Auto-creates the collection when needed.
89    ///
90    /// Insert or update a KV entry. Returns `(created: bool, id: EntityId)`.
91    pub fn set(
92        &self,
93        model: crate::catalog::CollectionModel,
94        collection: &str,
95        key: &str,
96        value: crate::storage::schema::Value,
97        ttl_ms: Option<u64>,
98        if_not_exists: bool,
99    ) -> RedDBResult<(bool, crate::storage::EntityId)> {
100        self.set_with_tags_for_model(model, collection, key, value, ttl_ms, &[], if_not_exists)
101    }
102
103    pub fn set_with_tags(
104        &self,
105        collection: &str,
106        key: &str,
107        value: crate::storage::schema::Value,
108        ttl_ms: Option<u64>,
109        tags: &[String],
110        if_not_exists: bool,
111    ) -> RedDBResult<(bool, crate::storage::EntityId)> {
112        self.set_with_tags_for_model(
113            crate::catalog::CollectionModel::Kv,
114            collection,
115            key,
116            value,
117            ttl_ms,
118            tags,
119            if_not_exists,
120        )
121    }
122
123    fn set_with_tags_for_model(
124        &self,
125        model: crate::catalog::CollectionModel,
126        collection: &str,
127        key: &str,
128        value: crate::storage::schema::Value,
129        ttl_ms: Option<u64>,
130        tags: &[String],
131        if_not_exists: bool,
132    ) -> RedDBResult<(bool, crate::storage::EntityId)> {
133        self.set_with_tags_and_metadata_for_model(
134            model,
135            collection,
136            key,
137            value,
138            ttl_ms,
139            tags,
140            if_not_exists,
141            Vec::new(),
142        )
143    }
144
145    pub fn set_with_tags_and_metadata(
146        &self,
147        collection: &str,
148        key: &str,
149        value: crate::storage::schema::Value,
150        ttl_ms: Option<u64>,
151        tags: &[String],
152        if_not_exists: bool,
153        metadata: Vec<(String, MetadataValue)>,
154    ) -> RedDBResult<(bool, crate::storage::EntityId)> {
155        self.set_with_tags_and_metadata_for_model(
156            crate::catalog::CollectionModel::Kv,
157            collection,
158            key,
159            value,
160            ttl_ms,
161            tags,
162            if_not_exists,
163            metadata,
164        )
165    }
166
167    fn set_with_tags_and_metadata_for_model(
168        &self,
169        model: crate::catalog::CollectionModel,
170        collection: &str,
171        key: &str,
172        value: crate::storage::schema::Value,
173        ttl_ms: Option<u64>,
174        tags: &[String],
175        if_not_exists: bool,
176        mut metadata: Vec<(String, MetadataValue)>,
177    ) -> RedDBResult<(bool, crate::storage::EntityId)> {
178        self.ensure_keyed_collection(model, collection)?;
179
180        if model == crate::catalog::CollectionModel::Vault {
181            let latest = self.get_vault_entry(collection, key)?;
182            let was_present = latest.as_ref().is_some_and(|entry| !entry.tombstone);
183            if if_not_exists && was_present {
184                return Ok((false, latest.expect("checked present").id));
185            }
186            let entry = self.append_vault_version(collection, key, value, "put", false, tags)?;
187            self.runtime.record_kv_watch_event(
188                if latest.is_some() {
189                    crate::replication::cdc::ChangeOperation::Update
190                } else {
191                    crate::replication::cdc::ChangeOperation::Insert
192                },
193                collection,
194                key,
195                entry.id.raw(),
196                latest.as_ref().map(vault_entry_metadata_json),
197                Some(vault_entry_metadata_json(&entry)),
198            );
199            return Ok((!was_present, entry.id));
200        }
201
202        let existing = self.get_entry(model, collection, key)?;
203        let was_present = existing.is_some();
204
205        if if_not_exists && was_present {
206            let (_, id) = existing.unwrap();
207            self.runtime.inner.kv_stats.incr_puts();
208            return Ok((false, id));
209        }
210
211        let before = existing
212            .as_ref()
213            .map(|(value, _)| crate::presentation::entity_json::storage_value_to_json(value));
214        let op = if was_present {
215            crate::replication::cdc::ChangeOperation::Update
216        } else {
217            crate::replication::cdc::ChangeOperation::Insert
218        };
219        let after = Some(crate::presentation::entity_json::storage_value_to_json(
220            &value,
221        ));
222
223        // Delete old entry so we can create fresh (handles TTL refresh).
224        if was_present {
225            self.delete(model, collection, key)?;
226        }
227
228        if let Some(ttl_metadata) = ttl_metadata(ttl_ms) {
229            metadata.extend(ttl_metadata.fields);
230        }
231        if let Some(tags_metadata) = kv_tags_metadata(tags) {
232            metadata.push(tags_metadata);
233        }
234
235        let output = self
236            .runtime
237            .create_kv(crate::application::entity::CreateKvInput {
238                collection: collection.to_string(),
239                key: key.to_string(),
240                value,
241                metadata,
242            })?;
243        if model == crate::catalog::CollectionModel::Kv {
244            self.runtime
245                .inner
246                .kv_tag_index
247                .replace(collection, key, output.id, tags);
248        }
249
250        if model == crate::catalog::CollectionModel::Kv {
251            self.runtime
252                .record_kv_watch_event(op, collection, key, output.id.raw(), before, after);
253        }
254
255        if model == crate::catalog::CollectionModel::Kv {
256            self.runtime.inner.kv_stats.incr_puts();
257        }
258        Ok((!was_present, output.id))
259    }
260
261    /// Retrieve a KV value by key. Returns `None` when not found.
262    pub fn get(
263        &self,
264        model: crate::catalog::CollectionModel,
265        collection: &str,
266        key: &str,
267    ) -> RedDBResult<Option<crate::storage::schema::Value>> {
268        let result = self.get_entry(model, collection, key)?;
269        if model == crate::catalog::CollectionModel::Kv {
270            self.runtime.inner.kv_stats.incr_gets();
271        }
272        Ok(result.map(|(v, _)| v))
273    }
274
275    /// Delete a KV entry. Returns `true` if the key existed.
276    pub fn delete(
277        &self,
278        model: crate::catalog::CollectionModel,
279        collection: &str,
280        key: &str,
281    ) -> RedDBResult<bool> {
282        self.ensure_declared_model(model, collection)?;
283        let found = self.get_entry(model, collection, key)?;
284        if let Some((value, id)) = found {
285            let store = self.runtime.inner.db.store();
286            let deleted = store
287                .delete(collection, id)
288                .map_err(|err| RedDBError::Internal(err.to_string()))?;
289            if deleted {
290                store.context_index().remove_entity(id);
291                if model == crate::catalog::CollectionModel::Kv {
292                    self.runtime.inner.kv_tag_index.remove(collection, key);
293                    self.runtime.record_kv_watch_event(
294                        crate::replication::cdc::ChangeOperation::Delete,
295                        collection,
296                        key,
297                        id.raw(),
298                        Some(crate::presentation::entity_json::storage_value_to_json(
299                            &value,
300                        )),
301                        None,
302                    );
303                    self.runtime.inner.kv_stats.incr_deletes();
304                }
305            }
306            Ok(deleted)
307        } else {
308            Ok(false)
309        }
310    }
311
312    /// Atomically increment (or decrement) a counter key. Returns the new value.
313    ///
314    /// - Missing key initialises at `by` (Redis-compat).
315    /// - Non-integer value returns an error before any mutation.
316    pub fn incr(
317        &self,
318        model: crate::catalog::CollectionModel,
319        collection: &str,
320        key: &str,
321        by: i64,
322        ttl_ms: Option<u64>,
323    ) -> RedDBResult<i64> {
324        if model == crate::catalog::CollectionModel::Vault {
325            return Err(RedDBError::InvalidOperation(
326                "VAULT INCR is not supported for sealed secrets".to_string(),
327            ));
328        }
329        let rmw_lock = self.runtime.inner.rmw_locks.lock_for(collection, key);
330        let _rmw_guard = rmw_lock.lock();
331        self.ensure_kv_collection(collection)?;
332        let existing = self.runtime.get_kv(collection, key)?;
333        let current: i64 = match existing.as_ref() {
334            None => 0,
335            Some((crate::storage::schema::Value::Integer(n), _)) => *n,
336            Some((crate::storage::schema::Value::Float(f), _)) => *f as i64,
337            Some((other, _)) => {
338                return Err(RedDBError::Internal(format!(
339                    "INCR on non-integer value: {:?}",
340                    other
341                )));
342            }
343        };
344
345        let next = current
346            .checked_add(by)
347            .ok_or_else(|| RedDBError::Internal(format!("INCR overflow: {current} + {by}")))?;
348
349        // Delete then re-create so TTL is refreshed.
350        if existing.is_some() {
351            self.runtime.delete_kv(collection, key)?;
352        }
353
354        let meta_vec: Vec<(String, crate::storage::unified::MetadataValue)> = ttl_metadata(ttl_ms)
355            .map(|m| m.fields.into_iter().collect())
356            .unwrap_or_default();
357
358        let output = self
359            .runtime
360            .create_kv(crate::application::entity::CreateKvInput {
361                collection: collection.to_string(),
362                key: key.to_string(),
363                value: crate::storage::schema::Value::Integer(next),
364                metadata: meta_vec,
365            })?;
366        self.runtime
367            .inner
368            .kv_tag_index
369            .replace(collection, key, output.id, &[]);
370
371        self.runtime.record_kv_watch_event(
372            if existing.is_some() {
373                crate::replication::cdc::ChangeOperation::Update
374            } else {
375                crate::replication::cdc::ChangeOperation::Insert
376            },
377            collection,
378            key,
379            output.id.raw(),
380            existing
381                .as_ref()
382                .map(|(value, _)| crate::presentation::entity_json::storage_value_to_json(value)),
383            Some(crate::presentation::entity_json::storage_value_to_json(
384                &crate::storage::schema::Value::Integer(next),
385            )),
386        );
387
388        self.runtime.inner.kv_stats.incr_incrs();
389        Ok(next)
390    }
391
392    /// Compare-and-set: atomically swap `key` from `expected` to `new_value`.
393    ///
394    /// Returns `(ok, current)`:
395    /// - `ok = true`  → swap applied; `current` is the value *before* the swap.
396    /// - `ok = false` → swap skipped; `current` holds the actual current value.
397    ///
398    /// `expected = None` means the caller expects the key to be *absent* (create-if-absent).
399    pub fn cas(
400        &self,
401        model: crate::catalog::CollectionModel,
402        collection: &str,
403        key: &str,
404        expected: Option<&crate::storage::schema::Value>,
405        new_value: crate::storage::schema::Value,
406        ttl_ms: Option<u64>,
407    ) -> RedDBResult<(bool, Option<crate::storage::schema::Value>)> {
408        if model == crate::catalog::CollectionModel::Vault {
409            return Err(RedDBError::InvalidOperation(
410                "VAULT CAS is not supported for sealed secrets".to_string(),
411            ));
412        }
413        let rmw_lock = self.runtime.inner.rmw_locks.lock_for(collection, key);
414        let _rmw_guard = rmw_lock.lock();
415        self.ensure_kv_collection(collection)?;
416        let current = self.runtime.get_kv(collection, key)?.map(|(v, _)| v);
417
418        let matches = match (&current, expected) {
419            (None, None) => true,
420            (Some(cur), Some(exp)) => cur == exp,
421            _ => false,
422        };
423
424        if !matches {
425            self.runtime.inner.kv_stats.incr_cas_conflict();
426            return Ok((false, current));
427        }
428
429        // Swap: delete old entry (if present), write new one.
430        if current.is_some() {
431            self.runtime.delete_kv(collection, key)?;
432        }
433
434        let meta_vec: Vec<(String, crate::storage::unified::MetadataValue)> = ttl_metadata(ttl_ms)
435            .map(|m| m.fields.into_iter().collect())
436            .unwrap_or_default();
437
438        let output = self
439            .runtime
440            .create_kv(crate::application::entity::CreateKvInput {
441                collection: collection.to_string(),
442                key: key.to_string(),
443                value: new_value.clone(),
444                metadata: meta_vec,
445            })?;
446        self.runtime
447            .inner
448            .kv_tag_index
449            .replace(collection, key, output.id, &[]);
450
451        self.runtime.record_kv_watch_event(
452            if current.is_some() {
453                crate::replication::cdc::ChangeOperation::Update
454            } else {
455                crate::replication::cdc::ChangeOperation::Insert
456            },
457            collection,
458            key,
459            output.id.raw(),
460            current
461                .as_ref()
462                .map(crate::presentation::entity_json::storage_value_to_json),
463            Some(crate::presentation::entity_json::storage_value_to_json(
464                &new_value,
465            )),
466        );
467
468        self.runtime.inner.kv_stats.incr_cas_success();
469        Ok((true, current))
470    }
471
472    pub fn invalidate_tags(&self, collection: &str, tags: &[String]) -> RedDBResult<usize> {
473        self.runtime
474            .check_write(crate::runtime::write_gate::WriteKind::Dml)?;
475        self.runtime.check_kv_invalidate_policy(collection)?;
476        self.ensure_kv_collection(collection)?;
477        let entries = self
478            .runtime
479            .inner
480            .kv_tag_index
481            .entries_for_tags(collection, tags);
482        if entries.is_empty() {
483            return Ok(0);
484        }
485
486        let store = self.runtime.inner.db.store();
487        let mut removed = 0usize;
488        for (key, id) in entries {
489            let before = store
490                .get(collection, id)
491                .and_then(|entity| kv_value_from_entity(&entity));
492            let deleted = store
493                .delete(collection, id)
494                .map_err(|err| RedDBError::Internal(err.to_string()))?;
495            if deleted {
496                store.context_index().remove_entity(id);
497                self.runtime.inner.kv_tag_index.remove(collection, &key);
498                self.runtime.record_kv_watch_event(
499                    crate::replication::cdc::ChangeOperation::Delete,
500                    collection,
501                    &key,
502                    id.raw(),
503                    before
504                        .as_ref()
505                        .map(crate::presentation::entity_json::storage_value_to_json),
506                    None,
507                );
508                removed += 1;
509            }
510        }
511        if removed > 0 {
512            self.runtime.inner.kv_stats.incr_deletes();
513        }
514        Ok(removed)
515    }
516
517    pub fn tags_for_key(&self, collection: &str, key: &str) -> Vec<String> {
518        self.runtime
519            .inner
520            .kv_tag_index
521            .tags_for_key(collection, key)
522    }
523
524    /// Auto-create a KV collection if it does not exist yet.
525    fn ensure_kv_collection(&self, collection: &str) -> RedDBResult<()> {
526        self.ensure_keyed_collection(crate::catalog::CollectionModel::Kv, collection)
527    }
528
529    fn ensure_keyed_collection(
530        &self,
531        model: crate::catalog::CollectionModel,
532        collection: &str,
533    ) -> RedDBResult<()> {
534        let store = self.runtime.inner.db.store();
535        if store.get_collection(collection).is_some() {
536            return self.ensure_declared_model(model, collection);
537        }
538        if model != crate::catalog::CollectionModel::Kv {
539            return Err(RedDBError::NotFound(format!(
540                "{} collection '{collection}' does not exist",
541                keyed_model_name(model)
542            )));
543        }
544        // Check config gate: red.config.kv.default_collection (default = true).
545        let auto_create = self
546            .runtime
547            .config_bool("red.config.kv.default_collection", true);
548        if !auto_create {
549            return Err(RedDBError::NotFound(format!(
550                "kv collection '{collection}' does not exist and auto-create is disabled \
551                 (red.config.kv.default_collection = false)"
552            )));
553        }
554        store
555            .create_collection(collection)
556            .map_err(|err| RedDBError::Internal(err.to_string()))?;
557        self.runtime
558            .inner
559            .db
560            .save_collection_contract(kv_collection_contract(collection))
561            .map_err(|err| RedDBError::Internal(err.to_string()))?;
562        Ok(())
563    }
564
565    fn get_entry(
566        &self,
567        model: crate::catalog::CollectionModel,
568        collection: &str,
569        key: &str,
570    ) -> RedDBResult<Option<(crate::storage::schema::Value, crate::storage::EntityId)>> {
571        let Some(entity) = self.get_entity(model, collection, key)? else {
572            return Ok(None);
573        };
574        Ok(kv_value_from_entity(&entity).map(|value| (value, entity.id)))
575    }
576
577    fn get_entity(
578        &self,
579        model: crate::catalog::CollectionModel,
580        collection: &str,
581        key: &str,
582    ) -> RedDBResult<Option<crate::storage::UnifiedEntity>> {
583        self.ensure_declared_model(model, collection)?;
584        let store = self.runtime.inner.db.store();
585        let Some(manager) = store.get_collection(collection) else {
586            return Ok(None);
587        };
588        let entities = manager.query_all(|_| true);
589        for entity in entities {
590            if let crate::storage::EntityData::Row(ref row) = entity.data {
591                if let Some(ref named) = row.named {
592                    if let Some(crate::storage::schema::Value::Text(ref k)) = named.get("key") {
593                        if &**k == key {
594                            return Ok(Some(entity));
595                        }
596                    }
597                }
598            }
599        }
600        Ok(None)
601    }
602
603    fn latest_kv_entries(
604        &self,
605        collection: &str,
606        prefix: Option<&str>,
607    ) -> RedDBResult<Vec<(String, crate::storage::UnifiedEntity)>> {
608        self.ensure_declared_model(crate::catalog::CollectionModel::Kv, collection)?;
609        let store = self.runtime.inner.db.store();
610        let Some(manager) = store.get_collection(collection) else {
611            return Ok(Vec::new());
612        };
613        let mut entries: std::collections::BTreeMap<String, crate::storage::UnifiedEntity> =
614            std::collections::BTreeMap::new();
615        for entity in manager.query_all(|_| true) {
616            let key = match &entity.data {
617                crate::storage::EntityData::Row(row) => row
618                    .named
619                    .as_ref()
620                    .and_then(|named| named.get("key"))
621                    .and_then(|value| match value {
622                        crate::storage::schema::Value::Text(key) => Some(key.to_string()),
623                        _ => None,
624                    }),
625                _ => None,
626            };
627            let Some(key) = key else {
628                continue;
629            };
630            if prefix.is_some_and(|prefix| !key.starts_with(prefix)) {
631                continue;
632            }
633            let should_replace = match entries.get(&key) {
634                Some(existing) => entity.id.raw() >= existing.id.raw(),
635                None => true,
636            };
637            if should_replace {
638                entries.insert(key, entity);
639            }
640        }
641        Ok(entries.into_iter().collect())
642    }
643
644    fn get_vault_entry(&self, collection: &str, key: &str) -> RedDBResult<Option<VaultEntry>> {
645        self.vault_versions(collection, key)
646            .map(super::keyed_spine::latest_version)
647    }
648
649    fn get_vault_entry_version(
650        &self,
651        collection: &str,
652        key: &str,
653        version: i64,
654    ) -> RedDBResult<Option<VaultEntry>> {
655        Ok(self
656            .vault_versions(collection, key)?
657            .into_iter()
658            .find(|entry| entry.version == version))
659    }
660
661    fn vault_versions(&self, collection: &str, key: &str) -> RedDBResult<Vec<VaultEntry>> {
662        self.ensure_declared_model(crate::catalog::CollectionModel::Vault, collection)?;
663        let store = self.runtime.inner.db.store();
664        let Some(manager) = store.get_collection(collection) else {
665            return Ok(Vec::new());
666        };
667        let entities = manager.query_all(|_| true);
668        let mut versions = Vec::new();
669        for entity in entities {
670            let crate::storage::EntityData::Row(ref row) = entity.data else {
671                continue;
672            };
673            let Some(version) =
674                super::keyed_spine::row_version(entity.id, row, entity.sequence_id as i64)
675            else {
676                continue;
677            };
678            if version.key != key {
679                continue;
680            }
681            let metadata = manager.get_metadata(entity.id).unwrap_or_default();
682            versions.push(VaultEntry::from_keyed_row(
683                version,
684                metadata,
685                entity.created_at,
686                entity.updated_at,
687                entity.sequence_id,
688            ));
689        }
690        Ok(versions)
691    }
692
693    fn latest_vault_entries(
694        &self,
695        collection: &str,
696        prefix: Option<&str>,
697    ) -> RedDBResult<Vec<VaultEntry>> {
698        self.ensure_declared_model(crate::catalog::CollectionModel::Vault, collection)?;
699        let store = self.runtime.inner.db.store();
700        let Some(manager) = store.get_collection(collection) else {
701            return Ok(Vec::new());
702        };
703        let mut versions = Vec::new();
704        for entity in manager.query_all(|_| true) {
705            let crate::storage::EntityData::Row(ref row) = entity.data else {
706                continue;
707            };
708            let Some(version) =
709                super::keyed_spine::row_version(entity.id, row, entity.sequence_id as i64)
710            else {
711                continue;
712            };
713            let metadata = manager.get_metadata(entity.id).unwrap_or_default();
714            let entry = VaultEntry::from_keyed_row(
715                version,
716                metadata,
717                entity.created_at,
718                entity.updated_at,
719                entity.sequence_id,
720            );
721            versions.push(entry);
722        }
723        Ok(super::keyed_spine::latest_versions(versions, prefix))
724    }
725
726    fn append_vault_version(
727        &self,
728        collection: &str,
729        key: &str,
730        value: crate::storage::schema::Value,
731        op: &str,
732        tombstone: bool,
733        tags: &[String],
734    ) -> RedDBResult<VaultEntry> {
735        self.ensure_declared_model(crate::catalog::CollectionModel::Vault, collection)?;
736        let version = self
737            .get_vault_entry(collection, key)?
738            .map(|entry| entry.version)
739            .unwrap_or(0)
740            + 1;
741        let stored_value = if tombstone {
742            crate::storage::schema::Value::Null
743        } else {
744            self.runtime.seal_vault_value(collection, value)?
745        };
746        let now = current_unix_ms() as i64;
747        let fields = vec![
748            (
749                "key".to_string(),
750                crate::storage::schema::Value::text(key.to_string()),
751            ),
752            ("value".to_string(), stored_value),
753            (
754                "version".to_string(),
755                crate::storage::schema::Value::Integer(version),
756            ),
757            (
758                "tombstone".to_string(),
759                crate::storage::schema::Value::Boolean(tombstone),
760            ),
761            (
762                "op".to_string(),
763                crate::storage::schema::Value::text(op.to_string()),
764            ),
765            (
766                "created_at_ms".to_string(),
767                crate::storage::schema::Value::Integer(now),
768            ),
769        ];
770        let mut row = crate::storage::RowData::new(Vec::new());
771        row.named = Some(fields.into_iter().collect());
772        let entity = crate::storage::UnifiedEntity::new(
773            crate::storage::EntityId::new(0),
774            crate::storage::EntityKind::TableRow {
775                table: std::sync::Arc::from(collection),
776                row_id: 0,
777            },
778            crate::storage::EntityData::Row(row),
779        );
780        let id = self
781            .runtime
782            .inner
783            .db
784            .store()
785            .insert(collection, entity)
786            .map_err(|err| RedDBError::Internal(err.to_string()))?;
787        if !tags.is_empty() {
788            self.runtime
789                .inner
790                .db
791                .store()
792                .set_metadata(
793                    collection,
794                    id,
795                    Metadata::with_fields(vault_tags_metadata(tags)),
796                )
797                .map_err(|err| RedDBError::Internal(err.to_string()))?;
798            self.runtime
799                .inner
800                .kv_tag_index
801                .replace(collection, key, id, tags);
802        }
803        self.get_vault_entry_version(collection, key, version)?
804            .ok_or_else(|| RedDBError::Internal(format!("vault version {id} was not readable")))
805    }
806
807    fn purge_vault_versions(&self, collection: &str, key: &str) -> RedDBResult<usize> {
808        self.ensure_declared_model(crate::catalog::CollectionModel::Vault, collection)?;
809        let versions = self.vault_versions(collection, key)?;
810        let store = self.runtime.inner.db.store();
811        let mut purged = 0usize;
812        for entry in versions {
813            if store
814                .delete(collection, entry.id)
815                .map_err(|err| RedDBError::Internal(err.to_string()))?
816            {
817                store.context_index().remove_entity(entry.id);
818                purged += 1;
819            }
820        }
821        Ok(purged)
822    }
823
824    fn ensure_declared_model(
825        &self,
826        model: crate::catalog::CollectionModel,
827        collection: &str,
828    ) -> RedDBResult<()> {
829        let Some(contract) = self.runtime.inner.db.collection_contract(collection) else {
830            return Ok(());
831        };
832        if contract.declared_model == model
833            || contract.declared_model == crate::catalog::CollectionModel::Mixed
834        {
835            return Ok(());
836        }
837        Err(RedDBError::InvalidOperation(format!(
838            "collection '{}' is declared as '{}' and does not allow '{}' operations",
839            collection,
840            keyed_model_name(contract.declared_model),
841            keyed_model_name(model)
842        )))
843    }
844}
845
846impl RedDBRuntime {
847    pub(crate) fn seal_vault_value(
848        &self,
849        collection: &str,
850        value: crate::storage::schema::Value,
851    ) -> RedDBResult<crate::storage::schema::Value> {
852        let key = self.vault_encryption_key(collection)?;
853        let plaintext = value.to_bytes();
854        let nonce_bytes = crate::auth::store::random_bytes(12);
855        let mut nonce = [0u8; 12];
856        nonce.copy_from_slice(&nonce_bytes[..12]);
857        let aad = format!("reddb.vault.{collection}");
858        let ciphertext =
859            crate::crypto::aes_gcm::aes256_gcm_encrypt(&key, &nonce, aad.as_bytes(), &plaintext);
860        let mut payload = Vec::with_capacity(12 + ciphertext.len());
861        payload.extend_from_slice(&nonce);
862        payload.extend_from_slice(&ciphertext);
863        Ok(crate::storage::schema::Value::Secret(payload))
864    }
865
866    fn vault_key_available(&self, collection: &str) -> bool {
867        self.vault_encryption_key(collection).is_ok()
868    }
869
870    fn vault_encryption_key(&self, collection: &str) -> RedDBResult<[u8; 32]> {
871        let auth_store = self.inner.auth_store.read().clone().ok_or_else(|| {
872            RedDBError::Query("vault sealed_unavailable: no key provider is configured".to_string())
873        })?;
874        if !auth_store.is_vault_backed() {
875            return Err(RedDBError::Query(
876                "vault sealed_unavailable: key provider is sealed".to_string(),
877            ));
878        }
879
880        if let Some(hex_key) = auth_store.vault_kv_get(&vault_master_key_ref(collection)) {
881            return decode_vault_key(&hex_key);
882        }
883        auth_store.vault_secret_key().ok_or_else(|| {
884            RedDBError::Query("vault sealed_unavailable: cluster vault key is missing".to_string())
885        })
886    }
887
888    fn unseal_vault_value(
889        &self,
890        collection: &str,
891        sealed: &crate::storage::schema::Value,
892    ) -> RedDBResult<crate::storage::schema::Value> {
893        let crate::storage::schema::Value::Secret(payload) = sealed else {
894            return Err(RedDBError::Query(
895                "vault unseal failed: stored value is not sealed".to_string(),
896            ));
897        };
898        if payload.len() < 12 {
899            return Err(RedDBError::Query(
900                "vault unseal failed: sealed payload is truncated".to_string(),
901            ));
902        }
903        let key = self.vault_encryption_key(collection)?;
904        let mut nonce = [0u8; 12];
905        nonce.copy_from_slice(&payload[..12]);
906        let aad = format!("reddb.vault.{collection}");
907        let plaintext = crate::crypto::aes_gcm::aes256_gcm_decrypt(
908            &key,
909            &nonce,
910            aad.as_bytes(),
911            &payload[12..],
912        )
913        .map_err(|_| RedDBError::Query("vault unseal failed: decryption failed".to_string()))?;
914        let (value, consumed) =
915            crate::storage::schema::Value::from_bytes(&plaintext).map_err(|err| {
916                RedDBError::Query(format!("vault unseal failed: bad plaintext value: {err}"))
917            })?;
918        if consumed != plaintext.len() {
919            return Err(RedDBError::Query(
920                "vault unseal failed: trailing plaintext bytes".to_string(),
921            ));
922        }
923        Ok(value)
924    }
925
926    /// Internal peek of a vault entry's unsealed value, bypassing audit and
927    /// policy checks. Used by `SecretRefGuard` to detect chained `secret_ref`
928    /// indirection at config write time without emitting vault read events.
929    /// Returns `Ok(None)` if the entry is absent, tombstoned, or its sealed
930    /// payload cannot be decrypted with the available key material.
931    pub(crate) fn peek_vault_unsealed(
932        &self,
933        collection: &str,
934        key: &str,
935    ) -> RedDBResult<Option<crate::storage::schema::Value>> {
936        let ops = KvAtomicOps::new(self);
937        let Some(entry) = ops.get_vault_entry(collection, key)? else {
938            return Ok(None);
939        };
940        if entry.tombstone {
941            return Ok(None);
942        }
943        Ok(self.unseal_vault_value(collection, &entry.value).ok())
944    }
945
946    fn vault_target_resource(collection: &str, key: &str) -> String {
947        if collection == "red.vault" {
948            return format!("red.vault/{}", key.to_ascii_lowercase());
949        }
950        format!("{collection}.{key}")
951    }
952
953    fn current_vault_actor() -> String {
954        current_auth_identity()
955            .map(|(principal, _)| principal)
956            .unwrap_or_else(|| "anonymous".to_string())
957    }
958
959    fn vault_request_id() -> String {
960        let conn_id = current_connection_id();
961        if conn_id == 0 {
962            "embedded".to_string()
963        } else {
964            format!("conn-{conn_id}")
965        }
966    }
967
968    fn check_vault_capability(
969        &self,
970        action: &str,
971        collection: &str,
972        key: &str,
973    ) -> Result<(), String> {
974        let Some(auth_store) = self.inner.auth_store.read().clone() else {
975            return Ok(());
976        };
977        if !auth_store.iam_authorization_enabled() {
978            return Ok(());
979        }
980        let Some((principal, role)) = current_auth_identity() else {
981            return Err(
982                "IAM authorization is enabled; vault capability check requires an authenticated principal"
983                    .to_string(),
984            );
985        };
986        let tenant = current_tenant();
987        let principal_id = crate::auth::UserId::from_parts(tenant.as_deref(), &principal);
988        let mut resource = crate::auth::policies::ResourceRef::new(
989            "vault",
990            Self::vault_target_resource(collection, key),
991        );
992        if let Some(ref tenant) = tenant {
993            resource = resource.with_tenant(tenant.clone());
994        }
995        let ctx = crate::auth::policies::EvalContext {
996            principal_tenant: tenant.clone(),
997            current_tenant: tenant,
998            peer_ip: None,
999            mfa_present: false,
1000            now_ms: crate::utils::now_unix_millis() as u128,
1001            principal_is_admin_role: role == crate::auth::Role::Admin,
1002            principal_is_platform_scoped: principal_id.tenant.is_none(),
1003        };
1004        if auth_store.check_policy_authz_with_role(&principal_id, action, &resource, &ctx, role) {
1005            Ok(())
1006        } else {
1007            Err(format!(
1008                "principal=`{}` action=`{}` resource=`vault:{}` denied by IAM policy",
1009                principal,
1010                action,
1011                Self::vault_target_resource(collection, key)
1012            ))
1013        }
1014    }
1015
1016    fn check_system_vault_capability(
1017        &self,
1018        action: &str,
1019        collection: &str,
1020        key: &str,
1021    ) -> Result<(), String> {
1022        if collection != "red.vault" {
1023            return Ok(());
1024        }
1025        self.check_vault_capability(action, collection, key)
1026    }
1027
1028    fn audit_vault_unseal(
1029        &self,
1030        collection: &str,
1031        key: &str,
1032        outcome: crate::runtime::audit_log::Outcome,
1033        reason: &str,
1034        entry: Option<&VaultEntry>,
1035    ) {
1036        let actor = Self::current_vault_actor();
1037        let request_id = Self::vault_request_id();
1038        let mut builder = crate::runtime::audit_log::AuditEvent::builder("vault/unseal")
1039            .principal(actor.clone())
1040            .source(crate::runtime::audit_log::AuditAuthSource::Password)
1041            .resource(format!(
1042                "vault:{}",
1043                Self::vault_target_resource(collection, key)
1044            ))
1045            .outcome(outcome)
1046            .correlation_id(request_id.clone())
1047            .fields([
1048                crate::runtime::audit_log::AuditFieldEscaper::field("actor", actor),
1049                crate::runtime::audit_log::AuditFieldEscaper::field("collection", collection),
1050                crate::runtime::audit_log::AuditFieldEscaper::field("key", key),
1051                crate::runtime::audit_log::AuditFieldEscaper::field(
1052                    "target",
1053                    Self::vault_target_resource(collection, key),
1054                ),
1055                crate::runtime::audit_log::AuditFieldEscaper::field("reason", reason),
1056                crate::runtime::audit_log::AuditFieldEscaper::field("request_id", request_id),
1057                crate::runtime::audit_log::AuditFieldEscaper::field(
1058                    "connection_id",
1059                    current_connection_id(),
1060                ),
1061            ]);
1062        if let Some(tenant) = current_tenant() {
1063            builder = builder.tenant(tenant);
1064        }
1065        if let Some(entry) = entry {
1066            builder = builder.fields([
1067                crate::runtime::audit_log::AuditFieldEscaper::field("entity_id", entry.id.raw()),
1068                crate::runtime::audit_log::AuditFieldEscaper::field(
1069                    "sequence_id",
1070                    entry.sequence_id,
1071                ),
1072            ]);
1073        }
1074        self.audit_log().record_event(builder.build());
1075    }
1076
1077    fn audit_vault_lifecycle(
1078        &self,
1079        operation: &str,
1080        collection: &str,
1081        key: &str,
1082        outcome: crate::runtime::audit_log::Outcome,
1083        reason: &str,
1084        entry: Option<&VaultEntry>,
1085    ) {
1086        let actor = Self::current_vault_actor();
1087        let request_id = Self::vault_request_id();
1088        let mut builder =
1089            crate::runtime::audit_log::AuditEvent::builder(format!("vault/{operation}"))
1090                .principal(actor.clone())
1091                .source(crate::runtime::audit_log::AuditAuthSource::Password)
1092                .resource(format!(
1093                    "vault:{}",
1094                    Self::vault_target_resource(collection, key)
1095                ))
1096                .outcome(outcome)
1097                .correlation_id(request_id.clone())
1098                .fields([
1099                    crate::runtime::audit_log::AuditFieldEscaper::field("actor", actor),
1100                    crate::runtime::audit_log::AuditFieldEscaper::field("collection", collection),
1101                    crate::runtime::audit_log::AuditFieldEscaper::field("key", key),
1102                    crate::runtime::audit_log::AuditFieldEscaper::field(
1103                        "target",
1104                        Self::vault_target_resource(collection, key),
1105                    ),
1106                    crate::runtime::audit_log::AuditFieldEscaper::field("reason", reason),
1107                    crate::runtime::audit_log::AuditFieldEscaper::field("request_id", request_id),
1108                    crate::runtime::audit_log::AuditFieldEscaper::field(
1109                        "connection_id",
1110                        current_connection_id(),
1111                    ),
1112                ]);
1113        if let Some(tenant) = current_tenant() {
1114            builder = builder.tenant(tenant);
1115        }
1116        if let Some(entry) = entry {
1117            builder = builder.fields([
1118                crate::runtime::audit_log::AuditFieldEscaper::field("entity_id", entry.id.raw()),
1119                crate::runtime::audit_log::AuditFieldEscaper::field("version", entry.version),
1120                crate::runtime::audit_log::AuditFieldEscaper::field(
1121                    "sequence_id",
1122                    entry.sequence_id,
1123                ),
1124            ]);
1125        }
1126        self.audit_log().record_event(builder.build());
1127    }
1128
1129    fn emit_vault_control_event(
1130        &self,
1131        kind: crate::runtime::control_events::EventKind,
1132        outcome: crate::runtime::control_events::Outcome,
1133        action: &'static str,
1134        collection: &str,
1135        key: &str,
1136        reason: &str,
1137        entry: Option<&VaultEntry>,
1138        extra_fields: Vec<(String, crate::runtime::control_events::Sensitivity)>,
1139    ) -> RedDBResult<()> {
1140        use crate::runtime::control_events::{
1141            ActorRef, ControlEvent, ControlEventCtx, ControlEventLedger, Sensitivity,
1142        };
1143        use std::borrow::Cow;
1144
1145        let tenant = current_tenant();
1146        let principal = current_auth_identity().map(|(principal, _)| principal);
1147        let actor_user = principal
1148            .as_ref()
1149            .map(|principal| crate::auth::UserId::from_parts(tenant.as_deref(), principal));
1150        let request_id = Self::vault_request_id();
1151        let actor = actor_user
1152            .as_ref()
1153            .map(ActorRef::User)
1154            .unwrap_or(ActorRef::Anonymous);
1155        let ctx = ControlEventCtx {
1156            actor,
1157            scope: tenant.as_ref().map(|scope| Cow::Borrowed(scope.as_str())),
1158            request_id: Some(Cow::Borrowed(request_id.as_str())),
1159            trace_id: None,
1160        };
1161
1162        let target = Self::vault_target_resource(collection, key);
1163        let mut fields = std::collections::HashMap::new();
1164        fields.insert("path".to_string(), Sensitivity::raw(target.clone()));
1165        fields.insert("collection".to_string(), Sensitivity::raw(collection));
1166        fields.insert("key".to_string(), Sensitivity::raw(key));
1167        fields.insert(
1168            "connection_id".to_string(),
1169            Sensitivity::raw(current_connection_id().to_string()),
1170        );
1171        if let Some(entry) = entry {
1172            fields.insert(
1173                "entity_id".to_string(),
1174                Sensitivity::raw(entry.id.raw().to_string()),
1175            );
1176            fields.insert(
1177                "sequence_id".to_string(),
1178                Sensitivity::raw(entry.sequence_id.to_string()),
1179            );
1180            fields.insert(
1181                "version".to_string(),
1182                Sensitivity::raw(entry.version.to_string()),
1183            );
1184            fields.insert("op".to_string(), Sensitivity::raw(entry.op.clone()));
1185            fields.insert(
1186                "tombstone".to_string(),
1187                Sensitivity::raw(entry.tombstone.to_string()),
1188            );
1189            if !entry.tombstone {
1190                fields.insert(
1191                    "fingerprint".to_string(),
1192                    Sensitivity::raw(vault_fingerprint(&entry.value)),
1193                );
1194            }
1195            fields.insert(
1196                "tags".to_string(),
1197                Sensitivity::raw(format!("{:?}", vault_tags_value(&entry.metadata))),
1198            );
1199        }
1200        for (key, value) in extra_fields {
1201            fields.insert(key, value);
1202        }
1203
1204        let event = ControlEvent {
1205            kind,
1206            outcome,
1207            action: Cow::Borrowed(action),
1208            resource: Some(format!("vault:{target}")),
1209            reason: Some(reason.to_string()),
1210            matched_policy_id: None,
1211            fields,
1212        };
1213        let ledger = self.inner.control_event_ledger.read();
1214        match ledger.emit(&ctx, event) {
1215            Ok(_) => Ok(()),
1216            Err(err) if self.inner.control_event_config.require_persistence() => {
1217                Err(RedDBError::Internal(err.to_string()))
1218            }
1219            Err(_) => Ok(()),
1220        }
1221    }
1222
1223    pub(crate) fn resolve_vault_secret_value(
1224        &self,
1225        collection: &str,
1226        key: &str,
1227    ) -> RedDBResult<Value> {
1228        let ops = KvAtomicOps::new(self);
1229        let entry = ops.get_vault_entry(collection, key)?;
1230        if let Err(reason) = self.check_vault_capability("vault:read", collection, key) {
1231            self.audit_vault_unseal(
1232                collection,
1233                key,
1234                crate::runtime::audit_log::Outcome::Denied,
1235                &reason,
1236                entry.as_ref(),
1237            );
1238            self.emit_vault_control_event(
1239                crate::runtime::control_events::EventKind::VaultRead,
1240                crate::runtime::control_events::Outcome::Denied,
1241                "vault:read",
1242                collection,
1243                key,
1244                &reason,
1245                entry.as_ref(),
1246                Vec::new(),
1247            )?;
1248            return Err(RedDBError::Query(reason));
1249        }
1250        let Some(entry) = entry else {
1251            let reason = "not_found";
1252            self.audit_vault_unseal(
1253                collection,
1254                key,
1255                crate::runtime::audit_log::Outcome::Denied,
1256                reason,
1257                None,
1258            );
1259            self.emit_vault_control_event(
1260                crate::runtime::control_events::EventKind::VaultRead,
1261                crate::runtime::control_events::Outcome::Denied,
1262                "vault:read",
1263                collection,
1264                key,
1265                reason,
1266                None,
1267                Vec::new(),
1268            )?;
1269            return Err(RedDBError::NotFound(format!(
1270                "vault secret '{}.{}' not found",
1271                collection, key
1272            )));
1273        };
1274        if entry.tombstone {
1275            let reason = "deleted";
1276            self.audit_vault_unseal(
1277                collection,
1278                key,
1279                crate::runtime::audit_log::Outcome::Denied,
1280                reason,
1281                Some(&entry),
1282            );
1283            self.emit_vault_control_event(
1284                crate::runtime::control_events::EventKind::VaultRead,
1285                crate::runtime::control_events::Outcome::Denied,
1286                "vault:read",
1287                collection,
1288                key,
1289                reason,
1290                Some(&entry),
1291                Vec::new(),
1292            )?;
1293            return Err(RedDBError::NotFound(format!(
1294                "vault secret '{}.{}' is deleted",
1295                collection, key
1296            )));
1297        }
1298        match self.unseal_vault_value(collection, &entry.value) {
1299            Ok(value) => {
1300                self.audit_vault_unseal(
1301                    collection,
1302                    key,
1303                    crate::runtime::audit_log::Outcome::Success,
1304                    "ok",
1305                    Some(&entry),
1306                );
1307                self.emit_vault_control_event(
1308                    crate::runtime::control_events::EventKind::VaultRead,
1309                    crate::runtime::control_events::Outcome::Allowed,
1310                    "vault:read",
1311                    collection,
1312                    key,
1313                    "ok",
1314                    Some(&entry),
1315                    Vec::new(),
1316                )?;
1317                Ok(value)
1318            }
1319            Err(err) => {
1320                let reason = err.to_string();
1321                self.audit_vault_unseal(
1322                    collection,
1323                    key,
1324                    crate::runtime::audit_log::Outcome::Error,
1325                    &reason,
1326                    Some(&entry),
1327                );
1328                self.emit_vault_control_event(
1329                    crate::runtime::control_events::EventKind::VaultRead,
1330                    crate::runtime::control_events::Outcome::Error,
1331                    "vault:read",
1332                    collection,
1333                    key,
1334                    &reason,
1335                    Some(&entry),
1336                    Vec::new(),
1337                )?;
1338                Err(err)
1339            }
1340        }
1341    }
1342
1343    /// Dispatch a `KV PUT / GET / DELETE` command.
1344    pub fn execute_kv_command(
1345        &self,
1346        raw_query: &str,
1347        cmd: &crate::storage::query::ast::KvCommand,
1348    ) -> RedDBResult<RuntimeQueryResult> {
1349        use crate::storage::query::ast::KvCommand;
1350
1351        let ops = KvAtomicOps::new(self);
1352
1353        match cmd {
1354            KvCommand::Put {
1355                model,
1356                collection,
1357                key,
1358                value,
1359                ttl_ms,
1360                tags,
1361                if_not_exists,
1362            } => {
1363                if *model == crate::catalog::CollectionModel::Vault {
1364                    self.check_system_vault_capability("vault:write", collection, key)
1365                        .map_err(RedDBError::Query)?;
1366                }
1367                self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
1368                let (created, id) = ops.set_with_tags_for_model(
1369                    *model,
1370                    collection,
1371                    key,
1372                    value.clone(),
1373                    *ttl_ms,
1374                    tags,
1375                    *if_not_exists,
1376                )?;
1377
1378                let mut result = UnifiedResult::with_columns(vec![
1379                    "ok".into(),
1380                    "collection".into(),
1381                    "key".into(),
1382                    "id".into(),
1383                    "created".into(),
1384                    "tags".into(),
1385                ]);
1386                let mut record = UnifiedRecord::new();
1387                record.set("ok", Value::Boolean(true));
1388                record.set("collection", Value::text(collection.clone()));
1389                record.set("key", Value::text(key.clone()));
1390                record.set("id", Value::Integer(id.raw() as i64));
1391                record.set("created", Value::Boolean(created));
1392                record.set("tags", kv_tags_value(tags));
1393                result.push(record);
1394
1395                Ok(RuntimeQueryResult {
1396                    query: raw_query.to_string(),
1397                    mode: crate::storage::query::modes::QueryMode::Sql,
1398                    statement: if *model == crate::catalog::CollectionModel::Vault {
1399                        "vault_put"
1400                    } else {
1401                        "kv_put"
1402                    },
1403                    engine: if *model == crate::catalog::CollectionModel::Vault {
1404                        "vault"
1405                    } else {
1406                        "kv"
1407                    },
1408                    result,
1409                    affected_rows: 1,
1410                    statement_type: if created { "insert" } else { "update" },
1411                    bookmark: None,
1412                })
1413            }
1414            KvCommand::InvalidateTags { collection, tags } => {
1415                let invalidated = ops.invalidate_tags(collection, tags)?;
1416
1417                let mut result = UnifiedResult::with_columns(vec![
1418                    "ok".into(),
1419                    "collection".into(),
1420                    "invalidated".into(),
1421                    "tags".into(),
1422                ]);
1423                let mut record = UnifiedRecord::new();
1424                record.set("ok", Value::Boolean(true));
1425                record.set("collection", Value::text(collection.clone()));
1426                record.set("invalidated", Value::Integer(invalidated as i64));
1427                record.set("tags", kv_tags_value(tags));
1428                result.push(record);
1429
1430                Ok(RuntimeQueryResult {
1431                    query: raw_query.to_string(),
1432                    mode: crate::storage::query::modes::QueryMode::Sql,
1433                    statement: "kv_invalidate_tags",
1434                    engine: "kv",
1435                    result,
1436                    affected_rows: invalidated as u64,
1437                    statement_type: "delete",
1438                    bookmark: None,
1439                })
1440            }
1441
1442            KvCommand::Rotate {
1443                collection,
1444                key,
1445                value,
1446                tags,
1447            } => {
1448                self.check_system_vault_capability("vault:write", collection, key)
1449                    .map_err(RedDBError::Query)?;
1450                self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
1451                let entry = ops.append_vault_version(
1452                    collection,
1453                    key,
1454                    value.clone(),
1455                    "rotate",
1456                    false,
1457                    tags,
1458                )?;
1459                self.record_kv_watch_event(
1460                    crate::replication::cdc::ChangeOperation::Update,
1461                    collection,
1462                    key,
1463                    entry.id.raw(),
1464                    None,
1465                    Some(vault_entry_metadata_json(&entry)),
1466                );
1467                self.audit_vault_lifecycle(
1468                    "rotate",
1469                    collection,
1470                    key,
1471                    crate::runtime::audit_log::Outcome::Success,
1472                    "ok",
1473                    Some(&entry),
1474                );
1475                self.emit_vault_control_event(
1476                    crate::runtime::control_events::EventKind::VaultRotate,
1477                    crate::runtime::control_events::Outcome::Allowed,
1478                    "vault:rotate",
1479                    collection,
1480                    key,
1481                    "ok",
1482                    Some(&entry),
1483                    Vec::new(),
1484                )?;
1485                Ok(vault_write_result(
1486                    raw_query,
1487                    "vault_rotate",
1488                    "update",
1489                    collection,
1490                    key,
1491                    &entry,
1492                    1,
1493                ))
1494            }
1495
1496            KvCommand::List {
1497                model,
1498                collection,
1499                prefix,
1500                limit,
1501                offset,
1502                as_json,
1503            } => {
1504                if *model == crate::catalog::CollectionModel::Vault {
1505                    if *as_json {
1506                        return Err(RedDBError::Query(
1507                            "LIST VAULT AS JSON is not supported; vault list only exposes metadata"
1508                                .to_string(),
1509                        ));
1510                    }
1511                    let mut entries = ops.latest_vault_entries(collection, prefix.as_deref())?;
1512                    entries.sort_by(|left, right| left.key.cmp(&right.key));
1513                    let mut result = UnifiedResult::with_columns(vec![
1514                        "collection".into(),
1515                        "key".into(),
1516                        "version".into(),
1517                        "fingerprint".into(),
1518                        "tags".into(),
1519                        "created_at".into(),
1520                        "updated_at".into(),
1521                        "status".into(),
1522                        "tombstone".into(),
1523                        "op".into(),
1524                    ]);
1525                    let mut visible = Vec::new();
1526                    for entry in entries {
1527                        match self.check_vault_capability(
1528                            "vault:read_metadata",
1529                            collection,
1530                            &entry.key,
1531                        ) {
1532                            Ok(()) => {
1533                                self.emit_vault_control_event(
1534                                    crate::runtime::control_events::EventKind::VaultMetadataRead,
1535                                    crate::runtime::control_events::Outcome::Allowed,
1536                                    "vault:read_metadata",
1537                                    collection,
1538                                    &entry.key,
1539                                    "ok",
1540                                    Some(&entry),
1541                                    Vec::new(),
1542                                )?;
1543                                visible.push(entry);
1544                            }
1545                            Err(reason) => {
1546                                self.emit_vault_control_event(
1547                                    crate::runtime::control_events::EventKind::VaultMetadataRead,
1548                                    crate::runtime::control_events::Outcome::Denied,
1549                                    "vault:read_metadata",
1550                                    collection,
1551                                    &entry.key,
1552                                    &reason,
1553                                    Some(&entry),
1554                                    Vec::new(),
1555                                )?;
1556                            }
1557                        }
1558                    }
1559                    for entry in visible
1560                        .into_iter()
1561                        .skip(*offset)
1562                        .take(limit.unwrap_or(usize::MAX))
1563                    {
1564                        push_vault_metadata_record(&mut result, collection, &entry.key, &entry);
1565                    }
1566                    Ok(RuntimeQueryResult {
1567                        query: raw_query.to_string(),
1568                        mode: crate::storage::query::modes::QueryMode::Sql,
1569                        statement: "vault_list",
1570                        engine: "vault",
1571                        result,
1572                        affected_rows: 0,
1573                        statement_type: "select",
1574                        bookmark: None,
1575                    })
1576                } else {
1577                    let mut result = UnifiedResult::with_columns(vec![
1578                        "rid".into(),
1579                        "collection".into(),
1580                        "kind".into(),
1581                        "tenant".into(),
1582                        "created_at".into(),
1583                        "updated_at".into(),
1584                        "key".into(),
1585                        "value".into(),
1586                        "tags".into(),
1587                    ]);
1588                    let entries = ops.latest_kv_entries(collection, prefix.as_deref())?;
1589                    if *as_json {
1590                        let mut tree = crate::json::Value::Object(crate::json::Map::new());
1591                        for (key, entity) in entries
1592                            .into_iter()
1593                            .skip(*offset)
1594                            .take(limit.unwrap_or(usize::MAX))
1595                        {
1596                            let Some(value) = kv_value_from_entity(&entity) else {
1597                                continue;
1598                            };
1599                            let relative = match prefix {
1600                                Some(pfx) if key == *pfx => "",
1601                                Some(pfx) => key
1602                                    .strip_prefix(pfx.as_str())
1603                                    .map(|tail| tail.strip_prefix('.').unwrap_or(tail))
1604                                    .unwrap_or(key.as_str()),
1605                                None => key.as_str(),
1606                            };
1607                            insert_kv_json_path(
1608                                &mut tree,
1609                                relative,
1610                                crate::presentation::entity_json::storage_value_to_json(&value),
1611                            );
1612                        }
1613                        Ok(kv_list_json_result(raw_query, collection, prefix, tree))
1614                    } else {
1615                        for (key, entity) in entries
1616                            .into_iter()
1617                            .skip(*offset)
1618                            .take(limit.unwrap_or(usize::MAX))
1619                        {
1620                            let mut record = UnifiedRecord::new();
1621                            record.set("rid", Value::UnsignedInteger(entity.id.raw()));
1622                            record.set("collection", Value::text(collection.clone()));
1623                            record.set("kind", Value::text("kv"));
1624                            record.set("tenant", Value::Null);
1625                            record.set("created_at", Value::UnsignedInteger(entity.created_at));
1626                            record.set("updated_at", Value::UnsignedInteger(entity.updated_at));
1627                            record.set("key", Value::text(key.clone()));
1628                            record.set(
1629                                "value",
1630                                kv_value_from_entity(&entity)
1631                                    .unwrap_or(crate::storage::schema::Value::Null),
1632                            );
1633                            record.set("tags", kv_tags_value(&ops.tags_for_key(collection, &key)));
1634                            result.push(record);
1635                        }
1636                        Ok(RuntimeQueryResult {
1637                            query: raw_query.to_string(),
1638                            mode: crate::storage::query::modes::QueryMode::Sql,
1639                            statement: "kv_list",
1640                            engine: "kv",
1641                            result,
1642                            affected_rows: 0,
1643                            statement_type: "select",
1644                            bookmark: None,
1645                        })
1646                    }
1647                }
1648            }
1649
1650            KvCommand::History { collection, key } => {
1651                let latest = ops.get_vault_entry(collection, key)?;
1652                if let Err(reason) =
1653                    self.check_vault_capability("vault:read_metadata", collection, key)
1654                {
1655                    self.emit_vault_control_event(
1656                        crate::runtime::control_events::EventKind::VaultMetadataRead,
1657                        crate::runtime::control_events::Outcome::Denied,
1658                        "vault:read_metadata",
1659                        collection,
1660                        key,
1661                        &reason,
1662                        latest.as_ref(),
1663                        Vec::new(),
1664                    )?;
1665                    return Err(RedDBError::Query(reason));
1666                }
1667                let versions =
1668                    super::keyed_spine::history_versions(ops.vault_versions(collection, key)?);
1669                let result = vault_history_result(collection, key, &versions);
1670                self.emit_vault_control_event(
1671                    crate::runtime::control_events::EventKind::VaultMetadataRead,
1672                    crate::runtime::control_events::Outcome::Allowed,
1673                    "vault:read_metadata",
1674                    collection,
1675                    key,
1676                    "ok",
1677                    latest.as_ref(),
1678                    Vec::new(),
1679                )?;
1680                Ok(RuntimeQueryResult {
1681                    query: raw_query.to_string(),
1682                    mode: crate::storage::query::modes::QueryMode::Sql,
1683                    statement: "vault_history",
1684                    engine: "vault",
1685                    result,
1686                    affected_rows: 0,
1687                    statement_type: "select",
1688                    bookmark: None,
1689                })
1690            }
1691
1692            KvCommand::Purge { collection, key } => {
1693                let entry = ops.get_vault_entry(collection, key)?;
1694                if let Err(reason) = self.check_vault_capability("vault:purge", collection, key) {
1695                    self.audit_vault_lifecycle(
1696                        "purge",
1697                        collection,
1698                        key,
1699                        crate::runtime::audit_log::Outcome::Denied,
1700                        &reason,
1701                        entry.as_ref(),
1702                    );
1703                    self.emit_vault_control_event(
1704                        crate::runtime::control_events::EventKind::VaultPurge,
1705                        crate::runtime::control_events::Outcome::Denied,
1706                        "vault:purge",
1707                        collection,
1708                        key,
1709                        &reason,
1710                        entry.as_ref(),
1711                        Vec::new(),
1712                    )?;
1713                    return Err(RedDBError::Query(reason));
1714                }
1715                self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
1716                let purged = ops.purge_vault_versions(collection, key)?;
1717                self.audit_vault_lifecycle(
1718                    "purge",
1719                    collection,
1720                    key,
1721                    crate::runtime::audit_log::Outcome::Success,
1722                    "ok",
1723                    entry.as_ref(),
1724                );
1725                self.emit_vault_control_event(
1726                    crate::runtime::control_events::EventKind::VaultPurge,
1727                    crate::runtime::control_events::Outcome::Allowed,
1728                    "vault:purge",
1729                    collection,
1730                    key,
1731                    "ok",
1732                    entry.as_ref(),
1733                    vec![(
1734                        "purged".to_string(),
1735                        crate::runtime::control_events::Sensitivity::raw(purged.to_string()),
1736                    )],
1737                )?;
1738                let mut result = UnifiedResult::with_columns(vec![
1739                    "ok".into(),
1740                    "collection".into(),
1741                    "key".into(),
1742                    "purged".into(),
1743                ]);
1744                let mut record = UnifiedRecord::new();
1745                record.set("ok", Value::Boolean(true));
1746                record.set("collection", Value::text(collection.clone()));
1747                record.set("key", Value::text(key.clone()));
1748                record.set("purged", Value::Integer(purged as i64));
1749                result.push(record);
1750                Ok(RuntimeQueryResult {
1751                    query: raw_query.to_string(),
1752                    mode: crate::storage::query::modes::QueryMode::Sql,
1753                    statement: "vault_purge",
1754                    engine: "vault",
1755                    result,
1756                    affected_rows: purged as u64,
1757                    statement_type: "delete",
1758                    bookmark: None,
1759                })
1760            }
1761
1762            KvCommand::Get {
1763                model,
1764                collection,
1765                key,
1766            } => {
1767                if *model == crate::catalog::CollectionModel::Vault {
1768                    let entry = ops.get_vault_entry(collection, key)?;
1769                    if let Err(reason) =
1770                        self.check_vault_capability("vault:read_metadata", collection, key)
1771                    {
1772                        self.emit_vault_control_event(
1773                            crate::runtime::control_events::EventKind::VaultMetadataRead,
1774                            crate::runtime::control_events::Outcome::Denied,
1775                            "vault:read_metadata",
1776                            collection,
1777                            key,
1778                            &reason,
1779                            entry.as_ref(),
1780                            Vec::new(),
1781                        )?;
1782                        return Err(RedDBError::Query(reason));
1783                    }
1784                    let key_available = self.vault_key_available(collection);
1785                    let result =
1786                        vault_metadata_result(collection, key, entry.as_ref(), key_available);
1787                    self.emit_vault_control_event(
1788                        crate::runtime::control_events::EventKind::VaultMetadataRead,
1789                        crate::runtime::control_events::Outcome::Allowed,
1790                        "vault:read_metadata",
1791                        collection,
1792                        key,
1793                        "ok",
1794                        entry.as_ref(),
1795                        Vec::new(),
1796                    )?;
1797                    return Ok(RuntimeQueryResult {
1798                        query: raw_query.to_string(),
1799                        mode: crate::storage::query::modes::QueryMode::Sql,
1800                        statement: "vault_get",
1801                        engine: "vault",
1802                        result,
1803                        affected_rows: 0,
1804                        statement_type: "select",
1805                        bookmark: None,
1806                    });
1807                }
1808
1809                let entity = ops.get_entity(*model, collection, key)?;
1810                let value = entity.as_ref().and_then(kv_value_from_entity);
1811                if *model == crate::catalog::CollectionModel::Kv {
1812                    self.inner.kv_stats.incr_gets();
1813                }
1814                let mut result = UnifiedResult::with_columns(vec![
1815                    "rid".into(),
1816                    "collection".into(),
1817                    "kind".into(),
1818                    "tenant".into(),
1819                    "created_at".into(),
1820                    "updated_at".into(),
1821                    "key".into(),
1822                    "value".into(),
1823                    "tags".into(),
1824                ]);
1825                let mut record = UnifiedRecord::new();
1826                if let Some(entity) = entity.as_ref() {
1827                    record.set("rid", Value::UnsignedInteger(entity.id.raw()));
1828                    record.set("created_at", Value::UnsignedInteger(entity.created_at));
1829                    record.set("updated_at", Value::UnsignedInteger(entity.updated_at));
1830                } else {
1831                    record.set("rid", Value::Null);
1832                    record.set("created_at", Value::Null);
1833                    record.set("updated_at", Value::Null);
1834                }
1835                record.set("collection", Value::text(collection.clone()));
1836                record.set("kind", Value::text(keyed_model_name(*model).to_string()));
1837                record.set("tenant", Value::Null);
1838                record.set("key", Value::text(key.clone()));
1839                record.set(
1840                    "value",
1841                    value.unwrap_or(crate::storage::schema::Value::Null),
1842                );
1843                record.set("tags", kv_tags_value(&ops.tags_for_key(collection, key)));
1844                result.push(record);
1845
1846                Ok(RuntimeQueryResult {
1847                    query: raw_query.to_string(),
1848                    mode: crate::storage::query::modes::QueryMode::Sql,
1849                    statement: "kv_get",
1850                    engine: "kv",
1851                    result,
1852                    affected_rows: 0,
1853                    statement_type: "select",
1854                    bookmark: None,
1855                })
1856            }
1857            KvCommand::Watch {
1858                model,
1859                collection,
1860                key,
1861                prefix,
1862                from_lsn,
1863            } => {
1864                let watch_key = if *prefix {
1865                    format!("{key}.*")
1866                } else {
1867                    key.clone()
1868                };
1869                let endpoint = match from_lsn {
1870                    Some(lsn) => format!(
1871                        "/collections/{collection}/{}/{watch_key}/watch?since_lsn={lsn}",
1872                        keyed_model_name(*model)
1873                    ),
1874                    None => format!(
1875                        "/collections/{collection}/{}/{watch_key}/watch",
1876                        keyed_model_name(*model)
1877                    ),
1878                };
1879                let mut result = UnifiedResult::with_columns(vec![
1880                    "collection".into(),
1881                    "key".into(),
1882                    "prefix".into(),
1883                    "from_lsn".into(),
1884                    "watch_url".into(),
1885                    "streaming".into(),
1886                ]);
1887                let mut record = UnifiedRecord::new();
1888                record.set("collection", Value::text(collection.clone()));
1889                record.set("key", Value::text(watch_key));
1890                record.set("prefix", Value::Boolean(*prefix));
1891                record.set(
1892                    "from_lsn",
1893                    from_lsn
1894                        .map(Value::UnsignedInteger)
1895                        .unwrap_or(crate::storage::schema::Value::Null),
1896                );
1897                record.set("watch_url", Value::text(endpoint));
1898                record.set("streaming", Value::Boolean(true));
1899                result.push(record);
1900
1901                Ok(RuntimeQueryResult {
1902                    query: raw_query.to_string(),
1903                    mode: crate::storage::query::modes::QueryMode::Sql,
1904                    statement: "kv_watch",
1905                    engine: keyed_model_name(*model),
1906                    result,
1907                    affected_rows: 0,
1908                    statement_type: "stream",
1909                    bookmark: None,
1910                })
1911            }
1912
1913            KvCommand::Unseal {
1914                collection,
1915                key,
1916                version,
1917            } => {
1918                let latest = ops.get_vault_entry(collection, key)?;
1919                let entry = match version {
1920                    Some(version) => ops.get_vault_entry_version(collection, key, *version)?,
1921                    None => latest.clone(),
1922                };
1923                let action = match (version, latest.as_ref()) {
1924                    (Some(requested), Some(latest)) if *requested == latest.version => "vault:read",
1925                    (Some(_), _) => "vault:unseal_history",
1926                    _ => "vault:read",
1927                };
1928                let event_kind = if action == "vault:read" {
1929                    crate::runtime::control_events::EventKind::VaultRead
1930                } else {
1931                    crate::runtime::control_events::EventKind::VaultUnseal
1932                };
1933                if let Err(reason) = self.check_vault_capability(action, collection, key) {
1934                    self.audit_vault_unseal(
1935                        collection,
1936                        key,
1937                        crate::runtime::audit_log::Outcome::Denied,
1938                        &reason,
1939                        entry.as_ref(),
1940                    );
1941                    self.emit_vault_control_event(
1942                        event_kind,
1943                        crate::runtime::control_events::Outcome::Denied,
1944                        action,
1945                        collection,
1946                        key,
1947                        &reason,
1948                        entry.as_ref(),
1949                        Vec::new(),
1950                    )?;
1951                    return Err(RedDBError::Query(reason));
1952                }
1953                let Some(entry) = entry else {
1954                    let reason = "not_found";
1955                    self.audit_vault_unseal(
1956                        collection,
1957                        key,
1958                        crate::runtime::audit_log::Outcome::Denied,
1959                        reason,
1960                        None,
1961                    );
1962                    self.emit_vault_control_event(
1963                        event_kind,
1964                        crate::runtime::control_events::Outcome::Denied,
1965                        action,
1966                        collection,
1967                        key,
1968                        reason,
1969                        None,
1970                        Vec::new(),
1971                    )?;
1972                    return Err(RedDBError::NotFound(format!(
1973                        "vault secret '{}.{}' not found",
1974                        collection, key
1975                    )));
1976                };
1977                if entry.tombstone {
1978                    let reason = "deleted";
1979                    self.audit_vault_unseal(
1980                        collection,
1981                        key,
1982                        crate::runtime::audit_log::Outcome::Denied,
1983                        reason,
1984                        Some(&entry),
1985                    );
1986                    self.emit_vault_control_event(
1987                        event_kind,
1988                        crate::runtime::control_events::Outcome::Denied,
1989                        action,
1990                        collection,
1991                        key,
1992                        reason,
1993                        Some(&entry),
1994                        Vec::new(),
1995                    )?;
1996                    return Err(RedDBError::NotFound(format!(
1997                        "vault secret '{}.{}' is deleted",
1998                        collection, key
1999                    )));
2000                }
2001                match self.unseal_vault_value(collection, &entry.value) {
2002                    Ok(value) => {
2003                        self.audit_vault_unseal(
2004                            collection,
2005                            key,
2006                            crate::runtime::audit_log::Outcome::Success,
2007                            "ok",
2008                            Some(&entry),
2009                        );
2010                        self.emit_vault_control_event(
2011                            event_kind,
2012                            crate::runtime::control_events::Outcome::Allowed,
2013                            action,
2014                            collection,
2015                            key,
2016                            "ok",
2017                            Some(&entry),
2018                            Vec::new(),
2019                        )?;
2020                        let mut result = UnifiedResult::with_columns(vec![
2021                            "collection".into(),
2022                            "key".into(),
2023                            "value".into(),
2024                        ]);
2025                        let mut record = UnifiedRecord::new();
2026                        record.set("collection", Value::text(collection.clone()));
2027                        record.set("key", Value::text(key.clone()));
2028                        record.set("value", value);
2029                        result.push(record);
2030                        Ok(RuntimeQueryResult {
2031                            query: raw_query.to_string(),
2032                            mode: crate::storage::query::modes::QueryMode::Sql,
2033                            statement: "vault_unseal",
2034                            engine: "vault",
2035                            result,
2036                            affected_rows: 0,
2037                            statement_type: "select",
2038                            bookmark: None,
2039                        })
2040                    }
2041                    Err(err) => {
2042                        let reason = err.to_string();
2043                        self.audit_vault_unseal(
2044                            collection,
2045                            key,
2046                            crate::runtime::audit_log::Outcome::Error,
2047                            &reason,
2048                            Some(&entry),
2049                        );
2050                        self.emit_vault_control_event(
2051                            event_kind,
2052                            crate::runtime::control_events::Outcome::Error,
2053                            action,
2054                            collection,
2055                            key,
2056                            &reason,
2057                            Some(&entry),
2058                            Vec::new(),
2059                        )?;
2060                        Err(err)
2061                    }
2062                }
2063            }
2064
2065            KvCommand::Incr {
2066                model,
2067                collection,
2068                key,
2069                by,
2070                ttl_ms,
2071            } => {
2072                self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2073                let new_value = ops.incr(*model, collection, key, *by, *ttl_ms)?;
2074
2075                let mut result = UnifiedResult::with_columns(vec![
2076                    "ok".into(),
2077                    "collection".into(),
2078                    "key".into(),
2079                    "value".into(),
2080                ]);
2081                let mut record = UnifiedRecord::new();
2082                record.set("ok", Value::Boolean(true));
2083                record.set("collection", Value::text(collection.clone()));
2084                record.set("key", Value::text(key.clone()));
2085                record.set("value", Value::Integer(new_value));
2086                result.push(record);
2087
2088                Ok(RuntimeQueryResult {
2089                    query: raw_query.to_string(),
2090                    mode: crate::storage::query::modes::QueryMode::Sql,
2091                    statement: "kv_incr",
2092                    engine: "kv",
2093                    result,
2094                    affected_rows: 1,
2095                    statement_type: "update",
2096                    bookmark: None,
2097                })
2098            }
2099
2100            KvCommand::Cas {
2101                model,
2102                collection,
2103                key,
2104                expected,
2105                new_value,
2106                ttl_ms,
2107            } => {
2108                self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2109                let (ok, current) = ops.cas(
2110                    *model,
2111                    collection,
2112                    key,
2113                    expected.as_ref(),
2114                    new_value.clone(),
2115                    *ttl_ms,
2116                )?;
2117
2118                let mut result = UnifiedResult::with_columns(vec![
2119                    "ok".into(),
2120                    "collection".into(),
2121                    "key".into(),
2122                    "current".into(),
2123                ]);
2124                let mut record = UnifiedRecord::new();
2125                record.set("ok", Value::Boolean(ok));
2126                record.set("collection", Value::text(collection.clone()));
2127                record.set("key", Value::text(key.clone()));
2128                record.set(
2129                    "current",
2130                    current.unwrap_or(crate::storage::schema::Value::Null),
2131                );
2132                result.push(record);
2133
2134                Ok(RuntimeQueryResult {
2135                    query: raw_query.to_string(),
2136                    mode: crate::storage::query::modes::QueryMode::Sql,
2137                    statement: "kv_cas",
2138                    engine: "kv",
2139                    result,
2140                    affected_rows: if ok { 1 } else { 0 },
2141                    statement_type: "update",
2142                    bookmark: None,
2143                })
2144            }
2145
2146            KvCommand::Delete {
2147                model,
2148                collection,
2149                key,
2150            } => {
2151                if *model == crate::catalog::CollectionModel::Vault {
2152                    self.check_system_vault_capability("vault:write", collection, key)
2153                        .map_err(RedDBError::Query)?;
2154                    self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2155                    let entry = ops.append_vault_version(
2156                        collection,
2157                        key,
2158                        Value::Null,
2159                        "delete",
2160                        true,
2161                        &[],
2162                    )?;
2163                    self.record_kv_watch_event(
2164                        crate::replication::cdc::ChangeOperation::Delete,
2165                        collection,
2166                        key,
2167                        entry.id.raw(),
2168                        None,
2169                        Some(vault_entry_metadata_json(&entry)),
2170                    );
2171                    self.audit_vault_lifecycle(
2172                        "delete",
2173                        collection,
2174                        key,
2175                        crate::runtime::audit_log::Outcome::Success,
2176                        "ok",
2177                        Some(&entry),
2178                    );
2179                    return Ok(vault_write_result(
2180                        raw_query,
2181                        "vault_delete",
2182                        "delete",
2183                        collection,
2184                        key,
2185                        &entry,
2186                        1,
2187                    ));
2188                }
2189                self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2190                let deleted = ops.delete(*model, collection, key)?;
2191
2192                let mut result = UnifiedResult::with_columns(vec![
2193                    "ok".into(),
2194                    "collection".into(),
2195                    "key".into(),
2196                    "deleted".into(),
2197                ]);
2198                let mut record = UnifiedRecord::new();
2199                record.set("ok", Value::Boolean(true));
2200                record.set("collection", Value::text(collection.clone()));
2201                record.set("key", Value::text(key.clone()));
2202                record.set("deleted", Value::Boolean(deleted));
2203                result.push(record);
2204
2205                Ok(RuntimeQueryResult {
2206                    query: raw_query.to_string(),
2207                    mode: crate::storage::query::modes::QueryMode::Sql,
2208                    statement: "kv_delete",
2209                    engine: "kv",
2210                    result,
2211                    affected_rows: if deleted { 1 } else { 0 },
2212                    statement_type: "delete",
2213                    bookmark: None,
2214                })
2215            }
2216        }
2217    }
2218
2219    pub fn vault_watch_events_since(
2220        &self,
2221        collection: &str,
2222        key: &str,
2223        since_lsn: u64,
2224        max_count: usize,
2225    ) -> Vec<crate::replication::cdc::KvWatchEvent> {
2226        self.kv_watch_events_since(collection, key, since_lsn, max_count)
2227            .into_iter()
2228            .filter(|event| {
2229                self.check_vault_capability("vault:read_metadata", &event.collection, &event.key)
2230                    .is_ok()
2231            })
2232            .map(vault_filter_watch_event)
2233            .collect()
2234    }
2235
2236    pub fn vault_watch_events_since_prefix(
2237        &self,
2238        collection: &str,
2239        prefix: &str,
2240        since_lsn: u64,
2241        max_count: usize,
2242    ) -> Vec<crate::replication::cdc::KvWatchEvent> {
2243        self.kv_watch_events_since_prefix(collection, prefix, since_lsn, max_count)
2244            .into_iter()
2245            .filter(|event| {
2246                self.check_vault_capability("vault:read_metadata", &event.collection, &event.key)
2247                    .is_ok()
2248            })
2249            .map(vault_filter_watch_event)
2250            .collect()
2251    }
2252
2253    fn check_kv_invalidate_policy(&self, collection: &str) -> RedDBResult<()> {
2254        let auth_store = match self.inner.auth_store.read().clone() {
2255            Some(store) => store,
2256            None => return Ok(()),
2257        };
2258        let (username, role) = match crate::runtime::impl_core::current_auth_identity() {
2259            Some(identity) => identity,
2260            None => return Ok(()),
2261        };
2262        if role < crate::auth::Role::Write {
2263            return Err(RedDBError::Query(format!(
2264                "principal=`{username}` role=`{role:?}` cannot invalidate KV tags"
2265            )));
2266        }
2267        if !auth_store.iam_authorization_enabled() {
2268            return Ok(());
2269        }
2270
2271        let tenant = crate::runtime::impl_core::current_tenant();
2272        let principal = crate::auth::UserId::from_parts(tenant.as_deref(), &username);
2273        let mut resource =
2274            crate::auth::policies::ResourceRef::new("kv".to_string(), collection.to_string());
2275        if let Some(tenant) = tenant.clone() {
2276            resource = resource.with_tenant(tenant);
2277        }
2278        let ctx = crate::auth::policies::EvalContext {
2279            principal_tenant: tenant.clone(),
2280            current_tenant: tenant,
2281            peer_ip: None,
2282            mfa_present: false,
2283            now_ms: current_unix_ms(),
2284            principal_is_admin_role: role == crate::auth::Role::Admin,
2285            principal_is_platform_scoped: principal.tenant.is_none(),
2286        };
2287        if auth_store.check_policy_authz_with_role(
2288            &principal,
2289            "kv:invalidate",
2290            &resource,
2291            &ctx,
2292            role,
2293        ) {
2294            Ok(())
2295        } else {
2296            Err(RedDBError::Query(format!(
2297                "principal=`{username}` action=`kv:invalidate` resource=`kv:{collection}` denied by IAM policy"
2298            )))
2299        }
2300    }
2301}
2302
2303fn ttl_metadata(ttl_ms: Option<u64>) -> Option<Metadata> {
2304    let ttl_ms = ttl_ms?;
2305    Some(Metadata::with_fields(
2306        [(
2307            "_ttl_ms".to_string(),
2308            if ttl_ms <= i64::MAX as u64 {
2309                MetadataValue::Int(ttl_ms as i64)
2310            } else {
2311                MetadataValue::Timestamp(ttl_ms)
2312            },
2313        )]
2314        .into_iter()
2315        .collect(),
2316    ))
2317}
2318
2319fn vault_write_result(
2320    raw_query: &str,
2321    statement: &'static str,
2322    statement_type: &'static str,
2323    collection: &str,
2324    key: &str,
2325    entry: &VaultEntry,
2326    affected_rows: u64,
2327) -> RuntimeQueryResult {
2328    let mut result = UnifiedResult::with_columns(vec![
2329        "ok".into(),
2330        "collection".into(),
2331        "key".into(),
2332        "version".into(),
2333        "fingerprint".into(),
2334        "tombstone".into(),
2335        "op".into(),
2336        "id".into(),
2337    ]);
2338    let mut record = UnifiedRecord::new();
2339    record.set("ok", Value::Boolean(true));
2340    record.set("collection", Value::text(collection.to_string()));
2341    record.set("key", Value::text(key.to_string()));
2342    record.set("version", Value::Integer(entry.version));
2343    if entry.tombstone {
2344        record.set("fingerprint", Value::Null);
2345    } else {
2346        record.set("fingerprint", Value::text(vault_fingerprint(&entry.value)));
2347    }
2348    record.set("tombstone", Value::Boolean(entry.tombstone));
2349    record.set("op", Value::text(entry.op.clone()));
2350    record.set("id", Value::Integer(entry.id.raw() as i64));
2351    result.push(record);
2352    RuntimeQueryResult {
2353        query: raw_query.to_string(),
2354        mode: crate::storage::query::modes::QueryMode::Sql,
2355        statement,
2356        engine: "vault",
2357        result,
2358        affected_rows,
2359        statement_type,
2360        bookmark: None,
2361    }
2362}
2363
2364fn vault_history_result(collection: &str, key: &str, versions: &[VaultEntry]) -> UnifiedResult {
2365    let mut result = UnifiedResult::with_columns(vec![
2366        "collection".into(),
2367        "key".into(),
2368        "version".into(),
2369        "fingerprint".into(),
2370        "tags".into(),
2371        "created_at".into(),
2372        "updated_at".into(),
2373        "status".into(),
2374        "tombstone".into(),
2375        "op".into(),
2376    ]);
2377    for entry in versions {
2378        push_vault_metadata_record(&mut result, collection, key, entry);
2379    }
2380    result
2381}
2382
2383fn push_vault_metadata_record(
2384    result: &mut UnifiedResult,
2385    collection: &str,
2386    key: &str,
2387    entry: &VaultEntry,
2388) {
2389    let mut record = UnifiedRecord::new();
2390    record.set("collection", Value::text(collection.to_string()));
2391    record.set("key", Value::text(key.to_string()));
2392    record.set("version", Value::Integer(entry.version));
2393    if entry.tombstone {
2394        record.set("fingerprint", Value::Null);
2395        record.set("status", Value::text("deleted"));
2396    } else {
2397        record.set("fingerprint", Value::text(vault_fingerprint(&entry.value)));
2398        record.set("status", Value::text("sealed"));
2399    }
2400    record.set("tags", vault_tags_value(&entry.metadata));
2401    record.set("created_at", Value::TimestampMs(entry.created_at as i64));
2402    record.set("updated_at", Value::TimestampMs(entry.updated_at as i64));
2403    record.set("tombstone", Value::Boolean(entry.tombstone));
2404    record.set("op", Value::text(entry.op.clone()));
2405    result.push(record);
2406}
2407
2408fn vault_metadata_result(
2409    collection: &str,
2410    key: &str,
2411    entry: Option<&VaultEntry>,
2412    key_available: bool,
2413) -> UnifiedResult {
2414    let mut result = UnifiedResult::with_columns(vec![
2415        "collection".into(),
2416        "key".into(),
2417        "version".into(),
2418        "fingerprint".into(),
2419        "tags".into(),
2420        "created_at".into(),
2421        "updated_at".into(),
2422        "value".into(),
2423        "status".into(),
2424        "tombstone".into(),
2425        "op".into(),
2426    ]);
2427    let mut record = UnifiedRecord::new();
2428    record.set("collection", Value::text(collection.to_string()));
2429    record.set("key", Value::text(key.to_string()));
2430    match entry {
2431        Some(entry) => {
2432            record.set("version", Value::Integer(entry.version));
2433            if entry.tombstone {
2434                record.set("fingerprint", Value::Null);
2435            } else {
2436                record.set("fingerprint", Value::text(vault_fingerprint(&entry.value)));
2437            }
2438            record.set("tags", vault_tags_value(&entry.metadata));
2439            record.set("created_at", Value::TimestampMs(entry.created_at as i64));
2440            record.set("updated_at", Value::TimestampMs(entry.updated_at as i64));
2441            record.set("value", Value::text("***"));
2442            record.set(
2443                "status",
2444                Value::text(if entry.tombstone {
2445                    "deleted"
2446                } else if key_available {
2447                    "sealed"
2448                } else {
2449                    "sealed_unavailable"
2450                }),
2451            );
2452            record.set("tombstone", Value::Boolean(entry.tombstone));
2453            record.set("op", Value::text(entry.op.clone()));
2454        }
2455        None => {
2456            record.set("version", Value::Null);
2457            record.set("fingerprint", Value::Null);
2458            record.set("tags", Value::Array(Vec::new()));
2459            record.set("created_at", Value::Null);
2460            record.set("updated_at", Value::Null);
2461            record.set("value", Value::text(""));
2462            record.set("status", Value::text("missing"));
2463            record.set("tombstone", Value::Boolean(false));
2464            record.set("op", Value::Null);
2465        }
2466    }
2467    result.push(record);
2468    result
2469}
2470
2471fn vault_fingerprint(value: &Value) -> String {
2472    match value {
2473        Value::Secret(payload) => crate::utils::to_hex(&crate::crypto::sha256::sha256(payload)),
2474        other => crate::utils::to_hex(&crate::crypto::sha256::sha256(&other.to_bytes())),
2475    }
2476}
2477
2478fn vault_entry_metadata_json(entry: &VaultEntry) -> crate::json::Value {
2479    let mut object = crate::json::Map::new();
2480    object.insert(
2481        "key".to_string(),
2482        crate::json::Value::String(entry.key.clone()),
2483    );
2484    object.insert(
2485        "version".to_string(),
2486        crate::json::Value::Number(entry.version as f64),
2487    );
2488    object.insert(
2489        "fingerprint".to_string(),
2490        if entry.tombstone {
2491            crate::json::Value::Null
2492        } else {
2493            crate::json::Value::String(vault_fingerprint(&entry.value))
2494        },
2495    );
2496    object.insert("tags".to_string(), vault_tags_json(&entry.metadata));
2497    object.insert(
2498        "actor".to_string(),
2499        crate::json::Value::String(RedDBRuntime::current_vault_actor()),
2500    );
2501    object.insert(
2502        "sequence_id".to_string(),
2503        crate::json::Value::Number(entry.sequence_id as f64),
2504    );
2505    object.insert(
2506        "tombstone".to_string(),
2507        crate::json::Value::Bool(entry.tombstone),
2508    );
2509    object.insert(
2510        "op".to_string(),
2511        crate::json::Value::String(entry.op.clone()),
2512    );
2513    crate::json::Value::Object(object)
2514}
2515
2516fn vault_tags_json(metadata: &Metadata) -> crate::json::Value {
2517    match vault_tags_value(metadata) {
2518        Value::Array(values) => crate::json::Value::Array(
2519            values
2520                .into_iter()
2521                .filter_map(|value| match value {
2522                    Value::Text(tag) => Some(crate::json::Value::String(tag.to_string())),
2523                    _ => None,
2524                })
2525                .collect(),
2526        ),
2527        _ => crate::json::Value::Array(Vec::new()),
2528    }
2529}
2530
2531fn vault_tags_metadata(tags: &[String]) -> std::collections::HashMap<String, MetadataValue> {
2532    [(
2533        "tags".to_string(),
2534        MetadataValue::Array(
2535            tags.iter()
2536                .map(|tag| MetadataValue::String(tag.clone()))
2537                .collect(),
2538        ),
2539    )]
2540    .into_iter()
2541    .collect()
2542}
2543
2544fn vault_filter_watch_event(
2545    mut event: crate::replication::cdc::KvWatchEvent,
2546) -> crate::replication::cdc::KvWatchEvent {
2547    event.before = event.before.and_then(vault_metadata_json_only);
2548    event.after = event.after.and_then(vault_metadata_json_only);
2549    event
2550}
2551
2552fn vault_metadata_json_only(value: crate::json::Value) -> Option<crate::json::Value> {
2553    let object = value.as_object()?;
2554    let mut out = crate::json::Map::new();
2555    for field in [
2556        "key",
2557        "version",
2558        "fingerprint",
2559        "tags",
2560        "actor",
2561        "sequence_id",
2562        "tombstone",
2563        "op",
2564    ] {
2565        if let Some(value) = object.get(field) {
2566            out.insert(field.to_string(), value.clone());
2567        }
2568    }
2569    Some(crate::json::Value::Object(out))
2570}
2571
2572fn vault_tags_value(metadata: &Metadata) -> Value {
2573    match metadata.get("tags") {
2574        Some(MetadataValue::Array(values)) => Value::Array(
2575            values
2576                .iter()
2577                .filter_map(|value| match value {
2578                    MetadataValue::String(tag) => Some(Value::text(tag.clone())),
2579                    _ => None,
2580                })
2581                .collect(),
2582        ),
2583        Some(MetadataValue::String(tag)) if !tag.is_empty() => {
2584            Value::Array(vec![Value::text(tag.clone())])
2585        }
2586        _ => Value::Array(Vec::new()),
2587    }
2588}
2589
2590fn decode_vault_key(hex_key: &str) -> RedDBResult<[u8; 32]> {
2591    let bytes = hex::decode(hex_key)
2592        .map_err(|_| RedDBError::Query("vault sealed_unavailable: bad key material".to_string()))?;
2593    let key: [u8; 32] = bytes.try_into().map_err(|_| {
2594        RedDBError::Query("vault sealed_unavailable: bad key material length".to_string())
2595    })?;
2596    Ok(key)
2597}
2598
2599fn kv_tags_metadata(tags: &[String]) -> Option<(String, MetadataValue)> {
2600    if tags.is_empty() {
2601        return None;
2602    }
2603    let values = tags
2604        .iter()
2605        .map(|tag| MetadataValue::String(tag.clone()))
2606        .collect();
2607    Some(("_kv_tags".to_string(), MetadataValue::Array(values)))
2608}
2609
2610fn kv_tags_value(tags: &[String]) -> Value {
2611    let json = crate::json::Value::Array(
2612        tags.iter()
2613            .map(|tag| crate::json::Value::String(tag.clone()))
2614            .collect(),
2615    );
2616    Value::Json(crate::json::to_vec(&json).unwrap_or_default())
2617}
2618
2619fn kv_value_from_entity(entity: &crate::storage::UnifiedEntity) -> Option<Value> {
2620    if let crate::storage::EntityData::Row(ref row) = entity.data {
2621        if let Some(ref named) = row.named {
2622            return named.get("value").cloned();
2623        }
2624    }
2625    None
2626}
2627
2628fn insert_kv_json_path(root: &mut crate::json::Value, path: &str, value: crate::json::Value) {
2629    let segments: Vec<&str> = path
2630        .split('.')
2631        .filter(|segment| !segment.is_empty())
2632        .collect();
2633    insert_kv_json_segments(root, &segments, value);
2634}
2635
2636fn insert_kv_json_segments(
2637    root: &mut crate::json::Value,
2638    segments: &[&str],
2639    value: crate::json::Value,
2640) {
2641    if segments.is_empty() {
2642        *root = value;
2643        return;
2644    }
2645
2646    if !matches!(root, crate::json::Value::Object(_)) {
2647        *root = crate::json::Value::Object(crate::json::Map::new());
2648    }
2649
2650    let crate::json::Value::Object(map) = root else {
2651        return;
2652    };
2653    if segments.len() == 1 {
2654        map.insert(segments[0].to_string(), value);
2655        return;
2656    }
2657    let entry = map
2658        .entry(segments[0].to_string())
2659        .or_insert_with(|| crate::json::Value::Object(crate::json::Map::new()));
2660    insert_kv_json_segments(entry, &segments[1..], value);
2661}
2662
2663fn kv_list_json_result(
2664    raw_query: &str,
2665    collection: &str,
2666    prefix: &Option<String>,
2667    value: crate::json::Value,
2668) -> RuntimeQueryResult {
2669    let mut result =
2670        UnifiedResult::with_columns(vec!["collection".into(), "prefix".into(), "value".into()]);
2671    let mut record = UnifiedRecord::new();
2672    record.set("collection", Value::text(collection.to_string()));
2673    record.set(
2674        "prefix",
2675        prefix
2676            .as_ref()
2677            .map(|prefix| Value::text(prefix.clone()))
2678            .unwrap_or(Value::Null),
2679    );
2680    record.set("value", Value::Json(value.to_string_compact().into_bytes()));
2681    result.push(record);
2682    RuntimeQueryResult {
2683        query: raw_query.to_string(),
2684        mode: crate::storage::query::modes::QueryMode::Sql,
2685        statement: "kv_list_json",
2686        engine: "kv",
2687        result,
2688        affected_rows: 0,
2689        statement_type: "select",
2690        bookmark: None,
2691    }
2692}
2693
2694fn kv_collection_contract(name: &str) -> crate::physical::CollectionContract {
2695    let now = current_unix_ms();
2696    crate::physical::CollectionContract {
2697        name: name.to_string(),
2698        declared_model: crate::catalog::CollectionModel::Kv,
2699        schema_mode: crate::catalog::SchemaMode::Dynamic,
2700        origin: crate::physical::ContractOrigin::Implicit,
2701        version: 1,
2702        created_at_unix_ms: now,
2703        updated_at_unix_ms: now,
2704        default_ttl_ms: None,
2705        vector_dimension: None,
2706        vector_metric: None,
2707        context_index_fields: Vec::new(),
2708        declared_columns: Vec::new(),
2709        table_def: None,
2710        timestamps_enabled: false,
2711        context_index_enabled: false,
2712        metrics_raw_retention_ms: None,
2713        metrics_rollup_policies: Vec::new(),
2714        metrics_tenant_identity: None,
2715        metrics_namespace: None,
2716        append_only: false,
2717        subscriptions: Vec::new(),
2718        analytics_config: Vec::new(),
2719        session_key: None,
2720        session_gap_ms: None,
2721        retention_duration_ms: None,
2722        analytical_storage: None,
2723
2724        ai_policy: None,
2725    }
2726}
2727
/// Returns the wall-clock time in milliseconds since the Unix epoch.
///
/// Returns `0` if the system clock reports a time before the epoch.
fn current_unix_ms() -> u128 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis(),
        Err(_) => 0,
    }
}
2734
2735#[cfg(test)]
2736mod tests {
2737    use crate::api::RedDBOptions;
2738    use crate::catalog::CollectionModel;
2739    use crate::runtime::RedDBRuntime;
2740
2741    fn rt() -> RedDBRuntime {
2742        RedDBRuntime::with_options(RedDBOptions::in_memory()).expect("in-memory runtime")
2743    }
2744
2745    #[test]
2746    fn incr_missing_key_initialises_at_by() {
2747        let r = rt();
2748        let ops = super::KvAtomicOps::new(&r);
2749        let v = ops
2750            .incr(CollectionModel::Kv, "kv_default", "missing", 5, None)
2751            .unwrap();
2752        assert_eq!(v, 5);
2753    }
2754
2755    #[test]
2756    fn kv_runtime_stats_count_public_ops() {
2757        let r = rt();
2758        let ops = super::KvAtomicOps::new(&r);
2759
2760        ops.set(
2761            CollectionModel::Kv,
2762            "kv_default",
2763            "profile",
2764            crate::storage::schema::Value::text("alice"),
2765            None,
2766            false,
2767        )
2768        .unwrap();
2769        ops.get(CollectionModel::Kv, "kv_default", "profile")
2770            .unwrap();
2771        ops.delete(CollectionModel::Kv, "kv_default", "profile")
2772            .unwrap();
2773        ops.incr(CollectionModel::Kv, "kv_default", "hits", 1, None)
2774            .unwrap();
2775        ops.cas(
2776            CollectionModel::Kv,
2777            "kv_default",
2778            "profile",
2779            None,
2780            crate::storage::schema::Value::text("created"),
2781            None,
2782        )
2783        .unwrap();
2784        ops.cas(
2785            CollectionModel::Kv,
2786            "kv_default",
2787            "profile",
2788            Some(&crate::storage::schema::Value::text("different")),
2789            crate::storage::schema::Value::text("ignored"),
2790            None,
2791        )
2792        .unwrap();
2793
2794        let stats = r.stats().kv;
2795        assert_eq!(stats.puts, 1);
2796        assert_eq!(stats.gets, 1);
2797        assert_eq!(stats.deletes, 1);
2798        assert_eq!(stats.incrs, 1);
2799        assert_eq!(stats.cas_success, 1);
2800        assert_eq!(stats.cas_conflict, 1);
2801    }
2802
2803    #[test]
2804    fn kv_invalidate_tags_removes_matching_entries_only() {
2805        let r = rt();
2806
2807        r.execute_query("KV PUT sessions.blob = 'payload' TAGS [user:42, org:7]")
2808            .unwrap();
2809
2810        let miss = r
2811            .execute_query("INVALIDATE TAGS [org:99] FROM sessions")
2812            .unwrap();
2813        assert_eq!(miss.affected_rows, 0);
2814        assert!(matches!(
2815            r.execute_query("KV GET sessions.blob")
2816                .unwrap()
2817                .result
2818                .records[0]
2819                .get("value"),
2820            Some(crate::storage::schema::Value::Text(value)) if &**value == "payload"
2821        ));
2822
2823        let hit = r
2824            .execute_query("INVALIDATE TAGS [user:42] FROM sessions")
2825            .unwrap();
2826        assert_eq!(hit.affected_rows, 1);
2827        assert!(matches!(
2828            r.execute_query("KV GET sessions.blob")
2829                .unwrap()
2830                .result
2831                .records[0]
2832                .get("value"),
2833            Some(crate::storage::schema::Value::Null)
2834        ));
2835    }
2836
2837    #[test]
2838    fn kv_runtime_stats_count_watch_streams_and_events() {
2839        let r = rt();
2840        let ops = super::KvAtomicOps::new(&r);
2841        assert_eq!(r.stats().kv.watch_streams_active, 0);
2842
2843        {
2844            let mut stream = r.kv_watch_subscribe("kv_default", "watched", None);
2845            assert_eq!(r.stats().kv.watch_streams_active, 1);
2846
2847            ops.set(
2848                CollectionModel::Kv,
2849                "kv_default",
2850                "watched",
2851                crate::storage::schema::Value::Integer(1),
2852                None,
2853                false,
2854            )
2855            .unwrap();
2856            let event = stream.poll_next().expect("watch event");
2857            assert_eq!(event.key, "watched");
2858            assert_eq!(r.stats().kv.watch_events_emitted, 1);
2859
2860            stream.record_drop_count(3);
2861            assert_eq!(r.stats().kv.watch_drops, 3);
2862        }
2863
2864        assert_eq!(r.stats().kv.watch_streams_active, 0);
2865    }
2866
2867    #[test]
2868    fn incr_existing_integer_accumulates() {
2869        let r = rt();
2870        let ops = super::KvAtomicOps::new(&r);
2871        ops.incr(CollectionModel::Kv, "kv_default", "ctr", 1, None)
2872            .unwrap();
2873        ops.incr(CollectionModel::Kv, "kv_default", "ctr", 1, None)
2874            .unwrap();
2875        let v = ops
2876            .incr(CollectionModel::Kv, "kv_default", "ctr", 1, None)
2877            .unwrap();
2878        assert_eq!(v, 3);
2879    }
2880
2881    #[test]
2882    fn decr_via_negative_by() {
2883        let r = rt();
2884        let ops = super::KvAtomicOps::new(&r);
2885        ops.incr(CollectionModel::Kv, "kv_default", "stock", 10, None)
2886            .unwrap();
2887        let v = ops
2888            .incr(CollectionModel::Kv, "kv_default", "stock", -3, None)
2889            .unwrap();
2890        assert_eq!(v, 7);
2891    }
2892
2893    #[test]
2894    fn concurrent_incr_single_key_is_atomic() {
2895        const THREADS: usize = 8;
2896        const ITERS: usize = 1000;
2897
2898        let runtime = std::sync::Arc::new(rt());
2899        let barrier = std::sync::Arc::new(std::sync::Barrier::new(THREADS));
2900        let mut handles = Vec::new();
2901
2902        for _ in 0..THREADS {
2903            let runtime = std::sync::Arc::clone(&runtime);
2904            let barrier = std::sync::Arc::clone(&barrier);
2905            handles.push(std::thread::spawn(move || {
2906                let ops = super::KvAtomicOps::new(&runtime);
2907                barrier.wait();
2908                for _ in 0..ITERS {
2909                    ops.incr(CollectionModel::Kv, "kv_default", "counter", 1, None)
2910                        .unwrap();
2911                }
2912            }));
2913        }
2914
2915        for handle in handles {
2916            handle.join().expect("worker should finish");
2917        }
2918
2919        let ops = super::KvAtomicOps::new(&runtime);
2920        assert_eq!(
2921            ops.get(CollectionModel::Kv, "kv_default", "counter")
2922                .unwrap(),
2923            Some(crate::storage::schema::Value::Integer(
2924                (THREADS * ITERS) as i64
2925            ))
2926        );
2927    }
2928
2929    #[test]
2930    fn incr_on_string_value_returns_error() {
2931        let r = rt();
2932        let ops = super::KvAtomicOps::new(&r);
2933        ops.set(
2934            CollectionModel::Kv,
2935            "kv_default",
2936            "name",
2937            crate::storage::schema::Value::text("alice"),
2938            None,
2939            false,
2940        )
2941        .unwrap();
2942        let err = ops
2943            .incr(CollectionModel::Kv, "kv_default", "name", 1, None)
2944            .unwrap_err();
2945        assert!(err.to_string().contains("non-integer"));
2946    }
2947
2948    // --- CAS tests ---
2949
2950    #[test]
2951    fn cas_matching_value_succeeds() {
2952        let r = rt();
2953        let ops = super::KvAtomicOps::new(&r);
2954        ops.set(
2955            CollectionModel::Kv,
2956            "kv_default",
2957            "lock",
2958            crate::storage::schema::Value::text("free"),
2959            None,
2960            false,
2961        )
2962        .unwrap();
2963        let (ok, prev) = ops
2964            .cas(
2965                CollectionModel::Kv,
2966                "kv_default",
2967                "lock",
2968                Some(&crate::storage::schema::Value::text("free")),
2969                crate::storage::schema::Value::text("held"),
2970                None,
2971            )
2972            .unwrap();
2973        assert!(ok);
2974        assert_eq!(prev, Some(crate::storage::schema::Value::text("free")));
2975        // Value actually changed.
2976        assert_eq!(
2977            ops.get(CollectionModel::Kv, "kv_default", "lock").unwrap(),
2978            Some(crate::storage::schema::Value::text("held"))
2979        );
2980    }
2981
2982    #[test]
2983    fn concurrent_cas_allows_one_success_per_round() {
2984        const THREADS: usize = 8;
2985        const ROUNDS: usize = 100;
2986
2987        let runtime = std::sync::Arc::new(rt());
2988        let ops = super::KvAtomicOps::new(&runtime);
2989        ops.set(
2990            CollectionModel::Kv,
2991            "kv_default",
2992            "cas_counter",
2993            crate::storage::schema::Value::Integer(0),
2994            None,
2995            false,
2996        )
2997        .unwrap();
2998
2999        let start_round = std::sync::Arc::new(std::sync::Barrier::new(THREADS));
3000        let finish_round = std::sync::Arc::new(std::sync::Barrier::new(THREADS));
3001        let successes = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
3002        let mut handles = Vec::new();
3003
3004        for _ in 0..THREADS {
3005            let runtime = std::sync::Arc::clone(&runtime);
3006            let start_round = std::sync::Arc::clone(&start_round);
3007            let finish_round = std::sync::Arc::clone(&finish_round);
3008            let successes = std::sync::Arc::clone(&successes);
3009            handles.push(std::thread::spawn(move || {
3010                let ops = super::KvAtomicOps::new(&runtime);
3011                for round in 0..ROUNDS {
3012                    start_round.wait();
3013                    let (ok, _) = ops
3014                        .cas(
3015                            CollectionModel::Kv,
3016                            "kv_default",
3017                            "cas_counter",
3018                            Some(&crate::storage::schema::Value::Integer(round as i64)),
3019                            crate::storage::schema::Value::Integer((round + 1) as i64),
3020                            None,
3021                        )
3022                        .unwrap();
3023                    if ok {
3024                        successes.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
3025                    }
3026                    finish_round.wait();
3027                }
3028            }));
3029        }
3030
3031        for handle in handles {
3032            handle.join().expect("worker should finish");
3033        }
3034
3035        assert_eq!(successes.load(std::sync::atomic::Ordering::SeqCst), ROUNDS);
3036        assert_eq!(
3037            ops.get(CollectionModel::Kv, "kv_default", "cas_counter")
3038                .unwrap(),
3039            Some(crate::storage::schema::Value::Integer(ROUNDS as i64))
3040        );
3041    }
3042
3043    #[test]
3044    fn cas_mismatching_value_fails() {
3045        let r = rt();
3046        let ops = super::KvAtomicOps::new(&r);
3047        ops.set(
3048            CollectionModel::Kv,
3049            "kv_default",
3050            "lock",
3051            crate::storage::schema::Value::text("free"),
3052            None,
3053            false,
3054        )
3055        .unwrap();
3056        let (ok, current) = ops
3057            .cas(
3058                CollectionModel::Kv,
3059                "kv_default",
3060                "lock",
3061                Some(&crate::storage::schema::Value::text("held")),
3062                crate::storage::schema::Value::text("worker-7"),
3063                None,
3064            )
3065            .unwrap();
3066        assert!(!ok);
3067        assert_eq!(current, Some(crate::storage::schema::Value::text("free")));
3068        // Value unchanged.
3069        assert_eq!(
3070            ops.get(CollectionModel::Kv, "kv_default", "lock").unwrap(),
3071            Some(crate::storage::schema::Value::text("free"))
3072        );
3073    }
3074
3075    #[test]
3076    fn cas_expect_null_on_missing_key_creates() {
3077        let r = rt();
3078        let ops = super::KvAtomicOps::new(&r);
3079        let (ok, prev) = ops
3080            .cas(
3081                CollectionModel::Kv,
3082                "kv_default",
3083                "new_key",
3084                None,
3085                crate::storage::schema::Value::text("created"),
3086                None,
3087            )
3088            .unwrap();
3089        assert!(ok);
3090        assert_eq!(prev, None);
3091        assert_eq!(
3092            ops.get(CollectionModel::Kv, "kv_default", "new_key")
3093                .unwrap(),
3094            Some(crate::storage::schema::Value::text("created"))
3095        );
3096    }
3097
3098    #[test]
3099    fn cas_expect_null_on_existing_key_fails() {
3100        let r = rt();
3101        let ops = super::KvAtomicOps::new(&r);
3102        ops.set(
3103            CollectionModel::Kv,
3104            "kv_default",
3105            "taken",
3106            crate::storage::schema::Value::text("worker-1"),
3107            None,
3108            false,
3109        )
3110        .unwrap();
3111        let (ok, current) = ops
3112            .cas(
3113                CollectionModel::Kv,
3114                "kv_default",
3115                "taken",
3116                None,
3117                crate::storage::schema::Value::text("worker-2"),
3118                None,
3119            )
3120            .unwrap();
3121        assert!(!ok);
3122        assert_eq!(
3123            current,
3124            Some(crate::storage::schema::Value::text("worker-1"))
3125        );
3126    }
3127
3128    #[test]
3129    fn cas_via_sql_roundtrip() {
3130        let r = rt();
3131        // Seed value.
3132        r.execute_query("KV PUT lock = 'free'").unwrap();
3133        // CAS: free → held — should succeed.
3134        let res = r
3135            .execute_query("KV CAS lock EXPECT 'free' SET 'held'")
3136            .unwrap();
3137        let row = &res.result.records[0];
3138        assert_eq!(
3139            row.get("ok"),
3140            Some(&crate::storage::schema::Value::Boolean(true))
3141        );
3142        // CAS: free → held again — should fail (value is now 'held').
3143        let res2 = r
3144            .execute_query("KV CAS lock EXPECT 'free' SET 'held'")
3145            .unwrap();
3146        let row2 = &res2.result.records[0];
3147        assert_eq!(
3148            row2.get("ok"),
3149            Some(&crate::storage::schema::Value::Boolean(false))
3150        );
3151    }
3152
3153    #[test]
3154    fn cas_expect_null_via_sql() {
3155        let r = rt();
3156        let res = r
3157            .execute_query("KV CAS singleton EXPECT NULL SET 'first'")
3158            .unwrap();
3159        let row = &res.result.records[0];
3160        assert_eq!(
3161            row.get("ok"),
3162            Some(&crate::storage::schema::Value::Boolean(true))
3163        );
3164        // Second call must fail.
3165        let res2 = r
3166            .execute_query("KV CAS singleton EXPECT NULL SET 'second'")
3167            .unwrap();
3168        let row2 = &res2.result.records[0];
3169        assert_eq!(
3170            row2.get("ok"),
3171            Some(&crate::storage::schema::Value::Boolean(false))
3172        );
3173    }
3174
3175    #[test]
3176    fn incr_via_sql_roundtrip() {
3177        let r = rt();
3178        let res = r.execute_query("KV INCR hits").unwrap();
3179        let row = &res.result.records[0];
3180        assert_eq!(
3181            row.get("value"),
3182            Some(&crate::storage::schema::Value::Integer(1))
3183        );
3184        let res2 = r.execute_query("KV INCR hits BY 4").unwrap();
3185        let row2 = &res2.result.records[0];
3186        assert_eq!(
3187            row2.get("value"),
3188            Some(&crate::storage::schema::Value::Integer(5))
3189        );
3190    }
3191
3192    #[test]
3193    fn concurrent_self_referential_update_is_atomic() {
3194        const THREADS: usize = 8;
3195        const ITERS: usize = 100;
3196
3197        let runtime = std::sync::Arc::new(rt());
3198        runtime
3199            .execute_query("CREATE TABLE counters (id INT, n INT)")
3200            .unwrap();
3201        runtime
3202            .execute_query("INSERT INTO counters (id, n) VALUES (1, 0)")
3203            .unwrap();
3204
3205        let barrier = std::sync::Arc::new(std::sync::Barrier::new(THREADS));
3206        let mut handles = Vec::new();
3207        for _ in 0..THREADS {
3208            let runtime = std::sync::Arc::clone(&runtime);
3209            let barrier = std::sync::Arc::clone(&barrier);
3210            handles.push(std::thread::spawn(move || {
3211                barrier.wait();
3212                for _ in 0..ITERS {
3213                    runtime
3214                        .execute_query("UPDATE counters SET n = n + 1 WHERE id = 1")
3215                        .unwrap();
3216                }
3217            }));
3218        }
3219
3220        for handle in handles {
3221            handle.join().expect("worker should finish");
3222        }
3223
3224        let selected = runtime
3225            .execute_query("SELECT n FROM counters WHERE id = 1")
3226            .unwrap();
3227        assert_eq!(
3228            selected.result.records[0].get("n"),
3229            Some(&crate::storage::schema::Value::Integer(
3230                (THREADS * ITERS) as i64
3231            ))
3232        );
3233    }
3234
3235    #[test]
3236    fn watch_stream_delivers_key_events_in_lsn_order() {
3237        let r = rt();
3238        let ops = super::KvAtomicOps::new(&r);
3239        let mut stream = r.kv_watch_subscribe("kv_default", "seq", None);
3240
3241        ops.set(
3242            CollectionModel::Kv,
3243            "kv_default",
3244            "seq",
3245            crate::storage::schema::Value::Integer(1),
3246            None,
3247            false,
3248        )
3249        .unwrap();
3250        ops.incr(CollectionModel::Kv, "kv_default", "seq", 1, None)
3251            .unwrap();
3252        ops.delete(CollectionModel::Kv, "kv_default", "seq")
3253            .unwrap();
3254        ops.set(
3255            CollectionModel::Kv,
3256            "kv_default",
3257            "seq",
3258            crate::storage::schema::Value::Integer(9),
3259            None,
3260            false,
3261        )
3262        .unwrap();
3263
3264        let mut events = Vec::new();
3265        while let Some(event) = stream.poll_next() {
3266            events.push(event);
3267            if events.len() == 4 {
3268                break;
3269            }
3270        }
3271
3272        assert_eq!(events.len(), 4);
3273        assert_eq!(
3274            events[0].op,
3275            crate::replication::cdc::ChangeOperation::Insert
3276        );
3277        assert_eq!(
3278            events[1].op,
3279            crate::replication::cdc::ChangeOperation::Update
3280        );
3281        assert_eq!(
3282            events[2].op,
3283            crate::replication::cdc::ChangeOperation::Delete
3284        );
3285        assert_eq!(
3286            events[3].op,
3287            crate::replication::cdc::ChangeOperation::Insert
3288        );
3289        assert!(events.windows(2).all(|pair| pair[0].lsn < pair[1].lsn));
3290    }
3291
3292    #[test]
3293    fn watch_prefix_stream_delivers_matching_events_only() {
3294        let r = rt();
3295        let ops = super::KvAtomicOps::new(&r);
3296        let mut stream = r.kv_watch_subscribe_prefix("kv_default", "acct:", None);
3297
3298        ops.set(
3299            CollectionModel::Kv,
3300            "kv_default",
3301            "acct:1",
3302            crate::storage::schema::Value::Integer(1),
3303            None,
3304            false,
3305        )
3306        .unwrap();
3307        ops.set(
3308            CollectionModel::Kv,
3309            "kv_default",
3310            "session:1",
3311            crate::storage::schema::Value::Integer(2),
3312            None,
3313            false,
3314        )
3315        .unwrap();
3316        ops.set(
3317            CollectionModel::Kv,
3318            "kv_default",
3319            "acct:2",
3320            crate::storage::schema::Value::Integer(3),
3321            None,
3322            false,
3323        )
3324        .unwrap();
3325
3326        let first = stream.poll_next().expect("first prefix event");
3327        let second = stream.poll_next().expect("second prefix event");
3328        assert_eq!(first.key, "acct:1");
3329        assert_eq!(second.key, "acct:2");
3330        assert!(stream.poll_next().is_none());
3331    }
3332
3333    #[test]
3334    fn watch_stream_resume_from_lsn_delivers_missed_events_without_duplicates() {
3335        let r = rt();
3336        let ops = super::KvAtomicOps::new(&r);
3337        let mut stream = r.kv_watch_subscribe("kv_default", "resume", None);
3338
3339        let mut last_seen_lsn = 0;
3340        for value in 0..5 {
3341            ops.set(
3342                CollectionModel::Kv,
3343                "kv_default",
3344                "resume",
3345                crate::storage::schema::Value::Integer(value),
3346                None,
3347                false,
3348            )
3349            .unwrap();
3350            last_seen_lsn = stream.poll_next().expect("initial event").lsn;
3351        }
3352        drop(stream);
3353
3354        for value in 5..55 {
3355            ops.set(
3356                CollectionModel::Kv,
3357                "kv_default",
3358                "resume",
3359                crate::storage::schema::Value::Integer(value),
3360                None,
3361                false,
3362            )
3363            .unwrap();
3364        }
3365
3366        let expected_lsns: Vec<u64> = r
3367            .kv_watch_events_since("kv_default", "resume", last_seen_lsn, 200)
3368            .into_iter()
3369            .map(|event| event.lsn)
3370            .collect();
3371        assert!(!expected_lsns.is_empty());
3372
3373        let mut resumed = r.kv_watch_subscribe("kv_default", "resume", Some(last_seen_lsn));
3374        let mut lsns = Vec::new();
3375        while let Some(event) = resumed.poll_next() {
3376            lsns.push(event.lsn);
3377            if lsns.len() == expected_lsns.len() {
3378                break;
3379            }
3380        }
3381
3382        assert_eq!(lsns, expected_lsns);
3383        assert!(lsns.iter().all(|lsn| *lsn > last_seen_lsn));
3384        assert!(lsns.windows(2).all(|pair| pair[0] < pair[1]));
3385        assert!(resumed.poll_next().is_none());
3386    }
3387
3388    #[test]
3389    fn watch_stream_slow_consumer_drops_oldest_buffered_events() {
3390        let r = rt();
3391        let ops = super::KvAtomicOps::new(&r);
3392        let mut stream = r.kv_watch_subscribe("kv_default", "slow", None);
3393
3394        for value in 0..1_200 {
3395            ops.set(
3396                CollectionModel::Kv,
3397                "kv_default",
3398                "slow",
3399                crate::storage::schema::Value::Integer(value),
3400                None,
3401                false,
3402            )
3403            .unwrap();
3404        }
3405
3406        let event = stream.poll_next().expect("tail event after drops");
3407        assert!(event.lsn > 1);
3408        assert!(event.dropped_event_count > 0);
3409        assert_eq!(stream.dropped_event_count(), event.dropped_event_count);
3410        assert_eq!(r.stats().kv.watch_drops, event.dropped_event_count);
3411    }
3412
3413    #[test]
3414    fn watch_stream_idle_timeout_closes_subscription() {
3415        let r = rt();
3416        r.execute_query("SET CONFIG red.config.kv.watch.idle_timeout_ms = 1")
3417            .unwrap();
3418
3419        let mut stream = r.kv_watch_subscribe("kv_default", "idle", None);
3420        assert_eq!(r.stats().kv.watch_streams_active, 1);
3421        std::thread::sleep(std::time::Duration::from_millis(5));
3422
3423        assert!(stream.poll_next().is_none());
3424        assert_eq!(r.stats().kv.watch_streams_active, 0);
3425    }
3426
3427    #[test]
3428    fn watch_stream_does_not_emit_rolled_back_put() {
3429        let r = rt();
3430        let mut stream = r.kv_watch_subscribe("kv_default", "rollback_key", None);
3431
3432        r.execute_query("BEGIN").unwrap();
3433        r.execute_query("KV PUT rollback_key = 'dirty'").unwrap();
3434        r.execute_query("ROLLBACK").unwrap();
3435
3436        assert!(stream.poll_next().is_none());
3437    }
3438}