Skip to main content

reddb_server/runtime/
impl_kv.rs

1//! KV DSL command execution and KvAtomicOps module.
2//!
3//! Handles `KV PUT key = value [EXPIRE n] [TAGS [...]] [IF NOT EXISTS]`,
4//! `KV GET key`, and `KV DELETE key`.
5
6use crate::application::ports::RuntimeEntityPort;
7use crate::storage::unified::{Metadata, MetadataValue};
8
9use super::impl_core::{current_auth_identity, current_connection_id, current_tenant};
10use super::*;
11
/// Collection used by KV operations that name a bare key without an explicit
/// collection.
pub const KV_DEFAULT_COLLECTION: &str = "kv_default";
14
/// Name of the auth-store vault entry that holds the master key for the vault
/// `collection`, i.e. `red.vault.<collection>.master_key`.
fn vault_master_key_ref(collection: &str) -> String {
    ["red.vault.", collection, ".master_key"].concat()
}
18
19fn keyed_model_name(model: crate::catalog::CollectionModel) -> &'static str {
20    match model {
21        crate::catalog::CollectionModel::Kv => "kv",
22        crate::catalog::CollectionModel::Vault => "vault",
23        crate::catalog::CollectionModel::Config => "config",
24        _ => "collection",
25    }
26}
27
28#[derive(Debug, Clone)]
29struct VaultEntry {
30    id: crate::storage::EntityId,
31    key: String,
32    value: crate::storage::schema::Value,
33    metadata: Metadata,
34    created_at: u64,
35    updated_at: u64,
36    sequence_id: u64,
37    version: i64,
38    tombstone: bool,
39    op: String,
40}
41
42impl super::keyed_spine::KeyedVersion for VaultEntry {
43    fn key(&self) -> &str {
44        &self.key
45    }
46
47    fn version(&self) -> i64 {
48        self.version
49    }
50}
51
52impl VaultEntry {
53    fn from_keyed_row(
54        version: super::keyed_spine::KeyedRowVersion,
55        metadata: Metadata,
56        created_at: u64,
57        updated_at: u64,
58        sequence_id: u64,
59    ) -> Self {
60        Self {
61            id: version.id,
62            key: version.key,
63            value: version.value,
64            metadata,
65            created_at,
66            updated_at,
67            sequence_id,
68            version: version.version,
69            tombstone: version.tombstone,
70            op: version.op,
71        }
72    }
73}
74
75/// Atomic KV operations interface — the seam that transports and drivers depend on.
76///
77/// All three verbs delegate to the runtime's existing `create_kv` / `get_kv` /
78/// `delete_kv` plumbing; this struct adds the auto-create and upsert logic.
79pub struct KvAtomicOps<'a> {
80    runtime: &'a RedDBRuntime,
81}
82
83impl<'a> KvAtomicOps<'a> {
84    pub fn new(runtime: &'a RedDBRuntime) -> Self {
85        Self { runtime }
86    }
87
88    /// Insert or update a KV entry. Auto-creates the collection when needed.
89    ///
90    /// Insert or update a KV entry. Returns `(created: bool, id: EntityId)`.
91    pub fn set(
92        &self,
93        model: crate::catalog::CollectionModel,
94        collection: &str,
95        key: &str,
96        value: crate::storage::schema::Value,
97        ttl_ms: Option<u64>,
98        if_not_exists: bool,
99    ) -> RedDBResult<(bool, crate::storage::EntityId)> {
100        self.set_with_tags_for_model(model, collection, key, value, ttl_ms, &[], if_not_exists)
101    }
102
103    pub fn set_with_tags(
104        &self,
105        collection: &str,
106        key: &str,
107        value: crate::storage::schema::Value,
108        ttl_ms: Option<u64>,
109        tags: &[String],
110        if_not_exists: bool,
111    ) -> RedDBResult<(bool, crate::storage::EntityId)> {
112        self.set_with_tags_for_model(
113            crate::catalog::CollectionModel::Kv,
114            collection,
115            key,
116            value,
117            ttl_ms,
118            tags,
119            if_not_exists,
120        )
121    }
122
123    fn set_with_tags_for_model(
124        &self,
125        model: crate::catalog::CollectionModel,
126        collection: &str,
127        key: &str,
128        value: crate::storage::schema::Value,
129        ttl_ms: Option<u64>,
130        tags: &[String],
131        if_not_exists: bool,
132    ) -> RedDBResult<(bool, crate::storage::EntityId)> {
133        self.set_with_tags_and_metadata_for_model(
134            model,
135            collection,
136            key,
137            value,
138            ttl_ms,
139            tags,
140            if_not_exists,
141            Vec::new(),
142        )
143    }
144
145    pub fn set_with_tags_and_metadata(
146        &self,
147        collection: &str,
148        key: &str,
149        value: crate::storage::schema::Value,
150        ttl_ms: Option<u64>,
151        tags: &[String],
152        if_not_exists: bool,
153        metadata: Vec<(String, MetadataValue)>,
154    ) -> RedDBResult<(bool, crate::storage::EntityId)> {
155        self.set_with_tags_and_metadata_for_model(
156            crate::catalog::CollectionModel::Kv,
157            collection,
158            key,
159            value,
160            ttl_ms,
161            tags,
162            if_not_exists,
163            metadata,
164        )
165    }
166
167    fn set_with_tags_and_metadata_for_model(
168        &self,
169        model: crate::catalog::CollectionModel,
170        collection: &str,
171        key: &str,
172        value: crate::storage::schema::Value,
173        ttl_ms: Option<u64>,
174        tags: &[String],
175        if_not_exists: bool,
176        mut metadata: Vec<(String, MetadataValue)>,
177    ) -> RedDBResult<(bool, crate::storage::EntityId)> {
178        self.ensure_keyed_collection(model, collection)?;
179
180        if model == crate::catalog::CollectionModel::Vault {
181            let latest = self.get_vault_entry(collection, key)?;
182            let was_present = latest.as_ref().is_some_and(|entry| !entry.tombstone);
183            if if_not_exists && was_present {
184                return Ok((false, latest.expect("checked present").id));
185            }
186            let entry = self.append_vault_version(collection, key, value, "put", false, tags)?;
187            self.runtime.record_kv_watch_event(
188                if latest.is_some() {
189                    crate::replication::cdc::ChangeOperation::Update
190                } else {
191                    crate::replication::cdc::ChangeOperation::Insert
192                },
193                collection,
194                key,
195                entry.id.raw(),
196                latest.as_ref().map(vault_entry_metadata_json),
197                Some(vault_entry_metadata_json(&entry)),
198            );
199            return Ok((!was_present, entry.id));
200        }
201
202        let existing = self.get_entry(model, collection, key)?;
203        let was_present = existing.is_some();
204
205        if if_not_exists && was_present {
206            let (_, id) = existing.unwrap();
207            self.runtime.inner.kv_stats.incr_puts();
208            return Ok((false, id));
209        }
210
211        let before = existing
212            .as_ref()
213            .map(|(value, _)| crate::presentation::entity_json::storage_value_to_json(value));
214        let op = if was_present {
215            crate::replication::cdc::ChangeOperation::Update
216        } else {
217            crate::replication::cdc::ChangeOperation::Insert
218        };
219        let after = Some(crate::presentation::entity_json::storage_value_to_json(
220            &value,
221        ));
222
223        // Delete old entry so we can create fresh (handles TTL refresh).
224        if was_present {
225            self.delete(model, collection, key)?;
226        }
227
228        if let Some(ttl_metadata) = ttl_metadata(ttl_ms) {
229            metadata.extend(ttl_metadata.fields);
230        }
231        if let Some(tags_metadata) = kv_tags_metadata(tags) {
232            metadata.push(tags_metadata);
233        }
234
235        let output = self
236            .runtime
237            .create_kv(crate::application::entity::CreateKvInput {
238                collection: collection.to_string(),
239                key: key.to_string(),
240                value,
241                metadata,
242            })?;
243        if model == crate::catalog::CollectionModel::Kv {
244            self.runtime
245                .inner
246                .kv_tag_index
247                .replace(collection, key, output.id, tags);
248        }
249
250        if model == crate::catalog::CollectionModel::Kv {
251            self.runtime
252                .record_kv_watch_event(op, collection, key, output.id.raw(), before, after);
253        }
254
255        if model == crate::catalog::CollectionModel::Kv {
256            self.runtime.inner.kv_stats.incr_puts();
257        }
258        Ok((!was_present, output.id))
259    }
260
261    /// Retrieve a KV value by key. Returns `None` when not found.
262    pub fn get(
263        &self,
264        model: crate::catalog::CollectionModel,
265        collection: &str,
266        key: &str,
267    ) -> RedDBResult<Option<crate::storage::schema::Value>> {
268        let result = self.get_entry(model, collection, key)?;
269        if model == crate::catalog::CollectionModel::Kv {
270            self.runtime.inner.kv_stats.incr_gets();
271        }
272        Ok(result.map(|(v, _)| v))
273    }
274
275    /// Delete a KV entry. Returns `true` if the key existed.
276    pub fn delete(
277        &self,
278        model: crate::catalog::CollectionModel,
279        collection: &str,
280        key: &str,
281    ) -> RedDBResult<bool> {
282        self.ensure_declared_model(model, collection)?;
283        let found = self.get_entry(model, collection, key)?;
284        if let Some((value, id)) = found {
285            let store = self.runtime.inner.db.store();
286            let deleted = store
287                .delete(collection, id)
288                .map_err(|err| RedDBError::Internal(err.to_string()))?;
289            if deleted {
290                store.context_index().remove_entity(id);
291                if model == crate::catalog::CollectionModel::Kv {
292                    self.runtime.inner.kv_tag_index.remove(collection, key);
293                    self.runtime.record_kv_watch_event(
294                        crate::replication::cdc::ChangeOperation::Delete,
295                        collection,
296                        key,
297                        id.raw(),
298                        Some(crate::presentation::entity_json::storage_value_to_json(
299                            &value,
300                        )),
301                        None,
302                    );
303                    self.runtime.inner.kv_stats.incr_deletes();
304                }
305            }
306            Ok(deleted)
307        } else {
308            Ok(false)
309        }
310    }
311
312    /// Atomically increment (or decrement) a counter key. Returns the new value.
313    ///
314    /// - Missing key initialises at `by` (Redis-compat).
315    /// - Non-integer value returns an error before any mutation.
316    pub fn incr(
317        &self,
318        model: crate::catalog::CollectionModel,
319        collection: &str,
320        key: &str,
321        by: i64,
322        ttl_ms: Option<u64>,
323    ) -> RedDBResult<i64> {
324        if model == crate::catalog::CollectionModel::Vault {
325            return Err(RedDBError::InvalidOperation(
326                "VAULT INCR is not supported for sealed secrets".to_string(),
327            ));
328        }
329        let rmw_lock = self.runtime.inner.rmw_locks.lock_for(collection, key);
330        let _rmw_guard = rmw_lock.lock();
331        self.ensure_kv_collection(collection)?;
332        let existing = self.runtime.get_kv(collection, key)?;
333        let current: i64 = match existing.as_ref() {
334            None => 0,
335            Some((crate::storage::schema::Value::Integer(n), _)) => *n,
336            Some((crate::storage::schema::Value::Float(f), _)) => *f as i64,
337            Some((other, _)) => {
338                return Err(RedDBError::Internal(format!(
339                    "INCR on non-integer value: {:?}",
340                    other
341                )));
342            }
343        };
344
345        let next = current
346            .checked_add(by)
347            .ok_or_else(|| RedDBError::Internal(format!("INCR overflow: {current} + {by}")))?;
348
349        // Delete then re-create so TTL is refreshed.
350        if existing.is_some() {
351            self.runtime.delete_kv(collection, key)?;
352        }
353
354        let meta_vec: Vec<(String, crate::storage::unified::MetadataValue)> = ttl_metadata(ttl_ms)
355            .map(|m| m.fields.into_iter().collect())
356            .unwrap_or_default();
357
358        let output = self
359            .runtime
360            .create_kv(crate::application::entity::CreateKvInput {
361                collection: collection.to_string(),
362                key: key.to_string(),
363                value: crate::storage::schema::Value::Integer(next),
364                metadata: meta_vec,
365            })?;
366        self.runtime
367            .inner
368            .kv_tag_index
369            .replace(collection, key, output.id, &[]);
370
371        self.runtime.record_kv_watch_event(
372            if existing.is_some() {
373                crate::replication::cdc::ChangeOperation::Update
374            } else {
375                crate::replication::cdc::ChangeOperation::Insert
376            },
377            collection,
378            key,
379            output.id.raw(),
380            existing
381                .as_ref()
382                .map(|(value, _)| crate::presentation::entity_json::storage_value_to_json(value)),
383            Some(crate::presentation::entity_json::storage_value_to_json(
384                &crate::storage::schema::Value::Integer(next),
385            )),
386        );
387
388        self.runtime.inner.kv_stats.incr_incrs();
389        Ok(next)
390    }
391
392    /// Compare-and-set: atomically swap `key` from `expected` to `new_value`.
393    ///
394    /// Returns `(ok, current)`:
395    /// - `ok = true`  → swap applied; `current` is the value *before* the swap.
396    /// - `ok = false` → swap skipped; `current` holds the actual current value.
397    ///
398    /// `expected = None` means the caller expects the key to be *absent* (create-if-absent).
399    pub fn cas(
400        &self,
401        model: crate::catalog::CollectionModel,
402        collection: &str,
403        key: &str,
404        expected: Option<&crate::storage::schema::Value>,
405        new_value: crate::storage::schema::Value,
406        ttl_ms: Option<u64>,
407    ) -> RedDBResult<(bool, Option<crate::storage::schema::Value>)> {
408        if model == crate::catalog::CollectionModel::Vault {
409            return Err(RedDBError::InvalidOperation(
410                "VAULT CAS is not supported for sealed secrets".to_string(),
411            ));
412        }
413        let rmw_lock = self.runtime.inner.rmw_locks.lock_for(collection, key);
414        let _rmw_guard = rmw_lock.lock();
415        self.ensure_kv_collection(collection)?;
416        let current = self.runtime.get_kv(collection, key)?.map(|(v, _)| v);
417
418        let matches = match (&current, expected) {
419            (None, None) => true,
420            (Some(cur), Some(exp)) => cur == exp,
421            _ => false,
422        };
423
424        if !matches {
425            self.runtime.inner.kv_stats.incr_cas_conflict();
426            return Ok((false, current));
427        }
428
429        // Swap: delete old entry (if present), write new one.
430        if current.is_some() {
431            self.runtime.delete_kv(collection, key)?;
432        }
433
434        let meta_vec: Vec<(String, crate::storage::unified::MetadataValue)> = ttl_metadata(ttl_ms)
435            .map(|m| m.fields.into_iter().collect())
436            .unwrap_or_default();
437
438        let output = self
439            .runtime
440            .create_kv(crate::application::entity::CreateKvInput {
441                collection: collection.to_string(),
442                key: key.to_string(),
443                value: new_value.clone(),
444                metadata: meta_vec,
445            })?;
446        self.runtime
447            .inner
448            .kv_tag_index
449            .replace(collection, key, output.id, &[]);
450
451        self.runtime.record_kv_watch_event(
452            if current.is_some() {
453                crate::replication::cdc::ChangeOperation::Update
454            } else {
455                crate::replication::cdc::ChangeOperation::Insert
456            },
457            collection,
458            key,
459            output.id.raw(),
460            current
461                .as_ref()
462                .map(crate::presentation::entity_json::storage_value_to_json),
463            Some(crate::presentation::entity_json::storage_value_to_json(
464                &new_value,
465            )),
466        );
467
468        self.runtime.inner.kv_stats.incr_cas_success();
469        Ok((true, current))
470    }
471
472    pub fn invalidate_tags(&self, collection: &str, tags: &[String]) -> RedDBResult<usize> {
473        self.runtime
474            .check_write(crate::runtime::write_gate::WriteKind::Dml)?;
475        self.runtime.check_kv_invalidate_policy(collection)?;
476        self.ensure_kv_collection(collection)?;
477        let entries = self
478            .runtime
479            .inner
480            .kv_tag_index
481            .entries_for_tags(collection, tags);
482        if entries.is_empty() {
483            return Ok(0);
484        }
485
486        let store = self.runtime.inner.db.store();
487        let mut removed = 0usize;
488        for (key, id) in entries {
489            let before = store
490                .get(collection, id)
491                .and_then(|entity| kv_value_from_entity(&entity));
492            let deleted = store
493                .delete(collection, id)
494                .map_err(|err| RedDBError::Internal(err.to_string()))?;
495            if deleted {
496                store.context_index().remove_entity(id);
497                self.runtime.inner.kv_tag_index.remove(collection, &key);
498                self.runtime.record_kv_watch_event(
499                    crate::replication::cdc::ChangeOperation::Delete,
500                    collection,
501                    &key,
502                    id.raw(),
503                    before
504                        .as_ref()
505                        .map(crate::presentation::entity_json::storage_value_to_json),
506                    None,
507                );
508                removed += 1;
509            }
510        }
511        if removed > 0 {
512            self.runtime.inner.kv_stats.incr_deletes();
513        }
514        Ok(removed)
515    }
516
517    pub fn tags_for_key(&self, collection: &str, key: &str) -> Vec<String> {
518        self.runtime
519            .inner
520            .kv_tag_index
521            .tags_for_key(collection, key)
522    }
523
524    /// Auto-create a KV collection if it does not exist yet.
525    fn ensure_kv_collection(&self, collection: &str) -> RedDBResult<()> {
526        self.ensure_keyed_collection(crate::catalog::CollectionModel::Kv, collection)
527    }
528
529    fn ensure_keyed_collection(
530        &self,
531        model: crate::catalog::CollectionModel,
532        collection: &str,
533    ) -> RedDBResult<()> {
534        let store = self.runtime.inner.db.store();
535        if store.get_collection(collection).is_some() {
536            return self.ensure_declared_model(model, collection);
537        }
538        if model != crate::catalog::CollectionModel::Kv {
539            return Err(RedDBError::NotFound(format!(
540                "{} collection '{collection}' does not exist",
541                keyed_model_name(model)
542            )));
543        }
544        // Check config gate: red.config.kv.default_collection (default = true).
545        let auto_create = self
546            .runtime
547            .config_bool("red.config.kv.default_collection", true);
548        if !auto_create {
549            return Err(RedDBError::NotFound(format!(
550                "kv collection '{collection}' does not exist and auto-create is disabled \
551                 (red.config.kv.default_collection = false)"
552            )));
553        }
554        store
555            .create_collection(collection)
556            .map_err(|err| RedDBError::Internal(err.to_string()))?;
557        self.runtime
558            .inner
559            .db
560            .save_collection_contract(kv_collection_contract(collection))
561            .map_err(|err| RedDBError::Internal(err.to_string()))?;
562        Ok(())
563    }
564
565    fn get_entry(
566        &self,
567        model: crate::catalog::CollectionModel,
568        collection: &str,
569        key: &str,
570    ) -> RedDBResult<Option<(crate::storage::schema::Value, crate::storage::EntityId)>> {
571        let Some(entity) = self.get_entity(model, collection, key)? else {
572            return Ok(None);
573        };
574        Ok(kv_value_from_entity(&entity).map(|value| (value, entity.id)))
575    }
576
577    fn get_entity(
578        &self,
579        model: crate::catalog::CollectionModel,
580        collection: &str,
581        key: &str,
582    ) -> RedDBResult<Option<crate::storage::UnifiedEntity>> {
583        self.ensure_declared_model(model, collection)?;
584        let store = self.runtime.inner.db.store();
585        let Some(manager) = store.get_collection(collection) else {
586            return Ok(None);
587        };
588        let entities = manager.query_all(|_| true);
589        for entity in entities {
590            if let crate::storage::EntityData::Row(ref row) = entity.data {
591                if let Some(ref named) = row.named {
592                    if let Some(crate::storage::schema::Value::Text(ref k)) = named.get("key") {
593                        if &**k == key {
594                            return Ok(Some(entity));
595                        }
596                    }
597                }
598            }
599        }
600        Ok(None)
601    }
602
603    fn get_vault_entry(&self, collection: &str, key: &str) -> RedDBResult<Option<VaultEntry>> {
604        self.vault_versions(collection, key)
605            .map(super::keyed_spine::latest_version)
606    }
607
608    fn get_vault_entry_version(
609        &self,
610        collection: &str,
611        key: &str,
612        version: i64,
613    ) -> RedDBResult<Option<VaultEntry>> {
614        Ok(self
615            .vault_versions(collection, key)?
616            .into_iter()
617            .find(|entry| entry.version == version))
618    }
619
620    fn vault_versions(&self, collection: &str, key: &str) -> RedDBResult<Vec<VaultEntry>> {
621        self.ensure_declared_model(crate::catalog::CollectionModel::Vault, collection)?;
622        let store = self.runtime.inner.db.store();
623        let Some(manager) = store.get_collection(collection) else {
624            return Ok(Vec::new());
625        };
626        let entities = manager.query_all(|_| true);
627        let mut versions = Vec::new();
628        for entity in entities {
629            let crate::storage::EntityData::Row(ref row) = entity.data else {
630                continue;
631            };
632            let Some(version) =
633                super::keyed_spine::row_version(entity.id, row, entity.sequence_id as i64)
634            else {
635                continue;
636            };
637            if version.key != key {
638                continue;
639            }
640            let metadata = manager.get_metadata(entity.id).unwrap_or_default();
641            versions.push(VaultEntry::from_keyed_row(
642                version,
643                metadata,
644                entity.created_at,
645                entity.updated_at,
646                entity.sequence_id,
647            ));
648        }
649        Ok(versions)
650    }
651
652    fn latest_vault_entries(
653        &self,
654        collection: &str,
655        prefix: Option<&str>,
656    ) -> RedDBResult<Vec<VaultEntry>> {
657        self.ensure_declared_model(crate::catalog::CollectionModel::Vault, collection)?;
658        let store = self.runtime.inner.db.store();
659        let Some(manager) = store.get_collection(collection) else {
660            return Ok(Vec::new());
661        };
662        let mut versions = Vec::new();
663        for entity in manager.query_all(|_| true) {
664            let crate::storage::EntityData::Row(ref row) = entity.data else {
665                continue;
666            };
667            let Some(version) =
668                super::keyed_spine::row_version(entity.id, row, entity.sequence_id as i64)
669            else {
670                continue;
671            };
672            let metadata = manager.get_metadata(entity.id).unwrap_or_default();
673            let entry = VaultEntry::from_keyed_row(
674                version,
675                metadata,
676                entity.created_at,
677                entity.updated_at,
678                entity.sequence_id,
679            );
680            versions.push(entry);
681        }
682        Ok(super::keyed_spine::latest_versions(versions, prefix))
683    }
684
685    fn append_vault_version(
686        &self,
687        collection: &str,
688        key: &str,
689        value: crate::storage::schema::Value,
690        op: &str,
691        tombstone: bool,
692        tags: &[String],
693    ) -> RedDBResult<VaultEntry> {
694        self.ensure_declared_model(crate::catalog::CollectionModel::Vault, collection)?;
695        let version = self
696            .get_vault_entry(collection, key)?
697            .map(|entry| entry.version)
698            .unwrap_or(0)
699            + 1;
700        let stored_value = if tombstone {
701            crate::storage::schema::Value::Null
702        } else {
703            self.runtime.seal_vault_value(collection, value)?
704        };
705        let now = current_unix_ms() as i64;
706        let fields = vec![
707            (
708                "key".to_string(),
709                crate::storage::schema::Value::text(key.to_string()),
710            ),
711            ("value".to_string(), stored_value),
712            (
713                "version".to_string(),
714                crate::storage::schema::Value::Integer(version),
715            ),
716            (
717                "tombstone".to_string(),
718                crate::storage::schema::Value::Boolean(tombstone),
719            ),
720            (
721                "op".to_string(),
722                crate::storage::schema::Value::text(op.to_string()),
723            ),
724            (
725                "created_at_ms".to_string(),
726                crate::storage::schema::Value::Integer(now),
727            ),
728        ];
729        let mut row = crate::storage::RowData::new(Vec::new());
730        row.named = Some(fields.into_iter().collect());
731        let entity = crate::storage::UnifiedEntity::new(
732            crate::storage::EntityId::new(0),
733            crate::storage::EntityKind::TableRow {
734                table: std::sync::Arc::from(collection),
735                row_id: 0,
736            },
737            crate::storage::EntityData::Row(row),
738        );
739        let id = self
740            .runtime
741            .inner
742            .db
743            .store()
744            .insert(collection, entity)
745            .map_err(|err| RedDBError::Internal(err.to_string()))?;
746        if !tags.is_empty() {
747            self.runtime
748                .inner
749                .db
750                .store()
751                .set_metadata(
752                    collection,
753                    id,
754                    Metadata::with_fields(vault_tags_metadata(tags)),
755                )
756                .map_err(|err| RedDBError::Internal(err.to_string()))?;
757            self.runtime
758                .inner
759                .kv_tag_index
760                .replace(collection, key, id, tags);
761        }
762        self.get_vault_entry_version(collection, key, version)?
763            .ok_or_else(|| RedDBError::Internal(format!("vault version {id} was not readable")))
764    }
765
766    fn purge_vault_versions(&self, collection: &str, key: &str) -> RedDBResult<usize> {
767        self.ensure_declared_model(crate::catalog::CollectionModel::Vault, collection)?;
768        let versions = self.vault_versions(collection, key)?;
769        let store = self.runtime.inner.db.store();
770        let mut purged = 0usize;
771        for entry in versions {
772            if store
773                .delete(collection, entry.id)
774                .map_err(|err| RedDBError::Internal(err.to_string()))?
775            {
776                store.context_index().remove_entity(entry.id);
777                purged += 1;
778            }
779        }
780        Ok(purged)
781    }
782
783    fn ensure_declared_model(
784        &self,
785        model: crate::catalog::CollectionModel,
786        collection: &str,
787    ) -> RedDBResult<()> {
788        let Some(contract) = self.runtime.inner.db.collection_contract(collection) else {
789            return Ok(());
790        };
791        if contract.declared_model == model
792            || contract.declared_model == crate::catalog::CollectionModel::Mixed
793        {
794            return Ok(());
795        }
796        Err(RedDBError::InvalidOperation(format!(
797            "collection '{}' is declared as '{}' and does not allow '{}' operations",
798            collection,
799            keyed_model_name(contract.declared_model),
800            keyed_model_name(model)
801        )))
802    }
803}
804
805impl RedDBRuntime {
806    pub(crate) fn seal_vault_value(
807        &self,
808        collection: &str,
809        value: crate::storage::schema::Value,
810    ) -> RedDBResult<crate::storage::schema::Value> {
811        let key = self.vault_encryption_key(collection)?;
812        let plaintext = value.to_bytes();
813        let nonce_bytes = crate::auth::store::random_bytes(12);
814        let mut nonce = [0u8; 12];
815        nonce.copy_from_slice(&nonce_bytes[..12]);
816        let aad = format!("reddb.vault.{collection}");
817        let ciphertext =
818            crate::crypto::aes_gcm::aes256_gcm_encrypt(&key, &nonce, aad.as_bytes(), &plaintext);
819        let mut payload = Vec::with_capacity(12 + ciphertext.len());
820        payload.extend_from_slice(&nonce);
821        payload.extend_from_slice(&ciphertext);
822        Ok(crate::storage::schema::Value::Secret(payload))
823    }
824
825    fn vault_key_available(&self, collection: &str) -> bool {
826        self.vault_encryption_key(collection).is_ok()
827    }
828
829    fn vault_encryption_key(&self, collection: &str) -> RedDBResult<[u8; 32]> {
830        let auth_store = self.inner.auth_store.read().clone().ok_or_else(|| {
831            RedDBError::Query("vault sealed_unavailable: no key provider is configured".to_string())
832        })?;
833        if !auth_store.is_vault_backed() {
834            return Err(RedDBError::Query(
835                "vault sealed_unavailable: key provider is sealed".to_string(),
836            ));
837        }
838
839        if let Some(hex_key) = auth_store.vault_kv_get(&vault_master_key_ref(collection)) {
840            return decode_vault_key(&hex_key);
841        }
842        auth_store.vault_secret_key().ok_or_else(|| {
843            RedDBError::Query("vault sealed_unavailable: cluster vault key is missing".to_string())
844        })
845    }
846
847    fn unseal_vault_value(
848        &self,
849        collection: &str,
850        sealed: &crate::storage::schema::Value,
851    ) -> RedDBResult<crate::storage::schema::Value> {
852        let crate::storage::schema::Value::Secret(payload) = sealed else {
853            return Err(RedDBError::Query(
854                "vault unseal failed: stored value is not sealed".to_string(),
855            ));
856        };
857        if payload.len() < 12 {
858            return Err(RedDBError::Query(
859                "vault unseal failed: sealed payload is truncated".to_string(),
860            ));
861        }
862        let key = self.vault_encryption_key(collection)?;
863        let mut nonce = [0u8; 12];
864        nonce.copy_from_slice(&payload[..12]);
865        let aad = format!("reddb.vault.{collection}");
866        let plaintext = crate::crypto::aes_gcm::aes256_gcm_decrypt(
867            &key,
868            &nonce,
869            aad.as_bytes(),
870            &payload[12..],
871        )
872        .map_err(|_| RedDBError::Query("vault unseal failed: decryption failed".to_string()))?;
873        let (value, consumed) =
874            crate::storage::schema::Value::from_bytes(&plaintext).map_err(|err| {
875                RedDBError::Query(format!("vault unseal failed: bad plaintext value: {err}"))
876            })?;
877        if consumed != plaintext.len() {
878            return Err(RedDBError::Query(
879                "vault unseal failed: trailing plaintext bytes".to_string(),
880            ));
881        }
882        Ok(value)
883    }
884
885    /// Internal peek of a vault entry's unsealed value, bypassing audit and
886    /// policy checks. Used by `SecretRefGuard` to detect chained `secret_ref`
887    /// indirection at config write time without emitting vault read events.
888    /// Returns `Ok(None)` if the entry is absent, tombstoned, or its sealed
889    /// payload cannot be decrypted with the available key material.
890    pub(crate) fn peek_vault_unsealed(
891        &self,
892        collection: &str,
893        key: &str,
894    ) -> RedDBResult<Option<crate::storage::schema::Value>> {
895        let ops = KvAtomicOps::new(self);
896        let Some(entry) = ops.get_vault_entry(collection, key)? else {
897            return Ok(None);
898        };
899        if entry.tombstone {
900            return Ok(None);
901        }
902        Ok(self.unseal_vault_value(collection, &entry.value).ok())
903    }
904
905    fn vault_target_resource(collection: &str, key: &str) -> String {
906        if collection == "red.vault" {
907            return format!("red.vault/{}", key.to_ascii_lowercase());
908        }
909        format!("{collection}.{key}")
910    }
911
912    fn current_vault_actor() -> String {
913        current_auth_identity()
914            .map(|(principal, _)| principal)
915            .unwrap_or_else(|| "anonymous".to_string())
916    }
917
918    fn vault_request_id() -> String {
919        let conn_id = current_connection_id();
920        if conn_id == 0 {
921            "embedded".to_string()
922        } else {
923            format!("conn-{conn_id}")
924        }
925    }
926
927    fn check_vault_capability(
928        &self,
929        action: &str,
930        collection: &str,
931        key: &str,
932    ) -> Result<(), String> {
933        let Some(auth_store) = self.inner.auth_store.read().clone() else {
934            return Ok(());
935        };
936        if !auth_store.iam_authorization_enabled() {
937            return Ok(());
938        }
939        let Some((principal, role)) = current_auth_identity() else {
940            return Err(
941                "IAM authorization is enabled; vault capability check requires an authenticated principal"
942                    .to_string(),
943            );
944        };
945        let tenant = current_tenant();
946        let principal_id = crate::auth::UserId::from_parts(tenant.as_deref(), &principal);
947        let mut resource = crate::auth::policies::ResourceRef::new(
948            "vault",
949            Self::vault_target_resource(collection, key),
950        );
951        if let Some(ref tenant) = tenant {
952            resource = resource.with_tenant(tenant.clone());
953        }
954        let ctx = crate::auth::policies::EvalContext {
955            principal_tenant: tenant.clone(),
956            current_tenant: tenant,
957            peer_ip: None,
958            mfa_present: false,
959            now_ms: crate::utils::now_unix_millis() as u128,
960            principal_is_admin_role: role == crate::auth::Role::Admin,
961            principal_is_system_owned: auth_store.principal_is_system_owned(&principal_id),
962            principal_is_platform_scoped: principal_id.tenant.is_none(),
963        };
964        if auth_store.check_policy_authz_with_role(&principal_id, action, &resource, &ctx, role) {
965            Ok(())
966        } else {
967            Err(format!(
968                "principal=`{}` action=`{}` resource=`vault:{}` denied by IAM policy",
969                principal,
970                action,
971                Self::vault_target_resource(collection, key)
972            ))
973        }
974    }
975
976    fn check_system_vault_capability(
977        &self,
978        action: &str,
979        collection: &str,
980        key: &str,
981    ) -> Result<(), String> {
982        if collection != "red.vault" {
983            return Ok(());
984        }
985        self.check_vault_capability(action, collection, key)
986    }
987
988    fn audit_vault_unseal(
989        &self,
990        collection: &str,
991        key: &str,
992        outcome: crate::runtime::audit_log::Outcome,
993        reason: &str,
994        entry: Option<&VaultEntry>,
995    ) {
996        let actor = Self::current_vault_actor();
997        let request_id = Self::vault_request_id();
998        let mut builder = crate::runtime::audit_log::AuditEvent::builder("vault/unseal")
999            .principal(actor.clone())
1000            .source(crate::runtime::audit_log::AuditAuthSource::Password)
1001            .resource(format!(
1002                "vault:{}",
1003                Self::vault_target_resource(collection, key)
1004            ))
1005            .outcome(outcome)
1006            .correlation_id(request_id.clone())
1007            .fields([
1008                crate::runtime::audit_log::AuditFieldEscaper::field("actor", actor),
1009                crate::runtime::audit_log::AuditFieldEscaper::field("collection", collection),
1010                crate::runtime::audit_log::AuditFieldEscaper::field("key", key),
1011                crate::runtime::audit_log::AuditFieldEscaper::field(
1012                    "target",
1013                    Self::vault_target_resource(collection, key),
1014                ),
1015                crate::runtime::audit_log::AuditFieldEscaper::field("reason", reason),
1016                crate::runtime::audit_log::AuditFieldEscaper::field("request_id", request_id),
1017                crate::runtime::audit_log::AuditFieldEscaper::field(
1018                    "connection_id",
1019                    current_connection_id(),
1020                ),
1021            ]);
1022        if let Some(tenant) = current_tenant() {
1023            builder = builder.tenant(tenant);
1024        }
1025        if let Some(entry) = entry {
1026            builder = builder.fields([
1027                crate::runtime::audit_log::AuditFieldEscaper::field("entity_id", entry.id.raw()),
1028                crate::runtime::audit_log::AuditFieldEscaper::field(
1029                    "sequence_id",
1030                    entry.sequence_id,
1031                ),
1032            ]);
1033        }
1034        self.audit_log().record_event(builder.build());
1035    }
1036
1037    fn audit_vault_lifecycle(
1038        &self,
1039        operation: &str,
1040        collection: &str,
1041        key: &str,
1042        outcome: crate::runtime::audit_log::Outcome,
1043        reason: &str,
1044        entry: Option<&VaultEntry>,
1045    ) {
1046        let actor = Self::current_vault_actor();
1047        let request_id = Self::vault_request_id();
1048        let mut builder =
1049            crate::runtime::audit_log::AuditEvent::builder(format!("vault/{operation}"))
1050                .principal(actor.clone())
1051                .source(crate::runtime::audit_log::AuditAuthSource::Password)
1052                .resource(format!(
1053                    "vault:{}",
1054                    Self::vault_target_resource(collection, key)
1055                ))
1056                .outcome(outcome)
1057                .correlation_id(request_id.clone())
1058                .fields([
1059                    crate::runtime::audit_log::AuditFieldEscaper::field("actor", actor),
1060                    crate::runtime::audit_log::AuditFieldEscaper::field("collection", collection),
1061                    crate::runtime::audit_log::AuditFieldEscaper::field("key", key),
1062                    crate::runtime::audit_log::AuditFieldEscaper::field(
1063                        "target",
1064                        Self::vault_target_resource(collection, key),
1065                    ),
1066                    crate::runtime::audit_log::AuditFieldEscaper::field("reason", reason),
1067                    crate::runtime::audit_log::AuditFieldEscaper::field("request_id", request_id),
1068                    crate::runtime::audit_log::AuditFieldEscaper::field(
1069                        "connection_id",
1070                        current_connection_id(),
1071                    ),
1072                ]);
1073        if let Some(tenant) = current_tenant() {
1074            builder = builder.tenant(tenant);
1075        }
1076        if let Some(entry) = entry {
1077            builder = builder.fields([
1078                crate::runtime::audit_log::AuditFieldEscaper::field("entity_id", entry.id.raw()),
1079                crate::runtime::audit_log::AuditFieldEscaper::field("version", entry.version),
1080                crate::runtime::audit_log::AuditFieldEscaper::field(
1081                    "sequence_id",
1082                    entry.sequence_id,
1083                ),
1084            ]);
1085        }
1086        self.audit_log().record_event(builder.build());
1087    }
1088
1089    fn emit_vault_control_event(
1090        &self,
1091        kind: crate::runtime::control_events::EventKind,
1092        outcome: crate::runtime::control_events::Outcome,
1093        action: &'static str,
1094        collection: &str,
1095        key: &str,
1096        reason: &str,
1097        entry: Option<&VaultEntry>,
1098        extra_fields: Vec<(String, crate::runtime::control_events::Sensitivity)>,
1099    ) -> RedDBResult<()> {
1100        use crate::runtime::control_events::{
1101            ActorRef, ControlEvent, ControlEventCtx, ControlEventLedger, Sensitivity,
1102        };
1103        use std::borrow::Cow;
1104
1105        let tenant = current_tenant();
1106        let principal = current_auth_identity().map(|(principal, _)| principal);
1107        let actor_user = principal
1108            .as_ref()
1109            .map(|principal| crate::auth::UserId::from_parts(tenant.as_deref(), principal));
1110        let request_id = Self::vault_request_id();
1111        let actor = actor_user
1112            .as_ref()
1113            .map(ActorRef::User)
1114            .unwrap_or(ActorRef::Anonymous);
1115        let ctx = ControlEventCtx {
1116            actor,
1117            scope: tenant.as_ref().map(|scope| Cow::Borrowed(scope.as_str())),
1118            request_id: Some(Cow::Borrowed(request_id.as_str())),
1119            trace_id: None,
1120        };
1121
1122        let target = Self::vault_target_resource(collection, key);
1123        let mut fields = std::collections::HashMap::new();
1124        fields.insert("path".to_string(), Sensitivity::raw(target.clone()));
1125        fields.insert("collection".to_string(), Sensitivity::raw(collection));
1126        fields.insert("key".to_string(), Sensitivity::raw(key));
1127        fields.insert(
1128            "connection_id".to_string(),
1129            Sensitivity::raw(current_connection_id().to_string()),
1130        );
1131        if let Some(entry) = entry {
1132            fields.insert(
1133                "entity_id".to_string(),
1134                Sensitivity::raw(entry.id.raw().to_string()),
1135            );
1136            fields.insert(
1137                "sequence_id".to_string(),
1138                Sensitivity::raw(entry.sequence_id.to_string()),
1139            );
1140            fields.insert(
1141                "version".to_string(),
1142                Sensitivity::raw(entry.version.to_string()),
1143            );
1144            fields.insert("op".to_string(), Sensitivity::raw(entry.op.clone()));
1145            fields.insert(
1146                "tombstone".to_string(),
1147                Sensitivity::raw(entry.tombstone.to_string()),
1148            );
1149            if !entry.tombstone {
1150                fields.insert(
1151                    "fingerprint".to_string(),
1152                    Sensitivity::raw(vault_fingerprint(&entry.value)),
1153                );
1154            }
1155            fields.insert(
1156                "tags".to_string(),
1157                Sensitivity::raw(format!("{:?}", vault_tags_value(&entry.metadata))),
1158            );
1159        }
1160        for (key, value) in extra_fields {
1161            fields.insert(key, value);
1162        }
1163
1164        let event = ControlEvent {
1165            kind,
1166            outcome,
1167            action: Cow::Borrowed(action),
1168            resource: Some(format!("vault:{target}")),
1169            reason: Some(reason.to_string()),
1170            matched_policy_id: None,
1171            fields,
1172        };
1173        let ledger = self.inner.control_event_ledger.read();
1174        match ledger.emit(&ctx, event) {
1175            Ok(_) => Ok(()),
1176            Err(err) if self.inner.control_event_config.require_persistence() => {
1177                Err(RedDBError::Internal(err.to_string()))
1178            }
1179            Err(_) => Ok(()),
1180        }
1181    }
1182
1183    pub(crate) fn resolve_vault_secret_value(
1184        &self,
1185        collection: &str,
1186        key: &str,
1187    ) -> RedDBResult<Value> {
1188        let ops = KvAtomicOps::new(self);
1189        let entry = ops.get_vault_entry(collection, key)?;
1190        if let Err(reason) = self.check_vault_capability("vault:read", collection, key) {
1191            self.audit_vault_unseal(
1192                collection,
1193                key,
1194                crate::runtime::audit_log::Outcome::Denied,
1195                &reason,
1196                entry.as_ref(),
1197            );
1198            self.emit_vault_control_event(
1199                crate::runtime::control_events::EventKind::VaultRead,
1200                crate::runtime::control_events::Outcome::Denied,
1201                "vault:read",
1202                collection,
1203                key,
1204                &reason,
1205                entry.as_ref(),
1206                Vec::new(),
1207            )?;
1208            return Err(RedDBError::Query(reason));
1209        }
1210        let Some(entry) = entry else {
1211            let reason = "not_found";
1212            self.audit_vault_unseal(
1213                collection,
1214                key,
1215                crate::runtime::audit_log::Outcome::Denied,
1216                reason,
1217                None,
1218            );
1219            self.emit_vault_control_event(
1220                crate::runtime::control_events::EventKind::VaultRead,
1221                crate::runtime::control_events::Outcome::Denied,
1222                "vault:read",
1223                collection,
1224                key,
1225                reason,
1226                None,
1227                Vec::new(),
1228            )?;
1229            return Err(RedDBError::NotFound(format!(
1230                "vault secret '{}.{}' not found",
1231                collection, key
1232            )));
1233        };
1234        if entry.tombstone {
1235            let reason = "deleted";
1236            self.audit_vault_unseal(
1237                collection,
1238                key,
1239                crate::runtime::audit_log::Outcome::Denied,
1240                reason,
1241                Some(&entry),
1242            );
1243            self.emit_vault_control_event(
1244                crate::runtime::control_events::EventKind::VaultRead,
1245                crate::runtime::control_events::Outcome::Denied,
1246                "vault:read",
1247                collection,
1248                key,
1249                reason,
1250                Some(&entry),
1251                Vec::new(),
1252            )?;
1253            return Err(RedDBError::NotFound(format!(
1254                "vault secret '{}.{}' is deleted",
1255                collection, key
1256            )));
1257        }
1258        match self.unseal_vault_value(collection, &entry.value) {
1259            Ok(value) => {
1260                self.audit_vault_unseal(
1261                    collection,
1262                    key,
1263                    crate::runtime::audit_log::Outcome::Success,
1264                    "ok",
1265                    Some(&entry),
1266                );
1267                self.emit_vault_control_event(
1268                    crate::runtime::control_events::EventKind::VaultRead,
1269                    crate::runtime::control_events::Outcome::Allowed,
1270                    "vault:read",
1271                    collection,
1272                    key,
1273                    "ok",
1274                    Some(&entry),
1275                    Vec::new(),
1276                )?;
1277                Ok(value)
1278            }
1279            Err(err) => {
1280                let reason = err.to_string();
1281                self.audit_vault_unseal(
1282                    collection,
1283                    key,
1284                    crate::runtime::audit_log::Outcome::Error,
1285                    &reason,
1286                    Some(&entry),
1287                );
1288                self.emit_vault_control_event(
1289                    crate::runtime::control_events::EventKind::VaultRead,
1290                    crate::runtime::control_events::Outcome::Error,
1291                    "vault:read",
1292                    collection,
1293                    key,
1294                    &reason,
1295                    Some(&entry),
1296                    Vec::new(),
1297                )?;
1298                Err(err)
1299            }
1300        }
1301    }
1302
1303    /// Dispatch a `KV PUT / GET / DELETE` command.
1304    pub fn execute_kv_command(
1305        &self,
1306        raw_query: &str,
1307        cmd: &crate::storage::query::ast::KvCommand,
1308    ) -> RedDBResult<RuntimeQueryResult> {
1309        use crate::storage::query::ast::KvCommand;
1310
1311        let ops = KvAtomicOps::new(self);
1312
1313        match cmd {
1314            KvCommand::Put {
1315                model,
1316                collection,
1317                key,
1318                value,
1319                ttl_ms,
1320                tags,
1321                if_not_exists,
1322            } => {
1323                if *model == crate::catalog::CollectionModel::Vault {
1324                    self.check_system_vault_capability("vault:write", collection, key)
1325                        .map_err(RedDBError::Query)?;
1326                }
1327                self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
1328                let (created, id) = ops.set_with_tags_for_model(
1329                    *model,
1330                    collection,
1331                    key,
1332                    value.clone(),
1333                    *ttl_ms,
1334                    tags,
1335                    *if_not_exists,
1336                )?;
1337
1338                let mut result = UnifiedResult::with_columns(vec![
1339                    "ok".into(),
1340                    "collection".into(),
1341                    "key".into(),
1342                    "id".into(),
1343                    "created".into(),
1344                    "tags".into(),
1345                ]);
1346                let mut record = UnifiedRecord::new();
1347                record.set("ok", Value::Boolean(true));
1348                record.set("collection", Value::text(collection.clone()));
1349                record.set("key", Value::text(key.clone()));
1350                record.set("id", Value::Integer(id.raw() as i64));
1351                record.set("created", Value::Boolean(created));
1352                record.set("tags", kv_tags_value(tags));
1353                result.push(record);
1354
1355                Ok(RuntimeQueryResult {
1356                    query: raw_query.to_string(),
1357                    mode: crate::storage::query::modes::QueryMode::Sql,
1358                    statement: if *model == crate::catalog::CollectionModel::Vault {
1359                        "vault_put"
1360                    } else {
1361                        "kv_put"
1362                    },
1363                    engine: if *model == crate::catalog::CollectionModel::Vault {
1364                        "vault"
1365                    } else {
1366                        "kv"
1367                    },
1368                    result,
1369                    affected_rows: 1,
1370                    statement_type: if created { "insert" } else { "update" },
1371                    bookmark: None,
1372                })
1373            }
1374            KvCommand::InvalidateTags { collection, tags } => {
1375                let invalidated = ops.invalidate_tags(collection, tags)?;
1376
1377                let mut result = UnifiedResult::with_columns(vec![
1378                    "ok".into(),
1379                    "collection".into(),
1380                    "invalidated".into(),
1381                    "tags".into(),
1382                ]);
1383                let mut record = UnifiedRecord::new();
1384                record.set("ok", Value::Boolean(true));
1385                record.set("collection", Value::text(collection.clone()));
1386                record.set("invalidated", Value::Integer(invalidated as i64));
1387                record.set("tags", kv_tags_value(tags));
1388                result.push(record);
1389
1390                Ok(RuntimeQueryResult {
1391                    query: raw_query.to_string(),
1392                    mode: crate::storage::query::modes::QueryMode::Sql,
1393                    statement: "kv_invalidate_tags",
1394                    engine: "kv",
1395                    result,
1396                    affected_rows: invalidated as u64,
1397                    statement_type: "delete",
1398                    bookmark: None,
1399                })
1400            }
1401
1402            KvCommand::Rotate {
1403                collection,
1404                key,
1405                value,
1406                tags,
1407            } => {
1408                self.check_system_vault_capability("vault:write", collection, key)
1409                    .map_err(RedDBError::Query)?;
1410                self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
1411                let entry = ops.append_vault_version(
1412                    collection,
1413                    key,
1414                    value.clone(),
1415                    "rotate",
1416                    false,
1417                    tags,
1418                )?;
1419                self.record_kv_watch_event(
1420                    crate::replication::cdc::ChangeOperation::Update,
1421                    collection,
1422                    key,
1423                    entry.id.raw(),
1424                    None,
1425                    Some(vault_entry_metadata_json(&entry)),
1426                );
1427                self.audit_vault_lifecycle(
1428                    "rotate",
1429                    collection,
1430                    key,
1431                    crate::runtime::audit_log::Outcome::Success,
1432                    "ok",
1433                    Some(&entry),
1434                );
1435                self.emit_vault_control_event(
1436                    crate::runtime::control_events::EventKind::VaultRotate,
1437                    crate::runtime::control_events::Outcome::Allowed,
1438                    "vault:rotate",
1439                    collection,
1440                    key,
1441                    "ok",
1442                    Some(&entry),
1443                    Vec::new(),
1444                )?;
1445                Ok(vault_write_result(
1446                    raw_query,
1447                    "vault_rotate",
1448                    "update",
1449                    collection,
1450                    key,
1451                    &entry,
1452                    1,
1453                ))
1454            }
1455
1456            KvCommand::List {
1457                model,
1458                collection,
1459                prefix,
1460                limit,
1461                offset,
1462            } => {
1463                if *model != crate::catalog::CollectionModel::Vault {
1464                    return Err(RedDBError::InvalidOperation(
1465                        "LIST is not supported through normal KV command execution".to_string(),
1466                    ));
1467                }
1468                let mut entries = ops.latest_vault_entries(collection, prefix.as_deref())?;
1469                entries.sort_by(|left, right| left.key.cmp(&right.key));
1470                let mut result = UnifiedResult::with_columns(vec![
1471                    "collection".into(),
1472                    "key".into(),
1473                    "version".into(),
1474                    "fingerprint".into(),
1475                    "tags".into(),
1476                    "created_at".into(),
1477                    "updated_at".into(),
1478                    "status".into(),
1479                    "tombstone".into(),
1480                    "op".into(),
1481                ]);
1482                let mut visible = Vec::new();
1483                for entry in entries {
1484                    match self.check_vault_capability("vault:read_metadata", collection, &entry.key)
1485                    {
1486                        Ok(()) => {
1487                            self.emit_vault_control_event(
1488                                crate::runtime::control_events::EventKind::VaultMetadataRead,
1489                                crate::runtime::control_events::Outcome::Allowed,
1490                                "vault:read_metadata",
1491                                collection,
1492                                &entry.key,
1493                                "ok",
1494                                Some(&entry),
1495                                Vec::new(),
1496                            )?;
1497                            visible.push(entry);
1498                        }
1499                        Err(reason) => {
1500                            self.emit_vault_control_event(
1501                                crate::runtime::control_events::EventKind::VaultMetadataRead,
1502                                crate::runtime::control_events::Outcome::Denied,
1503                                "vault:read_metadata",
1504                                collection,
1505                                &entry.key,
1506                                &reason,
1507                                Some(&entry),
1508                                Vec::new(),
1509                            )?;
1510                        }
1511                    }
1512                }
1513                for entry in visible
1514                    .into_iter()
1515                    .skip(*offset)
1516                    .take(limit.unwrap_or(usize::MAX))
1517                {
1518                    push_vault_metadata_record(&mut result, collection, &entry.key, &entry);
1519                }
1520                Ok(RuntimeQueryResult {
1521                    query: raw_query.to_string(),
1522                    mode: crate::storage::query::modes::QueryMode::Sql,
1523                    statement: "vault_list",
1524                    engine: "vault",
1525                    result,
1526                    affected_rows: 0,
1527                    statement_type: "select",
1528                    bookmark: None,
1529                })
1530            }
1531
1532            KvCommand::History { collection, key } => {
1533                let latest = ops.get_vault_entry(collection, key)?;
1534                if let Err(reason) =
1535                    self.check_vault_capability("vault:read_metadata", collection, key)
1536                {
1537                    self.emit_vault_control_event(
1538                        crate::runtime::control_events::EventKind::VaultMetadataRead,
1539                        crate::runtime::control_events::Outcome::Denied,
1540                        "vault:read_metadata",
1541                        collection,
1542                        key,
1543                        &reason,
1544                        latest.as_ref(),
1545                        Vec::new(),
1546                    )?;
1547                    return Err(RedDBError::Query(reason));
1548                }
1549                let versions =
1550                    super::keyed_spine::history_versions(ops.vault_versions(collection, key)?);
1551                let result = vault_history_result(collection, key, &versions);
1552                self.emit_vault_control_event(
1553                    crate::runtime::control_events::EventKind::VaultMetadataRead,
1554                    crate::runtime::control_events::Outcome::Allowed,
1555                    "vault:read_metadata",
1556                    collection,
1557                    key,
1558                    "ok",
1559                    latest.as_ref(),
1560                    Vec::new(),
1561                )?;
1562                Ok(RuntimeQueryResult {
1563                    query: raw_query.to_string(),
1564                    mode: crate::storage::query::modes::QueryMode::Sql,
1565                    statement: "vault_history",
1566                    engine: "vault",
1567                    result,
1568                    affected_rows: 0,
1569                    statement_type: "select",
1570                    bookmark: None,
1571                })
1572            }
1573
1574            KvCommand::Purge { collection, key } => {
1575                let entry = ops.get_vault_entry(collection, key)?;
1576                if let Err(reason) = self.check_vault_capability("vault:purge", collection, key) {
1577                    self.audit_vault_lifecycle(
1578                        "purge",
1579                        collection,
1580                        key,
1581                        crate::runtime::audit_log::Outcome::Denied,
1582                        &reason,
1583                        entry.as_ref(),
1584                    );
1585                    self.emit_vault_control_event(
1586                        crate::runtime::control_events::EventKind::VaultPurge,
1587                        crate::runtime::control_events::Outcome::Denied,
1588                        "vault:purge",
1589                        collection,
1590                        key,
1591                        &reason,
1592                        entry.as_ref(),
1593                        Vec::new(),
1594                    )?;
1595                    return Err(RedDBError::Query(reason));
1596                }
1597                self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
1598                let purged = ops.purge_vault_versions(collection, key)?;
1599                self.audit_vault_lifecycle(
1600                    "purge",
1601                    collection,
1602                    key,
1603                    crate::runtime::audit_log::Outcome::Success,
1604                    "ok",
1605                    entry.as_ref(),
1606                );
1607                self.emit_vault_control_event(
1608                    crate::runtime::control_events::EventKind::VaultPurge,
1609                    crate::runtime::control_events::Outcome::Allowed,
1610                    "vault:purge",
1611                    collection,
1612                    key,
1613                    "ok",
1614                    entry.as_ref(),
1615                    vec![(
1616                        "purged".to_string(),
1617                        crate::runtime::control_events::Sensitivity::raw(purged.to_string()),
1618                    )],
1619                )?;
1620                let mut result = UnifiedResult::with_columns(vec![
1621                    "ok".into(),
1622                    "collection".into(),
1623                    "key".into(),
1624                    "purged".into(),
1625                ]);
1626                let mut record = UnifiedRecord::new();
1627                record.set("ok", Value::Boolean(true));
1628                record.set("collection", Value::text(collection.clone()));
1629                record.set("key", Value::text(key.clone()));
1630                record.set("purged", Value::Integer(purged as i64));
1631                result.push(record);
1632                Ok(RuntimeQueryResult {
1633                    query: raw_query.to_string(),
1634                    mode: crate::storage::query::modes::QueryMode::Sql,
1635                    statement: "vault_purge",
1636                    engine: "vault",
1637                    result,
1638                    affected_rows: purged as u64,
1639                    statement_type: "delete",
1640                    bookmark: None,
1641                })
1642            }
1643
1644            KvCommand::Get {
1645                model,
1646                collection,
1647                key,
1648            } => {
1649                if *model == crate::catalog::CollectionModel::Vault {
1650                    let entry = ops.get_vault_entry(collection, key)?;
1651                    if let Err(reason) =
1652                        self.check_vault_capability("vault:read_metadata", collection, key)
1653                    {
1654                        self.emit_vault_control_event(
1655                            crate::runtime::control_events::EventKind::VaultMetadataRead,
1656                            crate::runtime::control_events::Outcome::Denied,
1657                            "vault:read_metadata",
1658                            collection,
1659                            key,
1660                            &reason,
1661                            entry.as_ref(),
1662                            Vec::new(),
1663                        )?;
1664                        return Err(RedDBError::Query(reason));
1665                    }
1666                    let key_available = self.vault_key_available(collection);
1667                    let result =
1668                        vault_metadata_result(collection, key, entry.as_ref(), key_available);
1669                    self.emit_vault_control_event(
1670                        crate::runtime::control_events::EventKind::VaultMetadataRead,
1671                        crate::runtime::control_events::Outcome::Allowed,
1672                        "vault:read_metadata",
1673                        collection,
1674                        key,
1675                        "ok",
1676                        entry.as_ref(),
1677                        Vec::new(),
1678                    )?;
1679                    return Ok(RuntimeQueryResult {
1680                        query: raw_query.to_string(),
1681                        mode: crate::storage::query::modes::QueryMode::Sql,
1682                        statement: "vault_get",
1683                        engine: "vault",
1684                        result,
1685                        affected_rows: 0,
1686                        statement_type: "select",
1687                        bookmark: None,
1688                    });
1689                }
1690
1691                let entity = ops.get_entity(*model, collection, key)?;
1692                let value = entity.as_ref().and_then(kv_value_from_entity);
1693                if *model == crate::catalog::CollectionModel::Kv {
1694                    self.inner.kv_stats.incr_gets();
1695                }
1696                let mut result = UnifiedResult::with_columns(vec![
1697                    "rid".into(),
1698                    "collection".into(),
1699                    "kind".into(),
1700                    "tenant".into(),
1701                    "created_at".into(),
1702                    "updated_at".into(),
1703                    "key".into(),
1704                    "value".into(),
1705                    "tags".into(),
1706                ]);
1707                let mut record = UnifiedRecord::new();
1708                if let Some(entity) = entity.as_ref() {
1709                    record.set("rid", Value::UnsignedInteger(entity.id.raw()));
1710                    record.set("created_at", Value::UnsignedInteger(entity.created_at));
1711                    record.set("updated_at", Value::UnsignedInteger(entity.updated_at));
1712                } else {
1713                    record.set("rid", Value::Null);
1714                    record.set("created_at", Value::Null);
1715                    record.set("updated_at", Value::Null);
1716                }
1717                record.set("collection", Value::text(collection.clone()));
1718                record.set("kind", Value::text(keyed_model_name(*model).to_string()));
1719                record.set("tenant", Value::Null);
1720                record.set("key", Value::text(key.clone()));
1721                record.set(
1722                    "value",
1723                    value.unwrap_or(crate::storage::schema::Value::Null),
1724                );
1725                record.set("tags", kv_tags_value(&ops.tags_for_key(collection, key)));
1726                result.push(record);
1727
1728                Ok(RuntimeQueryResult {
1729                    query: raw_query.to_string(),
1730                    mode: crate::storage::query::modes::QueryMode::Sql,
1731                    statement: "kv_get",
1732                    engine: "kv",
1733                    result,
1734                    affected_rows: 0,
1735                    statement_type: "select",
1736                    bookmark: None,
1737                })
1738            }
1739            KvCommand::Watch {
1740                model,
1741                collection,
1742                key,
1743                prefix,
1744                from_lsn,
1745            } => {
1746                let watch_key = if *prefix {
1747                    format!("{key}.*")
1748                } else {
1749                    key.clone()
1750                };
1751                let endpoint = match from_lsn {
1752                    Some(lsn) => format!(
1753                        "/collections/{collection}/{}/{watch_key}/watch?since_lsn={lsn}",
1754                        keyed_model_name(*model)
1755                    ),
1756                    None => format!(
1757                        "/collections/{collection}/{}/{watch_key}/watch",
1758                        keyed_model_name(*model)
1759                    ),
1760                };
1761                let mut result = UnifiedResult::with_columns(vec![
1762                    "collection".into(),
1763                    "key".into(),
1764                    "prefix".into(),
1765                    "from_lsn".into(),
1766                    "watch_url".into(),
1767                    "streaming".into(),
1768                ]);
1769                let mut record = UnifiedRecord::new();
1770                record.set("collection", Value::text(collection.clone()));
1771                record.set("key", Value::text(watch_key));
1772                record.set("prefix", Value::Boolean(*prefix));
1773                record.set(
1774                    "from_lsn",
1775                    from_lsn
1776                        .map(Value::UnsignedInteger)
1777                        .unwrap_or(crate::storage::schema::Value::Null),
1778                );
1779                record.set("watch_url", Value::text(endpoint));
1780                record.set("streaming", Value::Boolean(true));
1781                result.push(record);
1782
1783                Ok(RuntimeQueryResult {
1784                    query: raw_query.to_string(),
1785                    mode: crate::storage::query::modes::QueryMode::Sql,
1786                    statement: "kv_watch",
1787                    engine: keyed_model_name(*model),
1788                    result,
1789                    affected_rows: 0,
1790                    statement_type: "stream",
1791                    bookmark: None,
1792                })
1793            }
1794
1795            KvCommand::Unseal {
1796                collection,
1797                key,
1798                version,
1799            } => {
1800                let latest = ops.get_vault_entry(collection, key)?;
1801                let entry = match version {
1802                    Some(version) => ops.get_vault_entry_version(collection, key, *version)?,
1803                    None => latest.clone(),
1804                };
1805                let action = match (version, latest.as_ref()) {
1806                    (Some(requested), Some(latest)) if *requested == latest.version => "vault:read",
1807                    (Some(_), _) => "vault:unseal_history",
1808                    _ => "vault:read",
1809                };
1810                let event_kind = if action == "vault:read" {
1811                    crate::runtime::control_events::EventKind::VaultRead
1812                } else {
1813                    crate::runtime::control_events::EventKind::VaultUnseal
1814                };
1815                if let Err(reason) = self.check_vault_capability(action, collection, key) {
1816                    self.audit_vault_unseal(
1817                        collection,
1818                        key,
1819                        crate::runtime::audit_log::Outcome::Denied,
1820                        &reason,
1821                        entry.as_ref(),
1822                    );
1823                    self.emit_vault_control_event(
1824                        event_kind,
1825                        crate::runtime::control_events::Outcome::Denied,
1826                        action,
1827                        collection,
1828                        key,
1829                        &reason,
1830                        entry.as_ref(),
1831                        Vec::new(),
1832                    )?;
1833                    return Err(RedDBError::Query(reason));
1834                }
1835                let Some(entry) = entry else {
1836                    let reason = "not_found";
1837                    self.audit_vault_unseal(
1838                        collection,
1839                        key,
1840                        crate::runtime::audit_log::Outcome::Denied,
1841                        reason,
1842                        None,
1843                    );
1844                    self.emit_vault_control_event(
1845                        event_kind,
1846                        crate::runtime::control_events::Outcome::Denied,
1847                        action,
1848                        collection,
1849                        key,
1850                        reason,
1851                        None,
1852                        Vec::new(),
1853                    )?;
1854                    return Err(RedDBError::NotFound(format!(
1855                        "vault secret '{}.{}' not found",
1856                        collection, key
1857                    )));
1858                };
1859                if entry.tombstone {
1860                    let reason = "deleted";
1861                    self.audit_vault_unseal(
1862                        collection,
1863                        key,
1864                        crate::runtime::audit_log::Outcome::Denied,
1865                        reason,
1866                        Some(&entry),
1867                    );
1868                    self.emit_vault_control_event(
1869                        event_kind,
1870                        crate::runtime::control_events::Outcome::Denied,
1871                        action,
1872                        collection,
1873                        key,
1874                        reason,
1875                        Some(&entry),
1876                        Vec::new(),
1877                    )?;
1878                    return Err(RedDBError::NotFound(format!(
1879                        "vault secret '{}.{}' is deleted",
1880                        collection, key
1881                    )));
1882                }
1883                match self.unseal_vault_value(collection, &entry.value) {
1884                    Ok(value) => {
1885                        self.audit_vault_unseal(
1886                            collection,
1887                            key,
1888                            crate::runtime::audit_log::Outcome::Success,
1889                            "ok",
1890                            Some(&entry),
1891                        );
1892                        self.emit_vault_control_event(
1893                            event_kind,
1894                            crate::runtime::control_events::Outcome::Allowed,
1895                            action,
1896                            collection,
1897                            key,
1898                            "ok",
1899                            Some(&entry),
1900                            Vec::new(),
1901                        )?;
1902                        let mut result = UnifiedResult::with_columns(vec![
1903                            "collection".into(),
1904                            "key".into(),
1905                            "value".into(),
1906                        ]);
1907                        let mut record = UnifiedRecord::new();
1908                        record.set("collection", Value::text(collection.clone()));
1909                        record.set("key", Value::text(key.clone()));
1910                        record.set("value", value);
1911                        result.push(record);
1912                        Ok(RuntimeQueryResult {
1913                            query: raw_query.to_string(),
1914                            mode: crate::storage::query::modes::QueryMode::Sql,
1915                            statement: "vault_unseal",
1916                            engine: "vault",
1917                            result,
1918                            affected_rows: 0,
1919                            statement_type: "select",
1920                            bookmark: None,
1921                        })
1922                    }
1923                    Err(err) => {
1924                        let reason = err.to_string();
1925                        self.audit_vault_unseal(
1926                            collection,
1927                            key,
1928                            crate::runtime::audit_log::Outcome::Error,
1929                            &reason,
1930                            Some(&entry),
1931                        );
1932                        self.emit_vault_control_event(
1933                            event_kind,
1934                            crate::runtime::control_events::Outcome::Error,
1935                            action,
1936                            collection,
1937                            key,
1938                            &reason,
1939                            Some(&entry),
1940                            Vec::new(),
1941                        )?;
1942                        Err(err)
1943                    }
1944                }
1945            }
1946
1947            KvCommand::Incr {
1948                model,
1949                collection,
1950                key,
1951                by,
1952                ttl_ms,
1953            } => {
1954                self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
1955                let new_value = ops.incr(*model, collection, key, *by, *ttl_ms)?;
1956
1957                let mut result = UnifiedResult::with_columns(vec![
1958                    "ok".into(),
1959                    "collection".into(),
1960                    "key".into(),
1961                    "value".into(),
1962                ]);
1963                let mut record = UnifiedRecord::new();
1964                record.set("ok", Value::Boolean(true));
1965                record.set("collection", Value::text(collection.clone()));
1966                record.set("key", Value::text(key.clone()));
1967                record.set("value", Value::Integer(new_value));
1968                result.push(record);
1969
1970                Ok(RuntimeQueryResult {
1971                    query: raw_query.to_string(),
1972                    mode: crate::storage::query::modes::QueryMode::Sql,
1973                    statement: "kv_incr",
1974                    engine: "kv",
1975                    result,
1976                    affected_rows: 1,
1977                    statement_type: "update",
1978                    bookmark: None,
1979                })
1980            }
1981
1982            KvCommand::Cas {
1983                model,
1984                collection,
1985                key,
1986                expected,
1987                new_value,
1988                ttl_ms,
1989            } => {
1990                self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
1991                let (ok, current) = ops.cas(
1992                    *model,
1993                    collection,
1994                    key,
1995                    expected.as_ref(),
1996                    new_value.clone(),
1997                    *ttl_ms,
1998                )?;
1999
2000                let mut result = UnifiedResult::with_columns(vec![
2001                    "ok".into(),
2002                    "collection".into(),
2003                    "key".into(),
2004                    "current".into(),
2005                ]);
2006                let mut record = UnifiedRecord::new();
2007                record.set("ok", Value::Boolean(ok));
2008                record.set("collection", Value::text(collection.clone()));
2009                record.set("key", Value::text(key.clone()));
2010                record.set(
2011                    "current",
2012                    current.unwrap_or(crate::storage::schema::Value::Null),
2013                );
2014                result.push(record);
2015
2016                Ok(RuntimeQueryResult {
2017                    query: raw_query.to_string(),
2018                    mode: crate::storage::query::modes::QueryMode::Sql,
2019                    statement: "kv_cas",
2020                    engine: "kv",
2021                    result,
2022                    affected_rows: if ok { 1 } else { 0 },
2023                    statement_type: "update",
2024                    bookmark: None,
2025                })
2026            }
2027
2028            KvCommand::Delete {
2029                model,
2030                collection,
2031                key,
2032            } => {
2033                if *model == crate::catalog::CollectionModel::Vault {
2034                    self.check_system_vault_capability("vault:write", collection, key)
2035                        .map_err(RedDBError::Query)?;
2036                    self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2037                    let entry = ops.append_vault_version(
2038                        collection,
2039                        key,
2040                        Value::Null,
2041                        "delete",
2042                        true,
2043                        &[],
2044                    )?;
2045                    self.record_kv_watch_event(
2046                        crate::replication::cdc::ChangeOperation::Delete,
2047                        collection,
2048                        key,
2049                        entry.id.raw(),
2050                        None,
2051                        Some(vault_entry_metadata_json(&entry)),
2052                    );
2053                    self.audit_vault_lifecycle(
2054                        "delete",
2055                        collection,
2056                        key,
2057                        crate::runtime::audit_log::Outcome::Success,
2058                        "ok",
2059                        Some(&entry),
2060                    );
2061                    return Ok(vault_write_result(
2062                        raw_query,
2063                        "vault_delete",
2064                        "delete",
2065                        collection,
2066                        key,
2067                        &entry,
2068                        1,
2069                    ));
2070                }
2071                self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2072                let deleted = ops.delete(*model, collection, key)?;
2073
2074                let mut result = UnifiedResult::with_columns(vec![
2075                    "ok".into(),
2076                    "collection".into(),
2077                    "key".into(),
2078                    "deleted".into(),
2079                ]);
2080                let mut record = UnifiedRecord::new();
2081                record.set("ok", Value::Boolean(true));
2082                record.set("collection", Value::text(collection.clone()));
2083                record.set("key", Value::text(key.clone()));
2084                record.set("deleted", Value::Boolean(deleted));
2085                result.push(record);
2086
2087                Ok(RuntimeQueryResult {
2088                    query: raw_query.to_string(),
2089                    mode: crate::storage::query::modes::QueryMode::Sql,
2090                    statement: "kv_delete",
2091                    engine: "kv",
2092                    result,
2093                    affected_rows: if deleted { 1 } else { 0 },
2094                    statement_type: "delete",
2095                    bookmark: None,
2096                })
2097            }
2098        }
2099    }
2100
2101    pub fn vault_watch_events_since(
2102        &self,
2103        collection: &str,
2104        key: &str,
2105        since_lsn: u64,
2106        max_count: usize,
2107    ) -> Vec<crate::replication::cdc::KvWatchEvent> {
2108        self.kv_watch_events_since(collection, key, since_lsn, max_count)
2109            .into_iter()
2110            .filter(|event| {
2111                self.check_vault_capability("vault:read_metadata", &event.collection, &event.key)
2112                    .is_ok()
2113            })
2114            .map(vault_filter_watch_event)
2115            .collect()
2116    }
2117
2118    pub fn vault_watch_events_since_prefix(
2119        &self,
2120        collection: &str,
2121        prefix: &str,
2122        since_lsn: u64,
2123        max_count: usize,
2124    ) -> Vec<crate::replication::cdc::KvWatchEvent> {
2125        self.kv_watch_events_since_prefix(collection, prefix, since_lsn, max_count)
2126            .into_iter()
2127            .filter(|event| {
2128                self.check_vault_capability("vault:read_metadata", &event.collection, &event.key)
2129                    .is_ok()
2130            })
2131            .map(vault_filter_watch_event)
2132            .collect()
2133    }
2134
2135    fn check_kv_invalidate_policy(&self, collection: &str) -> RedDBResult<()> {
2136        let auth_store = match self.inner.auth_store.read().clone() {
2137            Some(store) => store,
2138            None => return Ok(()),
2139        };
2140        let (username, role) = match crate::runtime::impl_core::current_auth_identity() {
2141            Some(identity) => identity,
2142            None => return Ok(()),
2143        };
2144        if role < crate::auth::Role::Write {
2145            return Err(RedDBError::Query(format!(
2146                "principal=`{username}` role=`{role:?}` cannot invalidate KV tags"
2147            )));
2148        }
2149        if !auth_store.iam_authorization_enabled() {
2150            return Ok(());
2151        }
2152
2153        let tenant = crate::runtime::impl_core::current_tenant();
2154        let principal = crate::auth::UserId::from_parts(tenant.as_deref(), &username);
2155        let mut resource =
2156            crate::auth::policies::ResourceRef::new("kv".to_string(), collection.to_string());
2157        if let Some(tenant) = tenant.clone() {
2158            resource = resource.with_tenant(tenant);
2159        }
2160        let ctx = crate::auth::policies::EvalContext {
2161            principal_tenant: tenant.clone(),
2162            current_tenant: tenant,
2163            peer_ip: None,
2164            mfa_present: false,
2165            now_ms: current_unix_ms(),
2166            principal_is_admin_role: role == crate::auth::Role::Admin,
2167            principal_is_system_owned: auth_store.principal_is_system_owned(&principal),
2168            principal_is_platform_scoped: principal.tenant.is_none(),
2169        };
2170        if auth_store.check_policy_authz_with_role(
2171            &principal,
2172            "kv:invalidate",
2173            &resource,
2174            &ctx,
2175            role,
2176        ) {
2177            Ok(())
2178        } else {
2179            Err(RedDBError::Query(format!(
2180                "principal=`{username}` action=`kv:invalidate` resource=`kv:{collection}` denied by IAM policy"
2181            )))
2182        }
2183    }
2184}
2185
2186fn ttl_metadata(ttl_ms: Option<u64>) -> Option<Metadata> {
2187    let ttl_ms = ttl_ms?;
2188    Some(Metadata::with_fields(
2189        [(
2190            "_ttl_ms".to_string(),
2191            if ttl_ms <= i64::MAX as u64 {
2192                MetadataValue::Int(ttl_ms as i64)
2193            } else {
2194                MetadataValue::Timestamp(ttl_ms)
2195            },
2196        )]
2197        .into_iter()
2198        .collect(),
2199    ))
2200}
2201
2202fn vault_write_result(
2203    raw_query: &str,
2204    statement: &'static str,
2205    statement_type: &'static str,
2206    collection: &str,
2207    key: &str,
2208    entry: &VaultEntry,
2209    affected_rows: u64,
2210) -> RuntimeQueryResult {
2211    let mut result = UnifiedResult::with_columns(vec![
2212        "ok".into(),
2213        "collection".into(),
2214        "key".into(),
2215        "version".into(),
2216        "fingerprint".into(),
2217        "tombstone".into(),
2218        "op".into(),
2219        "id".into(),
2220    ]);
2221    let mut record = UnifiedRecord::new();
2222    record.set("ok", Value::Boolean(true));
2223    record.set("collection", Value::text(collection.to_string()));
2224    record.set("key", Value::text(key.to_string()));
2225    record.set("version", Value::Integer(entry.version));
2226    if entry.tombstone {
2227        record.set("fingerprint", Value::Null);
2228    } else {
2229        record.set("fingerprint", Value::text(vault_fingerprint(&entry.value)));
2230    }
2231    record.set("tombstone", Value::Boolean(entry.tombstone));
2232    record.set("op", Value::text(entry.op.clone()));
2233    record.set("id", Value::Integer(entry.id.raw() as i64));
2234    result.push(record);
2235    RuntimeQueryResult {
2236        query: raw_query.to_string(),
2237        mode: crate::storage::query::modes::QueryMode::Sql,
2238        statement,
2239        engine: "vault",
2240        result,
2241        affected_rows,
2242        statement_type,
2243        bookmark: None,
2244    }
2245}
2246
2247fn vault_history_result(collection: &str, key: &str, versions: &[VaultEntry]) -> UnifiedResult {
2248    let mut result = UnifiedResult::with_columns(vec![
2249        "collection".into(),
2250        "key".into(),
2251        "version".into(),
2252        "fingerprint".into(),
2253        "tags".into(),
2254        "created_at".into(),
2255        "updated_at".into(),
2256        "status".into(),
2257        "tombstone".into(),
2258        "op".into(),
2259    ]);
2260    for entry in versions {
2261        push_vault_metadata_record(&mut result, collection, key, entry);
2262    }
2263    result
2264}
2265
2266fn push_vault_metadata_record(
2267    result: &mut UnifiedResult,
2268    collection: &str,
2269    key: &str,
2270    entry: &VaultEntry,
2271) {
2272    let mut record = UnifiedRecord::new();
2273    record.set("collection", Value::text(collection.to_string()));
2274    record.set("key", Value::text(key.to_string()));
2275    record.set("version", Value::Integer(entry.version));
2276    if entry.tombstone {
2277        record.set("fingerprint", Value::Null);
2278        record.set("status", Value::text("deleted"));
2279    } else {
2280        record.set("fingerprint", Value::text(vault_fingerprint(&entry.value)));
2281        record.set("status", Value::text("sealed"));
2282    }
2283    record.set("tags", vault_tags_value(&entry.metadata));
2284    record.set("created_at", Value::TimestampMs(entry.created_at as i64));
2285    record.set("updated_at", Value::TimestampMs(entry.updated_at as i64));
2286    record.set("tombstone", Value::Boolean(entry.tombstone));
2287    record.set("op", Value::text(entry.op.clone()));
2288    result.push(record);
2289}
2290
2291fn vault_metadata_result(
2292    collection: &str,
2293    key: &str,
2294    entry: Option<&VaultEntry>,
2295    key_available: bool,
2296) -> UnifiedResult {
2297    let mut result = UnifiedResult::with_columns(vec![
2298        "collection".into(),
2299        "key".into(),
2300        "version".into(),
2301        "fingerprint".into(),
2302        "tags".into(),
2303        "created_at".into(),
2304        "updated_at".into(),
2305        "value".into(),
2306        "status".into(),
2307        "tombstone".into(),
2308        "op".into(),
2309    ]);
2310    let mut record = UnifiedRecord::new();
2311    record.set("collection", Value::text(collection.to_string()));
2312    record.set("key", Value::text(key.to_string()));
2313    match entry {
2314        Some(entry) => {
2315            record.set("version", Value::Integer(entry.version));
2316            if entry.tombstone {
2317                record.set("fingerprint", Value::Null);
2318            } else {
2319                record.set("fingerprint", Value::text(vault_fingerprint(&entry.value)));
2320            }
2321            record.set("tags", vault_tags_value(&entry.metadata));
2322            record.set("created_at", Value::TimestampMs(entry.created_at as i64));
2323            record.set("updated_at", Value::TimestampMs(entry.updated_at as i64));
2324            record.set("value", Value::text("***"));
2325            record.set(
2326                "status",
2327                Value::text(if entry.tombstone {
2328                    "deleted"
2329                } else if key_available {
2330                    "sealed"
2331                } else {
2332                    "sealed_unavailable"
2333                }),
2334            );
2335            record.set("tombstone", Value::Boolean(entry.tombstone));
2336            record.set("op", Value::text(entry.op.clone()));
2337        }
2338        None => {
2339            record.set("version", Value::Null);
2340            record.set("fingerprint", Value::Null);
2341            record.set("tags", Value::Array(Vec::new()));
2342            record.set("created_at", Value::Null);
2343            record.set("updated_at", Value::Null);
2344            record.set("value", Value::text(""));
2345            record.set("status", Value::text("missing"));
2346            record.set("tombstone", Value::Boolean(false));
2347            record.set("op", Value::Null);
2348        }
2349    }
2350    result.push(record);
2351    result
2352}
2353
2354fn vault_fingerprint(value: &Value) -> String {
2355    match value {
2356        Value::Secret(payload) => crate::utils::to_hex(&crate::crypto::sha256::sha256(payload)),
2357        other => crate::utils::to_hex(&crate::crypto::sha256::sha256(&other.to_bytes())),
2358    }
2359}
2360
2361fn vault_entry_metadata_json(entry: &VaultEntry) -> crate::json::Value {
2362    let mut object = crate::json::Map::new();
2363    object.insert(
2364        "key".to_string(),
2365        crate::json::Value::String(entry.key.clone()),
2366    );
2367    object.insert(
2368        "version".to_string(),
2369        crate::json::Value::Number(entry.version as f64),
2370    );
2371    object.insert(
2372        "fingerprint".to_string(),
2373        if entry.tombstone {
2374            crate::json::Value::Null
2375        } else {
2376            crate::json::Value::String(vault_fingerprint(&entry.value))
2377        },
2378    );
2379    object.insert("tags".to_string(), vault_tags_json(&entry.metadata));
2380    object.insert(
2381        "actor".to_string(),
2382        crate::json::Value::String(RedDBRuntime::current_vault_actor()),
2383    );
2384    object.insert(
2385        "sequence_id".to_string(),
2386        crate::json::Value::Number(entry.sequence_id as f64),
2387    );
2388    object.insert(
2389        "tombstone".to_string(),
2390        crate::json::Value::Bool(entry.tombstone),
2391    );
2392    object.insert(
2393        "op".to_string(),
2394        crate::json::Value::String(entry.op.clone()),
2395    );
2396    crate::json::Value::Object(object)
2397}
2398
2399fn vault_tags_json(metadata: &Metadata) -> crate::json::Value {
2400    match vault_tags_value(metadata) {
2401        Value::Array(values) => crate::json::Value::Array(
2402            values
2403                .into_iter()
2404                .filter_map(|value| match value {
2405                    Value::Text(tag) => Some(crate::json::Value::String(tag.to_string())),
2406                    _ => None,
2407                })
2408                .collect(),
2409        ),
2410        _ => crate::json::Value::Array(Vec::new()),
2411    }
2412}
2413
2414fn vault_tags_metadata(tags: &[String]) -> std::collections::HashMap<String, MetadataValue> {
2415    [(
2416        "tags".to_string(),
2417        MetadataValue::Array(
2418            tags.iter()
2419                .map(|tag| MetadataValue::String(tag.clone()))
2420                .collect(),
2421        ),
2422    )]
2423    .into_iter()
2424    .collect()
2425}
2426
2427fn vault_filter_watch_event(
2428    mut event: crate::replication::cdc::KvWatchEvent,
2429) -> crate::replication::cdc::KvWatchEvent {
2430    event.before = event.before.and_then(vault_metadata_json_only);
2431    event.after = event.after.and_then(vault_metadata_json_only);
2432    event
2433}
2434
2435fn vault_metadata_json_only(value: crate::json::Value) -> Option<crate::json::Value> {
2436    let object = value.as_object()?;
2437    let mut out = crate::json::Map::new();
2438    for field in [
2439        "key",
2440        "version",
2441        "fingerprint",
2442        "tags",
2443        "actor",
2444        "sequence_id",
2445        "tombstone",
2446        "op",
2447    ] {
2448        if let Some(value) = object.get(field) {
2449            out.insert(field.to_string(), value.clone());
2450        }
2451    }
2452    Some(crate::json::Value::Object(out))
2453}
2454
2455fn vault_tags_value(metadata: &Metadata) -> Value {
2456    match metadata.get("tags") {
2457        Some(MetadataValue::Array(values)) => Value::Array(
2458            values
2459                .iter()
2460                .filter_map(|value| match value {
2461                    MetadataValue::String(tag) => Some(Value::text(tag.clone())),
2462                    _ => None,
2463                })
2464                .collect(),
2465        ),
2466        Some(MetadataValue::String(tag)) if !tag.is_empty() => {
2467            Value::Array(vec![Value::text(tag.clone())])
2468        }
2469        _ => Value::Array(Vec::new()),
2470    }
2471}
2472
2473fn decode_vault_key(hex_key: &str) -> RedDBResult<[u8; 32]> {
2474    let bytes = hex::decode(hex_key)
2475        .map_err(|_| RedDBError::Query("vault sealed_unavailable: bad key material".to_string()))?;
2476    let key: [u8; 32] = bytes.try_into().map_err(|_| {
2477        RedDBError::Query("vault sealed_unavailable: bad key material length".to_string())
2478    })?;
2479    Ok(key)
2480}
2481
2482fn kv_tags_metadata(tags: &[String]) -> Option<(String, MetadataValue)> {
2483    if tags.is_empty() {
2484        return None;
2485    }
2486    let values = tags
2487        .iter()
2488        .map(|tag| MetadataValue::String(tag.clone()))
2489        .collect();
2490    Some(("_kv_tags".to_string(), MetadataValue::Array(values)))
2491}
2492
2493fn kv_tags_value(tags: &[String]) -> Value {
2494    let json = crate::json::Value::Array(
2495        tags.iter()
2496            .map(|tag| crate::json::Value::String(tag.clone()))
2497            .collect(),
2498    );
2499    Value::Json(crate::json::to_vec(&json).unwrap_or_default())
2500}
2501
2502fn kv_value_from_entity(entity: &crate::storage::UnifiedEntity) -> Option<Value> {
2503    if let crate::storage::EntityData::Row(ref row) = entity.data {
2504        if let Some(ref named) = row.named {
2505            return named.get("value").cloned();
2506        }
2507    }
2508    None
2509}
2510
2511fn kv_collection_contract(name: &str) -> crate::physical::CollectionContract {
2512    let now = current_unix_ms();
2513    crate::physical::CollectionContract {
2514        name: name.to_string(),
2515        declared_model: crate::catalog::CollectionModel::Kv,
2516        schema_mode: crate::catalog::SchemaMode::Dynamic,
2517        origin: crate::physical::ContractOrigin::Implicit,
2518        version: 1,
2519        created_at_unix_ms: now,
2520        updated_at_unix_ms: now,
2521        default_ttl_ms: None,
2522        vector_dimension: None,
2523        vector_metric: None,
2524        context_index_fields: Vec::new(),
2525        declared_columns: Vec::new(),
2526        table_def: None,
2527        timestamps_enabled: false,
2528        context_index_enabled: false,
2529        metrics_raw_retention_ms: None,
2530        metrics_rollup_policies: Vec::new(),
2531        metrics_tenant_identity: None,
2532        metrics_namespace: None,
2533        append_only: false,
2534        subscriptions: Vec::new(),
2535        analytics_config: Vec::new(),
2536        session_key: None,
2537        session_gap_ms: None,
2538        retention_duration_ms: None,
2539    }
2540}
2541
/// Milliseconds elapsed since the Unix epoch, according to the system clock.
///
/// Returns `0` if the clock reports a time before the epoch.
fn current_unix_ms() -> u128 {
    match std::time::UNIX_EPOCH.elapsed() {
        Ok(since_epoch) => since_epoch.as_millis(),
        Err(_) => 0,
    }
}
2548
// Unit tests for `KvAtomicOps`, the KV DSL statements, and KV watch streams.
// Every test builds its own in-memory runtime via `rt()`, so no state is shared
// between tests.
#[cfg(test)]
mod tests {
    use crate::api::RedDBOptions;
    use crate::catalog::CollectionModel;
    use crate::runtime::RedDBRuntime;

