Skip to main content

reddb_server/runtime/
impl_kv.rs

1//! KV DSL command execution and KvAtomicOps module.
2//!
3//! Handles `KV PUT key = value [EXPIRE n] [TAGS [...]] [IF NOT EXISTS]`,
4//! `KV GET key`, and `KV DELETE key`.
5
6use crate::application::ports::RuntimeEntityPort;
7use crate::storage::unified::{Metadata, MetadataValue};
8
9use super::impl_core::{current_auth_identity, current_connection_id, current_tenant};
10use super::*;
11
/// Default collection name for bare-key KV operations.
///
/// NOTE(review): no use of this constant is visible in this part of the file;
/// presumably the DSL layer substitutes it when a command names only a key — confirm.
pub const KV_DEFAULT_COLLECTION: &str = "kv_default";
14
/// Builds the key-provider reference under which a vault collection's own
/// master key is looked up: `red.vault.<collection>.master_key`.
///
/// The collection name is inserted verbatim; no escaping or normalisation is applied.
fn vault_master_key_ref(collection: &str) -> String {
    format!("red.vault.{collection}.master_key")
}
18
19fn keyed_model_name(model: crate::catalog::CollectionModel) -> &'static str {
20    match model {
21        crate::catalog::CollectionModel::Kv => "kv",
22        crate::catalog::CollectionModel::Vault => "vault",
23        crate::catalog::CollectionModel::Config => "config",
24        _ => "collection",
25    }
26}
27
/// One stored version of a vault key, assembled from a keyed row plus the
/// entity-level bookkeeping that surrounds it.
#[derive(Debug, Clone)]
struct VaultEntry {
    // Id of the entity that holds this version.
    id: crate::storage::EntityId,
    // Key the version belongs to.
    key: String,
    // Stored value as read from the row. Puts store the sealed value and tombstones
    // store `Null` (see `append_vault_version`).
    value: crate::storage::schema::Value,
    // Metadata attached to the entity; empty/default when none is stored.
    metadata: Metadata,
    // Copied from the entity's `created_at` (unit not visible here — TODO confirm).
    created_at: u64,
    // Copied from the entity's `updated_at` (unit not visible here — TODO confirm).
    updated_at: u64,
    // Copied from the entity's `sequence_id`.
    sequence_id: u64,
    // Per-key version number taken from the keyed row.
    version: i64,
    // `true` when this version marks the key as logically absent.
    tombstone: bool,
    // Name of the operation recorded with this version (e.g. "put").
    op: String,
}
41
42impl super::keyed_spine::KeyedVersion for VaultEntry {
43    fn key(&self) -> &str {
44        &self.key
45    }
46
47    fn version(&self) -> i64 {
48        self.version
49    }
50}
51
52impl VaultEntry {
53    fn from_keyed_row(
54        version: super::keyed_spine::KeyedRowVersion,
55        metadata: Metadata,
56        created_at: u64,
57        updated_at: u64,
58        sequence_id: u64,
59    ) -> Self {
60        Self {
61            id: version.id,
62            key: version.key,
63            value: version.value,
64            metadata,
65            created_at,
66            updated_at,
67            sequence_id,
68            version: version.version,
69            tombstone: version.tombstone,
70            op: version.op,
71        }
72    }
73}
74
/// Atomic KV operations interface — the seam that transports and drivers depend on.
///
/// Wraps a borrowed [`RedDBRuntime`] and layers collection auto-creation, upsert,
/// counter (`incr`), compare-and-set (`cas`) and tag-invalidation logic on top of the
/// runtime's `create_kv` / `get_kv` / `delete_kv` plumbing and the underlying store.
pub struct KvAtomicOps<'a> {
    // Runtime every operation is executed against.
    runtime: &'a RedDBRuntime,
}
82
83impl<'a> KvAtomicOps<'a> {
84    pub fn new(runtime: &'a RedDBRuntime) -> Self {
85        Self { runtime }
86    }
87
88    /// Insert or update a KV entry. Auto-creates the collection when needed.
89    ///
90    /// Insert or update a KV entry. Returns `(created: bool, id: EntityId)`.
91    pub fn set(
92        &self,
93        model: crate::catalog::CollectionModel,
94        collection: &str,
95        key: &str,
96        value: crate::storage::schema::Value,
97        ttl_ms: Option<u64>,
98        if_not_exists: bool,
99    ) -> RedDBResult<(bool, crate::storage::EntityId)> {
100        self.set_with_tags_for_model(model, collection, key, value, ttl_ms, &[], if_not_exists)
101    }
102
103    pub fn set_with_tags(
104        &self,
105        collection: &str,
106        key: &str,
107        value: crate::storage::schema::Value,
108        ttl_ms: Option<u64>,
109        tags: &[String],
110        if_not_exists: bool,
111    ) -> RedDBResult<(bool, crate::storage::EntityId)> {
112        self.set_with_tags_for_model(
113            crate::catalog::CollectionModel::Kv,
114            collection,
115            key,
116            value,
117            ttl_ms,
118            tags,
119            if_not_exists,
120        )
121    }
122
123    fn set_with_tags_for_model(
124        &self,
125        model: crate::catalog::CollectionModel,
126        collection: &str,
127        key: &str,
128        value: crate::storage::schema::Value,
129        ttl_ms: Option<u64>,
130        tags: &[String],
131        if_not_exists: bool,
132    ) -> RedDBResult<(bool, crate::storage::EntityId)> {
133        self.set_with_tags_and_metadata_for_model(
134            model,
135            collection,
136            key,
137            value,
138            ttl_ms,
139            tags,
140            if_not_exists,
141            Vec::new(),
142        )
143    }
144
145    pub fn set_with_tags_and_metadata(
146        &self,
147        collection: &str,
148        key: &str,
149        value: crate::storage::schema::Value,
150        ttl_ms: Option<u64>,
151        tags: &[String],
152        if_not_exists: bool,
153        metadata: Vec<(String, MetadataValue)>,
154    ) -> RedDBResult<(bool, crate::storage::EntityId)> {
155        self.set_with_tags_and_metadata_for_model(
156            crate::catalog::CollectionModel::Kv,
157            collection,
158            key,
159            value,
160            ttl_ms,
161            tags,
162            if_not_exists,
163            metadata,
164        )
165    }
166
167    fn set_with_tags_and_metadata_for_model(
168        &self,
169        model: crate::catalog::CollectionModel,
170        collection: &str,
171        key: &str,
172        value: crate::storage::schema::Value,
173        ttl_ms: Option<u64>,
174        tags: &[String],
175        if_not_exists: bool,
176        mut metadata: Vec<(String, MetadataValue)>,
177    ) -> RedDBResult<(bool, crate::storage::EntityId)> {
178        self.ensure_keyed_collection(model, collection)?;
179
180        if model == crate::catalog::CollectionModel::Vault {
181            let latest = self.get_vault_entry(collection, key)?;
182            let was_present = latest.as_ref().is_some_and(|entry| !entry.tombstone);
183            if if_not_exists && was_present {
184                return Ok((false, latest.expect("checked present").id));
185            }
186            let entry = self.append_vault_version(collection, key, value, "put", false, tags)?;
187            self.runtime.record_kv_watch_event(
188                if latest.is_some() {
189                    crate::replication::cdc::ChangeOperation::Update
190                } else {
191                    crate::replication::cdc::ChangeOperation::Insert
192                },
193                collection,
194                key,
195                entry.id.raw(),
196                latest.as_ref().map(vault_entry_metadata_json),
197                Some(vault_entry_metadata_json(&entry)),
198            );
199            return Ok((!was_present, entry.id));
200        }
201
202        let existing = self.get_entry(model, collection, key)?;
203        let was_present = existing.is_some();
204
205        if if_not_exists && was_present {
206            let (_, id) = existing.unwrap();
207            self.runtime.inner.kv_stats.incr_puts();
208            return Ok((false, id));
209        }
210
211        let before = existing
212            .as_ref()
213            .map(|(value, _)| crate::presentation::entity_json::storage_value_to_json(value));
214        let op = if was_present {
215            crate::replication::cdc::ChangeOperation::Update
216        } else {
217            crate::replication::cdc::ChangeOperation::Insert
218        };
219        let after = Some(crate::presentation::entity_json::storage_value_to_json(
220            &value,
221        ));
222
223        // Delete old entry so we can create fresh (handles TTL refresh).
224        if was_present {
225            self.delete(model, collection, key)?;
226        }
227
228        if let Some(ttl_metadata) = ttl_metadata(ttl_ms) {
229            metadata.extend(ttl_metadata.fields);
230        }
231        if let Some(tags_metadata) = kv_tags_metadata(tags) {
232            metadata.push(tags_metadata);
233        }
234
235        let output = self
236            .runtime
237            .create_kv(crate::application::entity::CreateKvInput {
238                collection: collection.to_string(),
239                key: key.to_string(),
240                value,
241                metadata,
242            })?;
243        if model == crate::catalog::CollectionModel::Kv {
244            self.runtime
245                .inner
246                .kv_tag_index
247                .replace(collection, key, output.id, tags);
248        }
249
250        if model == crate::catalog::CollectionModel::Kv {
251            self.runtime
252                .record_kv_watch_event(op, collection, key, output.id.raw(), before, after);
253        }
254
255        if model == crate::catalog::CollectionModel::Kv {
256            self.runtime.inner.kv_stats.incr_puts();
257        }
258        Ok((!was_present, output.id))
259    }
260
261    /// Retrieve a KV value by key. Returns `None` when not found.
262    pub fn get(
263        &self,
264        model: crate::catalog::CollectionModel,
265        collection: &str,
266        key: &str,
267    ) -> RedDBResult<Option<crate::storage::schema::Value>> {
268        let result = self.get_entry(model, collection, key)?;
269        if model == crate::catalog::CollectionModel::Kv {
270            self.runtime.inner.kv_stats.incr_gets();
271        }
272        Ok(result.map(|(v, _)| v))
273    }
274
275    /// Delete a KV entry. Returns `true` if the key existed.
276    pub fn delete(
277        &self,
278        model: crate::catalog::CollectionModel,
279        collection: &str,
280        key: &str,
281    ) -> RedDBResult<bool> {
282        self.ensure_declared_model(model, collection)?;
283        let found = self.get_entry(model, collection, key)?;
284        if let Some((value, id)) = found {
285            let store = self.runtime.inner.db.store();
286            let deleted = store
287                .delete(collection, id)
288                .map_err(|err| RedDBError::Internal(err.to_string()))?;
289            if deleted {
290                store.context_index().remove_entity(id);
291                if model == crate::catalog::CollectionModel::Kv {
292                    self.runtime.inner.kv_tag_index.remove(collection, key);
293                    self.runtime.record_kv_watch_event(
294                        crate::replication::cdc::ChangeOperation::Delete,
295                        collection,
296                        key,
297                        id.raw(),
298                        Some(crate::presentation::entity_json::storage_value_to_json(
299                            &value,
300                        )),
301                        None,
302                    );
303                    self.runtime.inner.kv_stats.incr_deletes();
304                }
305            }
306            Ok(deleted)
307        } else {
308            Ok(false)
309        }
310    }
311
312    /// Atomically increment (or decrement) a counter key. Returns the new value.
313    ///
314    /// - Missing key initialises at `by` (Redis-compat).
315    /// - Non-integer value returns an error before any mutation.
316    pub fn incr(
317        &self,
318        model: crate::catalog::CollectionModel,
319        collection: &str,
320        key: &str,
321        by: i64,
322        ttl_ms: Option<u64>,
323    ) -> RedDBResult<i64> {
324        if model == crate::catalog::CollectionModel::Vault {
325            return Err(RedDBError::InvalidOperation(
326                "VAULT INCR is not supported for sealed secrets".to_string(),
327            ));
328        }
329        self.ensure_kv_collection(collection)?;
330
331        let existing = self.runtime.get_kv(collection, key)?;
332        let current: i64 = match existing.as_ref() {
333            None => 0,
334            Some((crate::storage::schema::Value::Integer(n), _)) => *n,
335            Some((crate::storage::schema::Value::Float(f), _)) => *f as i64,
336            Some((other, _)) => {
337                return Err(RedDBError::Internal(format!(
338                    "INCR on non-integer value: {:?}",
339                    other
340                )));
341            }
342        };
343
344        let next = current
345            .checked_add(by)
346            .ok_or_else(|| RedDBError::Internal(format!("INCR overflow: {current} + {by}")))?;
347
348        // Delete then re-create so TTL is refreshed.
349        if existing.is_some() {
350            self.runtime.delete_kv(collection, key)?;
351        }
352
353        let meta_vec: Vec<(String, crate::storage::unified::MetadataValue)> = ttl_metadata(ttl_ms)
354            .map(|m| m.fields.into_iter().collect())
355            .unwrap_or_default();
356
357        let output = self
358            .runtime
359            .create_kv(crate::application::entity::CreateKvInput {
360                collection: collection.to_string(),
361                key: key.to_string(),
362                value: crate::storage::schema::Value::Integer(next),
363                metadata: meta_vec,
364            })?;
365        self.runtime
366            .inner
367            .kv_tag_index
368            .replace(collection, key, output.id, &[]);
369
370        self.runtime.record_kv_watch_event(
371            if existing.is_some() {
372                crate::replication::cdc::ChangeOperation::Update
373            } else {
374                crate::replication::cdc::ChangeOperation::Insert
375            },
376            collection,
377            key,
378            output.id.raw(),
379            existing
380                .as_ref()
381                .map(|(value, _)| crate::presentation::entity_json::storage_value_to_json(value)),
382            Some(crate::presentation::entity_json::storage_value_to_json(
383                &crate::storage::schema::Value::Integer(next),
384            )),
385        );
386
387        self.runtime.inner.kv_stats.incr_incrs();
388        Ok(next)
389    }
390
391    /// Compare-and-set: atomically swap `key` from `expected` to `new_value`.
392    ///
393    /// Returns `(ok, current)`:
394    /// - `ok = true`  → swap applied; `current` is the value *before* the swap.
395    /// - `ok = false` → swap skipped; `current` holds the actual current value.
396    ///
397    /// `expected = None` means the caller expects the key to be *absent* (create-if-absent).
398    pub fn cas(
399        &self,
400        model: crate::catalog::CollectionModel,
401        collection: &str,
402        key: &str,
403        expected: Option<&crate::storage::schema::Value>,
404        new_value: crate::storage::schema::Value,
405        ttl_ms: Option<u64>,
406    ) -> RedDBResult<(bool, Option<crate::storage::schema::Value>)> {
407        if model == crate::catalog::CollectionModel::Vault {
408            return Err(RedDBError::InvalidOperation(
409                "VAULT CAS is not supported for sealed secrets".to_string(),
410            ));
411        }
412        self.ensure_kv_collection(collection)?;
413
414        let current = self.runtime.get_kv(collection, key)?.map(|(v, _)| v);
415
416        let matches = match (&current, expected) {
417            (None, None) => true,
418            (Some(cur), Some(exp)) => cur == exp,
419            _ => false,
420        };
421
422        if !matches {
423            self.runtime.inner.kv_stats.incr_cas_conflict();
424            return Ok((false, current));
425        }
426
427        // Swap: delete old entry (if present), write new one.
428        if current.is_some() {
429            self.runtime.delete_kv(collection, key)?;
430        }
431
432        let meta_vec: Vec<(String, crate::storage::unified::MetadataValue)> = ttl_metadata(ttl_ms)
433            .map(|m| m.fields.into_iter().collect())
434            .unwrap_or_default();
435
436        let output = self
437            .runtime
438            .create_kv(crate::application::entity::CreateKvInput {
439                collection: collection.to_string(),
440                key: key.to_string(),
441                value: new_value.clone(),
442                metadata: meta_vec,
443            })?;
444        self.runtime
445            .inner
446            .kv_tag_index
447            .replace(collection, key, output.id, &[]);
448
449        self.runtime.record_kv_watch_event(
450            if current.is_some() {
451                crate::replication::cdc::ChangeOperation::Update
452            } else {
453                crate::replication::cdc::ChangeOperation::Insert
454            },
455            collection,
456            key,
457            output.id.raw(),
458            current
459                .as_ref()
460                .map(crate::presentation::entity_json::storage_value_to_json),
461            Some(crate::presentation::entity_json::storage_value_to_json(
462                &new_value,
463            )),
464        );
465
466        self.runtime.inner.kv_stats.incr_cas_success();
467        Ok((true, current))
468    }
469
470    pub fn invalidate_tags(&self, collection: &str, tags: &[String]) -> RedDBResult<usize> {
471        self.runtime
472            .check_write(crate::runtime::write_gate::WriteKind::Dml)?;
473        self.runtime.check_kv_invalidate_policy(collection)?;
474        self.ensure_kv_collection(collection)?;
475        let entries = self
476            .runtime
477            .inner
478            .kv_tag_index
479            .entries_for_tags(collection, tags);
480        if entries.is_empty() {
481            return Ok(0);
482        }
483
484        let store = self.runtime.inner.db.store();
485        let mut removed = 0usize;
486        for (key, id) in entries {
487            let before = store
488                .get(collection, id)
489                .and_then(|entity| kv_value_from_entity(&entity));
490            let deleted = store
491                .delete(collection, id)
492                .map_err(|err| RedDBError::Internal(err.to_string()))?;
493            if deleted {
494                store.context_index().remove_entity(id);
495                self.runtime.inner.kv_tag_index.remove(collection, &key);
496                self.runtime.record_kv_watch_event(
497                    crate::replication::cdc::ChangeOperation::Delete,
498                    collection,
499                    &key,
500                    id.raw(),
501                    before
502                        .as_ref()
503                        .map(crate::presentation::entity_json::storage_value_to_json),
504                    None,
505                );
506                removed += 1;
507            }
508        }
509        if removed > 0 {
510            self.runtime.inner.kv_stats.incr_deletes();
511        }
512        Ok(removed)
513    }
514
515    pub fn tags_for_key(&self, collection: &str, key: &str) -> Vec<String> {
516        self.runtime
517            .inner
518            .kv_tag_index
519            .tags_for_key(collection, key)
520    }
521
522    /// Auto-create a KV collection if it does not exist yet.
523    fn ensure_kv_collection(&self, collection: &str) -> RedDBResult<()> {
524        self.ensure_keyed_collection(crate::catalog::CollectionModel::Kv, collection)
525    }
526
527    fn ensure_keyed_collection(
528        &self,
529        model: crate::catalog::CollectionModel,
530        collection: &str,
531    ) -> RedDBResult<()> {
532        let store = self.runtime.inner.db.store();
533        if store.get_collection(collection).is_some() {
534            return self.ensure_declared_model(model, collection);
535        }
536        if model != crate::catalog::CollectionModel::Kv {
537            return Err(RedDBError::NotFound(format!(
538                "{} collection '{collection}' does not exist",
539                keyed_model_name(model)
540            )));
541        }
542        // Check config gate: red.config.kv.default_collection (default = true).
543        let auto_create = self
544            .runtime
545            .config_bool("red.config.kv.default_collection", true);
546        if !auto_create {
547            return Err(RedDBError::NotFound(format!(
548                "kv collection '{collection}' does not exist and auto-create is disabled \
549                 (red.config.kv.default_collection = false)"
550            )));
551        }
552        store
553            .create_collection(collection)
554            .map_err(|err| RedDBError::Internal(err.to_string()))?;
555        self.runtime
556            .inner
557            .db
558            .save_collection_contract(kv_collection_contract(collection))
559            .map_err(|err| RedDBError::Internal(err.to_string()))?;
560        Ok(())
561    }
562
563    fn get_entry(
564        &self,
565        model: crate::catalog::CollectionModel,
566        collection: &str,
567        key: &str,
568    ) -> RedDBResult<Option<(crate::storage::schema::Value, crate::storage::EntityId)>> {
569        self.ensure_declared_model(model, collection)?;
570        let store = self.runtime.inner.db.store();
571        let Some(manager) = store.get_collection(collection) else {
572            return Ok(None);
573        };
574        let entities = manager.query_all(|_| true);
575        for entity in entities {
576            if let crate::storage::EntityData::Row(ref row) = entity.data {
577                if let Some(ref named) = row.named {
578                    if let Some(crate::storage::schema::Value::Text(ref k)) = named.get("key") {
579                        if &**k == key {
580                            let value = named
581                                .get("value")
582                                .cloned()
583                                .unwrap_or(crate::storage::schema::Value::Null);
584                            return Ok(Some((value, entity.id)));
585                        }
586                    }
587                }
588            }
589        }
590        Ok(None)
591    }
592
593    fn get_vault_entry(&self, collection: &str, key: &str) -> RedDBResult<Option<VaultEntry>> {
594        self.vault_versions(collection, key)
595            .map(super::keyed_spine::latest_version)
596    }
597
598    fn get_vault_entry_version(
599        &self,
600        collection: &str,
601        key: &str,
602        version: i64,
603    ) -> RedDBResult<Option<VaultEntry>> {
604        Ok(self
605            .vault_versions(collection, key)?
606            .into_iter()
607            .find(|entry| entry.version == version))
608    }
609
610    fn vault_versions(&self, collection: &str, key: &str) -> RedDBResult<Vec<VaultEntry>> {
611        self.ensure_declared_model(crate::catalog::CollectionModel::Vault, collection)?;
612        let store = self.runtime.inner.db.store();
613        let Some(manager) = store.get_collection(collection) else {
614            return Ok(Vec::new());
615        };
616        let entities = manager.query_all(|_| true);
617        let mut versions = Vec::new();
618        for entity in entities {
619            let crate::storage::EntityData::Row(ref row) = entity.data else {
620                continue;
621            };
622            let Some(version) =
623                super::keyed_spine::row_version(entity.id, row, entity.sequence_id as i64)
624            else {
625                continue;
626            };
627            if version.key != key {
628                continue;
629            }
630            let metadata = manager.get_metadata(entity.id).unwrap_or_default();
631            versions.push(VaultEntry::from_keyed_row(
632                version,
633                metadata,
634                entity.created_at,
635                entity.updated_at,
636                entity.sequence_id,
637            ));
638        }
639        Ok(versions)
640    }
641
642    fn latest_vault_entries(
643        &self,
644        collection: &str,
645        prefix: Option<&str>,
646    ) -> RedDBResult<Vec<VaultEntry>> {
647        self.ensure_declared_model(crate::catalog::CollectionModel::Vault, collection)?;
648        let store = self.runtime.inner.db.store();
649        let Some(manager) = store.get_collection(collection) else {
650            return Ok(Vec::new());
651        };
652        let mut versions = Vec::new();
653        for entity in manager.query_all(|_| true) {
654            let crate::storage::EntityData::Row(ref row) = entity.data else {
655                continue;
656            };
657            let Some(version) =
658                super::keyed_spine::row_version(entity.id, row, entity.sequence_id as i64)
659            else {
660                continue;
661            };
662            let metadata = manager.get_metadata(entity.id).unwrap_or_default();
663            let entry = VaultEntry::from_keyed_row(
664                version,
665                metadata,
666                entity.created_at,
667                entity.updated_at,
668                entity.sequence_id,
669            );
670            versions.push(entry);
671        }
672        Ok(super::keyed_spine::latest_versions(versions, prefix))
673    }
674
675    fn append_vault_version(
676        &self,
677        collection: &str,
678        key: &str,
679        value: crate::storage::schema::Value,
680        op: &str,
681        tombstone: bool,
682        tags: &[String],
683    ) -> RedDBResult<VaultEntry> {
684        self.ensure_declared_model(crate::catalog::CollectionModel::Vault, collection)?;
685        let version = self
686            .get_vault_entry(collection, key)?
687            .map(|entry| entry.version)
688            .unwrap_or(0)
689            + 1;
690        let stored_value = if tombstone {
691            crate::storage::schema::Value::Null
692        } else {
693            self.runtime.seal_vault_value(collection, value)?
694        };
695        let now = current_unix_ms() as i64;
696        let fields = vec![
697            (
698                "key".to_string(),
699                crate::storage::schema::Value::text(key.to_string()),
700            ),
701            ("value".to_string(), stored_value),
702            (
703                "version".to_string(),
704                crate::storage::schema::Value::Integer(version),
705            ),
706            (
707                "tombstone".to_string(),
708                crate::storage::schema::Value::Boolean(tombstone),
709            ),
710            (
711                "op".to_string(),
712                crate::storage::schema::Value::text(op.to_string()),
713            ),
714            (
715                "created_at_ms".to_string(),
716                crate::storage::schema::Value::Integer(now),
717            ),
718        ];
719        let mut row = crate::storage::RowData::new(Vec::new());
720        row.named = Some(fields.into_iter().collect());
721        let entity = crate::storage::UnifiedEntity::new(
722            crate::storage::EntityId::new(0),
723            crate::storage::EntityKind::TableRow {
724                table: std::sync::Arc::from(collection),
725                row_id: 0,
726            },
727            crate::storage::EntityData::Row(row),
728        );
729        let id = self
730            .runtime
731            .inner
732            .db
733            .store()
734            .insert(collection, entity)
735            .map_err(|err| RedDBError::Internal(err.to_string()))?;
736        if !tags.is_empty() {
737            self.runtime
738                .inner
739                .db
740                .store()
741                .set_metadata(
742                    collection,
743                    id,
744                    Metadata::with_fields(vault_tags_metadata(tags)),
745                )
746                .map_err(|err| RedDBError::Internal(err.to_string()))?;
747            self.runtime
748                .inner
749                .kv_tag_index
750                .replace(collection, key, id, tags);
751        }
752        self.get_vault_entry_version(collection, key, version)?
753            .ok_or_else(|| RedDBError::Internal(format!("vault version {id} was not readable")))
754    }
755
756    fn purge_vault_versions(&self, collection: &str, key: &str) -> RedDBResult<usize> {
757        self.ensure_declared_model(crate::catalog::CollectionModel::Vault, collection)?;
758        let versions = self.vault_versions(collection, key)?;
759        let store = self.runtime.inner.db.store();
760        let mut purged = 0usize;
761        for entry in versions {
762            if store
763                .delete(collection, entry.id)
764                .map_err(|err| RedDBError::Internal(err.to_string()))?
765            {
766                store.context_index().remove_entity(entry.id);
767                purged += 1;
768            }
769        }
770        Ok(purged)
771    }
772
773    fn ensure_declared_model(
774        &self,
775        model: crate::catalog::CollectionModel,
776        collection: &str,
777    ) -> RedDBResult<()> {
778        let Some(contract) = self.runtime.inner.db.collection_contract(collection) else {
779            return Ok(());
780        };
781        if contract.declared_model == model
782            || contract.declared_model == crate::catalog::CollectionModel::Mixed
783        {
784            return Ok(());
785        }
786        Err(RedDBError::InvalidOperation(format!(
787            "collection '{}' is declared as '{}' and does not allow '{}' operations",
788            collection,
789            keyed_model_name(contract.declared_model),
790            keyed_model_name(model)
791        )))
792    }
793}
794
795impl RedDBRuntime {
796    pub(crate) fn seal_vault_value(
797        &self,
798        collection: &str,
799        value: crate::storage::schema::Value,
800    ) -> RedDBResult<crate::storage::schema::Value> {
801        let key = self.vault_encryption_key(collection)?;
802        let plaintext = value.to_bytes();
803        let nonce_bytes = crate::auth::store::random_bytes(12);
804        let mut nonce = [0u8; 12];
805        nonce.copy_from_slice(&nonce_bytes[..12]);
806        let aad = format!("reddb.vault.{collection}");
807        let ciphertext =
808            crate::crypto::aes_gcm::aes256_gcm_encrypt(&key, &nonce, aad.as_bytes(), &plaintext);
809        let mut payload = Vec::with_capacity(12 + ciphertext.len());
810        payload.extend_from_slice(&nonce);
811        payload.extend_from_slice(&ciphertext);
812        Ok(crate::storage::schema::Value::Secret(payload))
813    }
814
815    fn vault_key_available(&self, collection: &str) -> bool {
816        self.vault_encryption_key(collection).is_ok()
817    }
818
819    fn vault_encryption_key(&self, collection: &str) -> RedDBResult<[u8; 32]> {
820        let auth_store = self.inner.auth_store.read().clone().ok_or_else(|| {
821            RedDBError::Query("vault sealed_unavailable: no key provider is configured".to_string())
822        })?;
823        if !auth_store.is_vault_backed() {
824            return Err(RedDBError::Query(
825                "vault sealed_unavailable: key provider is sealed".to_string(),
826            ));
827        }
828
829        if let Some(hex_key) = auth_store.vault_kv_get(&vault_master_key_ref(collection)) {
830            return decode_vault_key(&hex_key);
831        }
832        auth_store.vault_secret_key().ok_or_else(|| {
833            RedDBError::Query("vault sealed_unavailable: cluster vault key is missing".to_string())
834        })
835    }
836
837    fn unseal_vault_value(
838        &self,
839        collection: &str,
840        sealed: &crate::storage::schema::Value,
841    ) -> RedDBResult<crate::storage::schema::Value> {
842        let crate::storage::schema::Value::Secret(payload) = sealed else {
843            return Err(RedDBError::Query(
844                "vault unseal failed: stored value is not sealed".to_string(),
845            ));
846        };
847        if payload.len() < 12 {
848            return Err(RedDBError::Query(
849                "vault unseal failed: sealed payload is truncated".to_string(),
850            ));
851        }
852        let key = self.vault_encryption_key(collection)?;
853        let mut nonce = [0u8; 12];
854        nonce.copy_from_slice(&payload[..12]);
855        let aad = format!("reddb.vault.{collection}");
856        let plaintext = crate::crypto::aes_gcm::aes256_gcm_decrypt(
857            &key,
858            &nonce,
859            aad.as_bytes(),
860            &payload[12..],
861        )
862        .map_err(|_| RedDBError::Query("vault unseal failed: decryption failed".to_string()))?;
863        let (value, consumed) =
864            crate::storage::schema::Value::from_bytes(&plaintext).map_err(|err| {
865                RedDBError::Query(format!("vault unseal failed: bad plaintext value: {err}"))
866            })?;
867        if consumed != plaintext.len() {
868            return Err(RedDBError::Query(
869                "vault unseal failed: trailing plaintext bytes".to_string(),
870            ));
871        }
872        Ok(value)
873    }
874
875    fn vault_target_resource(collection: &str, key: &str) -> String {
876        if collection == "red.vault" {
877            return format!("red.vault/{}", key.to_ascii_lowercase());
878        }
879        format!("{collection}.{key}")
880    }
881
882    fn current_vault_actor() -> String {
883        current_auth_identity()
884            .map(|(principal, _)| principal)
885            .unwrap_or_else(|| "anonymous".to_string())
886    }
887
888    fn vault_request_id() -> String {
889        let conn_id = current_connection_id();
890        if conn_id == 0 {
891            "embedded".to_string()
892        } else {
893            format!("conn-{conn_id}")
894        }
895    }
896
897    fn check_vault_capability(
898        &self,
899        action: &str,
900        collection: &str,
901        key: &str,
902    ) -> Result<(), String> {
903        let Some(auth_store) = self.inner.auth_store.read().clone() else {
904            return Ok(());
905        };
906        if !auth_store.iam_authorization_enabled() {
907            return Ok(());
908        }
909        let Some((principal, role)) = current_auth_identity() else {
910            return Err(
911                "IAM authorization is enabled; vault capability check requires an authenticated principal"
912                    .to_string(),
913            );
914        };
915        let tenant = current_tenant();
916        let principal_id = crate::auth::UserId::from_parts(tenant.as_deref(), &principal);
917        let mut resource = crate::auth::policies::ResourceRef::new(
918            "vault",
919            Self::vault_target_resource(collection, key),
920        );
921        if let Some(ref tenant) = tenant {
922            resource = resource.with_tenant(tenant.clone());
923        }
924        let ctx = crate::auth::policies::EvalContext {
925            principal_tenant: tenant.clone(),
926            current_tenant: tenant,
927            peer_ip: None,
928            mfa_present: false,
929            now_ms: crate::utils::now_unix_millis() as u128,
930            principal_is_admin_role: role == crate::auth::Role::Admin,
931        };
932        if auth_store.check_policy_authz(&principal_id, action, &resource, &ctx) {
933            Ok(())
934        } else {
935            Err(format!(
936                "principal=`{}` action=`{}` resource=`vault:{}` denied by IAM policy",
937                principal,
938                action,
939                Self::vault_target_resource(collection, key)
940            ))
941        }
942    }
943
944    fn check_system_vault_capability(
945        &self,
946        action: &str,
947        collection: &str,
948        key: &str,
949    ) -> Result<(), String> {
950        if collection != "red.vault" {
951            return Ok(());
952        }
953        self.check_vault_capability(action, collection, key)
954    }
955
956    fn audit_vault_unseal(
957        &self,
958        collection: &str,
959        key: &str,
960        outcome: crate::runtime::audit_log::Outcome,
961        reason: &str,
962        entry: Option<&VaultEntry>,
963    ) {
964        let actor = Self::current_vault_actor();
965        let request_id = Self::vault_request_id();
966        let mut builder = crate::runtime::audit_log::AuditEvent::builder("vault/unseal")
967            .principal(actor.clone())
968            .source(crate::runtime::audit_log::AuditAuthSource::Password)
969            .resource(format!(
970                "vault:{}",
971                Self::vault_target_resource(collection, key)
972            ))
973            .outcome(outcome)
974            .correlation_id(request_id.clone())
975            .fields([
976                crate::runtime::audit_log::AuditFieldEscaper::field("actor", actor),
977                crate::runtime::audit_log::AuditFieldEscaper::field("collection", collection),
978                crate::runtime::audit_log::AuditFieldEscaper::field("key", key),
979                crate::runtime::audit_log::AuditFieldEscaper::field(
980                    "target",
981                    Self::vault_target_resource(collection, key),
982                ),
983                crate::runtime::audit_log::AuditFieldEscaper::field("reason", reason),
984                crate::runtime::audit_log::AuditFieldEscaper::field("request_id", request_id),
985                crate::runtime::audit_log::AuditFieldEscaper::field(
986                    "connection_id",
987                    current_connection_id(),
988                ),
989            ]);
990        if let Some(tenant) = current_tenant() {
991            builder = builder.tenant(tenant);
992        }
993        if let Some(entry) = entry {
994            builder = builder.fields([
995                crate::runtime::audit_log::AuditFieldEscaper::field("entity_id", entry.id.raw()),
996                crate::runtime::audit_log::AuditFieldEscaper::field(
997                    "sequence_id",
998                    entry.sequence_id,
999                ),
1000            ]);
1001        }
1002        self.audit_log().record_event(builder.build());
1003    }
1004
1005    fn audit_vault_lifecycle(
1006        &self,
1007        operation: &str,
1008        collection: &str,
1009        key: &str,
1010        outcome: crate::runtime::audit_log::Outcome,
1011        reason: &str,
1012        entry: Option<&VaultEntry>,
1013    ) {
1014        let actor = Self::current_vault_actor();
1015        let request_id = Self::vault_request_id();
1016        let mut builder =
1017            crate::runtime::audit_log::AuditEvent::builder(format!("vault/{operation}"))
1018                .principal(actor.clone())
1019                .source(crate::runtime::audit_log::AuditAuthSource::Password)
1020                .resource(format!(
1021                    "vault:{}",
1022                    Self::vault_target_resource(collection, key)
1023                ))
1024                .outcome(outcome)
1025                .correlation_id(request_id.clone())
1026                .fields([
1027                    crate::runtime::audit_log::AuditFieldEscaper::field("actor", actor),
1028                    crate::runtime::audit_log::AuditFieldEscaper::field("collection", collection),
1029                    crate::runtime::audit_log::AuditFieldEscaper::field("key", key),
1030                    crate::runtime::audit_log::AuditFieldEscaper::field(
1031                        "target",
1032                        Self::vault_target_resource(collection, key),
1033                    ),
1034                    crate::runtime::audit_log::AuditFieldEscaper::field("reason", reason),
1035                    crate::runtime::audit_log::AuditFieldEscaper::field("request_id", request_id),
1036                    crate::runtime::audit_log::AuditFieldEscaper::field(
1037                        "connection_id",
1038                        current_connection_id(),
1039                    ),
1040                ]);
1041        if let Some(tenant) = current_tenant() {
1042            builder = builder.tenant(tenant);
1043        }
1044        if let Some(entry) = entry {
1045            builder = builder.fields([
1046                crate::runtime::audit_log::AuditFieldEscaper::field("entity_id", entry.id.raw()),
1047                crate::runtime::audit_log::AuditFieldEscaper::field("version", entry.version),
1048                crate::runtime::audit_log::AuditFieldEscaper::field(
1049                    "sequence_id",
1050                    entry.sequence_id,
1051                ),
1052            ]);
1053        }
1054        self.audit_log().record_event(builder.build());
1055    }
1056
1057    pub(crate) fn resolve_vault_secret_value(
1058        &self,
1059        collection: &str,
1060        key: &str,
1061    ) -> RedDBResult<Value> {
1062        let ops = KvAtomicOps::new(self);
1063        let entry = ops.get_vault_entry(collection, key)?;
1064        if let Err(reason) = self.check_vault_capability("vault:unseal", collection, key) {
1065            self.audit_vault_unseal(
1066                collection,
1067                key,
1068                crate::runtime::audit_log::Outcome::Denied,
1069                &reason,
1070                entry.as_ref(),
1071            );
1072            return Err(RedDBError::Query(reason));
1073        }
1074        let Some(entry) = entry else {
1075            let reason = "not_found";
1076            self.audit_vault_unseal(
1077                collection,
1078                key,
1079                crate::runtime::audit_log::Outcome::Denied,
1080                reason,
1081                None,
1082            );
1083            return Err(RedDBError::NotFound(format!(
1084                "vault secret '{}.{}' not found",
1085                collection, key
1086            )));
1087        };
1088        if entry.tombstone {
1089            let reason = "deleted";
1090            self.audit_vault_unseal(
1091                collection,
1092                key,
1093                crate::runtime::audit_log::Outcome::Denied,
1094                reason,
1095                Some(&entry),
1096            );
1097            return Err(RedDBError::NotFound(format!(
1098                "vault secret '{}.{}' is deleted",
1099                collection, key
1100            )));
1101        }
1102        match self.unseal_vault_value(collection, &entry.value) {
1103            Ok(value) => {
1104                self.audit_vault_unseal(
1105                    collection,
1106                    key,
1107                    crate::runtime::audit_log::Outcome::Success,
1108                    "ok",
1109                    Some(&entry),
1110                );
1111                Ok(value)
1112            }
1113            Err(err) => {
1114                let reason = err.to_string();
1115                self.audit_vault_unseal(
1116                    collection,
1117                    key,
1118                    crate::runtime::audit_log::Outcome::Error,
1119                    &reason,
1120                    Some(&entry),
1121                );
1122                Err(err)
1123            }
1124        }
1125    }
1126
1127    /// Dispatch a `KV PUT / GET / DELETE` command.
1128    pub fn execute_kv_command(
1129        &self,
1130        raw_query: &str,
1131        cmd: &crate::storage::query::ast::KvCommand,
1132    ) -> RedDBResult<RuntimeQueryResult> {
1133        use crate::storage::query::ast::KvCommand;
1134
1135        let ops = KvAtomicOps::new(self);
1136
1137        match cmd {
1138            KvCommand::Put {
1139                model,
1140                collection,
1141                key,
1142                value,
1143                ttl_ms,
1144                tags,
1145                if_not_exists,
1146            } => {
1147                if *model == crate::catalog::CollectionModel::Vault {
1148                    self.check_system_vault_capability("vault:write", collection, key)
1149                        .map_err(RedDBError::Query)?;
1150                }
1151                self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
1152                let (created, id) = ops.set_with_tags_for_model(
1153                    *model,
1154                    collection,
1155                    key,
1156                    value.clone(),
1157                    *ttl_ms,
1158                    tags,
1159                    *if_not_exists,
1160                )?;
1161
1162                let mut result = UnifiedResult::with_columns(vec![
1163                    "ok".into(),
1164                    "collection".into(),
1165                    "key".into(),
1166                    "id".into(),
1167                    "created".into(),
1168                    "tags".into(),
1169                ]);
1170                let mut record = UnifiedRecord::new();
1171                record.set("ok", Value::Boolean(true));
1172                record.set("collection", Value::text(collection.clone()));
1173                record.set("key", Value::text(key.clone()));
1174                record.set("id", Value::Integer(id.raw() as i64));
1175                record.set("created", Value::Boolean(created));
1176                record.set("tags", kv_tags_value(tags));
1177                result.push(record);
1178
1179                Ok(RuntimeQueryResult {
1180                    query: raw_query.to_string(),
1181                    mode: crate::storage::query::modes::QueryMode::Sql,
1182                    statement: if *model == crate::catalog::CollectionModel::Vault {
1183                        "vault_put"
1184                    } else {
1185                        "kv_put"
1186                    },
1187                    engine: if *model == crate::catalog::CollectionModel::Vault {
1188                        "vault"
1189                    } else {
1190                        "kv"
1191                    },
1192                    result,
1193                    affected_rows: 1,
1194                    statement_type: if created { "insert" } else { "update" },
1195                })
1196            }
1197            KvCommand::InvalidateTags { collection, tags } => {
1198                let invalidated = ops.invalidate_tags(collection, tags)?;
1199
1200                let mut result = UnifiedResult::with_columns(vec![
1201                    "ok".into(),
1202                    "collection".into(),
1203                    "invalidated".into(),
1204                    "tags".into(),
1205                ]);
1206                let mut record = UnifiedRecord::new();
1207                record.set("ok", Value::Boolean(true));
1208                record.set("collection", Value::text(collection.clone()));
1209                record.set("invalidated", Value::Integer(invalidated as i64));
1210                record.set("tags", kv_tags_value(tags));
1211                result.push(record);
1212
1213                Ok(RuntimeQueryResult {
1214                    query: raw_query.to_string(),
1215                    mode: crate::storage::query::modes::QueryMode::Sql,
1216                    statement: "kv_invalidate_tags",
1217                    engine: "kv",
1218                    result,
1219                    affected_rows: invalidated as u64,
1220                    statement_type: "delete",
1221                })
1222            }
1223
1224            KvCommand::Rotate {
1225                collection,
1226                key,
1227                value,
1228                tags,
1229            } => {
1230                self.check_system_vault_capability("vault:write", collection, key)
1231                    .map_err(RedDBError::Query)?;
1232                self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
1233                let entry = ops.append_vault_version(
1234                    collection,
1235                    key,
1236                    value.clone(),
1237                    "rotate",
1238                    false,
1239                    tags,
1240                )?;
1241                self.record_kv_watch_event(
1242                    crate::replication::cdc::ChangeOperation::Update,
1243                    collection,
1244                    key,
1245                    entry.id.raw(),
1246                    None,
1247                    Some(vault_entry_metadata_json(&entry)),
1248                );
1249                self.audit_vault_lifecycle(
1250                    "rotate",
1251                    collection,
1252                    key,
1253                    crate::runtime::audit_log::Outcome::Success,
1254                    "ok",
1255                    Some(&entry),
1256                );
1257                Ok(vault_write_result(
1258                    raw_query,
1259                    "vault_rotate",
1260                    "update",
1261                    collection,
1262                    key,
1263                    &entry,
1264                    1,
1265                ))
1266            }
1267
1268            KvCommand::List {
1269                model,
1270                collection,
1271                prefix,
1272                limit,
1273                offset,
1274            } => {
1275                if *model != crate::catalog::CollectionModel::Vault {
1276                    return Err(RedDBError::InvalidOperation(
1277                        "LIST is not supported through normal KV command execution".to_string(),
1278                    ));
1279                }
1280                let mut entries = ops.latest_vault_entries(collection, prefix.as_deref())?;
1281                entries.sort_by(|left, right| left.key.cmp(&right.key));
1282                let mut result = UnifiedResult::with_columns(vec![
1283                    "collection".into(),
1284                    "key".into(),
1285                    "version".into(),
1286                    "fingerprint".into(),
1287                    "tags".into(),
1288                    "created_at".into(),
1289                    "updated_at".into(),
1290                    "status".into(),
1291                    "tombstone".into(),
1292                    "op".into(),
1293                ]);
1294                for entry in entries
1295                    .into_iter()
1296                    .filter(|entry| {
1297                        self.check_vault_capability("vault:read_metadata", collection, &entry.key)
1298                            .is_ok()
1299                    })
1300                    .skip(*offset)
1301                    .take(limit.unwrap_or(usize::MAX))
1302                {
1303                    push_vault_metadata_record(&mut result, collection, &entry.key, &entry);
1304                }
1305                Ok(RuntimeQueryResult {
1306                    query: raw_query.to_string(),
1307                    mode: crate::storage::query::modes::QueryMode::Sql,
1308                    statement: "vault_list",
1309                    engine: "vault",
1310                    result,
1311                    affected_rows: 0,
1312                    statement_type: "select",
1313                })
1314            }
1315
1316            KvCommand::History { collection, key } => {
1317                self.check_vault_capability("vault:read_metadata", collection, key)
1318                    .map_err(RedDBError::Query)?;
1319                let versions =
1320                    super::keyed_spine::history_versions(ops.vault_versions(collection, key)?);
1321                let result = vault_history_result(collection, key, &versions);
1322                Ok(RuntimeQueryResult {
1323                    query: raw_query.to_string(),
1324                    mode: crate::storage::query::modes::QueryMode::Sql,
1325                    statement: "vault_history",
1326                    engine: "vault",
1327                    result,
1328                    affected_rows: 0,
1329                    statement_type: "select",
1330                })
1331            }
1332
1333            KvCommand::Purge { collection, key } => {
1334                let entry = ops.get_vault_entry(collection, key)?;
1335                if let Err(reason) = self.check_vault_capability("vault:purge", collection, key) {
1336                    self.audit_vault_lifecycle(
1337                        "purge",
1338                        collection,
1339                        key,
1340                        crate::runtime::audit_log::Outcome::Denied,
1341                        &reason,
1342                        entry.as_ref(),
1343                    );
1344                    return Err(RedDBError::Query(reason));
1345                }
1346                self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
1347                let purged = ops.purge_vault_versions(collection, key)?;
1348                self.audit_vault_lifecycle(
1349                    "purge",
1350                    collection,
1351                    key,
1352                    crate::runtime::audit_log::Outcome::Success,
1353                    "ok",
1354                    entry.as_ref(),
1355                );
1356                let mut result = UnifiedResult::with_columns(vec![
1357                    "ok".into(),
1358                    "collection".into(),
1359                    "key".into(),
1360                    "purged".into(),
1361                ]);
1362                let mut record = UnifiedRecord::new();
1363                record.set("ok", Value::Boolean(true));
1364                record.set("collection", Value::text(collection.clone()));
1365                record.set("key", Value::text(key.clone()));
1366                record.set("purged", Value::Integer(purged as i64));
1367                result.push(record);
1368                Ok(RuntimeQueryResult {
1369                    query: raw_query.to_string(),
1370                    mode: crate::storage::query::modes::QueryMode::Sql,
1371                    statement: "vault_purge",
1372                    engine: "vault",
1373                    result,
1374                    affected_rows: purged as u64,
1375                    statement_type: "delete",
1376                })
1377            }
1378
1379            KvCommand::Get {
1380                model,
1381                collection,
1382                key,
1383            } => {
1384                if *model == crate::catalog::CollectionModel::Vault {
1385                    self.check_vault_capability("vault:read_metadata", collection, key)
1386                        .map_err(RedDBError::Query)?;
1387                    let entry = ops.get_vault_entry(collection, key)?;
1388                    let key_available = self.vault_key_available(collection);
1389                    let result =
1390                        vault_metadata_result(collection, key, entry.as_ref(), key_available);
1391                    return Ok(RuntimeQueryResult {
1392                        query: raw_query.to_string(),
1393                        mode: crate::storage::query::modes::QueryMode::Sql,
1394                        statement: "vault_get",
1395                        engine: "vault",
1396                        result,
1397                        affected_rows: 0,
1398                        statement_type: "select",
1399                    });
1400                }
1401
1402                let value = ops.get(*model, collection, key)?;
1403                let mut result = UnifiedResult::with_columns(vec![
1404                    "collection".into(),
1405                    "key".into(),
1406                    "value".into(),
1407                    "tags".into(),
1408                ]);
1409                let mut record = UnifiedRecord::new();
1410                record.set("collection", Value::text(collection.clone()));
1411                record.set("key", Value::text(key.clone()));
1412                record.set(
1413                    "value",
1414                    value.unwrap_or(crate::storage::schema::Value::Null),
1415                );
1416                record.set("tags", kv_tags_value(&ops.tags_for_key(collection, key)));
1417                result.push(record);
1418
1419                Ok(RuntimeQueryResult {
1420                    query: raw_query.to_string(),
1421                    mode: crate::storage::query::modes::QueryMode::Sql,
1422                    statement: "kv_get",
1423                    engine: "kv",
1424                    result,
1425                    affected_rows: 0,
1426                    statement_type: "select",
1427                })
1428            }
1429            KvCommand::Watch {
1430                model,
1431                collection,
1432                key,
1433                prefix,
1434                from_lsn,
1435            } => {
1436                let watch_key = if *prefix {
1437                    format!("{key}.*")
1438                } else {
1439                    key.clone()
1440                };
1441                let endpoint = match from_lsn {
1442                    Some(lsn) => format!(
1443                        "/collections/{collection}/{}/{watch_key}/watch?since_lsn={lsn}",
1444                        keyed_model_name(*model)
1445                    ),
1446                    None => format!(
1447                        "/collections/{collection}/{}/{watch_key}/watch",
1448                        keyed_model_name(*model)
1449                    ),
1450                };
1451                let mut result = UnifiedResult::with_columns(vec![
1452                    "collection".into(),
1453                    "key".into(),
1454                    "prefix".into(),
1455                    "from_lsn".into(),
1456                    "watch_url".into(),
1457                    "streaming".into(),
1458                ]);
1459                let mut record = UnifiedRecord::new();
1460                record.set("collection", Value::text(collection.clone()));
1461                record.set("key", Value::text(watch_key));
1462                record.set("prefix", Value::Boolean(*prefix));
1463                record.set(
1464                    "from_lsn",
1465                    from_lsn
1466                        .map(Value::UnsignedInteger)
1467                        .unwrap_or(crate::storage::schema::Value::Null),
1468                );
1469                record.set("watch_url", Value::text(endpoint));
1470                record.set("streaming", Value::Boolean(true));
1471                result.push(record);
1472
1473                Ok(RuntimeQueryResult {
1474                    query: raw_query.to_string(),
1475                    mode: crate::storage::query::modes::QueryMode::Sql,
1476                    statement: "kv_watch",
1477                    engine: keyed_model_name(*model),
1478                    result,
1479                    affected_rows: 0,
1480                    statement_type: "stream",
1481                })
1482            }
1483
1484            KvCommand::Unseal {
1485                collection,
1486                key,
1487                version,
1488            } => {
1489                let latest = ops.get_vault_entry(collection, key)?;
1490                let entry = match version {
1491                    Some(version) => ops.get_vault_entry_version(collection, key, *version)?,
1492                    None => latest.clone(),
1493                };
1494                let action = match (version, latest.as_ref()) {
1495                    (Some(requested), Some(latest)) if *requested == latest.version => {
1496                        "vault:unseal"
1497                    }
1498                    (Some(_), _) => "vault:unseal_history",
1499                    _ => "vault:unseal",
1500                };
1501                if let Err(reason) = self.check_vault_capability(action, collection, key) {
1502                    self.audit_vault_unseal(
1503                        collection,
1504                        key,
1505                        crate::runtime::audit_log::Outcome::Denied,
1506                        &reason,
1507                        entry.as_ref(),
1508                    );
1509                    return Err(RedDBError::Query(reason));
1510                }
1511                let Some(entry) = entry else {
1512                    let reason = "not_found";
1513                    self.audit_vault_unseal(
1514                        collection,
1515                        key,
1516                        crate::runtime::audit_log::Outcome::Denied,
1517                        reason,
1518                        None,
1519                    );
1520                    return Err(RedDBError::NotFound(format!(
1521                        "vault secret '{}.{}' not found",
1522                        collection, key
1523                    )));
1524                };
1525                if entry.tombstone {
1526                    let reason = "deleted";
1527                    self.audit_vault_unseal(
1528                        collection,
1529                        key,
1530                        crate::runtime::audit_log::Outcome::Denied,
1531                        reason,
1532                        Some(&entry),
1533                    );
1534                    return Err(RedDBError::NotFound(format!(
1535                        "vault secret '{}.{}' is deleted",
1536                        collection, key
1537                    )));
1538                }
1539                match self.unseal_vault_value(collection, &entry.value) {
1540                    Ok(value) => {
1541                        self.audit_vault_unseal(
1542                            collection,
1543                            key,
1544                            crate::runtime::audit_log::Outcome::Success,
1545                            "ok",
1546                            Some(&entry),
1547                        );
1548                        let mut result = UnifiedResult::with_columns(vec![
1549                            "collection".into(),
1550                            "key".into(),
1551                            "value".into(),
1552                        ]);
1553                        let mut record = UnifiedRecord::new();
1554                        record.set("collection", Value::text(collection.clone()));
1555                        record.set("key", Value::text(key.clone()));
1556                        record.set("value", value);
1557                        result.push(record);
1558                        Ok(RuntimeQueryResult {
1559                            query: raw_query.to_string(),
1560                            mode: crate::storage::query::modes::QueryMode::Sql,
1561                            statement: "vault_unseal",
1562                            engine: "vault",
1563                            result,
1564                            affected_rows: 0,
1565                            statement_type: "select",
1566                        })
1567                    }
1568                    Err(err) => {
1569                        let reason = err.to_string();
1570                        self.audit_vault_unseal(
1571                            collection,
1572                            key,
1573                            crate::runtime::audit_log::Outcome::Error,
1574                            &reason,
1575                            Some(&entry),
1576                        );
1577                        Err(err)
1578                    }
1579                }
1580            }
1581
1582            KvCommand::Incr {
1583                model,
1584                collection,
1585                key,
1586                by,
1587                ttl_ms,
1588            } => {
1589                self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
1590                let new_value = ops.incr(*model, collection, key, *by, *ttl_ms)?;
1591
1592                let mut result = UnifiedResult::with_columns(vec![
1593                    "ok".into(),
1594                    "collection".into(),
1595                    "key".into(),
1596                    "value".into(),
1597                ]);
1598                let mut record = UnifiedRecord::new();
1599                record.set("ok", Value::Boolean(true));
1600                record.set("collection", Value::text(collection.clone()));
1601                record.set("key", Value::text(key.clone()));
1602                record.set("value", Value::Integer(new_value));
1603                result.push(record);
1604
1605                Ok(RuntimeQueryResult {
1606                    query: raw_query.to_string(),
1607                    mode: crate::storage::query::modes::QueryMode::Sql,
1608                    statement: "kv_incr",
1609                    engine: "kv",
1610                    result,
1611                    affected_rows: 1,
1612                    statement_type: "update",
1613                })
1614            }
1615
1616            KvCommand::Cas {
1617                model,
1618                collection,
1619                key,
1620                expected,
1621                new_value,
1622                ttl_ms,
1623            } => {
1624                self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
1625                let (ok, current) = ops.cas(
1626                    *model,
1627                    collection,
1628                    key,
1629                    expected.as_ref(),
1630                    new_value.clone(),
1631                    *ttl_ms,
1632                )?;
1633
1634                let mut result = UnifiedResult::with_columns(vec![
1635                    "ok".into(),
1636                    "collection".into(),
1637                    "key".into(),
1638                    "current".into(),
1639                ]);
1640                let mut record = UnifiedRecord::new();
1641                record.set("ok", Value::Boolean(ok));
1642                record.set("collection", Value::text(collection.clone()));
1643                record.set("key", Value::text(key.clone()));
1644                record.set(
1645                    "current",
1646                    current.unwrap_or(crate::storage::schema::Value::Null),
1647                );
1648                result.push(record);
1649
1650                Ok(RuntimeQueryResult {
1651                    query: raw_query.to_string(),
1652                    mode: crate::storage::query::modes::QueryMode::Sql,
1653                    statement: "kv_cas",
1654                    engine: "kv",
1655                    result,
1656                    affected_rows: if ok { 1 } else { 0 },
1657                    statement_type: "update",
1658                })
1659            }
1660
1661            KvCommand::Delete {
1662                model,
1663                collection,
1664                key,
1665            } => {
1666                if *model == crate::catalog::CollectionModel::Vault {
1667                    self.check_system_vault_capability("vault:write", collection, key)
1668                        .map_err(RedDBError::Query)?;
1669                    self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
1670                    let entry = ops.append_vault_version(
1671                        collection,
1672                        key,
1673                        Value::Null,
1674                        "delete",
1675                        true,
1676                        &[],
1677                    )?;
1678                    self.record_kv_watch_event(
1679                        crate::replication::cdc::ChangeOperation::Delete,
1680                        collection,
1681                        key,
1682                        entry.id.raw(),
1683                        None,
1684                        Some(vault_entry_metadata_json(&entry)),
1685                    );
1686                    self.audit_vault_lifecycle(
1687                        "delete",
1688                        collection,
1689                        key,
1690                        crate::runtime::audit_log::Outcome::Success,
1691                        "ok",
1692                        Some(&entry),
1693                    );
1694                    return Ok(vault_write_result(
1695                        raw_query,
1696                        "vault_delete",
1697                        "delete",
1698                        collection,
1699                        key,
1700                        &entry,
1701                        1,
1702                    ));
1703                }
1704                self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
1705                let deleted = ops.delete(*model, collection, key)?;
1706
1707                let mut result = UnifiedResult::with_columns(vec![
1708                    "ok".into(),
1709                    "collection".into(),
1710                    "key".into(),
1711                    "deleted".into(),
1712                ]);
1713                let mut record = UnifiedRecord::new();
1714                record.set("ok", Value::Boolean(true));
1715                record.set("collection", Value::text(collection.clone()));
1716                record.set("key", Value::text(key.clone()));
1717                record.set("deleted", Value::Boolean(deleted));
1718                result.push(record);
1719
1720                Ok(RuntimeQueryResult {
1721                    query: raw_query.to_string(),
1722                    mode: crate::storage::query::modes::QueryMode::Sql,
1723                    statement: "kv_delete",
1724                    engine: "kv",
1725                    result,
1726                    affected_rows: if deleted { 1 } else { 0 },
1727                    statement_type: "delete",
1728                })
1729            }
1730        }
1731    }
1732
1733    pub fn vault_watch_events_since(
1734        &self,
1735        collection: &str,
1736        key: &str,
1737        since_lsn: u64,
1738        max_count: usize,
1739    ) -> Vec<crate::replication::cdc::KvWatchEvent> {
1740        self.kv_watch_events_since(collection, key, since_lsn, max_count)
1741            .into_iter()
1742            .filter(|event| {
1743                self.check_vault_capability("vault:read_metadata", &event.collection, &event.key)
1744                    .is_ok()
1745            })
1746            .map(vault_filter_watch_event)
1747            .collect()
1748    }
1749
1750    pub fn vault_watch_events_since_prefix(
1751        &self,
1752        collection: &str,
1753        prefix: &str,
1754        since_lsn: u64,
1755        max_count: usize,
1756    ) -> Vec<crate::replication::cdc::KvWatchEvent> {
1757        self.kv_watch_events_since_prefix(collection, prefix, since_lsn, max_count)
1758            .into_iter()
1759            .filter(|event| {
1760                self.check_vault_capability("vault:read_metadata", &event.collection, &event.key)
1761                    .is_ok()
1762            })
1763            .map(vault_filter_watch_event)
1764            .collect()
1765    }
1766
1767    fn check_kv_invalidate_policy(&self, collection: &str) -> RedDBResult<()> {
1768        let auth_store = match self.inner.auth_store.read().clone() {
1769            Some(store) => store,
1770            None => return Ok(()),
1771        };
1772        let (username, role) = match crate::runtime::impl_core::current_auth_identity() {
1773            Some(identity) => identity,
1774            None => return Ok(()),
1775        };
1776        if role < crate::auth::Role::Write {
1777            return Err(RedDBError::Query(format!(
1778                "principal=`{username}` role=`{role:?}` cannot invalidate KV tags"
1779            )));
1780        }
1781        if !auth_store.iam_authorization_enabled() {
1782            return Ok(());
1783        }
1784
1785        let tenant = crate::runtime::impl_core::current_tenant();
1786        let principal = crate::auth::UserId::from_parts(tenant.as_deref(), &username);
1787        let mut resource =
1788            crate::auth::policies::ResourceRef::new("kv".to_string(), collection.to_string());
1789        if let Some(tenant) = tenant.clone() {
1790            resource = resource.with_tenant(tenant);
1791        }
1792        let ctx = crate::auth::policies::EvalContext {
1793            principal_tenant: tenant.clone(),
1794            current_tenant: tenant,
1795            peer_ip: None,
1796            mfa_present: false,
1797            now_ms: current_unix_ms(),
1798            principal_is_admin_role: role == crate::auth::Role::Admin,
1799        };
1800        if auth_store.check_policy_authz(&principal, "kv:invalidate", &resource, &ctx) {
1801            Ok(())
1802        } else {
1803            Err(RedDBError::Query(format!(
1804                "principal=`{username}` action=`kv:invalidate` resource=`kv:{collection}` denied by IAM policy"
1805            )))
1806        }
1807    }
1808}
1809
1810fn ttl_metadata(ttl_ms: Option<u64>) -> Option<Metadata> {
1811    let ttl_ms = ttl_ms?;
1812    Some(Metadata::with_fields(
1813        [(
1814            "_ttl_ms".to_string(),
1815            if ttl_ms <= i64::MAX as u64 {
1816                MetadataValue::Int(ttl_ms as i64)
1817            } else {
1818                MetadataValue::Timestamp(ttl_ms)
1819            },
1820        )]
1821        .into_iter()
1822        .collect(),
1823    ))
1824}
1825
1826fn vault_write_result(
1827    raw_query: &str,
1828    statement: &'static str,
1829    statement_type: &'static str,
1830    collection: &str,
1831    key: &str,
1832    entry: &VaultEntry,
1833    affected_rows: u64,
1834) -> RuntimeQueryResult {
1835    let mut result = UnifiedResult::with_columns(vec![
1836        "ok".into(),
1837        "collection".into(),
1838        "key".into(),
1839        "version".into(),
1840        "fingerprint".into(),
1841        "tombstone".into(),
1842        "op".into(),
1843        "id".into(),
1844    ]);
1845    let mut record = UnifiedRecord::new();
1846    record.set("ok", Value::Boolean(true));
1847    record.set("collection", Value::text(collection.to_string()));
1848    record.set("key", Value::text(key.to_string()));
1849    record.set("version", Value::Integer(entry.version));
1850    if entry.tombstone {
1851        record.set("fingerprint", Value::Null);
1852    } else {
1853        record.set("fingerprint", Value::text(vault_fingerprint(&entry.value)));
1854    }
1855    record.set("tombstone", Value::Boolean(entry.tombstone));
1856    record.set("op", Value::text(entry.op.clone()));
1857    record.set("id", Value::Integer(entry.id.raw() as i64));
1858    result.push(record);
1859    RuntimeQueryResult {
1860        query: raw_query.to_string(),
1861        mode: crate::storage::query::modes::QueryMode::Sql,
1862        statement,
1863        engine: "vault",
1864        result,
1865        affected_rows,
1866        statement_type,
1867    }
1868}
1869
1870fn vault_history_result(collection: &str, key: &str, versions: &[VaultEntry]) -> UnifiedResult {
1871    let mut result = UnifiedResult::with_columns(vec![
1872        "collection".into(),
1873        "key".into(),
1874        "version".into(),
1875        "fingerprint".into(),
1876        "tags".into(),
1877        "created_at".into(),
1878        "updated_at".into(),
1879        "status".into(),
1880        "tombstone".into(),
1881        "op".into(),
1882    ]);
1883    for entry in versions {
1884        push_vault_metadata_record(&mut result, collection, key, entry);
1885    }
1886    result
1887}
1888
1889fn push_vault_metadata_record(
1890    result: &mut UnifiedResult,
1891    collection: &str,
1892    key: &str,
1893    entry: &VaultEntry,
1894) {
1895    let mut record = UnifiedRecord::new();
1896    record.set("collection", Value::text(collection.to_string()));
1897    record.set("key", Value::text(key.to_string()));
1898    record.set("version", Value::Integer(entry.version));
1899    if entry.tombstone {
1900        record.set("fingerprint", Value::Null);
1901        record.set("status", Value::text("deleted"));
1902    } else {
1903        record.set("fingerprint", Value::text(vault_fingerprint(&entry.value)));
1904        record.set("status", Value::text("sealed"));
1905    }
1906    record.set("tags", vault_tags_value(&entry.metadata));
1907    record.set("created_at", Value::TimestampMs(entry.created_at as i64));
1908    record.set("updated_at", Value::TimestampMs(entry.updated_at as i64));
1909    record.set("tombstone", Value::Boolean(entry.tombstone));
1910    record.set("op", Value::text(entry.op.clone()));
1911    result.push(record);
1912}
1913
1914fn vault_metadata_result(
1915    collection: &str,
1916    key: &str,
1917    entry: Option<&VaultEntry>,
1918    key_available: bool,
1919) -> UnifiedResult {
1920    let mut result = UnifiedResult::with_columns(vec![
1921        "collection".into(),
1922        "key".into(),
1923        "version".into(),
1924        "fingerprint".into(),
1925        "tags".into(),
1926        "created_at".into(),
1927        "updated_at".into(),
1928        "value".into(),
1929        "status".into(),
1930        "tombstone".into(),
1931        "op".into(),
1932    ]);
1933    let mut record = UnifiedRecord::new();
1934    record.set("collection", Value::text(collection.to_string()));
1935    record.set("key", Value::text(key.to_string()));
1936    match entry {
1937        Some(entry) => {
1938            record.set("version", Value::Integer(entry.version));
1939            if entry.tombstone {
1940                record.set("fingerprint", Value::Null);
1941            } else {
1942                record.set("fingerprint", Value::text(vault_fingerprint(&entry.value)));
1943            }
1944            record.set("tags", vault_tags_value(&entry.metadata));
1945            record.set("created_at", Value::TimestampMs(entry.created_at as i64));
1946            record.set("updated_at", Value::TimestampMs(entry.updated_at as i64));
1947            record.set("value", Value::text("***"));
1948            record.set(
1949                "status",
1950                Value::text(if entry.tombstone {
1951                    "deleted"
1952                } else if key_available {
1953                    "sealed"
1954                } else {
1955                    "sealed_unavailable"
1956                }),
1957            );
1958            record.set("tombstone", Value::Boolean(entry.tombstone));
1959            record.set("op", Value::text(entry.op.clone()));
1960        }
1961        None => {
1962            record.set("version", Value::Null);
1963            record.set("fingerprint", Value::Null);
1964            record.set("tags", Value::Array(Vec::new()));
1965            record.set("created_at", Value::Null);
1966            record.set("updated_at", Value::Null);
1967            record.set("value", Value::text(""));
1968            record.set("status", Value::text("missing"));
1969            record.set("tombstone", Value::Boolean(false));
1970            record.set("op", Value::Null);
1971        }
1972    }
1973    result.push(record);
1974    result
1975}
1976
1977fn vault_fingerprint(value: &Value) -> String {
1978    match value {
1979        Value::Secret(payload) => crate::utils::to_hex(&crate::crypto::sha256::sha256(payload)),
1980        other => crate::utils::to_hex(&crate::crypto::sha256::sha256(&other.to_bytes())),
1981    }
1982}
1983
1984fn vault_entry_metadata_json(entry: &VaultEntry) -> crate::json::Value {
1985    let mut object = crate::json::Map::new();
1986    object.insert(
1987        "key".to_string(),
1988        crate::json::Value::String(entry.key.clone()),
1989    );
1990    object.insert(
1991        "version".to_string(),
1992        crate::json::Value::Number(entry.version as f64),
1993    );
1994    object.insert(
1995        "fingerprint".to_string(),
1996        if entry.tombstone {
1997            crate::json::Value::Null
1998        } else {
1999            crate::json::Value::String(vault_fingerprint(&entry.value))
2000        },
2001    );
2002    object.insert("tags".to_string(), vault_tags_json(&entry.metadata));
2003    object.insert(
2004        "actor".to_string(),
2005        crate::json::Value::String(RedDBRuntime::current_vault_actor()),
2006    );
2007    object.insert(
2008        "sequence_id".to_string(),
2009        crate::json::Value::Number(entry.sequence_id as f64),
2010    );
2011    object.insert(
2012        "tombstone".to_string(),
2013        crate::json::Value::Bool(entry.tombstone),
2014    );
2015    object.insert(
2016        "op".to_string(),
2017        crate::json::Value::String(entry.op.clone()),
2018    );
2019    crate::json::Value::Object(object)
2020}
2021
2022fn vault_tags_json(metadata: &Metadata) -> crate::json::Value {
2023    match vault_tags_value(metadata) {
2024        Value::Array(values) => crate::json::Value::Array(
2025            values
2026                .into_iter()
2027                .filter_map(|value| match value {
2028                    Value::Text(tag) => Some(crate::json::Value::String(tag.to_string())),
2029                    _ => None,
2030                })
2031                .collect(),
2032        ),
2033        _ => crate::json::Value::Array(Vec::new()),
2034    }
2035}
2036
2037fn vault_tags_metadata(tags: &[String]) -> std::collections::HashMap<String, MetadataValue> {
2038    [(
2039        "tags".to_string(),
2040        MetadataValue::Array(
2041            tags.iter()
2042                .map(|tag| MetadataValue::String(tag.clone()))
2043                .collect(),
2044        ),
2045    )]
2046    .into_iter()
2047    .collect()
2048}
2049
2050fn vault_filter_watch_event(
2051    mut event: crate::replication::cdc::KvWatchEvent,
2052) -> crate::replication::cdc::KvWatchEvent {
2053    event.before = event.before.and_then(vault_metadata_json_only);
2054    event.after = event.after.and_then(vault_metadata_json_only);
2055    event
2056}
2057
2058fn vault_metadata_json_only(value: crate::json::Value) -> Option<crate::json::Value> {
2059    let object = value.as_object()?;
2060    let mut out = crate::json::Map::new();
2061    for field in [
2062        "key",
2063        "version",
2064        "fingerprint",
2065        "tags",
2066        "actor",
2067        "sequence_id",
2068        "tombstone",
2069        "op",
2070    ] {
2071        if let Some(value) = object.get(field) {
2072            out.insert(field.to_string(), value.clone());
2073        }
2074    }
2075    Some(crate::json::Value::Object(out))
2076}
2077
2078fn vault_tags_value(metadata: &Metadata) -> Value {
2079    match metadata.get("tags") {
2080        Some(MetadataValue::Array(values)) => Value::Array(
2081            values
2082                .iter()
2083                .filter_map(|value| match value {
2084                    MetadataValue::String(tag) => Some(Value::text(tag.clone())),
2085                    _ => None,
2086                })
2087                .collect(),
2088        ),
2089        Some(MetadataValue::String(tag)) if !tag.is_empty() => {
2090            Value::Array(vec![Value::text(tag.clone())])
2091        }
2092        _ => Value::Array(Vec::new()),
2093    }
2094}
2095
2096fn decode_vault_key(hex_key: &str) -> RedDBResult<[u8; 32]> {
2097    let bytes = hex::decode(hex_key)
2098        .map_err(|_| RedDBError::Query("vault sealed_unavailable: bad key material".to_string()))?;
2099    let key: [u8; 32] = bytes.try_into().map_err(|_| {
2100        RedDBError::Query("vault sealed_unavailable: bad key material length".to_string())
2101    })?;
2102    Ok(key)
2103}
2104
2105fn kv_tags_metadata(tags: &[String]) -> Option<(String, MetadataValue)> {
2106    if tags.is_empty() {
2107        return None;
2108    }
2109    let values = tags
2110        .iter()
2111        .map(|tag| MetadataValue::String(tag.clone()))
2112        .collect();
2113    Some(("_kv_tags".to_string(), MetadataValue::Array(values)))
2114}
2115
2116fn kv_tags_value(tags: &[String]) -> Value {
2117    let json = crate::json::Value::Array(
2118        tags.iter()
2119            .map(|tag| crate::json::Value::String(tag.clone()))
2120            .collect(),
2121    );
2122    Value::Json(crate::json::to_vec(&json).unwrap_or_default())
2123}
2124
2125fn kv_value_from_entity(entity: &crate::storage::UnifiedEntity) -> Option<Value> {
2126    if let crate::storage::EntityData::Row(ref row) = entity.data {
2127        if let Some(ref named) = row.named {
2128            return named.get("value").cloned();
2129        }
2130    }
2131    None
2132}
2133
2134fn kv_collection_contract(name: &str) -> crate::physical::CollectionContract {
2135    let now = current_unix_ms();
2136    crate::physical::CollectionContract {
2137        name: name.to_string(),
2138        declared_model: crate::catalog::CollectionModel::Kv,
2139        schema_mode: crate::catalog::SchemaMode::Dynamic,
2140        origin: crate::physical::ContractOrigin::Implicit,
2141        version: 1,
2142        created_at_unix_ms: now,
2143        updated_at_unix_ms: now,
2144        default_ttl_ms: None,
2145        context_index_fields: Vec::new(),
2146        declared_columns: Vec::new(),
2147        table_def: None,
2148        timestamps_enabled: false,
2149        context_index_enabled: false,
2150        append_only: false,
2151        subscriptions: Vec::new(),
2152    }
2153}
2154
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// If the system clock reads earlier than the epoch, `0` is returned rather
/// than an error.
fn current_unix_ms() -> u128 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis(),
        Err(_) => 0,
    }
}
2161
2162#[cfg(test)]
2163mod tests {
2164    use crate::api::RedDBOptions;
2165    use crate::catalog::CollectionModel;
2166    use crate::runtime::RedDBRuntime;
2167
2168    fn rt() -> RedDBRuntime {
2169        RedDBRuntime::with_options(RedDBOptions::in_memory()).expect("in-memory runtime")
2170    }
2171
2172    #[test]
2173    fn incr_missing_key_initialises_at_by() {
2174        let r = rt();
2175        let ops = super::KvAtomicOps::new(&r);
2176        let v = ops
2177            .incr(CollectionModel::Kv, "kv_default", "missing", 5, None)
2178            .unwrap();
2179        assert_eq!(v, 5);
2180    }
2181
2182    #[test]
2183    fn kv_runtime_stats_count_public_ops() {
2184        let r = rt();
2185        let ops = super::KvAtomicOps::new(&r);
2186
2187        ops.set(
2188            CollectionModel::Kv,
2189            "kv_default",
2190            "profile",
2191            crate::storage::schema::Value::text("alice"),
2192            None,
2193            false,
2194        )
2195        .unwrap();
2196        ops.get(CollectionModel::Kv, "kv_default", "profile")
2197            .unwrap();
2198        ops.delete(CollectionModel::Kv, "kv_default", "profile")
2199            .unwrap();
2200        ops.incr(CollectionModel::Kv, "kv_default", "hits", 1, None)
2201            .unwrap();
2202        ops.cas(
2203            CollectionModel::Kv,
2204            "kv_default",
2205            "profile",
2206            None,
2207            crate::storage::schema::Value::text("created"),
2208            None,
2209        )
2210        .unwrap();
2211        ops.cas(
2212            CollectionModel::Kv,
2213            "kv_default",
2214            "profile",
2215            Some(&crate::storage::schema::Value::text("different")),
2216            crate::storage::schema::Value::text("ignored"),
2217            None,
2218        )
2219        .unwrap();
2220
2221        let stats = r.stats().kv;
2222        assert_eq!(stats.puts, 1);
2223        assert_eq!(stats.gets, 1);
2224        assert_eq!(stats.deletes, 1);
2225        assert_eq!(stats.incrs, 1);
2226        assert_eq!(stats.cas_success, 1);
2227        assert_eq!(stats.cas_conflict, 1);
2228    }
2229
2230    #[test]
2231    fn kv_invalidate_tags_removes_matching_entries_only() {
2232        let r = rt();
2233
2234        r.execute_query("KV PUT sessions.blob = 'payload' TAGS [user:42, org:7]")
2235            .unwrap();
2236
2237        let miss = r
2238            .execute_query("INVALIDATE TAGS [org:99] FROM sessions")
2239            .unwrap();
2240        assert_eq!(miss.affected_rows, 0);
2241        assert!(matches!(
2242            r.execute_query("KV GET sessions.blob")
2243                .unwrap()
2244                .result
2245                .records[0]
2246                .get("value"),
2247            Some(crate::storage::schema::Value::Text(value)) if &**value == "payload"
2248        ));
2249
2250        let hit = r
2251            .execute_query("INVALIDATE TAGS [user:42] FROM sessions")
2252            .unwrap();
2253        assert_eq!(hit.affected_rows, 1);
2254        assert!(matches!(
2255            r.execute_query("KV GET sessions.blob")
2256                .unwrap()
2257                .result
2258                .records[0]
2259                .get("value"),
2260            Some(crate::storage::schema::Value::Null)
2261        ));
2262    }
2263
2264    #[test]
2265    fn kv_runtime_stats_count_watch_streams_and_events() {
2266        let r = rt();
2267        let ops = super::KvAtomicOps::new(&r);
2268        assert_eq!(r.stats().kv.watch_streams_active, 0);
2269
2270        {
2271            let mut stream = r.kv_watch_subscribe("kv_default", "watched", None);
2272            assert_eq!(r.stats().kv.watch_streams_active, 1);
2273
2274            ops.set(
2275                CollectionModel::Kv,
2276                "kv_default",
2277                "watched",
2278                crate::storage::schema::Value::Integer(1),
2279                None,
2280                false,
2281            )
2282            .unwrap();
2283            let event = stream.poll_next().expect("watch event");
2284            assert_eq!(event.key, "watched");
2285            assert_eq!(r.stats().kv.watch_events_emitted, 1);
2286
2287            stream.record_drop_count(3);
2288            assert_eq!(r.stats().kv.watch_drops, 3);
2289        }
2290
2291        assert_eq!(r.stats().kv.watch_streams_active, 0);
2292    }
2293
2294    #[test]
2295    fn incr_existing_integer_accumulates() {
2296        let r = rt();
2297        let ops = super::KvAtomicOps::new(&r);
2298        ops.incr(CollectionModel::Kv, "kv_default", "ctr", 1, None)
2299            .unwrap();
2300        ops.incr(CollectionModel::Kv, "kv_default", "ctr", 1, None)
2301            .unwrap();
2302        let v = ops
2303            .incr(CollectionModel::Kv, "kv_default", "ctr", 1, None)
2304            .unwrap();
2305        assert_eq!(v, 3);
2306    }
2307
2308    #[test]
2309    fn decr_via_negative_by() {
2310        let r = rt();
2311        let ops = super::KvAtomicOps::new(&r);
2312        ops.incr(CollectionModel::Kv, "kv_default", "stock", 10, None)
2313            .unwrap();
2314        let v = ops
2315            .incr(CollectionModel::Kv, "kv_default", "stock", -3, None)
2316            .unwrap();
2317        assert_eq!(v, 7);
2318    }
2319
2320    #[test]
2321    fn incr_on_string_value_returns_error() {
2322        let r = rt();
2323        let ops = super::KvAtomicOps::new(&r);
2324        ops.set(
2325            CollectionModel::Kv,
2326            "kv_default",
2327            "name",
2328            crate::storage::schema::Value::text("alice"),
2329            None,
2330            false,
2331        )
2332        .unwrap();
2333        let err = ops
2334            .incr(CollectionModel::Kv, "kv_default", "name", 1, None)
2335            .unwrap_err();
2336        assert!(err.to_string().contains("non-integer"));
2337    }
2338
2339    // --- CAS tests ---
2340
2341    #[test]
2342    fn cas_matching_value_succeeds() {
2343        let r = rt();
2344        let ops = super::KvAtomicOps::new(&r);
2345        ops.set(
2346            CollectionModel::Kv,
2347            "kv_default",
2348            "lock",
2349            crate::storage::schema::Value::text("free"),
2350            None,
2351            false,
2352        )
2353        .unwrap();
2354        let (ok, prev) = ops
2355            .cas(
2356                CollectionModel::Kv,
2357                "kv_default",
2358                "lock",
2359                Some(&crate::storage::schema::Value::text("free")),
2360                crate::storage::schema::Value::text("held"),
2361                None,
2362            )
2363            .unwrap();
2364        assert!(ok);
2365        assert_eq!(prev, Some(crate::storage::schema::Value::text("free")));
2366        // Value actually changed.
2367        assert_eq!(
2368            ops.get(CollectionModel::Kv, "kv_default", "lock").unwrap(),
2369            Some(crate::storage::schema::Value::text("held"))
2370        );
2371    }
2372
2373    #[test]
2374    fn cas_mismatching_value_fails() {
2375        let r = rt();
2376        let ops = super::KvAtomicOps::new(&r);
2377        ops.set(
2378            CollectionModel::Kv,
2379            "kv_default",
2380            "lock",
2381            crate::storage::schema::Value::text("free"),
2382            None,
2383            false,
2384        )
2385        .unwrap();
2386        let (ok, current) = ops
2387            .cas(
2388                CollectionModel::Kv,
2389                "kv_default",
2390                "lock",
2391                Some(&crate::storage::schema::Value::text("held")),
2392                crate::storage::schema::Value::text("worker-7"),
2393                None,
2394            )
2395            .unwrap();
2396        assert!(!ok);
2397        assert_eq!(current, Some(crate::storage::schema::Value::text("free")));
2398        // Value unchanged.
2399        assert_eq!(
2400            ops.get(CollectionModel::Kv, "kv_default", "lock").unwrap(),
2401            Some(crate::storage::schema::Value::text("free"))
2402        );
2403    }
2404
2405    #[test]
2406    fn cas_expect_null_on_missing_key_creates() {
2407        let r = rt();
2408        let ops = super::KvAtomicOps::new(&r);
2409        let (ok, prev) = ops
2410            .cas(
2411                CollectionModel::Kv,
2412                "kv_default",
2413                "new_key",
2414                None,
2415                crate::storage::schema::Value::text("created"),
2416                None,
2417            )
2418            .unwrap();
2419        assert!(ok);
2420        assert_eq!(prev, None);
2421        assert_eq!(
2422            ops.get(CollectionModel::Kv, "kv_default", "new_key")
2423                .unwrap(),
2424            Some(crate::storage::schema::Value::text("created"))
2425        );
2426    }
2427
2428    #[test]
2429    fn cas_expect_null_on_existing_key_fails() {
2430        let r = rt();
2431        let ops = super::KvAtomicOps::new(&r);
2432        ops.set(
2433            CollectionModel::Kv,
2434            "kv_default",
2435            "taken",
2436            crate::storage::schema::Value::text("worker-1"),
2437            None,
2438            false,
2439        )
2440        .unwrap();
2441        let (ok, current) = ops
2442            .cas(
2443                CollectionModel::Kv,
2444                "kv_default",
2445                "taken",
2446                None,
2447                crate::storage::schema::Value::text("worker-2"),
2448                None,
2449            )
2450            .unwrap();
2451        assert!(!ok);
2452        assert_eq!(
2453            current,
2454            Some(crate::storage::schema::Value::text("worker-1"))
2455        );
2456    }
2457
/// Drives `KV CAS` through the query interface: after seeding `lock` with
/// `'free'`, the same `EXPECT 'free' SET 'held'` statement is issued twice and
/// must report `ok = true` the first time and `ok = false` the second.
#[test]
fn cas_via_sql_roundtrip() {
    use crate::storage::schema::Value;

    let runtime = rt();
    // Seed value.
    runtime.execute_query("KV PUT lock = 'free'").unwrap();

    // First attempt: free → held succeeds. Second attempt: the value is now
    // 'held', so the identical statement must fail.
    for &expected_ok in &[true, false] {
        let outcome = runtime
            .execute_query("KV CAS lock EXPECT 'free' SET 'held'")
            .unwrap();
        let reply = &outcome.result.records[0];
        assert_eq!(reply.get("ok"), Some(&Value::Boolean(expected_ok)));
    }
}
2482
/// `KV CAS … EXPECT NULL` issued through the query interface: the first call
/// on the `singleton` key reports `ok = true`, the second reports `ok = false`.
#[test]
fn cas_expect_null_via_sql() {
    use crate::storage::schema::Value;

    let runtime = rt();

    // Statements are run in order; the second call must fail.
    let attempts = [
        ("KV CAS singleton EXPECT NULL SET 'first'", true),
        ("KV CAS singleton EXPECT NULL SET 'second'", false),
    ];
    for &(statement, expected_ok) in &attempts {
        let outcome = runtime.execute_query(statement).unwrap();
        let reply = &outcome.result.records[0];
        assert_eq!(reply.get("ok"), Some(&Value::Boolean(expected_ok)));
    }
}
2504
/// `KV INCR` through the query interface: a bare increment on the `hits` key
/// reports `value = 1`, and a following `BY 4` increment reports `value = 5`.
#[test]
fn incr_via_sql_roundtrip() {
    use crate::storage::schema::Value;

    let runtime = rt();

    // Each entry pairs a statement with the counter value it must report.
    let steps = [("KV INCR hits", 1), ("KV INCR hits BY 4", 5)];
    for &(statement, expected_total) in &steps {
        let outcome = runtime.execute_query(statement).unwrap();
        let reply = &outcome.result.records[0];
        assert_eq!(reply.get("value"), Some(&Value::Integer(expected_total)));
    }
}
2521
/// Subscribes to the `seq` key, performs set → incr → delete → set on it, and
/// checks that the watch stream yields Insert, Update, Delete, Insert with
/// strictly increasing LSNs.
#[test]
fn watch_stream_delivers_key_events_in_lsn_order() {
    use crate::replication::cdc::ChangeOperation;
    use crate::storage::schema::Value;

    let runtime = rt();
    let kv = super::KvAtomicOps::new(&runtime);
    let mut watcher = runtime.kv_watch_subscribe("kv_default", "seq", None);

    // Both plain writes to `seq` share everything except the stored integer.
    let put_seq = |value| {
        kv.set(
            CollectionModel::Kv,
            "kv_default",
            "seq",
            Value::Integer(value),
            None,
            false,
        )
        .unwrap();
    };

    put_seq(1);
    kv.incr(CollectionModel::Kv, "kv_default", "seq", 1, None)
        .unwrap();
    kv.delete(CollectionModel::Kv, "kv_default", "seq").unwrap();
    put_seq(9);

    // Pull at most four events, stopping early if the stream runs dry.
    let events: Vec<_> = std::iter::from_fn(|| watcher.poll_next())
        .take(4)
        .collect();

    assert_eq!(events.len(), 4);

    let expected_ops = [
        ChangeOperation::Insert,
        ChangeOperation::Update,
        ChangeOperation::Delete,
        ChangeOperation::Insert,
    ];
    for (event, expected) in events.iter().zip(expected_ops.iter()) {
        assert_eq!(&event.op, expected);
    }

    // Every event must carry a larger LSN than the one before it.
    assert!(events
        .iter()
        .zip(events.iter().skip(1))
        .all(|(earlier, later)| earlier.lsn < later.lsn));
}
2578
/// Subscribes to the `acct:` prefix, writes `acct:1`, `session:1` and `acct:2`
/// in that order, and checks that only the two `acct:` keys are delivered
/// before the stream reports nothing further.
#[test]
fn watch_prefix_stream_delivers_matching_events_only() {
    use crate::storage::schema::Value;

    let runtime = rt();
    let kv = super::KvAtomicOps::new(&runtime);
    let mut watcher = runtime.kv_watch_subscribe_prefix("kv_default", "acct:", None);

    // Two keys under the watched prefix with an unrelated key written between them.
    let writes = [("acct:1", 1), ("session:1", 2), ("acct:2", 3)];
    for &(key, value) in &writes {
        kv.set(
            CollectionModel::Kv,
            "kv_default",
            key,
            Value::Integer(value),
            None,
            false,
        )
        .unwrap();
    }

    // Both events are polled before either key is inspected.
    let delivered = [
        watcher.poll_next().expect("first prefix event"),
        watcher.poll_next().expect("second prefix event"),
    ];
    assert_eq!(delivered[0].key, "acct:1");
    assert_eq!(delivered[1].key, "acct:2");
    assert!(watcher.poll_next().is_none());
}
2619
/// Consumes five events on a live subscription, drops it, writes fifty more
/// values, then re-subscribes with the last LSN seen. The resumed stream must
/// yield exactly fifty events, all newer than that LSN and strictly ordered,
/// and then report nothing further.
#[test]
fn watch_stream_resume_from_lsn_delivers_missed_events_without_duplicates() {
    use crate::storage::schema::Value;

    let runtime = rt();
    let kv = super::KvAtomicOps::new(&runtime);
    let mut live = runtime.kv_watch_subscribe("kv_default", "resume", None);

    // Every write in this test targets the same key and differs only in value.
    let write_resume = |value| {
        kv.set(
            CollectionModel::Kv,
            "kv_default",
            "resume",
            Value::Integer(value),
            None,
            false,
        )
        .unwrap();
    };

    // Consume each event right after its write and remember the latest LSN.
    let mut checkpoint = 0;
    for value in 0..5 {
        write_resume(value);
        checkpoint = live.poll_next().expect("initial event").lsn;
    }
    drop(live);

    // These writes happen after the first subscription has been dropped.
    for value in 5..55 {
        write_resume(value);
    }

    let mut resumed = runtime.kv_watch_subscribe("kv_default", "resume", Some(checkpoint));
    // Read at most fifty LSNs, stopping early if the stream runs dry.
    let lsns: Vec<_> = std::iter::from_fn(|| resumed.poll_next())
        .map(|event| event.lsn)
        .take(50)
        .collect();

    assert_eq!(lsns.len(), 50);
    assert!(lsns.iter().all(|&lsn| lsn > checkpoint));
    assert!(lsns
        .iter()
        .zip(lsns.iter().skip(1))
        .all(|(earlier, later)| earlier < later));
    assert!(resumed.poll_next().is_none());
}
2667
/// Writes 10,000 values to a watched key without polling, then checks that the
/// first event polled has an LSN above 1 and a non-zero dropped-event count,
/// and that the stream and the runtime stats report that same count.
#[test]
fn watch_stream_slow_consumer_drops_oldest_buffered_events() {
    use crate::storage::schema::Value;

    let runtime = rt();
    let kv = super::KvAtomicOps::new(&runtime);
    let mut watcher = runtime.kv_watch_subscribe("kv_default", "slow", None);

    // Nothing is polled while these writes happen.
    (0..10_000).for_each(|value| {
        kv.set(
            CollectionModel::Kv,
            "kv_default",
            "slow",
            Value::Integer(value),
            None,
            false,
        )
        .unwrap();
    });

    let tail = watcher.poll_next().expect("tail event after drops");
    let dropped = tail.dropped_event_count;

    assert!(tail.lsn > 1);
    assert!(dropped > 0);
    // The stream and the runtime stats must agree with the event's own count.
    assert_eq!(watcher.dropped_event_count(), dropped);
    assert_eq!(runtime.stats().kv.watch_drops, dropped);
}
2692
/// Sets `red.config.kv.watch.idle_timeout_ms` to 1, opens a subscription,
/// sleeps for 5 ms, and checks that polling yields nothing and the active
/// stream counter in the runtime stats goes from 1 back to 0.
#[test]
fn watch_stream_idle_timeout_closes_subscription() {
    use std::thread;
    use std::time::Duration;

    let runtime = rt();
    runtime
        .execute_query("SET CONFIG red.config.kv.watch.idle_timeout_ms = 1")
        .unwrap();

    let mut watcher = runtime.kv_watch_subscribe("kv_default", "idle", None);
    assert_eq!(runtime.stats().kv.watch_streams_active, 1);

    // Stay idle for longer than the timeout configured above.
    let idle_period = Duration::from_millis(5);
    thread::sleep(idle_period);

    assert!(watcher.poll_next().is_none());
    assert_eq!(runtime.stats().kv.watch_streams_active, 0);
}
2706
/// Runs a `KV PUT` between `BEGIN` and `ROLLBACK` while a subscription on the
/// same key is open, and checks that the stream has nothing to deliver.
#[test]
fn watch_stream_does_not_emit_rolled_back_put() {
    let runtime = rt();
    let mut watcher = runtime.kv_watch_subscribe("kv_default", "rollback_key", None);

    // Executed strictly in order; any failing statement aborts the test.
    let script = ["BEGIN", "KV PUT rollback_key = 'dirty'", "ROLLBACK"];
    for &statement in &script {
        runtime.execute_query(statement).unwrap();
    }

    assert!(watcher.poll_next().is_none());
}
2718}