Skip to main content

reddb_server/runtime/
impl_kv.rs

1//! KV DSL command execution and KvAtomicOps module.
2//!
3//! Handles `KV PUT key = value [EXPIRE n] [TAGS [...]] [IF NOT EXISTS]`,
4//! `KV GET key`, and `KV DELETE key`.
5
6use crate::application::ports::RuntimeEntityPort;
7use crate::storage::unified::{Metadata, MetadataValue};
8
9use super::impl_core::{current_auth_identity, current_connection_id, current_tenant};
10use super::*;
11
/// Default collection name for bare-key KV operations.
///
/// NOTE(review): presumably the collection used when a KV command does not name
/// one; the call sites are outside this section — confirm.
pub const KV_DEFAULT_COLLECTION: &str = "kv_default";
14
/// Build the auth-store lookup name of the per-collection vault master key,
/// i.e. `red.vault.<collection>.master_key`.
fn vault_master_key_ref(collection: &str) -> String {
    let reference = format!("red.vault.{}.master_key", collection);
    reference
}
18
19fn keyed_model_name(model: crate::catalog::CollectionModel) -> &'static str {
20    match model {
21        crate::catalog::CollectionModel::Kv => "kv",
22        crate::catalog::CollectionModel::Vault => "vault",
23        crate::catalog::CollectionModel::Config => "config",
24        _ => "collection",
25    }
26}
27
/// One stored version of a vault key: the parsed keyed row plus entity-level
/// fields (metadata, timestamps, sequence id) read alongside it.
#[derive(Debug, Clone)]
struct VaultEntry {
    /// Storage id of the row holding this version.
    id: crate::storage::EntityId,
    /// Logical key this version belongs to.
    key: String,
    /// Value as read back from the row. `append_vault_version` writes a sealed
    /// value for puts and `Value::Null` for tombstones.
    value: crate::storage::schema::Value,
    /// Entity metadata from the collection manager (tags are stored here by
    /// `append_vault_version`).
    metadata: Metadata,
    /// Entity creation timestamp; units not visible here — presumably ms, TODO confirm.
    created_at: u64,
    /// Entity update timestamp; units not visible here — presumably ms, TODO confirm.
    updated_at: u64,
    /// Entity sequence id.
    sequence_id: u64,
    /// Per-key version number; each append uses the latest version + 1.
    version: i64,
    /// True for versions written as tombstones; such versions are not treated
    /// as a present value.
    tombstone: bool,
    /// Operation label recorded with the version (e.g. `"put"`).
    op: String,
}
41
42impl super::keyed_spine::KeyedVersion for VaultEntry {
43    fn key(&self) -> &str {
44        &self.key
45    }
46
47    fn version(&self) -> i64 {
48        self.version
49    }
50}
51
52impl VaultEntry {
53    fn from_keyed_row(
54        version: super::keyed_spine::KeyedRowVersion,
55        metadata: Metadata,
56        created_at: u64,
57        updated_at: u64,
58        sequence_id: u64,
59    ) -> Self {
60        Self {
61            id: version.id,
62            key: version.key,
63            value: version.value,
64            metadata,
65            created_at,
66            updated_at,
67            sequence_id,
68            version: version.version,
69            tombstone: version.tombstone,
70            op: version.op,
71        }
72    }
73}
74
75/// Atomic KV operations interface — the seam that transports and drivers depend on.
76///
77/// All three verbs delegate to the runtime's existing `create_kv` / `get_kv` /
78/// `delete_kv` plumbing; this struct adds the auto-create and upsert logic.
79pub struct KvAtomicOps<'a> {
80    runtime: &'a RedDBRuntime,
81}
82
83impl<'a> KvAtomicOps<'a> {
84    pub fn new(runtime: &'a RedDBRuntime) -> Self {
85        Self { runtime }
86    }
87
88    /// Insert or update a KV entry. Auto-creates the collection when needed.
89    ///
90    /// Insert or update a KV entry. Returns `(created: bool, id: EntityId)`.
91    pub fn set(
92        &self,
93        model: crate::catalog::CollectionModel,
94        collection: &str,
95        key: &str,
96        value: crate::storage::schema::Value,
97        ttl_ms: Option<u64>,
98        if_not_exists: bool,
99    ) -> RedDBResult<(bool, crate::storage::EntityId)> {
100        self.set_with_tags_for_model(model, collection, key, value, ttl_ms, &[], if_not_exists)
101    }
102
103    pub fn set_with_tags(
104        &self,
105        collection: &str,
106        key: &str,
107        value: crate::storage::schema::Value,
108        ttl_ms: Option<u64>,
109        tags: &[String],
110        if_not_exists: bool,
111    ) -> RedDBResult<(bool, crate::storage::EntityId)> {
112        self.set_with_tags_for_model(
113            crate::catalog::CollectionModel::Kv,
114            collection,
115            key,
116            value,
117            ttl_ms,
118            tags,
119            if_not_exists,
120        )
121    }
122
123    fn set_with_tags_for_model(
124        &self,
125        model: crate::catalog::CollectionModel,
126        collection: &str,
127        key: &str,
128        value: crate::storage::schema::Value,
129        ttl_ms: Option<u64>,
130        tags: &[String],
131        if_not_exists: bool,
132    ) -> RedDBResult<(bool, crate::storage::EntityId)> {
133        self.set_with_tags_and_metadata_for_model(
134            model,
135            collection,
136            key,
137            value,
138            ttl_ms,
139            tags,
140            if_not_exists,
141            Vec::new(),
142        )
143    }
144
145    pub fn set_with_tags_and_metadata(
146        &self,
147        collection: &str,
148        key: &str,
149        value: crate::storage::schema::Value,
150        ttl_ms: Option<u64>,
151        tags: &[String],
152        if_not_exists: bool,
153        metadata: Vec<(String, MetadataValue)>,
154    ) -> RedDBResult<(bool, crate::storage::EntityId)> {
155        self.set_with_tags_and_metadata_for_model(
156            crate::catalog::CollectionModel::Kv,
157            collection,
158            key,
159            value,
160            ttl_ms,
161            tags,
162            if_not_exists,
163            metadata,
164        )
165    }
166
167    fn set_with_tags_and_metadata_for_model(
168        &self,
169        model: crate::catalog::CollectionModel,
170        collection: &str,
171        key: &str,
172        value: crate::storage::schema::Value,
173        ttl_ms: Option<u64>,
174        tags: &[String],
175        if_not_exists: bool,
176        mut metadata: Vec<(String, MetadataValue)>,
177    ) -> RedDBResult<(bool, crate::storage::EntityId)> {
178        self.ensure_keyed_collection(model, collection)?;
179
180        if model == crate::catalog::CollectionModel::Vault {
181            let latest = self.get_vault_entry(collection, key)?;
182            let was_present = latest.as_ref().is_some_and(|entry| !entry.tombstone);
183            if if_not_exists && was_present {
184                return Ok((false, latest.expect("checked present").id));
185            }
186            let entry = self.append_vault_version(collection, key, value, "put", false, tags)?;
187            self.runtime.record_kv_watch_event(
188                if latest.is_some() {
189                    crate::replication::cdc::ChangeOperation::Update
190                } else {
191                    crate::replication::cdc::ChangeOperation::Insert
192                },
193                collection,
194                key,
195                entry.id.raw(),
196                latest.as_ref().map(vault_entry_metadata_json),
197                Some(vault_entry_metadata_json(&entry)),
198            );
199            return Ok((!was_present, entry.id));
200        }
201
202        let existing = self.get_entry(model, collection, key)?;
203        let was_present = existing.is_some();
204
205        if if_not_exists && was_present {
206            let (_, id) = existing.unwrap();
207            self.runtime.inner.kv_stats.incr_puts();
208            return Ok((false, id));
209        }
210
211        let before = existing
212            .as_ref()
213            .map(|(value, _)| crate::presentation::entity_json::storage_value_to_json(value));
214        let op = if was_present {
215            crate::replication::cdc::ChangeOperation::Update
216        } else {
217            crate::replication::cdc::ChangeOperation::Insert
218        };
219        let after = Some(crate::presentation::entity_json::storage_value_to_json(
220            &value,
221        ));
222
223        // Delete old entry so we can create fresh (handles TTL refresh).
224        if was_present {
225            self.delete(model, collection, key)?;
226        }
227
228        if let Some(ttl_metadata) = ttl_metadata(ttl_ms) {
229            metadata.extend(ttl_metadata.fields);
230        }
231        if let Some(tags_metadata) = kv_tags_metadata(tags) {
232            metadata.push(tags_metadata);
233        }
234
235        let output = self
236            .runtime
237            .create_kv(crate::application::entity::CreateKvInput {
238                collection: collection.to_string(),
239                key: key.to_string(),
240                value,
241                metadata,
242            })?;
243        if model == crate::catalog::CollectionModel::Kv {
244            self.runtime
245                .inner
246                .kv_tag_index
247                .replace(collection, key, output.id, tags);
248        }
249
250        if model == crate::catalog::CollectionModel::Kv {
251            self.runtime
252                .record_kv_watch_event(op, collection, key, output.id.raw(), before, after);
253        }
254
255        if model == crate::catalog::CollectionModel::Kv {
256            self.runtime.inner.kv_stats.incr_puts();
257        }
258        Ok((!was_present, output.id))
259    }
260
261    /// Retrieve a KV value by key. Returns `None` when not found.
262    pub fn get(
263        &self,
264        model: crate::catalog::CollectionModel,
265        collection: &str,
266        key: &str,
267    ) -> RedDBResult<Option<crate::storage::schema::Value>> {
268        let result = self.get_entry(model, collection, key)?;
269        if model == crate::catalog::CollectionModel::Kv {
270            self.runtime.inner.kv_stats.incr_gets();
271        }
272        Ok(result.map(|(v, _)| v))
273    }
274
275    /// Delete a KV entry. Returns `true` if the key existed.
276    pub fn delete(
277        &self,
278        model: crate::catalog::CollectionModel,
279        collection: &str,
280        key: &str,
281    ) -> RedDBResult<bool> {
282        self.ensure_declared_model(model, collection)?;
283        let found = self.get_entry(model, collection, key)?;
284        if let Some((value, id)) = found {
285            let store = self.runtime.inner.db.store();
286            let deleted = store
287                .delete(collection, id)
288                .map_err(|err| RedDBError::Internal(err.to_string()))?;
289            if deleted {
290                store.context_index().remove_entity(id);
291                if model == crate::catalog::CollectionModel::Kv {
292                    self.runtime.inner.kv_tag_index.remove(collection, key);
293                    self.runtime.record_kv_watch_event(
294                        crate::replication::cdc::ChangeOperation::Delete,
295                        collection,
296                        key,
297                        id.raw(),
298                        Some(crate::presentation::entity_json::storage_value_to_json(
299                            &value,
300                        )),
301                        None,
302                    );
303                    self.runtime.inner.kv_stats.incr_deletes();
304                }
305            }
306            Ok(deleted)
307        } else {
308            Ok(false)
309        }
310    }
311
312    /// Atomically increment (or decrement) a counter key. Returns the new value.
313    ///
314    /// - Missing key initialises at `by` (Redis-compat).
315    /// - Non-integer value returns an error before any mutation.
316    pub fn incr(
317        &self,
318        model: crate::catalog::CollectionModel,
319        collection: &str,
320        key: &str,
321        by: i64,
322        ttl_ms: Option<u64>,
323    ) -> RedDBResult<i64> {
324        if model == crate::catalog::CollectionModel::Vault {
325            return Err(RedDBError::InvalidOperation(
326                "VAULT INCR is not supported for sealed secrets".to_string(),
327            ));
328        }
329        let rmw_lock = self.runtime.inner.rmw_locks.lock_for(collection, key);
330        let _rmw_guard = rmw_lock.lock();
331        self.ensure_kv_collection(collection)?;
332        let existing = self.runtime.get_kv(collection, key)?;
333        let current: i64 = match existing.as_ref() {
334            None => 0,
335            Some((crate::storage::schema::Value::Integer(n), _)) => *n,
336            Some((crate::storage::schema::Value::Float(f), _)) => *f as i64,
337            Some((other, _)) => {
338                return Err(RedDBError::Internal(format!(
339                    "INCR on non-integer value: {:?}",
340                    other
341                )));
342            }
343        };
344
345        let next = current
346            .checked_add(by)
347            .ok_or_else(|| RedDBError::Internal(format!("INCR overflow: {current} + {by}")))?;
348
349        // Delete then re-create so TTL is refreshed.
350        if existing.is_some() {
351            self.runtime.delete_kv(collection, key)?;
352        }
353
354        let meta_vec: Vec<(String, crate::storage::unified::MetadataValue)> = ttl_metadata(ttl_ms)
355            .map(|m| m.fields.into_iter().collect())
356            .unwrap_or_default();
357
358        let output = self
359            .runtime
360            .create_kv(crate::application::entity::CreateKvInput {
361                collection: collection.to_string(),
362                key: key.to_string(),
363                value: crate::storage::schema::Value::Integer(next),
364                metadata: meta_vec,
365            })?;
366        self.runtime
367            .inner
368            .kv_tag_index
369            .replace(collection, key, output.id, &[]);
370
371        self.runtime.record_kv_watch_event(
372            if existing.is_some() {
373                crate::replication::cdc::ChangeOperation::Update
374            } else {
375                crate::replication::cdc::ChangeOperation::Insert
376            },
377            collection,
378            key,
379            output.id.raw(),
380            existing
381                .as_ref()
382                .map(|(value, _)| crate::presentation::entity_json::storage_value_to_json(value)),
383            Some(crate::presentation::entity_json::storage_value_to_json(
384                &crate::storage::schema::Value::Integer(next),
385            )),
386        );
387
388        self.runtime.inner.kv_stats.incr_incrs();
389        Ok(next)
390    }
391
392    /// Compare-and-set: atomically swap `key` from `expected` to `new_value`.
393    ///
394    /// Returns `(ok, current)`:
395    /// - `ok = true`  → swap applied; `current` is the value *before* the swap.
396    /// - `ok = false` → swap skipped; `current` holds the actual current value.
397    ///
398    /// `expected = None` means the caller expects the key to be *absent* (create-if-absent).
399    pub fn cas(
400        &self,
401        model: crate::catalog::CollectionModel,
402        collection: &str,
403        key: &str,
404        expected: Option<&crate::storage::schema::Value>,
405        new_value: crate::storage::schema::Value,
406        ttl_ms: Option<u64>,
407    ) -> RedDBResult<(bool, Option<crate::storage::schema::Value>)> {
408        if model == crate::catalog::CollectionModel::Vault {
409            return Err(RedDBError::InvalidOperation(
410                "VAULT CAS is not supported for sealed secrets".to_string(),
411            ));
412        }
413        let rmw_lock = self.runtime.inner.rmw_locks.lock_for(collection, key);
414        let _rmw_guard = rmw_lock.lock();
415        self.ensure_kv_collection(collection)?;
416        let current = self.runtime.get_kv(collection, key)?.map(|(v, _)| v);
417
418        let matches = match (&current, expected) {
419            (None, None) => true,
420            (Some(cur), Some(exp)) => cur == exp,
421            _ => false,
422        };
423
424        if !matches {
425            self.runtime.inner.kv_stats.incr_cas_conflict();
426            return Ok((false, current));
427        }
428
429        // Swap: delete old entry (if present), write new one.
430        if current.is_some() {
431            self.runtime.delete_kv(collection, key)?;
432        }
433
434        let meta_vec: Vec<(String, crate::storage::unified::MetadataValue)> = ttl_metadata(ttl_ms)
435            .map(|m| m.fields.into_iter().collect())
436            .unwrap_or_default();
437
438        let output = self
439            .runtime
440            .create_kv(crate::application::entity::CreateKvInput {
441                collection: collection.to_string(),
442                key: key.to_string(),
443                value: new_value.clone(),
444                metadata: meta_vec,
445            })?;
446        self.runtime
447            .inner
448            .kv_tag_index
449            .replace(collection, key, output.id, &[]);
450
451        self.runtime.record_kv_watch_event(
452            if current.is_some() {
453                crate::replication::cdc::ChangeOperation::Update
454            } else {
455                crate::replication::cdc::ChangeOperation::Insert
456            },
457            collection,
458            key,
459            output.id.raw(),
460            current
461                .as_ref()
462                .map(crate::presentation::entity_json::storage_value_to_json),
463            Some(crate::presentation::entity_json::storage_value_to_json(
464                &new_value,
465            )),
466        );
467
468        self.runtime.inner.kv_stats.incr_cas_success();
469        Ok((true, current))
470    }
471
472    pub fn invalidate_tags(&self, collection: &str, tags: &[String]) -> RedDBResult<usize> {
473        self.runtime
474            .check_write(crate::runtime::write_gate::WriteKind::Dml)?;
475        self.runtime.check_kv_invalidate_policy(collection)?;
476        self.ensure_kv_collection(collection)?;
477        let entries = self
478            .runtime
479            .inner
480            .kv_tag_index
481            .entries_for_tags(collection, tags);
482        if entries.is_empty() {
483            return Ok(0);
484        }
485
486        let store = self.runtime.inner.db.store();
487        let mut removed = 0usize;
488        for (key, id) in entries {
489            let before = store
490                .get(collection, id)
491                .and_then(|entity| kv_value_from_entity(&entity));
492            let deleted = store
493                .delete(collection, id)
494                .map_err(|err| RedDBError::Internal(err.to_string()))?;
495            if deleted {
496                store.context_index().remove_entity(id);
497                self.runtime.inner.kv_tag_index.remove(collection, &key);
498                self.runtime.record_kv_watch_event(
499                    crate::replication::cdc::ChangeOperation::Delete,
500                    collection,
501                    &key,
502                    id.raw(),
503                    before
504                        .as_ref()
505                        .map(crate::presentation::entity_json::storage_value_to_json),
506                    None,
507                );
508                removed += 1;
509            }
510        }
511        if removed > 0 {
512            self.runtime.inner.kv_stats.incr_deletes();
513        }
514        Ok(removed)
515    }
516
517    pub fn tags_for_key(&self, collection: &str, key: &str) -> Vec<String> {
518        self.runtime
519            .inner
520            .kv_tag_index
521            .tags_for_key(collection, key)
522    }
523
524    /// Auto-create a KV collection if it does not exist yet.
525    fn ensure_kv_collection(&self, collection: &str) -> RedDBResult<()> {
526        self.ensure_keyed_collection(crate::catalog::CollectionModel::Kv, collection)
527    }
528
529    fn ensure_keyed_collection(
530        &self,
531        model: crate::catalog::CollectionModel,
532        collection: &str,
533    ) -> RedDBResult<()> {
534        let store = self.runtime.inner.db.store();
535        if store.get_collection(collection).is_some() {
536            return self.ensure_declared_model(model, collection);
537        }
538        if model != crate::catalog::CollectionModel::Kv {
539            return Err(RedDBError::NotFound(format!(
540                "{} collection '{collection}' does not exist",
541                keyed_model_name(model)
542            )));
543        }
544        // Check config gate: red.config.kv.default_collection (default = true).
545        let auto_create = self
546            .runtime
547            .config_bool("red.config.kv.default_collection", true);
548        if !auto_create {
549            return Err(RedDBError::NotFound(format!(
550                "kv collection '{collection}' does not exist and auto-create is disabled \
551                 (red.config.kv.default_collection = false)"
552            )));
553        }
554        store
555            .create_collection(collection)
556            .map_err(|err| RedDBError::Internal(err.to_string()))?;
557        self.runtime
558            .inner
559            .db
560            .save_collection_contract(kv_collection_contract(collection))
561            .map_err(|err| RedDBError::Internal(err.to_string()))?;
562        Ok(())
563    }
564
565    fn get_entry(
566        &self,
567        model: crate::catalog::CollectionModel,
568        collection: &str,
569        key: &str,
570    ) -> RedDBResult<Option<(crate::storage::schema::Value, crate::storage::EntityId)>> {
571        let Some(entity) = self.get_entity(model, collection, key)? else {
572            return Ok(None);
573        };
574        Ok(kv_value_from_entity(&entity).map(|value| (value, entity.id)))
575    }
576
577    fn get_entity(
578        &self,
579        model: crate::catalog::CollectionModel,
580        collection: &str,
581        key: &str,
582    ) -> RedDBResult<Option<crate::storage::UnifiedEntity>> {
583        self.ensure_declared_model(model, collection)?;
584        let store = self.runtime.inner.db.store();
585        let Some(manager) = store.get_collection(collection) else {
586            return Ok(None);
587        };
588        let entities = manager.query_all(|_| true);
589        for entity in entities {
590            if let crate::storage::EntityData::Row(ref row) = entity.data {
591                if let Some(ref named) = row.named {
592                    if let Some(crate::storage::schema::Value::Text(ref k)) = named.get("key") {
593                        if &**k == key {
594                            return Ok(Some(entity));
595                        }
596                    }
597                }
598            }
599        }
600        Ok(None)
601    }
602
603    fn get_vault_entry(&self, collection: &str, key: &str) -> RedDBResult<Option<VaultEntry>> {
604        self.vault_versions(collection, key)
605            .map(super::keyed_spine::latest_version)
606    }
607
608    fn get_vault_entry_version(
609        &self,
610        collection: &str,
611        key: &str,
612        version: i64,
613    ) -> RedDBResult<Option<VaultEntry>> {
614        Ok(self
615            .vault_versions(collection, key)?
616            .into_iter()
617            .find(|entry| entry.version == version))
618    }
619
620    fn vault_versions(&self, collection: &str, key: &str) -> RedDBResult<Vec<VaultEntry>> {
621        self.ensure_declared_model(crate::catalog::CollectionModel::Vault, collection)?;
622        let store = self.runtime.inner.db.store();
623        let Some(manager) = store.get_collection(collection) else {
624            return Ok(Vec::new());
625        };
626        let entities = manager.query_all(|_| true);
627        let mut versions = Vec::new();
628        for entity in entities {
629            let crate::storage::EntityData::Row(ref row) = entity.data else {
630                continue;
631            };
632            let Some(version) =
633                super::keyed_spine::row_version(entity.id, row, entity.sequence_id as i64)
634            else {
635                continue;
636            };
637            if version.key != key {
638                continue;
639            }
640            let metadata = manager.get_metadata(entity.id).unwrap_or_default();
641            versions.push(VaultEntry::from_keyed_row(
642                version,
643                metadata,
644                entity.created_at,
645                entity.updated_at,
646                entity.sequence_id,
647            ));
648        }
649        Ok(versions)
650    }
651
652    fn latest_vault_entries(
653        &self,
654        collection: &str,
655        prefix: Option<&str>,
656    ) -> RedDBResult<Vec<VaultEntry>> {
657        self.ensure_declared_model(crate::catalog::CollectionModel::Vault, collection)?;
658        let store = self.runtime.inner.db.store();
659        let Some(manager) = store.get_collection(collection) else {
660            return Ok(Vec::new());
661        };
662        let mut versions = Vec::new();
663        for entity in manager.query_all(|_| true) {
664            let crate::storage::EntityData::Row(ref row) = entity.data else {
665                continue;
666            };
667            let Some(version) =
668                super::keyed_spine::row_version(entity.id, row, entity.sequence_id as i64)
669            else {
670                continue;
671            };
672            let metadata = manager.get_metadata(entity.id).unwrap_or_default();
673            let entry = VaultEntry::from_keyed_row(
674                version,
675                metadata,
676                entity.created_at,
677                entity.updated_at,
678                entity.sequence_id,
679            );
680            versions.push(entry);
681        }
682        Ok(super::keyed_spine::latest_versions(versions, prefix))
683    }
684
685    fn append_vault_version(
686        &self,
687        collection: &str,
688        key: &str,
689        value: crate::storage::schema::Value,
690        op: &str,
691        tombstone: bool,
692        tags: &[String],
693    ) -> RedDBResult<VaultEntry> {
694        self.ensure_declared_model(crate::catalog::CollectionModel::Vault, collection)?;
695        let version = self
696            .get_vault_entry(collection, key)?
697            .map(|entry| entry.version)
698            .unwrap_or(0)
699            + 1;
700        let stored_value = if tombstone {
701            crate::storage::schema::Value::Null
702        } else {
703            self.runtime.seal_vault_value(collection, value)?
704        };
705        let now = current_unix_ms() as i64;
706        let fields = vec![
707            (
708                "key".to_string(),
709                crate::storage::schema::Value::text(key.to_string()),
710            ),
711            ("value".to_string(), stored_value),
712            (
713                "version".to_string(),
714                crate::storage::schema::Value::Integer(version),
715            ),
716            (
717                "tombstone".to_string(),
718                crate::storage::schema::Value::Boolean(tombstone),
719            ),
720            (
721                "op".to_string(),
722                crate::storage::schema::Value::text(op.to_string()),
723            ),
724            (
725                "created_at_ms".to_string(),
726                crate::storage::schema::Value::Integer(now),
727            ),
728        ];
729        let mut row = crate::storage::RowData::new(Vec::new());
730        row.named = Some(fields.into_iter().collect());
731        let entity = crate::storage::UnifiedEntity::new(
732            crate::storage::EntityId::new(0),
733            crate::storage::EntityKind::TableRow {
734                table: std::sync::Arc::from(collection),
735                row_id: 0,
736            },
737            crate::storage::EntityData::Row(row),
738        );
739        let id = self
740            .runtime
741            .inner
742            .db
743            .store()
744            .insert(collection, entity)
745            .map_err(|err| RedDBError::Internal(err.to_string()))?;
746        if !tags.is_empty() {
747            self.runtime
748                .inner
749                .db
750                .store()
751                .set_metadata(
752                    collection,
753                    id,
754                    Metadata::with_fields(vault_tags_metadata(tags)),
755                )
756                .map_err(|err| RedDBError::Internal(err.to_string()))?;
757            self.runtime
758                .inner
759                .kv_tag_index
760                .replace(collection, key, id, tags);
761        }
762        self.get_vault_entry_version(collection, key, version)?
763            .ok_or_else(|| RedDBError::Internal(format!("vault version {id} was not readable")))
764    }
765
766    fn purge_vault_versions(&self, collection: &str, key: &str) -> RedDBResult<usize> {
767        self.ensure_declared_model(crate::catalog::CollectionModel::Vault, collection)?;
768        let versions = self.vault_versions(collection, key)?;
769        let store = self.runtime.inner.db.store();
770        let mut purged = 0usize;
771        for entry in versions {
772            if store
773                .delete(collection, entry.id)
774                .map_err(|err| RedDBError::Internal(err.to_string()))?
775            {
776                store.context_index().remove_entity(entry.id);
777                purged += 1;
778            }
779        }
780        Ok(purged)
781    }
782
783    fn ensure_declared_model(
784        &self,
785        model: crate::catalog::CollectionModel,
786        collection: &str,
787    ) -> RedDBResult<()> {
788        let Some(contract) = self.runtime.inner.db.collection_contract(collection) else {
789            return Ok(());
790        };
791        if contract.declared_model == model
792            || contract.declared_model == crate::catalog::CollectionModel::Mixed
793        {
794            return Ok(());
795        }
796        Err(RedDBError::InvalidOperation(format!(
797            "collection '{}' is declared as '{}' and does not allow '{}' operations",
798            collection,
799            keyed_model_name(contract.declared_model),
800            keyed_model_name(model)
801        )))
802    }
803}
804
805impl RedDBRuntime {
806    pub(crate) fn seal_vault_value(
807        &self,
808        collection: &str,
809        value: crate::storage::schema::Value,
810    ) -> RedDBResult<crate::storage::schema::Value> {
811        let key = self.vault_encryption_key(collection)?;
812        let plaintext = value.to_bytes();
813        let nonce_bytes = crate::auth::store::random_bytes(12);
814        let mut nonce = [0u8; 12];
815        nonce.copy_from_slice(&nonce_bytes[..12]);
816        let aad = format!("reddb.vault.{collection}");
817        let ciphertext =
818            crate::crypto::aes_gcm::aes256_gcm_encrypt(&key, &nonce, aad.as_bytes(), &plaintext);
819        let mut payload = Vec::with_capacity(12 + ciphertext.len());
820        payload.extend_from_slice(&nonce);
821        payload.extend_from_slice(&ciphertext);
822        Ok(crate::storage::schema::Value::Secret(payload))
823    }
824
825    fn vault_key_available(&self, collection: &str) -> bool {
826        self.vault_encryption_key(collection).is_ok()
827    }
828
829    fn vault_encryption_key(&self, collection: &str) -> RedDBResult<[u8; 32]> {
830        let auth_store = self.inner.auth_store.read().clone().ok_or_else(|| {
831            RedDBError::Query("vault sealed_unavailable: no key provider is configured".to_string())
832        })?;
833        if !auth_store.is_vault_backed() {
834            return Err(RedDBError::Query(
835                "vault sealed_unavailable: key provider is sealed".to_string(),
836            ));
837        }
838
839        if let Some(hex_key) = auth_store.vault_kv_get(&vault_master_key_ref(collection)) {
840            return decode_vault_key(&hex_key);
841        }
842        auth_store.vault_secret_key().ok_or_else(|| {
843            RedDBError::Query("vault sealed_unavailable: cluster vault key is missing".to_string())
844        })
845    }
846
847    fn unseal_vault_value(
848        &self,
849        collection: &str,
850        sealed: &crate::storage::schema::Value,
851    ) -> RedDBResult<crate::storage::schema::Value> {
852        let crate::storage::schema::Value::Secret(payload) = sealed else {
853            return Err(RedDBError::Query(
854                "vault unseal failed: stored value is not sealed".to_string(),
855            ));
856        };
857        if payload.len() < 12 {
858            return Err(RedDBError::Query(
859                "vault unseal failed: sealed payload is truncated".to_string(),
860            ));
861        }
862        let key = self.vault_encryption_key(collection)?;
863        let mut nonce = [0u8; 12];
864        nonce.copy_from_slice(&payload[..12]);
865        let aad = format!("reddb.vault.{collection}");
866        let plaintext = crate::crypto::aes_gcm::aes256_gcm_decrypt(
867            &key,
868            &nonce,
869            aad.as_bytes(),
870            &payload[12..],
871        )
872        .map_err(|_| RedDBError::Query("vault unseal failed: decryption failed".to_string()))?;
873        let (value, consumed) =
874            crate::storage::schema::Value::from_bytes(&plaintext).map_err(|err| {
875                RedDBError::Query(format!("vault unseal failed: bad plaintext value: {err}"))
876            })?;
877        if consumed != plaintext.len() {
878            return Err(RedDBError::Query(
879                "vault unseal failed: trailing plaintext bytes".to_string(),
880            ));
881        }
882        Ok(value)
883    }
884
885    fn vault_target_resource(collection: &str, key: &str) -> String {
886        if collection == "red.vault" {
887            return format!("red.vault/{}", key.to_ascii_lowercase());
888        }
889        format!("{collection}.{key}")
890    }
891
892    fn current_vault_actor() -> String {
893        current_auth_identity()
894            .map(|(principal, _)| principal)
895            .unwrap_or_else(|| "anonymous".to_string())
896    }
897
898    fn vault_request_id() -> String {
899        let conn_id = current_connection_id();
900        if conn_id == 0 {
901            "embedded".to_string()
902        } else {
903            format!("conn-{conn_id}")
904        }
905    }
906
907    fn check_vault_capability(
908        &self,
909        action: &str,
910        collection: &str,
911        key: &str,
912    ) -> Result<(), String> {
913        let Some(auth_store) = self.inner.auth_store.read().clone() else {
914            return Ok(());
915        };
916        if !auth_store.iam_authorization_enabled() {
917            return Ok(());
918        }
919        let Some((principal, role)) = current_auth_identity() else {
920            return Err(
921                "IAM authorization is enabled; vault capability check requires an authenticated principal"
922                    .to_string(),
923            );
924        };
925        let tenant = current_tenant();
926        let principal_id = crate::auth::UserId::from_parts(tenant.as_deref(), &principal);
927        let mut resource = crate::auth::policies::ResourceRef::new(
928            "vault",
929            Self::vault_target_resource(collection, key),
930        );
931        if let Some(ref tenant) = tenant {
932            resource = resource.with_tenant(tenant.clone());
933        }
934        let ctx = crate::auth::policies::EvalContext {
935            principal_tenant: tenant.clone(),
936            current_tenant: tenant,
937            peer_ip: None,
938            mfa_present: false,
939            now_ms: crate::utils::now_unix_millis() as u128,
940            principal_is_admin_role: role == crate::auth::Role::Admin,
941            principal_is_system_owned: auth_store.principal_is_system_owned(&principal_id),
942            principal_is_platform_scoped: principal_id.tenant.is_none(),
943        };
944        if auth_store.check_policy_authz(&principal_id, action, &resource, &ctx) {
945            Ok(())
946        } else {
947            Err(format!(
948                "principal=`{}` action=`{}` resource=`vault:{}` denied by IAM policy",
949                principal,
950                action,
951                Self::vault_target_resource(collection, key)
952            ))
953        }
954    }
955
956    fn check_system_vault_capability(
957        &self,
958        action: &str,
959        collection: &str,
960        key: &str,
961    ) -> Result<(), String> {
962        if collection != "red.vault" {
963            return Ok(());
964        }
965        self.check_vault_capability(action, collection, key)
966    }
967
968    fn audit_vault_unseal(
969        &self,
970        collection: &str,
971        key: &str,
972        outcome: crate::runtime::audit_log::Outcome,
973        reason: &str,
974        entry: Option<&VaultEntry>,
975    ) {
976        let actor = Self::current_vault_actor();
977        let request_id = Self::vault_request_id();
978        let mut builder = crate::runtime::audit_log::AuditEvent::builder("vault/unseal")
979            .principal(actor.clone())
980            .source(crate::runtime::audit_log::AuditAuthSource::Password)
981            .resource(format!(
982                "vault:{}",
983                Self::vault_target_resource(collection, key)
984            ))
985            .outcome(outcome)
986            .correlation_id(request_id.clone())
987            .fields([
988                crate::runtime::audit_log::AuditFieldEscaper::field("actor", actor),
989                crate::runtime::audit_log::AuditFieldEscaper::field("collection", collection),
990                crate::runtime::audit_log::AuditFieldEscaper::field("key", key),
991                crate::runtime::audit_log::AuditFieldEscaper::field(
992                    "target",
993                    Self::vault_target_resource(collection, key),
994                ),
995                crate::runtime::audit_log::AuditFieldEscaper::field("reason", reason),
996                crate::runtime::audit_log::AuditFieldEscaper::field("request_id", request_id),
997                crate::runtime::audit_log::AuditFieldEscaper::field(
998                    "connection_id",
999                    current_connection_id(),
1000                ),
1001            ]);
1002        if let Some(tenant) = current_tenant() {
1003            builder = builder.tenant(tenant);
1004        }
1005        if let Some(entry) = entry {
1006            builder = builder.fields([
1007                crate::runtime::audit_log::AuditFieldEscaper::field("entity_id", entry.id.raw()),
1008                crate::runtime::audit_log::AuditFieldEscaper::field(
1009                    "sequence_id",
1010                    entry.sequence_id,
1011                ),
1012            ]);
1013        }
1014        self.audit_log().record_event(builder.build());
1015    }
1016
1017    fn audit_vault_lifecycle(
1018        &self,
1019        operation: &str,
1020        collection: &str,
1021        key: &str,
1022        outcome: crate::runtime::audit_log::Outcome,
1023        reason: &str,
1024        entry: Option<&VaultEntry>,
1025    ) {
1026        let actor = Self::current_vault_actor();
1027        let request_id = Self::vault_request_id();
1028        let mut builder =
1029            crate::runtime::audit_log::AuditEvent::builder(format!("vault/{operation}"))
1030                .principal(actor.clone())
1031                .source(crate::runtime::audit_log::AuditAuthSource::Password)
1032                .resource(format!(
1033                    "vault:{}",
1034                    Self::vault_target_resource(collection, key)
1035                ))
1036                .outcome(outcome)
1037                .correlation_id(request_id.clone())
1038                .fields([
1039                    crate::runtime::audit_log::AuditFieldEscaper::field("actor", actor),
1040                    crate::runtime::audit_log::AuditFieldEscaper::field("collection", collection),
1041                    crate::runtime::audit_log::AuditFieldEscaper::field("key", key),
1042                    crate::runtime::audit_log::AuditFieldEscaper::field(
1043                        "target",
1044                        Self::vault_target_resource(collection, key),
1045                    ),
1046                    crate::runtime::audit_log::AuditFieldEscaper::field("reason", reason),
1047                    crate::runtime::audit_log::AuditFieldEscaper::field("request_id", request_id),
1048                    crate::runtime::audit_log::AuditFieldEscaper::field(
1049                        "connection_id",
1050                        current_connection_id(),
1051                    ),
1052                ]);
1053        if let Some(tenant) = current_tenant() {
1054            builder = builder.tenant(tenant);
1055        }
1056        if let Some(entry) = entry {
1057            builder = builder.fields([
1058                crate::runtime::audit_log::AuditFieldEscaper::field("entity_id", entry.id.raw()),
1059                crate::runtime::audit_log::AuditFieldEscaper::field("version", entry.version),
1060                crate::runtime::audit_log::AuditFieldEscaper::field(
1061                    "sequence_id",
1062                    entry.sequence_id,
1063                ),
1064            ]);
1065        }
1066        self.audit_log().record_event(builder.build());
1067    }
1068
1069    pub(crate) fn resolve_vault_secret_value(
1070        &self,
1071        collection: &str,
1072        key: &str,
1073    ) -> RedDBResult<Value> {
1074        let ops = KvAtomicOps::new(self);
1075        let entry = ops.get_vault_entry(collection, key)?;
1076        if let Err(reason) = self.check_vault_capability("vault:unseal", collection, key) {
1077            self.audit_vault_unseal(
1078                collection,
1079                key,
1080                crate::runtime::audit_log::Outcome::Denied,
1081                &reason,
1082                entry.as_ref(),
1083            );
1084            return Err(RedDBError::Query(reason));
1085        }
1086        let Some(entry) = entry else {
1087            let reason = "not_found";
1088            self.audit_vault_unseal(
1089                collection,
1090                key,
1091                crate::runtime::audit_log::Outcome::Denied,
1092                reason,
1093                None,
1094            );
1095            return Err(RedDBError::NotFound(format!(
1096                "vault secret '{}.{}' not found",
1097                collection, key
1098            )));
1099        };
1100        if entry.tombstone {
1101            let reason = "deleted";
1102            self.audit_vault_unseal(
1103                collection,
1104                key,
1105                crate::runtime::audit_log::Outcome::Denied,
1106                reason,
1107                Some(&entry),
1108            );
1109            return Err(RedDBError::NotFound(format!(
1110                "vault secret '{}.{}' is deleted",
1111                collection, key
1112            )));
1113        }
1114        match self.unseal_vault_value(collection, &entry.value) {
1115            Ok(value) => {
1116                self.audit_vault_unseal(
1117                    collection,
1118                    key,
1119                    crate::runtime::audit_log::Outcome::Success,
1120                    "ok",
1121                    Some(&entry),
1122                );
1123                Ok(value)
1124            }
1125            Err(err) => {
1126                let reason = err.to_string();
1127                self.audit_vault_unseal(
1128                    collection,
1129                    key,
1130                    crate::runtime::audit_log::Outcome::Error,
1131                    &reason,
1132                    Some(&entry),
1133                );
1134                Err(err)
1135            }
1136        }
1137    }
1138
1139    /// Dispatch a `KV PUT / GET / DELETE` command.
1140    pub fn execute_kv_command(
1141        &self,
1142        raw_query: &str,
1143        cmd: &crate::storage::query::ast::KvCommand,
1144    ) -> RedDBResult<RuntimeQueryResult> {
1145        use crate::storage::query::ast::KvCommand;
1146
1147        let ops = KvAtomicOps::new(self);
1148
1149        match cmd {
1150            KvCommand::Put {
1151                model,
1152                collection,
1153                key,
1154                value,
1155                ttl_ms,
1156                tags,
1157                if_not_exists,
1158            } => {
1159                if *model == crate::catalog::CollectionModel::Vault {
1160                    self.check_system_vault_capability("vault:write", collection, key)
1161                        .map_err(RedDBError::Query)?;
1162                }
1163                self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
1164                let (created, id) = ops.set_with_tags_for_model(
1165                    *model,
1166                    collection,
1167                    key,
1168                    value.clone(),
1169                    *ttl_ms,
1170                    tags,
1171                    *if_not_exists,
1172                )?;
1173
1174                let mut result = UnifiedResult::with_columns(vec![
1175                    "ok".into(),
1176                    "collection".into(),
1177                    "key".into(),
1178                    "id".into(),
1179                    "created".into(),
1180                    "tags".into(),
1181                ]);
1182                let mut record = UnifiedRecord::new();
1183                record.set("ok", Value::Boolean(true));
1184                record.set("collection", Value::text(collection.clone()));
1185                record.set("key", Value::text(key.clone()));
1186                record.set("id", Value::Integer(id.raw() as i64));
1187                record.set("created", Value::Boolean(created));
1188                record.set("tags", kv_tags_value(tags));
1189                result.push(record);
1190
1191                Ok(RuntimeQueryResult {
1192                    query: raw_query.to_string(),
1193                    mode: crate::storage::query::modes::QueryMode::Sql,
1194                    statement: if *model == crate::catalog::CollectionModel::Vault {
1195                        "vault_put"
1196                    } else {
1197                        "kv_put"
1198                    },
1199                    engine: if *model == crate::catalog::CollectionModel::Vault {
1200                        "vault"
1201                    } else {
1202                        "kv"
1203                    },
1204                    result,
1205                    affected_rows: 1,
1206                    statement_type: if created { "insert" } else { "update" },
1207                })
1208            }
1209            KvCommand::InvalidateTags { collection, tags } => {
1210                let invalidated = ops.invalidate_tags(collection, tags)?;
1211
1212                let mut result = UnifiedResult::with_columns(vec![
1213                    "ok".into(),
1214                    "collection".into(),
1215                    "invalidated".into(),
1216                    "tags".into(),
1217                ]);
1218                let mut record = UnifiedRecord::new();
1219                record.set("ok", Value::Boolean(true));
1220                record.set("collection", Value::text(collection.clone()));
1221                record.set("invalidated", Value::Integer(invalidated as i64));
1222                record.set("tags", kv_tags_value(tags));
1223                result.push(record);
1224
1225                Ok(RuntimeQueryResult {
1226                    query: raw_query.to_string(),
1227                    mode: crate::storage::query::modes::QueryMode::Sql,
1228                    statement: "kv_invalidate_tags",
1229                    engine: "kv",
1230                    result,
1231                    affected_rows: invalidated as u64,
1232                    statement_type: "delete",
1233                })
1234            }
1235
1236            KvCommand::Rotate {
1237                collection,
1238                key,
1239                value,
1240                tags,
1241            } => {
1242                self.check_system_vault_capability("vault:write", collection, key)
1243                    .map_err(RedDBError::Query)?;
1244                self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
1245                let entry = ops.append_vault_version(
1246                    collection,
1247                    key,
1248                    value.clone(),
1249                    "rotate",
1250                    false,
1251                    tags,
1252                )?;
1253                self.record_kv_watch_event(
1254                    crate::replication::cdc::ChangeOperation::Update,
1255                    collection,
1256                    key,
1257                    entry.id.raw(),
1258                    None,
1259                    Some(vault_entry_metadata_json(&entry)),
1260                );
1261                self.audit_vault_lifecycle(
1262                    "rotate",
1263                    collection,
1264                    key,
1265                    crate::runtime::audit_log::Outcome::Success,
1266                    "ok",
1267                    Some(&entry),
1268                );
1269                Ok(vault_write_result(
1270                    raw_query,
1271                    "vault_rotate",
1272                    "update",
1273                    collection,
1274                    key,
1275                    &entry,
1276                    1,
1277                ))
1278            }
1279
1280            KvCommand::List {
1281                model,
1282                collection,
1283                prefix,
1284                limit,
1285                offset,
1286            } => {
1287                if *model != crate::catalog::CollectionModel::Vault {
1288                    return Err(RedDBError::InvalidOperation(
1289                        "LIST is not supported through normal KV command execution".to_string(),
1290                    ));
1291                }
1292                let mut entries = ops.latest_vault_entries(collection, prefix.as_deref())?;
1293                entries.sort_by(|left, right| left.key.cmp(&right.key));
1294                let mut result = UnifiedResult::with_columns(vec![
1295                    "collection".into(),
1296                    "key".into(),
1297                    "version".into(),
1298                    "fingerprint".into(),
1299                    "tags".into(),
1300                    "created_at".into(),
1301                    "updated_at".into(),
1302                    "status".into(),
1303                    "tombstone".into(),
1304                    "op".into(),
1305                ]);
1306                for entry in entries
1307                    .into_iter()
1308                    .filter(|entry| {
1309                        self.check_vault_capability("vault:read_metadata", collection, &entry.key)
1310                            .is_ok()
1311                    })
1312                    .skip(*offset)
1313                    .take(limit.unwrap_or(usize::MAX))
1314                {
1315                    push_vault_metadata_record(&mut result, collection, &entry.key, &entry);
1316                }
1317                Ok(RuntimeQueryResult {
1318                    query: raw_query.to_string(),
1319                    mode: crate::storage::query::modes::QueryMode::Sql,
1320                    statement: "vault_list",
1321                    engine: "vault",
1322                    result,
1323                    affected_rows: 0,
1324                    statement_type: "select",
1325                })
1326            }
1327
1328            KvCommand::History { collection, key } => {
1329                self.check_vault_capability("vault:read_metadata", collection, key)
1330                    .map_err(RedDBError::Query)?;
1331                let versions =
1332                    super::keyed_spine::history_versions(ops.vault_versions(collection, key)?);
1333                let result = vault_history_result(collection, key, &versions);
1334                Ok(RuntimeQueryResult {
1335                    query: raw_query.to_string(),
1336                    mode: crate::storage::query::modes::QueryMode::Sql,
1337                    statement: "vault_history",
1338                    engine: "vault",
1339                    result,
1340                    affected_rows: 0,
1341                    statement_type: "select",
1342                })
1343            }
1344
1345            KvCommand::Purge { collection, key } => {
1346                let entry = ops.get_vault_entry(collection, key)?;
1347                if let Err(reason) = self.check_vault_capability("vault:purge", collection, key) {
1348                    self.audit_vault_lifecycle(
1349                        "purge",
1350                        collection,
1351                        key,
1352                        crate::runtime::audit_log::Outcome::Denied,
1353                        &reason,
1354                        entry.as_ref(),
1355                    );
1356                    return Err(RedDBError::Query(reason));
1357                }
1358                self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
1359                let purged = ops.purge_vault_versions(collection, key)?;
1360                self.audit_vault_lifecycle(
1361                    "purge",
1362                    collection,
1363                    key,
1364                    crate::runtime::audit_log::Outcome::Success,
1365                    "ok",
1366                    entry.as_ref(),
1367                );
1368                let mut result = UnifiedResult::with_columns(vec![
1369                    "ok".into(),
1370                    "collection".into(),
1371                    "key".into(),
1372                    "purged".into(),
1373                ]);
1374                let mut record = UnifiedRecord::new();
1375                record.set("ok", Value::Boolean(true));
1376                record.set("collection", Value::text(collection.clone()));
1377                record.set("key", Value::text(key.clone()));
1378                record.set("purged", Value::Integer(purged as i64));
1379                result.push(record);
1380                Ok(RuntimeQueryResult {
1381                    query: raw_query.to_string(),
1382                    mode: crate::storage::query::modes::QueryMode::Sql,
1383                    statement: "vault_purge",
1384                    engine: "vault",
1385                    result,
1386                    affected_rows: purged as u64,
1387                    statement_type: "delete",
1388                })
1389            }
1390
1391            KvCommand::Get {
1392                model,
1393                collection,
1394                key,
1395            } => {
1396                if *model == crate::catalog::CollectionModel::Vault {
1397                    self.check_vault_capability("vault:read_metadata", collection, key)
1398                        .map_err(RedDBError::Query)?;
1399                    let entry = ops.get_vault_entry(collection, key)?;
1400                    let key_available = self.vault_key_available(collection);
1401                    let result =
1402                        vault_metadata_result(collection, key, entry.as_ref(), key_available);
1403                    return Ok(RuntimeQueryResult {
1404                        query: raw_query.to_string(),
1405                        mode: crate::storage::query::modes::QueryMode::Sql,
1406                        statement: "vault_get",
1407                        engine: "vault",
1408                        result,
1409                        affected_rows: 0,
1410                        statement_type: "select",
1411                    });
1412                }
1413
1414                let entity = ops.get_entity(*model, collection, key)?;
1415                let value = entity.as_ref().and_then(kv_value_from_entity);
1416                if *model == crate::catalog::CollectionModel::Kv {
1417                    self.inner.kv_stats.incr_gets();
1418                }
1419                let mut result = UnifiedResult::with_columns(vec![
1420                    "rid".into(),
1421                    "collection".into(),
1422                    "kind".into(),
1423                    "tenant".into(),
1424                    "created_at".into(),
1425                    "updated_at".into(),
1426                    "key".into(),
1427                    "value".into(),
1428                    "tags".into(),
1429                ]);
1430                let mut record = UnifiedRecord::new();
1431                if let Some(entity) = entity.as_ref() {
1432                    record.set("rid", Value::UnsignedInteger(entity.id.raw()));
1433                    record.set("created_at", Value::UnsignedInteger(entity.created_at));
1434                    record.set("updated_at", Value::UnsignedInteger(entity.updated_at));
1435                } else {
1436                    record.set("rid", Value::Null);
1437                    record.set("created_at", Value::Null);
1438                    record.set("updated_at", Value::Null);
1439                }
1440                record.set("collection", Value::text(collection.clone()));
1441                record.set("kind", Value::text(keyed_model_name(*model).to_string()));
1442                record.set("tenant", Value::Null);
1443                record.set("key", Value::text(key.clone()));
1444                record.set(
1445                    "value",
1446                    value.unwrap_or(crate::storage::schema::Value::Null),
1447                );
1448                record.set("tags", kv_tags_value(&ops.tags_for_key(collection, key)));
1449                result.push(record);
1450
1451                Ok(RuntimeQueryResult {
1452                    query: raw_query.to_string(),
1453                    mode: crate::storage::query::modes::QueryMode::Sql,
1454                    statement: "kv_get",
1455                    engine: "kv",
1456                    result,
1457                    affected_rows: 0,
1458                    statement_type: "select",
1459                })
1460            }
1461            KvCommand::Watch {
1462                model,
1463                collection,
1464                key,
1465                prefix,
1466                from_lsn,
1467            } => {
1468                let watch_key = if *prefix {
1469                    format!("{key}.*")
1470                } else {
1471                    key.clone()
1472                };
1473                let endpoint = match from_lsn {
1474                    Some(lsn) => format!(
1475                        "/collections/{collection}/{}/{watch_key}/watch?since_lsn={lsn}",
1476                        keyed_model_name(*model)
1477                    ),
1478                    None => format!(
1479                        "/collections/{collection}/{}/{watch_key}/watch",
1480                        keyed_model_name(*model)
1481                    ),
1482                };
1483                let mut result = UnifiedResult::with_columns(vec![
1484                    "collection".into(),
1485                    "key".into(),
1486                    "prefix".into(),
1487                    "from_lsn".into(),
1488                    "watch_url".into(),
1489                    "streaming".into(),
1490                ]);
1491                let mut record = UnifiedRecord::new();
1492                record.set("collection", Value::text(collection.clone()));
1493                record.set("key", Value::text(watch_key));
1494                record.set("prefix", Value::Boolean(*prefix));
1495                record.set(
1496                    "from_lsn",
1497                    from_lsn
1498                        .map(Value::UnsignedInteger)
1499                        .unwrap_or(crate::storage::schema::Value::Null),
1500                );
1501                record.set("watch_url", Value::text(endpoint));
1502                record.set("streaming", Value::Boolean(true));
1503                result.push(record);
1504
1505                Ok(RuntimeQueryResult {
1506                    query: raw_query.to_string(),
1507                    mode: crate::storage::query::modes::QueryMode::Sql,
1508                    statement: "kv_watch",
1509                    engine: keyed_model_name(*model),
1510                    result,
1511                    affected_rows: 0,
1512                    statement_type: "stream",
1513                })
1514            }
1515
1516            KvCommand::Unseal {
1517                collection,
1518                key,
1519                version,
1520            } => {
1521                let latest = ops.get_vault_entry(collection, key)?;
1522                let entry = match version {
1523                    Some(version) => ops.get_vault_entry_version(collection, key, *version)?,
1524                    None => latest.clone(),
1525                };
1526                let action = match (version, latest.as_ref()) {
1527                    (Some(requested), Some(latest)) if *requested == latest.version => {
1528                        "vault:unseal"
1529                    }
1530                    (Some(_), _) => "vault:unseal_history",
1531                    _ => "vault:unseal",
1532                };
1533                if let Err(reason) = self.check_vault_capability(action, collection, key) {
1534                    self.audit_vault_unseal(
1535                        collection,
1536                        key,
1537                        crate::runtime::audit_log::Outcome::Denied,
1538                        &reason,
1539                        entry.as_ref(),
1540                    );
1541                    return Err(RedDBError::Query(reason));
1542                }
1543                let Some(entry) = entry else {
1544                    let reason = "not_found";
1545                    self.audit_vault_unseal(
1546                        collection,
1547                        key,
1548                        crate::runtime::audit_log::Outcome::Denied,
1549                        reason,
1550                        None,
1551                    );
1552                    return Err(RedDBError::NotFound(format!(
1553                        "vault secret '{}.{}' not found",
1554                        collection, key
1555                    )));
1556                };
1557                if entry.tombstone {
1558                    let reason = "deleted";
1559                    self.audit_vault_unseal(
1560                        collection,
1561                        key,
1562                        crate::runtime::audit_log::Outcome::Denied,
1563                        reason,
1564                        Some(&entry),
1565                    );
1566                    return Err(RedDBError::NotFound(format!(
1567                        "vault secret '{}.{}' is deleted",
1568                        collection, key
1569                    )));
1570                }
1571                match self.unseal_vault_value(collection, &entry.value) {
1572                    Ok(value) => {
1573                        self.audit_vault_unseal(
1574                            collection,
1575                            key,
1576                            crate::runtime::audit_log::Outcome::Success,
1577                            "ok",
1578                            Some(&entry),
1579                        );
1580                        let mut result = UnifiedResult::with_columns(vec![
1581                            "collection".into(),
1582                            "key".into(),
1583                            "value".into(),
1584                        ]);
1585                        let mut record = UnifiedRecord::new();
1586                        record.set("collection", Value::text(collection.clone()));
1587                        record.set("key", Value::text(key.clone()));
1588                        record.set("value", value);
1589                        result.push(record);
1590                        Ok(RuntimeQueryResult {
1591                            query: raw_query.to_string(),
1592                            mode: crate::storage::query::modes::QueryMode::Sql,
1593                            statement: "vault_unseal",
1594                            engine: "vault",
1595                            result,
1596                            affected_rows: 0,
1597                            statement_type: "select",
1598                        })
1599                    }
1600                    Err(err) => {
1601                        let reason = err.to_string();
1602                        self.audit_vault_unseal(
1603                            collection,
1604                            key,
1605                            crate::runtime::audit_log::Outcome::Error,
1606                            &reason,
1607                            Some(&entry),
1608                        );
1609                        Err(err)
1610                    }
1611                }
1612            }
1613
1614            KvCommand::Incr {
1615                model,
1616                collection,
1617                key,
1618                by,
1619                ttl_ms,
1620            } => {
1621                self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
1622                let new_value = ops.incr(*model, collection, key, *by, *ttl_ms)?;
1623
1624                let mut result = UnifiedResult::with_columns(vec![
1625                    "ok".into(),
1626                    "collection".into(),
1627                    "key".into(),
1628                    "value".into(),
1629                ]);
1630                let mut record = UnifiedRecord::new();
1631                record.set("ok", Value::Boolean(true));
1632                record.set("collection", Value::text(collection.clone()));
1633                record.set("key", Value::text(key.clone()));
1634                record.set("value", Value::Integer(new_value));
1635                result.push(record);
1636
1637                Ok(RuntimeQueryResult {
1638                    query: raw_query.to_string(),
1639                    mode: crate::storage::query::modes::QueryMode::Sql,
1640                    statement: "kv_incr",
1641                    engine: "kv",
1642                    result,
1643                    affected_rows: 1,
1644                    statement_type: "update",
1645                })
1646            }
1647
1648            KvCommand::Cas {
1649                model,
1650                collection,
1651                key,
1652                expected,
1653                new_value,
1654                ttl_ms,
1655            } => {
1656                self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
1657                let (ok, current) = ops.cas(
1658                    *model,
1659                    collection,
1660                    key,
1661                    expected.as_ref(),
1662                    new_value.clone(),
1663                    *ttl_ms,
1664                )?;
1665
1666                let mut result = UnifiedResult::with_columns(vec![
1667                    "ok".into(),
1668                    "collection".into(),
1669                    "key".into(),
1670                    "current".into(),
1671                ]);
1672                let mut record = UnifiedRecord::new();
1673                record.set("ok", Value::Boolean(ok));
1674                record.set("collection", Value::text(collection.clone()));
1675                record.set("key", Value::text(key.clone()));
1676                record.set(
1677                    "current",
1678                    current.unwrap_or(crate::storage::schema::Value::Null),
1679                );
1680                result.push(record);
1681
1682                Ok(RuntimeQueryResult {
1683                    query: raw_query.to_string(),
1684                    mode: crate::storage::query::modes::QueryMode::Sql,
1685                    statement: "kv_cas",
1686                    engine: "kv",
1687                    result,
1688                    affected_rows: if ok { 1 } else { 0 },
1689                    statement_type: "update",
1690                })
1691            }
1692
1693            KvCommand::Delete {
1694                model,
1695                collection,
1696                key,
1697            } => {
1698                if *model == crate::catalog::CollectionModel::Vault {
1699                    self.check_system_vault_capability("vault:write", collection, key)
1700                        .map_err(RedDBError::Query)?;
1701                    self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
1702                    let entry = ops.append_vault_version(
1703                        collection,
1704                        key,
1705                        Value::Null,
1706                        "delete",
1707                        true,
1708                        &[],
1709                    )?;
1710                    self.record_kv_watch_event(
1711                        crate::replication::cdc::ChangeOperation::Delete,
1712                        collection,
1713                        key,
1714                        entry.id.raw(),
1715                        None,
1716                        Some(vault_entry_metadata_json(&entry)),
1717                    );
1718                    self.audit_vault_lifecycle(
1719                        "delete",
1720                        collection,
1721                        key,
1722                        crate::runtime::audit_log::Outcome::Success,
1723                        "ok",
1724                        Some(&entry),
1725                    );
1726                    return Ok(vault_write_result(
1727                        raw_query,
1728                        "vault_delete",
1729                        "delete",
1730                        collection,
1731                        key,
1732                        &entry,
1733                        1,
1734                    ));
1735                }
1736                self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
1737                let deleted = ops.delete(*model, collection, key)?;
1738
1739                let mut result = UnifiedResult::with_columns(vec![
1740                    "ok".into(),
1741                    "collection".into(),
1742                    "key".into(),
1743                    "deleted".into(),
1744                ]);
1745                let mut record = UnifiedRecord::new();
1746                record.set("ok", Value::Boolean(true));
1747                record.set("collection", Value::text(collection.clone()));
1748                record.set("key", Value::text(key.clone()));
1749                record.set("deleted", Value::Boolean(deleted));
1750                result.push(record);
1751
1752                Ok(RuntimeQueryResult {
1753                    query: raw_query.to_string(),
1754                    mode: crate::storage::query::modes::QueryMode::Sql,
1755                    statement: "kv_delete",
1756                    engine: "kv",
1757                    result,
1758                    affected_rows: if deleted { 1 } else { 0 },
1759                    statement_type: "delete",
1760                })
1761            }
1762        }
1763    }
1764
1765    pub fn vault_watch_events_since(
1766        &self,
1767        collection: &str,
1768        key: &str,
1769        since_lsn: u64,
1770        max_count: usize,
1771    ) -> Vec<crate::replication::cdc::KvWatchEvent> {
1772        self.kv_watch_events_since(collection, key, since_lsn, max_count)
1773            .into_iter()
1774            .filter(|event| {
1775                self.check_vault_capability("vault:read_metadata", &event.collection, &event.key)
1776                    .is_ok()
1777            })
1778            .map(vault_filter_watch_event)
1779            .collect()
1780    }
1781
1782    pub fn vault_watch_events_since_prefix(
1783        &self,
1784        collection: &str,
1785        prefix: &str,
1786        since_lsn: u64,
1787        max_count: usize,
1788    ) -> Vec<crate::replication::cdc::KvWatchEvent> {
1789        self.kv_watch_events_since_prefix(collection, prefix, since_lsn, max_count)
1790            .into_iter()
1791            .filter(|event| {
1792                self.check_vault_capability("vault:read_metadata", &event.collection, &event.key)
1793                    .is_ok()
1794            })
1795            .map(vault_filter_watch_event)
1796            .collect()
1797    }
1798
1799    fn check_kv_invalidate_policy(&self, collection: &str) -> RedDBResult<()> {
1800        let auth_store = match self.inner.auth_store.read().clone() {
1801            Some(store) => store,
1802            None => return Ok(()),
1803        };
1804        let (username, role) = match crate::runtime::impl_core::current_auth_identity() {
1805            Some(identity) => identity,
1806            None => return Ok(()),
1807        };
1808        if role < crate::auth::Role::Write {
1809            return Err(RedDBError::Query(format!(
1810                "principal=`{username}` role=`{role:?}` cannot invalidate KV tags"
1811            )));
1812        }
1813        if !auth_store.iam_authorization_enabled() {
1814            return Ok(());
1815        }
1816
1817        let tenant = crate::runtime::impl_core::current_tenant();
1818        let principal = crate::auth::UserId::from_parts(tenant.as_deref(), &username);
1819        let mut resource =
1820            crate::auth::policies::ResourceRef::new("kv".to_string(), collection.to_string());
1821        if let Some(tenant) = tenant.clone() {
1822            resource = resource.with_tenant(tenant);
1823        }
1824        let ctx = crate::auth::policies::EvalContext {
1825            principal_tenant: tenant.clone(),
1826            current_tenant: tenant,
1827            peer_ip: None,
1828            mfa_present: false,
1829            now_ms: current_unix_ms(),
1830            principal_is_admin_role: role == crate::auth::Role::Admin,
1831            principal_is_system_owned: auth_store.principal_is_system_owned(&principal),
1832            principal_is_platform_scoped: principal.tenant.is_none(),
1833        };
1834        if auth_store.check_policy_authz(&principal, "kv:invalidate", &resource, &ctx) {
1835            Ok(())
1836        } else {
1837            Err(RedDBError::Query(format!(
1838                "principal=`{username}` action=`kv:invalidate` resource=`kv:{collection}` denied by IAM policy"
1839            )))
1840        }
1841    }
1842}
1843
1844fn ttl_metadata(ttl_ms: Option<u64>) -> Option<Metadata> {
1845    let ttl_ms = ttl_ms?;
1846    Some(Metadata::with_fields(
1847        [(
1848            "_ttl_ms".to_string(),
1849            if ttl_ms <= i64::MAX as u64 {
1850                MetadataValue::Int(ttl_ms as i64)
1851            } else {
1852                MetadataValue::Timestamp(ttl_ms)
1853            },
1854        )]
1855        .into_iter()
1856        .collect(),
1857    ))
1858}
1859
1860fn vault_write_result(
1861    raw_query: &str,
1862    statement: &'static str,
1863    statement_type: &'static str,
1864    collection: &str,
1865    key: &str,
1866    entry: &VaultEntry,
1867    affected_rows: u64,
1868) -> RuntimeQueryResult {
1869    let mut result = UnifiedResult::with_columns(vec![
1870        "ok".into(),
1871        "collection".into(),
1872        "key".into(),
1873        "version".into(),
1874        "fingerprint".into(),
1875        "tombstone".into(),
1876        "op".into(),
1877        "id".into(),
1878    ]);
1879    let mut record = UnifiedRecord::new();
1880    record.set("ok", Value::Boolean(true));
1881    record.set("collection", Value::text(collection.to_string()));
1882    record.set("key", Value::text(key.to_string()));
1883    record.set("version", Value::Integer(entry.version));
1884    if entry.tombstone {
1885        record.set("fingerprint", Value::Null);
1886    } else {
1887        record.set("fingerprint", Value::text(vault_fingerprint(&entry.value)));
1888    }
1889    record.set("tombstone", Value::Boolean(entry.tombstone));
1890    record.set("op", Value::text(entry.op.clone()));
1891    record.set("id", Value::Integer(entry.id.raw() as i64));
1892    result.push(record);
1893    RuntimeQueryResult {
1894        query: raw_query.to_string(),
1895        mode: crate::storage::query::modes::QueryMode::Sql,
1896        statement,
1897        engine: "vault",
1898        result,
1899        affected_rows,
1900        statement_type,
1901    }
1902}
1903
1904fn vault_history_result(collection: &str, key: &str, versions: &[VaultEntry]) -> UnifiedResult {
1905    let mut result = UnifiedResult::with_columns(vec![
1906        "collection".into(),
1907        "key".into(),
1908        "version".into(),
1909        "fingerprint".into(),
1910        "tags".into(),
1911        "created_at".into(),
1912        "updated_at".into(),
1913        "status".into(),
1914        "tombstone".into(),
1915        "op".into(),
1916    ]);
1917    for entry in versions {
1918        push_vault_metadata_record(&mut result, collection, key, entry);
1919    }
1920    result
1921}
1922
1923fn push_vault_metadata_record(
1924    result: &mut UnifiedResult,
1925    collection: &str,
1926    key: &str,
1927    entry: &VaultEntry,
1928) {
1929    let mut record = UnifiedRecord::new();
1930    record.set("collection", Value::text(collection.to_string()));
1931    record.set("key", Value::text(key.to_string()));
1932    record.set("version", Value::Integer(entry.version));
1933    if entry.tombstone {
1934        record.set("fingerprint", Value::Null);
1935        record.set("status", Value::text("deleted"));
1936    } else {
1937        record.set("fingerprint", Value::text(vault_fingerprint(&entry.value)));
1938        record.set("status", Value::text("sealed"));
1939    }
1940    record.set("tags", vault_tags_value(&entry.metadata));
1941    record.set("created_at", Value::TimestampMs(entry.created_at as i64));
1942    record.set("updated_at", Value::TimestampMs(entry.updated_at as i64));
1943    record.set("tombstone", Value::Boolean(entry.tombstone));
1944    record.set("op", Value::text(entry.op.clone()));
1945    result.push(record);
1946}
1947
1948fn vault_metadata_result(
1949    collection: &str,
1950    key: &str,
1951    entry: Option<&VaultEntry>,
1952    key_available: bool,
1953) -> UnifiedResult {
1954    let mut result = UnifiedResult::with_columns(vec![
1955        "collection".into(),
1956        "key".into(),
1957        "version".into(),
1958        "fingerprint".into(),
1959        "tags".into(),
1960        "created_at".into(),
1961        "updated_at".into(),
1962        "value".into(),
1963        "status".into(),
1964        "tombstone".into(),
1965        "op".into(),
1966    ]);
1967    let mut record = UnifiedRecord::new();
1968    record.set("collection", Value::text(collection.to_string()));
1969    record.set("key", Value::text(key.to_string()));
1970    match entry {
1971        Some(entry) => {
1972            record.set("version", Value::Integer(entry.version));
1973            if entry.tombstone {
1974                record.set("fingerprint", Value::Null);
1975            } else {
1976                record.set("fingerprint", Value::text(vault_fingerprint(&entry.value)));
1977            }
1978            record.set("tags", vault_tags_value(&entry.metadata));
1979            record.set("created_at", Value::TimestampMs(entry.created_at as i64));
1980            record.set("updated_at", Value::TimestampMs(entry.updated_at as i64));
1981            record.set("value", Value::text("***"));
1982            record.set(
1983                "status",
1984                Value::text(if entry.tombstone {
1985                    "deleted"
1986                } else if key_available {
1987                    "sealed"
1988                } else {
1989                    "sealed_unavailable"
1990                }),
1991            );
1992            record.set("tombstone", Value::Boolean(entry.tombstone));
1993            record.set("op", Value::text(entry.op.clone()));
1994        }
1995        None => {
1996            record.set("version", Value::Null);
1997            record.set("fingerprint", Value::Null);
1998            record.set("tags", Value::Array(Vec::new()));
1999            record.set("created_at", Value::Null);
2000            record.set("updated_at", Value::Null);
2001            record.set("value", Value::text(""));
2002            record.set("status", Value::text("missing"));
2003            record.set("tombstone", Value::Boolean(false));
2004            record.set("op", Value::Null);
2005        }
2006    }
2007    result.push(record);
2008    result
2009}
2010
2011fn vault_fingerprint(value: &Value) -> String {
2012    match value {
2013        Value::Secret(payload) => crate::utils::to_hex(&crate::crypto::sha256::sha256(payload)),
2014        other => crate::utils::to_hex(&crate::crypto::sha256::sha256(&other.to_bytes())),
2015    }
2016}
2017
2018fn vault_entry_metadata_json(entry: &VaultEntry) -> crate::json::Value {
2019    let mut object = crate::json::Map::new();
2020    object.insert(
2021        "key".to_string(),
2022        crate::json::Value::String(entry.key.clone()),
2023    );
2024    object.insert(
2025        "version".to_string(),
2026        crate::json::Value::Number(entry.version as f64),
2027    );
2028    object.insert(
2029        "fingerprint".to_string(),
2030        if entry.tombstone {
2031            crate::json::Value::Null
2032        } else {
2033            crate::json::Value::String(vault_fingerprint(&entry.value))
2034        },
2035    );
2036    object.insert("tags".to_string(), vault_tags_json(&entry.metadata));
2037    object.insert(
2038        "actor".to_string(),
2039        crate::json::Value::String(RedDBRuntime::current_vault_actor()),
2040    );
2041    object.insert(
2042        "sequence_id".to_string(),
2043        crate::json::Value::Number(entry.sequence_id as f64),
2044    );
2045    object.insert(
2046        "tombstone".to_string(),
2047        crate::json::Value::Bool(entry.tombstone),
2048    );
2049    object.insert(
2050        "op".to_string(),
2051        crate::json::Value::String(entry.op.clone()),
2052    );
2053    crate::json::Value::Object(object)
2054}
2055
2056fn vault_tags_json(metadata: &Metadata) -> crate::json::Value {
2057    match vault_tags_value(metadata) {
2058        Value::Array(values) => crate::json::Value::Array(
2059            values
2060                .into_iter()
2061                .filter_map(|value| match value {
2062                    Value::Text(tag) => Some(crate::json::Value::String(tag.to_string())),
2063                    _ => None,
2064                })
2065                .collect(),
2066        ),
2067        _ => crate::json::Value::Array(Vec::new()),
2068    }
2069}
2070
2071fn vault_tags_metadata(tags: &[String]) -> std::collections::HashMap<String, MetadataValue> {
2072    [(
2073        "tags".to_string(),
2074        MetadataValue::Array(
2075            tags.iter()
2076                .map(|tag| MetadataValue::String(tag.clone()))
2077                .collect(),
2078        ),
2079    )]
2080    .into_iter()
2081    .collect()
2082}
2083
2084fn vault_filter_watch_event(
2085    mut event: crate::replication::cdc::KvWatchEvent,
2086) -> crate::replication::cdc::KvWatchEvent {
2087    event.before = event.before.and_then(vault_metadata_json_only);
2088    event.after = event.after.and_then(vault_metadata_json_only);
2089    event
2090}
2091
2092fn vault_metadata_json_only(value: crate::json::Value) -> Option<crate::json::Value> {
2093    let object = value.as_object()?;
2094    let mut out = crate::json::Map::new();
2095    for field in [
2096        "key",
2097        "version",
2098        "fingerprint",
2099        "tags",
2100        "actor",
2101        "sequence_id",
2102        "tombstone",
2103        "op",
2104    ] {
2105        if let Some(value) = object.get(field) {
2106            out.insert(field.to_string(), value.clone());
2107        }
2108    }
2109    Some(crate::json::Value::Object(out))
2110}
2111
2112fn vault_tags_value(metadata: &Metadata) -> Value {
2113    match metadata.get("tags") {
2114        Some(MetadataValue::Array(values)) => Value::Array(
2115            values
2116                .iter()
2117                .filter_map(|value| match value {
2118                    MetadataValue::String(tag) => Some(Value::text(tag.clone())),
2119                    _ => None,
2120                })
2121                .collect(),
2122        ),
2123        Some(MetadataValue::String(tag)) if !tag.is_empty() => {
2124            Value::Array(vec![Value::text(tag.clone())])
2125        }
2126        _ => Value::Array(Vec::new()),
2127    }
2128}
2129
2130fn decode_vault_key(hex_key: &str) -> RedDBResult<[u8; 32]> {
2131    let bytes = hex::decode(hex_key)
2132        .map_err(|_| RedDBError::Query("vault sealed_unavailable: bad key material".to_string()))?;
2133    let key: [u8; 32] = bytes.try_into().map_err(|_| {
2134        RedDBError::Query("vault sealed_unavailable: bad key material length".to_string())
2135    })?;
2136    Ok(key)
2137}
2138
2139fn kv_tags_metadata(tags: &[String]) -> Option<(String, MetadataValue)> {
2140    if tags.is_empty() {
2141        return None;
2142    }
2143    let values = tags
2144        .iter()
2145        .map(|tag| MetadataValue::String(tag.clone()))
2146        .collect();
2147    Some(("_kv_tags".to_string(), MetadataValue::Array(values)))
2148}
2149
2150fn kv_tags_value(tags: &[String]) -> Value {
2151    let json = crate::json::Value::Array(
2152        tags.iter()
2153            .map(|tag| crate::json::Value::String(tag.clone()))
2154            .collect(),
2155    );
2156    Value::Json(crate::json::to_vec(&json).unwrap_or_default())
2157}
2158
2159fn kv_value_from_entity(entity: &crate::storage::UnifiedEntity) -> Option<Value> {
2160    if let crate::storage::EntityData::Row(ref row) = entity.data {
2161        if let Some(ref named) = row.named {
2162            return named.get("value").cloned();
2163        }
2164    }
2165    None
2166}
2167
2168fn kv_collection_contract(name: &str) -> crate::physical::CollectionContract {
2169    let now = current_unix_ms();
2170    crate::physical::CollectionContract {
2171        name: name.to_string(),
2172        declared_model: crate::catalog::CollectionModel::Kv,
2173        schema_mode: crate::catalog::SchemaMode::Dynamic,
2174        origin: crate::physical::ContractOrigin::Implicit,
2175        version: 1,
2176        created_at_unix_ms: now,
2177        updated_at_unix_ms: now,
2178        default_ttl_ms: None,
2179        vector_dimension: None,
2180        vector_metric: None,
2181        context_index_fields: Vec::new(),
2182        declared_columns: Vec::new(),
2183        table_def: None,
2184        timestamps_enabled: false,
2185        context_index_enabled: false,
2186        metrics_raw_retention_ms: None,
2187        metrics_rollup_policies: Vec::new(),
2188        metrics_tenant_identity: None,
2189        metrics_namespace: None,
2190        append_only: false,
2191        subscriptions: Vec::new(),
2192        session_key: None,
2193        session_gap_ms: None,
2194        retention_duration_ms: None,
2195    }
2196}
2197
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// If the system clock reports a time before 1970, this returns `0` instead of
/// failing.
fn current_unix_ms() -> u128 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis(),
        Err(_) => 0,
    }
}
2204
2205#[cfg(test)]
2206mod tests {
2207    use crate::api::RedDBOptions;
2208    use crate::catalog::CollectionModel;
2209    use crate::runtime::RedDBRuntime;
2210
2211    fn rt() -> RedDBRuntime {
2212        RedDBRuntime::with_options(RedDBOptions::in_memory()).expect("in-memory runtime")
2213    }
2214
2215    #[test]
2216    fn incr_missing_key_initialises_at_by() {
2217        let r = rt();
2218        let ops = super::KvAtomicOps::new(&r);
2219        let v = ops
2220            .incr(CollectionModel::Kv, "kv_default", "missing", 5, None)
2221            .unwrap();
2222        assert_eq!(v, 5);
2223    }
2224
2225    #[test]
2226    fn kv_runtime_stats_count_public_ops() {
2227        let r = rt();
2228        let ops = super::KvAtomicOps::new(&r);
2229
2230        ops.set(
2231            CollectionModel::Kv,
2232            "kv_default",
2233            "profile",
2234            crate::storage::schema::Value::text("alice"),
2235            None,
2236            false,
2237        )
2238        .unwrap();
2239        ops.get(CollectionModel::Kv, "kv_default", "profile")
2240            .unwrap();
2241        ops.delete(CollectionModel::Kv, "kv_default", "profile")
2242            .unwrap();
2243        ops.incr(CollectionModel::Kv, "kv_default", "hits", 1, None)
2244            .unwrap();
2245        ops.cas(
2246            CollectionModel::Kv,
2247            "kv_default",
2248            "profile",
2249            None,
2250            crate::storage::schema::Value::text("created"),
2251            None,
2252        )
2253        .unwrap();
2254        ops.cas(
2255            CollectionModel::Kv,
2256            "kv_default",
2257            "profile",
2258            Some(&crate::storage::schema::Value::text("different")),
2259            crate::storage::schema::Value::text("ignored"),
2260            None,
2261        )
2262        .unwrap();
2263
2264        let stats = r.stats().kv;
2265        assert_eq!(stats.puts, 1);
2266        assert_eq!(stats.gets, 1);
2267        assert_eq!(stats.deletes, 1);
2268        assert_eq!(stats.incrs, 1);
2269        assert_eq!(stats.cas_success, 1);
2270        assert_eq!(stats.cas_conflict, 1);
2271    }
2272
2273    #[test]
2274    fn kv_invalidate_tags_removes_matching_entries_only() {
2275        let r = rt();
2276
2277        r.execute_query("KV PUT sessions.blob = 'payload' TAGS [user:42, org:7]")
2278            .unwrap();
2279
2280        let miss = r
2281            .execute_query("INVALIDATE TAGS [org:99] FROM sessions")
2282            .unwrap();
2283        assert_eq!(miss.affected_rows, 0);
2284        assert!(matches!(
2285            r.execute_query("KV GET sessions.blob")
2286                .unwrap()
2287                .result
2288                .records[0]
2289                .get("value"),
2290            Some(crate::storage::schema::Value::Text(value)) if &**value == "payload"
2291        ));
2292
2293        let hit = r
2294            .execute_query("INVALIDATE TAGS [user:42] FROM sessions")
2295            .unwrap();
2296        assert_eq!(hit.affected_rows, 1);
2297        assert!(matches!(
2298            r.execute_query("KV GET sessions.blob")
2299                .unwrap()
2300                .result
2301                .records[0]
2302                .get("value"),
2303            Some(crate::storage::schema::Value::Null)
2304        ));
2305    }
2306
2307    #[test]
2308    fn kv_runtime_stats_count_watch_streams_and_events() {
2309        let r = rt();
2310        let ops = super::KvAtomicOps::new(&r);
2311        assert_eq!(r.stats().kv.watch_streams_active, 0);
2312
2313        {
2314            let mut stream = r.kv_watch_subscribe("kv_default", "watched", None);
2315            assert_eq!(r.stats().kv.watch_streams_active, 1);
2316
2317            ops.set(
2318                CollectionModel::Kv,
2319                "kv_default",
2320                "watched",
2321                crate::storage::schema::Value::Integer(1),
2322                None,
2323                false,
2324            )
2325            .unwrap();
2326            let event = stream.poll_next().expect("watch event");
2327            assert_eq!(event.key, "watched");
2328            assert_eq!(r.stats().kv.watch_events_emitted, 1);
2329
2330            stream.record_drop_count(3);
2331            assert_eq!(r.stats().kv.watch_drops, 3);
2332        }
2333
2334        assert_eq!(r.stats().kv.watch_streams_active, 0);
2335    }
2336
2337    #[test]
2338    fn incr_existing_integer_accumulates() {
2339        let r = rt();
2340        let ops = super::KvAtomicOps::new(&r);
2341        ops.incr(CollectionModel::Kv, "kv_default", "ctr", 1, None)
2342            .unwrap();
2343        ops.incr(CollectionModel::Kv, "kv_default", "ctr", 1, None)
2344            .unwrap();
2345        let v = ops
2346            .incr(CollectionModel::Kv, "kv_default", "ctr", 1, None)
2347            .unwrap();
2348        assert_eq!(v, 3);
2349    }
2350
2351    #[test]
2352    fn decr_via_negative_by() {
2353        let r = rt();
2354        let ops = super::KvAtomicOps::new(&r);
2355        ops.incr(CollectionModel::Kv, "kv_default", "stock", 10, None)
2356            .unwrap();
2357        let v = ops
2358            .incr(CollectionModel::Kv, "kv_default", "stock", -3, None)
2359            .unwrap();
2360        assert_eq!(v, 7);
2361    }
2362
2363    #[test]
2364    fn concurrent_incr_single_key_is_atomic() {
2365        const THREADS: usize = 8;
2366        const ITERS: usize = 1000;
2367
2368        let runtime = std::sync::Arc::new(rt());
2369        let barrier = std::sync::Arc::new(std::sync::Barrier::new(THREADS));
2370        let mut handles = Vec::new();
2371
2372        for _ in 0..THREADS {
2373            let runtime = std::sync::Arc::clone(&runtime);
2374            let barrier = std::sync::Arc::clone(&barrier);
2375            handles.push(std::thread::spawn(move || {
2376                let ops = super::KvAtomicOps::new(&runtime);
2377                barrier.wait();
2378                for _ in 0..ITERS {
2379                    ops.incr(CollectionModel::Kv, "kv_default", "counter", 1, None)
2380                        .unwrap();
2381                }
2382            }));
2383        }
2384
2385        for handle in handles {
2386            handle.join().expect("worker should finish");
2387        }
2388
2389        let ops = super::KvAtomicOps::new(&runtime);
2390        assert_eq!(
2391            ops.get(CollectionModel::Kv, "kv_default", "counter")
2392                .unwrap(),
2393            Some(crate::storage::schema::Value::Integer(
2394                (THREADS * ITERS) as i64
2395            ))
2396        );
2397    }
2398
2399    #[test]
2400    fn incr_on_string_value_returns_error() {
2401        let r = rt();
2402        let ops = super::KvAtomicOps::new(&r);
2403        ops.set(
2404            CollectionModel::Kv,
2405            "kv_default",
2406            "name",
2407            crate::storage::schema::Value::text("alice"),
2408            None,
2409            false,
2410        )
2411        .unwrap();
2412        let err = ops
2413            .incr(CollectionModel::Kv, "kv_default", "name", 1, None)
2414            .unwrap_err();
2415        assert!(err.to_string().contains("non-integer"));
2416    }
2417
2418    // --- CAS tests ---
2419
2420    #[test]
2421    fn cas_matching_value_succeeds() {
2422        let r = rt();
2423        let ops = super::KvAtomicOps::new(&r);
2424        ops.set(
2425            CollectionModel::Kv,
2426            "kv_default",
2427            "lock",
2428            crate::storage::schema::Value::text("free"),
2429            None,
2430            false,
2431        )
2432        .unwrap();
2433        let (ok, prev) = ops
2434            .cas(
2435                CollectionModel::Kv,
2436                "kv_default",
2437                "lock",
2438                Some(&crate::storage::schema::Value::text("free")),
2439                crate::storage::schema::Value::text("held"),
2440                None,
2441            )
2442            .unwrap();
2443        assert!(ok);
2444        assert_eq!(prev, Some(crate::storage::schema::Value::text("free")));
2445        // Value actually changed.
2446        assert_eq!(
2447            ops.get(CollectionModel::Kv, "kv_default", "lock").unwrap(),
2448            Some(crate::storage::schema::Value::text("held"))
2449        );
2450    }
2451
2452    #[test]
2453    fn concurrent_cas_allows_one_success_per_round() {
2454        const THREADS: usize = 8;
2455        const ROUNDS: usize = 100;
2456
2457        let runtime = std::sync::Arc::new(rt());
2458        let ops = super::KvAtomicOps::new(&runtime);
2459        ops.set(
2460            CollectionModel::Kv,
2461            "kv_default",
2462            "cas_counter",
2463            crate::storage::schema::Value::Integer(0),
2464            None,
2465            false,
2466        )
2467        .unwrap();
2468
2469        let start_round = std::sync::Arc::new(std::sync::Barrier::new(THREADS));
2470        let finish_round = std::sync::Arc::new(std::sync::Barrier::new(THREADS));
2471        let successes = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
2472        let mut handles = Vec::new();
2473
2474        for _ in 0..THREADS {
2475            let runtime = std::sync::Arc::clone(&runtime);
2476            let start_round = std::sync::Arc::clone(&start_round);
2477            let finish_round = std::sync::Arc::clone(&finish_round);
2478            let successes = std::sync::Arc::clone(&successes);
2479            handles.push(std::thread::spawn(move || {
2480                let ops = super::KvAtomicOps::new(&runtime);
2481                for round in 0..ROUNDS {
2482                    start_round.wait();
2483                    let (ok, _) = ops
2484                        .cas(
2485                            CollectionModel::Kv,
2486                            "kv_default",
2487                            "cas_counter",
2488                            Some(&crate::storage::schema::Value::Integer(round as i64)),
2489                            crate::storage::schema::Value::Integer((round + 1) as i64),
2490                            None,
2491                        )
2492                        .unwrap();
2493                    if ok {
2494                        successes.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
2495                    }
2496                    finish_round.wait();
2497                }
2498            }));
2499        }
2500
2501        for handle in handles {
2502            handle.join().expect("worker should finish");
2503        }
2504
2505        assert_eq!(successes.load(std::sync::atomic::Ordering::SeqCst), ROUNDS);
2506        assert_eq!(
2507            ops.get(CollectionModel::Kv, "kv_default", "cas_counter")
2508                .unwrap(),
2509            Some(crate::storage::schema::Value::Integer(ROUNDS as i64))
2510        );
2511    }
2512
2513    #[test]
2514    fn cas_mismatching_value_fails() {
2515        let r = rt();
2516        let ops = super::KvAtomicOps::new(&r);
2517        ops.set(
2518            CollectionModel::Kv,
2519            "kv_default",
2520            "lock",
2521            crate::storage::schema::Value::text("free"),
2522            None,
2523            false,
2524        )
2525        .unwrap();
2526        let (ok, current) = ops
2527            .cas(
2528                CollectionModel::Kv,
2529                "kv_default",
2530                "lock",
2531                Some(&crate::storage::schema::Value::text("held")),
2532                crate::storage::schema::Value::text("worker-7"),
2533                None,
2534            )
2535            .unwrap();
2536        assert!(!ok);
2537        assert_eq!(current, Some(crate::storage::schema::Value::text("free")));
2538        // Value unchanged.
2539        assert_eq!(
2540            ops.get(CollectionModel::Kv, "kv_default", "lock").unwrap(),
2541            Some(crate::storage::schema::Value::text("free"))
2542        );
2543    }
2544
2545    #[test]
2546    fn cas_expect_null_on_missing_key_creates() {
2547        let r = rt();
2548        let ops = super::KvAtomicOps::new(&r);
2549        let (ok, prev) = ops
2550            .cas(
2551                CollectionModel::Kv,
2552                "kv_default",
2553                "new_key",
2554                None,
2555                crate::storage::schema::Value::text("created"),
2556                None,
2557            )
2558            .unwrap();
2559        assert!(ok);
2560        assert_eq!(prev, None);
2561        assert_eq!(
2562            ops.get(CollectionModel::Kv, "kv_default", "new_key")
2563                .unwrap(),
2564            Some(crate::storage::schema::Value::text("created"))
2565        );
2566    }
2567
2568    #[test]
2569    fn cas_expect_null_on_existing_key_fails() {
2570        let r = rt();
2571        let ops = super::KvAtomicOps::new(&r);
2572        ops.set(
2573            CollectionModel::Kv,
2574            "kv_default",
2575            "taken",
2576            crate::storage::schema::Value::text("worker-1"),
2577            None,
2578            false,
2579        )
2580        .unwrap();
2581        let (ok, current) = ops
2582            .cas(
2583                CollectionModel::Kv,
2584                "kv_default",
2585                "taken",
2586                None,
2587                crate::storage::schema::Value::text("worker-2"),
2588                None,
2589            )
2590            .unwrap();
2591        assert!(!ok);
2592        assert_eq!(
2593            current,
2594            Some(crate::storage::schema::Value::text("worker-1"))
2595        );
2596    }
2597
2598    #[test]
2599    fn cas_via_sql_roundtrip() {
2600        let r = rt();
2601        // Seed value.
2602        r.execute_query("KV PUT lock = 'free'").unwrap();
2603        // CAS: free → held — should succeed.
2604        let res = r
2605            .execute_query("KV CAS lock EXPECT 'free' SET 'held'")
2606            .unwrap();
2607        let row = &res.result.records[0];
2608        assert_eq!(
2609            row.get("ok"),
2610            Some(&crate::storage::schema::Value::Boolean(true))
2611        );
2612        // CAS: free → held again — should fail (value is now 'held').
2613        let res2 = r
2614            .execute_query("KV CAS lock EXPECT 'free' SET 'held'")
2615            .unwrap();
2616        let row2 = &res2.result.records[0];
2617        assert_eq!(
2618            row2.get("ok"),
2619            Some(&crate::storage::schema::Value::Boolean(false))
2620        );
2621    }
2622
2623    #[test]
2624    fn cas_expect_null_via_sql() {
2625        let r = rt();
2626        let res = r
2627            .execute_query("KV CAS singleton EXPECT NULL SET 'first'")
2628            .unwrap();
2629        let row = &res.result.records[0];
2630        assert_eq!(
2631            row.get("ok"),
2632            Some(&crate::storage::schema::Value::Boolean(true))
2633        );
2634        // Second call must fail.
2635        let res2 = r
2636            .execute_query("KV CAS singleton EXPECT NULL SET 'second'")
2637            .unwrap();
2638        let row2 = &res2.result.records[0];
2639        assert_eq!(
2640            row2.get("ok"),
2641            Some(&crate::storage::schema::Value::Boolean(false))
2642        );
2643    }
2644
2645    #[test]
2646    fn incr_via_sql_roundtrip() {
2647        let r = rt();
2648        let res = r.execute_query("KV INCR hits").unwrap();
2649        let row = &res.result.records[0];
2650        assert_eq!(
2651            row.get("value"),
2652            Some(&crate::storage::schema::Value::Integer(1))
2653        );
2654        let res2 = r.execute_query("KV INCR hits BY 4").unwrap();
2655        let row2 = &res2.result.records[0];
2656        assert_eq!(
2657            row2.get("value"),
2658            Some(&crate::storage::schema::Value::Integer(5))
2659        );
2660    }
2661
2662    #[test]
2663    fn concurrent_self_referential_update_is_atomic() {
2664        const THREADS: usize = 8;
2665        const ITERS: usize = 100;
2666
2667        let runtime = std::sync::Arc::new(rt());
2668        runtime
2669            .execute_query("CREATE TABLE counters (id INT, n INT)")
2670            .unwrap();
2671        runtime
2672            .execute_query("INSERT INTO counters (id, n) VALUES (1, 0)")
2673            .unwrap();
2674
2675        let barrier = std::sync::Arc::new(std::sync::Barrier::new(THREADS));
2676        let mut handles = Vec::new();
2677        for _ in 0..THREADS {
2678            let runtime = std::sync::Arc::clone(&runtime);
2679            let barrier = std::sync::Arc::clone(&barrier);
2680            handles.push(std::thread::spawn(move || {
2681                barrier.wait();
2682                for _ in 0..ITERS {
2683                    runtime
2684                        .execute_query("UPDATE counters SET n = n + 1 WHERE id = 1")
2685                        .unwrap();
2686                }
2687            }));
2688        }
2689
2690        for handle in handles {
2691            handle.join().expect("worker should finish");
2692        }
2693
2694        let selected = runtime
2695            .execute_query("SELECT n FROM counters WHERE id = 1")
2696            .unwrap();
2697        assert_eq!(
2698            selected.result.records[0].get("n"),
2699            Some(&crate::storage::schema::Value::Integer(
2700                (THREADS * ITERS) as i64
2701            ))
2702        );
2703    }
2704
2705    #[test]
2706    fn watch_stream_delivers_key_events_in_lsn_order() {
2707        let r = rt();
2708        let ops = super::KvAtomicOps::new(&r);
2709        let mut stream = r.kv_watch_subscribe("kv_default", "seq", None);
2710
2711        ops.set(
2712            CollectionModel::Kv,
2713            "kv_default",
2714            "seq",
2715            crate::storage::schema::Value::Integer(1),
2716            None,
2717            false,
2718        )
2719        .unwrap();
2720        ops.incr(CollectionModel::Kv, "kv_default", "seq", 1, None)
2721            .unwrap();
2722        ops.delete(CollectionModel::Kv, "kv_default", "seq")
2723            .unwrap();
2724        ops.set(
2725            CollectionModel::Kv,
2726            "kv_default",
2727            "seq",
2728            crate::storage::schema::Value::Integer(9),
2729            None,
2730            false,
2731        )
2732        .unwrap();
2733
2734        let mut events = Vec::new();
2735        while let Some(event) = stream.poll_next() {
2736            events.push(event);
2737            if events.len() == 4 {
2738                break;
2739            }
2740        }
2741
2742        assert_eq!(events.len(), 4);
2743        assert_eq!(
2744            events[0].op,
2745            crate::replication::cdc::ChangeOperation::Insert
2746        );
2747        assert_eq!(
2748            events[1].op,
2749            crate::replication::cdc::ChangeOperation::Update
2750        );
2751        assert_eq!(
2752            events[2].op,
2753            crate::replication::cdc::ChangeOperation::Delete
2754        );
2755        assert_eq!(
2756            events[3].op,
2757            crate::replication::cdc::ChangeOperation::Insert
2758        );
2759        assert!(events.windows(2).all(|pair| pair[0].lsn < pair[1].lsn));
2760    }
2761
2762    #[test]
2763    fn watch_prefix_stream_delivers_matching_events_only() {
2764        let r = rt();
2765        let ops = super::KvAtomicOps::new(&r);
2766        let mut stream = r.kv_watch_subscribe_prefix("kv_default", "acct:", None);
2767
2768        ops.set(
2769            CollectionModel::Kv,
2770            "kv_default",
2771            "acct:1",
2772            crate::storage::schema::Value::Integer(1),
2773            None,
2774            false,
2775        )
2776        .unwrap();
2777        ops.set(
2778            CollectionModel::Kv,
2779            "kv_default",
2780            "session:1",
2781            crate::storage::schema::Value::Integer(2),
2782            None,
2783            false,
2784        )
2785        .unwrap();
2786        ops.set(
2787            CollectionModel::Kv,
2788            "kv_default",
2789            "acct:2",
2790            crate::storage::schema::Value::Integer(3),
2791            None,
2792            false,
2793        )
2794        .unwrap();
2795
2796        let first = stream.poll_next().expect("first prefix event");
2797        let second = stream.poll_next().expect("second prefix event");
2798        assert_eq!(first.key, "acct:1");
2799        assert_eq!(second.key, "acct:2");
2800        assert!(stream.poll_next().is_none());
2801    }
2802
2803    #[test]
2804    fn watch_stream_resume_from_lsn_delivers_missed_events_without_duplicates() {
2805        let r = rt();
2806        let ops = super::KvAtomicOps::new(&r);
2807        let mut stream = r.kv_watch_subscribe("kv_default", "resume", None);
2808
2809        let mut last_seen_lsn = 0;
2810        for value in 0..5 {
2811            ops.set(
2812                CollectionModel::Kv,
2813                "kv_default",
2814                "resume",
2815                crate::storage::schema::Value::Integer(value),
2816                None,
2817                false,
2818            )
2819            .unwrap();
2820            last_seen_lsn = stream.poll_next().expect("initial event").lsn;
2821        }
2822        drop(stream);
2823
2824        for value in 5..55 {
2825            ops.set(
2826                CollectionModel::Kv,
2827                "kv_default",
2828                "resume",
2829                crate::storage::schema::Value::Integer(value),
2830                None,
2831                false,
2832            )
2833            .unwrap();
2834        }
2835
2836        let mut resumed = r.kv_watch_subscribe("kv_default", "resume", Some(last_seen_lsn));
2837        let mut lsns = Vec::new();
2838        while let Some(event) = resumed.poll_next() {
2839            lsns.push(event.lsn);
2840            if lsns.len() == 50 {
2841                break;
2842            }
2843        }
2844
2845        assert_eq!(lsns.len(), 50);
2846        assert!(lsns.iter().all(|lsn| *lsn > last_seen_lsn));
2847        assert!(lsns.windows(2).all(|pair| pair[0] < pair[1]));
2848        assert!(resumed.poll_next().is_none());
2849    }
2850
2851    #[test]
2852    fn watch_stream_slow_consumer_drops_oldest_buffered_events() {
2853        let r = rt();
2854        let ops = super::KvAtomicOps::new(&r);
2855        let mut stream = r.kv_watch_subscribe("kv_default", "slow", None);
2856
2857        for value in 0..10_000 {
2858            ops.set(
2859                CollectionModel::Kv,
2860                "kv_default",
2861                "slow",
2862                crate::storage::schema::Value::Integer(value),
2863                None,
2864                false,
2865            )
2866            .unwrap();
2867        }
2868
2869        let event = stream.poll_next().expect("tail event after drops");
2870        assert!(event.lsn > 1);
2871        assert!(event.dropped_event_count > 0);
2872        assert_eq!(stream.dropped_event_count(), event.dropped_event_count);
2873        assert_eq!(r.stats().kv.watch_drops, event.dropped_event_count);
2874    }
2875
2876    #[test]
2877    fn watch_stream_idle_timeout_closes_subscription() {
2878        let r = rt();
2879        r.execute_query("SET CONFIG red.config.kv.watch.idle_timeout_ms = 1")
2880            .unwrap();
2881
2882        let mut stream = r.kv_watch_subscribe("kv_default", "idle", None);
2883        assert_eq!(r.stats().kv.watch_streams_active, 1);
2884        std::thread::sleep(std::time::Duration::from_millis(5));
2885
2886        assert!(stream.poll_next().is_none());
2887        assert_eq!(r.stats().kv.watch_streams_active, 0);
2888    }
2889
2890    #[test]
2891    fn watch_stream_does_not_emit_rolled_back_put() {
2892        let r = rt();
2893        let mut stream = r.kv_watch_subscribe("kv_default", "rollback_key", None);
2894
2895        r.execute_query("BEGIN").unwrap();
2896        r.execute_query("KV PUT rollback_key = 'dirty'").unwrap();
2897        r.execute_query("ROLLBACK").unwrap();
2898
2899        assert!(stream.poll_next().is_none());
2900    }
2901}