    /// Builds a fresh in-memory runtime; panics if construction fails.
    fn rt() -> RedDBRuntime {
        RedDBRuntime::with_options(RedDBOptions::in_memory()).expect("in-memory runtime")
    }

    /// INCR on a key that does not exist yet returns `by` (5 here).
    #[test]
    fn incr_missing_key_initialises_at_by() {
        let r = rt();
        let ops = super::KvAtomicOps::new(&r);
        let v = ops
            .incr(CollectionModel::Kv, "kv_default", "missing", 5, None)
            .unwrap();
        assert_eq!(v, 5);
    }

    /// Each public `KvAtomicOps` call bumps its own counter in `stats().kv`,
    /// and CAS outcomes are split into success and conflict counters.
    #[test]
    fn kv_runtime_stats_count_public_ops() {
        let r = rt();
        let ops = super::KvAtomicOps::new(&r);

        ops.set(
            CollectionModel::Kv,
            "kv_default",
            "profile",
            crate::storage::schema::Value::text("alice"),
            None,
            false,
        )
        .unwrap();
        ops.get(CollectionModel::Kv, "kv_default", "profile")
            .unwrap();
        ops.delete(CollectionModel::Kv, "kv_default", "profile")
            .unwrap();
        ops.incr(CollectionModel::Kv, "kv_default", "hits", 1, None)
            .unwrap();
        // "profile" was deleted above, so expecting absence (None) succeeds.
        ops.cas(
            CollectionModel::Kv,
            "kv_default",
            "profile",
            None,
            crate::storage::schema::Value::text("created"),
            None,
        )
        .unwrap();
        // The stored value is now "created", so expecting "different" conflicts.
        ops.cas(
            CollectionModel::Kv,
            "kv_default",
            "profile",
            Some(&crate::storage::schema::Value::text("different")),
            crate::storage::schema::Value::text("ignored"),
            None,
        )
        .unwrap();

        let stats = r.stats().kv;
        assert_eq!(stats.puts, 1);
        assert_eq!(stats.gets, 1);
        assert_eq!(stats.deletes, 1);
        assert_eq!(stats.incrs, 1);
        assert_eq!(stats.cas_success, 1);
        assert_eq!(stats.cas_conflict, 1);
    }

