Skip to main content

reddb_server/runtime/
impl_kv.rs

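//! KV DSL command execution and the `KvAtomicOps` module.
//!
//! Handles `KV PUT key = value [EXPIRE n] [TAGS [...]] [IF NOT EXISTS]`,
//! `KV GET key`, and `KV DELETE key`, along with the atomic `incr`, `cas`
//! and tag-invalidation operations exposed by `KvAtomicOps`.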
5
6use crate::application::ports::RuntimeEntityPort;
7use crate::storage::unified::{Metadata, MetadataValue};
8
9use super::impl_core::{current_auth_identity, current_connection_id, current_tenant};
10use super::*;
11
/// Name of the collection that KV operations fall back to when a bare key
/// (one without an explicit collection) is used.
pub const KV_DEFAULT_COLLECTION: &'static str = "kv_default";
14
/// Build the auth-store lookup key under which the per-collection vault
/// master key for `collection` is stored: `red.vault.<collection>.master_key`.
fn vault_master_key_ref(collection: &str) -> String {
    ["red.vault.", collection, ".master_key"].concat()
}
18
19fn keyed_model_name(model: crate::catalog::CollectionModel) -> &'static str {
20    match model {
21        crate::catalog::CollectionModel::Kv => "kv",
22        crate::catalog::CollectionModel::Vault => "vault",
23        crate::catalog::CollectionModel::Config => "config",
24        _ => "collection",
25    }
26}
27
28#[derive(Debug, Clone)]
29struct VaultEntry {
30    id: crate::storage::EntityId,
31    key: String,
32    value: crate::storage::schema::Value,
33    metadata: Metadata,
34    created_at: u64,
35    updated_at: u64,
36    sequence_id: u64,
37    version: i64,
38    tombstone: bool,
39    op: String,
40}
41
42impl super::keyed_spine::KeyedVersion for VaultEntry {
43    fn key(&self) -> &str {
44        &self.key
45    }
46
47    fn version(&self) -> i64 {
48        self.version
49    }
50}
51
52impl VaultEntry {
53    fn from_keyed_row(
54        version: super::keyed_spine::KeyedRowVersion,
55        metadata: Metadata,
56        created_at: u64,
57        updated_at: u64,
58        sequence_id: u64,
59    ) -> Self {
60        Self {
61            id: version.id,
62            key: version.key,
63            value: version.value,
64            metadata,
65            created_at,
66            updated_at,
67            sequence_id,
68            version: version.version,
69            tombstone: version.tombstone,
70            op: version.op,
71        }
72    }
73}
74
75/// Atomic KV operations interface — the seam that transports and drivers depend on.
76///
77/// All three verbs delegate to the runtime's existing `create_kv` / `get_kv` /
78/// `delete_kv` plumbing; this struct adds the auto-create and upsert logic.
79pub struct KvAtomicOps<'a> {
80    runtime: &'a RedDBRuntime,
81}
82
83impl<'a> KvAtomicOps<'a> {
84    pub fn new(runtime: &'a RedDBRuntime) -> Self {
85        Self { runtime }
86    }
87
88    /// Insert or update a KV entry. Auto-creates the collection when needed.
89    ///
90    /// Insert or update a KV entry. Returns `(created: bool, id: EntityId)`.
91    pub fn set(
92        &self,
93        model: crate::catalog::CollectionModel,
94        collection: &str,
95        key: &str,
96        value: crate::storage::schema::Value,
97        ttl_ms: Option<u64>,
98        if_not_exists: bool,
99    ) -> RedDBResult<(bool, crate::storage::EntityId)> {
100        self.set_with_tags_for_model(model, collection, key, value, ttl_ms, &[], if_not_exists)
101    }
102
103    pub fn set_with_tags(
104        &self,
105        collection: &str,
106        key: &str,
107        value: crate::storage::schema::Value,
108        ttl_ms: Option<u64>,
109        tags: &[String],
110        if_not_exists: bool,
111    ) -> RedDBResult<(bool, crate::storage::EntityId)> {
112        self.set_with_tags_for_model(
113            crate::catalog::CollectionModel::Kv,
114            collection,
115            key,
116            value,
117            ttl_ms,
118            tags,
119            if_not_exists,
120        )
121    }
122
123    fn set_with_tags_for_model(
124        &self,
125        model: crate::catalog::CollectionModel,
126        collection: &str,
127        key: &str,
128        value: crate::storage::schema::Value,
129        ttl_ms: Option<u64>,
130        tags: &[String],
131        if_not_exists: bool,
132    ) -> RedDBResult<(bool, crate::storage::EntityId)> {
133        self.set_with_tags_and_metadata_for_model(
134            model,
135            collection,
136            key,
137            value,
138            ttl_ms,
139            tags,
140            if_not_exists,
141            Vec::new(),
142        )
143    }
144
145    pub fn set_with_tags_and_metadata(
146        &self,
147        collection: &str,
148        key: &str,
149        value: crate::storage::schema::Value,
150        ttl_ms: Option<u64>,
151        tags: &[String],
152        if_not_exists: bool,
153        metadata: Vec<(String, MetadataValue)>,
154    ) -> RedDBResult<(bool, crate::storage::EntityId)> {
155        self.set_with_tags_and_metadata_for_model(
156            crate::catalog::CollectionModel::Kv,
157            collection,
158            key,
159            value,
160            ttl_ms,
161            tags,
162            if_not_exists,
163            metadata,
164        )
165    }
166
167    fn set_with_tags_and_metadata_for_model(
168        &self,
169        model: crate::catalog::CollectionModel,
170        collection: &str,
171        key: &str,
172        value: crate::storage::schema::Value,
173        ttl_ms: Option<u64>,
174        tags: &[String],
175        if_not_exists: bool,
176        mut metadata: Vec<(String, MetadataValue)>,
177    ) -> RedDBResult<(bool, crate::storage::EntityId)> {
178        self.ensure_keyed_collection(model, collection)?;
179
180        if model == crate::catalog::CollectionModel::Vault {
181            let latest = self.get_vault_entry(collection, key)?;
182            let was_present = latest.as_ref().is_some_and(|entry| !entry.tombstone);
183            if if_not_exists && was_present {
184                return Ok((false, latest.expect("checked present").id));
185            }
186            let entry = self.append_vault_version(collection, key, value, "put", false, tags)?;
187            self.runtime.record_kv_watch_event(
188                if latest.is_some() {
189                    crate::replication::cdc::ChangeOperation::Update
190                } else {
191                    crate::replication::cdc::ChangeOperation::Insert
192                },
193                collection,
194                key,
195                entry.id.raw(),
196                latest.as_ref().map(vault_entry_metadata_json),
197                Some(vault_entry_metadata_json(&entry)),
198            );
199            return Ok((!was_present, entry.id));
200        }
201
202        let existing = self.get_entry(model, collection, key)?;
203        let was_present = existing.is_some();
204
205        if if_not_exists && was_present {
206            let (_, id) = existing.unwrap();
207            self.runtime.inner.kv_stats.incr_puts();
208            return Ok((false, id));
209        }
210
211        let before = existing
212            .as_ref()
213            .map(|(value, _)| crate::presentation::entity_json::storage_value_to_json(value));
214        let op = if was_present {
215            crate::replication::cdc::ChangeOperation::Update
216        } else {
217            crate::replication::cdc::ChangeOperation::Insert
218        };
219        let after = Some(crate::presentation::entity_json::storage_value_to_json(
220            &value,
221        ));
222
223        // Delete old entry so we can create fresh (handles TTL refresh).
224        if was_present {
225            self.delete(model, collection, key)?;
226        }
227
228        if let Some(ttl_metadata) = ttl_metadata(ttl_ms) {
229            metadata.extend(ttl_metadata.fields);
230        }
231        if let Some(tags_metadata) = kv_tags_metadata(tags) {
232            metadata.push(tags_metadata);
233        }
234
235        let output = self
236            .runtime
237            .create_kv(crate::application::entity::CreateKvInput {
238                collection: collection.to_string(),
239                key: key.to_string(),
240                value,
241                metadata,
242            })?;
243        if model == crate::catalog::CollectionModel::Kv {
244            self.runtime
245                .inner
246                .kv_tag_index
247                .replace(collection, key, output.id, tags);
248        }
249
250        if model == crate::catalog::CollectionModel::Kv {
251            self.runtime
252                .record_kv_watch_event(op, collection, key, output.id.raw(), before, after);
253        }
254
255        if model == crate::catalog::CollectionModel::Kv {
256            self.runtime.inner.kv_stats.incr_puts();
257        }
258        Ok((!was_present, output.id))
259    }
260
261    /// Retrieve a KV value by key. Returns `None` when not found.
262    pub fn get(
263        &self,
264        model: crate::catalog::CollectionModel,
265        collection: &str,
266        key: &str,
267    ) -> RedDBResult<Option<crate::storage::schema::Value>> {
268        let result = self.get_entry(model, collection, key)?;
269        if model == crate::catalog::CollectionModel::Kv {
270            self.runtime.inner.kv_stats.incr_gets();
271        }
272        Ok(result.map(|(v, _)| v))
273    }
274
275    /// Delete a KV entry. Returns `true` if the key existed.
276    pub fn delete(
277        &self,
278        model: crate::catalog::CollectionModel,
279        collection: &str,
280        key: &str,
281    ) -> RedDBResult<bool> {
282        self.ensure_declared_model(model, collection)?;
283        let found = self.get_entry(model, collection, key)?;
284        if let Some((value, id)) = found {
285            let store = self.runtime.inner.db.store();
286            let deleted = store
287                .delete(collection, id)
288                .map_err(|err| RedDBError::Internal(err.to_string()))?;
289            if deleted {
290                store.context_index().remove_entity(id);
291                if model == crate::catalog::CollectionModel::Kv {
292                    self.runtime.inner.kv_tag_index.remove(collection, key);
293                    self.runtime.record_kv_watch_event(
294                        crate::replication::cdc::ChangeOperation::Delete,
295                        collection,
296                        key,
297                        id.raw(),
298                        Some(crate::presentation::entity_json::storage_value_to_json(
299                            &value,
300                        )),
301                        None,
302                    );
303                    self.runtime.inner.kv_stats.incr_deletes();
304                }
305            }
306            Ok(deleted)
307        } else {
308            Ok(false)
309        }
310    }
311
312    /// Atomically increment (or decrement) a counter key. Returns the new value.
313    ///
314    /// - Missing key initialises at `by` (Redis-compat).
315    /// - Non-integer value returns an error before any mutation.
316    pub fn incr(
317        &self,
318        model: crate::catalog::CollectionModel,
319        collection: &str,
320        key: &str,
321        by: i64,
322        ttl_ms: Option<u64>,
323    ) -> RedDBResult<i64> {
324        if model == crate::catalog::CollectionModel::Vault {
325            return Err(RedDBError::InvalidOperation(
326                "VAULT INCR is not supported for sealed secrets".to_string(),
327            ));
328        }
329        let rmw_lock = self.runtime.inner.rmw_locks.lock_for(collection, key);
330        let _rmw_guard = rmw_lock.lock();
331        self.ensure_kv_collection(collection)?;
332        let existing = self.runtime.get_kv(collection, key)?;
333        let current: i64 = match existing.as_ref() {
334            None => 0,
335            Some((crate::storage::schema::Value::Integer(n), _)) => *n,
336            Some((crate::storage::schema::Value::Float(f), _)) => *f as i64,
337            Some((other, _)) => {
338                return Err(RedDBError::Internal(format!(
339                    "INCR on non-integer value: {:?}",
340                    other
341                )));
342            }
343        };
344
345        let next = current
346            .checked_add(by)
347            .ok_or_else(|| RedDBError::Internal(format!("INCR overflow: {current} + {by}")))?;
348
349        // Delete then re-create so TTL is refreshed.
350        if existing.is_some() {
351            self.runtime.delete_kv(collection, key)?;
352        }
353
354        let meta_vec: Vec<(String, crate::storage::unified::MetadataValue)> = ttl_metadata(ttl_ms)
355            .map(|m| m.fields.into_iter().collect())
356            .unwrap_or_default();
357
358        let output = self
359            .runtime
360            .create_kv(crate::application::entity::CreateKvInput {
361                collection: collection.to_string(),
362                key: key.to_string(),
363                value: crate::storage::schema::Value::Integer(next),
364                metadata: meta_vec,
365            })?;
366        self.runtime
367            .inner
368            .kv_tag_index
369            .replace(collection, key, output.id, &[]);
370
371        self.runtime.record_kv_watch_event(
372            if existing.is_some() {
373                crate::replication::cdc::ChangeOperation::Update
374            } else {
375                crate::replication::cdc::ChangeOperation::Insert
376            },
377            collection,
378            key,
379            output.id.raw(),
380            existing
381                .as_ref()
382                .map(|(value, _)| crate::presentation::entity_json::storage_value_to_json(value)),
383            Some(crate::presentation::entity_json::storage_value_to_json(
384                &crate::storage::schema::Value::Integer(next),
385            )),
386        );
387
388        self.runtime.inner.kv_stats.incr_incrs();
389        Ok(next)
390    }
391
392    /// Compare-and-set: atomically swap `key` from `expected` to `new_value`.
393    ///
394    /// Returns `(ok, current)`:
395    /// - `ok = true`  → swap applied; `current` is the value *before* the swap.
396    /// - `ok = false` → swap skipped; `current` holds the actual current value.
397    ///
398    /// `expected = None` means the caller expects the key to be *absent* (create-if-absent).
399    pub fn cas(
400        &self,
401        model: crate::catalog::CollectionModel,
402        collection: &str,
403        key: &str,
404        expected: Option<&crate::storage::schema::Value>,
405        new_value: crate::storage::schema::Value,
406        ttl_ms: Option<u64>,
407    ) -> RedDBResult<(bool, Option<crate::storage::schema::Value>)> {
408        if model == crate::catalog::CollectionModel::Vault {
409            return Err(RedDBError::InvalidOperation(
410                "VAULT CAS is not supported for sealed secrets".to_string(),
411            ));
412        }
413        let rmw_lock = self.runtime.inner.rmw_locks.lock_for(collection, key);
414        let _rmw_guard = rmw_lock.lock();
415        self.ensure_kv_collection(collection)?;
416        let current = self.runtime.get_kv(collection, key)?.map(|(v, _)| v);
417
418        let matches = match (&current, expected) {
419            (None, None) => true,
420            (Some(cur), Some(exp)) => cur == exp,
421            _ => false,
422        };
423
424        if !matches {
425            self.runtime.inner.kv_stats.incr_cas_conflict();
426            return Ok((false, current));
427        }
428
429        // Swap: delete old entry (if present), write new one.
430        if current.is_some() {
431            self.runtime.delete_kv(collection, key)?;
432        }
433
434        let meta_vec: Vec<(String, crate::storage::unified::MetadataValue)> = ttl_metadata(ttl_ms)
435            .map(|m| m.fields.into_iter().collect())
436            .unwrap_or_default();
437
438        let output = self
439            .runtime
440            .create_kv(crate::application::entity::CreateKvInput {
441                collection: collection.to_string(),
442                key: key.to_string(),
443                value: new_value.clone(),
444                metadata: meta_vec,
445            })?;
446        self.runtime
447            .inner
448            .kv_tag_index
449            .replace(collection, key, output.id, &[]);
450
451        self.runtime.record_kv_watch_event(
452            if current.is_some() {
453                crate::replication::cdc::ChangeOperation::Update
454            } else {
455                crate::replication::cdc::ChangeOperation::Insert
456            },
457            collection,
458            key,
459            output.id.raw(),
460            current
461                .as_ref()
462                .map(crate::presentation::entity_json::storage_value_to_json),
463            Some(crate::presentation::entity_json::storage_value_to_json(
464                &new_value,
465            )),
466        );
467
468        self.runtime.inner.kv_stats.incr_cas_success();
469        Ok((true, current))
470    }
471
472    pub fn invalidate_tags(&self, collection: &str, tags: &[String]) -> RedDBResult<usize> {
473        self.runtime
474            .check_write(crate::runtime::write_gate::WriteKind::Dml)?;
475        self.runtime.check_kv_invalidate_policy(collection)?;
476        self.ensure_kv_collection(collection)?;
477        let entries = self
478            .runtime
479            .inner
480            .kv_tag_index
481            .entries_for_tags(collection, tags);
482        if entries.is_empty() {
483            return Ok(0);
484        }
485
486        let store = self.runtime.inner.db.store();
487        let mut removed = 0usize;
488        for (key, id) in entries {
489            let before = store
490                .get(collection, id)
491                .and_then(|entity| kv_value_from_entity(&entity));
492            let deleted = store
493                .delete(collection, id)
494                .map_err(|err| RedDBError::Internal(err.to_string()))?;
495            if deleted {
496                store.context_index().remove_entity(id);
497                self.runtime.inner.kv_tag_index.remove(collection, &key);
498                self.runtime.record_kv_watch_event(
499                    crate::replication::cdc::ChangeOperation::Delete,
500                    collection,
501                    &key,
502                    id.raw(),
503                    before
504                        .as_ref()
505                        .map(crate::presentation::entity_json::storage_value_to_json),
506                    None,
507                );
508                removed += 1;
509            }
510        }
511        if removed > 0 {
512            self.runtime.inner.kv_stats.incr_deletes();
513        }
514        Ok(removed)
515    }
516
517    pub fn tags_for_key(&self, collection: &str, key: &str) -> Vec<String> {
518        self.runtime
519            .inner
520            .kv_tag_index
521            .tags_for_key(collection, key)
522    }
523
524    /// Auto-create a KV collection if it does not exist yet.
525    fn ensure_kv_collection(&self, collection: &str) -> RedDBResult<()> {
526        self.ensure_keyed_collection(crate::catalog::CollectionModel::Kv, collection)
527    }
528
529    fn ensure_keyed_collection(
530        &self,
531        model: crate::catalog::CollectionModel,
532        collection: &str,
533    ) -> RedDBResult<()> {
534        let store = self.runtime.inner.db.store();
535        if store.get_collection(collection).is_some() {
536            return self.ensure_declared_model(model, collection);
537        }
538        if model != crate::catalog::CollectionModel::Kv {
539            return Err(RedDBError::NotFound(format!(
540                "{} collection '{collection}' does not exist",
541                keyed_model_name(model)
542            )));
543        }
544        // Check config gate: red.config.kv.default_collection (default = true).
545        let auto_create = self
546            .runtime
547            .config_bool("red.config.kv.default_collection", true);
548        if !auto_create {
549            return Err(RedDBError::NotFound(format!(
550                "kv collection '{collection}' does not exist and auto-create is disabled \
551                 (red.config.kv.default_collection = false)"
552            )));
553        }
554        store
555            .create_collection(collection)
556            .map_err(|err| RedDBError::Internal(err.to_string()))?;
557        self.runtime
558            .inner
559            .db
560            .save_collection_contract(kv_collection_contract(collection))
561            .map_err(|err| RedDBError::Internal(err.to_string()))?;
562        Ok(())
563    }
564
565    fn get_entry(
566        &self,
567        model: crate::catalog::CollectionModel,
568        collection: &str,
569        key: &str,
570    ) -> RedDBResult<Option<(crate::storage::schema::Value, crate::storage::EntityId)>> {
571        let Some(entity) = self.get_entity(model, collection, key)? else {
572            return Ok(None);
573        };
574        Ok(kv_value_from_entity(&entity).map(|value| (value, entity.id)))
575    }
576
577    fn get_entity(
578        &self,
579        model: crate::catalog::CollectionModel,
580        collection: &str,
581        key: &str,
582    ) -> RedDBResult<Option<crate::storage::UnifiedEntity>> {
583        self.ensure_declared_model(model, collection)?;
584        let store = self.runtime.inner.db.store();
585        let Some(manager) = store.get_collection(collection) else {
586            return Ok(None);
587        };
588        let entities = manager.query_all(|_| true);
589        for entity in entities {
590            if let crate::storage::EntityData::Row(ref row) = entity.data {
591                if let Some(ref named) = row.named {
592                    if let Some(crate::storage::schema::Value::Text(ref k)) = named.get("key") {
593                        if &**k == key {
594                            return Ok(Some(entity));
595                        }
596                    }
597                }
598            }
599        }
600        Ok(None)
601    }
602
603    fn latest_kv_entries(
604        &self,
605        collection: &str,
606        prefix: Option<&str>,
607    ) -> RedDBResult<Vec<(String, crate::storage::UnifiedEntity)>> {
608        self.ensure_declared_model(crate::catalog::CollectionModel::Kv, collection)?;
609        let store = self.runtime.inner.db.store();
610        let Some(manager) = store.get_collection(collection) else {
611            return Ok(Vec::new());
612        };
613        let mut entries: std::collections::BTreeMap<String, crate::storage::UnifiedEntity> =
614            std::collections::BTreeMap::new();
615        for entity in manager.query_all(|_| true) {
616            let key = match &entity.data {
617                crate::storage::EntityData::Row(row) => row
618                    .named
619                    .as_ref()
620                    .and_then(|named| named.get("key"))
621                    .and_then(|value| match value {
622                        crate::storage::schema::Value::Text(key) => Some(key.to_string()),
623                        _ => None,
624                    }),
625                _ => None,
626            };
627            let Some(key) = key else {
628                continue;
629            };
630            if prefix.is_some_and(|prefix| !key.starts_with(prefix)) {
631                continue;
632            }
633            let should_replace = match entries.get(&key) {
634                Some(existing) => entity.id.raw() >= existing.id.raw(),
635                None => true,
636            };
637            if should_replace {
638                entries.insert(key, entity);
639            }
640        }
641        Ok(entries.into_iter().collect())
642    }
643
644    fn get_vault_entry(&self, collection: &str, key: &str) -> RedDBResult<Option<VaultEntry>> {
645        self.vault_versions(collection, key)
646            .map(super::keyed_spine::latest_version)
647    }
648
649    fn get_vault_entry_version(
650        &self,
651        collection: &str,
652        key: &str,
653        version: i64,
654    ) -> RedDBResult<Option<VaultEntry>> {
655        Ok(self
656            .vault_versions(collection, key)?
657            .into_iter()
658            .find(|entry| entry.version == version))
659    }
660
661    fn vault_versions(&self, collection: &str, key: &str) -> RedDBResult<Vec<VaultEntry>> {
662        self.ensure_declared_model(crate::catalog::CollectionModel::Vault, collection)?;
663        let store = self.runtime.inner.db.store();
664        let Some(manager) = store.get_collection(collection) else {
665            return Ok(Vec::new());
666        };
667        let entities = manager.query_all(|_| true);
668        let mut versions = Vec::new();
669        for entity in entities {
670            let crate::storage::EntityData::Row(ref row) = entity.data else {
671                continue;
672            };
673            let Some(version) =
674                super::keyed_spine::row_version(entity.id, row, entity.sequence_id as i64)
675            else {
676                continue;
677            };
678            if version.key != key {
679                continue;
680            }
681            let metadata = manager.get_metadata(entity.id).unwrap_or_default();
682            versions.push(VaultEntry::from_keyed_row(
683                version,
684                metadata,
685                entity.created_at,
686                entity.updated_at,
687                entity.sequence_id,
688            ));
689        }
690        Ok(versions)
691    }
692
693    fn latest_vault_entries(
694        &self,
695        collection: &str,
696        prefix: Option<&str>,
697    ) -> RedDBResult<Vec<VaultEntry>> {
698        self.ensure_declared_model(crate::catalog::CollectionModel::Vault, collection)?;
699        let store = self.runtime.inner.db.store();
700        let Some(manager) = store.get_collection(collection) else {
701            return Ok(Vec::new());
702        };
703        let mut versions = Vec::new();
704        for entity in manager.query_all(|_| true) {
705            let crate::storage::EntityData::Row(ref row) = entity.data else {
706                continue;
707            };
708            let Some(version) =
709                super::keyed_spine::row_version(entity.id, row, entity.sequence_id as i64)
710            else {
711                continue;
712            };
713            let metadata = manager.get_metadata(entity.id).unwrap_or_default();
714            let entry = VaultEntry::from_keyed_row(
715                version,
716                metadata,
717                entity.created_at,
718                entity.updated_at,
719                entity.sequence_id,
720            );
721            versions.push(entry);
722        }
723        Ok(super::keyed_spine::latest_versions(versions, prefix))
724    }
725
726    fn append_vault_version(
727        &self,
728        collection: &str,
729        key: &str,
730        value: crate::storage::schema::Value,
731        op: &str,
732        tombstone: bool,
733        tags: &[String],
734    ) -> RedDBResult<VaultEntry> {
735        self.ensure_declared_model(crate::catalog::CollectionModel::Vault, collection)?;
736        let version = self
737            .get_vault_entry(collection, key)?
738            .map(|entry| entry.version)
739            .unwrap_or(0)
740            + 1;
741        let stored_value = if tombstone {
742            crate::storage::schema::Value::Null
743        } else {
744            self.runtime.seal_vault_value(collection, value)?
745        };
746        let now = current_unix_ms() as i64;
747        let fields = vec![
748            (
749                "key".to_string(),
750                crate::storage::schema::Value::text(key.to_string()),
751            ),
752            ("value".to_string(), stored_value),
753            (
754                "version".to_string(),
755                crate::storage::schema::Value::Integer(version),
756            ),
757            (
758                "tombstone".to_string(),
759                crate::storage::schema::Value::Boolean(tombstone),
760            ),
761            (
762                "op".to_string(),
763                crate::storage::schema::Value::text(op.to_string()),
764            ),
765            (
766                "created_at_ms".to_string(),
767                crate::storage::schema::Value::Integer(now),
768            ),
769        ];
770        let mut row = crate::storage::RowData::new(Vec::new());
771        row.named = Some(fields.into_iter().collect());
772        let entity = crate::storage::UnifiedEntity::new(
773            crate::storage::EntityId::new(0),
774            crate::storage::EntityKind::TableRow {
775                table: std::sync::Arc::from(collection),
776                row_id: 0,
777            },
778            crate::storage::EntityData::Row(row),
779        );
780        let id = self
781            .runtime
782            .inner
783            .db
784            .store()
785            .insert(collection, entity)
786            .map_err(|err| RedDBError::Internal(err.to_string()))?;
787        if !tags.is_empty() {
788            self.runtime
789                .inner
790                .db
791                .store()
792                .set_metadata(
793                    collection,
794                    id,
795                    Metadata::with_fields(vault_tags_metadata(tags)),
796                )
797                .map_err(|err| RedDBError::Internal(err.to_string()))?;
798            self.runtime
799                .inner
800                .kv_tag_index
801                .replace(collection, key, id, tags);
802        }
803        self.get_vault_entry_version(collection, key, version)?
804            .ok_or_else(|| RedDBError::Internal(format!("vault version {id} was not readable")))
805    }
806
807    fn purge_vault_versions(&self, collection: &str, key: &str) -> RedDBResult<usize> {
808        self.ensure_declared_model(crate::catalog::CollectionModel::Vault, collection)?;
809        let versions = self.vault_versions(collection, key)?;
810        let store = self.runtime.inner.db.store();
811        let mut purged = 0usize;
812        for entry in versions {
813            if store
814                .delete(collection, entry.id)
815                .map_err(|err| RedDBError::Internal(err.to_string()))?
816            {
817                store.context_index().remove_entity(entry.id);
818                purged += 1;
819            }
820        }
821        Ok(purged)
822    }
823
824    fn ensure_declared_model(
825        &self,
826        model: crate::catalog::CollectionModel,
827        collection: &str,
828    ) -> RedDBResult<()> {
829        let Some(contract) = self.runtime.inner.db.collection_contract(collection) else {
830            return Ok(());
831        };
832        if contract.declared_model == model
833            || contract.declared_model == crate::catalog::CollectionModel::Mixed
834        {
835            return Ok(());
836        }
837        Err(RedDBError::InvalidOperation(format!(
838            "collection '{}' is declared as '{}' and does not allow '{}' operations",
839            collection,
840            keyed_model_name(contract.declared_model),
841            keyed_model_name(model)
842        )))
843    }
844}
845
846impl RedDBRuntime {
847    pub(crate) fn seal_vault_value(
848        &self,
849        collection: &str,
850        value: crate::storage::schema::Value,
851    ) -> RedDBResult<crate::storage::schema::Value> {
852        let key = self.vault_encryption_key(collection)?;
853        let plaintext = value.to_bytes();
854        let nonce_bytes = crate::auth::store::random_bytes(12);
855        let mut nonce = [0u8; 12];
856        nonce.copy_from_slice(&nonce_bytes[..12]);
857        let aad = format!("reddb.vault.{collection}");
858        let ciphertext =
859            crate::crypto::aes_gcm::aes256_gcm_encrypt(&key, &nonce, aad.as_bytes(), &plaintext);
860        let mut payload = Vec::with_capacity(12 + ciphertext.len());
861        payload.extend_from_slice(&nonce);
862        payload.extend_from_slice(&ciphertext);
863        Ok(crate::storage::schema::Value::Secret(payload))
864    }
865
866    fn vault_key_available(&self, collection: &str) -> bool {
867        self.vault_encryption_key(collection).is_ok()
868    }
869
870    fn vault_encryption_key(&self, collection: &str) -> RedDBResult<[u8; 32]> {
871        let auth_store = self.inner.auth_store.read().clone().ok_or_else(|| {
872            RedDBError::Query("vault sealed_unavailable: no key provider is configured".to_string())
873        })?;
874        if !auth_store.is_vault_backed() {
875            return Err(RedDBError::Query(
876                "vault sealed_unavailable: key provider is sealed".to_string(),
877            ));
878        }
879
880        if let Some(hex_key) = auth_store.vault_kv_get(&vault_master_key_ref(collection)) {
881            return decode_vault_key(&hex_key);
882        }
883        auth_store.vault_secret_key().ok_or_else(|| {
884            RedDBError::Query("vault sealed_unavailable: cluster vault key is missing".to_string())
885        })
886    }
887
888    fn unseal_vault_value(
889        &self,
890        collection: &str,
891        sealed: &crate::storage::schema::Value,
892    ) -> RedDBResult<crate::storage::schema::Value> {
893        let crate::storage::schema::Value::Secret(payload) = sealed else {
894            return Err(RedDBError::Query(
895                "vault unseal failed: stored value is not sealed".to_string(),
896            ));
897        };
898        if payload.len() < 12 {
899            return Err(RedDBError::Query(
900                "vault unseal failed: sealed payload is truncated".to_string(),
901            ));
902        }
903        let key = self.vault_encryption_key(collection)?;
904        let mut nonce = [0u8; 12];
905        nonce.copy_from_slice(&payload[..12]);
906        let aad = format!("reddb.vault.{collection}");
907        let plaintext = crate::crypto::aes_gcm::aes256_gcm_decrypt(
908            &key,
909            &nonce,
910            aad.as_bytes(),
911            &payload[12..],
912        )
913        .map_err(|_| RedDBError::Query("vault unseal failed: decryption failed".to_string()))?;
914        let (value, consumed) =
915            crate::storage::schema::Value::from_bytes(&plaintext).map_err(|err| {
916                RedDBError::Query(format!("vault unseal failed: bad plaintext value: {err}"))
917            })?;
918        if consumed != plaintext.len() {
919            return Err(RedDBError::Query(
920                "vault unseal failed: trailing plaintext bytes".to_string(),
921            ));
922        }
923        Ok(value)
924    }
925
926    /// Internal peek of a vault entry's unsealed value, bypassing audit and
927    /// policy checks. Used by `SecretRefGuard` to detect chained `secret_ref`
928    /// indirection at config write time without emitting vault read events.
929    /// Returns `Ok(None)` if the entry is absent, tombstoned, or its sealed
930    /// payload cannot be decrypted with the available key material.
931    pub(crate) fn peek_vault_unsealed(
932        &self,
933        collection: &str,
934        key: &str,
935    ) -> RedDBResult<Option<crate::storage::schema::Value>> {
936        let ops = KvAtomicOps::new(self);
937        let Some(entry) = ops.get_vault_entry(collection, key)? else {
938            return Ok(None);
939        };
940        if entry.tombstone {
941            return Ok(None);
942        }
943        Ok(self.unseal_vault_value(collection, &entry.value).ok())
944    }
945
946    fn vault_target_resource(collection: &str, key: &str) -> String {
947        if collection == "red.vault" {
948            return format!("red.vault/{}", key.to_ascii_lowercase());
949        }
950        format!("{collection}.{key}")
951    }
952
953    fn current_vault_actor() -> String {
954        current_auth_identity()
955            .map(|(principal, _)| principal)
956            .unwrap_or_else(|| "anonymous".to_string())
957    }
958
959    fn vault_request_id() -> String {
960        let conn_id = current_connection_id();
961        if conn_id == 0 {
962            "embedded".to_string()
963        } else {
964            format!("conn-{conn_id}")
965        }
966    }
967
968    fn check_vault_capability(
969        &self,
970        action: &str,
971        collection: &str,
972        key: &str,
973    ) -> Result<(), String> {
974        let Some(auth_store) = self.inner.auth_store.read().clone() else {
975            return Ok(());
976        };
977        if !auth_store.iam_authorization_enabled() {
978            return Ok(());
979        }
980        let Some((principal, role)) = current_auth_identity() else {
981            return Err(
982                "IAM authorization is enabled; vault capability check requires an authenticated principal"
983                    .to_string(),
984            );
985        };
986        let tenant = current_tenant();
987        let principal_id = crate::auth::UserId::from_parts(tenant.as_deref(), &principal);
988        let mut resource = crate::auth::policies::ResourceRef::new(
989            "vault",
990            Self::vault_target_resource(collection, key),
991        );
992        if let Some(ref tenant) = tenant {
993            resource = resource.with_tenant(tenant.clone());
994        }
995        let ctx = crate::auth::policies::EvalContext {
996            principal_tenant: tenant.clone(),
997            current_tenant: tenant,
998            peer_ip: None,
999            mfa_present: false,
1000            now_ms: crate::utils::now_unix_millis() as u128,
1001            principal_is_admin_role: role == crate::auth::Role::Admin,
1002            principal_is_system_owned: auth_store.principal_is_system_owned(&principal_id),
1003            principal_is_platform_scoped: principal_id.tenant.is_none(),
1004        };
1005        if auth_store.check_policy_authz_with_role(&principal_id, action, &resource, &ctx, role) {
1006            Ok(())
1007        } else {
1008            Err(format!(
1009                "principal=`{}` action=`{}` resource=`vault:{}` denied by IAM policy",
1010                principal,
1011                action,
1012                Self::vault_target_resource(collection, key)
1013            ))
1014        }
1015    }
1016
1017    fn check_system_vault_capability(
1018        &self,
1019        action: &str,
1020        collection: &str,
1021        key: &str,
1022    ) -> Result<(), String> {
1023        if collection != "red.vault" {
1024            return Ok(());
1025        }
1026        self.check_vault_capability(action, collection, key)
1027    }
1028
1029    fn audit_vault_unseal(
1030        &self,
1031        collection: &str,
1032        key: &str,
1033        outcome: crate::runtime::audit_log::Outcome,
1034        reason: &str,
1035        entry: Option<&VaultEntry>,
1036    ) {
1037        let actor = Self::current_vault_actor();
1038        let request_id = Self::vault_request_id();
1039        let mut builder = crate::runtime::audit_log::AuditEvent::builder("vault/unseal")
1040            .principal(actor.clone())
1041            .source(crate::runtime::audit_log::AuditAuthSource::Password)
1042            .resource(format!(
1043                "vault:{}",
1044                Self::vault_target_resource(collection, key)
1045            ))
1046            .outcome(outcome)
1047            .correlation_id(request_id.clone())
1048            .fields([
1049                crate::runtime::audit_log::AuditFieldEscaper::field("actor", actor),
1050                crate::runtime::audit_log::AuditFieldEscaper::field("collection", collection),
1051                crate::runtime::audit_log::AuditFieldEscaper::field("key", key),
1052                crate::runtime::audit_log::AuditFieldEscaper::field(
1053                    "target",
1054                    Self::vault_target_resource(collection, key),
1055                ),
1056                crate::runtime::audit_log::AuditFieldEscaper::field("reason", reason),
1057                crate::runtime::audit_log::AuditFieldEscaper::field("request_id", request_id),
1058                crate::runtime::audit_log::AuditFieldEscaper::field(
1059                    "connection_id",
1060                    current_connection_id(),
1061                ),
1062            ]);
1063        if let Some(tenant) = current_tenant() {
1064            builder = builder.tenant(tenant);
1065        }
1066        if let Some(entry) = entry {
1067            builder = builder.fields([
1068                crate::runtime::audit_log::AuditFieldEscaper::field("entity_id", entry.id.raw()),
1069                crate::runtime::audit_log::AuditFieldEscaper::field(
1070                    "sequence_id",
1071                    entry.sequence_id,
1072                ),
1073            ]);
1074        }
1075        self.audit_log().record_event(builder.build());
1076    }
1077
1078    fn audit_vault_lifecycle(
1079        &self,
1080        operation: &str,
1081        collection: &str,
1082        key: &str,
1083        outcome: crate::runtime::audit_log::Outcome,
1084        reason: &str,
1085        entry: Option<&VaultEntry>,
1086    ) {
1087        let actor = Self::current_vault_actor();
1088        let request_id = Self::vault_request_id();
1089        let mut builder =
1090            crate::runtime::audit_log::AuditEvent::builder(format!("vault/{operation}"))
1091                .principal(actor.clone())
1092                .source(crate::runtime::audit_log::AuditAuthSource::Password)
1093                .resource(format!(
1094                    "vault:{}",
1095                    Self::vault_target_resource(collection, key)
1096                ))
1097                .outcome(outcome)
1098                .correlation_id(request_id.clone())
1099                .fields([
1100                    crate::runtime::audit_log::AuditFieldEscaper::field("actor", actor),
1101                    crate::runtime::audit_log::AuditFieldEscaper::field("collection", collection),
1102                    crate::runtime::audit_log::AuditFieldEscaper::field("key", key),
1103                    crate::runtime::audit_log::AuditFieldEscaper::field(
1104                        "target",
1105                        Self::vault_target_resource(collection, key),
1106                    ),
1107                    crate::runtime::audit_log::AuditFieldEscaper::field("reason", reason),
1108                    crate::runtime::audit_log::AuditFieldEscaper::field("request_id", request_id),
1109                    crate::runtime::audit_log::AuditFieldEscaper::field(
1110                        "connection_id",
1111                        current_connection_id(),
1112                    ),
1113                ]);
1114        if let Some(tenant) = current_tenant() {
1115            builder = builder.tenant(tenant);
1116        }
1117        if let Some(entry) = entry {
1118            builder = builder.fields([
1119                crate::runtime::audit_log::AuditFieldEscaper::field("entity_id", entry.id.raw()),
1120                crate::runtime::audit_log::AuditFieldEscaper::field("version", entry.version),
1121                crate::runtime::audit_log::AuditFieldEscaper::field(
1122                    "sequence_id",
1123                    entry.sequence_id,
1124                ),
1125            ]);
1126        }
1127        self.audit_log().record_event(builder.build());
1128    }
1129
1130    fn emit_vault_control_event(
1131        &self,
1132        kind: crate::runtime::control_events::EventKind,
1133        outcome: crate::runtime::control_events::Outcome,
1134        action: &'static str,
1135        collection: &str,
1136        key: &str,
1137        reason: &str,
1138        entry: Option<&VaultEntry>,
1139        extra_fields: Vec<(String, crate::runtime::control_events::Sensitivity)>,
1140    ) -> RedDBResult<()> {
1141        use crate::runtime::control_events::{
1142            ActorRef, ControlEvent, ControlEventCtx, ControlEventLedger, Sensitivity,
1143        };
1144        use std::borrow::Cow;
1145
1146        let tenant = current_tenant();
1147        let principal = current_auth_identity().map(|(principal, _)| principal);
1148        let actor_user = principal
1149            .as_ref()
1150            .map(|principal| crate::auth::UserId::from_parts(tenant.as_deref(), principal));
1151        let request_id = Self::vault_request_id();
1152        let actor = actor_user
1153            .as_ref()
1154            .map(ActorRef::User)
1155            .unwrap_or(ActorRef::Anonymous);
1156        let ctx = ControlEventCtx {
1157            actor,
1158            scope: tenant.as_ref().map(|scope| Cow::Borrowed(scope.as_str())),
1159            request_id: Some(Cow::Borrowed(request_id.as_str())),
1160            trace_id: None,
1161        };
1162
1163        let target = Self::vault_target_resource(collection, key);
1164        let mut fields = std::collections::HashMap::new();
1165        fields.insert("path".to_string(), Sensitivity::raw(target.clone()));
1166        fields.insert("collection".to_string(), Sensitivity::raw(collection));
1167        fields.insert("key".to_string(), Sensitivity::raw(key));
1168        fields.insert(
1169            "connection_id".to_string(),
1170            Sensitivity::raw(current_connection_id().to_string()),
1171        );
1172        if let Some(entry) = entry {
1173            fields.insert(
1174                "entity_id".to_string(),
1175                Sensitivity::raw(entry.id.raw().to_string()),
1176            );
1177            fields.insert(
1178                "sequence_id".to_string(),
1179                Sensitivity::raw(entry.sequence_id.to_string()),
1180            );
1181            fields.insert(
1182                "version".to_string(),
1183                Sensitivity::raw(entry.version.to_string()),
1184            );
1185            fields.insert("op".to_string(), Sensitivity::raw(entry.op.clone()));
1186            fields.insert(
1187                "tombstone".to_string(),
1188                Sensitivity::raw(entry.tombstone.to_string()),
1189            );
1190            if !entry.tombstone {
1191                fields.insert(
1192                    "fingerprint".to_string(),
1193                    Sensitivity::raw(vault_fingerprint(&entry.value)),
1194                );
1195            }
1196            fields.insert(
1197                "tags".to_string(),
1198                Sensitivity::raw(format!("{:?}", vault_tags_value(&entry.metadata))),
1199            );
1200        }
1201        for (key, value) in extra_fields {
1202            fields.insert(key, value);
1203        }
1204
1205        let event = ControlEvent {
1206            kind,
1207            outcome,
1208            action: Cow::Borrowed(action),
1209            resource: Some(format!("vault:{target}")),
1210            reason: Some(reason.to_string()),
1211            matched_policy_id: None,
1212            fields,
1213        };
1214        let ledger = self.inner.control_event_ledger.read();
1215        match ledger.emit(&ctx, event) {
1216            Ok(_) => Ok(()),
1217            Err(err) if self.inner.control_event_config.require_persistence() => {
1218                Err(RedDBError::Internal(err.to_string()))
1219            }
1220            Err(_) => Ok(()),
1221        }
1222    }
1223
1224    pub(crate) fn resolve_vault_secret_value(
1225        &self,
1226        collection: &str,
1227        key: &str,
1228    ) -> RedDBResult<Value> {
1229        let ops = KvAtomicOps::new(self);
1230        let entry = ops.get_vault_entry(collection, key)?;
1231        if let Err(reason) = self.check_vault_capability("vault:read", collection, key) {
1232            self.audit_vault_unseal(
1233                collection,
1234                key,
1235                crate::runtime::audit_log::Outcome::Denied,
1236                &reason,
1237                entry.as_ref(),
1238            );
1239            self.emit_vault_control_event(
1240                crate::runtime::control_events::EventKind::VaultRead,
1241                crate::runtime::control_events::Outcome::Denied,
1242                "vault:read",
1243                collection,
1244                key,
1245                &reason,
1246                entry.as_ref(),
1247                Vec::new(),
1248            )?;
1249            return Err(RedDBError::Query(reason));
1250        }
1251        let Some(entry) = entry else {
1252            let reason = "not_found";
1253            self.audit_vault_unseal(
1254                collection,
1255                key,
1256                crate::runtime::audit_log::Outcome::Denied,
1257                reason,
1258                None,
1259            );
1260            self.emit_vault_control_event(
1261                crate::runtime::control_events::EventKind::VaultRead,
1262                crate::runtime::control_events::Outcome::Denied,
1263                "vault:read",
1264                collection,
1265                key,
1266                reason,
1267                None,
1268                Vec::new(),
1269            )?;
1270            return Err(RedDBError::NotFound(format!(
1271                "vault secret '{}.{}' not found",
1272                collection, key
1273            )));
1274        };
1275        if entry.tombstone {
1276            let reason = "deleted";
1277            self.audit_vault_unseal(
1278                collection,
1279                key,
1280                crate::runtime::audit_log::Outcome::Denied,
1281                reason,
1282                Some(&entry),
1283            );
1284            self.emit_vault_control_event(
1285                crate::runtime::control_events::EventKind::VaultRead,
1286                crate::runtime::control_events::Outcome::Denied,
1287                "vault:read",
1288                collection,
1289                key,
1290                reason,
1291                Some(&entry),
1292                Vec::new(),
1293            )?;
1294            return Err(RedDBError::NotFound(format!(
1295                "vault secret '{}.{}' is deleted",
1296                collection, key
1297            )));
1298        }
1299        match self.unseal_vault_value(collection, &entry.value) {
1300            Ok(value) => {
1301                self.audit_vault_unseal(
1302                    collection,
1303                    key,
1304                    crate::runtime::audit_log::Outcome::Success,
1305                    "ok",
1306                    Some(&entry),
1307                );
1308                self.emit_vault_control_event(
1309                    crate::runtime::control_events::EventKind::VaultRead,
1310                    crate::runtime::control_events::Outcome::Allowed,
1311                    "vault:read",
1312                    collection,
1313                    key,
1314                    "ok",
1315                    Some(&entry),
1316                    Vec::new(),
1317                )?;
1318                Ok(value)
1319            }
1320            Err(err) => {
1321                let reason = err.to_string();
1322                self.audit_vault_unseal(
1323                    collection,
1324                    key,
1325                    crate::runtime::audit_log::Outcome::Error,
1326                    &reason,
1327                    Some(&entry),
1328                );
1329                self.emit_vault_control_event(
1330                    crate::runtime::control_events::EventKind::VaultRead,
1331                    crate::runtime::control_events::Outcome::Error,
1332                    "vault:read",
1333                    collection,
1334                    key,
1335                    &reason,
1336                    Some(&entry),
1337                    Vec::new(),
1338                )?;
1339                Err(err)
1340            }
1341        }
1342    }
1343
1344    /// Dispatch a `KV PUT / GET / DELETE` command.
1345    pub fn execute_kv_command(
1346        &self,
1347        raw_query: &str,
1348        cmd: &crate::storage::query::ast::KvCommand,
1349    ) -> RedDBResult<RuntimeQueryResult> {
1350        use crate::storage::query::ast::KvCommand;
1351
1352        let ops = KvAtomicOps::new(self);
1353
1354        match cmd {
1355            KvCommand::Put {
1356                model,
1357                collection,
1358                key,
1359                value,
1360                ttl_ms,
1361                tags,
1362                if_not_exists,
1363            } => {
1364                if *model == crate::catalog::CollectionModel::Vault {
1365                    self.check_system_vault_capability("vault:write", collection, key)
1366                        .map_err(RedDBError::Query)?;
1367                }
1368                self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
1369                let (created, id) = ops.set_with_tags_for_model(
1370                    *model,
1371                    collection,
1372                    key,
1373                    value.clone(),
1374                    *ttl_ms,
1375                    tags,
1376                    *if_not_exists,
1377                )?;
1378
1379                let mut result = UnifiedResult::with_columns(vec![
1380                    "ok".into(),
1381                    "collection".into(),
1382                    "key".into(),
1383                    "id".into(),
1384                    "created".into(),
1385                    "tags".into(),
1386                ]);
1387                let mut record = UnifiedRecord::new();
1388                record.set("ok", Value::Boolean(true));
1389                record.set("collection", Value::text(collection.clone()));
1390                record.set("key", Value::text(key.clone()));
1391                record.set("id", Value::Integer(id.raw() as i64));
1392                record.set("created", Value::Boolean(created));
1393                record.set("tags", kv_tags_value(tags));
1394                result.push(record);
1395
1396                Ok(RuntimeQueryResult {
1397                    query: raw_query.to_string(),
1398                    mode: crate::storage::query::modes::QueryMode::Sql,
1399                    statement: if *model == crate::catalog::CollectionModel::Vault {
1400                        "vault_put"
1401                    } else {
1402                        "kv_put"
1403                    },
1404                    engine: if *model == crate::catalog::CollectionModel::Vault {
1405                        "vault"
1406                    } else {
1407                        "kv"
1408                    },
1409                    result,
1410                    affected_rows: 1,
1411                    statement_type: if created { "insert" } else { "update" },
1412                    bookmark: None,
1413                })
1414            }
1415            KvCommand::InvalidateTags { collection, tags } => {
1416                let invalidated = ops.invalidate_tags(collection, tags)?;
1417
1418                let mut result = UnifiedResult::with_columns(vec![
1419                    "ok".into(),
1420                    "collection".into(),
1421                    "invalidated".into(),
1422                    "tags".into(),
1423                ]);
1424                let mut record = UnifiedRecord::new();
1425                record.set("ok", Value::Boolean(true));
1426                record.set("collection", Value::text(collection.clone()));
1427                record.set("invalidated", Value::Integer(invalidated as i64));
1428                record.set("tags", kv_tags_value(tags));
1429                result.push(record);
1430
1431                Ok(RuntimeQueryResult {
1432                    query: raw_query.to_string(),
1433                    mode: crate::storage::query::modes::QueryMode::Sql,
1434                    statement: "kv_invalidate_tags",
1435                    engine: "kv",
1436                    result,
1437                    affected_rows: invalidated as u64,
1438                    statement_type: "delete",
1439                    bookmark: None,
1440                })
1441            }
1442
1443            KvCommand::Rotate {
1444                collection,
1445                key,
1446                value,
1447                tags,
1448            } => {
1449                self.check_system_vault_capability("vault:write", collection, key)
1450                    .map_err(RedDBError::Query)?;
1451                self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
1452                let entry = ops.append_vault_version(
1453                    collection,
1454                    key,
1455                    value.clone(),
1456                    "rotate",
1457                    false,
1458                    tags,
1459                )?;
1460                self.record_kv_watch_event(
1461                    crate::replication::cdc::ChangeOperation::Update,
1462                    collection,
1463                    key,
1464                    entry.id.raw(),
1465                    None,
1466                    Some(vault_entry_metadata_json(&entry)),
1467                );
1468                self.audit_vault_lifecycle(
1469                    "rotate",
1470                    collection,
1471                    key,
1472                    crate::runtime::audit_log::Outcome::Success,
1473                    "ok",
1474                    Some(&entry),
1475                );
1476                self.emit_vault_control_event(
1477                    crate::runtime::control_events::EventKind::VaultRotate,
1478                    crate::runtime::control_events::Outcome::Allowed,
1479                    "vault:rotate",
1480                    collection,
1481                    key,
1482                    "ok",
1483                    Some(&entry),
1484                    Vec::new(),
1485                )?;
1486                Ok(vault_write_result(
1487                    raw_query,
1488                    "vault_rotate",
1489                    "update",
1490                    collection,
1491                    key,
1492                    &entry,
1493                    1,
1494                ))
1495            }
1496
1497            KvCommand::List {
1498                model,
1499                collection,
1500                prefix,
1501                limit,
1502                offset,
1503                as_json,
1504            } => {
1505                if *model == crate::catalog::CollectionModel::Vault {
1506                    if *as_json {
1507                        return Err(RedDBError::Query(
1508                            "LIST VAULT AS JSON is not supported; vault list only exposes metadata"
1509                                .to_string(),
1510                        ));
1511                    }
1512                    let mut entries = ops.latest_vault_entries(collection, prefix.as_deref())?;
1513                    entries.sort_by(|left, right| left.key.cmp(&right.key));
1514                    let mut result = UnifiedResult::with_columns(vec![
1515                        "collection".into(),
1516                        "key".into(),
1517                        "version".into(),
1518                        "fingerprint".into(),
1519                        "tags".into(),
1520                        "created_at".into(),
1521                        "updated_at".into(),
1522                        "status".into(),
1523                        "tombstone".into(),
1524                        "op".into(),
1525                    ]);
1526                    let mut visible = Vec::new();
1527                    for entry in entries {
1528                        match self.check_vault_capability(
1529                            "vault:read_metadata",
1530                            collection,
1531                            &entry.key,
1532                        ) {
1533                            Ok(()) => {
1534                                self.emit_vault_control_event(
1535                                    crate::runtime::control_events::EventKind::VaultMetadataRead,
1536                                    crate::runtime::control_events::Outcome::Allowed,
1537                                    "vault:read_metadata",
1538                                    collection,
1539                                    &entry.key,
1540                                    "ok",
1541                                    Some(&entry),
1542                                    Vec::new(),
1543                                )?;
1544                                visible.push(entry);
1545                            }
1546                            Err(reason) => {
1547                                self.emit_vault_control_event(
1548                                    crate::runtime::control_events::EventKind::VaultMetadataRead,
1549                                    crate::runtime::control_events::Outcome::Denied,
1550                                    "vault:read_metadata",
1551                                    collection,
1552                                    &entry.key,
1553                                    &reason,
1554                                    Some(&entry),
1555                                    Vec::new(),
1556                                )?;
1557                            }
1558                        }
1559                    }
1560                    for entry in visible
1561                        .into_iter()
1562                        .skip(*offset)
1563                        .take(limit.unwrap_or(usize::MAX))
1564                    {
1565                        push_vault_metadata_record(&mut result, collection, &entry.key, &entry);
1566                    }
1567                    Ok(RuntimeQueryResult {
1568                        query: raw_query.to_string(),
1569                        mode: crate::storage::query::modes::QueryMode::Sql,
1570                        statement: "vault_list",
1571                        engine: "vault",
1572                        result,
1573                        affected_rows: 0,
1574                        statement_type: "select",
1575                        bookmark: None,
1576                    })
1577                } else {
1578                    let mut result = UnifiedResult::with_columns(vec![
1579                        "rid".into(),
1580                        "collection".into(),
1581                        "kind".into(),
1582                        "tenant".into(),
1583                        "created_at".into(),
1584                        "updated_at".into(),
1585                        "key".into(),
1586                        "value".into(),
1587                        "tags".into(),
1588                    ]);
1589                    let entries = ops.latest_kv_entries(collection, prefix.as_deref())?;
1590                    if *as_json {
1591                        let mut tree = crate::json::Value::Object(crate::json::Map::new());
1592                        for (key, entity) in entries
1593                            .into_iter()
1594                            .skip(*offset)
1595                            .take(limit.unwrap_or(usize::MAX))
1596                        {
1597                            let Some(value) = kv_value_from_entity(&entity) else {
1598                                continue;
1599                            };
1600                            let relative = match prefix {
1601                                Some(pfx) if key == *pfx => "",
1602                                Some(pfx) => key
1603                                    .strip_prefix(pfx.as_str())
1604                                    .map(|tail| tail.strip_prefix('.').unwrap_or(tail))
1605                                    .unwrap_or(key.as_str()),
1606                                None => key.as_str(),
1607                            };
1608                            insert_kv_json_path(
1609                                &mut tree,
1610                                relative,
1611                                crate::presentation::entity_json::storage_value_to_json(&value),
1612                            );
1613                        }
1614                        Ok(kv_list_json_result(raw_query, collection, prefix, tree))
1615                    } else {
1616                        for (key, entity) in entries
1617                            .into_iter()
1618                            .skip(*offset)
1619                            .take(limit.unwrap_or(usize::MAX))
1620                        {
1621                            let mut record = UnifiedRecord::new();
1622                            record.set("rid", Value::UnsignedInteger(entity.id.raw()));
1623                            record.set("collection", Value::text(collection.clone()));
1624                            record.set("kind", Value::text("kv"));
1625                            record.set("tenant", Value::Null);
1626                            record.set("created_at", Value::UnsignedInteger(entity.created_at));
1627                            record.set("updated_at", Value::UnsignedInteger(entity.updated_at));
1628                            record.set("key", Value::text(key.clone()));
1629                            record.set(
1630                                "value",
1631                                kv_value_from_entity(&entity)
1632                                    .unwrap_or(crate::storage::schema::Value::Null),
1633                            );
1634                            record.set("tags", kv_tags_value(&ops.tags_for_key(collection, &key)));
1635                            result.push(record);
1636                        }
1637                        Ok(RuntimeQueryResult {
1638                            query: raw_query.to_string(),
1639                            mode: crate::storage::query::modes::QueryMode::Sql,
1640                            statement: "kv_list",
1641                            engine: "kv",
1642                            result,
1643                            affected_rows: 0,
1644                            statement_type: "select",
1645                            bookmark: None,
1646                        })
1647                    }
1648                }
1649            }
1650
1651            KvCommand::History { collection, key } => {
1652                let latest = ops.get_vault_entry(collection, key)?;
1653                if let Err(reason) =
1654                    self.check_vault_capability("vault:read_metadata", collection, key)
1655                {
1656                    self.emit_vault_control_event(
1657                        crate::runtime::control_events::EventKind::VaultMetadataRead,
1658                        crate::runtime::control_events::Outcome::Denied,
1659                        "vault:read_metadata",
1660                        collection,
1661                        key,
1662                        &reason,
1663                        latest.as_ref(),
1664                        Vec::new(),
1665                    )?;
1666                    return Err(RedDBError::Query(reason));
1667                }
1668                let versions =
1669                    super::keyed_spine::history_versions(ops.vault_versions(collection, key)?);
1670                let result = vault_history_result(collection, key, &versions);
1671                self.emit_vault_control_event(
1672                    crate::runtime::control_events::EventKind::VaultMetadataRead,
1673                    crate::runtime::control_events::Outcome::Allowed,
1674                    "vault:read_metadata",
1675                    collection,
1676                    key,
1677                    "ok",
1678                    latest.as_ref(),
1679                    Vec::new(),
1680                )?;
1681                Ok(RuntimeQueryResult {
1682                    query: raw_query.to_string(),
1683                    mode: crate::storage::query::modes::QueryMode::Sql,
1684                    statement: "vault_history",
1685                    engine: "vault",
1686                    result,
1687                    affected_rows: 0,
1688                    statement_type: "select",
1689                    bookmark: None,
1690                })
1691            }
1692
1693            KvCommand::Purge { collection, key } => {
1694                let entry = ops.get_vault_entry(collection, key)?;
1695                if let Err(reason) = self.check_vault_capability("vault:purge", collection, key) {
1696                    self.audit_vault_lifecycle(
1697                        "purge",
1698                        collection,
1699                        key,
1700                        crate::runtime::audit_log::Outcome::Denied,
1701                        &reason,
1702                        entry.as_ref(),
1703                    );
1704                    self.emit_vault_control_event(
1705                        crate::runtime::control_events::EventKind::VaultPurge,
1706                        crate::runtime::control_events::Outcome::Denied,
1707                        "vault:purge",
1708                        collection,
1709                        key,
1710                        &reason,
1711                        entry.as_ref(),
1712                        Vec::new(),
1713                    )?;
1714                    return Err(RedDBError::Query(reason));
1715                }
1716                self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
1717                let purged = ops.purge_vault_versions(collection, key)?;
1718                self.audit_vault_lifecycle(
1719                    "purge",
1720                    collection,
1721                    key,
1722                    crate::runtime::audit_log::Outcome::Success,
1723                    "ok",
1724                    entry.as_ref(),
1725                );
1726                self.emit_vault_control_event(
1727                    crate::runtime::control_events::EventKind::VaultPurge,
1728                    crate::runtime::control_events::Outcome::Allowed,
1729                    "vault:purge",
1730                    collection,
1731                    key,
1732                    "ok",
1733                    entry.as_ref(),
1734                    vec![(
1735                        "purged".to_string(),
1736                        crate::runtime::control_events::Sensitivity::raw(purged.to_string()),
1737                    )],
1738                )?;
1739                let mut result = UnifiedResult::with_columns(vec![
1740                    "ok".into(),
1741                    "collection".into(),
1742                    "key".into(),
1743                    "purged".into(),
1744                ]);
1745                let mut record = UnifiedRecord::new();
1746                record.set("ok", Value::Boolean(true));
1747                record.set("collection", Value::text(collection.clone()));
1748                record.set("key", Value::text(key.clone()));
1749                record.set("purged", Value::Integer(purged as i64));
1750                result.push(record);
1751                Ok(RuntimeQueryResult {
1752                    query: raw_query.to_string(),
1753                    mode: crate::storage::query::modes::QueryMode::Sql,
1754                    statement: "vault_purge",
1755                    engine: "vault",
1756                    result,
1757                    affected_rows: purged as u64,
1758                    statement_type: "delete",
1759                    bookmark: None,
1760                })
1761            }
1762
1763            KvCommand::Get {
1764                model,
1765                collection,
1766                key,
1767            } => {
1768                if *model == crate::catalog::CollectionModel::Vault {
1769                    let entry = ops.get_vault_entry(collection, key)?;
1770                    if let Err(reason) =
1771                        self.check_vault_capability("vault:read_metadata", collection, key)
1772                    {
1773                        self.emit_vault_control_event(
1774                            crate::runtime::control_events::EventKind::VaultMetadataRead,
1775                            crate::runtime::control_events::Outcome::Denied,
1776                            "vault:read_metadata",
1777                            collection,
1778                            key,
1779                            &reason,
1780                            entry.as_ref(),
1781                            Vec::new(),
1782                        )?;
1783                        return Err(RedDBError::Query(reason));
1784                    }
1785                    let key_available = self.vault_key_available(collection);
1786                    let result =
1787                        vault_metadata_result(collection, key, entry.as_ref(), key_available);
1788                    self.emit_vault_control_event(
1789                        crate::runtime::control_events::EventKind::VaultMetadataRead,
1790                        crate::runtime::control_events::Outcome::Allowed,
1791                        "vault:read_metadata",
1792                        collection,
1793                        key,
1794                        "ok",
1795                        entry.as_ref(),
1796                        Vec::new(),
1797                    )?;
1798                    return Ok(RuntimeQueryResult {
1799                        query: raw_query.to_string(),
1800                        mode: crate::storage::query::modes::QueryMode::Sql,
1801                        statement: "vault_get",
1802                        engine: "vault",
1803                        result,
1804                        affected_rows: 0,
1805                        statement_type: "select",
1806                        bookmark: None,
1807                    });
1808                }
1809
1810                let entity = ops.get_entity(*model, collection, key)?;
1811                let value = entity.as_ref().and_then(kv_value_from_entity);
1812                if *model == crate::catalog::CollectionModel::Kv {
1813                    self.inner.kv_stats.incr_gets();
1814                }
1815                let mut result = UnifiedResult::with_columns(vec![
1816                    "rid".into(),
1817                    "collection".into(),
1818                    "kind".into(),
1819                    "tenant".into(),
1820                    "created_at".into(),
1821                    "updated_at".into(),
1822                    "key".into(),
1823                    "value".into(),
1824                    "tags".into(),
1825                ]);
1826                let mut record = UnifiedRecord::new();
1827                if let Some(entity) = entity.as_ref() {
1828                    record.set("rid", Value::UnsignedInteger(entity.id.raw()));
1829                    record.set("created_at", Value::UnsignedInteger(entity.created_at));
1830                    record.set("updated_at", Value::UnsignedInteger(entity.updated_at));
1831                } else {
1832                    record.set("rid", Value::Null);
1833                    record.set("created_at", Value::Null);
1834                    record.set("updated_at", Value::Null);
1835                }
1836                record.set("collection", Value::text(collection.clone()));
1837                record.set("kind", Value::text(keyed_model_name(*model).to_string()));
1838                record.set("tenant", Value::Null);
1839                record.set("key", Value::text(key.clone()));
1840                record.set(
1841                    "value",
1842                    value.unwrap_or(crate::storage::schema::Value::Null),
1843                );
1844                record.set("tags", kv_tags_value(&ops.tags_for_key(collection, key)));
1845                result.push(record);
1846
1847                Ok(RuntimeQueryResult {
1848                    query: raw_query.to_string(),
1849                    mode: crate::storage::query::modes::QueryMode::Sql,
1850                    statement: "kv_get",
1851                    engine: "kv",
1852                    result,
1853                    affected_rows: 0,
1854                    statement_type: "select",
1855                    bookmark: None,
1856                })
1857            }
1858            KvCommand::Watch {
1859                model,
1860                collection,
1861                key,
1862                prefix,
1863                from_lsn,
1864            } => {
1865                let watch_key = if *prefix {
1866                    format!("{key}.*")
1867                } else {
1868                    key.clone()
1869                };
1870                let endpoint = match from_lsn {
1871                    Some(lsn) => format!(
1872                        "/collections/{collection}/{}/{watch_key}/watch?since_lsn={lsn}",
1873                        keyed_model_name(*model)
1874                    ),
1875                    None => format!(
1876                        "/collections/{collection}/{}/{watch_key}/watch",
1877                        keyed_model_name(*model)
1878                    ),
1879                };
1880                let mut result = UnifiedResult::with_columns(vec![
1881                    "collection".into(),
1882                    "key".into(),
1883                    "prefix".into(),
1884                    "from_lsn".into(),
1885                    "watch_url".into(),
1886                    "streaming".into(),
1887                ]);
1888                let mut record = UnifiedRecord::new();
1889                record.set("collection", Value::text(collection.clone()));
1890                record.set("key", Value::text(watch_key));
1891                record.set("prefix", Value::Boolean(*prefix));
1892                record.set(
1893                    "from_lsn",
1894                    from_lsn
1895                        .map(Value::UnsignedInteger)
1896                        .unwrap_or(crate::storage::schema::Value::Null),
1897                );
1898                record.set("watch_url", Value::text(endpoint));
1899                record.set("streaming", Value::Boolean(true));
1900                result.push(record);
1901
1902                Ok(RuntimeQueryResult {
1903                    query: raw_query.to_string(),
1904                    mode: crate::storage::query::modes::QueryMode::Sql,
1905                    statement: "kv_watch",
1906                    engine: keyed_model_name(*model),
1907                    result,
1908                    affected_rows: 0,
1909                    statement_type: "stream",
1910                    bookmark: None,
1911                })
1912            }
1913
1914            KvCommand::Unseal {
1915                collection,
1916                key,
1917                version,
1918            } => {
1919                let latest = ops.get_vault_entry(collection, key)?;
1920                let entry = match version {
1921                    Some(version) => ops.get_vault_entry_version(collection, key, *version)?,
1922                    None => latest.clone(),
1923                };
1924                let action = match (version, latest.as_ref()) {
1925                    (Some(requested), Some(latest)) if *requested == latest.version => "vault:read",
1926                    (Some(_), _) => "vault:unseal_history",
1927                    _ => "vault:read",
1928                };
1929                let event_kind = if action == "vault:read" {
1930                    crate::runtime::control_events::EventKind::VaultRead
1931                } else {
1932                    crate::runtime::control_events::EventKind::VaultUnseal
1933                };
1934                if let Err(reason) = self.check_vault_capability(action, collection, key) {
1935                    self.audit_vault_unseal(
1936                        collection,
1937                        key,
1938                        crate::runtime::audit_log::Outcome::Denied,
1939                        &reason,
1940                        entry.as_ref(),
1941                    );
1942                    self.emit_vault_control_event(
1943                        event_kind,
1944                        crate::runtime::control_events::Outcome::Denied,
1945                        action,
1946                        collection,
1947                        key,
1948                        &reason,
1949                        entry.as_ref(),
1950                        Vec::new(),
1951                    )?;
1952                    return Err(RedDBError::Query(reason));
1953                }
1954                let Some(entry) = entry else {
1955                    let reason = "not_found";
1956                    self.audit_vault_unseal(
1957                        collection,
1958                        key,
1959                        crate::runtime::audit_log::Outcome::Denied,
1960                        reason,
1961                        None,
1962                    );
1963                    self.emit_vault_control_event(
1964                        event_kind,
1965                        crate::runtime::control_events::Outcome::Denied,
1966                        action,
1967                        collection,
1968                        key,
1969                        reason,
1970                        None,
1971                        Vec::new(),
1972                    )?;
1973                    return Err(RedDBError::NotFound(format!(
1974                        "vault secret '{}.{}' not found",
1975                        collection, key
1976                    )));
1977                };
1978                if entry.tombstone {
1979                    let reason = "deleted";
1980                    self.audit_vault_unseal(
1981                        collection,
1982                        key,
1983                        crate::runtime::audit_log::Outcome::Denied,
1984                        reason,
1985                        Some(&entry),
1986                    );
1987                    self.emit_vault_control_event(
1988                        event_kind,
1989                        crate::runtime::control_events::Outcome::Denied,
1990                        action,
1991                        collection,
1992                        key,
1993                        reason,
1994                        Some(&entry),
1995                        Vec::new(),
1996                    )?;
1997                    return Err(RedDBError::NotFound(format!(
1998                        "vault secret '{}.{}' is deleted",
1999                        collection, key
2000                    )));
2001                }
2002                match self.unseal_vault_value(collection, &entry.value) {
2003                    Ok(value) => {
2004                        self.audit_vault_unseal(
2005                            collection,
2006                            key,
2007                            crate::runtime::audit_log::Outcome::Success,
2008                            "ok",
2009                            Some(&entry),
2010                        );
2011                        self.emit_vault_control_event(
2012                            event_kind,
2013                            crate::runtime::control_events::Outcome::Allowed,
2014                            action,
2015                            collection,
2016                            key,
2017                            "ok",
2018                            Some(&entry),
2019                            Vec::new(),
2020                        )?;
2021                        let mut result = UnifiedResult::with_columns(vec![
2022                            "collection".into(),
2023                            "key".into(),
2024                            "value".into(),
2025                        ]);
2026                        let mut record = UnifiedRecord::new();
2027                        record.set("collection", Value::text(collection.clone()));
2028                        record.set("key", Value::text(key.clone()));
2029                        record.set("value", value);
2030                        result.push(record);
2031                        Ok(RuntimeQueryResult {
2032                            query: raw_query.to_string(),
2033                            mode: crate::storage::query::modes::QueryMode::Sql,
2034                            statement: "vault_unseal",
2035                            engine: "vault",
2036                            result,
2037                            affected_rows: 0,
2038                            statement_type: "select",
2039                            bookmark: None,
2040                        })
2041                    }
2042                    Err(err) => {
2043                        let reason = err.to_string();
2044                        self.audit_vault_unseal(
2045                            collection,
2046                            key,
2047                            crate::runtime::audit_log::Outcome::Error,
2048                            &reason,
2049                            Some(&entry),
2050                        );
2051                        self.emit_vault_control_event(
2052                            event_kind,
2053                            crate::runtime::control_events::Outcome::Error,
2054                            action,
2055                            collection,
2056                            key,
2057                            &reason,
2058                            Some(&entry),
2059                            Vec::new(),
2060                        )?;
2061                        Err(err)
2062                    }
2063                }
2064            }
2065
2066            KvCommand::Incr {
2067                model,
2068                collection,
2069                key,
2070                by,
2071                ttl_ms,
2072            } => {
2073                self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2074                let new_value = ops.incr(*model, collection, key, *by, *ttl_ms)?;
2075
2076                let mut result = UnifiedResult::with_columns(vec![
2077                    "ok".into(),
2078                    "collection".into(),
2079                    "key".into(),
2080                    "value".into(),
2081                ]);
2082                let mut record = UnifiedRecord::new();
2083                record.set("ok", Value::Boolean(true));
2084                record.set("collection", Value::text(collection.clone()));
2085                record.set("key", Value::text(key.clone()));
2086                record.set("value", Value::Integer(new_value));
2087                result.push(record);
2088
2089                Ok(RuntimeQueryResult {
2090                    query: raw_query.to_string(),
2091                    mode: crate::storage::query::modes::QueryMode::Sql,
2092                    statement: "kv_incr",
2093                    engine: "kv",
2094                    result,
2095                    affected_rows: 1,
2096                    statement_type: "update",
2097                    bookmark: None,
2098                })
2099            }
2100
2101            KvCommand::Cas {
2102                model,
2103                collection,
2104                key,
2105                expected,
2106                new_value,
2107                ttl_ms,
2108            } => {
2109                self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2110                let (ok, current) = ops.cas(
2111                    *model,
2112                    collection,
2113                    key,
2114                    expected.as_ref(),
2115                    new_value.clone(),
2116                    *ttl_ms,
2117                )?;
2118
2119                let mut result = UnifiedResult::with_columns(vec![
2120                    "ok".into(),
2121                    "collection".into(),
2122                    "key".into(),
2123                    "current".into(),
2124                ]);
2125                let mut record = UnifiedRecord::new();
2126                record.set("ok", Value::Boolean(ok));
2127                record.set("collection", Value::text(collection.clone()));
2128                record.set("key", Value::text(key.clone()));
2129                record.set(
2130                    "current",
2131                    current.unwrap_or(crate::storage::schema::Value::Null),
2132                );
2133                result.push(record);
2134
2135                Ok(RuntimeQueryResult {
2136                    query: raw_query.to_string(),
2137                    mode: crate::storage::query::modes::QueryMode::Sql,
2138                    statement: "kv_cas",
2139                    engine: "kv",
2140                    result,
2141                    affected_rows: if ok { 1 } else { 0 },
2142                    statement_type: "update",
2143                    bookmark: None,
2144                })
2145            }
2146
2147            KvCommand::Delete {
2148                model,
2149                collection,
2150                key,
2151            } => {
2152                if *model == crate::catalog::CollectionModel::Vault {
2153                    self.check_system_vault_capability("vault:write", collection, key)
2154                        .map_err(RedDBError::Query)?;
2155                    self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2156                    let entry = ops.append_vault_version(
2157                        collection,
2158                        key,
2159                        Value::Null,
2160                        "delete",
2161                        true,
2162                        &[],
2163                    )?;
2164                    self.record_kv_watch_event(
2165                        crate::replication::cdc::ChangeOperation::Delete,
2166                        collection,
2167                        key,
2168                        entry.id.raw(),
2169                        None,
2170                        Some(vault_entry_metadata_json(&entry)),
2171                    );
2172                    self.audit_vault_lifecycle(
2173                        "delete",
2174                        collection,
2175                        key,
2176                        crate::runtime::audit_log::Outcome::Success,
2177                        "ok",
2178                        Some(&entry),
2179                    );
2180                    return Ok(vault_write_result(
2181                        raw_query,
2182                        "vault_delete",
2183                        "delete",
2184                        collection,
2185                        key,
2186                        &entry,
2187                        1,
2188                    ));
2189                }
2190                self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2191                let deleted = ops.delete(*model, collection, key)?;
2192
2193                let mut result = UnifiedResult::with_columns(vec![
2194                    "ok".into(),
2195                    "collection".into(),
2196                    "key".into(),
2197                    "deleted".into(),
2198                ]);
2199                let mut record = UnifiedRecord::new();
2200                record.set("ok", Value::Boolean(true));
2201                record.set("collection", Value::text(collection.clone()));
2202                record.set("key", Value::text(key.clone()));
2203                record.set("deleted", Value::Boolean(deleted));
2204                result.push(record);
2205
2206                Ok(RuntimeQueryResult {
2207                    query: raw_query.to_string(),
2208                    mode: crate::storage::query::modes::QueryMode::Sql,
2209                    statement: "kv_delete",
2210                    engine: "kv",
2211                    result,
2212                    affected_rows: if deleted { 1 } else { 0 },
2213                    statement_type: "delete",
2214                    bookmark: None,
2215                })
2216            }
2217        }
2218    }
2219
2220    pub fn vault_watch_events_since(
2221        &self,
2222        collection: &str,
2223        key: &str,
2224        since_lsn: u64,
2225        max_count: usize,
2226    ) -> Vec<crate::replication::cdc::KvWatchEvent> {
2227        self.kv_watch_events_since(collection, key, since_lsn, max_count)
2228            .into_iter()
2229            .filter(|event| {
2230                self.check_vault_capability("vault:read_metadata", &event.collection, &event.key)
2231                    .is_ok()
2232            })
2233            .map(vault_filter_watch_event)
2234            .collect()
2235    }
2236
2237    pub fn vault_watch_events_since_prefix(
2238        &self,
2239        collection: &str,
2240        prefix: &str,
2241        since_lsn: u64,
2242        max_count: usize,
2243    ) -> Vec<crate::replication::cdc::KvWatchEvent> {
2244        self.kv_watch_events_since_prefix(collection, prefix, since_lsn, max_count)
2245            .into_iter()
2246            .filter(|event| {
2247                self.check_vault_capability("vault:read_metadata", &event.collection, &event.key)
2248                    .is_ok()
2249            })
2250            .map(vault_filter_watch_event)
2251            .collect()
2252    }
2253
2254    fn check_kv_invalidate_policy(&self, collection: &str) -> RedDBResult<()> {
2255        let auth_store = match self.inner.auth_store.read().clone() {
2256            Some(store) => store,
2257            None => return Ok(()),
2258        };
2259        let (username, role) = match crate::runtime::impl_core::current_auth_identity() {
2260            Some(identity) => identity,
2261            None => return Ok(()),
2262        };
2263        if role < crate::auth::Role::Write {
2264            return Err(RedDBError::Query(format!(
2265                "principal=`{username}` role=`{role:?}` cannot invalidate KV tags"
2266            )));
2267        }
2268        if !auth_store.iam_authorization_enabled() {
2269            return Ok(());
2270        }
2271
2272        let tenant = crate::runtime::impl_core::current_tenant();
2273        let principal = crate::auth::UserId::from_parts(tenant.as_deref(), &username);
2274        let mut resource =
2275            crate::auth::policies::ResourceRef::new("kv".to_string(), collection.to_string());
2276        if let Some(tenant) = tenant.clone() {
2277            resource = resource.with_tenant(tenant);
2278        }
2279        let ctx = crate::auth::policies::EvalContext {
2280            principal_tenant: tenant.clone(),
2281            current_tenant: tenant,
2282            peer_ip: None,
2283            mfa_present: false,
2284            now_ms: current_unix_ms(),
2285            principal_is_admin_role: role == crate::auth::Role::Admin,
2286            principal_is_system_owned: auth_store.principal_is_system_owned(&principal),
2287            principal_is_platform_scoped: principal.tenant.is_none(),
2288        };
2289        if auth_store.check_policy_authz_with_role(
2290            &principal,
2291            "kv:invalidate",
2292            &resource,
2293            &ctx,
2294            role,
2295        ) {
2296            Ok(())
2297        } else {
2298            Err(RedDBError::Query(format!(
2299                "principal=`{username}` action=`kv:invalidate` resource=`kv:{collection}` denied by IAM policy"
2300            )))
2301        }
2302    }
2303}
2304
2305fn ttl_metadata(ttl_ms: Option<u64>) -> Option<Metadata> {
2306    let ttl_ms = ttl_ms?;
2307    Some(Metadata::with_fields(
2308        [(
2309            "_ttl_ms".to_string(),
2310            if ttl_ms <= i64::MAX as u64 {
2311                MetadataValue::Int(ttl_ms as i64)
2312            } else {
2313                MetadataValue::Timestamp(ttl_ms)
2314            },
2315        )]
2316        .into_iter()
2317        .collect(),
2318    ))
2319}
2320
2321fn vault_write_result(
2322    raw_query: &str,
2323    statement: &'static str,
2324    statement_type: &'static str,
2325    collection: &str,
2326    key: &str,
2327    entry: &VaultEntry,
2328    affected_rows: u64,
2329) -> RuntimeQueryResult {
2330    let mut result = UnifiedResult::with_columns(vec![
2331        "ok".into(),
2332        "collection".into(),
2333        "key".into(),
2334        "version".into(),
2335        "fingerprint".into(),
2336        "tombstone".into(),
2337        "op".into(),
2338        "id".into(),
2339    ]);
2340    let mut record = UnifiedRecord::new();
2341    record.set("ok", Value::Boolean(true));
2342    record.set("collection", Value::text(collection.to_string()));
2343    record.set("key", Value::text(key.to_string()));
2344    record.set("version", Value::Integer(entry.version));
2345    if entry.tombstone {
2346        record.set("fingerprint", Value::Null);
2347    } else {
2348        record.set("fingerprint", Value::text(vault_fingerprint(&entry.value)));
2349    }
2350    record.set("tombstone", Value::Boolean(entry.tombstone));
2351    record.set("op", Value::text(entry.op.clone()));
2352    record.set("id", Value::Integer(entry.id.raw() as i64));
2353    result.push(record);
2354    RuntimeQueryResult {
2355        query: raw_query.to_string(),
2356        mode: crate::storage::query::modes::QueryMode::Sql,
2357        statement,
2358        engine: "vault",
2359        result,
2360        affected_rows,
2361        statement_type,
2362        bookmark: None,
2363    }
2364}
2365
2366fn vault_history_result(collection: &str, key: &str, versions: &[VaultEntry]) -> UnifiedResult {
2367    let mut result = UnifiedResult::with_columns(vec![
2368        "collection".into(),
2369        "key".into(),
2370        "version".into(),
2371        "fingerprint".into(),
2372        "tags".into(),
2373        "created_at".into(),
2374        "updated_at".into(),
2375        "status".into(),
2376        "tombstone".into(),
2377        "op".into(),
2378    ]);
2379    for entry in versions {
2380        push_vault_metadata_record(&mut result, collection, key, entry);
2381    }
2382    result
2383}
2384
2385fn push_vault_metadata_record(
2386    result: &mut UnifiedResult,
2387    collection: &str,
2388    key: &str,
2389    entry: &VaultEntry,
2390) {
2391    let mut record = UnifiedRecord::new();
2392    record.set("collection", Value::text(collection.to_string()));
2393    record.set("key", Value::text(key.to_string()));
2394    record.set("version", Value::Integer(entry.version));
2395    if entry.tombstone {
2396        record.set("fingerprint", Value::Null);
2397        record.set("status", Value::text("deleted"));
2398    } else {
2399        record.set("fingerprint", Value::text(vault_fingerprint(&entry.value)));
2400        record.set("status", Value::text("sealed"));
2401    }
2402    record.set("tags", vault_tags_value(&entry.metadata));
2403    record.set("created_at", Value::TimestampMs(entry.created_at as i64));
2404    record.set("updated_at", Value::TimestampMs(entry.updated_at as i64));
2405    record.set("tombstone", Value::Boolean(entry.tombstone));
2406    record.set("op", Value::text(entry.op.clone()));
2407    result.push(record);
2408}
2409
2410fn vault_metadata_result(
2411    collection: &str,
2412    key: &str,
2413    entry: Option<&VaultEntry>,
2414    key_available: bool,
2415) -> UnifiedResult {
2416    let mut result = UnifiedResult::with_columns(vec![
2417        "collection".into(),
2418        "key".into(),
2419        "version".into(),
2420        "fingerprint".into(),
2421        "tags".into(),
2422        "created_at".into(),
2423        "updated_at".into(),
2424        "value".into(),
2425        "status".into(),
2426        "tombstone".into(),
2427        "op".into(),
2428    ]);
2429    let mut record = UnifiedRecord::new();
2430    record.set("collection", Value::text(collection.to_string()));
2431    record.set("key", Value::text(key.to_string()));
2432    match entry {
2433        Some(entry) => {
2434            record.set("version", Value::Integer(entry.version));
2435            if entry.tombstone {
2436                record.set("fingerprint", Value::Null);
2437            } else {
2438                record.set("fingerprint", Value::text(vault_fingerprint(&entry.value)));
2439            }
2440            record.set("tags", vault_tags_value(&entry.metadata));
2441            record.set("created_at", Value::TimestampMs(entry.created_at as i64));
2442            record.set("updated_at", Value::TimestampMs(entry.updated_at as i64));
2443            record.set("value", Value::text("***"));
2444            record.set(
2445                "status",
2446                Value::text(if entry.tombstone {
2447                    "deleted"
2448                } else if key_available {
2449                    "sealed"
2450                } else {
2451                    "sealed_unavailable"
2452                }),
2453            );
2454            record.set("tombstone", Value::Boolean(entry.tombstone));
2455            record.set("op", Value::text(entry.op.clone()));
2456        }
2457        None => {
2458            record.set("version", Value::Null);
2459            record.set("fingerprint", Value::Null);
2460            record.set("tags", Value::Array(Vec::new()));
2461            record.set("created_at", Value::Null);
2462            record.set("updated_at", Value::Null);
2463            record.set("value", Value::text(""));
2464            record.set("status", Value::text("missing"));
2465            record.set("tombstone", Value::Boolean(false));
2466            record.set("op", Value::Null);
2467        }
2468    }
2469    result.push(record);
2470    result
2471}
2472
2473fn vault_fingerprint(value: &Value) -> String {
2474    match value {
2475        Value::Secret(payload) => crate::utils::to_hex(&crate::crypto::sha256::sha256(payload)),
2476        other => crate::utils::to_hex(&crate::crypto::sha256::sha256(&other.to_bytes())),
2477    }
2478}
2479
2480fn vault_entry_metadata_json(entry: &VaultEntry) -> crate::json::Value {
2481    let mut object = crate::json::Map::new();
2482    object.insert(
2483        "key".to_string(),
2484        crate::json::Value::String(entry.key.clone()),
2485    );
2486    object.insert(
2487        "version".to_string(),
2488        crate::json::Value::Number(entry.version as f64),
2489    );
2490    object.insert(
2491        "fingerprint".to_string(),
2492        if entry.tombstone {
2493            crate::json::Value::Null
2494        } else {
2495            crate::json::Value::String(vault_fingerprint(&entry.value))
2496        },
2497    );
2498    object.insert("tags".to_string(), vault_tags_json(&entry.metadata));
2499    object.insert(
2500        "actor".to_string(),
2501        crate::json::Value::String(RedDBRuntime::current_vault_actor()),
2502    );
2503    object.insert(
2504        "sequence_id".to_string(),
2505        crate::json::Value::Number(entry.sequence_id as f64),
2506    );
2507    object.insert(
2508        "tombstone".to_string(),
2509        crate::json::Value::Bool(entry.tombstone),
2510    );
2511    object.insert(
2512        "op".to_string(),
2513        crate::json::Value::String(entry.op.clone()),
2514    );
2515    crate::json::Value::Object(object)
2516}
2517
2518fn vault_tags_json(metadata: &Metadata) -> crate::json::Value {
2519    match vault_tags_value(metadata) {
2520        Value::Array(values) => crate::json::Value::Array(
2521            values
2522                .into_iter()
2523                .filter_map(|value| match value {
2524                    Value::Text(tag) => Some(crate::json::Value::String(tag.to_string())),
2525                    _ => None,
2526                })
2527                .collect(),
2528        ),
2529        _ => crate::json::Value::Array(Vec::new()),
2530    }
2531}
2532
2533fn vault_tags_metadata(tags: &[String]) -> std::collections::HashMap<String, MetadataValue> {
2534    [(
2535        "tags".to_string(),
2536        MetadataValue::Array(
2537            tags.iter()
2538                .map(|tag| MetadataValue::String(tag.clone()))
2539                .collect(),
2540        ),
2541    )]
2542    .into_iter()
2543    .collect()
2544}
2545
2546fn vault_filter_watch_event(
2547    mut event: crate::replication::cdc::KvWatchEvent,
2548) -> crate::replication::cdc::KvWatchEvent {
2549    event.before = event.before.and_then(vault_metadata_json_only);
2550    event.after = event.after.and_then(vault_metadata_json_only);
2551    event
2552}
2553
2554fn vault_metadata_json_only(value: crate::json::Value) -> Option<crate::json::Value> {
2555    let object = value.as_object()?;
2556    let mut out = crate::json::Map::new();
2557    for field in [
2558        "key",
2559        "version",
2560        "fingerprint",
2561        "tags",
2562        "actor",
2563        "sequence_id",
2564        "tombstone",
2565        "op",
2566    ] {
2567        if let Some(value) = object.get(field) {
2568            out.insert(field.to_string(), value.clone());
2569        }
2570    }
2571    Some(crate::json::Value::Object(out))
2572}
2573
2574fn vault_tags_value(metadata: &Metadata) -> Value {
2575    match metadata.get("tags") {
2576        Some(MetadataValue::Array(values)) => Value::Array(
2577            values
2578                .iter()
2579                .filter_map(|value| match value {
2580                    MetadataValue::String(tag) => Some(Value::text(tag.clone())),
2581                    _ => None,
2582                })
2583                .collect(),
2584        ),
2585        Some(MetadataValue::String(tag)) if !tag.is_empty() => {
2586            Value::Array(vec![Value::text(tag.clone())])
2587        }
2588        _ => Value::Array(Vec::new()),
2589    }
2590}
2591
2592fn decode_vault_key(hex_key: &str) -> RedDBResult<[u8; 32]> {
2593    let bytes = hex::decode(hex_key)
2594        .map_err(|_| RedDBError::Query("vault sealed_unavailable: bad key material".to_string()))?;
2595    let key: [u8; 32] = bytes.try_into().map_err(|_| {
2596        RedDBError::Query("vault sealed_unavailable: bad key material length".to_string())
2597    })?;
2598    Ok(key)
2599}
2600
2601fn kv_tags_metadata(tags: &[String]) -> Option<(String, MetadataValue)> {
2602    if tags.is_empty() {
2603        return None;
2604    }
2605    let values = tags
2606        .iter()
2607        .map(|tag| MetadataValue::String(tag.clone()))
2608        .collect();
2609    Some(("_kv_tags".to_string(), MetadataValue::Array(values)))
2610}
2611
2612fn kv_tags_value(tags: &[String]) -> Value {
2613    let json = crate::json::Value::Array(
2614        tags.iter()
2615            .map(|tag| crate::json::Value::String(tag.clone()))
2616            .collect(),
2617    );
2618    Value::Json(crate::json::to_vec(&json).unwrap_or_default())
2619}
2620
2621fn kv_value_from_entity(entity: &crate::storage::UnifiedEntity) -> Option<Value> {
2622    if let crate::storage::EntityData::Row(ref row) = entity.data {
2623        if let Some(ref named) = row.named {
2624            return named.get("value").cloned();
2625        }
2626    }
2627    None
2628}
2629
2630fn insert_kv_json_path(root: &mut crate::json::Value, path: &str, value: crate::json::Value) {
2631    let segments: Vec<&str> = path
2632        .split('.')
2633        .filter(|segment| !segment.is_empty())
2634        .collect();
2635    insert_kv_json_segments(root, &segments, value);
2636}
2637
2638fn insert_kv_json_segments(
2639    root: &mut crate::json::Value,
2640    segments: &[&str],
2641    value: crate::json::Value,
2642) {
2643    if segments.is_empty() {
2644        *root = value;
2645        return;
2646    }
2647
2648    if !matches!(root, crate::json::Value::Object(_)) {
2649        *root = crate::json::Value::Object(crate::json::Map::new());
2650    }
2651
2652    let crate::json::Value::Object(map) = root else {
2653        return;
2654    };
2655    if segments.len() == 1 {
2656        map.insert(segments[0].to_string(), value);
2657        return;
2658    }
2659    let entry = map
2660        .entry(segments[0].to_string())
2661        .or_insert_with(|| crate::json::Value::Object(crate::json::Map::new()));
2662    insert_kv_json_segments(entry, &segments[1..], value);
2663}
2664
2665fn kv_list_json_result(
2666    raw_query: &str,
2667    collection: &str,
2668    prefix: &Option<String>,
2669    value: crate::json::Value,
2670) -> RuntimeQueryResult {
2671    let mut result =
2672        UnifiedResult::with_columns(vec!["collection".into(), "prefix".into(), "value".into()]);
2673    let mut record = UnifiedRecord::new();
2674    record.set("collection", Value::text(collection.to_string()));
2675    record.set(
2676        "prefix",
2677        prefix
2678            .as_ref()
2679            .map(|prefix| Value::text(prefix.clone()))
2680            .unwrap_or(Value::Null),
2681    );
2682    record.set("value", Value::Json(value.to_string_compact().into_bytes()));
2683    result.push(record);
2684    RuntimeQueryResult {
2685        query: raw_query.to_string(),
2686        mode: crate::storage::query::modes::QueryMode::Sql,
2687        statement: "kv_list_json",
2688        engine: "kv",
2689        result,
2690        affected_rows: 0,
2691        statement_type: "select",
2692        bookmark: None,
2693    }
2694}
2695
2696fn kv_collection_contract(name: &str) -> crate::physical::CollectionContract {
2697    let now = current_unix_ms();
2698    crate::physical::CollectionContract {
2699        name: name.to_string(),
2700        declared_model: crate::catalog::CollectionModel::Kv,
2701        schema_mode: crate::catalog::SchemaMode::Dynamic,
2702        origin: crate::physical::ContractOrigin::Implicit,
2703        version: 1,
2704        created_at_unix_ms: now,
2705        updated_at_unix_ms: now,
2706        default_ttl_ms: None,
2707        vector_dimension: None,
2708        vector_metric: None,
2709        context_index_fields: Vec::new(),
2710        declared_columns: Vec::new(),
2711        table_def: None,
2712        timestamps_enabled: false,
2713        context_index_enabled: false,
2714        metrics_raw_retention_ms: None,
2715        metrics_rollup_policies: Vec::new(),
2716        metrics_tenant_identity: None,
2717        metrics_namespace: None,
2718        append_only: false,
2719        subscriptions: Vec::new(),
2720        analytics_config: Vec::new(),
2721        session_key: None,
2722        session_gap_ms: None,
2723        retention_duration_ms: None,
2724        analytical_storage: None,
2725    }
2726}
2727
/// Milliseconds elapsed since the Unix epoch, per the system clock.
///
/// Returns 0 if the clock reports a time before the epoch.
fn current_unix_ms() -> u128 {
    match std::time::UNIX_EPOCH.elapsed() {
        Ok(since_epoch) => since_epoch.as_millis(),
        Err(_) => 0,
    }
}
2734
2735#[cfg(test)]
2736mod tests {
2737    use crate::api::RedDBOptions;
2738    use crate::catalog::CollectionModel;
2739    use crate::runtime::RedDBRuntime;
2740
2741    fn rt() -> RedDBRuntime {
2742        RedDBRuntime::with_options(RedDBOptions::in_memory()).expect("in-memory runtime")
2743    }
2744
2745    #[test]
2746    fn incr_missing_key_initialises_at_by() {
2747        let r = rt();
2748        let ops = super::KvAtomicOps::new(&r);
2749        let v = ops
2750            .incr(CollectionModel::Kv, "kv_default", "missing", 5, None)
2751            .unwrap();
2752        assert_eq!(v, 5);
2753    }
2754
2755    #[test]
2756    fn kv_runtime_stats_count_public_ops() {
2757        let r = rt();
2758        let ops = super::KvAtomicOps::new(&r);
2759
2760        ops.set(
2761            CollectionModel::Kv,
2762            "kv_default",
2763            "profile",
2764            crate::storage::schema::Value::text("alice"),
2765            None,
2766            false,
2767        )
2768        .unwrap();
2769        ops.get(CollectionModel::Kv, "kv_default", "profile")
2770            .unwrap();
2771        ops.delete(CollectionModel::Kv, "kv_default", "profile")
2772            .unwrap();
2773        ops.incr(CollectionModel::Kv, "kv_default", "hits", 1, None)
2774            .unwrap();
2775        ops.cas(
2776            CollectionModel::Kv,
2777            "kv_default",
2778            "profile",
2779            None,
2780            crate::storage::schema::Value::text("created"),
2781            None,
2782        )
2783        .unwrap();
2784        ops.cas(
2785            CollectionModel::Kv,
2786            "kv_default",
2787            "profile",
2788            Some(&crate::storage::schema::Value::text("different")),
2789            crate::storage::schema::Value::text("ignored"),
2790            None,
2791        )
2792        .unwrap();
2793
2794        let stats = r.stats().kv;
2795        assert_eq!(stats.puts, 1);
2796        assert_eq!(stats.gets, 1);
2797        assert_eq!(stats.deletes, 1);
2798        assert_eq!(stats.incrs, 1);
2799        assert_eq!(stats.cas_success, 1);
2800        assert_eq!(stats.cas_conflict, 1);
2801    }
2802
    /// `INVALIDATE TAGS [...] FROM <collection>` removes only entries that
    /// carry one of the listed tags, and reports how many were removed via
    /// `affected_rows`.
    // NOTE(review): `sessions.blob` is presumably key `blob` in collection
    // `sessions`, given the `FROM sessions` clause — confirm against the parser.
    #[test]
    fn kv_invalidate_tags_removes_matching_entries_only() {
        let r = rt();

        r.execute_query("KV PUT sessions.blob = 'payload' TAGS [user:42, org:7]")
            .unwrap();

        // A tag the entry does not carry must not evict it.
        let miss = r
            .execute_query("INVALIDATE TAGS [org:99] FROM sessions")
            .unwrap();
        assert_eq!(miss.affected_rows, 0);
        assert!(matches!(
            r.execute_query("KV GET sessions.blob")
                .unwrap()
                .result
                .records[0]
                .get("value"),
            Some(crate::storage::schema::Value::Text(value)) if &**value == "payload"
        ));

        // A matching tag evicts the entry; `KV GET` then yields a row whose
        // `value` column is NULL.
        let hit = r
            .execute_query("INVALIDATE TAGS [user:42] FROM sessions")
            .unwrap();
        assert_eq!(hit.affected_rows, 1);
        assert!(matches!(
            r.execute_query("KV GET sessions.blob")
                .unwrap()
                .result
                .records[0]
                .get("value"),
            Some(crate::storage::schema::Value::Null)
        ));
    }
2836
    /// Watch statistics track the number of live streams, the events emitted
    /// to them and the drops they report. The active-stream count falls back
    /// to zero once the stream is dropped.
    #[test]
    fn kv_runtime_stats_count_watch_streams_and_events() {
        let r = rt();
        let ops = super::KvAtomicOps::new(&r);
        assert_eq!(r.stats().kv.watch_streams_active, 0);

        // Inner scope: `stream` is dropped at the closing brace, which is what
        // the final `watch_streams_active == 0` assertion relies on.
        {
            let mut stream = r.kv_watch_subscribe("kv_default", "watched", None);
            assert_eq!(r.stats().kv.watch_streams_active, 1);

            ops.set(
                CollectionModel::Kv,
                "kv_default",
                "watched",
                crate::storage::schema::Value::Integer(1),
                None,
                false,
            )
            .unwrap();
            let event = stream.poll_next().expect("watch event");
            assert_eq!(event.key, "watched");
            assert_eq!(r.stats().kv.watch_events_emitted, 1);

            stream.record_drop_count(3);
            assert_eq!(r.stats().kv.watch_drops, 3);
        }

        assert_eq!(r.stats().kv.watch_streams_active, 0);
    }
2866
2867    #[test]
2868    fn incr_existing_integer_accumulates() {
2869        let r = rt();
2870        let ops = super::KvAtomicOps::new(&r);
2871        ops.incr(CollectionModel::Kv, "kv_default", "ctr", 1, None)
2872            .unwrap();
2873        ops.incr(CollectionModel::Kv, "kv_default", "ctr", 1, None)
2874            .unwrap();
2875        let v = ops
2876            .incr(CollectionModel::Kv, "kv_default", "ctr", 1, None)
2877            .unwrap();
2878        assert_eq!(v, 3);
2879    }
2880
2881    #[test]
2882    fn decr_via_negative_by() {
2883        let r = rt();
2884        let ops = super::KvAtomicOps::new(&r);
2885        ops.incr(CollectionModel::Kv, "kv_default", "stock", 10, None)
2886            .unwrap();
2887        let v = ops
2888            .incr(CollectionModel::Kv, "kv_default", "stock", -3, None)
2889            .unwrap();
2890        assert_eq!(v, 7);
2891    }
2892
    /// `incr` on one key from many threads loses no updates: the final value
    /// equals the total number of increments issued.
    #[test]
    fn concurrent_incr_single_key_is_atomic() {
        const THREADS: usize = 8;
        const ITERS: usize = 1000;

        let runtime = std::sync::Arc::new(rt());
        // Every worker waits here so all threads start incrementing together,
        // maximising contention on the key.
        let barrier = std::sync::Arc::new(std::sync::Barrier::new(THREADS));
        let mut handles = Vec::new();

        for _ in 0..THREADS {
            let runtime = std::sync::Arc::clone(&runtime);
            let barrier = std::sync::Arc::clone(&barrier);
            handles.push(std::thread::spawn(move || {
                let ops = super::KvAtomicOps::new(&runtime);
                barrier.wait();
                for _ in 0..ITERS {
                    ops.incr(CollectionModel::Kv, "kv_default", "counter", 1, None)
                        .unwrap();
                }
            }));
        }

        for handle in handles {
            handle.join().expect("worker should finish");
        }

        let ops = super::KvAtomicOps::new(&runtime);
        assert_eq!(
            ops.get(CollectionModel::Kv, "kv_default", "counter")
                .unwrap(),
            Some(crate::storage::schema::Value::Integer(
                (THREADS * ITERS) as i64
            ))
        );
    }
2928
2929    #[test]
2930    fn incr_on_string_value_returns_error() {
2931        let r = rt();
2932        let ops = super::KvAtomicOps::new(&r);
2933        ops.set(
2934            CollectionModel::Kv,
2935            "kv_default",
2936            "name",
2937            crate::storage::schema::Value::text("alice"),
2938            None,
2939            false,
2940        )
2941        .unwrap();
2942        let err = ops
2943            .incr(CollectionModel::Kv, "kv_default", "name", 1, None)
2944            .unwrap_err();
2945        assert!(err.to_string().contains("non-integer"));
2946    }
2947
2948    // --- CAS tests ---
2949
2950    #[test]
2951    fn cas_matching_value_succeeds() {
2952        let r = rt();
2953        let ops = super::KvAtomicOps::new(&r);
2954        ops.set(
2955            CollectionModel::Kv,
2956            "kv_default",
2957            "lock",
2958            crate::storage::schema::Value::text("free"),
2959            None,
2960            false,
2961        )
2962        .unwrap();
2963        let (ok, prev) = ops
2964            .cas(
2965                CollectionModel::Kv,
2966                "kv_default",
2967                "lock",
2968                Some(&crate::storage::schema::Value::text("free")),
2969                crate::storage::schema::Value::text("held"),
2970                None,
2971            )
2972            .unwrap();
2973        assert!(ok);
2974        assert_eq!(prev, Some(crate::storage::schema::Value::text("free")));
2975        // Value actually changed.
2976        assert_eq!(
2977            ops.get(CollectionModel::Kv, "kv_default", "lock").unwrap(),
2978            Some(crate::storage::schema::Value::text("held"))
2979        );
2980    }
2981
    /// In each round, every thread attempts the same CAS (expect `round`,
    /// set `round + 1`). Exactly one attempt per round may succeed, so the
    /// success count and the final value both equal `ROUNDS`.
    #[test]
    fn concurrent_cas_allows_one_success_per_round() {
        const THREADS: usize = 8;
        const ROUNDS: usize = 100;

        let runtime = std::sync::Arc::new(rt());
        let ops = super::KvAtomicOps::new(&runtime);
        ops.set(
            CollectionModel::Kv,
            "kv_default",
            "cas_counter",
            crate::storage::schema::Value::Integer(0),
            None,
            false,
        )
        .unwrap();

        // `start_round` releases all threads into a round together;
        // `finish_round` keeps anyone from starting round r+1 until every
        // thread has finished its attempt for round r.
        let start_round = std::sync::Arc::new(std::sync::Barrier::new(THREADS));
        let finish_round = std::sync::Arc::new(std::sync::Barrier::new(THREADS));
        let successes = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
        let mut handles = Vec::new();

        for _ in 0..THREADS {
            let runtime = std::sync::Arc::clone(&runtime);
            let start_round = std::sync::Arc::clone(&start_round);
            let finish_round = std::sync::Arc::clone(&finish_round);
            let successes = std::sync::Arc::clone(&successes);
            handles.push(std::thread::spawn(move || {
                let ops = super::KvAtomicOps::new(&runtime);
                for round in 0..ROUNDS {
                    start_round.wait();
                    let (ok, _) = ops
                        .cas(
                            CollectionModel::Kv,
                            "kv_default",
                            "cas_counter",
                            Some(&crate::storage::schema::Value::Integer(round as i64)),
                            crate::storage::schema::Value::Integer((round + 1) as i64),
                            None,
                        )
                        .unwrap();
                    if ok {
                        successes.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
                    }
                    finish_round.wait();
                }
            }));
        }

        for handle in handles {
            handle.join().expect("worker should finish");
        }

        assert_eq!(successes.load(std::sync::atomic::Ordering::SeqCst), ROUNDS);
        assert_eq!(
            ops.get(CollectionModel::Kv, "kv_default", "cas_counter")
                .unwrap(),
            Some(crate::storage::schema::Value::Integer(ROUNDS as i64))
        );
    }
3042
3043    #[test]
3044    fn cas_mismatching_value_fails() {
3045        let r = rt();
3046        let ops = super::KvAtomicOps::new(&r);
3047        ops.set(
3048            CollectionModel::Kv,
3049            "kv_default",
3050            "lock",
3051            crate::storage::schema::Value::text("free"),
3052            None,
3053            false,
3054        )
3055        .unwrap();
3056        let (ok, current) = ops
3057            .cas(
3058                CollectionModel::Kv,
3059                "kv_default",
3060                "lock",
3061                Some(&crate::storage::schema::Value::text("held")),
3062                crate::storage::schema::Value::text("worker-7"),
3063                None,
3064            )
3065            .unwrap();
3066        assert!(!ok);
3067        assert_eq!(current, Some(crate::storage::schema::Value::text("free")));
3068        // Value unchanged.
3069        assert_eq!(
3070            ops.get(CollectionModel::Kv, "kv_default", "lock").unwrap(),
3071            Some(crate::storage::schema::Value::text("free"))
3072        );
3073    }
3074
3075    #[test]
3076    fn cas_expect_null_on_missing_key_creates() {
3077        let r = rt();
3078        let ops = super::KvAtomicOps::new(&r);
3079        let (ok, prev) = ops
3080            .cas(
3081                CollectionModel::Kv,
3082                "kv_default",
3083                "new_key",
3084                None,
3085                crate::storage::schema::Value::text("created"),
3086                None,
3087            )
3088            .unwrap();
3089        assert!(ok);
3090        assert_eq!(prev, None);
3091        assert_eq!(
3092            ops.get(CollectionModel::Kv, "kv_default", "new_key")
3093                .unwrap(),
3094            Some(crate::storage::schema::Value::text("created"))
3095        );
3096    }
3097
3098    #[test]
3099    fn cas_expect_null_on_existing_key_fails() {
3100        let r = rt();
3101        let ops = super::KvAtomicOps::new(&r);
3102        ops.set(
3103            CollectionModel::Kv,
3104            "kv_default",
3105            "taken",
3106            crate::storage::schema::Value::text("worker-1"),
3107            None,
3108            false,
3109        )
3110        .unwrap();
3111        let (ok, current) = ops
3112            .cas(
3113                CollectionModel::Kv,
3114                "kv_default",
3115                "taken",
3116                None,
3117                crate::storage::schema::Value::text("worker-2"),
3118                None,
3119            )
3120            .unwrap();
3121        assert!(!ok);
3122        assert_eq!(
3123            current,
3124            Some(crate::storage::schema::Value::text("worker-1"))
3125        );
3126    }
3127
3128    #[test]
3129    fn cas_via_sql_roundtrip() {
3130        let r = rt();
3131        // Seed value.
3132        r.execute_query("KV PUT lock = 'free'").unwrap();
3133        // CAS: free → held — should succeed.
3134        let res = r
3135            .execute_query("KV CAS lock EXPECT 'free' SET 'held'")
3136            .unwrap();
3137        let row = &res.result.records[0];
3138        assert_eq!(
3139            row.get("ok"),
3140            Some(&crate::storage::schema::Value::Boolean(true))
3141        );
3142        // CAS: free → held again — should fail (value is now 'held').
3143        let res2 = r
3144            .execute_query("KV CAS lock EXPECT 'free' SET 'held'")
3145            .unwrap();
3146        let row2 = &res2.result.records[0];
3147        assert_eq!(
3148            row2.get("ok"),
3149            Some(&crate::storage::schema::Value::Boolean(false))
3150        );
3151    }
3152
3153    #[test]
3154    fn cas_expect_null_via_sql() {
3155        let r = rt();
3156        let res = r
3157            .execute_query("KV CAS singleton EXPECT NULL SET 'first'")
3158            .unwrap();
3159        let row = &res.result.records[0];
3160        assert_eq!(
3161            row.get("ok"),
3162            Some(&crate::storage::schema::Value::Boolean(true))
3163        );
3164        // Second call must fail.
3165        let res2 = r
3166            .execute_query("KV CAS singleton EXPECT NULL SET 'second'")
3167            .unwrap();
3168        let row2 = &res2.result.records[0];
3169        assert_eq!(
3170            row2.get("ok"),
3171            Some(&crate::storage::schema::Value::Boolean(false))
3172        );
3173    }
3174
3175    #[test]
3176    fn incr_via_sql_roundtrip() {
3177        let r = rt();
3178        let res = r.execute_query("KV INCR hits").unwrap();
3179        let row = &res.result.records[0];
3180        assert_eq!(
3181            row.get("value"),
3182            Some(&crate::storage::schema::Value::Integer(1))
3183        );
3184        let res2 = r.execute_query("KV INCR hits BY 4").unwrap();
3185        let row2 = &res2.result.records[0];
3186        assert_eq!(
3187            row2.get("value"),
3188            Some(&crate::storage::schema::Value::Integer(5))
3189        );
3190    }
3191
    /// Concurrent `UPDATE ... SET n = n + 1` statements on the same row lose
    /// no increments: the final `n` equals the total number of updates.
    #[test]
    fn concurrent_self_referential_update_is_atomic() {
        const THREADS: usize = 8;
        const ITERS: usize = 100;

        let runtime = std::sync::Arc::new(rt());
        runtime
            .execute_query("CREATE TABLE counters (id INT, n INT)")
            .unwrap();
        runtime
            .execute_query("INSERT INTO counters (id, n) VALUES (1, 0)")
            .unwrap();

        // All workers start updating at the same moment to maximise contention.
        let barrier = std::sync::Arc::new(std::sync::Barrier::new(THREADS));
        let mut handles = Vec::new();
        for _ in 0..THREADS {
            let runtime = std::sync::Arc::clone(&runtime);
            let barrier = std::sync::Arc::clone(&barrier);
            handles.push(std::thread::spawn(move || {
                barrier.wait();
                for _ in 0..ITERS {
                    runtime
                        .execute_query("UPDATE counters SET n = n + 1 WHERE id = 1")
                        .unwrap();
                }
            }));
        }

        for handle in handles {
            handle.join().expect("worker should finish");
        }

        let selected = runtime
            .execute_query("SELECT n FROM counters WHERE id = 1")
            .unwrap();
        assert_eq!(
            selected.result.records[0].get("n"),
            Some(&crate::storage::schema::Value::Integer(
                (THREADS * ITERS) as i64
            ))
        );
    }
3234
3235    #[test]
3236    fn watch_stream_delivers_key_events_in_lsn_order() {
3237        let r = rt();
3238        let ops = super::KvAtomicOps::new(&r);
3239        let mut stream = r.kv_watch_subscribe("kv_default", "seq", None);
3240
3241        ops.set(
3242            CollectionModel::Kv,
3243            "kv_default",
3244            "seq",
3245            crate::storage::schema::Value::Integer(1),
3246            None,
3247            false,
3248        )
3249        .unwrap();
3250        ops.incr(CollectionModel::Kv, "kv_default", "seq", 1, None)
3251            .unwrap();
3252        ops.delete(CollectionModel::Kv, "kv_default", "seq")
3253            .unwrap();
3254        ops.set(
3255            CollectionModel::Kv,
3256            "kv_default",
3257            "seq",
3258            crate::storage::schema::Value::Integer(9),
3259            None,
3260            false,
3261        )
3262        .unwrap();
3263
3264        let mut events = Vec::new();
3265        while let Some(event) = stream.poll_next() {
3266            events.push(event);
3267            if events.len() == 4 {
3268                break;
3269            }
3270        }
3271
3272        assert_eq!(events.len(), 4);
3273        assert_eq!(
3274            events[0].op,
3275            crate::replication::cdc::ChangeOperation::Insert
3276        );
3277        assert_eq!(
3278            events[1].op,
3279            crate::replication::cdc::ChangeOperation::Update
3280        );
3281        assert_eq!(
3282            events[2].op,
3283            crate::replication::cdc::ChangeOperation::Delete
3284        );
3285        assert_eq!(
3286            events[3].op,
3287            crate::replication::cdc::ChangeOperation::Insert
3288        );
3289        assert!(events.windows(2).all(|pair| pair[0].lsn < pair[1].lsn));
3290    }
3291
3292    #[test]
3293    fn watch_prefix_stream_delivers_matching_events_only() {
3294        let r = rt();
3295        let ops = super::KvAtomicOps::new(&r);
3296        let mut stream = r.kv_watch_subscribe_prefix("kv_default", "acct:", None);
3297
3298        ops.set(
3299            CollectionModel::Kv,
3300            "kv_default",
3301            "acct:1",
3302            crate::storage::schema::Value::Integer(1),
3303            None,
3304            false,
3305        )
3306        .unwrap();
3307        ops.set(
3308            CollectionModel::Kv,
3309            "kv_default",
3310            "session:1",
3311            crate::storage::schema::Value::Integer(2),
3312            None,
3313            false,
3314        )
3315        .unwrap();
3316        ops.set(
3317            CollectionModel::Kv,
3318            "kv_default",
3319            "acct:2",
3320            crate::storage::schema::Value::Integer(3),
3321            None,
3322            false,
3323        )
3324        .unwrap();
3325
3326        let first = stream.poll_next().expect("first prefix event");
3327        let second = stream.poll_next().expect("second prefix event");
3328        assert_eq!(first.key, "acct:1");
3329        assert_eq!(second.key, "acct:2");
3330        assert!(stream.poll_next().is_none());
3331    }
3332
3333    #[test]
3334    fn watch_stream_resume_from_lsn_delivers_missed_events_without_duplicates() {
3335        let r = rt();
3336        let ops = super::KvAtomicOps::new(&r);
3337        let mut stream = r.kv_watch_subscribe("kv_default", "resume", None);
3338
3339        let mut last_seen_lsn = 0;
3340        for value in 0..5 {
3341            ops.set(
3342                CollectionModel::Kv,
3343                "kv_default",
3344                "resume",
3345                crate::storage::schema::Value::Integer(value),
3346                None,
3347                false,
3348            )
3349            .unwrap();
3350            last_seen_lsn = stream.poll_next().expect("initial event").lsn;
3351        }
3352        drop(stream);
3353
3354        for value in 5..55 {
3355            ops.set(
3356                CollectionModel::Kv,
3357                "kv_default",
3358                "resume",
3359                crate::storage::schema::Value::Integer(value),
3360                None,
3361                false,
3362            )
3363            .unwrap();
3364        }
3365
3366        let expected_lsns: Vec<u64> = r
3367            .kv_watch_events_since("kv_default", "resume", last_seen_lsn, 200)
3368            .into_iter()
3369            .map(|event| event.lsn)
3370            .collect();
3371        assert!(!expected_lsns.is_empty());
3372
3373        let mut resumed = r.kv_watch_subscribe("kv_default", "resume", Some(last_seen_lsn));
3374        let mut lsns = Vec::new();
3375        while let Some(event) = resumed.poll_next() {
3376            lsns.push(event.lsn);
3377            if lsns.len() == expected_lsns.len() {
3378                break;
3379            }
3380        }
3381
3382        assert_eq!(lsns, expected_lsns);
3383        assert!(lsns.iter().all(|lsn| *lsn > last_seen_lsn));
3384        assert!(lsns.windows(2).all(|pair| pair[0] < pair[1]));
3385        assert!(resumed.poll_next().is_none());
3386    }
3387
3388    #[test]
3389    fn watch_stream_slow_consumer_drops_oldest_buffered_events() {
3390        let r = rt();
3391        let ops = super::KvAtomicOps::new(&r);
3392        let mut stream = r.kv_watch_subscribe("kv_default", "slow", None);
3393
3394        for value in 0..1_200 {
3395            ops.set(
3396                CollectionModel::Kv,
3397                "kv_default",
3398                "slow",
3399                crate::storage::schema::Value::Integer(value),
3400                None,
3401                false,
3402            )
3403            .unwrap();
3404        }
3405
3406        let event = stream.poll_next().expect("tail event after drops");
3407        assert!(event.lsn > 1);
3408        assert!(event.dropped_event_count > 0);
3409        assert_eq!(stream.dropped_event_count(), event.dropped_event_count);
3410        assert_eq!(r.stats().kv.watch_drops, event.dropped_event_count);
3411    }
3412
3413    #[test]
3414    fn watch_stream_idle_timeout_closes_subscription() {
3415        let r = rt();
3416        r.execute_query("SET CONFIG red.config.kv.watch.idle_timeout_ms = 1")
3417            .unwrap();
3418
3419        let mut stream = r.kv_watch_subscribe("kv_default", "idle", None);
3420        assert_eq!(r.stats().kv.watch_streams_active, 1);
3421        std::thread::sleep(std::time::Duration::from_millis(5));
3422
3423        assert!(stream.poll_next().is_none());
3424        assert_eq!(r.stats().kv.watch_streams_active, 0);
3425    }
3426
3427    #[test]
3428    fn watch_stream_does_not_emit_rolled_back_put() {
3429        let r = rt();
3430        let mut stream = r.kv_watch_subscribe("kv_default", "rollback_key", None);
3431
3432        r.execute_query("BEGIN").unwrap();
3433        r.execute_query("KV PUT rollback_key = 'dirty'").unwrap();
3434        r.execute_query("ROLLBACK").unwrap();
3435
3436        assert!(stream.poll_next().is_none());
3437    }
3438}