Skip to main content

reddb_server/runtime/
impl_kv.rs

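//! KV DSL command execution and the `KvAtomicOps` module.
//!
//! Handles `KV PUT key = value [EXPIRE n] [TAGS [...]] [IF NOT EXISTS]`,
//! `KV GET key`, and `KV DELETE key`. `KvAtomicOps` additionally provides
//! atomic increment and compare-and-set, tag-based invalidation, and the
//! versioned, AES-256-GCM sealed rows used by vault collections.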
5
6use crate::application::ports::RuntimeEntityPort;
7use crate::storage::unified::{Metadata, MetadataValue};
8
9use super::impl_core::{current_auth_identity, current_connection_id, current_tenant};
10use super::*;
11
/// Name of the collection that bare-key KV operations (no explicit collection) target.
pub const KV_DEFAULT_COLLECTION: &str = "kv_default";
14
/// Vault-KV reference under which a collection-specific master key is looked up
/// (`red.vault.<collection>.master_key`).
fn vault_master_key_ref(collection: &str) -> String {
    ["red.vault.", collection, ".master_key"].concat()
}
18
19fn keyed_model_name(model: crate::catalog::CollectionModel) -> &'static str {
20    match model {
21        crate::catalog::CollectionModel::Kv => "kv",
22        crate::catalog::CollectionModel::Vault => "vault",
23        crate::catalog::CollectionModel::Config => "config",
24        _ => "collection",
25    }
26}
27
28#[derive(Debug, Clone)]
29struct VaultEntry {
30    id: crate::storage::EntityId,
31    key: String,
32    value: crate::storage::schema::Value,
33    metadata: Metadata,
34    created_at: u64,
35    updated_at: u64,
36    sequence_id: u64,
37    version: i64,
38    tombstone: bool,
39    op: String,
40}
41
42impl super::keyed_spine::KeyedVersion for VaultEntry {
43    fn key(&self) -> &str {
44        &self.key
45    }
46
47    fn version(&self) -> i64 {
48        self.version
49    }
50}
51
52impl VaultEntry {
53    fn from_keyed_row(
54        version: super::keyed_spine::KeyedRowVersion,
55        metadata: Metadata,
56        created_at: u64,
57        updated_at: u64,
58        sequence_id: u64,
59    ) -> Self {
60        Self {
61            id: version.id,
62            key: version.key,
63            value: version.value,
64            metadata,
65            created_at,
66            updated_at,
67            sequence_id,
68            version: version.version,
69            tombstone: version.tombstone,
70            op: version.op,
71        }
72    }
73}
74
75/// Atomic KV operations interface — the seam that transports and drivers depend on.
76///
77/// All three verbs delegate to the runtime's existing `create_kv` / `get_kv` /
78/// `delete_kv` plumbing; this struct adds the auto-create and upsert logic.
79pub struct KvAtomicOps<'a> {
80    runtime: &'a RedDBRuntime,
81}
82
83impl<'a> KvAtomicOps<'a> {
84    pub fn new(runtime: &'a RedDBRuntime) -> Self {
85        Self { runtime }
86    }
87
88    /// Insert or update a KV entry. Auto-creates the collection when needed.
89    ///
90    /// Insert or update a KV entry. Returns `(created: bool, id: EntityId)`.
91    pub fn set(
92        &self,
93        model: crate::catalog::CollectionModel,
94        collection: &str,
95        key: &str,
96        value: crate::storage::schema::Value,
97        ttl_ms: Option<u64>,
98        if_not_exists: bool,
99    ) -> RedDBResult<(bool, crate::storage::EntityId)> {
100        self.set_with_tags_for_model(model, collection, key, value, ttl_ms, &[], if_not_exists)
101    }
102
103    pub fn set_with_tags(
104        &self,
105        collection: &str,
106        key: &str,
107        value: crate::storage::schema::Value,
108        ttl_ms: Option<u64>,
109        tags: &[String],
110        if_not_exists: bool,
111    ) -> RedDBResult<(bool, crate::storage::EntityId)> {
112        self.set_with_tags_for_model(
113            crate::catalog::CollectionModel::Kv,
114            collection,
115            key,
116            value,
117            ttl_ms,
118            tags,
119            if_not_exists,
120        )
121    }
122
123    fn set_with_tags_for_model(
124        &self,
125        model: crate::catalog::CollectionModel,
126        collection: &str,
127        key: &str,
128        value: crate::storage::schema::Value,
129        ttl_ms: Option<u64>,
130        tags: &[String],
131        if_not_exists: bool,
132    ) -> RedDBResult<(bool, crate::storage::EntityId)> {
133        self.set_with_tags_and_metadata_for_model(
134            model,
135            collection,
136            key,
137            value,
138            ttl_ms,
139            tags,
140            if_not_exists,
141            Vec::new(),
142        )
143    }
144
145    pub fn set_with_tags_and_metadata(
146        &self,
147        collection: &str,
148        key: &str,
149        value: crate::storage::schema::Value,
150        ttl_ms: Option<u64>,
151        tags: &[String],
152        if_not_exists: bool,
153        metadata: Vec<(String, MetadataValue)>,
154    ) -> RedDBResult<(bool, crate::storage::EntityId)> {
155        self.set_with_tags_and_metadata_for_model(
156            crate::catalog::CollectionModel::Kv,
157            collection,
158            key,
159            value,
160            ttl_ms,
161            tags,
162            if_not_exists,
163            metadata,
164        )
165    }
166
167    fn set_with_tags_and_metadata_for_model(
168        &self,
169        model: crate::catalog::CollectionModel,
170        collection: &str,
171        key: &str,
172        value: crate::storage::schema::Value,
173        ttl_ms: Option<u64>,
174        tags: &[String],
175        if_not_exists: bool,
176        mut metadata: Vec<(String, MetadataValue)>,
177    ) -> RedDBResult<(bool, crate::storage::EntityId)> {
178        self.ensure_keyed_collection(model, collection)?;
179
180        if model == crate::catalog::CollectionModel::Vault {
181            let latest = self.get_vault_entry(collection, key)?;
182            let was_present = latest.as_ref().is_some_and(|entry| !entry.tombstone);
183            if if_not_exists && was_present {
184                return Ok((false, latest.expect("checked present").id));
185            }
186            let entry = self.append_vault_version(collection, key, value, "put", false, tags)?;
187            self.runtime.record_kv_watch_event(
188                if latest.is_some() {
189                    crate::replication::cdc::ChangeOperation::Update
190                } else {
191                    crate::replication::cdc::ChangeOperation::Insert
192                },
193                collection,
194                key,
195                entry.id.raw(),
196                latest.as_ref().map(vault_entry_metadata_json),
197                Some(vault_entry_metadata_json(&entry)),
198            );
199            return Ok((!was_present, entry.id));
200        }
201
202        let existing = self.get_entry(model, collection, key)?;
203        let was_present = existing.is_some();
204
205        if if_not_exists && was_present {
206            let (_, id) = existing.unwrap();
207            self.runtime.inner.kv_stats.incr_puts();
208            return Ok((false, id));
209        }
210
211        let before = existing
212            .as_ref()
213            .map(|(value, _)| crate::presentation::entity_json::storage_value_to_json(value));
214        let op = if was_present {
215            crate::replication::cdc::ChangeOperation::Update
216        } else {
217            crate::replication::cdc::ChangeOperation::Insert
218        };
219        let after = Some(crate::presentation::entity_json::storage_value_to_json(
220            &value,
221        ));
222
223        // Delete old entry so we can create fresh (handles TTL refresh).
224        if was_present {
225            self.delete(model, collection, key)?;
226        }
227
228        if let Some(ttl_metadata) = ttl_metadata(ttl_ms) {
229            metadata.extend(ttl_metadata.fields);
230        }
231        if let Some(tags_metadata) = kv_tags_metadata(tags) {
232            metadata.push(tags_metadata);
233        }
234
235        let output = self
236            .runtime
237            .create_kv(crate::application::entity::CreateKvInput {
238                collection: collection.to_string(),
239                key: key.to_string(),
240                value,
241                metadata,
242            })?;
243        if model == crate::catalog::CollectionModel::Kv {
244            self.runtime
245                .inner
246                .kv_tag_index
247                .replace(collection, key, output.id, tags);
248        }
249
250        if model == crate::catalog::CollectionModel::Kv {
251            self.runtime
252                .record_kv_watch_event(op, collection, key, output.id.raw(), before, after);
253        }
254
255        if model == crate::catalog::CollectionModel::Kv {
256            self.runtime.inner.kv_stats.incr_puts();
257        }
258        Ok((!was_present, output.id))
259    }
260
261    /// Retrieve a KV value by key. Returns `None` when not found.
262    pub fn get(
263        &self,
264        model: crate::catalog::CollectionModel,
265        collection: &str,
266        key: &str,
267    ) -> RedDBResult<Option<crate::storage::schema::Value>> {
268        let result = self.get_entry(model, collection, key)?;
269        if model == crate::catalog::CollectionModel::Kv {
270            self.runtime.inner.kv_stats.incr_gets();
271        }
272        Ok(result.map(|(v, _)| v))
273    }
274
275    /// Delete a KV entry. Returns `true` if the key existed.
276    pub fn delete(
277        &self,
278        model: crate::catalog::CollectionModel,
279        collection: &str,
280        key: &str,
281    ) -> RedDBResult<bool> {
282        self.ensure_declared_model(model, collection)?;
283        let found = self.get_entry(model, collection, key)?;
284        if let Some((value, id)) = found {
285            let store = self.runtime.inner.db.store();
286            let deleted = store
287                .delete(collection, id)
288                .map_err(|err| RedDBError::Internal(err.to_string()))?;
289            if deleted {
290                store.context_index().remove_entity(id);
291                if model == crate::catalog::CollectionModel::Kv {
292                    self.runtime.inner.kv_tag_index.remove(collection, key);
293                    self.runtime.record_kv_watch_event(
294                        crate::replication::cdc::ChangeOperation::Delete,
295                        collection,
296                        key,
297                        id.raw(),
298                        Some(crate::presentation::entity_json::storage_value_to_json(
299                            &value,
300                        )),
301                        None,
302                    );
303                    self.runtime.inner.kv_stats.incr_deletes();
304                }
305            }
306            Ok(deleted)
307        } else {
308            Ok(false)
309        }
310    }
311
312    /// Atomically increment (or decrement) a counter key. Returns the new value.
313    ///
314    /// - Missing key initialises at `by` (Redis-compat).
315    /// - Non-integer value returns an error before any mutation.
316    pub fn incr(
317        &self,
318        model: crate::catalog::CollectionModel,
319        collection: &str,
320        key: &str,
321        by: i64,
322        ttl_ms: Option<u64>,
323    ) -> RedDBResult<i64> {
324        if model == crate::catalog::CollectionModel::Vault {
325            return Err(RedDBError::InvalidOperation(
326                "VAULT INCR is not supported for sealed secrets".to_string(),
327            ));
328        }
329        let rmw_lock = self.runtime.inner.rmw_locks.lock_for(collection, key);
330        let _rmw_guard = rmw_lock.lock();
331        self.ensure_kv_collection(collection)?;
332        let existing = self.runtime.get_kv(collection, key)?;
333        let current: i64 = match existing.as_ref() {
334            None => 0,
335            Some((crate::storage::schema::Value::Integer(n), _)) => *n,
336            Some((crate::storage::schema::Value::Float(f), _)) => *f as i64,
337            Some((other, _)) => {
338                return Err(RedDBError::Internal(format!(
339                    "INCR on non-integer value: {:?}",
340                    other
341                )));
342            }
343        };
344
345        let next = current
346            .checked_add(by)
347            .ok_or_else(|| RedDBError::Internal(format!("INCR overflow: {current} + {by}")))?;
348
349        // Delete then re-create so TTL is refreshed.
350        if existing.is_some() {
351            self.runtime.delete_kv(collection, key)?;
352        }
353
354        let meta_vec: Vec<(String, crate::storage::unified::MetadataValue)> = ttl_metadata(ttl_ms)
355            .map(|m| m.fields.into_iter().collect())
356            .unwrap_or_default();
357
358        let output = self
359            .runtime
360            .create_kv(crate::application::entity::CreateKvInput {
361                collection: collection.to_string(),
362                key: key.to_string(),
363                value: crate::storage::schema::Value::Integer(next),
364                metadata: meta_vec,
365            })?;
366        self.runtime
367            .inner
368            .kv_tag_index
369            .replace(collection, key, output.id, &[]);
370
371        self.runtime.record_kv_watch_event(
372            if existing.is_some() {
373                crate::replication::cdc::ChangeOperation::Update
374            } else {
375                crate::replication::cdc::ChangeOperation::Insert
376            },
377            collection,
378            key,
379            output.id.raw(),
380            existing
381                .as_ref()
382                .map(|(value, _)| crate::presentation::entity_json::storage_value_to_json(value)),
383            Some(crate::presentation::entity_json::storage_value_to_json(
384                &crate::storage::schema::Value::Integer(next),
385            )),
386        );
387
388        self.runtime.inner.kv_stats.incr_incrs();
389        Ok(next)
390    }
391
392    /// Compare-and-set: atomically swap `key` from `expected` to `new_value`.
393    ///
394    /// Returns `(ok, current)`:
395    /// - `ok = true`  → swap applied; `current` is the value *before* the swap.
396    /// - `ok = false` → swap skipped; `current` holds the actual current value.
397    ///
398    /// `expected = None` means the caller expects the key to be *absent* (create-if-absent).
399    pub fn cas(
400        &self,
401        model: crate::catalog::CollectionModel,
402        collection: &str,
403        key: &str,
404        expected: Option<&crate::storage::schema::Value>,
405        new_value: crate::storage::schema::Value,
406        ttl_ms: Option<u64>,
407    ) -> RedDBResult<(bool, Option<crate::storage::schema::Value>)> {
408        if model == crate::catalog::CollectionModel::Vault {
409            return Err(RedDBError::InvalidOperation(
410                "VAULT CAS is not supported for sealed secrets".to_string(),
411            ));
412        }
413        let rmw_lock = self.runtime.inner.rmw_locks.lock_for(collection, key);
414        let _rmw_guard = rmw_lock.lock();
415        self.ensure_kv_collection(collection)?;
416        let current = self.runtime.get_kv(collection, key)?.map(|(v, _)| v);
417
418        let matches = match (&current, expected) {
419            (None, None) => true,
420            (Some(cur), Some(exp)) => cur == exp,
421            _ => false,
422        };
423
424        if !matches {
425            self.runtime.inner.kv_stats.incr_cas_conflict();
426            return Ok((false, current));
427        }
428
429        // Swap: delete old entry (if present), write new one.
430        if current.is_some() {
431            self.runtime.delete_kv(collection, key)?;
432        }
433
434        let meta_vec: Vec<(String, crate::storage::unified::MetadataValue)> = ttl_metadata(ttl_ms)
435            .map(|m| m.fields.into_iter().collect())
436            .unwrap_or_default();
437
438        let output = self
439            .runtime
440            .create_kv(crate::application::entity::CreateKvInput {
441                collection: collection.to_string(),
442                key: key.to_string(),
443                value: new_value.clone(),
444                metadata: meta_vec,
445            })?;
446        self.runtime
447            .inner
448            .kv_tag_index
449            .replace(collection, key, output.id, &[]);
450
451        self.runtime.record_kv_watch_event(
452            if current.is_some() {
453                crate::replication::cdc::ChangeOperation::Update
454            } else {
455                crate::replication::cdc::ChangeOperation::Insert
456            },
457            collection,
458            key,
459            output.id.raw(),
460            current
461                .as_ref()
462                .map(crate::presentation::entity_json::storage_value_to_json),
463            Some(crate::presentation::entity_json::storage_value_to_json(
464                &new_value,
465            )),
466        );
467
468        self.runtime.inner.kv_stats.incr_cas_success();
469        Ok((true, current))
470    }
471
472    pub fn invalidate_tags(&self, collection: &str, tags: &[String]) -> RedDBResult<usize> {
473        self.runtime
474            .check_write(crate::runtime::write_gate::WriteKind::Dml)?;
475        self.runtime.check_kv_invalidate_policy(collection)?;
476        self.ensure_kv_collection(collection)?;
477        let entries = self
478            .runtime
479            .inner
480            .kv_tag_index
481            .entries_for_tags(collection, tags);
482        if entries.is_empty() {
483            return Ok(0);
484        }
485
486        let store = self.runtime.inner.db.store();
487        let mut removed = 0usize;
488        for (key, id) in entries {
489            let before = store
490                .get(collection, id)
491                .and_then(|entity| kv_value_from_entity(&entity));
492            let deleted = store
493                .delete(collection, id)
494                .map_err(|err| RedDBError::Internal(err.to_string()))?;
495            if deleted {
496                store.context_index().remove_entity(id);
497                self.runtime.inner.kv_tag_index.remove(collection, &key);
498                self.runtime.record_kv_watch_event(
499                    crate::replication::cdc::ChangeOperation::Delete,
500                    collection,
501                    &key,
502                    id.raw(),
503                    before
504                        .as_ref()
505                        .map(crate::presentation::entity_json::storage_value_to_json),
506                    None,
507                );
508                removed += 1;
509            }
510        }
511        if removed > 0 {
512            self.runtime.inner.kv_stats.incr_deletes();
513        }
514        Ok(removed)
515    }
516
517    pub fn tags_for_key(&self, collection: &str, key: &str) -> Vec<String> {
518        self.runtime
519            .inner
520            .kv_tag_index
521            .tags_for_key(collection, key)
522    }
523
524    /// Auto-create a KV collection if it does not exist yet.
525    fn ensure_kv_collection(&self, collection: &str) -> RedDBResult<()> {
526        self.ensure_keyed_collection(crate::catalog::CollectionModel::Kv, collection)
527    }
528
529    fn ensure_keyed_collection(
530        &self,
531        model: crate::catalog::CollectionModel,
532        collection: &str,
533    ) -> RedDBResult<()> {
534        let store = self.runtime.inner.db.store();
535        if store.get_collection(collection).is_some() {
536            return self.ensure_declared_model(model, collection);
537        }
538        if model != crate::catalog::CollectionModel::Kv {
539            return Err(RedDBError::NotFound(format!(
540                "{} collection '{collection}' does not exist",
541                keyed_model_name(model)
542            )));
543        }
544        // Check config gate: red.config.kv.default_collection (default = true).
545        let auto_create = self
546            .runtime
547            .config_bool("red.config.kv.default_collection", true);
548        if !auto_create {
549            return Err(RedDBError::NotFound(format!(
550                "kv collection '{collection}' does not exist and auto-create is disabled \
551                 (red.config.kv.default_collection = false)"
552            )));
553        }
554        store
555            .create_collection(collection)
556            .map_err(|err| RedDBError::Internal(err.to_string()))?;
557        self.runtime
558            .inner
559            .db
560            .save_collection_contract(kv_collection_contract(collection))
561            .map_err(|err| RedDBError::Internal(err.to_string()))?;
562        Ok(())
563    }
564
565    fn get_entry(
566        &self,
567        model: crate::catalog::CollectionModel,
568        collection: &str,
569        key: &str,
570    ) -> RedDBResult<Option<(crate::storage::schema::Value, crate::storage::EntityId)>> {
571        let Some(entity) = self.get_entity(model, collection, key)? else {
572            return Ok(None);
573        };
574        Ok(kv_value_from_entity(&entity).map(|value| (value, entity.id)))
575    }
576
577    fn get_entity(
578        &self,
579        model: crate::catalog::CollectionModel,
580        collection: &str,
581        key: &str,
582    ) -> RedDBResult<Option<crate::storage::UnifiedEntity>> {
583        self.ensure_declared_model(model, collection)?;
584        let store = self.runtime.inner.db.store();
585        let Some(manager) = store.get_collection(collection) else {
586            return Ok(None);
587        };
588        let entities = manager.query_all(|_| true);
589        for entity in entities {
590            if let crate::storage::EntityData::Row(ref row) = entity.data {
591                if let Some(ref named) = row.named {
592                    if let Some(crate::storage::schema::Value::Text(ref k)) = named.get("key") {
593                        if &**k == key {
594                            return Ok(Some(entity));
595                        }
596                    }
597                }
598            }
599        }
600        Ok(None)
601    }
602
603    fn get_vault_entry(&self, collection: &str, key: &str) -> RedDBResult<Option<VaultEntry>> {
604        self.vault_versions(collection, key)
605            .map(super::keyed_spine::latest_version)
606    }
607
608    fn get_vault_entry_version(
609        &self,
610        collection: &str,
611        key: &str,
612        version: i64,
613    ) -> RedDBResult<Option<VaultEntry>> {
614        Ok(self
615            .vault_versions(collection, key)?
616            .into_iter()
617            .find(|entry| entry.version == version))
618    }
619
620    fn vault_versions(&self, collection: &str, key: &str) -> RedDBResult<Vec<VaultEntry>> {
621        self.ensure_declared_model(crate::catalog::CollectionModel::Vault, collection)?;
622        let store = self.runtime.inner.db.store();
623        let Some(manager) = store.get_collection(collection) else {
624            return Ok(Vec::new());
625        };
626        let entities = manager.query_all(|_| true);
627        let mut versions = Vec::new();
628        for entity in entities {
629            let crate::storage::EntityData::Row(ref row) = entity.data else {
630                continue;
631            };
632            let Some(version) =
633                super::keyed_spine::row_version(entity.id, row, entity.sequence_id as i64)
634            else {
635                continue;
636            };
637            if version.key != key {
638                continue;
639            }
640            let metadata = manager.get_metadata(entity.id).unwrap_or_default();
641            versions.push(VaultEntry::from_keyed_row(
642                version,
643                metadata,
644                entity.created_at,
645                entity.updated_at,
646                entity.sequence_id,
647            ));
648        }
649        Ok(versions)
650    }
651
652    fn latest_vault_entries(
653        &self,
654        collection: &str,
655        prefix: Option<&str>,
656    ) -> RedDBResult<Vec<VaultEntry>> {
657        self.ensure_declared_model(crate::catalog::CollectionModel::Vault, collection)?;
658        let store = self.runtime.inner.db.store();
659        let Some(manager) = store.get_collection(collection) else {
660            return Ok(Vec::new());
661        };
662        let mut versions = Vec::new();
663        for entity in manager.query_all(|_| true) {
664            let crate::storage::EntityData::Row(ref row) = entity.data else {
665                continue;
666            };
667            let Some(version) =
668                super::keyed_spine::row_version(entity.id, row, entity.sequence_id as i64)
669            else {
670                continue;
671            };
672            let metadata = manager.get_metadata(entity.id).unwrap_or_default();
673            let entry = VaultEntry::from_keyed_row(
674                version,
675                metadata,
676                entity.created_at,
677                entity.updated_at,
678                entity.sequence_id,
679            );
680            versions.push(entry);
681        }
682        Ok(super::keyed_spine::latest_versions(versions, prefix))
683    }
684
685    fn append_vault_version(
686        &self,
687        collection: &str,
688        key: &str,
689        value: crate::storage::schema::Value,
690        op: &str,
691        tombstone: bool,
692        tags: &[String],
693    ) -> RedDBResult<VaultEntry> {
694        self.ensure_declared_model(crate::catalog::CollectionModel::Vault, collection)?;
695        let version = self
696            .get_vault_entry(collection, key)?
697            .map(|entry| entry.version)
698            .unwrap_or(0)
699            + 1;
700        let stored_value = if tombstone {
701            crate::storage::schema::Value::Null
702        } else {
703            self.runtime.seal_vault_value(collection, value)?
704        };
705        let now = current_unix_ms() as i64;
706        let fields = vec![
707            (
708                "key".to_string(),
709                crate::storage::schema::Value::text(key.to_string()),
710            ),
711            ("value".to_string(), stored_value),
712            (
713                "version".to_string(),
714                crate::storage::schema::Value::Integer(version),
715            ),
716            (
717                "tombstone".to_string(),
718                crate::storage::schema::Value::Boolean(tombstone),
719            ),
720            (
721                "op".to_string(),
722                crate::storage::schema::Value::text(op.to_string()),
723            ),
724            (
725                "created_at_ms".to_string(),
726                crate::storage::schema::Value::Integer(now),
727            ),
728        ];
729        let mut row = crate::storage::RowData::new(Vec::new());
730        row.named = Some(fields.into_iter().collect());
731        let entity = crate::storage::UnifiedEntity::new(
732            crate::storage::EntityId::new(0),
733            crate::storage::EntityKind::TableRow {
734                table: std::sync::Arc::from(collection),
735                row_id: 0,
736            },
737            crate::storage::EntityData::Row(row),
738        );
739        let id = self
740            .runtime
741            .inner
742            .db
743            .store()
744            .insert(collection, entity)
745            .map_err(|err| RedDBError::Internal(err.to_string()))?;
746        if !tags.is_empty() {
747            self.runtime
748                .inner
749                .db
750                .store()
751                .set_metadata(
752                    collection,
753                    id,
754                    Metadata::with_fields(vault_tags_metadata(tags)),
755                )
756                .map_err(|err| RedDBError::Internal(err.to_string()))?;
757            self.runtime
758                .inner
759                .kv_tag_index
760                .replace(collection, key, id, tags);
761        }
762        self.get_vault_entry_version(collection, key, version)?
763            .ok_or_else(|| RedDBError::Internal(format!("vault version {id} was not readable")))
764    }
765
766    fn purge_vault_versions(&self, collection: &str, key: &str) -> RedDBResult<usize> {
767        self.ensure_declared_model(crate::catalog::CollectionModel::Vault, collection)?;
768        let versions = self.vault_versions(collection, key)?;
769        let store = self.runtime.inner.db.store();
770        let mut purged = 0usize;
771        for entry in versions {
772            if store
773                .delete(collection, entry.id)
774                .map_err(|err| RedDBError::Internal(err.to_string()))?
775            {
776                store.context_index().remove_entity(entry.id);
777                purged += 1;
778            }
779        }
780        Ok(purged)
781    }
782
783    fn ensure_declared_model(
784        &self,
785        model: crate::catalog::CollectionModel,
786        collection: &str,
787    ) -> RedDBResult<()> {
788        let Some(contract) = self.runtime.inner.db.collection_contract(collection) else {
789            return Ok(());
790        };
791        if contract.declared_model == model
792            || contract.declared_model == crate::catalog::CollectionModel::Mixed
793        {
794            return Ok(());
795        }
796        Err(RedDBError::InvalidOperation(format!(
797            "collection '{}' is declared as '{}' and does not allow '{}' operations",
798            collection,
799            keyed_model_name(contract.declared_model),
800            keyed_model_name(model)
801        )))
802    }
803}
804
805impl RedDBRuntime {
806    pub(crate) fn seal_vault_value(
807        &self,
808        collection: &str,
809        value: crate::storage::schema::Value,
810    ) -> RedDBResult<crate::storage::schema::Value> {
811        let key = self.vault_encryption_key(collection)?;
812        let plaintext = value.to_bytes();
813        let nonce_bytes = crate::auth::store::random_bytes(12);
814        let mut nonce = [0u8; 12];
815        nonce.copy_from_slice(&nonce_bytes[..12]);
816        let aad = format!("reddb.vault.{collection}");
817        let ciphertext =
818            crate::crypto::aes_gcm::aes256_gcm_encrypt(&key, &nonce, aad.as_bytes(), &plaintext);
819        let mut payload = Vec::with_capacity(12 + ciphertext.len());
820        payload.extend_from_slice(&nonce);
821        payload.extend_from_slice(&ciphertext);
822        Ok(crate::storage::schema::Value::Secret(payload))
823    }
824
825    fn vault_key_available(&self, collection: &str) -> bool {
826        self.vault_encryption_key(collection).is_ok()
827    }
828
829    fn vault_encryption_key(&self, collection: &str) -> RedDBResult<[u8; 32]> {
830        let auth_store = self.inner.auth_store.read().clone().ok_or_else(|| {
831            RedDBError::Query("vault sealed_unavailable: no key provider is configured".to_string())
832        })?;
833        if !auth_store.is_vault_backed() {
834            return Err(RedDBError::Query(
835                "vault sealed_unavailable: key provider is sealed".to_string(),
836            ));
837        }
838
839        if let Some(hex_key) = auth_store.vault_kv_get(&vault_master_key_ref(collection)) {
840            return decode_vault_key(&hex_key);
841        }
842        auth_store.vault_secret_key().ok_or_else(|| {
843            RedDBError::Query("vault sealed_unavailable: cluster vault key is missing".to_string())
844        })
845    }
846
847    fn unseal_vault_value(
848        &self,
849        collection: &str,
850        sealed: &crate::storage::schema::Value,
851    ) -> RedDBResult<crate::storage::schema::Value> {
852        let crate::storage::schema::Value::Secret(payload) = sealed else {
853            return Err(RedDBError::Query(
854                "vault unseal failed: stored value is not sealed".to_string(),
855            ));
856        };
857        if payload.len() < 12 {
858            return Err(RedDBError::Query(
859                "vault unseal failed: sealed payload is truncated".to_string(),
860            ));
861        }
862        let key = self.vault_encryption_key(collection)?;
863        let mut nonce = [0u8; 12];
864        nonce.copy_from_slice(&payload[..12]);
865        let aad = format!("reddb.vault.{collection}");
866        let plaintext = crate::crypto::aes_gcm::aes256_gcm_decrypt(
867            &key,
868            &nonce,
869            aad.as_bytes(),
870            &payload[12..],
871        )
872        .map_err(|_| RedDBError::Query("vault unseal failed: decryption failed".to_string()))?;
873        let (value, consumed) =
874            crate::storage::schema::Value::from_bytes(&plaintext).map_err(|err| {
875                RedDBError::Query(format!("vault unseal failed: bad plaintext value: {err}"))
876            })?;
877        if consumed != plaintext.len() {
878            return Err(RedDBError::Query(
879                "vault unseal failed: trailing plaintext bytes".to_string(),
880            ));
881        }
882        Ok(value)
883    }
884
885    fn vault_target_resource(collection: &str, key: &str) -> String {
886        if collection == "red.vault" {
887            return format!("red.vault/{}", key.to_ascii_lowercase());
888        }
889        format!("{collection}.{key}")
890    }
891
892    fn current_vault_actor() -> String {
893        current_auth_identity()
894            .map(|(principal, _)| principal)
895            .unwrap_or_else(|| "anonymous".to_string())
896    }
897
898    fn vault_request_id() -> String {
899        let conn_id = current_connection_id();
900        if conn_id == 0 {
901            "embedded".to_string()
902        } else {
903            format!("conn-{conn_id}")
904        }
905    }
906
907    fn check_vault_capability(
908        &self,
909        action: &str,
910        collection: &str,
911        key: &str,
912    ) -> Result<(), String> {
913        let Some(auth_store) = self.inner.auth_store.read().clone() else {
914            return Ok(());
915        };
916        if !auth_store.iam_authorization_enabled() {
917            return Ok(());
918        }
919        let Some((principal, role)) = current_auth_identity() else {
920            return Err(
921                "IAM authorization is enabled; vault capability check requires an authenticated principal"
922                    .to_string(),
923            );
924        };
925        let tenant = current_tenant();
926        let principal_id = crate::auth::UserId::from_parts(tenant.as_deref(), &principal);
927        let mut resource = crate::auth::policies::ResourceRef::new(
928            "vault",
929            Self::vault_target_resource(collection, key),
930        );
931        if let Some(ref tenant) = tenant {
932            resource = resource.with_tenant(tenant.clone());
933        }
934        let ctx = crate::auth::policies::EvalContext {
935            principal_tenant: tenant.clone(),
936            current_tenant: tenant,
937            peer_ip: None,
938            mfa_present: false,
939            now_ms: crate::utils::now_unix_millis() as u128,
940            principal_is_admin_role: role == crate::auth::Role::Admin,
941        };
942        if auth_store.check_policy_authz(&principal_id, action, &resource, &ctx) {
943            Ok(())
944        } else {
945            Err(format!(
946                "principal=`{}` action=`{}` resource=`vault:{}` denied by IAM policy",
947                principal,
948                action,
949                Self::vault_target_resource(collection, key)
950            ))
951        }
952    }
953
954    fn check_system_vault_capability(
955        &self,
956        action: &str,
957        collection: &str,
958        key: &str,
959    ) -> Result<(), String> {
960        if collection != "red.vault" {
961            return Ok(());
962        }
963        self.check_vault_capability(action, collection, key)
964    }
965
966    fn audit_vault_unseal(
967        &self,
968        collection: &str,
969        key: &str,
970        outcome: crate::runtime::audit_log::Outcome,
971        reason: &str,
972        entry: Option<&VaultEntry>,
973    ) {
974        let actor = Self::current_vault_actor();
975        let request_id = Self::vault_request_id();
976        let mut builder = crate::runtime::audit_log::AuditEvent::builder("vault/unseal")
977            .principal(actor.clone())
978            .source(crate::runtime::audit_log::AuditAuthSource::Password)
979            .resource(format!(
980                "vault:{}",
981                Self::vault_target_resource(collection, key)
982            ))
983            .outcome(outcome)
984            .correlation_id(request_id.clone())
985            .fields([
986                crate::runtime::audit_log::AuditFieldEscaper::field("actor", actor),
987                crate::runtime::audit_log::AuditFieldEscaper::field("collection", collection),
988                crate::runtime::audit_log::AuditFieldEscaper::field("key", key),
989                crate::runtime::audit_log::AuditFieldEscaper::field(
990                    "target",
991                    Self::vault_target_resource(collection, key),
992                ),
993                crate::runtime::audit_log::AuditFieldEscaper::field("reason", reason),
994                crate::runtime::audit_log::AuditFieldEscaper::field("request_id", request_id),
995                crate::runtime::audit_log::AuditFieldEscaper::field(
996                    "connection_id",
997                    current_connection_id(),
998                ),
999            ]);
1000        if let Some(tenant) = current_tenant() {
1001            builder = builder.tenant(tenant);
1002        }
1003        if let Some(entry) = entry {
1004            builder = builder.fields([
1005                crate::runtime::audit_log::AuditFieldEscaper::field("entity_id", entry.id.raw()),
1006                crate::runtime::audit_log::AuditFieldEscaper::field(
1007                    "sequence_id",
1008                    entry.sequence_id,
1009                ),
1010            ]);
1011        }
1012        self.audit_log().record_event(builder.build());
1013    }
1014
1015    fn audit_vault_lifecycle(
1016        &self,
1017        operation: &str,
1018        collection: &str,
1019        key: &str,
1020        outcome: crate::runtime::audit_log::Outcome,
1021        reason: &str,
1022        entry: Option<&VaultEntry>,
1023    ) {
1024        let actor = Self::current_vault_actor();
1025        let request_id = Self::vault_request_id();
1026        let mut builder =
1027            crate::runtime::audit_log::AuditEvent::builder(format!("vault/{operation}"))
1028                .principal(actor.clone())
1029                .source(crate::runtime::audit_log::AuditAuthSource::Password)
1030                .resource(format!(
1031                    "vault:{}",
1032                    Self::vault_target_resource(collection, key)
1033                ))
1034                .outcome(outcome)
1035                .correlation_id(request_id.clone())
1036                .fields([
1037                    crate::runtime::audit_log::AuditFieldEscaper::field("actor", actor),
1038                    crate::runtime::audit_log::AuditFieldEscaper::field("collection", collection),
1039                    crate::runtime::audit_log::AuditFieldEscaper::field("key", key),
1040                    crate::runtime::audit_log::AuditFieldEscaper::field(
1041                        "target",
1042                        Self::vault_target_resource(collection, key),
1043                    ),
1044                    crate::runtime::audit_log::AuditFieldEscaper::field("reason", reason),
1045                    crate::runtime::audit_log::AuditFieldEscaper::field("request_id", request_id),
1046                    crate::runtime::audit_log::AuditFieldEscaper::field(
1047                        "connection_id",
1048                        current_connection_id(),
1049                    ),
1050                ]);
1051        if let Some(tenant) = current_tenant() {
1052            builder = builder.tenant(tenant);
1053        }
1054        if let Some(entry) = entry {
1055            builder = builder.fields([
1056                crate::runtime::audit_log::AuditFieldEscaper::field("entity_id", entry.id.raw()),
1057                crate::runtime::audit_log::AuditFieldEscaper::field("version", entry.version),
1058                crate::runtime::audit_log::AuditFieldEscaper::field(
1059                    "sequence_id",
1060                    entry.sequence_id,
1061                ),
1062            ]);
1063        }
1064        self.audit_log().record_event(builder.build());
1065    }
1066
1067    pub(crate) fn resolve_vault_secret_value(
1068        &self,
1069        collection: &str,
1070        key: &str,
1071    ) -> RedDBResult<Value> {
1072        let ops = KvAtomicOps::new(self);
1073        let entry = ops.get_vault_entry(collection, key)?;
1074        if let Err(reason) = self.check_vault_capability("vault:unseal", collection, key) {
1075            self.audit_vault_unseal(
1076                collection,
1077                key,
1078                crate::runtime::audit_log::Outcome::Denied,
1079                &reason,
1080                entry.as_ref(),
1081            );
1082            return Err(RedDBError::Query(reason));
1083        }
1084        let Some(entry) = entry else {
1085            let reason = "not_found";
1086            self.audit_vault_unseal(
1087                collection,
1088                key,
1089                crate::runtime::audit_log::Outcome::Denied,
1090                reason,
1091                None,
1092            );
1093            return Err(RedDBError::NotFound(format!(
1094                "vault secret '{}.{}' not found",
1095                collection, key
1096            )));
1097        };
1098        if entry.tombstone {
1099            let reason = "deleted";
1100            self.audit_vault_unseal(
1101                collection,
1102                key,
1103                crate::runtime::audit_log::Outcome::Denied,
1104                reason,
1105                Some(&entry),
1106            );
1107            return Err(RedDBError::NotFound(format!(
1108                "vault secret '{}.{}' is deleted",
1109                collection, key
1110            )));
1111        }
1112        match self.unseal_vault_value(collection, &entry.value) {
1113            Ok(value) => {
1114                self.audit_vault_unseal(
1115                    collection,
1116                    key,
1117                    crate::runtime::audit_log::Outcome::Success,
1118                    "ok",
1119                    Some(&entry),
1120                );
1121                Ok(value)
1122            }
1123            Err(err) => {
1124                let reason = err.to_string();
1125                self.audit_vault_unseal(
1126                    collection,
1127                    key,
1128                    crate::runtime::audit_log::Outcome::Error,
1129                    &reason,
1130                    Some(&entry),
1131                );
1132                Err(err)
1133            }
1134        }
1135    }
1136
1137    /// Dispatch a `KV PUT / GET / DELETE` command.
1138    pub fn execute_kv_command(
1139        &self,
1140        raw_query: &str,
1141        cmd: &crate::storage::query::ast::KvCommand,
1142    ) -> RedDBResult<RuntimeQueryResult> {
1143        use crate::storage::query::ast::KvCommand;
1144
1145        let ops = KvAtomicOps::new(self);
1146
1147        match cmd {
1148            KvCommand::Put {
1149                model,
1150                collection,
1151                key,
1152                value,
1153                ttl_ms,
1154                tags,
1155                if_not_exists,
1156            } => {
1157                if *model == crate::catalog::CollectionModel::Vault {
1158                    self.check_system_vault_capability("vault:write", collection, key)
1159                        .map_err(RedDBError::Query)?;
1160                }
1161                self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
1162                let (created, id) = ops.set_with_tags_for_model(
1163                    *model,
1164                    collection,
1165                    key,
1166                    value.clone(),
1167                    *ttl_ms,
1168                    tags,
1169                    *if_not_exists,
1170                )?;
1171
1172                let mut result = UnifiedResult::with_columns(vec![
1173                    "ok".into(),
1174                    "collection".into(),
1175                    "key".into(),
1176                    "id".into(),
1177                    "created".into(),
1178                    "tags".into(),
1179                ]);
1180                let mut record = UnifiedRecord::new();
1181                record.set("ok", Value::Boolean(true));
1182                record.set("collection", Value::text(collection.clone()));
1183                record.set("key", Value::text(key.clone()));
1184                record.set("id", Value::Integer(id.raw() as i64));
1185                record.set("created", Value::Boolean(created));
1186                record.set("tags", kv_tags_value(tags));
1187                result.push(record);
1188
1189                Ok(RuntimeQueryResult {
1190                    query: raw_query.to_string(),
1191                    mode: crate::storage::query::modes::QueryMode::Sql,
1192                    statement: if *model == crate::catalog::CollectionModel::Vault {
1193                        "vault_put"
1194                    } else {
1195                        "kv_put"
1196                    },
1197                    engine: if *model == crate::catalog::CollectionModel::Vault {
1198                        "vault"
1199                    } else {
1200                        "kv"
1201                    },
1202                    result,
1203                    affected_rows: 1,
1204                    statement_type: if created { "insert" } else { "update" },
1205                })
1206            }
1207            KvCommand::InvalidateTags { collection, tags } => {
1208                let invalidated = ops.invalidate_tags(collection, tags)?;
1209
1210                let mut result = UnifiedResult::with_columns(vec![
1211                    "ok".into(),
1212                    "collection".into(),
1213                    "invalidated".into(),
1214                    "tags".into(),
1215                ]);
1216                let mut record = UnifiedRecord::new();
1217                record.set("ok", Value::Boolean(true));
1218                record.set("collection", Value::text(collection.clone()));
1219                record.set("invalidated", Value::Integer(invalidated as i64));
1220                record.set("tags", kv_tags_value(tags));
1221                result.push(record);
1222
1223                Ok(RuntimeQueryResult {
1224                    query: raw_query.to_string(),
1225                    mode: crate::storage::query::modes::QueryMode::Sql,
1226                    statement: "kv_invalidate_tags",
1227                    engine: "kv",
1228                    result,
1229                    affected_rows: invalidated as u64,
1230                    statement_type: "delete",
1231                })
1232            }
1233
1234            KvCommand::Rotate {
1235                collection,
1236                key,
1237                value,
1238                tags,
1239            } => {
1240                self.check_system_vault_capability("vault:write", collection, key)
1241                    .map_err(RedDBError::Query)?;
1242                self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
1243                let entry = ops.append_vault_version(
1244                    collection,
1245                    key,
1246                    value.clone(),
1247                    "rotate",
1248                    false,
1249                    tags,
1250                )?;
1251                self.record_kv_watch_event(
1252                    crate::replication::cdc::ChangeOperation::Update,
1253                    collection,
1254                    key,
1255                    entry.id.raw(),
1256                    None,
1257                    Some(vault_entry_metadata_json(&entry)),
1258                );
1259                self.audit_vault_lifecycle(
1260                    "rotate",
1261                    collection,
1262                    key,
1263                    crate::runtime::audit_log::Outcome::Success,
1264                    "ok",
1265                    Some(&entry),
1266                );
1267                Ok(vault_write_result(
1268                    raw_query,
1269                    "vault_rotate",
1270                    "update",
1271                    collection,
1272                    key,
1273                    &entry,
1274                    1,
1275                ))
1276            }
1277
1278            KvCommand::List {
1279                model,
1280                collection,
1281                prefix,
1282                limit,
1283                offset,
1284            } => {
1285                if *model != crate::catalog::CollectionModel::Vault {
1286                    return Err(RedDBError::InvalidOperation(
1287                        "LIST is not supported through normal KV command execution".to_string(),
1288                    ));
1289                }
1290                let mut entries = ops.latest_vault_entries(collection, prefix.as_deref())?;
1291                entries.sort_by(|left, right| left.key.cmp(&right.key));
1292                let mut result = UnifiedResult::with_columns(vec![
1293                    "collection".into(),
1294                    "key".into(),
1295                    "version".into(),
1296                    "fingerprint".into(),
1297                    "tags".into(),
1298                    "created_at".into(),
1299                    "updated_at".into(),
1300                    "status".into(),
1301                    "tombstone".into(),
1302                    "op".into(),
1303                ]);
1304                for entry in entries
1305                    .into_iter()
1306                    .filter(|entry| {
1307                        self.check_vault_capability("vault:read_metadata", collection, &entry.key)
1308                            .is_ok()
1309                    })
1310                    .skip(*offset)
1311                    .take(limit.unwrap_or(usize::MAX))
1312                {
1313                    push_vault_metadata_record(&mut result, collection, &entry.key, &entry);
1314                }
1315                Ok(RuntimeQueryResult {
1316                    query: raw_query.to_string(),
1317                    mode: crate::storage::query::modes::QueryMode::Sql,
1318                    statement: "vault_list",
1319                    engine: "vault",
1320                    result,
1321                    affected_rows: 0,
1322                    statement_type: "select",
1323                })
1324            }
1325
1326            KvCommand::History { collection, key } => {
1327                self.check_vault_capability("vault:read_metadata", collection, key)
1328                    .map_err(RedDBError::Query)?;
1329                let versions =
1330                    super::keyed_spine::history_versions(ops.vault_versions(collection, key)?);
1331                let result = vault_history_result(collection, key, &versions);
1332                Ok(RuntimeQueryResult {
1333                    query: raw_query.to_string(),
1334                    mode: crate::storage::query::modes::QueryMode::Sql,
1335                    statement: "vault_history",
1336                    engine: "vault",
1337                    result,
1338                    affected_rows: 0,
1339                    statement_type: "select",
1340                })
1341            }
1342
1343            KvCommand::Purge { collection, key } => {
1344                let entry = ops.get_vault_entry(collection, key)?;
1345                if let Err(reason) = self.check_vault_capability("vault:purge", collection, key) {
1346                    self.audit_vault_lifecycle(
1347                        "purge",
1348                        collection,
1349                        key,
1350                        crate::runtime::audit_log::Outcome::Denied,
1351                        &reason,
1352                        entry.as_ref(),
1353                    );
1354                    return Err(RedDBError::Query(reason));
1355                }
1356                self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
1357                let purged = ops.purge_vault_versions(collection, key)?;
1358                self.audit_vault_lifecycle(
1359                    "purge",
1360                    collection,
1361                    key,
1362                    crate::runtime::audit_log::Outcome::Success,
1363                    "ok",
1364                    entry.as_ref(),
1365                );
1366                let mut result = UnifiedResult::with_columns(vec![
1367                    "ok".into(),
1368                    "collection".into(),
1369                    "key".into(),
1370                    "purged".into(),
1371                ]);
1372                let mut record = UnifiedRecord::new();
1373                record.set("ok", Value::Boolean(true));
1374                record.set("collection", Value::text(collection.clone()));
1375                record.set("key", Value::text(key.clone()));
1376                record.set("purged", Value::Integer(purged as i64));
1377                result.push(record);
1378                Ok(RuntimeQueryResult {
1379                    query: raw_query.to_string(),
1380                    mode: crate::storage::query::modes::QueryMode::Sql,
1381                    statement: "vault_purge",
1382                    engine: "vault",
1383                    result,
1384                    affected_rows: purged as u64,
1385                    statement_type: "delete",
1386                })
1387            }
1388
1389            KvCommand::Get {
1390                model,
1391                collection,
1392                key,
1393            } => {
1394                if *model == crate::catalog::CollectionModel::Vault {
1395                    self.check_vault_capability("vault:read_metadata", collection, key)
1396                        .map_err(RedDBError::Query)?;
1397                    let entry = ops.get_vault_entry(collection, key)?;
1398                    let key_available = self.vault_key_available(collection);
1399                    let result =
1400                        vault_metadata_result(collection, key, entry.as_ref(), key_available);
1401                    return Ok(RuntimeQueryResult {
1402                        query: raw_query.to_string(),
1403                        mode: crate::storage::query::modes::QueryMode::Sql,
1404                        statement: "vault_get",
1405                        engine: "vault",
1406                        result,
1407                        affected_rows: 0,
1408                        statement_type: "select",
1409                    });
1410                }
1411
1412                let entity = ops.get_entity(*model, collection, key)?;
1413                let value = entity.as_ref().and_then(kv_value_from_entity);
1414                if *model == crate::catalog::CollectionModel::Kv {
1415                    self.inner.kv_stats.incr_gets();
1416                }
1417                let mut result = UnifiedResult::with_columns(vec![
1418                    "rid".into(),
1419                    "collection".into(),
1420                    "kind".into(),
1421                    "tenant".into(),
1422                    "created_at".into(),
1423                    "updated_at".into(),
1424                    "key".into(),
1425                    "value".into(),
1426                    "tags".into(),
1427                ]);
1428                let mut record = UnifiedRecord::new();
1429                if let Some(entity) = entity.as_ref() {
1430                    record.set("rid", Value::UnsignedInteger(entity.id.raw()));
1431                    record.set("created_at", Value::UnsignedInteger(entity.created_at));
1432                    record.set("updated_at", Value::UnsignedInteger(entity.updated_at));
1433                } else {
1434                    record.set("rid", Value::Null);
1435                    record.set("created_at", Value::Null);
1436                    record.set("updated_at", Value::Null);
1437                }
1438                record.set("collection", Value::text(collection.clone()));
1439                record.set("kind", Value::text(keyed_model_name(*model).to_string()));
1440                record.set("tenant", Value::Null);
1441                record.set("key", Value::text(key.clone()));
1442                record.set(
1443                    "value",
1444                    value.unwrap_or(crate::storage::schema::Value::Null),
1445                );
1446                record.set("tags", kv_tags_value(&ops.tags_for_key(collection, key)));
1447                result.push(record);
1448
1449                Ok(RuntimeQueryResult {
1450                    query: raw_query.to_string(),
1451                    mode: crate::storage::query::modes::QueryMode::Sql,
1452                    statement: "kv_get",
1453                    engine: "kv",
1454                    result,
1455                    affected_rows: 0,
1456                    statement_type: "select",
1457                })
1458            }
1459            KvCommand::Watch {
1460                model,
1461                collection,
1462                key,
1463                prefix,
1464                from_lsn,
1465            } => {
1466                let watch_key = if *prefix {
1467                    format!("{key}.*")
1468                } else {
1469                    key.clone()
1470                };
1471                let endpoint = match from_lsn {
1472                    Some(lsn) => format!(
1473                        "/collections/{collection}/{}/{watch_key}/watch?since_lsn={lsn}",
1474                        keyed_model_name(*model)
1475                    ),
1476                    None => format!(
1477                        "/collections/{collection}/{}/{watch_key}/watch",
1478                        keyed_model_name(*model)
1479                    ),
1480                };
1481                let mut result = UnifiedResult::with_columns(vec![
1482                    "collection".into(),
1483                    "key".into(),
1484                    "prefix".into(),
1485                    "from_lsn".into(),
1486                    "watch_url".into(),
1487                    "streaming".into(),
1488                ]);
1489                let mut record = UnifiedRecord::new();
1490                record.set("collection", Value::text(collection.clone()));
1491                record.set("key", Value::text(watch_key));
1492                record.set("prefix", Value::Boolean(*prefix));
1493                record.set(
1494                    "from_lsn",
1495                    from_lsn
1496                        .map(Value::UnsignedInteger)
1497                        .unwrap_or(crate::storage::schema::Value::Null),
1498                );
1499                record.set("watch_url", Value::text(endpoint));
1500                record.set("streaming", Value::Boolean(true));
1501                result.push(record);
1502
1503                Ok(RuntimeQueryResult {
1504                    query: raw_query.to_string(),
1505                    mode: crate::storage::query::modes::QueryMode::Sql,
1506                    statement: "kv_watch",
1507                    engine: keyed_model_name(*model),
1508                    result,
1509                    affected_rows: 0,
1510                    statement_type: "stream",
1511                })
1512            }
1513
1514            KvCommand::Unseal {
1515                collection,
1516                key,
1517                version,
1518            } => {
1519                let latest = ops.get_vault_entry(collection, key)?;
1520                let entry = match version {
1521                    Some(version) => ops.get_vault_entry_version(collection, key, *version)?,
1522                    None => latest.clone(),
1523                };
1524                let action = match (version, latest.as_ref()) {
1525                    (Some(requested), Some(latest)) if *requested == latest.version => {
1526                        "vault:unseal"
1527                    }
1528                    (Some(_), _) => "vault:unseal_history",
1529                    _ => "vault:unseal",
1530                };
1531                if let Err(reason) = self.check_vault_capability(action, collection, key) {
1532                    self.audit_vault_unseal(
1533                        collection,
1534                        key,
1535                        crate::runtime::audit_log::Outcome::Denied,
1536                        &reason,
1537                        entry.as_ref(),
1538                    );
1539                    return Err(RedDBError::Query(reason));
1540                }
1541                let Some(entry) = entry else {
1542                    let reason = "not_found";
1543                    self.audit_vault_unseal(
1544                        collection,
1545                        key,
1546                        crate::runtime::audit_log::Outcome::Denied,
1547                        reason,
1548                        None,
1549                    );
1550                    return Err(RedDBError::NotFound(format!(
1551                        "vault secret '{}.{}' not found",
1552                        collection, key
1553                    )));
1554                };
1555                if entry.tombstone {
1556                    let reason = "deleted";
1557                    self.audit_vault_unseal(
1558                        collection,
1559                        key,
1560                        crate::runtime::audit_log::Outcome::Denied,
1561                        reason,
1562                        Some(&entry),
1563                    );
1564                    return Err(RedDBError::NotFound(format!(
1565                        "vault secret '{}.{}' is deleted",
1566                        collection, key
1567                    )));
1568                }
1569                match self.unseal_vault_value(collection, &entry.value) {
1570                    Ok(value) => {
1571                        self.audit_vault_unseal(
1572                            collection,
1573                            key,
1574                            crate::runtime::audit_log::Outcome::Success,
1575                            "ok",
1576                            Some(&entry),
1577                        );
1578                        let mut result = UnifiedResult::with_columns(vec![
1579                            "collection".into(),
1580                            "key".into(),
1581                            "value".into(),
1582                        ]);
1583                        let mut record = UnifiedRecord::new();
1584                        record.set("collection", Value::text(collection.clone()));
1585                        record.set("key", Value::text(key.clone()));
1586                        record.set("value", value);
1587                        result.push(record);
1588                        Ok(RuntimeQueryResult {
1589                            query: raw_query.to_string(),
1590                            mode: crate::storage::query::modes::QueryMode::Sql,
1591                            statement: "vault_unseal",
1592                            engine: "vault",
1593                            result,
1594                            affected_rows: 0,
1595                            statement_type: "select",
1596                        })
1597                    }
1598                    Err(err) => {
1599                        let reason = err.to_string();
1600                        self.audit_vault_unseal(
1601                            collection,
1602                            key,
1603                            crate::runtime::audit_log::Outcome::Error,
1604                            &reason,
1605                            Some(&entry),
1606                        );
1607                        Err(err)
1608                    }
1609                }
1610            }
1611
1612            KvCommand::Incr {
1613                model,
1614                collection,
1615                key,
1616                by,
1617                ttl_ms,
1618            } => {
1619                self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
1620                let new_value = ops.incr(*model, collection, key, *by, *ttl_ms)?;
1621
1622                let mut result = UnifiedResult::with_columns(vec![
1623                    "ok".into(),
1624                    "collection".into(),
1625                    "key".into(),
1626                    "value".into(),
1627                ]);
1628                let mut record = UnifiedRecord::new();
1629                record.set("ok", Value::Boolean(true));
1630                record.set("collection", Value::text(collection.clone()));
1631                record.set("key", Value::text(key.clone()));
1632                record.set("value", Value::Integer(new_value));
1633                result.push(record);
1634
1635                Ok(RuntimeQueryResult {
1636                    query: raw_query.to_string(),
1637                    mode: crate::storage::query::modes::QueryMode::Sql,
1638                    statement: "kv_incr",
1639                    engine: "kv",
1640                    result,
1641                    affected_rows: 1,
1642                    statement_type: "update",
1643                })
1644            }
1645
1646            KvCommand::Cas {
1647                model,
1648                collection,
1649                key,
1650                expected,
1651                new_value,
1652                ttl_ms,
1653            } => {
1654                self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
1655                let (ok, current) = ops.cas(
1656                    *model,
1657                    collection,
1658                    key,
1659                    expected.as_ref(),
1660                    new_value.clone(),
1661                    *ttl_ms,
1662                )?;
1663
1664                let mut result = UnifiedResult::with_columns(vec![
1665                    "ok".into(),
1666                    "collection".into(),
1667                    "key".into(),
1668                    "current".into(),
1669                ]);
1670                let mut record = UnifiedRecord::new();
1671                record.set("ok", Value::Boolean(ok));
1672                record.set("collection", Value::text(collection.clone()));
1673                record.set("key", Value::text(key.clone()));
1674                record.set(
1675                    "current",
1676                    current.unwrap_or(crate::storage::schema::Value::Null),
1677                );
1678                result.push(record);
1679
1680                Ok(RuntimeQueryResult {
1681                    query: raw_query.to_string(),
1682                    mode: crate::storage::query::modes::QueryMode::Sql,
1683                    statement: "kv_cas",
1684                    engine: "kv",
1685                    result,
1686                    affected_rows: if ok { 1 } else { 0 },
1687                    statement_type: "update",
1688                })
1689            }
1690
1691            KvCommand::Delete {
1692                model,
1693                collection,
1694                key,
1695            } => {
1696                if *model == crate::catalog::CollectionModel::Vault {
1697                    self.check_system_vault_capability("vault:write", collection, key)
1698                        .map_err(RedDBError::Query)?;
1699                    self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
1700                    let entry = ops.append_vault_version(
1701                        collection,
1702                        key,
1703                        Value::Null,
1704                        "delete",
1705                        true,
1706                        &[],
1707                    )?;
1708                    self.record_kv_watch_event(
1709                        crate::replication::cdc::ChangeOperation::Delete,
1710                        collection,
1711                        key,
1712                        entry.id.raw(),
1713                        None,
1714                        Some(vault_entry_metadata_json(&entry)),
1715                    );
1716                    self.audit_vault_lifecycle(
1717                        "delete",
1718                        collection,
1719                        key,
1720                        crate::runtime::audit_log::Outcome::Success,
1721                        "ok",
1722                        Some(&entry),
1723                    );
1724                    return Ok(vault_write_result(
1725                        raw_query,
1726                        "vault_delete",
1727                        "delete",
1728                        collection,
1729                        key,
1730                        &entry,
1731                        1,
1732                    ));
1733                }
1734                self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
1735                let deleted = ops.delete(*model, collection, key)?;
1736
1737                let mut result = UnifiedResult::with_columns(vec![
1738                    "ok".into(),
1739                    "collection".into(),
1740                    "key".into(),
1741                    "deleted".into(),
1742                ]);
1743                let mut record = UnifiedRecord::new();
1744                record.set("ok", Value::Boolean(true));
1745                record.set("collection", Value::text(collection.clone()));
1746                record.set("key", Value::text(key.clone()));
1747                record.set("deleted", Value::Boolean(deleted));
1748                result.push(record);
1749
1750                Ok(RuntimeQueryResult {
1751                    query: raw_query.to_string(),
1752                    mode: crate::storage::query::modes::QueryMode::Sql,
1753                    statement: "kv_delete",
1754                    engine: "kv",
1755                    result,
1756                    affected_rows: if deleted { 1 } else { 0 },
1757                    statement_type: "delete",
1758                })
1759            }
1760        }
1761    }
1762
1763    pub fn vault_watch_events_since(
1764        &self,
1765        collection: &str,
1766        key: &str,
1767        since_lsn: u64,
1768        max_count: usize,
1769    ) -> Vec<crate::replication::cdc::KvWatchEvent> {
1770        self.kv_watch_events_since(collection, key, since_lsn, max_count)
1771            .into_iter()
1772            .filter(|event| {
1773                self.check_vault_capability("vault:read_metadata", &event.collection, &event.key)
1774                    .is_ok()
1775            })
1776            .map(vault_filter_watch_event)
1777            .collect()
1778    }
1779
1780    pub fn vault_watch_events_since_prefix(
1781        &self,
1782        collection: &str,
1783        prefix: &str,
1784        since_lsn: u64,
1785        max_count: usize,
1786    ) -> Vec<crate::replication::cdc::KvWatchEvent> {
1787        self.kv_watch_events_since_prefix(collection, prefix, since_lsn, max_count)
1788            .into_iter()
1789            .filter(|event| {
1790                self.check_vault_capability("vault:read_metadata", &event.collection, &event.key)
1791                    .is_ok()
1792            })
1793            .map(vault_filter_watch_event)
1794            .collect()
1795    }
1796
1797    fn check_kv_invalidate_policy(&self, collection: &str) -> RedDBResult<()> {
1798        let auth_store = match self.inner.auth_store.read().clone() {
1799            Some(store) => store,
1800            None => return Ok(()),
1801        };
1802        let (username, role) = match crate::runtime::impl_core::current_auth_identity() {
1803            Some(identity) => identity,
1804            None => return Ok(()),
1805        };
1806        if role < crate::auth::Role::Write {
1807            return Err(RedDBError::Query(format!(
1808                "principal=`{username}` role=`{role:?}` cannot invalidate KV tags"
1809            )));
1810        }
1811        if !auth_store.iam_authorization_enabled() {
1812            return Ok(());
1813        }
1814
1815        let tenant = crate::runtime::impl_core::current_tenant();
1816        let principal = crate::auth::UserId::from_parts(tenant.as_deref(), &username);
1817        let mut resource =
1818            crate::auth::policies::ResourceRef::new("kv".to_string(), collection.to_string());
1819        if let Some(tenant) = tenant.clone() {
1820            resource = resource.with_tenant(tenant);
1821        }
1822        let ctx = crate::auth::policies::EvalContext {
1823            principal_tenant: tenant.clone(),
1824            current_tenant: tenant,
1825            peer_ip: None,
1826            mfa_present: false,
1827            now_ms: current_unix_ms(),
1828            principal_is_admin_role: role == crate::auth::Role::Admin,
1829        };
1830        if auth_store.check_policy_authz(&principal, "kv:invalidate", &resource, &ctx) {
1831            Ok(())
1832        } else {
1833            Err(RedDBError::Query(format!(
1834                "principal=`{username}` action=`kv:invalidate` resource=`kv:{collection}` denied by IAM policy"
1835            )))
1836        }
1837    }
1838}
1839
1840fn ttl_metadata(ttl_ms: Option<u64>) -> Option<Metadata> {
1841    let ttl_ms = ttl_ms?;
1842    Some(Metadata::with_fields(
1843        [(
1844            "_ttl_ms".to_string(),
1845            if ttl_ms <= i64::MAX as u64 {
1846                MetadataValue::Int(ttl_ms as i64)
1847            } else {
1848                MetadataValue::Timestamp(ttl_ms)
1849            },
1850        )]
1851        .into_iter()
1852        .collect(),
1853    ))
1854}
1855
1856fn vault_write_result(
1857    raw_query: &str,
1858    statement: &'static str,
1859    statement_type: &'static str,
1860    collection: &str,
1861    key: &str,
1862    entry: &VaultEntry,
1863    affected_rows: u64,
1864) -> RuntimeQueryResult {
1865    let mut result = UnifiedResult::with_columns(vec![
1866        "ok".into(),
1867        "collection".into(),
1868        "key".into(),
1869        "version".into(),
1870        "fingerprint".into(),
1871        "tombstone".into(),
1872        "op".into(),
1873        "id".into(),
1874    ]);
1875    let mut record = UnifiedRecord::new();
1876    record.set("ok", Value::Boolean(true));
1877    record.set("collection", Value::text(collection.to_string()));
1878    record.set("key", Value::text(key.to_string()));
1879    record.set("version", Value::Integer(entry.version));
1880    if entry.tombstone {
1881        record.set("fingerprint", Value::Null);
1882    } else {
1883        record.set("fingerprint", Value::text(vault_fingerprint(&entry.value)));
1884    }
1885    record.set("tombstone", Value::Boolean(entry.tombstone));
1886    record.set("op", Value::text(entry.op.clone()));
1887    record.set("id", Value::Integer(entry.id.raw() as i64));
1888    result.push(record);
1889    RuntimeQueryResult {
1890        query: raw_query.to_string(),
1891        mode: crate::storage::query::modes::QueryMode::Sql,
1892        statement,
1893        engine: "vault",
1894        result,
1895        affected_rows,
1896        statement_type,
1897    }
1898}
1899
1900fn vault_history_result(collection: &str, key: &str, versions: &[VaultEntry]) -> UnifiedResult {
1901    let mut result = UnifiedResult::with_columns(vec![
1902        "collection".into(),
1903        "key".into(),
1904        "version".into(),
1905        "fingerprint".into(),
1906        "tags".into(),
1907        "created_at".into(),
1908        "updated_at".into(),
1909        "status".into(),
1910        "tombstone".into(),
1911        "op".into(),
1912    ]);
1913    for entry in versions {
1914        push_vault_metadata_record(&mut result, collection, key, entry);
1915    }
1916    result
1917}
1918
1919fn push_vault_metadata_record(
1920    result: &mut UnifiedResult,
1921    collection: &str,
1922    key: &str,
1923    entry: &VaultEntry,
1924) {
1925    let mut record = UnifiedRecord::new();
1926    record.set("collection", Value::text(collection.to_string()));
1927    record.set("key", Value::text(key.to_string()));
1928    record.set("version", Value::Integer(entry.version));
1929    if entry.tombstone {
1930        record.set("fingerprint", Value::Null);
1931        record.set("status", Value::text("deleted"));
1932    } else {
1933        record.set("fingerprint", Value::text(vault_fingerprint(&entry.value)));
1934        record.set("status", Value::text("sealed"));
1935    }
1936    record.set("tags", vault_tags_value(&entry.metadata));
1937    record.set("created_at", Value::TimestampMs(entry.created_at as i64));
1938    record.set("updated_at", Value::TimestampMs(entry.updated_at as i64));
1939    record.set("tombstone", Value::Boolean(entry.tombstone));
1940    record.set("op", Value::text(entry.op.clone()));
1941    result.push(record);
1942}
1943
1944fn vault_metadata_result(
1945    collection: &str,
1946    key: &str,
1947    entry: Option<&VaultEntry>,
1948    key_available: bool,
1949) -> UnifiedResult {
1950    let mut result = UnifiedResult::with_columns(vec![
1951        "collection".into(),
1952        "key".into(),
1953        "version".into(),
1954        "fingerprint".into(),
1955        "tags".into(),
1956        "created_at".into(),
1957        "updated_at".into(),
1958        "value".into(),
1959        "status".into(),
1960        "tombstone".into(),
1961        "op".into(),
1962    ]);
1963    let mut record = UnifiedRecord::new();
1964    record.set("collection", Value::text(collection.to_string()));
1965    record.set("key", Value::text(key.to_string()));
1966    match entry {
1967        Some(entry) => {
1968            record.set("version", Value::Integer(entry.version));
1969            if entry.tombstone {
1970                record.set("fingerprint", Value::Null);
1971            } else {
1972                record.set("fingerprint", Value::text(vault_fingerprint(&entry.value)));
1973            }
1974            record.set("tags", vault_tags_value(&entry.metadata));
1975            record.set("created_at", Value::TimestampMs(entry.created_at as i64));
1976            record.set("updated_at", Value::TimestampMs(entry.updated_at as i64));
1977            record.set("value", Value::text("***"));
1978            record.set(
1979                "status",
1980                Value::text(if entry.tombstone {
1981                    "deleted"
1982                } else if key_available {
1983                    "sealed"
1984                } else {
1985                    "sealed_unavailable"
1986                }),
1987            );
1988            record.set("tombstone", Value::Boolean(entry.tombstone));
1989            record.set("op", Value::text(entry.op.clone()));
1990        }
1991        None => {
1992            record.set("version", Value::Null);
1993            record.set("fingerprint", Value::Null);
1994            record.set("tags", Value::Array(Vec::new()));
1995            record.set("created_at", Value::Null);
1996            record.set("updated_at", Value::Null);
1997            record.set("value", Value::text(""));
1998            record.set("status", Value::text("missing"));
1999            record.set("tombstone", Value::Boolean(false));
2000            record.set("op", Value::Null);
2001        }
2002    }
2003    result.push(record);
2004    result
2005}
2006
2007fn vault_fingerprint(value: &Value) -> String {
2008    match value {
2009        Value::Secret(payload) => crate::utils::to_hex(&crate::crypto::sha256::sha256(payload)),
2010        other => crate::utils::to_hex(&crate::crypto::sha256::sha256(&other.to_bytes())),
2011    }
2012}
2013
2014fn vault_entry_metadata_json(entry: &VaultEntry) -> crate::json::Value {
2015    let mut object = crate::json::Map::new();
2016    object.insert(
2017        "key".to_string(),
2018        crate::json::Value::String(entry.key.clone()),
2019    );
2020    object.insert(
2021        "version".to_string(),
2022        crate::json::Value::Number(entry.version as f64),
2023    );
2024    object.insert(
2025        "fingerprint".to_string(),
2026        if entry.tombstone {
2027            crate::json::Value::Null
2028        } else {
2029            crate::json::Value::String(vault_fingerprint(&entry.value))
2030        },
2031    );
2032    object.insert("tags".to_string(), vault_tags_json(&entry.metadata));
2033    object.insert(
2034        "actor".to_string(),
2035        crate::json::Value::String(RedDBRuntime::current_vault_actor()),
2036    );
2037    object.insert(
2038        "sequence_id".to_string(),
2039        crate::json::Value::Number(entry.sequence_id as f64),
2040    );
2041    object.insert(
2042        "tombstone".to_string(),
2043        crate::json::Value::Bool(entry.tombstone),
2044    );
2045    object.insert(
2046        "op".to_string(),
2047        crate::json::Value::String(entry.op.clone()),
2048    );
2049    crate::json::Value::Object(object)
2050}
2051
2052fn vault_tags_json(metadata: &Metadata) -> crate::json::Value {
2053    match vault_tags_value(metadata) {
2054        Value::Array(values) => crate::json::Value::Array(
2055            values
2056                .into_iter()
2057                .filter_map(|value| match value {
2058                    Value::Text(tag) => Some(crate::json::Value::String(tag.to_string())),
2059                    _ => None,
2060                })
2061                .collect(),
2062        ),
2063        _ => crate::json::Value::Array(Vec::new()),
2064    }
2065}
2066
2067fn vault_tags_metadata(tags: &[String]) -> std::collections::HashMap<String, MetadataValue> {
2068    [(
2069        "tags".to_string(),
2070        MetadataValue::Array(
2071            tags.iter()
2072                .map(|tag| MetadataValue::String(tag.clone()))
2073                .collect(),
2074        ),
2075    )]
2076    .into_iter()
2077    .collect()
2078}
2079
2080fn vault_filter_watch_event(
2081    mut event: crate::replication::cdc::KvWatchEvent,
2082) -> crate::replication::cdc::KvWatchEvent {
2083    event.before = event.before.and_then(vault_metadata_json_only);
2084    event.after = event.after.and_then(vault_metadata_json_only);
2085    event
2086}
2087
2088fn vault_metadata_json_only(value: crate::json::Value) -> Option<crate::json::Value> {
2089    let object = value.as_object()?;
2090    let mut out = crate::json::Map::new();
2091    for field in [
2092        "key",
2093        "version",
2094        "fingerprint",
2095        "tags",
2096        "actor",
2097        "sequence_id",
2098        "tombstone",
2099        "op",
2100    ] {
2101        if let Some(value) = object.get(field) {
2102            out.insert(field.to_string(), value.clone());
2103        }
2104    }
2105    Some(crate::json::Value::Object(out))
2106}
2107
2108fn vault_tags_value(metadata: &Metadata) -> Value {
2109    match metadata.get("tags") {
2110        Some(MetadataValue::Array(values)) => Value::Array(
2111            values
2112                .iter()
2113                .filter_map(|value| match value {
2114                    MetadataValue::String(tag) => Some(Value::text(tag.clone())),
2115                    _ => None,
2116                })
2117                .collect(),
2118        ),
2119        Some(MetadataValue::String(tag)) if !tag.is_empty() => {
2120            Value::Array(vec![Value::text(tag.clone())])
2121        }
2122        _ => Value::Array(Vec::new()),
2123    }
2124}
2125
2126fn decode_vault_key(hex_key: &str) -> RedDBResult<[u8; 32]> {
2127    let bytes = hex::decode(hex_key)
2128        .map_err(|_| RedDBError::Query("vault sealed_unavailable: bad key material".to_string()))?;
2129    let key: [u8; 32] = bytes.try_into().map_err(|_| {
2130        RedDBError::Query("vault sealed_unavailable: bad key material length".to_string())
2131    })?;
2132    Ok(key)
2133}
2134
2135fn kv_tags_metadata(tags: &[String]) -> Option<(String, MetadataValue)> {
2136    if tags.is_empty() {
2137        return None;
2138    }
2139    let values = tags
2140        .iter()
2141        .map(|tag| MetadataValue::String(tag.clone()))
2142        .collect();
2143    Some(("_kv_tags".to_string(), MetadataValue::Array(values)))
2144}
2145
2146fn kv_tags_value(tags: &[String]) -> Value {
2147    let json = crate::json::Value::Array(
2148        tags.iter()
2149            .map(|tag| crate::json::Value::String(tag.clone()))
2150            .collect(),
2151    );
2152    Value::Json(crate::json::to_vec(&json).unwrap_or_default())
2153}
2154
2155fn kv_value_from_entity(entity: &crate::storage::UnifiedEntity) -> Option<Value> {
2156    if let crate::storage::EntityData::Row(ref row) = entity.data {
2157        if let Some(ref named) = row.named {
2158            return named.get("value").cloned();
2159        }
2160    }
2161    None
2162}
2163
2164fn kv_collection_contract(name: &str) -> crate::physical::CollectionContract {
2165    let now = current_unix_ms();
2166    crate::physical::CollectionContract {
2167        name: name.to_string(),
2168        declared_model: crate::catalog::CollectionModel::Kv,
2169        schema_mode: crate::catalog::SchemaMode::Dynamic,
2170        origin: crate::physical::ContractOrigin::Implicit,
2171        version: 1,
2172        created_at_unix_ms: now,
2173        updated_at_unix_ms: now,
2174        default_ttl_ms: None,
2175        vector_dimension: None,
2176        vector_metric: None,
2177        context_index_fields: Vec::new(),
2178        declared_columns: Vec::new(),
2179        table_def: None,
2180        timestamps_enabled: false,
2181        context_index_enabled: false,
2182        metrics_raw_retention_ms: None,
2183        metrics_rollup_policies: Vec::new(),
2184        metrics_tenant_identity: None,
2185        metrics_namespace: None,
2186        append_only: false,
2187        subscriptions: Vec::new(),
2188    }
2189}
2190
/// Milliseconds elapsed since the Unix epoch, according to the system clock.
///
/// A clock set before 1970 yields `0` instead of an error.
fn current_unix_ms() -> u128 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis(),
        Err(_) => 0,
    }
}
2197
2198#[cfg(test)]
2199mod tests {
2200    use crate::api::RedDBOptions;
2201    use crate::catalog::CollectionModel;
2202    use crate::runtime::RedDBRuntime;
2203
    /// Creates a new in-memory runtime for a single test; panics if construction fails.
    fn rt() -> RedDBRuntime {
        RedDBRuntime::with_options(RedDBOptions::in_memory()).expect("in-memory runtime")
    }
2207
    /// INCR on a key that has never been written returns `by` itself (here 5).
    #[test]
    fn incr_missing_key_initialises_at_by() {
        let r = rt();
        let ops = super::KvAtomicOps::new(&r);
        let v = ops
            .incr(CollectionModel::Kv, "kv_default", "missing", 5, None)
            .unwrap();
        assert_eq!(v, 5);
    }
2217
    /// Each public `KvAtomicOps` call bumps exactly one KV stats counter.
    ///
    /// The call sequence (set, get, delete, incr, successful CAS, conflicting
    /// CAS) is the specification: the assertions expect one of each counter.
    #[test]
    fn kv_runtime_stats_count_public_ops() {
        let r = rt();
        let ops = super::KvAtomicOps::new(&r);

        ops.set(
            CollectionModel::Kv,
            "kv_default",
            "profile",
            crate::storage::schema::Value::text("alice"),
            None,
            false,
        )
        .unwrap();
        ops.get(CollectionModel::Kv, "kv_default", "profile")
            .unwrap();
        ops.delete(CollectionModel::Kv, "kv_default", "profile")
            .unwrap();
        ops.incr(CollectionModel::Kv, "kv_default", "hits", 1, None)
            .unwrap();
        // "profile" was deleted above, so expecting absence (None) should succeed.
        ops.cas(
            CollectionModel::Kv,
            "kv_default",
            "profile",
            None,
            crate::storage::schema::Value::text("created"),
            None,
        )
        .unwrap();
        // The current value is now "created", so expecting "different" conflicts.
        ops.cas(
            CollectionModel::Kv,
            "kv_default",
            "profile",
            Some(&crate::storage::schema::Value::text("different")),
            crate::storage::schema::Value::text("ignored"),
            None,
        )
        .unwrap();

        // Only the explicit `set` counts as a put; incr/cas have their own counters.
        let stats = r.stats().kv;
        assert_eq!(stats.puts, 1);
        assert_eq!(stats.gets, 1);
        assert_eq!(stats.deletes, 1);
        assert_eq!(stats.incrs, 1);
        assert_eq!(stats.cas_success, 1);
        assert_eq!(stats.cas_conflict, 1);
    }
2260        assert_eq!(stats.deletes, 1);
2261        assert_eq!(stats.incrs, 1);
2262        assert_eq!(stats.cas_success, 1);
2263        assert_eq!(stats.cas_conflict, 1);
2264    }
2265
2266    #[test]
2267    fn kv_invalidate_tags_removes_matching_entries_only() {
2268        let r = rt();
2269
2270        r.execute_query("KV PUT sessions.blob = 'payload' TAGS [user:42, org:7]")
2271            .unwrap();
2272
2273        let miss = r
2274            .execute_query("INVALIDATE TAGS [org:99] FROM sessions")
2275            .unwrap();
2276        assert_eq!(miss.affected_rows, 0);
2277        assert!(matches!(
2278            r.execute_query("KV GET sessions.blob")
2279                .unwrap()
2280                .result
2281                .records[0]
2282                .get("value"),
2283            Some(crate::storage::schema::Value::Text(value)) if &**value == "payload"
2284        ));
2285
2286        let hit = r
2287            .execute_query("INVALIDATE TAGS [user:42] FROM sessions")
2288            .unwrap();
2289        assert_eq!(hit.affected_rows, 1);
2290        assert!(matches!(
2291            r.execute_query("KV GET sessions.blob")
2292                .unwrap()
2293                .result
2294                .records[0]
2295                .get("value"),
2296            Some(crate::storage::schema::Value::Null)
2297        ));
2298    }
2299
2300    #[test]
2301    fn kv_runtime_stats_count_watch_streams_and_events() {
2302        let r = rt();
2303        let ops = super::KvAtomicOps::new(&r);
2304        assert_eq!(r.stats().kv.watch_streams_active, 0);
2305
2306        {
2307            let mut stream = r.kv_watch_subscribe("kv_default", "watched", None);
2308            assert_eq!(r.stats().kv.watch_streams_active, 1);
2309
2310            ops.set(
2311                CollectionModel::Kv,
2312                "kv_default",
2313                "watched",
2314                crate::storage::schema::Value::Integer(1),
2315                None,
2316                false,
2317            )
2318            .unwrap();
2319            let event = stream.poll_next().expect("watch event");
2320            assert_eq!(event.key, "watched");
2321            assert_eq!(r.stats().kv.watch_events_emitted, 1);
2322
2323            stream.record_drop_count(3);
2324            assert_eq!(r.stats().kv.watch_drops, 3);
2325        }
2326
2327        assert_eq!(r.stats().kv.watch_streams_active, 0);
2328    }
2329
2330    #[test]
2331    fn incr_existing_integer_accumulates() {
2332        let r = rt();
2333        let ops = super::KvAtomicOps::new(&r);
2334        ops.incr(CollectionModel::Kv, "kv_default", "ctr", 1, None)
2335            .unwrap();
2336        ops.incr(CollectionModel::Kv, "kv_default", "ctr", 1, None)
2337            .unwrap();
2338        let v = ops
2339            .incr(CollectionModel::Kv, "kv_default", "ctr", 1, None)
2340            .unwrap();
2341        assert_eq!(v, 3);
2342    }
2343
2344    #[test]
2345    fn decr_via_negative_by() {
2346        let r = rt();
2347        let ops = super::KvAtomicOps::new(&r);
2348        ops.incr(CollectionModel::Kv, "kv_default", "stock", 10, None)
2349            .unwrap();
2350        let v = ops
2351            .incr(CollectionModel::Kv, "kv_default", "stock", -3, None)
2352            .unwrap();
2353        assert_eq!(v, 7);
2354    }
2355
2356    #[test]
2357    fn concurrent_incr_single_key_is_atomic() {
2358        const THREADS: usize = 8;
2359        const ITERS: usize = 1000;
2360
2361        let runtime = std::sync::Arc::new(rt());
2362        let barrier = std::sync::Arc::new(std::sync::Barrier::new(THREADS));
2363        let mut handles = Vec::new();
2364
2365        for _ in 0..THREADS {
2366            let runtime = std::sync::Arc::clone(&runtime);
2367            let barrier = std::sync::Arc::clone(&barrier);
2368            handles.push(std::thread::spawn(move || {
2369                let ops = super::KvAtomicOps::new(&runtime);
2370                barrier.wait();
2371                for _ in 0..ITERS {
2372                    ops.incr(CollectionModel::Kv, "kv_default", "counter", 1, None)
2373                        .unwrap();
2374                }
2375            }));
2376        }
2377
2378        for handle in handles {
2379            handle.join().expect("worker should finish");
2380        }
2381
2382        let ops = super::KvAtomicOps::new(&runtime);
2383        assert_eq!(
2384            ops.get(CollectionModel::Kv, "kv_default", "counter")
2385                .unwrap(),
2386            Some(crate::storage::schema::Value::Integer(
2387                (THREADS * ITERS) as i64
2388            ))
2389        );
2390    }
2391
2392    #[test]
2393    fn incr_on_string_value_returns_error() {
2394        let r = rt();
2395        let ops = super::KvAtomicOps::new(&r);
2396        ops.set(
2397            CollectionModel::Kv,
2398            "kv_default",
2399            "name",
2400            crate::storage::schema::Value::text("alice"),
2401            None,
2402            false,
2403        )
2404        .unwrap();
2405        let err = ops
2406            .incr(CollectionModel::Kv, "kv_default", "name", 1, None)
2407            .unwrap_err();
2408        assert!(err.to_string().contains("non-integer"));
2409    }
2410
2411    // --- CAS tests ---
2412
2413    #[test]
2414    fn cas_matching_value_succeeds() {
2415        let r = rt();
2416        let ops = super::KvAtomicOps::new(&r);
2417        ops.set(
2418            CollectionModel::Kv,
2419            "kv_default",
2420            "lock",
2421            crate::storage::schema::Value::text("free"),
2422            None,
2423            false,
2424        )
2425        .unwrap();
2426        let (ok, prev) = ops
2427            .cas(
2428                CollectionModel::Kv,
2429                "kv_default",
2430                "lock",
2431                Some(&crate::storage::schema::Value::text("free")),
2432                crate::storage::schema::Value::text("held"),
2433                None,
2434            )
2435            .unwrap();
2436        assert!(ok);
2437        assert_eq!(prev, Some(crate::storage::schema::Value::text("free")));
2438        // Value actually changed.
2439        assert_eq!(
2440            ops.get(CollectionModel::Kv, "kv_default", "lock").unwrap(),
2441            Some(crate::storage::schema::Value::text("held"))
2442        );
2443    }
2444
2445    #[test]
2446    fn concurrent_cas_allows_one_success_per_round() {
2447        const THREADS: usize = 8;
2448        const ROUNDS: usize = 100;
2449
2450        let runtime = std::sync::Arc::new(rt());
2451        let ops = super::KvAtomicOps::new(&runtime);
2452        ops.set(
2453            CollectionModel::Kv,
2454            "kv_default",
2455            "cas_counter",
2456            crate::storage::schema::Value::Integer(0),
2457            None,
2458            false,
2459        )
2460        .unwrap();
2461
2462        let start_round = std::sync::Arc::new(std::sync::Barrier::new(THREADS));
2463        let finish_round = std::sync::Arc::new(std::sync::Barrier::new(THREADS));
2464        let successes = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
2465        let mut handles = Vec::new();
2466
2467        for _ in 0..THREADS {
2468            let runtime = std::sync::Arc::clone(&runtime);
2469            let start_round = std::sync::Arc::clone(&start_round);
2470            let finish_round = std::sync::Arc::clone(&finish_round);
2471            let successes = std::sync::Arc::clone(&successes);
2472            handles.push(std::thread::spawn(move || {
2473                let ops = super::KvAtomicOps::new(&runtime);
2474                for round in 0..ROUNDS {
2475                    start_round.wait();
2476                    let (ok, _) = ops
2477                        .cas(
2478                            CollectionModel::Kv,
2479                            "kv_default",
2480                            "cas_counter",
2481                            Some(&crate::storage::schema::Value::Integer(round as i64)),
2482                            crate::storage::schema::Value::Integer((round + 1) as i64),
2483                            None,
2484                        )
2485                        .unwrap();
2486                    if ok {
2487                        successes.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
2488                    }
2489                    finish_round.wait();
2490                }
2491            }));
2492        }
2493
2494        for handle in handles {
2495            handle.join().expect("worker should finish");
2496        }
2497
2498        assert_eq!(successes.load(std::sync::atomic::Ordering::SeqCst), ROUNDS);
2499        assert_eq!(
2500            ops.get(CollectionModel::Kv, "kv_default", "cas_counter")
2501                .unwrap(),
2502            Some(crate::storage::schema::Value::Integer(ROUNDS as i64))
2503        );
2504    }
2505
2506    #[test]
2507    fn cas_mismatching_value_fails() {
2508        let r = rt();
2509        let ops = super::KvAtomicOps::new(&r);
2510        ops.set(
2511            CollectionModel::Kv,
2512            "kv_default",
2513            "lock",
2514            crate::storage::schema::Value::text("free"),
2515            None,
2516            false,
2517        )
2518        .unwrap();
2519        let (ok, current) = ops
2520            .cas(
2521                CollectionModel::Kv,
2522                "kv_default",
2523                "lock",
2524                Some(&crate::storage::schema::Value::text("held")),
2525                crate::storage::schema::Value::text("worker-7"),
2526                None,
2527            )
2528            .unwrap();
2529        assert!(!ok);
2530        assert_eq!(current, Some(crate::storage::schema::Value::text("free")));
2531        // Value unchanged.
2532        assert_eq!(
2533            ops.get(CollectionModel::Kv, "kv_default", "lock").unwrap(),
2534            Some(crate::storage::schema::Value::text("free"))
2535        );
2536    }
2537
2538    #[test]
2539    fn cas_expect_null_on_missing_key_creates() {
2540        let r = rt();
2541        let ops = super::KvAtomicOps::new(&r);
2542        let (ok, prev) = ops
2543            .cas(
2544                CollectionModel::Kv,
2545                "kv_default",
2546                "new_key",
2547                None,
2548                crate::storage::schema::Value::text("created"),
2549                None,
2550            )
2551            .unwrap();
2552        assert!(ok);
2553        assert_eq!(prev, None);
2554        assert_eq!(
2555            ops.get(CollectionModel::Kv, "kv_default", "new_key")
2556                .unwrap(),
2557            Some(crate::storage::schema::Value::text("created"))
2558        );
2559    }
2560
2561    #[test]
2562    fn cas_expect_null_on_existing_key_fails() {
2563        let r = rt();
2564        let ops = super::KvAtomicOps::new(&r);
2565        ops.set(
2566            CollectionModel::Kv,
2567            "kv_default",
2568            "taken",
2569            crate::storage::schema::Value::text("worker-1"),
2570            None,
2571            false,
2572        )
2573        .unwrap();
2574        let (ok, current) = ops
2575            .cas(
2576                CollectionModel::Kv,
2577                "kv_default",
2578                "taken",
2579                None,
2580                crate::storage::schema::Value::text("worker-2"),
2581                None,
2582            )
2583            .unwrap();
2584        assert!(!ok);
2585        assert_eq!(
2586            current,
2587            Some(crate::storage::schema::Value::text("worker-1"))
2588        );
2589    }
2590
2591    #[test]
2592    fn cas_via_sql_roundtrip() {
2593        let r = rt();
2594        // Seed value.
2595        r.execute_query("KV PUT lock = 'free'").unwrap();
2596        // CAS: free → held — should succeed.
2597        let res = r
2598            .execute_query("KV CAS lock EXPECT 'free' SET 'held'")
2599            .unwrap();
2600        let row = &res.result.records[0];
2601        assert_eq!(
2602            row.get("ok"),
2603            Some(&crate::storage::schema::Value::Boolean(true))
2604        );
2605        // CAS: free → held again — should fail (value is now 'held').
2606        let res2 = r
2607            .execute_query("KV CAS lock EXPECT 'free' SET 'held'")
2608            .unwrap();
2609        let row2 = &res2.result.records[0];
2610        assert_eq!(
2611            row2.get("ok"),
2612            Some(&crate::storage::schema::Value::Boolean(false))
2613        );
2614    }
2615
2616    #[test]
2617    fn cas_expect_null_via_sql() {
2618        let r = rt();
2619        let res = r
2620            .execute_query("KV CAS singleton EXPECT NULL SET 'first'")
2621            .unwrap();
2622        let row = &res.result.records[0];
2623        assert_eq!(
2624            row.get("ok"),
2625            Some(&crate::storage::schema::Value::Boolean(true))
2626        );
2627        // Second call must fail.
2628        let res2 = r
2629            .execute_query("KV CAS singleton EXPECT NULL SET 'second'")
2630            .unwrap();
2631        let row2 = &res2.result.records[0];
2632        assert_eq!(
2633            row2.get("ok"),
2634            Some(&crate::storage::schema::Value::Boolean(false))
2635        );
2636    }
2637
2638    #[test]
2639    fn incr_via_sql_roundtrip() {
2640        let r = rt();
2641        let res = r.execute_query("KV INCR hits").unwrap();
2642        let row = &res.result.records[0];
2643        assert_eq!(
2644            row.get("value"),
2645            Some(&crate::storage::schema::Value::Integer(1))
2646        );
2647        let res2 = r.execute_query("KV INCR hits BY 4").unwrap();
2648        let row2 = &res2.result.records[0];
2649        assert_eq!(
2650            row2.get("value"),
2651            Some(&crate::storage::schema::Value::Integer(5))
2652        );
2653    }
2654
2655    #[test]
2656    fn concurrent_self_referential_update_is_atomic() {
2657        const THREADS: usize = 8;
2658        const ITERS: usize = 100;
2659
2660        let runtime = std::sync::Arc::new(rt());
2661        runtime
2662            .execute_query("CREATE TABLE counters (id INT, n INT)")
2663            .unwrap();
2664        runtime
2665            .execute_query("INSERT INTO counters (id, n) VALUES (1, 0)")
2666            .unwrap();
2667
2668        let barrier = std::sync::Arc::new(std::sync::Barrier::new(THREADS));
2669        let mut handles = Vec::new();
2670        for _ in 0..THREADS {
2671            let runtime = std::sync::Arc::clone(&runtime);
2672            let barrier = std::sync::Arc::clone(&barrier);
2673            handles.push(std::thread::spawn(move || {
2674                barrier.wait();
2675                for _ in 0..ITERS {
2676                    runtime
2677                        .execute_query("UPDATE counters SET n = n + 1 WHERE id = 1")
2678                        .unwrap();
2679                }
2680            }));
2681        }
2682
2683        for handle in handles {
2684            handle.join().expect("worker should finish");
2685        }
2686
2687        let selected = runtime
2688            .execute_query("SELECT n FROM counters WHERE id = 1")
2689            .unwrap();
2690        assert_eq!(
2691            selected.result.records[0].get("n"),
2692            Some(&crate::storage::schema::Value::Integer(
2693                (THREADS * ITERS) as i64
2694            ))
2695        );
2696    }
2697
2698    #[test]
2699    fn watch_stream_delivers_key_events_in_lsn_order() {
2700        let r = rt();
2701        let ops = super::KvAtomicOps::new(&r);
2702        let mut stream = r.kv_watch_subscribe("kv_default", "seq", None);
2703
2704        ops.set(
2705            CollectionModel::Kv,
2706            "kv_default",
2707            "seq",
2708            crate::storage::schema::Value::Integer(1),
2709            None,
2710            false,
2711        )
2712        .unwrap();
2713        ops.incr(CollectionModel::Kv, "kv_default", "seq", 1, None)
2714            .unwrap();
2715        ops.delete(CollectionModel::Kv, "kv_default", "seq")
2716            .unwrap();
2717        ops.set(
2718            CollectionModel::Kv,
2719            "kv_default",
2720            "seq",
2721            crate::storage::schema::Value::Integer(9),
2722            None,
2723            false,
2724        )
2725        .unwrap();
2726
2727        let mut events = Vec::new();
2728        while let Some(event) = stream.poll_next() {
2729            events.push(event);
2730            if events.len() == 4 {
2731                break;
2732            }
2733        }
2734
2735        assert_eq!(events.len(), 4);
2736        assert_eq!(
2737            events[0].op,
2738            crate::replication::cdc::ChangeOperation::Insert
2739        );
2740        assert_eq!(
2741            events[1].op,
2742            crate::replication::cdc::ChangeOperation::Update
2743        );
2744        assert_eq!(
2745            events[2].op,
2746            crate::replication::cdc::ChangeOperation::Delete
2747        );
2748        assert_eq!(
2749            events[3].op,
2750            crate::replication::cdc::ChangeOperation::Insert
2751        );
2752        assert!(events.windows(2).all(|pair| pair[0].lsn < pair[1].lsn));
2753    }
2754
2755    #[test]
2756    fn watch_prefix_stream_delivers_matching_events_only() {
2757        let r = rt();
2758        let ops = super::KvAtomicOps::new(&r);
2759        let mut stream = r.kv_watch_subscribe_prefix("kv_default", "acct:", None);
2760
2761        ops.set(
2762            CollectionModel::Kv,
2763            "kv_default",
2764            "acct:1",
2765            crate::storage::schema::Value::Integer(1),
2766            None,
2767            false,
2768        )
2769        .unwrap();
2770        ops.set(
2771            CollectionModel::Kv,
2772            "kv_default",
2773            "session:1",
2774            crate::storage::schema::Value::Integer(2),
2775            None,
2776            false,
2777        )
2778        .unwrap();
2779        ops.set(
2780            CollectionModel::Kv,
2781            "kv_default",
2782            "acct:2",
2783            crate::storage::schema::Value::Integer(3),
2784            None,
2785            false,
2786        )
2787        .unwrap();
2788
2789        let first = stream.poll_next().expect("first prefix event");
2790        let second = stream.poll_next().expect("second prefix event");
2791        assert_eq!(first.key, "acct:1");
2792        assert_eq!(second.key, "acct:2");
2793        assert!(stream.poll_next().is_none());
2794    }
2795
2796    #[test]
2797    fn watch_stream_resume_from_lsn_delivers_missed_events_without_duplicates() {
2798        let r = rt();
2799        let ops = super::KvAtomicOps::new(&r);
2800        let mut stream = r.kv_watch_subscribe("kv_default", "resume", None);
2801
2802        let mut last_seen_lsn = 0;
2803        for value in 0..5 {
2804            ops.set(
2805                CollectionModel::Kv,
2806                "kv_default",
2807                "resume",
2808                crate::storage::schema::Value::Integer(value),
2809                None,
2810                false,
2811            )
2812            .unwrap();
2813            last_seen_lsn = stream.poll_next().expect("initial event").lsn;
2814        }
2815        drop(stream);
2816
2817        for value in 5..55 {
2818            ops.set(
2819                CollectionModel::Kv,
2820                "kv_default",
2821                "resume",
2822                crate::storage::schema::Value::Integer(value),
2823                None,
2824                false,
2825            )
2826            .unwrap();
2827        }
2828
2829        let mut resumed = r.kv_watch_subscribe("kv_default", "resume", Some(last_seen_lsn));
2830        let mut lsns = Vec::new();
2831        while let Some(event) = resumed.poll_next() {
2832            lsns.push(event.lsn);
2833            if lsns.len() == 50 {
2834                break;
2835            }
2836        }
2837
2838        assert_eq!(lsns.len(), 50);
2839        assert!(lsns.iter().all(|lsn| *lsn > last_seen_lsn));
2840        assert!(lsns.windows(2).all(|pair| pair[0] < pair[1]));
2841        assert!(resumed.poll_next().is_none());
2842    }
2843
2844    #[test]
2845    fn watch_stream_slow_consumer_drops_oldest_buffered_events() {
2846        let r = rt();
2847        let ops = super::KvAtomicOps::new(&r);
2848        let mut stream = r.kv_watch_subscribe("kv_default", "slow", None);
2849
2850        for value in 0..10_000 {
2851            ops.set(
2852                CollectionModel::Kv,
2853                "kv_default",
2854                "slow",
2855                crate::storage::schema::Value::Integer(value),
2856                None,
2857                false,
2858            )
2859            .unwrap();
2860        }
2861
2862        let event = stream.poll_next().expect("tail event after drops");
2863        assert!(event.lsn > 1);
2864        assert!(event.dropped_event_count > 0);
2865        assert_eq!(stream.dropped_event_count(), event.dropped_event_count);
2866        assert_eq!(r.stats().kv.watch_drops, event.dropped_event_count);
2867    }
2868
2869    #[test]
2870    fn watch_stream_idle_timeout_closes_subscription() {
2871        let r = rt();
2872        r.execute_query("SET CONFIG red.config.kv.watch.idle_timeout_ms = 1")
2873            .unwrap();
2874
2875        let mut stream = r.kv_watch_subscribe("kv_default", "idle", None);
2876        assert_eq!(r.stats().kv.watch_streams_active, 1);
2877        std::thread::sleep(std::time::Duration::from_millis(5));
2878
2879        assert!(stream.poll_next().is_none());
2880        assert_eq!(r.stats().kv.watch_streams_active, 0);
2881    }
2882
2883    #[test]
2884    fn watch_stream_does_not_emit_rolled_back_put() {
2885        let r = rt();
2886        let mut stream = r.kv_watch_subscribe("kv_default", "rollback_key", None);
2887
2888        r.execute_query("BEGIN").unwrap();
2889        r.execute_query("KV PUT rollback_key = 'dirty'").unwrap();
2890        r.execute_query("ROLLBACK").unwrap();
2891
2892        assert!(stream.poll_next().is_none());
2893    }
2894}