    /// INVALIDATE TAGS removes only entries that carry one of the listed tags.
    #[test]
    fn kv_invalidate_tags_removes_matching_entries_only() {
        let r = rt();

        r.execute_query("KV PUT sessions.blob = 'payload' TAGS [user:42, org:7]")
            .unwrap();

        // A tag the entry does not carry affects no rows, and the value stays readable.
        let miss = r
            .execute_query("INVALIDATE TAGS [org:99] FROM sessions")
            .unwrap();
        assert_eq!(miss.affected_rows, 0);
        assert!(matches!(
            r.execute_query("KV GET sessions.blob")
                .unwrap()
                .result
                .records[0]
                .get("value"),
            Some(crate::storage::schema::Value::Text(value)) if &**value == "payload"
        ));

        // A matching tag affects one row; a later GET reports a Null value.
        let hit = r
            .execute_query("INVALIDATE TAGS [user:42] FROM sessions")
            .unwrap();
        assert_eq!(hit.affected_rows, 1);
        assert!(matches!(
            r.execute_query("KV GET sessions.blob")
                .unwrap()
                .result
                .records[0]
                .get("value"),
            Some(crate::storage::schema::Value::Null)
        ));
    }

    /// Watch-stream stats: the active-stream gauge, emitted events, and drops
    /// recorded through `record_drop_count`.
    #[test]
    fn kv_runtime_stats_count_watch_streams_and_events() {
        let r = rt();
        let ops = super::KvAtomicOps::new(&r);
        assert_eq!(r.stats().kv.watch_streams_active, 0);

        // Scoped so the stream is dropped before the final gauge check.
        {
            let mut stream = r.kv_watch_subscribe("kv_default", "watched", None);
            assert_eq!(r.stats().kv.watch_streams_active, 1);

            ops.set(
                CollectionModel::Kv,
                "kv_default",
                "watched",
                crate::storage::schema::Value::Integer(1),
                None,
                false,
            )
            .unwrap();
            let event = stream.poll_next().expect("watch event");
            assert_eq!(event.key, "watched");
            assert_eq!(r.stats().kv.watch_events_emitted, 1);

            stream.record_drop_count(3);
            assert_eq!(r.stats().kv.watch_drops, 3);
        }

        // Dropping the stream decrements the active gauge.
        assert_eq!(r.stats().kv.watch_streams_active, 0);
    }

    /// Repeated INCR BY 1 on the same key accumulates (1, 2, 3).
    #[test]
    fn incr_existing_integer_accumulates() {
        let r = rt();
        let ops = super::KvAtomicOps::new(&r);
        ops.incr(CollectionModel::Kv, "kv_default", "ctr", 1, None)
            .unwrap();
        ops.incr(CollectionModel::Kv, "kv_default", "ctr", 1, None)
            .unwrap();
        let v = ops
            .incr(CollectionModel::Kv, "kv_default", "ctr", 1, None)
            .unwrap();
        assert_eq!(v, 3);
    }

    /// A negative `by` decrements: 10 followed by -3 gives 7.
    #[test]
    fn decr_via_negative_by() {
        let r = rt();
        let ops = super::KvAtomicOps::new(&r);
        ops.incr(CollectionModel::Kv, "kv_default", "stock", 10, None)
            .unwrap();
        let v = ops
            .incr(CollectionModel::Kv, "kv_default", "stock", -3, None)
            .unwrap();
        assert_eq!(v, 7);
    }

    /// THREADS workers are released together by a barrier, and each performs
    /// ITERS increments on one key. The final value must equal the total
    /// number of increments, so no update may be lost.
    #[test]
    fn concurrent_incr_single_key_is_atomic() {
        const THREADS: usize = 8;
        const ITERS: usize = 1000;

        let runtime = std::sync::Arc::new(rt());
        let barrier = std::sync::Arc::new(std::sync::Barrier::new(THREADS));
        let mut handles = Vec::new();

        for _ in 0..THREADS {
            let runtime = std::sync::Arc::clone(&runtime);
            let barrier = std::sync::Arc::clone(&barrier);
            handles.push(std::thread::spawn(move || {
                let ops = super::KvAtomicOps::new(&runtime);
                barrier.wait();
                for _ in 0..ITERS {
                    ops.incr(CollectionModel::Kv, "kv_default", "counter", 1, None)
                        .unwrap();
                }
            }));
        }

        for handle in handles {
            handle.join().expect("worker should finish");
        }

        let ops = super::KvAtomicOps::new(&runtime);
        assert_eq!(
            ops.get(CollectionModel::Kv, "kv_default", "counter")
                .unwrap(),
            Some(crate::storage::schema::Value::Integer(
                (THREADS * ITERS) as i64
            ))
        );
    }

    /// INCR on a text value fails with an error mentioning "non-integer".
    #[test]
    fn incr_on_string_value_returns_error() {
        let r = rt();
        let ops = super::KvAtomicOps::new(&r);
        ops.set(
            CollectionModel::Kv,
            "kv_default",
            "name",
            crate::storage::schema::Value::text("alice"),
            None,
            false,
        )
        .unwrap();
        let err = ops
            .incr(CollectionModel::Kv, "kv_default", "name", 1, None)
            .unwrap_err();
        assert!(err.to_string().contains("non-integer"));
    }

    // --- CAS tests ---

    /// CAS with a matching expected value swaps the value and returns the
    /// previous one.
    #[test]
    fn cas_matching_value_succeeds() {
        let r = rt();
        let ops = super::KvAtomicOps::new(&r);
        ops.set(
            CollectionModel::Kv,
            "kv_default",
            "lock",
            crate::storage::schema::Value::text("free"),
            None,
            false,
        )
        .unwrap();
        let (ok, prev) = ops
            .cas(
                CollectionModel::Kv,
                "kv_default",
                "lock",
                Some(&crate::storage::schema::Value::text("free")),
                crate::storage::schema::Value::text("held"),
                None,
            )
            .unwrap();
        assert!(ok);
        assert_eq!(prev, Some(crate::storage::schema::Value::text("free")));
        // Value actually changed.
        assert_eq!(
            ops.get(CollectionModel::Kv, "kv_default", "lock").unwrap(),
            Some(crate::storage::schema::Value::text("held"))
        );
    }

    /// In each round, every thread races a CAS from `round` to `round + 1`, and
    /// exactly one should win. The start/finish barriers keep rounds from
    /// overlapping, so a slow thread never observes the next round's value.
    #[test]
    fn concurrent_cas_allows_one_success_per_round() {
        const THREADS: usize = 8;
        const ROUNDS: usize = 100;

        let runtime = std::sync::Arc::new(rt());
        let ops = super::KvAtomicOps::new(&runtime);
        ops.set(
            CollectionModel::Kv,
            "kv_default",
            "cas_counter",
            crate::storage::schema::Value::Integer(0),
            None,
            false,
        )
        .unwrap();

        let start_round = std::sync::Arc::new(std::sync::Barrier::new(THREADS));
        let finish_round = std::sync::Arc::new(std::sync::Barrier::new(THREADS));
        let successes = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
        let mut handles = Vec::new();

        for _ in 0..THREADS {
            let runtime = std::sync::Arc::clone(&runtime);
            let start_round = std::sync::Arc::clone(&start_round);
            let finish_round = std::sync::Arc::clone(&finish_round);
            let successes = std::sync::Arc::clone(&successes);
            handles.push(std::thread::spawn(move || {
                let ops = super::KvAtomicOps::new(&runtime);
                for round in 0..ROUNDS {
                    start_round.wait();
                    let (ok, _) = ops
                        .cas(
                            CollectionModel::Kv,
                            "kv_default",
                            "cas_counter",
                            Some(&crate::storage::schema::Value::Integer(round as i64)),
                            crate::storage::schema::Value::Integer((round + 1) as i64),
                            None,
                        )
                        .unwrap();
                    if ok {
                        successes.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
                    }
                    finish_round.wait();
                }
            }));
        }

        for handle in handles {
            handle.join().expect("worker should finish");
        }

        // One winner per round means ROUNDS successes and a final value of ROUNDS.
        assert_eq!(successes.load(std::sync::atomic::Ordering::SeqCst), ROUNDS);
        assert_eq!(
            ops.get(CollectionModel::Kv, "kv_default", "cas_counter")
                .unwrap(),
            Some(crate::storage::schema::Value::Integer(ROUNDS as i64))
        );
    }

    /// CAS with a non-matching expected value fails, returns the current
    /// value, and leaves the stored value unchanged.
    #[test]
    fn cas_mismatching_value_fails() {
        let r = rt();
        let ops = super::KvAtomicOps::new(&r);
        ops.set(
            CollectionModel::Kv,
            "kv_default",
            "lock",
            crate::storage::schema::Value::text("free"),
            None,
            false,
        )
        .unwrap();
        let (ok, current) = ops
            .cas(
                CollectionModel::Kv,
                "kv_default",
                "lock",
                Some(&crate::storage::schema::Value::text("held")),
                crate::storage::schema::Value::text("worker-7"),
                None,
            )
            .unwrap();
        assert!(!ok);
        assert_eq!(current, Some(crate::storage::schema::Value::text("free")));
        // Value unchanged.
        assert_eq!(
            ops.get(CollectionModel::Kv, "kv_default", "lock").unwrap(),
            Some(crate::storage::schema::Value::text("free"))
        );
    }

    /// An expected value of `None` on a missing key succeeds and creates the key.
    #[test]
    fn cas_expect_null_on_missing_key_creates() {
        let r = rt();
        let ops = super::KvAtomicOps::new(&r);
        let (ok, prev) = ops
            .cas(
                CollectionModel::Kv,
                "kv_default",
                "new_key",
                None,
                crate::storage::schema::Value::text("created"),
                None,
            )
            .unwrap();
        assert!(ok);
        assert_eq!(prev, None);
        assert_eq!(
            ops.get(CollectionModel::Kv, "kv_default", "new_key")
                .unwrap(),
            Some(crate::storage::schema::Value::text("created"))
        );
    }

    /// An expected value of `None` on an existing key fails and reports the
    /// existing value.
    #[test]
    fn cas_expect_null_on_existing_key_fails() {
        let r = rt();
        let ops = super::KvAtomicOps::new(&r);
        ops.set(
            CollectionModel::Kv,
            "kv_default",
            "taken",
            crate::storage::schema::Value::text("worker-1"),
            None,
            false,
        )
        .unwrap();
        let (ok, current) = ops
            .cas(
                CollectionModel::Kv,
                "kv_default",
                "taken",
                None,
                crate::storage::schema::Value::text("worker-2"),
                None,
            )
            .unwrap();
        assert!(!ok);
        assert_eq!(
            current,
            Some(crate::storage::schema::Value::text("worker-1"))
        );
    }

    /// `KV CAS ... EXPECT ... SET ...` through SQL reports the outcome in an
    /// `ok` column.
    #[test]
    fn cas_via_sql_roundtrip() {
        let r = rt();
        // Seed value.
        r.execute_query("KV PUT lock = 'free'").unwrap();
        // CAS: free → held — should succeed.
        let res = r
            .execute_query("KV CAS lock EXPECT 'free' SET 'held'")
            .unwrap();
        let row = &res.result.records[0];
        assert_eq!(
            row.get("ok"),
            Some(&crate::storage::schema::Value::Boolean(true))
        );
        // CAS: free → held again — should fail (value is now 'held').
        let res2 = r
            .execute_query("KV CAS lock EXPECT 'free' SET 'held'")
            .unwrap();
        let row2 = &res2.result.records[0];
        assert_eq!(
            row2.get("ok"),
            Some(&crate::storage::schema::Value::Boolean(false))
        );
    }

    /// `EXPECT NULL` through SQL succeeds only while the key is absent.
    #[test]
    fn cas_expect_null_via_sql() {
        let r = rt();
        let res = r
            .execute_query("KV CAS singleton EXPECT NULL SET 'first'")
            .unwrap();
        let row = &res.result.records[0];
        assert_eq!(
            row.get("ok"),
            Some(&crate::storage::schema::Value::Boolean(true))
        );
        // Second call must fail.
        let res2 = r
            .execute_query("KV CAS singleton EXPECT NULL SET 'second'")
            .unwrap();
        let row2 = &res2.result.records[0];
        assert_eq!(
            row2.get("ok"),
            Some(&crate::storage::schema::Value::Boolean(false))
        );
    }

    /// `KV INCR` through SQL: without BY, the first call on a missing key
    /// yields 1; `BY 4` then brings it to 5.
    #[test]
    fn incr_via_sql_roundtrip() {
        let r = rt();
        let res = r.execute_query("KV INCR hits").unwrap();
        let row = &res.result.records[0];
        assert_eq!(
            row.get("value"),
            Some(&crate::storage::schema::Value::Integer(1))
        );
        let res2 = r.execute_query("KV INCR hits BY 4").unwrap();
        let row2 = &res2.result.records[0];
        assert_eq!(
            row2.get("value"),
            Some(&crate::storage::schema::Value::Integer(5))
        );
    }

    /// This test covers plain SQL, not the KV DSL. Concurrent
    /// `UPDATE ... SET n = n + 1` on one row must not lose any updates.
    #[test]
    fn concurrent_self_referential_update_is_atomic() {
        const THREADS: usize = 8;
        const ITERS: usize = 100;

        let runtime = std::sync::Arc::new(rt());
        runtime
            .execute_query("CREATE TABLE counters (id INT, n INT)")
            .unwrap();
        runtime
            .execute_query("INSERT INTO counters (id, n) VALUES (1, 0)")
            .unwrap();

        let barrier = std::sync::Arc::new(std::sync::Barrier::new(THREADS));
        let mut handles = Vec::new();
        for _ in 0..THREADS {
            let runtime = std::sync::Arc::clone(&runtime);
            let barrier = std::sync::Arc::clone(&barrier);
            handles.push(std::thread::spawn(move || {
                barrier.wait();
                for _ in 0..ITERS {
                    runtime
                        .execute_query("UPDATE counters SET n = n + 1 WHERE id = 1")
                        .unwrap();
                }
            }));
        }

        for handle in handles {
            handle.join().expect("worker should finish");
        }

        let selected = runtime
            .execute_query("SELECT n FROM counters WHERE id = 1")
            .unwrap();
        assert_eq!(
            selected.result.records[0].get("n"),
            Some(&crate::storage::schema::Value::Integer(
                (THREADS * ITERS) as i64
            ))
        );
    }

    /// A key watch emits events with these ops, in write order:
    /// - the first set: Insert;
    /// - incr: Update;
    /// - delete: Delete;
    /// - a set after the delete: Insert.
    /// LSNs must be strictly increasing.
    #[test]
    fn watch_stream_delivers_key_events_in_lsn_order() {
        let r = rt();
        let ops = super::KvAtomicOps::new(&r);
        let mut stream = r.kv_watch_subscribe("kv_default", "seq", None);

        ops.set(
            CollectionModel::Kv,
            "kv_default",
            "seq",
            crate::storage::schema::Value::Integer(1),
            None,
            false,
        )
        .unwrap();
        ops.incr(CollectionModel::Kv, "kv_default", "seq", 1, None)
            .unwrap();
        ops.delete(CollectionModel::Kv, "kv_default", "seq")
            .unwrap();
        ops.set(
            CollectionModel::Kv,
            "kv_default",
            "seq",
            crate::storage::schema::Value::Integer(9),
            None,
            false,
        )
        .unwrap();

        let mut events = Vec::new();
        while let Some(event) = stream.poll_next() {
            events.push(event);
            if events.len() == 4 {
                break;
            }
        }

        assert_eq!(events.len(), 4);
        assert_eq!(
            events[0].op,
            crate::replication::cdc::ChangeOperation::Insert
        );
        assert_eq!(
            events[1].op,
            crate::replication::cdc::ChangeOperation::Update
        );
        assert_eq!(
            events[2].op,
            crate::replication::cdc::ChangeOperation::Delete
        );
        assert_eq!(
            events[3].op,
            crate::replication::cdc::ChangeOperation::Insert
        );
        assert!(events.windows(2).all(|pair| pair[0].lsn < pair[1].lsn));
    }

    /// A prefix watch on "acct:" delivers only keys with that prefix. The
    /// interleaved "session:1" write is not delivered.
    #[test]
    fn watch_prefix_stream_delivers_matching_events_only() {
        let r = rt();
        let ops = super::KvAtomicOps::new(&r);
        let mut stream = r.kv_watch_subscribe_prefix("kv_default", "acct:", None);

        ops.set(
            CollectionModel::Kv,
            "kv_default",
            "acct:1",
            crate::storage::schema::Value::Integer(1),
            None,
            false,
        )
        .unwrap();
        ops.set(
            CollectionModel::Kv,
            "kv_default",
            "session:1",
            crate::storage::schema::Value::Integer(2),
            None,
            false,
        )
        .unwrap();
        ops.set(
            CollectionModel::Kv,
            "kv_default",
            "acct:2",
            crate::storage::schema::Value::Integer(3),
            None,
            false,
        )
        .unwrap();

        let first = stream.poll_next().expect("first prefix event");
        let second = stream.poll_next().expect("second prefix event");
        assert_eq!(first.key, "acct:1");
        assert_eq!(second.key, "acct:2");
        assert!(stream.poll_next().is_none());
    }

    /// Resubscribing with `Some(last_seen_lsn)` replays exactly the 50 writes
    /// made while no stream was open. They arrive in strictly increasing LSN
    /// order, all after the resume point, with nothing extra afterwards.
    #[test]
    fn watch_stream_resume_from_lsn_delivers_missed_events_without_duplicates() {
        let r = rt();
        let ops = super::KvAtomicOps::new(&r);
        let mut stream = r.kv_watch_subscribe("kv_default", "resume", None);

        let mut last_seen_lsn = 0;
        for value in 0..5 {
            ops.set(
                CollectionModel::Kv,
                "kv_default",
                "resume",
                crate::storage::schema::Value::Integer(value),
                None,
                false,
            )
            .unwrap();
            last_seen_lsn = stream.poll_next().expect("initial event").lsn;
        }
        drop(stream);

        // These writes happen with no live subscriber.
        for value in 5..55 {
            ops.set(
                CollectionModel::Kv,
                "kv_default",
                "resume",
                crate::storage::schema::Value::Integer(value),
                None,
                false,
            )
            .unwrap();
        }

        let mut resumed = r.kv_watch_subscribe("kv_default", "resume", Some(last_seen_lsn));
        let mut lsns = Vec::new();
        while let Some(event) = resumed.poll_next() {
            lsns.push(event.lsn);
            if lsns.len() == 50 {
                break;
            }
        }

        assert_eq!(lsns.len(), 50);
        assert!(lsns.iter().all(|lsn| *lsn > last_seen_lsn));
        assert!(lsns.windows(2).all(|pair| pair[0] < pair[1]));
        assert!(resumed.poll_next().is_none());
    }

    /// Writes 10_000 events without polling, then polls once. The first
    /// delivered event must report a non-zero `dropped_event_count`, and that
    /// count must agree with the stream's own counter and with runtime stats.
    /// NOTE(review): `lsn > 1` is only a loose check that early events were
    /// dropped.
    #[test]
    fn watch_stream_slow_consumer_drops_oldest_buffered_events() {
        let r = rt();
        let ops = super::KvAtomicOps::new(&r);
        let mut stream = r.kv_watch_subscribe("kv_default", "slow", None);

        for value in 0..10_000 {
            ops.set(
                CollectionModel::Kv,
                "kv_default",
                "slow",
                crate::storage::schema::Value::Integer(value),
                None,
                false,
            )
            .unwrap();
        }

        let event = stream.poll_next().expect("tail event after drops");
        assert!(event.lsn > 1);
        assert!(event.dropped_event_count > 0);
        assert_eq!(stream.dropped_event_count(), event.dropped_event_count);
        assert_eq!(r.stats().kv.watch_drops, event.dropped_event_count);
    }

    /// With `red.config.kv.watch.idle_timeout_ms` set to 1, polling after 5 ms
    /// of inactivity returns `None`, and the stream no longer counts as active.
    #[test]
    fn watch_stream_idle_timeout_closes_subscription() {
        let r = rt();
        r.execute_query("SET CONFIG red.config.kv.watch.idle_timeout_ms = 1")
            .unwrap();

        let mut stream = r.kv_watch_subscribe("kv_default", "idle", None);
        assert_eq!(r.stats().kv.watch_streams_active, 1);
        std::thread::sleep(std::time::Duration::from_millis(5));

        assert!(stream.poll_next().is_none());
        assert_eq!(r.stats().kv.watch_streams_active, 0);
    }

    /// A KV PUT inside a transaction that is rolled back emits no watch event.
    #[test]
    fn watch_stream_does_not_emit_rolled_back_put() {
        let r = rt();
        let mut stream = r.kv_watch_subscribe("kv_default", "rollback_key", None);

        r.execute_query("BEGIN").unwrap();
        r.execute_query("KV PUT rollback_key = 'dirty'").unwrap();
        r.execute_query("ROLLBACK").unwrap();

        assert!(stream.poll_next().is_none());
    }
}