Skip to main content

reddb_server/runtime/
impl_kv.rs

1//! KV DSL command execution and KvAtomicOps module.
2//!
3//! Handles `KV PUT key = value [EXPIRE n] [TAGS [...]] [IF NOT EXISTS]`,
4//! `KV GET key`, and `KV DELETE key`.
5
6use crate::application::ports::RuntimeEntityPort;
7use crate::storage::unified::{Metadata, MetadataValue};
8
9use super::impl_core::{current_auth_identity, current_connection_id, current_tenant};
10use super::*;
11
/// Default collection name for bare-key KV operations.
///
/// NOTE(review): presumably substituted when a KV command names a key without
/// a collection — the call sites are outside this view, confirm there.
pub const KV_DEFAULT_COLLECTION: &str = "kv_default";
14
/// Builds the master-key reference string for a vault collection.
///
/// The result always has the shape `red.vault.<collection>.master_key`; the
/// collection name is inserted verbatim, without escaping or validation.
fn vault_master_key_ref(collection: &str) -> String {
    ["red.vault.", collection, ".master_key"].concat()
}
18
19fn keyed_model_name(model: crate::catalog::CollectionModel) -> &'static str {
20    match model {
21        crate::catalog::CollectionModel::Kv => "kv",
22        crate::catalog::CollectionModel::Vault => "vault",
23        crate::catalog::CollectionModel::Config => "config",
24        _ => "collection",
25    }
26}
27
/// One stored version of a vault key, combining the fields of a keyed row
/// version with its metadata and bookkeeping counters.
///
/// Built by `VaultEntry::from_keyed_row`.
#[derive(Debug, Clone)]
struct VaultEntry {
    /// Entity id of this version.
    id: crate::storage::EntityId,
    /// Logical key the version belongs to.
    key: String,
    /// Stored value of this version.
    value: crate::storage::schema::Value,
    /// Metadata attached to this version.
    metadata: Metadata,
    /// Creation timestamp (unit not visible in this file — TODO confirm).
    created_at: u64,
    /// Last-update timestamp (unit not visible in this file — TODO confirm).
    updated_at: u64,
    /// Sequence id supplied by the caller of `from_keyed_row`.
    sequence_id: u64,
    /// Version number reported through `KeyedVersion::version`.
    version: i64,
    /// `true` when this version is a tombstone; the vault write path treats a
    /// tombstoned latest version as "key not present".
    tombstone: bool,
    /// Operation label carried by the row version (presumably the verb that
    /// produced it, e.g. "put" — verify against the writer).
    op: String,
}
41
42impl super::keyed_spine::KeyedVersion for VaultEntry {
43    fn key(&self) -> &str {
44        &self.key
45    }
46
47    fn version(&self) -> i64 {
48        self.version
49    }
50}
51
52impl VaultEntry {
53    fn from_keyed_row(
54        version: super::keyed_spine::KeyedRowVersion,
55        metadata: Metadata,
56        created_at: u64,
57        updated_at: u64,
58        sequence_id: u64,
59    ) -> Self {
60        Self {
61            id: version.id,
62            key: version.key,
63            value: version.value,
64            metadata,
65            created_at,
66            updated_at,
67            sequence_id,
68            version: version.version,
69            tombstone: version.tombstone,
70            op: version.op,
71        }
72    }
73}
74
/// Atomic KV operations interface — the seam that transports and drivers depend on.
///
/// The methods on this type build on the runtime's existing `create_kv` /
/// `get_kv` / `delete_kv` plumbing and the underlying store; this struct adds
/// the auto-create and upsert logic on top.
pub struct KvAtomicOps<'a> {
    /// Borrowed runtime that every operation is executed against.
    runtime: &'a RedDBRuntime,
}
82
83impl<'a> KvAtomicOps<'a> {
84    pub fn new(runtime: &'a RedDBRuntime) -> Self {
85        Self { runtime }
86    }
87
88    /// Insert or update a KV entry. Auto-creates the collection when needed.
89    ///
90    /// Insert or update a KV entry. Returns `(created: bool, id: EntityId)`.
91    pub fn set(
92        &self,
93        model: crate::catalog::CollectionModel,
94        collection: &str,
95        key: &str,
96        value: crate::storage::schema::Value,
97        ttl_ms: Option<u64>,
98        if_not_exists: bool,
99    ) -> RedDBResult<(bool, crate::storage::EntityId)> {
100        self.set_with_tags_for_model(model, collection, key, value, ttl_ms, &[], if_not_exists)
101    }
102
103    pub fn set_with_tags(
104        &self,
105        collection: &str,
106        key: &str,
107        value: crate::storage::schema::Value,
108        ttl_ms: Option<u64>,
109        tags: &[String],
110        if_not_exists: bool,
111    ) -> RedDBResult<(bool, crate::storage::EntityId)> {
112        self.set_with_tags_for_model(
113            crate::catalog::CollectionModel::Kv,
114            collection,
115            key,
116            value,
117            ttl_ms,
118            tags,
119            if_not_exists,
120        )
121    }
122
123    fn set_with_tags_for_model(
124        &self,
125        model: crate::catalog::CollectionModel,
126        collection: &str,
127        key: &str,
128        value: crate::storage::schema::Value,
129        ttl_ms: Option<u64>,
130        tags: &[String],
131        if_not_exists: bool,
132    ) -> RedDBResult<(bool, crate::storage::EntityId)> {
133        self.set_with_tags_and_metadata_for_model(
134            model,
135            collection,
136            key,
137            value,
138            ttl_ms,
139            tags,
140            if_not_exists,
141            Vec::new(),
142        )
143    }
144
145    pub fn set_with_tags_and_metadata(
146        &self,
147        collection: &str,
148        key: &str,
149        value: crate::storage::schema::Value,
150        ttl_ms: Option<u64>,
151        tags: &[String],
152        if_not_exists: bool,
153        metadata: Vec<(String, MetadataValue)>,
154    ) -> RedDBResult<(bool, crate::storage::EntityId)> {
155        self.set_with_tags_and_metadata_for_model(
156            crate::catalog::CollectionModel::Kv,
157            collection,
158            key,
159            value,
160            ttl_ms,
161            tags,
162            if_not_exists,
163            metadata,
164        )
165    }
166
167    fn set_with_tags_and_metadata_for_model(
168        &self,
169        model: crate::catalog::CollectionModel,
170        collection: &str,
171        key: &str,
172        value: crate::storage::schema::Value,
173        ttl_ms: Option<u64>,
174        tags: &[String],
175        if_not_exists: bool,
176        mut metadata: Vec<(String, MetadataValue)>,
177    ) -> RedDBResult<(bool, crate::storage::EntityId)> {
178        self.ensure_keyed_collection(model, collection)?;
179
180        if model == crate::catalog::CollectionModel::Vault {
181            let latest = self.get_vault_entry(collection, key)?;
182            let was_present = latest.as_ref().is_some_and(|entry| !entry.tombstone);
183            if if_not_exists && was_present {
184                return Ok((false, latest.expect("checked present").id));
185            }
186            let entry = self.append_vault_version(collection, key, value, "put", false, tags)?;
187            self.runtime.record_kv_watch_event(
188                if latest.is_some() {
189                    crate::replication::cdc::ChangeOperation::Update
190                } else {
191                    crate::replication::cdc::ChangeOperation::Insert
192                },
193                collection,
194                key,
195                entry.id.raw(),
196                latest.as_ref().map(vault_entry_metadata_json),
197                Some(vault_entry_metadata_json(&entry)),
198            );
199            return Ok((!was_present, entry.id));
200        }
201
202        let existing = self.get_entry(model, collection, key)?;
203        let was_present = existing.is_some();
204
205        if if_not_exists && was_present {
206            let (_, id) = existing.unwrap();
207            self.runtime.inner.kv_stats.incr_puts();
208            return Ok((false, id));
209        }
210
211        let before = existing
212            .as_ref()
213            .map(|(value, _)| crate::presentation::entity_json::storage_value_to_json(value));
214        let op = if was_present {
215            crate::replication::cdc::ChangeOperation::Update
216        } else {
217            crate::replication::cdc::ChangeOperation::Insert
218        };
219        let after = Some(crate::presentation::entity_json::storage_value_to_json(
220            &value,
221        ));
222
223        // Versioned KV retains MVCC history: the prior visible version is
224        // tombstoned (set_xmax) *after* the new version is created, so both
225        // coexist physically — exactly like a table-row update. Non-versioned
226        // KV keeps last-writer-wins: pre-delete the old row before creating
227        // fresh (also handles TTL refresh).
228        let versioned = self.is_versioned_collection(model, collection);
229        // Capture the prior *visible* version (if any) before the write so we
230        // can tombstone it. On a re-PUT after a delete (`was_present` false)
231        // there is no visible prior, but earlier versions still exist
232        // physically — see the xmin restamp below.
233        let prior_versioned_entity = if versioned && was_present {
234            self.get_entity(model, collection, key)?
235        } else {
236            None
237        };
238        if was_present && !versioned {
239            self.delete(model, collection, key)?;
240        }
241
242        if let Some(ttl_metadata) = ttl_metadata(ttl_ms) {
243            metadata.extend(ttl_metadata.fields);
244        }
245        if let Some(tags_metadata) = kv_tags_metadata(tags) {
246            metadata.push(tags_metadata);
247        }
248
249        let output = self
250            .runtime
251            .create_kv(crate::application::entity::CreateKvInput {
252                collection: collection.to_string(),
253                key: key.to_string(),
254                value,
255                metadata,
256            })?;
257
258        // Every versioned PUT stamps the new version's xmin from ONE monotonic
259        // xid (begin()/commit() or the active tx xid) — NOT the autocommit
260        // pool, whose batched reservation is not monotonic against commit
261        // root_xids and would let AS OF resolve the wrong version. The prior
262        // visible version (if any) is tombstoned with that SAME xid, so
263        // old.xmax == new.xmin and exactly one version is visible to any
264        // snapshot (mirrors `apply_loaded_sql_update_row_core`). Restamping
265        // unconditionally also fixes the re-PUT-after-delete case, where no
266        // visible prior exists but the pool xmin would still scramble ordering
267        // against existing tombstoned versions.
268        if versioned {
269            let version_xid = match self.runtime.current_xid() {
270                Some(xid) => xid,
271                None => {
272                    let mgr = self.runtime.snapshot_manager();
273                    let xid = mgr.begin();
274                    mgr.commit(xid);
275                    xid
276                }
277            };
278            if let Some(new_entity) = output.entity.clone() {
279                self.restamp_xmin(collection, new_entity, version_xid)?;
280            }
281            if let Some(prior) = prior_versioned_entity {
282                // First-committer-wins (ADR 0014): inside an explicit
283                // transaction, defer the prior version's tombstone to the
284                // commit-time conflict check by recording it as a pending
285                // versioned update — exactly like a table-row versioned
286                // UPDATE (`persist_applied_entity_mutations`). The check
287                // rejects this txn's COMMIT if the prior version was
288                // already tombstoned by a concurrent committed writer.
289                // Autocommit (`current_xid()` is None) commits eagerly and
290                // records nothing, matching the table path.
291                let previous_xmax = prior.xmax;
292                let old_id = prior.id;
293                self.tombstone_version(collection, prior, version_xid)?;
294                if self.runtime.current_xid().is_some() {
295                    self.runtime.record_pending_versioned_update(
296                        crate::runtime::impl_core::current_connection_id(),
297                        collection,
298                        old_id,
299                        output.id,
300                        version_xid,
301                        previous_xmax,
302                    );
303                }
304            }
305        }
306        if model == crate::catalog::CollectionModel::Kv {
307            self.runtime
308                .inner
309                .kv_tag_index
310                .replace(collection, key, output.id, tags);
311        }
312
313        if model == crate::catalog::CollectionModel::Kv {
314            self.runtime
315                .record_kv_watch_event(op, collection, key, output.id.raw(), before, after);
316        }
317
318        if model == crate::catalog::CollectionModel::Kv {
319            self.runtime.inner.kv_stats.incr_puts();
320        }
321        Ok((!was_present, output.id))
322    }
323
324    /// Retrieve a KV value by key. Returns `None` when not found.
325    pub fn get(
326        &self,
327        model: crate::catalog::CollectionModel,
328        collection: &str,
329        key: &str,
330    ) -> RedDBResult<Option<crate::storage::schema::Value>> {
331        let result = self.get_entry(model, collection, key)?;
332        if model == crate::catalog::CollectionModel::Kv {
333            self.runtime.inner.kv_stats.incr_gets();
334        }
335        Ok(result.map(|(v, _)| v))
336    }
337
338    /// Delete a KV entry. Returns `true` if the key existed.
339    pub fn delete(
340        &self,
341        model: crate::catalog::CollectionModel,
342        collection: &str,
343        key: &str,
344    ) -> RedDBResult<bool> {
345        self.ensure_declared_model(model, collection)?;
346
347        // Versioned KV deletes tombstone the prior visible version
348        // (set_xmax) instead of reclaiming it physically, so history and
349        // time-travel reads survive — VACUUM reclaims the version later.
350        if self.is_versioned_collection(model, collection) {
351            let Some(entity) = self.get_entity(model, collection, key)? else {
352                return Ok(false);
353            };
354            let id = entity.id;
355            let value = kv_value_from_entity(&entity);
356            let xid = self.runtime.current_xid().unwrap_or_else(|| {
357                let mgr = self.runtime.snapshot_manager();
358                let xid = mgr.begin();
359                mgr.commit(xid);
360                xid
361            });
362            // First-committer-wins (ADR 0014): inside an explicit
363            // transaction, defer this tombstone to the commit-time
364            // conflict check by recording it as a pending tombstone —
365            // exactly like a table-row DELETE (`delete_entities_batch`).
366            // The check rejects this txn's COMMIT if the version was
367            // already tombstoned by a concurrent committed writer.
368            // Autocommit (`current_xid()` is None) commits eagerly and
369            // records nothing, matching the table path.
370            let previous_xmax = entity.xmax;
371            self.tombstone_version(collection, entity, xid)?;
372            if self.runtime.current_xid().is_some() {
373                self.runtime.record_pending_tombstone(
374                    crate::runtime::impl_core::current_connection_id(),
375                    collection,
376                    id,
377                    xid,
378                    previous_xmax,
379                );
380            }
381            self.runtime.inner.kv_tag_index.remove(collection, key);
382            self.runtime.record_kv_watch_event(
383                crate::replication::cdc::ChangeOperation::Delete,
384                collection,
385                key,
386                id.raw(),
387                value
388                    .as_ref()
389                    .map(crate::presentation::entity_json::storage_value_to_json),
390                None,
391            );
392            self.runtime.inner.kv_stats.incr_deletes();
393            return Ok(true);
394        }
395
396        let found = self.get_entry(model, collection, key)?;
397        if let Some((value, id)) = found {
398            let store = self.runtime.inner.db.store();
399            let deleted = store
400                .delete(collection, id)
401                .map_err(|err| RedDBError::Internal(err.to_string()))?;
402            if deleted {
403                store.context_index().remove_entity(id);
404                if model == crate::catalog::CollectionModel::Kv {
405                    self.runtime.inner.kv_tag_index.remove(collection, key);
406                    self.runtime.record_kv_watch_event(
407                        crate::replication::cdc::ChangeOperation::Delete,
408                        collection,
409                        key,
410                        id.raw(),
411                        Some(crate::presentation::entity_json::storage_value_to_json(
412                            &value,
413                        )),
414                        None,
415                    );
416                    self.runtime.inner.kv_stats.incr_deletes();
417                }
418            }
419            Ok(deleted)
420        } else {
421            Ok(false)
422        }
423    }
424
425    /// Tombstone a KV version by stamping `xmax = xid` and persisting it.
426    ///
427    /// The physical row stays alive (history); VACUUM reclaims it once no
428    /// snapshot can see it. Used by versioned PUT (old version) and
429    /// versioned DELETE.
430    fn tombstone_version(
431        &self,
432        collection: &str,
433        mut entity: crate::storage::UnifiedEntity,
434        xid: crate::storage::transaction::snapshot::Xid,
435    ) -> RedDBResult<()> {
436        let store = self.runtime.inner.db.store();
437        let Some(manager) = store.get_collection(collection) else {
438            return Ok(());
439        };
440        let id = entity.id;
441        entity.set_xmax(xid);
442        manager
443            .update(entity.clone())
444            .map_err(|err| RedDBError::Internal(format!("{err:?}")))?;
445        store
446            .persist_entities_to_pager(collection, std::slice::from_ref(&entity))
447            .map_err(|err| RedDBError::Internal(err.to_string()))?;
448        // The tombstoned version drops out of the live context index but
449        // stays physically resident for AS OF reads until VACUUM.
450        store.context_index().remove_entity(id);
451        Ok(())
452    }
453
454    /// Restamp a freshly-created KV version's `xmin` to a monotonic version
455    /// xid and persist it. `create_kv` stamps a (non-monotonic) autocommit
456    /// pool xid; the versioned write path overrides it so the new version's
457    /// xmin equals the old version's xmax, preserving AS OF ordering against
458    /// commit root_xids.
459    fn restamp_xmin(
460        &self,
461        collection: &str,
462        mut entity: crate::storage::UnifiedEntity,
463        xid: crate::storage::transaction::snapshot::Xid,
464    ) -> RedDBResult<()> {
465        let store = self.runtime.inner.db.store();
466        let Some(manager) = store.get_collection(collection) else {
467            return Ok(());
468        };
469        entity.set_xmin(xid);
470        manager
471            .update(entity.clone())
472            .map_err(|err| RedDBError::Internal(format!("{err:?}")))?;
473        store
474            .persist_entities_to_pager(collection, std::slice::from_ref(&entity))
475            .map_err(|err| RedDBError::Internal(err.to_string()))?;
476        Ok(())
477    }
478
479    /// Atomically increment (or decrement) a counter key. Returns the new value.
480    ///
481    /// - Missing key initialises at `by` (Redis-compat).
482    /// - Non-integer value returns an error before any mutation.
483    pub fn incr(
484        &self,
485        model: crate::catalog::CollectionModel,
486        collection: &str,
487        key: &str,
488        by: i64,
489        ttl_ms: Option<u64>,
490    ) -> RedDBResult<i64> {
491        if model == crate::catalog::CollectionModel::Vault {
492            return Err(RedDBError::InvalidOperation(
493                "VAULT INCR is not supported for sealed secrets".to_string(),
494            ));
495        }
496        let rmw_lock = self.runtime.inner.rmw_locks.lock_for(collection, key);
497        let _rmw_guard = rmw_lock.lock();
498        self.ensure_kv_collection(collection)?;
499        let existing = self.runtime.get_kv(collection, key)?;
500        let current: i64 = match existing.as_ref() {
501            None => 0,
502            Some((crate::storage::schema::Value::Integer(n), _)) => *n,
503            Some((crate::storage::schema::Value::Float(f), _)) => *f as i64,
504            Some((other, _)) => {
505                return Err(RedDBError::Internal(format!(
506                    "INCR on non-integer value: {:?}",
507                    other
508                )));
509            }
510        };
511
512        let next = current
513            .checked_add(by)
514            .ok_or_else(|| RedDBError::Internal(format!("INCR overflow: {current} + {by}")))?;
515
516        // Delete then re-create so TTL is refreshed.
517        if existing.is_some() {
518            self.runtime.delete_kv(collection, key)?;
519        }
520
521        let meta_vec: Vec<(String, crate::storage::unified::MetadataValue)> = ttl_metadata(ttl_ms)
522            .map(|m| m.fields.into_iter().collect())
523            .unwrap_or_default();
524
525        let output = self
526            .runtime
527            .create_kv(crate::application::entity::CreateKvInput {
528                collection: collection.to_string(),
529                key: key.to_string(),
530                value: crate::storage::schema::Value::Integer(next),
531                metadata: meta_vec,
532            })?;
533        self.runtime
534            .inner
535            .kv_tag_index
536            .replace(collection, key, output.id, &[]);
537
538        self.runtime.record_kv_watch_event(
539            if existing.is_some() {
540                crate::replication::cdc::ChangeOperation::Update
541            } else {
542                crate::replication::cdc::ChangeOperation::Insert
543            },
544            collection,
545            key,
546            output.id.raw(),
547            existing
548                .as_ref()
549                .map(|(value, _)| crate::presentation::entity_json::storage_value_to_json(value)),
550            Some(crate::presentation::entity_json::storage_value_to_json(
551                &crate::storage::schema::Value::Integer(next),
552            )),
553        );
554
555        self.runtime.inner.kv_stats.incr_incrs();
556        Ok(next)
557    }
558
559    /// Compare-and-set: atomically swap `key` from `expected` to `new_value`.
560    ///
561    /// Returns `(ok, current)`:
562    /// - `ok = true`  → swap applied; `current` is the value *before* the swap.
563    /// - `ok = false` → swap skipped; `current` holds the actual current value.
564    ///
565    /// `expected = None` means the caller expects the key to be *absent* (create-if-absent).
566    pub fn cas(
567        &self,
568        model: crate::catalog::CollectionModel,
569        collection: &str,
570        key: &str,
571        expected: Option<&crate::storage::schema::Value>,
572        new_value: crate::storage::schema::Value,
573        ttl_ms: Option<u64>,
574    ) -> RedDBResult<(bool, Option<crate::storage::schema::Value>)> {
575        if model == crate::catalog::CollectionModel::Vault {
576            return Err(RedDBError::InvalidOperation(
577                "VAULT CAS is not supported for sealed secrets".to_string(),
578            ));
579        }
580        let rmw_lock = self.runtime.inner.rmw_locks.lock_for(collection, key);
581        let _rmw_guard = rmw_lock.lock();
582        self.ensure_kv_collection(collection)?;
583        let current = self.runtime.get_kv(collection, key)?.map(|(v, _)| v);
584
585        let matches = match (&current, expected) {
586            (None, None) => true,
587            (Some(cur), Some(exp)) => cur == exp,
588            _ => false,
589        };
590
591        if !matches {
592            self.runtime.inner.kv_stats.incr_cas_conflict();
593            return Ok((false, current));
594        }
595
596        // Swap: delete old entry (if present), write new one.
597        if current.is_some() {
598            self.runtime.delete_kv(collection, key)?;
599        }
600
601        let meta_vec: Vec<(String, crate::storage::unified::MetadataValue)> = ttl_metadata(ttl_ms)
602            .map(|m| m.fields.into_iter().collect())
603            .unwrap_or_default();
604
605        let output = self
606            .runtime
607            .create_kv(crate::application::entity::CreateKvInput {
608                collection: collection.to_string(),
609                key: key.to_string(),
610                value: new_value.clone(),
611                metadata: meta_vec,
612            })?;
613        self.runtime
614            .inner
615            .kv_tag_index
616            .replace(collection, key, output.id, &[]);
617
618        self.runtime.record_kv_watch_event(
619            if current.is_some() {
620                crate::replication::cdc::ChangeOperation::Update
621            } else {
622                crate::replication::cdc::ChangeOperation::Insert
623            },
624            collection,
625            key,
626            output.id.raw(),
627            current
628                .as_ref()
629                .map(crate::presentation::entity_json::storage_value_to_json),
630            Some(crate::presentation::entity_json::storage_value_to_json(
631                &new_value,
632            )),
633        );
634
635        self.runtime.inner.kv_stats.incr_cas_success();
636        Ok((true, current))
637    }
638
639    pub fn invalidate_tags(&self, collection: &str, tags: &[String]) -> RedDBResult<usize> {
640        self.runtime
641            .check_write(crate::runtime::write_gate::WriteKind::Dml)?;
642        self.runtime.check_kv_invalidate_policy(collection)?;
643        self.ensure_kv_collection(collection)?;
644        let entries = self
645            .runtime
646            .inner
647            .kv_tag_index
648            .entries_for_tags(collection, tags);
649        if entries.is_empty() {
650            return Ok(0);
651        }
652
653        let store = self.runtime.inner.db.store();
654        let mut removed = 0usize;
655        for (key, id) in entries {
656            let before = store
657                .get(collection, id)
658                .and_then(|entity| kv_value_from_entity(&entity));
659            let deleted = store
660                .delete(collection, id)
661                .map_err(|err| RedDBError::Internal(err.to_string()))?;
662            if deleted {
663                store.context_index().remove_entity(id);
664                self.runtime.inner.kv_tag_index.remove(collection, &key);
665                self.runtime.record_kv_watch_event(
666                    crate::replication::cdc::ChangeOperation::Delete,
667                    collection,
668                    &key,
669                    id.raw(),
670                    before
671                        .as_ref()
672                        .map(crate::presentation::entity_json::storage_value_to_json),
673                    None,
674                );
675                removed += 1;
676            }
677        }
678        if removed > 0 {
679            self.runtime.inner.kv_stats.incr_deletes();
680        }
681        Ok(removed)
682    }
683
684    pub fn tags_for_key(&self, collection: &str, key: &str) -> Vec<String> {
685        self.runtime
686            .inner
687            .kv_tag_index
688            .tags_for_key(collection, key)
689    }
690
691    /// Auto-create a KV collection if it does not exist yet.
692    fn ensure_kv_collection(&self, collection: &str) -> RedDBResult<()> {
693        self.ensure_keyed_collection(crate::catalog::CollectionModel::Kv, collection)
694    }
695
696    fn ensure_keyed_collection(
697        &self,
698        model: crate::catalog::CollectionModel,
699        collection: &str,
700    ) -> RedDBResult<()> {
701        let store = self.runtime.inner.db.store();
702        if store.get_collection(collection).is_some() {
703            return self.ensure_declared_model(model, collection);
704        }
705        if model != crate::catalog::CollectionModel::Kv {
706            return Err(RedDBError::NotFound(format!(
707                "{} collection '{collection}' does not exist",
708                keyed_model_name(model)
709            )));
710        }
711        // Check config gate: red.config.kv.default_collection (default = true).
712        let auto_create = self
713            .runtime
714            .config_bool("red.config.kv.default_collection", true);
715        if !auto_create {
716            return Err(RedDBError::NotFound(format!(
717                "kv collection '{collection}' does not exist and auto-create is disabled \
718                 (red.config.kv.default_collection = false)"
719            )));
720        }
721        store
722            .create_collection(collection)
723            .map_err(|err| RedDBError::Internal(err.to_string()))?;
724        self.runtime
725            .inner
726            .db
727            .save_collection_contract(kv_collection_contract(collection))
728            .map_err(|err| RedDBError::Internal(err.to_string()))?;
729        Ok(())
730    }
731
732    fn get_entry(
733        &self,
734        model: crate::catalog::CollectionModel,
735        collection: &str,
736        key: &str,
737    ) -> RedDBResult<Option<(crate::storage::schema::Value, crate::storage::EntityId)>> {
738        let Some(entity) = self.get_entity(model, collection, key)? else {
739            return Ok(None);
740        };
741        Ok(kv_value_from_entity(&entity).map(|value| (value, entity.id)))
742    }
743
744    /// Whether `collection` retains MVCC history (opted into versioning).
745    ///
746    /// Versioned KV reads must do MVCC version-selection; non-versioned
747    /// collections (config `red_config`, the secrets vault) keep the
748    /// last-writer-wins fast path. Internal `red_*` collections are
749    /// never versioned, so this stays cheap for them.
750    fn is_versioned_collection(
751        &self,
752        model: crate::catalog::CollectionModel,
753        collection: &str,
754    ) -> bool {
755        // Only plain KV ever opts into versioning; the vault and config
756        // tiers are non-versioned by construction.
757        if model != crate::catalog::CollectionModel::Kv {
758            return false;
759        }
760        self.runtime.vcs_is_versioned(collection).unwrap_or(false)
761    }
762
763    fn get_entity(
764        &self,
765        model: crate::catalog::CollectionModel,
766        collection: &str,
767        key: &str,
768    ) -> RedDBResult<Option<crate::storage::UnifiedEntity>> {
769        self.ensure_declared_model(model, collection)?;
770        let store = self.runtime.inner.db.store();
771        let Some(manager) = store.get_collection(collection) else {
772            return Ok(None);
773        };
774
775        // Versioned KV: multiple physical versions per logical key can
776        // coexist (tombstoned + live). Select the snapshot-visible
777        // version with the greatest xmin — mirroring the table-row
778        // resolver's per-logical-id rule, keyed on the KV `key` field.
779        if self.is_versioned_collection(model, collection) {
780            let resolver =
781                crate::runtime::table_row_mvcc_resolver::TableRowMvccReadResolver::current_statement();
782            let mut best: Option<crate::storage::UnifiedEntity> = None;
783            for entity in manager.query_all(|_| true) {
784                if !kv_entity_has_key(&entity, key) {
785                    continue;
786                }
787                if resolver.resolve_read_candidate(&entity).is_none() {
788                    continue;
789                }
790                let better = match &best {
791                    Some(current) => entity.xmin >= current.xmin,
792                    None => true,
793                };
794                if better {
795                    best = Some(entity);
796                }
797            }
798            return Ok(best);
799        }
800
801        let entities = manager.query_all(|_| true);
802        for entity in entities {
803            if kv_entity_has_key(&entity, key) {
804                return Ok(Some(entity));
805            }
806        }
807        Ok(None)
808    }
809
810    fn latest_kv_entries(
811        &self,
812        collection: &str,
813        prefix: Option<&str>,
814    ) -> RedDBResult<Vec<(String, crate::storage::UnifiedEntity)>> {
815        self.ensure_declared_model(crate::catalog::CollectionModel::Kv, collection)?;
816        let store = self.runtime.inner.db.store();
817        let Some(manager) = store.get_collection(collection) else {
818            return Ok(Vec::new());
819        };
820        // Versioned KV: per logical key, keep the snapshot-visible
821        // version with the greatest xmin; keys whose every version is
822        // tombstoned (no visible version) drop out entirely. Mirrors
823        // the per-key version-selection used by `get_entity`.
824        let versioned =
825            self.is_versioned_collection(crate::catalog::CollectionModel::Kv, collection);
826        let resolver = versioned.then(|| {
827            crate::runtime::table_row_mvcc_resolver::TableRowMvccReadResolver::current_statement()
828        });
829
830        let mut entries: std::collections::BTreeMap<String, crate::storage::UnifiedEntity> =
831            std::collections::BTreeMap::new();
832        for entity in manager.query_all(|_| true) {
833            let key = match &entity.data {
834                crate::storage::EntityData::Row(row) => row
835                    .named
836                    .as_ref()
837                    .and_then(|named| named.get("key"))
838                    .and_then(|value| match value {
839                        crate::storage::schema::Value::Text(key) => Some(key.to_string()),
840                        _ => None,
841                    }),
842                _ => None,
843            };
844            let Some(key) = key else {
845                continue;
846            };
847            if prefix.is_some_and(|prefix| !key.starts_with(prefix)) {
848                continue;
849            }
850            if let Some(resolver) = resolver.as_ref() {
851                // Skip versions not visible to the current snapshot
852                // (tombstoned by a visible deleter, or not yet created).
853                if resolver.resolve_read_candidate(&entity).is_none() {
854                    continue;
855                }
856            }
857            let should_replace = match entries.get(&key) {
858                Some(existing) if versioned => entity.xmin >= existing.xmin,
859                Some(existing) => entity.id.raw() >= existing.id.raw(),
860                None => true,
861            };
862            if should_replace {
863                entries.insert(key, entity);
864            }
865        }
866        Ok(entries.into_iter().collect())
867    }
868
869    fn get_vault_entry(&self, collection: &str, key: &str) -> RedDBResult<Option<VaultEntry>> {
870        self.vault_versions(collection, key)
871            .map(super::keyed_spine::latest_version)
872    }
873
874    fn get_vault_entry_version(
875        &self,
876        collection: &str,
877        key: &str,
878        version: i64,
879    ) -> RedDBResult<Option<VaultEntry>> {
880        Ok(self
881            .vault_versions(collection, key)?
882            .into_iter()
883            .find(|entry| entry.version == version))
884    }
885
886    fn vault_versions(&self, collection: &str, key: &str) -> RedDBResult<Vec<VaultEntry>> {
887        self.ensure_declared_model(crate::catalog::CollectionModel::Vault, collection)?;
888        let store = self.runtime.inner.db.store();
889        let Some(manager) = store.get_collection(collection) else {
890            return Ok(Vec::new());
891        };
892        let entities = manager.query_all(|_| true);
893        let mut versions = Vec::new();
894        for entity in entities {
895            let crate::storage::EntityData::Row(ref row) = entity.data else {
896                continue;
897            };
898            let Some(version) =
899                super::keyed_spine::row_version(entity.id, row, entity.sequence_id as i64)
900            else {
901                continue;
902            };
903            if version.key != key {
904                continue;
905            }
906            let metadata = manager.get_metadata(entity.id).unwrap_or_default();
907            versions.push(VaultEntry::from_keyed_row(
908                version,
909                metadata,
910                entity.created_at,
911                entity.updated_at,
912                entity.sequence_id,
913            ));
914        }
915        Ok(versions)
916    }
917
918    fn latest_vault_entries(
919        &self,
920        collection: &str,
921        prefix: Option<&str>,
922    ) -> RedDBResult<Vec<VaultEntry>> {
923        self.ensure_declared_model(crate::catalog::CollectionModel::Vault, collection)?;
924        let store = self.runtime.inner.db.store();
925        let Some(manager) = store.get_collection(collection) else {
926            return Ok(Vec::new());
927        };
928        let mut versions = Vec::new();
929        for entity in manager.query_all(|_| true) {
930            let crate::storage::EntityData::Row(ref row) = entity.data else {
931                continue;
932            };
933            let Some(version) =
934                super::keyed_spine::row_version(entity.id, row, entity.sequence_id as i64)
935            else {
936                continue;
937            };
938            let metadata = manager.get_metadata(entity.id).unwrap_or_default();
939            let entry = VaultEntry::from_keyed_row(
940                version,
941                metadata,
942                entity.created_at,
943                entity.updated_at,
944                entity.sequence_id,
945            );
946            versions.push(entry);
947        }
948        Ok(super::keyed_spine::latest_versions(versions, prefix))
949    }
950
951    fn append_vault_version(
952        &self,
953        collection: &str,
954        key: &str,
955        value: crate::storage::schema::Value,
956        op: &str,
957        tombstone: bool,
958        tags: &[String],
959    ) -> RedDBResult<VaultEntry> {
960        self.ensure_declared_model(crate::catalog::CollectionModel::Vault, collection)?;
961        let version = self
962            .get_vault_entry(collection, key)?
963            .map(|entry| entry.version)
964            .unwrap_or(0)
965            + 1;
966        let stored_value = if tombstone {
967            crate::storage::schema::Value::Null
968        } else {
969            self.runtime.seal_vault_value(collection, value)?
970        };
971        let now = current_unix_ms() as i64;
972        let fields = vec![
973            (
974                "key".to_string(),
975                crate::storage::schema::Value::text(key.to_string()),
976            ),
977            ("value".to_string(), stored_value),
978            (
979                "version".to_string(),
980                crate::storage::schema::Value::Integer(version),
981            ),
982            (
983                "tombstone".to_string(),
984                crate::storage::schema::Value::Boolean(tombstone),
985            ),
986            (
987                "op".to_string(),
988                crate::storage::schema::Value::text(op.to_string()),
989            ),
990            (
991                "created_at_ms".to_string(),
992                crate::storage::schema::Value::Integer(now),
993            ),
994        ];
995        let mut row = crate::storage::RowData::new(Vec::new());
996        row.named = Some(fields.into_iter().collect());
997        let entity = crate::storage::UnifiedEntity::new(
998            crate::storage::EntityId::new(0),
999            crate::storage::EntityKind::TableRow {
1000                table: std::sync::Arc::from(collection),
1001                row_id: 0,
1002            },
1003            crate::storage::EntityData::Row(row),
1004        );
1005        let id = self
1006            .runtime
1007            .inner
1008            .db
1009            .store()
1010            .insert(collection, entity)
1011            .map_err(|err| RedDBError::Internal(err.to_string()))?;
1012        if !tags.is_empty() {
1013            self.runtime
1014                .inner
1015                .db
1016                .store()
1017                .set_metadata(
1018                    collection,
1019                    id,
1020                    Metadata::with_fields(vault_tags_metadata(tags)),
1021                )
1022                .map_err(|err| RedDBError::Internal(err.to_string()))?;
1023            self.runtime
1024                .inner
1025                .kv_tag_index
1026                .replace(collection, key, id, tags);
1027        }
1028        self.get_vault_entry_version(collection, key, version)?
1029            .ok_or_else(|| RedDBError::Internal(format!("vault version {id} was not readable")))
1030    }
1031
1032    fn purge_vault_versions(&self, collection: &str, key: &str) -> RedDBResult<usize> {
1033        self.ensure_declared_model(crate::catalog::CollectionModel::Vault, collection)?;
1034        let versions = self.vault_versions(collection, key)?;
1035        let store = self.runtime.inner.db.store();
1036        let mut purged = 0usize;
1037        for entry in versions {
1038            if store
1039                .delete(collection, entry.id)
1040                .map_err(|err| RedDBError::Internal(err.to_string()))?
1041            {
1042                store.context_index().remove_entity(entry.id);
1043                purged += 1;
1044            }
1045        }
1046        Ok(purged)
1047    }
1048
1049    fn ensure_declared_model(
1050        &self,
1051        model: crate::catalog::CollectionModel,
1052        collection: &str,
1053    ) -> RedDBResult<()> {
1054        let Some(contract) = self.runtime.inner.db.collection_contract(collection) else {
1055            return Ok(());
1056        };
1057        if contract.declared_model == model
1058            || contract.declared_model == crate::catalog::CollectionModel::Mixed
1059        {
1060            return Ok(());
1061        }
1062        Err(RedDBError::InvalidOperation(format!(
1063            "collection '{}' is declared as '{}' and does not allow '{}' operations",
1064            collection,
1065            keyed_model_name(contract.declared_model),
1066            keyed_model_name(model)
1067        )))
1068    }
1069}
1070
1071impl RedDBRuntime {
1072    pub(crate) fn seal_vault_value(
1073        &self,
1074        collection: &str,
1075        value: crate::storage::schema::Value,
1076    ) -> RedDBResult<crate::storage::schema::Value> {
1077        let key = self.vault_encryption_key(collection)?;
1078        let plaintext = value.to_bytes();
1079        let nonce_bytes = crate::auth::store::random_bytes(12);
1080        let mut nonce = [0u8; 12];
1081        nonce.copy_from_slice(&nonce_bytes[..12]);
1082        let aad = format!("reddb.vault.{collection}");
1083        let ciphertext =
1084            crate::crypto::aes_gcm::aes256_gcm_encrypt(&key, &nonce, aad.as_bytes(), &plaintext);
1085        let mut payload = Vec::with_capacity(12 + ciphertext.len());
1086        payload.extend_from_slice(&nonce);
1087        payload.extend_from_slice(&ciphertext);
1088        Ok(crate::storage::schema::Value::Secret(payload))
1089    }
1090
1091    fn vault_key_available(&self, collection: &str) -> bool {
1092        self.vault_encryption_key(collection).is_ok()
1093    }
1094
1095    fn vault_encryption_key(&self, collection: &str) -> RedDBResult<[u8; 32]> {
1096        let auth_store = self.inner.auth_store.read().clone().ok_or_else(|| {
1097            RedDBError::Query("vault sealed_unavailable: no key provider is configured".to_string())
1098        })?;
1099        if !auth_store.is_vault_backed() {
1100            return Err(RedDBError::Query(
1101                "vault sealed_unavailable: key provider is sealed".to_string(),
1102            ));
1103        }
1104
1105        if let Some(hex_key) = auth_store.vault_kv_get(&vault_master_key_ref(collection)) {
1106            return decode_vault_key(&hex_key);
1107        }
1108        auth_store.vault_secret_key().ok_or_else(|| {
1109            RedDBError::Query("vault sealed_unavailable: cluster vault key is missing".to_string())
1110        })
1111    }
1112
1113    fn unseal_vault_value(
1114        &self,
1115        collection: &str,
1116        sealed: &crate::storage::schema::Value,
1117    ) -> RedDBResult<crate::storage::schema::Value> {
1118        let crate::storage::schema::Value::Secret(payload) = sealed else {
1119            return Err(RedDBError::Query(
1120                "vault unseal failed: stored value is not sealed".to_string(),
1121            ));
1122        };
1123        if payload.len() < 12 {
1124            return Err(RedDBError::Query(
1125                "vault unseal failed: sealed payload is truncated".to_string(),
1126            ));
1127        }
1128        let key = self.vault_encryption_key(collection)?;
1129        let mut nonce = [0u8; 12];
1130        nonce.copy_from_slice(&payload[..12]);
1131        let aad = format!("reddb.vault.{collection}");
1132        let plaintext = crate::crypto::aes_gcm::aes256_gcm_decrypt(
1133            &key,
1134            &nonce,
1135            aad.as_bytes(),
1136            &payload[12..],
1137        )
1138        .map_err(|_| RedDBError::Query("vault unseal failed: decryption failed".to_string()))?;
1139        let (value, consumed) =
1140            crate::storage::schema::Value::from_bytes(&plaintext).map_err(|err| {
1141                RedDBError::Query(format!("vault unseal failed: bad plaintext value: {err}"))
1142            })?;
1143        if consumed != plaintext.len() {
1144            return Err(RedDBError::Query(
1145                "vault unseal failed: trailing plaintext bytes".to_string(),
1146            ));
1147        }
1148        Ok(value)
1149    }
1150
1151    /// Internal peek of a vault entry's unsealed value, bypassing audit and
1152    /// policy checks. Used by `SecretRefGuard` to detect chained `secret_ref`
1153    /// indirection at config write time without emitting vault read events.
1154    /// Returns `Ok(None)` if the entry is absent, tombstoned, or its sealed
1155    /// payload cannot be decrypted with the available key material.
1156    pub(crate) fn peek_vault_unsealed(
1157        &self,
1158        collection: &str,
1159        key: &str,
1160    ) -> RedDBResult<Option<crate::storage::schema::Value>> {
1161        let ops = KvAtomicOps::new(self);
1162        let Some(entry) = ops.get_vault_entry(collection, key)? else {
1163            return Ok(None);
1164        };
1165        if entry.tombstone {
1166            return Ok(None);
1167        }
1168        Ok(self.unseal_vault_value(collection, &entry.value).ok())
1169    }
1170
1171    fn vault_target_resource(collection: &str, key: &str) -> String {
1172        if collection == "red.vault" {
1173            return format!("red.vault/{}", key.to_ascii_lowercase());
1174        }
1175        format!("{collection}.{key}")
1176    }
1177
1178    fn current_vault_actor() -> String {
1179        current_auth_identity()
1180            .map(|(principal, _)| principal)
1181            .unwrap_or_else(|| "anonymous".to_string())
1182    }
1183
1184    fn vault_request_id() -> String {
1185        let conn_id = current_connection_id();
1186        if conn_id == 0 {
1187            "embedded".to_string()
1188        } else {
1189            format!("conn-{conn_id}")
1190        }
1191    }
1192
1193    fn check_vault_capability(
1194        &self,
1195        action: &str,
1196        collection: &str,
1197        key: &str,
1198    ) -> Result<(), String> {
1199        let Some(auth_store) = self.inner.auth_store.read().clone() else {
1200            return Ok(());
1201        };
1202        if !auth_store.iam_authorization_enabled() {
1203            return Ok(());
1204        }
1205        let Some((principal, role)) = current_auth_identity() else {
1206            return Err(
1207                "IAM authorization is enabled; vault capability check requires an authenticated principal"
1208                    .to_string(),
1209            );
1210        };
1211        let tenant = current_tenant();
1212        let principal_id = crate::auth::UserId::from_parts(tenant.as_deref(), &principal);
1213        let mut resource = crate::auth::policies::ResourceRef::new(
1214            "vault",
1215            Self::vault_target_resource(collection, key),
1216        );
1217        if let Some(ref tenant) = tenant {
1218            resource = resource.with_tenant(tenant.clone());
1219        }
1220        let ctx = crate::auth::policies::EvalContext {
1221            principal_tenant: tenant.clone(),
1222            current_tenant: tenant,
1223            peer_ip: None,
1224            mfa_present: false,
1225            now_ms: crate::utils::now_unix_millis() as u128,
1226            principal_is_admin_role: role == crate::auth::Role::Admin,
1227            principal_is_platform_scoped: principal_id.tenant.is_none(),
1228        };
1229        if auth_store.check_policy_authz_with_role(&principal_id, action, &resource, &ctx, role) {
1230            Ok(())
1231        } else {
1232            Err(format!(
1233                "principal=`{}` action=`{}` resource=`vault:{}` denied by IAM policy",
1234                principal,
1235                action,
1236                Self::vault_target_resource(collection, key)
1237            ))
1238        }
1239    }
1240
1241    fn check_system_vault_capability(
1242        &self,
1243        action: &str,
1244        collection: &str,
1245        key: &str,
1246    ) -> Result<(), String> {
1247        if collection != "red.vault" {
1248            return Ok(());
1249        }
1250        self.check_vault_capability(action, collection, key)
1251    }
1252
1253    fn audit_vault_unseal(
1254        &self,
1255        collection: &str,
1256        key: &str,
1257        outcome: crate::runtime::audit_log::Outcome,
1258        reason: &str,
1259        entry: Option<&VaultEntry>,
1260    ) {
1261        let actor = Self::current_vault_actor();
1262        let request_id = Self::vault_request_id();
1263        let mut builder = crate::runtime::audit_log::AuditEvent::builder("vault/unseal")
1264            .principal(actor.clone())
1265            .source(crate::runtime::audit_log::AuditAuthSource::Password)
1266            .resource(format!(
1267                "vault:{}",
1268                Self::vault_target_resource(collection, key)
1269            ))
1270            .outcome(outcome)
1271            .correlation_id(request_id.clone())
1272            .fields([
1273                crate::runtime::audit_log::AuditFieldEscaper::field("actor", actor),
1274                crate::runtime::audit_log::AuditFieldEscaper::field("collection", collection),
1275                crate::runtime::audit_log::AuditFieldEscaper::field("key", key),
1276                crate::runtime::audit_log::AuditFieldEscaper::field(
1277                    "target",
1278                    Self::vault_target_resource(collection, key),
1279                ),
1280                crate::runtime::audit_log::AuditFieldEscaper::field("reason", reason),
1281                crate::runtime::audit_log::AuditFieldEscaper::field("request_id", request_id),
1282                crate::runtime::audit_log::AuditFieldEscaper::field(
1283                    "connection_id",
1284                    current_connection_id(),
1285                ),
1286            ]);
1287        if let Some(tenant) = current_tenant() {
1288            builder = builder.tenant(tenant);
1289        }
1290        if let Some(entry) = entry {
1291            builder = builder.fields([
1292                crate::runtime::audit_log::AuditFieldEscaper::field("entity_id", entry.id.raw()),
1293                crate::runtime::audit_log::AuditFieldEscaper::field(
1294                    "sequence_id",
1295                    entry.sequence_id,
1296                ),
1297            ]);
1298        }
1299        self.audit_log().record_event(builder.build());
1300    }
1301
1302    fn audit_vault_lifecycle(
1303        &self,
1304        operation: &str,
1305        collection: &str,
1306        key: &str,
1307        outcome: crate::runtime::audit_log::Outcome,
1308        reason: &str,
1309        entry: Option<&VaultEntry>,
1310    ) {
1311        let actor = Self::current_vault_actor();
1312        let request_id = Self::vault_request_id();
1313        let mut builder =
1314            crate::runtime::audit_log::AuditEvent::builder(format!("vault/{operation}"))
1315                .principal(actor.clone())
1316                .source(crate::runtime::audit_log::AuditAuthSource::Password)
1317                .resource(format!(
1318                    "vault:{}",
1319                    Self::vault_target_resource(collection, key)
1320                ))
1321                .outcome(outcome)
1322                .correlation_id(request_id.clone())
1323                .fields([
1324                    crate::runtime::audit_log::AuditFieldEscaper::field("actor", actor),
1325                    crate::runtime::audit_log::AuditFieldEscaper::field("collection", collection),
1326                    crate::runtime::audit_log::AuditFieldEscaper::field("key", key),
1327                    crate::runtime::audit_log::AuditFieldEscaper::field(
1328                        "target",
1329                        Self::vault_target_resource(collection, key),
1330                    ),
1331                    crate::runtime::audit_log::AuditFieldEscaper::field("reason", reason),
1332                    crate::runtime::audit_log::AuditFieldEscaper::field("request_id", request_id),
1333                    crate::runtime::audit_log::AuditFieldEscaper::field(
1334                        "connection_id",
1335                        current_connection_id(),
1336                    ),
1337                ]);
1338        if let Some(tenant) = current_tenant() {
1339            builder = builder.tenant(tenant);
1340        }
1341        if let Some(entry) = entry {
1342            builder = builder.fields([
1343                crate::runtime::audit_log::AuditFieldEscaper::field("entity_id", entry.id.raw()),
1344                crate::runtime::audit_log::AuditFieldEscaper::field("version", entry.version),
1345                crate::runtime::audit_log::AuditFieldEscaper::field(
1346                    "sequence_id",
1347                    entry.sequence_id,
1348                ),
1349            ]);
1350        }
1351        self.audit_log().record_event(builder.build());
1352    }
1353
1354    fn emit_vault_control_event(
1355        &self,
1356        kind: crate::runtime::control_events::EventKind,
1357        outcome: crate::runtime::control_events::Outcome,
1358        action: &'static str,
1359        collection: &str,
1360        key: &str,
1361        reason: &str,
1362        entry: Option<&VaultEntry>,
1363        extra_fields: Vec<(String, crate::runtime::control_events::Sensitivity)>,
1364    ) -> RedDBResult<()> {
1365        use crate::runtime::control_events::{
1366            ActorRef, ControlEvent, ControlEventCtx, ControlEventLedger, Sensitivity,
1367        };
1368        use std::borrow::Cow;
1369
1370        let tenant = current_tenant();
1371        let principal = current_auth_identity().map(|(principal, _)| principal);
1372        let actor_user = principal
1373            .as_ref()
1374            .map(|principal| crate::auth::UserId::from_parts(tenant.as_deref(), principal));
1375        let request_id = Self::vault_request_id();
1376        let actor = actor_user
1377            .as_ref()
1378            .map(ActorRef::User)
1379            .unwrap_or(ActorRef::Anonymous);
1380        let ctx = ControlEventCtx {
1381            actor,
1382            scope: tenant.as_ref().map(|scope| Cow::Borrowed(scope.as_str())),
1383            request_id: Some(Cow::Borrowed(request_id.as_str())),
1384            trace_id: None,
1385        };
1386
1387        let target = Self::vault_target_resource(collection, key);
1388        let mut fields = std::collections::HashMap::new();
1389        fields.insert("path".to_string(), Sensitivity::raw(target.clone()));
1390        fields.insert("collection".to_string(), Sensitivity::raw(collection));
1391        fields.insert("key".to_string(), Sensitivity::raw(key));
1392        fields.insert(
1393            "connection_id".to_string(),
1394            Sensitivity::raw(current_connection_id().to_string()),
1395        );
1396        if let Some(entry) = entry {
1397            fields.insert(
1398                "entity_id".to_string(),
1399                Sensitivity::raw(entry.id.raw().to_string()),
1400            );
1401            fields.insert(
1402                "sequence_id".to_string(),
1403                Sensitivity::raw(entry.sequence_id.to_string()),
1404            );
1405            fields.insert(
1406                "version".to_string(),
1407                Sensitivity::raw(entry.version.to_string()),
1408            );
1409            fields.insert("op".to_string(), Sensitivity::raw(entry.op.clone()));
1410            fields.insert(
1411                "tombstone".to_string(),
1412                Sensitivity::raw(entry.tombstone.to_string()),
1413            );
1414            if !entry.tombstone {
1415                fields.insert(
1416                    "fingerprint".to_string(),
1417                    Sensitivity::raw(vault_fingerprint(&entry.value)),
1418                );
1419            }
1420            fields.insert(
1421                "tags".to_string(),
1422                Sensitivity::raw(format!("{:?}", vault_tags_value(&entry.metadata))),
1423            );
1424        }
1425        for (key, value) in extra_fields {
1426            fields.insert(key, value);
1427        }
1428
1429        let event = ControlEvent {
1430            kind,
1431            outcome,
1432            action: Cow::Borrowed(action),
1433            resource: Some(format!("vault:{target}")),
1434            reason: Some(reason.to_string()),
1435            matched_policy_id: None,
1436            fields,
1437        };
1438        let ledger = self.inner.control_event_ledger.read();
1439        match ledger.emit(&ctx, event) {
1440            Ok(_) => Ok(()),
1441            Err(err) if self.inner.control_event_config.require_persistence() => {
1442                Err(RedDBError::Internal(err.to_string()))
1443            }
1444            Err(_) => Ok(()),
1445        }
1446    }
1447
1448    pub(crate) fn resolve_vault_secret_value(
1449        &self,
1450        collection: &str,
1451        key: &str,
1452    ) -> RedDBResult<Value> {
1453        let ops = KvAtomicOps::new(self);
1454        let entry = ops.get_vault_entry(collection, key)?;
1455        if let Err(reason) = self.check_vault_capability("vault:read", collection, key) {
1456            self.audit_vault_unseal(
1457                collection,
1458                key,
1459                crate::runtime::audit_log::Outcome::Denied,
1460                &reason,
1461                entry.as_ref(),
1462            );
1463            self.emit_vault_control_event(
1464                crate::runtime::control_events::EventKind::VaultRead,
1465                crate::runtime::control_events::Outcome::Denied,
1466                "vault:read",
1467                collection,
1468                key,
1469                &reason,
1470                entry.as_ref(),
1471                Vec::new(),
1472            )?;
1473            return Err(RedDBError::Query(reason));
1474        }
1475        let Some(entry) = entry else {
1476            let reason = "not_found";
1477            self.audit_vault_unseal(
1478                collection,
1479                key,
1480                crate::runtime::audit_log::Outcome::Denied,
1481                reason,
1482                None,
1483            );
1484            self.emit_vault_control_event(
1485                crate::runtime::control_events::EventKind::VaultRead,
1486                crate::runtime::control_events::Outcome::Denied,
1487                "vault:read",
1488                collection,
1489                key,
1490                reason,
1491                None,
1492                Vec::new(),
1493            )?;
1494            return Err(RedDBError::NotFound(format!(
1495                "vault secret '{}.{}' not found",
1496                collection, key
1497            )));
1498        };
1499        if entry.tombstone {
1500            let reason = "deleted";
1501            self.audit_vault_unseal(
1502                collection,
1503                key,
1504                crate::runtime::audit_log::Outcome::Denied,
1505                reason,
1506                Some(&entry),
1507            );
1508            self.emit_vault_control_event(
1509                crate::runtime::control_events::EventKind::VaultRead,
1510                crate::runtime::control_events::Outcome::Denied,
1511                "vault:read",
1512                collection,
1513                key,
1514                reason,
1515                Some(&entry),
1516                Vec::new(),
1517            )?;
1518            return Err(RedDBError::NotFound(format!(
1519                "vault secret '{}.{}' is deleted",
1520                collection, key
1521            )));
1522        }
1523        match self.unseal_vault_value(collection, &entry.value) {
1524            Ok(value) => {
1525                self.audit_vault_unseal(
1526                    collection,
1527                    key,
1528                    crate::runtime::audit_log::Outcome::Success,
1529                    "ok",
1530                    Some(&entry),
1531                );
1532                self.emit_vault_control_event(
1533                    crate::runtime::control_events::EventKind::VaultRead,
1534                    crate::runtime::control_events::Outcome::Allowed,
1535                    "vault:read",
1536                    collection,
1537                    key,
1538                    "ok",
1539                    Some(&entry),
1540                    Vec::new(),
1541                )?;
1542                Ok(value)
1543            }
1544            Err(err) => {
1545                let reason = err.to_string();
1546                self.audit_vault_unseal(
1547                    collection,
1548                    key,
1549                    crate::runtime::audit_log::Outcome::Error,
1550                    &reason,
1551                    Some(&entry),
1552                );
1553                self.emit_vault_control_event(
1554                    crate::runtime::control_events::EventKind::VaultRead,
1555                    crate::runtime::control_events::Outcome::Error,
1556                    "vault:read",
1557                    collection,
1558                    key,
1559                    &reason,
1560                    Some(&entry),
1561                    Vec::new(),
1562                )?;
1563                Err(err)
1564            }
1565        }
1566    }
1567
1568    /// Dispatch a `KV PUT / GET / DELETE` command.
1569    pub fn execute_kv_command(
1570        &self,
1571        raw_query: &str,
1572        cmd: &crate::storage::query::ast::KvCommand,
1573    ) -> RedDBResult<RuntimeQueryResult> {
1574        use crate::storage::query::ast::KvCommand;
1575
1576        let ops = KvAtomicOps::new(self);
1577
1578        match cmd {
1579            KvCommand::Put {
1580                model,
1581                collection,
1582                key,
1583                value,
1584                ttl_ms,
1585                tags,
1586                if_not_exists,
1587            } => {
1588                if *model == crate::catalog::CollectionModel::Vault {
1589                    self.check_system_vault_capability("vault:write", collection, key)
1590                        .map_err(RedDBError::Query)?;
1591                }
1592                self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
1593                let (created, id) = ops.set_with_tags_for_model(
1594                    *model,
1595                    collection,
1596                    key,
1597                    value.clone(),
1598                    *ttl_ms,
1599                    tags,
1600                    *if_not_exists,
1601                )?;
1602
1603                let mut result = UnifiedResult::with_columns(vec![
1604                    "ok".into(),
1605                    "collection".into(),
1606                    "key".into(),
1607                    "id".into(),
1608                    "created".into(),
1609                    "tags".into(),
1610                ]);
1611                let mut record = UnifiedRecord::new();
1612                record.set("ok", Value::Boolean(true));
1613                record.set("collection", Value::text(collection.clone()));
1614                record.set("key", Value::text(key.clone()));
1615                record.set("id", Value::Integer(id.raw() as i64));
1616                record.set("created", Value::Boolean(created));
1617                record.set("tags", kv_tags_value(tags));
1618                result.push(record);
1619
1620                Ok(RuntimeQueryResult {
1621                    query: raw_query.to_string(),
1622                    mode: crate::storage::query::modes::QueryMode::Sql,
1623                    statement: if *model == crate::catalog::CollectionModel::Vault {
1624                        "vault_put"
1625                    } else {
1626                        "kv_put"
1627                    },
1628                    engine: if *model == crate::catalog::CollectionModel::Vault {
1629                        "vault"
1630                    } else {
1631                        "kv"
1632                    },
1633                    result,
1634                    affected_rows: 1,
1635                    statement_type: if created { "insert" } else { "update" },
1636                    bookmark: None,
1637                })
1638            }
1639            KvCommand::InvalidateTags { collection, tags } => {
1640                let invalidated = ops.invalidate_tags(collection, tags)?;
1641
1642                let mut result = UnifiedResult::with_columns(vec![
1643                    "ok".into(),
1644                    "collection".into(),
1645                    "invalidated".into(),
1646                    "tags".into(),
1647                ]);
1648                let mut record = UnifiedRecord::new();
1649                record.set("ok", Value::Boolean(true));
1650                record.set("collection", Value::text(collection.clone()));
1651                record.set("invalidated", Value::Integer(invalidated as i64));
1652                record.set("tags", kv_tags_value(tags));
1653                result.push(record);
1654
1655                Ok(RuntimeQueryResult {
1656                    query: raw_query.to_string(),
1657                    mode: crate::storage::query::modes::QueryMode::Sql,
1658                    statement: "kv_invalidate_tags",
1659                    engine: "kv",
1660                    result,
1661                    affected_rows: invalidated as u64,
1662                    statement_type: "delete",
1663                    bookmark: None,
1664                })
1665            }
1666
1667            KvCommand::Rotate {
1668                collection,
1669                key,
1670                value,
1671                tags,
1672            } => {
1673                self.check_system_vault_capability("vault:write", collection, key)
1674                    .map_err(RedDBError::Query)?;
1675                self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
1676                let entry = ops.append_vault_version(
1677                    collection,
1678                    key,
1679                    value.clone(),
1680                    "rotate",
1681                    false,
1682                    tags,
1683                )?;
1684                self.record_kv_watch_event(
1685                    crate::replication::cdc::ChangeOperation::Update,
1686                    collection,
1687                    key,
1688                    entry.id.raw(),
1689                    None,
1690                    Some(vault_entry_metadata_json(&entry)),
1691                );
1692                self.audit_vault_lifecycle(
1693                    "rotate",
1694                    collection,
1695                    key,
1696                    crate::runtime::audit_log::Outcome::Success,
1697                    "ok",
1698                    Some(&entry),
1699                );
1700                self.emit_vault_control_event(
1701                    crate::runtime::control_events::EventKind::VaultRotate,
1702                    crate::runtime::control_events::Outcome::Allowed,
1703                    "vault:rotate",
1704                    collection,
1705                    key,
1706                    "ok",
1707                    Some(&entry),
1708                    Vec::new(),
1709                )?;
1710                Ok(vault_write_result(
1711                    raw_query,
1712                    "vault_rotate",
1713                    "update",
1714                    collection,
1715                    key,
1716                    &entry,
1717                    1,
1718                ))
1719            }
1720
1721            KvCommand::List {
1722                model,
1723                collection,
1724                prefix,
1725                limit,
1726                offset,
1727                as_json,
1728            } => {
1729                if *model == crate::catalog::CollectionModel::Vault {
1730                    if *as_json {
1731                        return Err(RedDBError::Query(
1732                            "LIST VAULT AS JSON is not supported; vault list only exposes metadata"
1733                                .to_string(),
1734                        ));
1735                    }
1736                    let mut entries = ops.latest_vault_entries(collection, prefix.as_deref())?;
1737                    entries.sort_by(|left, right| left.key.cmp(&right.key));
1738                    let mut result = UnifiedResult::with_columns(vec![
1739                        "collection".into(),
1740                        "key".into(),
1741                        "version".into(),
1742                        "fingerprint".into(),
1743                        "tags".into(),
1744                        "created_at".into(),
1745                        "updated_at".into(),
1746                        "status".into(),
1747                        "tombstone".into(),
1748                        "op".into(),
1749                    ]);
1750                    let mut visible = Vec::new();
1751                    for entry in entries {
1752                        match self.check_vault_capability(
1753                            "vault:read_metadata",
1754                            collection,
1755                            &entry.key,
1756                        ) {
1757                            Ok(()) => {
1758                                self.emit_vault_control_event(
1759                                    crate::runtime::control_events::EventKind::VaultMetadataRead,
1760                                    crate::runtime::control_events::Outcome::Allowed,
1761                                    "vault:read_metadata",
1762                                    collection,
1763                                    &entry.key,
1764                                    "ok",
1765                                    Some(&entry),
1766                                    Vec::new(),
1767                                )?;
1768                                visible.push(entry);
1769                            }
1770                            Err(reason) => {
1771                                self.emit_vault_control_event(
1772                                    crate::runtime::control_events::EventKind::VaultMetadataRead,
1773                                    crate::runtime::control_events::Outcome::Denied,
1774                                    "vault:read_metadata",
1775                                    collection,
1776                                    &entry.key,
1777                                    &reason,
1778                                    Some(&entry),
1779                                    Vec::new(),
1780                                )?;
1781                            }
1782                        }
1783                    }
1784                    for entry in visible
1785                        .into_iter()
1786                        .skip(*offset)
1787                        .take(limit.unwrap_or(usize::MAX))
1788                    {
1789                        push_vault_metadata_record(&mut result, collection, &entry.key, &entry);
1790                    }
1791                    Ok(RuntimeQueryResult {
1792                        query: raw_query.to_string(),
1793                        mode: crate::storage::query::modes::QueryMode::Sql,
1794                        statement: "vault_list",
1795                        engine: "vault",
1796                        result,
1797                        affected_rows: 0,
1798                        statement_type: "select",
1799                        bookmark: None,
1800                    })
1801                } else {
1802                    let mut result = UnifiedResult::with_columns(vec![
1803                        "rid".into(),
1804                        "collection".into(),
1805                        "kind".into(),
1806                        "tenant".into(),
1807                        "created_at".into(),
1808                        "updated_at".into(),
1809                        "key".into(),
1810                        "value".into(),
1811                        "tags".into(),
1812                    ]);
1813                    let entries = ops.latest_kv_entries(collection, prefix.as_deref())?;
1814                    if *as_json {
1815                        let mut tree = crate::json::Value::Object(crate::json::Map::new());
1816                        for (key, entity) in entries
1817                            .into_iter()
1818                            .skip(*offset)
1819                            .take(limit.unwrap_or(usize::MAX))
1820                        {
1821                            let Some(value) = kv_value_from_entity(&entity) else {
1822                                continue;
1823                            };
1824                            let relative = match prefix {
1825                                Some(pfx) if key == *pfx => "",
1826                                Some(pfx) => key
1827                                    .strip_prefix(pfx.as_str())
1828                                    .map(|tail| tail.strip_prefix('.').unwrap_or(tail))
1829                                    .unwrap_or(key.as_str()),
1830                                None => key.as_str(),
1831                            };
1832                            insert_kv_json_path(
1833                                &mut tree,
1834                                relative,
1835                                crate::presentation::entity_json::storage_value_to_json(&value),
1836                            );
1837                        }
1838                        Ok(kv_list_json_result(raw_query, collection, prefix, tree))
1839                    } else {
1840                        for (key, entity) in entries
1841                            .into_iter()
1842                            .skip(*offset)
1843                            .take(limit.unwrap_or(usize::MAX))
1844                        {
1845                            let mut record = UnifiedRecord::new();
1846                            record.set("rid", Value::UnsignedInteger(entity.id.raw()));
1847                            record.set("collection", Value::text(collection.clone()));
1848                            record.set("kind", Value::text("kv"));
1849                            record.set("tenant", Value::Null);
1850                            record.set("created_at", Value::UnsignedInteger(entity.created_at));
1851                            record.set("updated_at", Value::UnsignedInteger(entity.updated_at));
1852                            record.set("key", Value::text(key.clone()));
1853                            record.set(
1854                                "value",
1855                                kv_value_from_entity(&entity)
1856                                    .unwrap_or(crate::storage::schema::Value::Null),
1857                            );
1858                            record.set("tags", kv_tags_value(&ops.tags_for_key(collection, &key)));
1859                            result.push(record);
1860                        }
1861                        Ok(RuntimeQueryResult {
1862                            query: raw_query.to_string(),
1863                            mode: crate::storage::query::modes::QueryMode::Sql,
1864                            statement: "kv_list",
1865                            engine: "kv",
1866                            result,
1867                            affected_rows: 0,
1868                            statement_type: "select",
1869                            bookmark: None,
1870                        })
1871                    }
1872                }
1873            }
1874
1875            KvCommand::History { collection, key } => {
1876                let latest = ops.get_vault_entry(collection, key)?;
1877                if let Err(reason) =
1878                    self.check_vault_capability("vault:read_metadata", collection, key)
1879                {
1880                    self.emit_vault_control_event(
1881                        crate::runtime::control_events::EventKind::VaultMetadataRead,
1882                        crate::runtime::control_events::Outcome::Denied,
1883                        "vault:read_metadata",
1884                        collection,
1885                        key,
1886                        &reason,
1887                        latest.as_ref(),
1888                        Vec::new(),
1889                    )?;
1890                    return Err(RedDBError::Query(reason));
1891                }
1892                let versions =
1893                    super::keyed_spine::history_versions(ops.vault_versions(collection, key)?);
1894                let result = vault_history_result(collection, key, &versions);
1895                self.emit_vault_control_event(
1896                    crate::runtime::control_events::EventKind::VaultMetadataRead,
1897                    crate::runtime::control_events::Outcome::Allowed,
1898                    "vault:read_metadata",
1899                    collection,
1900                    key,
1901                    "ok",
1902                    latest.as_ref(),
1903                    Vec::new(),
1904                )?;
1905                Ok(RuntimeQueryResult {
1906                    query: raw_query.to_string(),
1907                    mode: crate::storage::query::modes::QueryMode::Sql,
1908                    statement: "vault_history",
1909                    engine: "vault",
1910                    result,
1911                    affected_rows: 0,
1912                    statement_type: "select",
1913                    bookmark: None,
1914                })
1915            }
1916
1917            KvCommand::Purge { collection, key } => {
1918                let entry = ops.get_vault_entry(collection, key)?;
1919                if let Err(reason) = self.check_vault_capability("vault:purge", collection, key) {
1920                    self.audit_vault_lifecycle(
1921                        "purge",
1922                        collection,
1923                        key,
1924                        crate::runtime::audit_log::Outcome::Denied,
1925                        &reason,
1926                        entry.as_ref(),
1927                    );
1928                    self.emit_vault_control_event(
1929                        crate::runtime::control_events::EventKind::VaultPurge,
1930                        crate::runtime::control_events::Outcome::Denied,
1931                        "vault:purge",
1932                        collection,
1933                        key,
1934                        &reason,
1935                        entry.as_ref(),
1936                        Vec::new(),
1937                    )?;
1938                    return Err(RedDBError::Query(reason));
1939                }
1940                self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
1941                let purged = ops.purge_vault_versions(collection, key)?;
1942                self.audit_vault_lifecycle(
1943                    "purge",
1944                    collection,
1945                    key,
1946                    crate::runtime::audit_log::Outcome::Success,
1947                    "ok",
1948                    entry.as_ref(),
1949                );
1950                self.emit_vault_control_event(
1951                    crate::runtime::control_events::EventKind::VaultPurge,
1952                    crate::runtime::control_events::Outcome::Allowed,
1953                    "vault:purge",
1954                    collection,
1955                    key,
1956                    "ok",
1957                    entry.as_ref(),
1958                    vec![(
1959                        "purged".to_string(),
1960                        crate::runtime::control_events::Sensitivity::raw(purged.to_string()),
1961                    )],
1962                )?;
1963                let mut result = UnifiedResult::with_columns(vec![
1964                    "ok".into(),
1965                    "collection".into(),
1966                    "key".into(),
1967                    "purged".into(),
1968                ]);
1969                let mut record = UnifiedRecord::new();
1970                record.set("ok", Value::Boolean(true));
1971                record.set("collection", Value::text(collection.clone()));
1972                record.set("key", Value::text(key.clone()));
1973                record.set("purged", Value::Integer(purged as i64));
1974                result.push(record);
1975                Ok(RuntimeQueryResult {
1976                    query: raw_query.to_string(),
1977                    mode: crate::storage::query::modes::QueryMode::Sql,
1978                    statement: "vault_purge",
1979                    engine: "vault",
1980                    result,
1981                    affected_rows: purged as u64,
1982                    statement_type: "delete",
1983                    bookmark: None,
1984                })
1985            }
1986
1987            KvCommand::Get {
1988                model,
1989                collection,
1990                key,
1991            } => {
1992                if *model == crate::catalog::CollectionModel::Vault {
1993                    let entry = ops.get_vault_entry(collection, key)?;
1994                    if let Err(reason) =
1995                        self.check_vault_capability("vault:read_metadata", collection, key)
1996                    {
1997                        self.emit_vault_control_event(
1998                            crate::runtime::control_events::EventKind::VaultMetadataRead,
1999                            crate::runtime::control_events::Outcome::Denied,
2000                            "vault:read_metadata",
2001                            collection,
2002                            key,
2003                            &reason,
2004                            entry.as_ref(),
2005                            Vec::new(),
2006                        )?;
2007                        return Err(RedDBError::Query(reason));
2008                    }
2009                    let key_available = self.vault_key_available(collection);
2010                    let result =
2011                        vault_metadata_result(collection, key, entry.as_ref(), key_available);
2012                    self.emit_vault_control_event(
2013                        crate::runtime::control_events::EventKind::VaultMetadataRead,
2014                        crate::runtime::control_events::Outcome::Allowed,
2015                        "vault:read_metadata",
2016                        collection,
2017                        key,
2018                        "ok",
2019                        entry.as_ref(),
2020                        Vec::new(),
2021                    )?;
2022                    return Ok(RuntimeQueryResult {
2023                        query: raw_query.to_string(),
2024                        mode: crate::storage::query::modes::QueryMode::Sql,
2025                        statement: "vault_get",
2026                        engine: "vault",
2027                        result,
2028                        affected_rows: 0,
2029                        statement_type: "select",
2030                        bookmark: None,
2031                    });
2032                }
2033
2034                let entity = ops.get_entity(*model, collection, key)?;
2035                let value = entity.as_ref().and_then(kv_value_from_entity);
2036                if *model == crate::catalog::CollectionModel::Kv {
2037                    self.inner.kv_stats.incr_gets();
2038                }
2039                let mut result = UnifiedResult::with_columns(vec![
2040                    "rid".into(),
2041                    "collection".into(),
2042                    "kind".into(),
2043                    "tenant".into(),
2044                    "created_at".into(),
2045                    "updated_at".into(),
2046                    "key".into(),
2047                    "value".into(),
2048                    "tags".into(),
2049                ]);
2050                let mut record = UnifiedRecord::new();
2051                if let Some(entity) = entity.as_ref() {
2052                    record.set("rid", Value::UnsignedInteger(entity.id.raw()));
2053                    record.set("created_at", Value::UnsignedInteger(entity.created_at));
2054                    record.set("updated_at", Value::UnsignedInteger(entity.updated_at));
2055                } else {
2056                    record.set("rid", Value::Null);
2057                    record.set("created_at", Value::Null);
2058                    record.set("updated_at", Value::Null);
2059                }
2060                record.set("collection", Value::text(collection.clone()));
2061                record.set("kind", Value::text(keyed_model_name(*model).to_string()));
2062                record.set("tenant", Value::Null);
2063                record.set("key", Value::text(key.clone()));
2064                record.set(
2065                    "value",
2066                    value.unwrap_or(crate::storage::schema::Value::Null),
2067                );
2068                record.set("tags", kv_tags_value(&ops.tags_for_key(collection, key)));
2069                result.push(record);
2070
2071                Ok(RuntimeQueryResult {
2072                    query: raw_query.to_string(),
2073                    mode: crate::storage::query::modes::QueryMode::Sql,
2074                    statement: "kv_get",
2075                    engine: "kv",
2076                    result,
2077                    affected_rows: 0,
2078                    statement_type: "select",
2079                    bookmark: None,
2080                })
2081            }
2082            KvCommand::Watch {
2083                model,
2084                collection,
2085                key,
2086                prefix,
2087                from_lsn,
2088            } => {
2089                let watch_key = if *prefix {
2090                    format!("{key}.*")
2091                } else {
2092                    key.clone()
2093                };
2094                let endpoint = match from_lsn {
2095                    Some(lsn) => format!(
2096                        "/collections/{collection}/{}/{watch_key}/watch?since_lsn={lsn}",
2097                        keyed_model_name(*model)
2098                    ),
2099                    None => format!(
2100                        "/collections/{collection}/{}/{watch_key}/watch",
2101                        keyed_model_name(*model)
2102                    ),
2103                };
2104                let mut result = UnifiedResult::with_columns(vec![
2105                    "collection".into(),
2106                    "key".into(),
2107                    "prefix".into(),
2108                    "from_lsn".into(),
2109                    "watch_url".into(),
2110                    "streaming".into(),
2111                ]);
2112                let mut record = UnifiedRecord::new();
2113                record.set("collection", Value::text(collection.clone()));
2114                record.set("key", Value::text(watch_key));
2115                record.set("prefix", Value::Boolean(*prefix));
2116                record.set(
2117                    "from_lsn",
2118                    from_lsn
2119                        .map(Value::UnsignedInteger)
2120                        .unwrap_or(crate::storage::schema::Value::Null),
2121                );
2122                record.set("watch_url", Value::text(endpoint));
2123                record.set("streaming", Value::Boolean(true));
2124                result.push(record);
2125
2126                Ok(RuntimeQueryResult {
2127                    query: raw_query.to_string(),
2128                    mode: crate::storage::query::modes::QueryMode::Sql,
2129                    statement: "kv_watch",
2130                    engine: keyed_model_name(*model),
2131                    result,
2132                    affected_rows: 0,
2133                    statement_type: "stream",
2134                    bookmark: None,
2135                })
2136            }
2137
2138            KvCommand::Unseal {
2139                collection,
2140                key,
2141                version,
2142            } => {
2143                let latest = ops.get_vault_entry(collection, key)?;
2144                let entry = match version {
2145                    Some(version) => ops.get_vault_entry_version(collection, key, *version)?,
2146                    None => latest.clone(),
2147                };
2148                let action = match (version, latest.as_ref()) {
2149                    (Some(requested), Some(latest)) if *requested == latest.version => "vault:read",
2150                    (Some(_), _) => "vault:unseal_history",
2151                    _ => "vault:read",
2152                };
2153                let event_kind = if action == "vault:read" {
2154                    crate::runtime::control_events::EventKind::VaultRead
2155                } else {
2156                    crate::runtime::control_events::EventKind::VaultUnseal
2157                };
2158                if let Err(reason) = self.check_vault_capability(action, collection, key) {
2159                    self.audit_vault_unseal(
2160                        collection,
2161                        key,
2162                        crate::runtime::audit_log::Outcome::Denied,
2163                        &reason,
2164                        entry.as_ref(),
2165                    );
2166                    self.emit_vault_control_event(
2167                        event_kind,
2168                        crate::runtime::control_events::Outcome::Denied,
2169                        action,
2170                        collection,
2171                        key,
2172                        &reason,
2173                        entry.as_ref(),
2174                        Vec::new(),
2175                    )?;
2176                    return Err(RedDBError::Query(reason));
2177                }
2178                let Some(entry) = entry else {
2179                    let reason = "not_found";
2180                    self.audit_vault_unseal(
2181                        collection,
2182                        key,
2183                        crate::runtime::audit_log::Outcome::Denied,
2184                        reason,
2185                        None,
2186                    );
2187                    self.emit_vault_control_event(
2188                        event_kind,
2189                        crate::runtime::control_events::Outcome::Denied,
2190                        action,
2191                        collection,
2192                        key,
2193                        reason,
2194                        None,
2195                        Vec::new(),
2196                    )?;
2197                    return Err(RedDBError::NotFound(format!(
2198                        "vault secret '{}.{}' not found",
2199                        collection, key
2200                    )));
2201                };
2202                if entry.tombstone {
2203                    let reason = "deleted";
2204                    self.audit_vault_unseal(
2205                        collection,
2206                        key,
2207                        crate::runtime::audit_log::Outcome::Denied,
2208                        reason,
2209                        Some(&entry),
2210                    );
2211                    self.emit_vault_control_event(
2212                        event_kind,
2213                        crate::runtime::control_events::Outcome::Denied,
2214                        action,
2215                        collection,
2216                        key,
2217                        reason,
2218                        Some(&entry),
2219                        Vec::new(),
2220                    )?;
2221                    return Err(RedDBError::NotFound(format!(
2222                        "vault secret '{}.{}' is deleted",
2223                        collection, key
2224                    )));
2225                }
2226                match self.unseal_vault_value(collection, &entry.value) {
2227                    Ok(value) => {
2228                        self.audit_vault_unseal(
2229                            collection,
2230                            key,
2231                            crate::runtime::audit_log::Outcome::Success,
2232                            "ok",
2233                            Some(&entry),
2234                        );
2235                        self.emit_vault_control_event(
2236                            event_kind,
2237                            crate::runtime::control_events::Outcome::Allowed,
2238                            action,
2239                            collection,
2240                            key,
2241                            "ok",
2242                            Some(&entry),
2243                            Vec::new(),
2244                        )?;
2245                        let mut result = UnifiedResult::with_columns(vec![
2246                            "collection".into(),
2247                            "key".into(),
2248                            "value".into(),
2249                        ]);
2250                        let mut record = UnifiedRecord::new();
2251                        record.set("collection", Value::text(collection.clone()));
2252                        record.set("key", Value::text(key.clone()));
2253                        record.set("value", value);
2254                        result.push(record);
2255                        Ok(RuntimeQueryResult {
2256                            query: raw_query.to_string(),
2257                            mode: crate::storage::query::modes::QueryMode::Sql,
2258                            statement: "vault_unseal",
2259                            engine: "vault",
2260                            result,
2261                            affected_rows: 0,
2262                            statement_type: "select",
2263                            bookmark: None,
2264                        })
2265                    }
2266                    Err(err) => {
2267                        let reason = err.to_string();
2268                        self.audit_vault_unseal(
2269                            collection,
2270                            key,
2271                            crate::runtime::audit_log::Outcome::Error,
2272                            &reason,
2273                            Some(&entry),
2274                        );
2275                        self.emit_vault_control_event(
2276                            event_kind,
2277                            crate::runtime::control_events::Outcome::Error,
2278                            action,
2279                            collection,
2280                            key,
2281                            &reason,
2282                            Some(&entry),
2283                            Vec::new(),
2284                        )?;
2285                        Err(err)
2286                    }
2287                }
2288            }
2289
2290            KvCommand::Incr {
2291                model,
2292                collection,
2293                key,
2294                by,
2295                ttl_ms,
2296            } => {
2297                self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2298                let new_value = ops.incr(*model, collection, key, *by, *ttl_ms)?;
2299
2300                let mut result = UnifiedResult::with_columns(vec![
2301                    "ok".into(),
2302                    "collection".into(),
2303                    "key".into(),
2304                    "value".into(),
2305                ]);
2306                let mut record = UnifiedRecord::new();
2307                record.set("ok", Value::Boolean(true));
2308                record.set("collection", Value::text(collection.clone()));
2309                record.set("key", Value::text(key.clone()));
2310                record.set("value", Value::Integer(new_value));
2311                result.push(record);
2312
2313                Ok(RuntimeQueryResult {
2314                    query: raw_query.to_string(),
2315                    mode: crate::storage::query::modes::QueryMode::Sql,
2316                    statement: "kv_incr",
2317                    engine: "kv",
2318                    result,
2319                    affected_rows: 1,
2320                    statement_type: "update",
2321                    bookmark: None,
2322                })
2323            }
2324
2325            KvCommand::Cas {
2326                model,
2327                collection,
2328                key,
2329                expected,
2330                new_value,
2331                ttl_ms,
2332            } => {
2333                self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2334                let (ok, current) = ops.cas(
2335                    *model,
2336                    collection,
2337                    key,
2338                    expected.as_ref(),
2339                    new_value.clone(),
2340                    *ttl_ms,
2341                )?;
2342
2343                let mut result = UnifiedResult::with_columns(vec![
2344                    "ok".into(),
2345                    "collection".into(),
2346                    "key".into(),
2347                    "current".into(),
2348                ]);
2349                let mut record = UnifiedRecord::new();
2350                record.set("ok", Value::Boolean(ok));
2351                record.set("collection", Value::text(collection.clone()));
2352                record.set("key", Value::text(key.clone()));
2353                record.set(
2354                    "current",
2355                    current.unwrap_or(crate::storage::schema::Value::Null),
2356                );
2357                result.push(record);
2358
2359                Ok(RuntimeQueryResult {
2360                    query: raw_query.to_string(),
2361                    mode: crate::storage::query::modes::QueryMode::Sql,
2362                    statement: "kv_cas",
2363                    engine: "kv",
2364                    result,
2365                    affected_rows: if ok { 1 } else { 0 },
2366                    statement_type: "update",
2367                    bookmark: None,
2368                })
2369            }
2370
2371            KvCommand::Delete {
2372                model,
2373                collection,
2374                key,
2375            } => {
2376                if *model == crate::catalog::CollectionModel::Vault {
2377                    self.check_system_vault_capability("vault:write", collection, key)
2378                        .map_err(RedDBError::Query)?;
2379                    self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2380                    let entry = ops.append_vault_version(
2381                        collection,
2382                        key,
2383                        Value::Null,
2384                        "delete",
2385                        true,
2386                        &[],
2387                    )?;
2388                    self.record_kv_watch_event(
2389                        crate::replication::cdc::ChangeOperation::Delete,
2390                        collection,
2391                        key,
2392                        entry.id.raw(),
2393                        None,
2394                        Some(vault_entry_metadata_json(&entry)),
2395                    );
2396                    self.audit_vault_lifecycle(
2397                        "delete",
2398                        collection,
2399                        key,
2400                        crate::runtime::audit_log::Outcome::Success,
2401                        "ok",
2402                        Some(&entry),
2403                    );
2404                    return Ok(vault_write_result(
2405                        raw_query,
2406                        "vault_delete",
2407                        "delete",
2408                        collection,
2409                        key,
2410                        &entry,
2411                        1,
2412                    ));
2413                }
2414                self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2415                let deleted = ops.delete(*model, collection, key)?;
2416
2417                let mut result = UnifiedResult::with_columns(vec![
2418                    "ok".into(),
2419                    "collection".into(),
2420                    "key".into(),
2421                    "deleted".into(),
2422                ]);
2423                let mut record = UnifiedRecord::new();
2424                record.set("ok", Value::Boolean(true));
2425                record.set("collection", Value::text(collection.clone()));
2426                record.set("key", Value::text(key.clone()));
2427                record.set("deleted", Value::Boolean(deleted));
2428                result.push(record);
2429
2430                Ok(RuntimeQueryResult {
2431                    query: raw_query.to_string(),
2432                    mode: crate::storage::query::modes::QueryMode::Sql,
2433                    statement: "kv_delete",
2434                    engine: "kv",
2435                    result,
2436                    affected_rows: if deleted { 1 } else { 0 },
2437                    statement_type: "delete",
2438                    bookmark: None,
2439                })
2440            }
2441        }
2442    }
2443
2444    pub fn vault_watch_events_since(
2445        &self,
2446        collection: &str,
2447        key: &str,
2448        since_lsn: u64,
2449        max_count: usize,
2450    ) -> Vec<crate::replication::cdc::KvWatchEvent> {
2451        self.kv_watch_events_since(collection, key, since_lsn, max_count)
2452            .into_iter()
2453            .filter(|event| {
2454                self.check_vault_capability("vault:read_metadata", &event.collection, &event.key)
2455                    .is_ok()
2456            })
2457            .map(vault_filter_watch_event)
2458            .collect()
2459    }
2460
2461    pub fn vault_watch_events_since_prefix(
2462        &self,
2463        collection: &str,
2464        prefix: &str,
2465        since_lsn: u64,
2466        max_count: usize,
2467    ) -> Vec<crate::replication::cdc::KvWatchEvent> {
2468        self.kv_watch_events_since_prefix(collection, prefix, since_lsn, max_count)
2469            .into_iter()
2470            .filter(|event| {
2471                self.check_vault_capability("vault:read_metadata", &event.collection, &event.key)
2472                    .is_ok()
2473            })
2474            .map(vault_filter_watch_event)
2475            .collect()
2476    }
2477
2478    fn check_kv_invalidate_policy(&self, collection: &str) -> RedDBResult<()> {
2479        let auth_store = match self.inner.auth_store.read().clone() {
2480            Some(store) => store,
2481            None => return Ok(()),
2482        };
2483        let (username, role) = match crate::runtime::impl_core::current_auth_identity() {
2484            Some(identity) => identity,
2485            None => return Ok(()),
2486        };
2487        if role < crate::auth::Role::Write {
2488            return Err(RedDBError::Query(format!(
2489                "principal=`{username}` role=`{role:?}` cannot invalidate KV tags"
2490            )));
2491        }
2492        if !auth_store.iam_authorization_enabled() {
2493            return Ok(());
2494        }
2495
2496        let tenant = crate::runtime::impl_core::current_tenant();
2497        let principal = crate::auth::UserId::from_parts(tenant.as_deref(), &username);
2498        let mut resource =
2499            crate::auth::policies::ResourceRef::new("kv".to_string(), collection.to_string());
2500        if let Some(tenant) = tenant.clone() {
2501            resource = resource.with_tenant(tenant);
2502        }
2503        let ctx = crate::auth::policies::EvalContext {
2504            principal_tenant: tenant.clone(),
2505            current_tenant: tenant,
2506            peer_ip: None,
2507            mfa_present: false,
2508            now_ms: current_unix_ms(),
2509            principal_is_admin_role: role == crate::auth::Role::Admin,
2510            principal_is_platform_scoped: principal.tenant.is_none(),
2511        };
2512        if auth_store.check_policy_authz_with_role(
2513            &principal,
2514            "kv:invalidate",
2515            &resource,
2516            &ctx,
2517            role,
2518        ) {
2519            Ok(())
2520        } else {
2521            Err(RedDBError::Query(format!(
2522                "principal=`{username}` action=`kv:invalidate` resource=`kv:{collection}` denied by IAM policy"
2523            )))
2524        }
2525    }
2526}
2527
2528fn ttl_metadata(ttl_ms: Option<u64>) -> Option<Metadata> {
2529    let ttl_ms = ttl_ms?;
2530    Some(Metadata::with_fields(
2531        [(
2532            "_ttl_ms".to_string(),
2533            if ttl_ms <= i64::MAX as u64 {
2534                MetadataValue::Int(ttl_ms as i64)
2535            } else {
2536                MetadataValue::Timestamp(ttl_ms)
2537            },
2538        )]
2539        .into_iter()
2540        .collect(),
2541    ))
2542}
2543
2544fn vault_write_result(
2545    raw_query: &str,
2546    statement: &'static str,
2547    statement_type: &'static str,
2548    collection: &str,
2549    key: &str,
2550    entry: &VaultEntry,
2551    affected_rows: u64,
2552) -> RuntimeQueryResult {
2553    let mut result = UnifiedResult::with_columns(vec![
2554        "ok".into(),
2555        "collection".into(),
2556        "key".into(),
2557        "version".into(),
2558        "fingerprint".into(),
2559        "tombstone".into(),
2560        "op".into(),
2561        "id".into(),
2562    ]);
2563    let mut record = UnifiedRecord::new();
2564    record.set("ok", Value::Boolean(true));
2565    record.set("collection", Value::text(collection.to_string()));
2566    record.set("key", Value::text(key.to_string()));
2567    record.set("version", Value::Integer(entry.version));
2568    if entry.tombstone {
2569        record.set("fingerprint", Value::Null);
2570    } else {
2571        record.set("fingerprint", Value::text(vault_fingerprint(&entry.value)));
2572    }
2573    record.set("tombstone", Value::Boolean(entry.tombstone));
2574    record.set("op", Value::text(entry.op.clone()));
2575    record.set("id", Value::Integer(entry.id.raw() as i64));
2576    result.push(record);
2577    RuntimeQueryResult {
2578        query: raw_query.to_string(),
2579        mode: crate::storage::query::modes::QueryMode::Sql,
2580        statement,
2581        engine: "vault",
2582        result,
2583        affected_rows,
2584        statement_type,
2585        bookmark: None,
2586    }
2587}
2588
2589fn vault_history_result(collection: &str, key: &str, versions: &[VaultEntry]) -> UnifiedResult {
2590    let mut result = UnifiedResult::with_columns(vec![
2591        "collection".into(),
2592        "key".into(),
2593        "version".into(),
2594        "fingerprint".into(),
2595        "tags".into(),
2596        "created_at".into(),
2597        "updated_at".into(),
2598        "status".into(),
2599        "tombstone".into(),
2600        "op".into(),
2601    ]);
2602    for entry in versions {
2603        push_vault_metadata_record(&mut result, collection, key, entry);
2604    }
2605    result
2606}
2607
2608fn push_vault_metadata_record(
2609    result: &mut UnifiedResult,
2610    collection: &str,
2611    key: &str,
2612    entry: &VaultEntry,
2613) {
2614    let mut record = UnifiedRecord::new();
2615    record.set("collection", Value::text(collection.to_string()));
2616    record.set("key", Value::text(key.to_string()));
2617    record.set("version", Value::Integer(entry.version));
2618    if entry.tombstone {
2619        record.set("fingerprint", Value::Null);
2620        record.set("status", Value::text("deleted"));
2621    } else {
2622        record.set("fingerprint", Value::text(vault_fingerprint(&entry.value)));
2623        record.set("status", Value::text("sealed"));
2624    }
2625    record.set("tags", vault_tags_value(&entry.metadata));
2626    record.set("created_at", Value::TimestampMs(entry.created_at as i64));
2627    record.set("updated_at", Value::TimestampMs(entry.updated_at as i64));
2628    record.set("tombstone", Value::Boolean(entry.tombstone));
2629    record.set("op", Value::text(entry.op.clone()));
2630    result.push(record);
2631}
2632
2633fn vault_metadata_result(
2634    collection: &str,
2635    key: &str,
2636    entry: Option<&VaultEntry>,
2637    key_available: bool,
2638) -> UnifiedResult {
2639    let mut result = UnifiedResult::with_columns(vec![
2640        "collection".into(),
2641        "key".into(),
2642        "version".into(),
2643        "fingerprint".into(),
2644        "tags".into(),
2645        "created_at".into(),
2646        "updated_at".into(),
2647        "value".into(),
2648        "status".into(),
2649        "tombstone".into(),
2650        "op".into(),
2651    ]);
2652    let mut record = UnifiedRecord::new();
2653    record.set("collection", Value::text(collection.to_string()));
2654    record.set("key", Value::text(key.to_string()));
2655    match entry {
2656        Some(entry) => {
2657            record.set("version", Value::Integer(entry.version));
2658            if entry.tombstone {
2659                record.set("fingerprint", Value::Null);
2660            } else {
2661                record.set("fingerprint", Value::text(vault_fingerprint(&entry.value)));
2662            }
2663            record.set("tags", vault_tags_value(&entry.metadata));
2664            record.set("created_at", Value::TimestampMs(entry.created_at as i64));
2665            record.set("updated_at", Value::TimestampMs(entry.updated_at as i64));
2666            record.set("value", Value::text("***"));
2667            record.set(
2668                "status",
2669                Value::text(if entry.tombstone {
2670                    "deleted"
2671                } else if key_available {
2672                    "sealed"
2673                } else {
2674                    "sealed_unavailable"
2675                }),
2676            );
2677            record.set("tombstone", Value::Boolean(entry.tombstone));
2678            record.set("op", Value::text(entry.op.clone()));
2679        }
2680        None => {
2681            record.set("version", Value::Null);
2682            record.set("fingerprint", Value::Null);
2683            record.set("tags", Value::Array(Vec::new()));
2684            record.set("created_at", Value::Null);
2685            record.set("updated_at", Value::Null);
2686            record.set("value", Value::text(""));
2687            record.set("status", Value::text("missing"));
2688            record.set("tombstone", Value::Boolean(false));
2689            record.set("op", Value::Null);
2690        }
2691    }
2692    result.push(record);
2693    result
2694}
2695
2696fn vault_fingerprint(value: &Value) -> String {
2697    match value {
2698        Value::Secret(payload) => crate::utils::to_hex(&crate::crypto::sha256::sha256(payload)),
2699        other => crate::utils::to_hex(&crate::crypto::sha256::sha256(&other.to_bytes())),
2700    }
2701}
2702
2703fn vault_entry_metadata_json(entry: &VaultEntry) -> crate::json::Value {
2704    let mut object = crate::json::Map::new();
2705    object.insert(
2706        "key".to_string(),
2707        crate::json::Value::String(entry.key.clone()),
2708    );
2709    object.insert(
2710        "version".to_string(),
2711        crate::json::Value::Number(entry.version as f64),
2712    );
2713    object.insert(
2714        "fingerprint".to_string(),
2715        if entry.tombstone {
2716            crate::json::Value::Null
2717        } else {
2718            crate::json::Value::String(vault_fingerprint(&entry.value))
2719        },
2720    );
2721    object.insert("tags".to_string(), vault_tags_json(&entry.metadata));
2722    object.insert(
2723        "actor".to_string(),
2724        crate::json::Value::String(RedDBRuntime::current_vault_actor()),
2725    );
2726    object.insert(
2727        "sequence_id".to_string(),
2728        crate::json::Value::Number(entry.sequence_id as f64),
2729    );
2730    object.insert(
2731        "tombstone".to_string(),
2732        crate::json::Value::Bool(entry.tombstone),
2733    );
2734    object.insert(
2735        "op".to_string(),
2736        crate::json::Value::String(entry.op.clone()),
2737    );
2738    crate::json::Value::Object(object)
2739}
2740
2741fn vault_tags_json(metadata: &Metadata) -> crate::json::Value {
2742    match vault_tags_value(metadata) {
2743        Value::Array(values) => crate::json::Value::Array(
2744            values
2745                .into_iter()
2746                .filter_map(|value| match value {
2747                    Value::Text(tag) => Some(crate::json::Value::String(tag.to_string())),
2748                    _ => None,
2749                })
2750                .collect(),
2751        ),
2752        _ => crate::json::Value::Array(Vec::new()),
2753    }
2754}
2755
2756fn vault_tags_metadata(tags: &[String]) -> std::collections::HashMap<String, MetadataValue> {
2757    [(
2758        "tags".to_string(),
2759        MetadataValue::Array(
2760            tags.iter()
2761                .map(|tag| MetadataValue::String(tag.clone()))
2762                .collect(),
2763        ),
2764    )]
2765    .into_iter()
2766    .collect()
2767}
2768
2769fn vault_filter_watch_event(
2770    mut event: crate::replication::cdc::KvWatchEvent,
2771) -> crate::replication::cdc::KvWatchEvent {
2772    event.before = event.before.and_then(vault_metadata_json_only);
2773    event.after = event.after.and_then(vault_metadata_json_only);
2774    event
2775}
2776
2777fn vault_metadata_json_only(value: crate::json::Value) -> Option<crate::json::Value> {
2778    let object = value.as_object()?;
2779    let mut out = crate::json::Map::new();
2780    for field in [
2781        "key",
2782        "version",
2783        "fingerprint",
2784        "tags",
2785        "actor",
2786        "sequence_id",
2787        "tombstone",
2788        "op",
2789    ] {
2790        if let Some(value) = object.get(field) {
2791            out.insert(field.to_string(), value.clone());
2792        }
2793    }
2794    Some(crate::json::Value::Object(out))
2795}
2796
2797fn vault_tags_value(metadata: &Metadata) -> Value {
2798    match metadata.get("tags") {
2799        Some(MetadataValue::Array(values)) => Value::Array(
2800            values
2801                .iter()
2802                .filter_map(|value| match value {
2803                    MetadataValue::String(tag) => Some(Value::text(tag.clone())),
2804                    _ => None,
2805                })
2806                .collect(),
2807        ),
2808        Some(MetadataValue::String(tag)) if !tag.is_empty() => {
2809            Value::Array(vec![Value::text(tag.clone())])
2810        }
2811        _ => Value::Array(Vec::new()),
2812    }
2813}
2814
2815fn decode_vault_key(hex_key: &str) -> RedDBResult<[u8; 32]> {
2816    let bytes = hex::decode(hex_key)
2817        .map_err(|_| RedDBError::Query("vault sealed_unavailable: bad key material".to_string()))?;
2818    let key: [u8; 32] = bytes.try_into().map_err(|_| {
2819        RedDBError::Query("vault sealed_unavailable: bad key material length".to_string())
2820    })?;
2821    Ok(key)
2822}
2823
2824fn kv_tags_metadata(tags: &[String]) -> Option<(String, MetadataValue)> {
2825    if tags.is_empty() {
2826        return None;
2827    }
2828    let values = tags
2829        .iter()
2830        .map(|tag| MetadataValue::String(tag.clone()))
2831        .collect();
2832    Some(("_kv_tags".to_string(), MetadataValue::Array(values)))
2833}
2834
2835fn kv_tags_value(tags: &[String]) -> Value {
2836    let json = crate::json::Value::Array(
2837        tags.iter()
2838            .map(|tag| crate::json::Value::String(tag.clone()))
2839            .collect(),
2840    );
2841    Value::Json(crate::json::to_vec(&json).unwrap_or_default())
2842}
2843
2844fn kv_value_from_entity(entity: &crate::storage::UnifiedEntity) -> Option<Value> {
2845    if let crate::storage::EntityData::Row(ref row) = entity.data {
2846        if let Some(ref named) = row.named {
2847            return named.get("value").cloned();
2848        }
2849    }
2850    None
2851}
2852
2853/// Whether a KV row entity carries the logical `key`.
2854fn kv_entity_has_key(entity: &crate::storage::UnifiedEntity, key: &str) -> bool {
2855    if let crate::storage::EntityData::Row(ref row) = entity.data {
2856        if let Some(ref named) = row.named {
2857            if let Some(crate::storage::schema::Value::Text(ref k)) = named.get("key") {
2858                return &**k == key;
2859            }
2860        }
2861    }
2862    false
2863}
2864
2865fn insert_kv_json_path(root: &mut crate::json::Value, path: &str, value: crate::json::Value) {
2866    let segments: Vec<&str> = path
2867        .split('.')
2868        .filter(|segment| !segment.is_empty())
2869        .collect();
2870    insert_kv_json_segments(root, &segments, value);
2871}
2872
2873fn insert_kv_json_segments(
2874    root: &mut crate::json::Value,
2875    segments: &[&str],
2876    value: crate::json::Value,
2877) {
2878    if segments.is_empty() {
2879        *root = value;
2880        return;
2881    }
2882
2883    if !matches!(root, crate::json::Value::Object(_)) {
2884        *root = crate::json::Value::Object(crate::json::Map::new());
2885    }
2886
2887    let crate::json::Value::Object(map) = root else {
2888        return;
2889    };
2890    if segments.len() == 1 {
2891        map.insert(segments[0].to_string(), value);
2892        return;
2893    }
2894    let entry = map
2895        .entry(segments[0].to_string())
2896        .or_insert_with(|| crate::json::Value::Object(crate::json::Map::new()));
2897    insert_kv_json_segments(entry, &segments[1..], value);
2898}
2899
2900fn kv_list_json_result(
2901    raw_query: &str,
2902    collection: &str,
2903    prefix: &Option<String>,
2904    value: crate::json::Value,
2905) -> RuntimeQueryResult {
2906    let mut result =
2907        UnifiedResult::with_columns(vec!["collection".into(), "prefix".into(), "value".into()]);
2908    let mut record = UnifiedRecord::new();
2909    record.set("collection", Value::text(collection.to_string()));
2910    record.set(
2911        "prefix",
2912        prefix
2913            .as_ref()
2914            .map(|prefix| Value::text(prefix.clone()))
2915            .unwrap_or(Value::Null),
2916    );
2917    record.set("value", Value::Json(value.to_string_compact().into_bytes()));
2918    result.push(record);
2919    RuntimeQueryResult {
2920        query: raw_query.to_string(),
2921        mode: crate::storage::query::modes::QueryMode::Sql,
2922        statement: "kv_list_json",
2923        engine: "kv",
2924        result,
2925        affected_rows: 0,
2926        statement_type: "select",
2927        bookmark: None,
2928    }
2929}
2930
2931fn kv_collection_contract(name: &str) -> crate::physical::CollectionContract {
2932    let now = current_unix_ms();
2933    crate::physical::CollectionContract {
2934        name: name.to_string(),
2935        declared_model: crate::catalog::CollectionModel::Kv,
2936        schema_mode: crate::catalog::SchemaMode::Dynamic,
2937        origin: crate::physical::ContractOrigin::Implicit,
2938        version: 1,
2939        created_at_unix_ms: now,
2940        updated_at_unix_ms: now,
2941        default_ttl_ms: None,
2942        vector_dimension: None,
2943        vector_metric: None,
2944        context_index_fields: Vec::new(),
2945        declared_columns: Vec::new(),
2946        table_def: None,
2947        timestamps_enabled: false,
2948        context_index_enabled: false,
2949        metrics_raw_retention_ms: None,
2950        metrics_rollup_policies: Vec::new(),
2951        metrics_tenant_identity: None,
2952        metrics_namespace: None,
2953        append_only: false,
2954        subscriptions: Vec::new(),
2955        analytics_config: Vec::new(),
2956        session_key: None,
2957        session_gap_ms: None,
2958        retention_duration_ms: None,
2959        analytical_storage: None,
2960
2961        ai_policy: None,
2962    }
2963}
2964
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// If the system clock reads earlier than the epoch, the elapsed duration
/// defaults to zero and `0` is returned.
fn current_unix_ms() -> u128 {
    let since_epoch = std::time::UNIX_EPOCH.elapsed().unwrap_or_default();
    since_epoch.as_millis()
}
2971
2972#[cfg(test)]
2973mod tests {
2974    use crate::api::RedDBOptions;
2975    use crate::catalog::CollectionModel;
2976    use crate::runtime::RedDBRuntime;
2977
2978    fn rt() -> RedDBRuntime {
2979        RedDBRuntime::with_options(RedDBOptions::in_memory()).expect("in-memory runtime")
2980    }
2981
2982    #[test]
2983    fn incr_missing_key_initialises_at_by() {
2984        let r = rt();
2985        let ops = super::KvAtomicOps::new(&r);
2986        let v = ops
2987            .incr(CollectionModel::Kv, "kv_default", "missing", 5, None)
2988            .unwrap();
2989        assert_eq!(v, 5);
2990    }
2991
2992    #[test]
2993    fn kv_runtime_stats_count_public_ops() {
2994        let r = rt();
2995        let ops = super::KvAtomicOps::new(&r);
2996
2997        ops.set(
2998            CollectionModel::Kv,
2999            "kv_default",
3000            "profile",
3001            crate::storage::schema::Value::text("alice"),
3002            None,
3003            false,
3004        )
3005        .unwrap();
3006        ops.get(CollectionModel::Kv, "kv_default", "profile")
3007            .unwrap();
3008        ops.delete(CollectionModel::Kv, "kv_default", "profile")
3009            .unwrap();
3010        ops.incr(CollectionModel::Kv, "kv_default", "hits", 1, None)
3011            .unwrap();
3012        ops.cas(
3013            CollectionModel::Kv,
3014            "kv_default",
3015            "profile",
3016            None,
3017            crate::storage::schema::Value::text("created"),
3018            None,
3019        )
3020        .unwrap();
3021        ops.cas(
3022            CollectionModel::Kv,
3023            "kv_default",
3024            "profile",
3025            Some(&crate::storage::schema::Value::text("different")),
3026            crate::storage::schema::Value::text("ignored"),
3027            None,
3028        )
3029        .unwrap();
3030
3031        let stats = r.stats().kv;
3032        assert_eq!(stats.puts, 1);
3033        assert_eq!(stats.gets, 1);
3034        assert_eq!(stats.deletes, 1);
3035        assert_eq!(stats.incrs, 1);
3036        assert_eq!(stats.cas_success, 1);
3037        assert_eq!(stats.cas_conflict, 1);
3038    }
3039
3040    #[test]
3041    fn kv_invalidate_tags_removes_matching_entries_only() {
3042        let r = rt();
3043
3044        r.execute_query("KV PUT sessions.blob = 'payload' TAGS [user:42, org:7]")
3045            .unwrap();
3046
3047        let miss = r
3048            .execute_query("INVALIDATE TAGS [org:99] FROM sessions")
3049            .unwrap();
3050        assert_eq!(miss.affected_rows, 0);
3051        assert!(matches!(
3052            r.execute_query("KV GET sessions.blob")
3053                .unwrap()
3054                .result
3055                .records[0]
3056                .get("value"),
3057            Some(crate::storage::schema::Value::Text(value)) if &**value == "payload"
3058        ));
3059
3060        let hit = r
3061            .execute_query("INVALIDATE TAGS [user:42] FROM sessions")
3062            .unwrap();
3063        assert_eq!(hit.affected_rows, 1);
3064        assert!(matches!(
3065            r.execute_query("KV GET sessions.blob")
3066                .unwrap()
3067                .result
3068                .records[0]
3069                .get("value"),
3070            Some(crate::storage::schema::Value::Null)
3071        ));
3072    }
3073
3074    #[test]
3075    fn kv_runtime_stats_count_watch_streams_and_events() {
3076        let r = rt();
3077        let ops = super::KvAtomicOps::new(&r);
3078        assert_eq!(r.stats().kv.watch_streams_active, 0);
3079
3080        {
3081            let mut stream = r.kv_watch_subscribe("kv_default", "watched", None);
3082            assert_eq!(r.stats().kv.watch_streams_active, 1);
3083
3084            ops.set(
3085                CollectionModel::Kv,
3086                "kv_default",
3087                "watched",
3088                crate::storage::schema::Value::Integer(1),
3089                None,
3090                false,
3091            )
3092            .unwrap();
3093            let event = stream.poll_next().expect("watch event");
3094            assert_eq!(event.key, "watched");
3095            assert_eq!(r.stats().kv.watch_events_emitted, 1);
3096
3097            stream.record_drop_count(3);
3098            assert_eq!(r.stats().kv.watch_drops, 3);
3099        }
3100
3101        assert_eq!(r.stats().kv.watch_streams_active, 0);
3102    }
3103
3104    #[test]
3105    fn incr_existing_integer_accumulates() {
3106        let r = rt();
3107        let ops = super::KvAtomicOps::new(&r);
3108        ops.incr(CollectionModel::Kv, "kv_default", "ctr", 1, None)
3109            .unwrap();
3110        ops.incr(CollectionModel::Kv, "kv_default", "ctr", 1, None)
3111            .unwrap();
3112        let v = ops
3113            .incr(CollectionModel::Kv, "kv_default", "ctr", 1, None)
3114            .unwrap();
3115        assert_eq!(v, 3);
3116    }
3117
3118    #[test]
3119    fn decr_via_negative_by() {
3120        let r = rt();
3121        let ops = super::KvAtomicOps::new(&r);
3122        ops.incr(CollectionModel::Kv, "kv_default", "stock", 10, None)
3123            .unwrap();
3124        let v = ops
3125            .incr(CollectionModel::Kv, "kv_default", "stock", -3, None)
3126            .unwrap();
3127        assert_eq!(v, 7);
3128    }
3129
3130    #[test]
3131    fn concurrent_incr_single_key_is_atomic() {
3132        const THREADS: usize = 8;
3133        const ITERS: usize = 1000;
3134
3135        let runtime = std::sync::Arc::new(rt());
3136        let barrier = std::sync::Arc::new(std::sync::Barrier::new(THREADS));
3137        let mut handles = Vec::new();
3138
3139        for _ in 0..THREADS {
3140            let runtime = std::sync::Arc::clone(&runtime);
3141            let barrier = std::sync::Arc::clone(&barrier);
3142            handles.push(std::thread::spawn(move || {
3143                let ops = super::KvAtomicOps::new(&runtime);
3144                barrier.wait();
3145                for _ in 0..ITERS {
3146                    ops.incr(CollectionModel::Kv, "kv_default", "counter", 1, None)
3147                        .unwrap();
3148                }
3149            }));
3150        }
3151
3152        for handle in handles {
3153            handle.join().expect("worker should finish");
3154        }
3155
3156        let ops = super::KvAtomicOps::new(&runtime);
3157        assert_eq!(
3158            ops.get(CollectionModel::Kv, "kv_default", "counter")
3159                .unwrap(),
3160            Some(crate::storage::schema::Value::Integer(
3161                (THREADS * ITERS) as i64
3162            ))
3163        );
3164    }
3165
3166    #[test]
3167    fn incr_on_string_value_returns_error() {
3168        let r = rt();
3169        let ops = super::KvAtomicOps::new(&r);
3170        ops.set(
3171            CollectionModel::Kv,
3172            "kv_default",
3173            "name",
3174            crate::storage::schema::Value::text("alice"),
3175            None,
3176            false,
3177        )
3178        .unwrap();
3179        let err = ops
3180            .incr(CollectionModel::Kv, "kv_default", "name", 1, None)
3181            .unwrap_err();
3182        assert!(err.to_string().contains("non-integer"));
3183    }
3184
3185    // --- CAS tests ---
3186
3187    #[test]
3188    fn cas_matching_value_succeeds() {
3189        let r = rt();
3190        let ops = super::KvAtomicOps::new(&r);
3191        ops.set(
3192            CollectionModel::Kv,
3193            "kv_default",
3194            "lock",
3195            crate::storage::schema::Value::text("free"),
3196            None,
3197            false,
3198        )
3199        .unwrap();
3200        let (ok, prev) = ops
3201            .cas(
3202                CollectionModel::Kv,
3203                "kv_default",
3204                "lock",
3205                Some(&crate::storage::schema::Value::text("free")),
3206                crate::storage::schema::Value::text("held"),
3207                None,
3208            )
3209            .unwrap();
3210        assert!(ok);
3211        assert_eq!(prev, Some(crate::storage::schema::Value::text("free")));
3212        // Value actually changed.
3213        assert_eq!(
3214            ops.get(CollectionModel::Kv, "kv_default", "lock").unwrap(),
3215            Some(crate::storage::schema::Value::text("held"))
3216        );
3217    }
3218
3219    #[test]
3220    fn concurrent_cas_allows_one_success_per_round() {
3221        const THREADS: usize = 8;
3222        const ROUNDS: usize = 100;
3223
3224        let runtime = std::sync::Arc::new(rt());
3225        let ops = super::KvAtomicOps::new(&runtime);
3226        ops.set(
3227            CollectionModel::Kv,
3228            "kv_default",
3229            "cas_counter",
3230            crate::storage::schema::Value::Integer(0),
3231            None,
3232            false,
3233        )
3234        .unwrap();
3235
3236        let start_round = std::sync::Arc::new(std::sync::Barrier::new(THREADS));
3237        let finish_round = std::sync::Arc::new(std::sync::Barrier::new(THREADS));
3238        let successes = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
3239        let mut handles = Vec::new();
3240
3241        for _ in 0..THREADS {
3242            let runtime = std::sync::Arc::clone(&runtime);
3243            let start_round = std::sync::Arc::clone(&start_round);
3244            let finish_round = std::sync::Arc::clone(&finish_round);
3245            let successes = std::sync::Arc::clone(&successes);
3246            handles.push(std::thread::spawn(move || {
3247                let ops = super::KvAtomicOps::new(&runtime);
3248                for round in 0..ROUNDS {
3249                    start_round.wait();
3250                    let (ok, _) = ops
3251                        .cas(
3252                            CollectionModel::Kv,
3253                            "kv_default",
3254                            "cas_counter",
3255                            Some(&crate::storage::schema::Value::Integer(round as i64)),
3256                            crate::storage::schema::Value::Integer((round + 1) as i64),
3257                            None,
3258                        )
3259                        .unwrap();
3260                    if ok {
3261                        successes.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
3262                    }
3263                    finish_round.wait();
3264                }
3265            }));
3266        }
3267
3268        for handle in handles {
3269            handle.join().expect("worker should finish");
3270        }
3271
3272        assert_eq!(successes.load(std::sync::atomic::Ordering::SeqCst), ROUNDS);
3273        assert_eq!(
3274            ops.get(CollectionModel::Kv, "kv_default", "cas_counter")
3275                .unwrap(),
3276            Some(crate::storage::schema::Value::Integer(ROUNDS as i64))
3277        );
3278    }
3279
3280    #[test]
3281    fn cas_mismatching_value_fails() {
3282        let r = rt();
3283        let ops = super::KvAtomicOps::new(&r);
3284        ops.set(
3285            CollectionModel::Kv,
3286            "kv_default",
3287            "lock",
3288            crate::storage::schema::Value::text("free"),
3289            None,
3290            false,
3291        )
3292        .unwrap();
3293        let (ok, current) = ops
3294            .cas(
3295                CollectionModel::Kv,
3296                "kv_default",
3297                "lock",
3298                Some(&crate::storage::schema::Value::text("held")),
3299                crate::storage::schema::Value::text("worker-7"),
3300                None,
3301            )
3302            .unwrap();
3303        assert!(!ok);
3304        assert_eq!(current, Some(crate::storage::schema::Value::text("free")));
3305        // Value unchanged.
3306        assert_eq!(
3307            ops.get(CollectionModel::Kv, "kv_default", "lock").unwrap(),
3308            Some(crate::storage::schema::Value::text("free"))
3309        );
3310    }
3311
3312    #[test]
3313    fn cas_expect_null_on_missing_key_creates() {
3314        let r = rt();
3315        let ops = super::KvAtomicOps::new(&r);
3316        let (ok, prev) = ops
3317            .cas(
3318                CollectionModel::Kv,
3319                "kv_default",
3320                "new_key",
3321                None,
3322                crate::storage::schema::Value::text("created"),
3323                None,
3324            )
3325            .unwrap();
3326        assert!(ok);
3327        assert_eq!(prev, None);
3328        assert_eq!(
3329            ops.get(CollectionModel::Kv, "kv_default", "new_key")
3330                .unwrap(),
3331            Some(crate::storage::schema::Value::text("created"))
3332        );
3333    }
3334
3335    #[test]
3336    fn cas_expect_null_on_existing_key_fails() {
3337        let r = rt();
3338        let ops = super::KvAtomicOps::new(&r);
3339        ops.set(
3340            CollectionModel::Kv,
3341            "kv_default",
3342            "taken",
3343            crate::storage::schema::Value::text("worker-1"),
3344            None,
3345            false,
3346        )
3347        .unwrap();
3348        let (ok, current) = ops
3349            .cas(
3350                CollectionModel::Kv,
3351                "kv_default",
3352                "taken",
3353                None,
3354                crate::storage::schema::Value::text("worker-2"),
3355                None,
3356            )
3357            .unwrap();
3358        assert!(!ok);
3359        assert_eq!(
3360            current,
3361            Some(crate::storage::schema::Value::text("worker-1"))
3362        );
3363    }
3364
3365    #[test]
3366    fn cas_via_sql_roundtrip() {
3367        let r = rt();
3368        // Seed value.
3369        r.execute_query("KV PUT lock = 'free'").unwrap();
3370        // CAS: free → held — should succeed.
3371        let res = r
3372            .execute_query("KV CAS lock EXPECT 'free' SET 'held'")
3373            .unwrap();
3374        let row = &res.result.records[0];
3375        assert_eq!(
3376            row.get("ok"),
3377            Some(&crate::storage::schema::Value::Boolean(true))
3378        );
3379        // CAS: free → held again — should fail (value is now 'held').
3380        let res2 = r
3381            .execute_query("KV CAS lock EXPECT 'free' SET 'held'")
3382            .unwrap();
3383        let row2 = &res2.result.records[0];
3384        assert_eq!(
3385            row2.get("ok"),
3386            Some(&crate::storage::schema::Value::Boolean(false))
3387        );
3388    }
3389
3390    #[test]
3391    fn cas_expect_null_via_sql() {
3392        let r = rt();
3393        let res = r
3394            .execute_query("KV CAS singleton EXPECT NULL SET 'first'")
3395            .unwrap();
3396        let row = &res.result.records[0];
3397        assert_eq!(
3398            row.get("ok"),
3399            Some(&crate::storage::schema::Value::Boolean(true))
3400        );
3401        // Second call must fail.
3402        let res2 = r
3403            .execute_query("KV CAS singleton EXPECT NULL SET 'second'")
3404            .unwrap();
3405        let row2 = &res2.result.records[0];
3406        assert_eq!(
3407            row2.get("ok"),
3408            Some(&crate::storage::schema::Value::Boolean(false))
3409        );
3410    }
3411
3412    #[test]
3413    fn incr_via_sql_roundtrip() {
3414        let r = rt();
3415        let res = r.execute_query("KV INCR hits").unwrap();
3416        let row = &res.result.records[0];
3417        assert_eq!(
3418            row.get("value"),
3419            Some(&crate::storage::schema::Value::Integer(1))
3420        );
3421        let res2 = r.execute_query("KV INCR hits BY 4").unwrap();
3422        let row2 = &res2.result.records[0];
3423        assert_eq!(
3424            row2.get("value"),
3425            Some(&crate::storage::schema::Value::Integer(5))
3426        );
3427    }
3428
3429    #[test]
3430    fn concurrent_self_referential_update_is_atomic() {
3431        const THREADS: usize = 8;
3432        const ITERS: usize = 100;
3433
3434        let runtime = std::sync::Arc::new(rt());
3435        runtime
3436            .execute_query("CREATE TABLE counters (id INT, n INT)")
3437            .unwrap();
3438        runtime
3439            .execute_query("INSERT INTO counters (id, n) VALUES (1, 0)")
3440            .unwrap();
3441
3442        let barrier = std::sync::Arc::new(std::sync::Barrier::new(THREADS));
3443        let mut handles = Vec::new();
3444        for _ in 0..THREADS {
3445            let runtime = std::sync::Arc::clone(&runtime);
3446            let barrier = std::sync::Arc::clone(&barrier);
3447            handles.push(std::thread::spawn(move || {
3448                barrier.wait();
3449                for _ in 0..ITERS {
3450                    runtime
3451                        .execute_query("UPDATE counters SET n = n + 1 WHERE id = 1")
3452                        .unwrap();
3453                }
3454            }));
3455        }
3456
3457        for handle in handles {
3458            handle.join().expect("worker should finish");
3459        }
3460
3461        let selected = runtime
3462            .execute_query("SELECT n FROM counters WHERE id = 1")
3463            .unwrap();
3464        assert_eq!(
3465            selected.result.records[0].get("n"),
3466            Some(&crate::storage::schema::Value::Integer(
3467                (THREADS * ITERS) as i64
3468            ))
3469        );
3470    }
3471
3472    #[test]
3473    fn watch_stream_delivers_key_events_in_lsn_order() {
3474        let r = rt();
3475        let ops = super::KvAtomicOps::new(&r);
3476        let mut stream = r.kv_watch_subscribe("kv_default", "seq", None);
3477
3478        ops.set(
3479            CollectionModel::Kv,
3480            "kv_default",
3481            "seq",
3482            crate::storage::schema::Value::Integer(1),
3483            None,
3484            false,
3485        )
3486        .unwrap();
3487        ops.incr(CollectionModel::Kv, "kv_default", "seq", 1, None)
3488            .unwrap();
3489        ops.delete(CollectionModel::Kv, "kv_default", "seq")
3490            .unwrap();
3491        ops.set(
3492            CollectionModel::Kv,
3493            "kv_default",
3494            "seq",
3495            crate::storage::schema::Value::Integer(9),
3496            None,
3497            false,
3498        )
3499        .unwrap();
3500
3501        let mut events = Vec::new();
3502        while let Some(event) = stream.poll_next() {
3503            events.push(event);
3504            if events.len() == 4 {
3505                break;
3506            }
3507        }
3508
3509        assert_eq!(events.len(), 4);
3510        assert_eq!(
3511            events[0].op,
3512            crate::replication::cdc::ChangeOperation::Insert
3513        );
3514        assert_eq!(
3515            events[1].op,
3516            crate::replication::cdc::ChangeOperation::Update
3517        );
3518        assert_eq!(
3519            events[2].op,
3520            crate::replication::cdc::ChangeOperation::Delete
3521        );
3522        assert_eq!(
3523            events[3].op,
3524            crate::replication::cdc::ChangeOperation::Insert
3525        );
3526        assert!(events.windows(2).all(|pair| pair[0].lsn < pair[1].lsn));
3527    }
3528
3529    #[test]
3530    fn watch_prefix_stream_delivers_matching_events_only() {
3531        let r = rt();
3532        let ops = super::KvAtomicOps::new(&r);
3533        let mut stream = r.kv_watch_subscribe_prefix("kv_default", "acct:", None);
3534
3535        ops.set(
3536            CollectionModel::Kv,
3537            "kv_default",
3538            "acct:1",
3539            crate::storage::schema::Value::Integer(1),
3540            None,
3541            false,
3542        )
3543        .unwrap();
3544        ops.set(
3545            CollectionModel::Kv,
3546            "kv_default",
3547            "session:1",
3548            crate::storage::schema::Value::Integer(2),
3549            None,
3550            false,
3551        )
3552        .unwrap();
3553        ops.set(
3554            CollectionModel::Kv,
3555            "kv_default",
3556            "acct:2",
3557            crate::storage::schema::Value::Integer(3),
3558            None,
3559            false,
3560        )
3561        .unwrap();
3562
3563        let first = stream.poll_next().expect("first prefix event");
3564        let second = stream.poll_next().expect("second prefix event");
3565        assert_eq!(first.key, "acct:1");
3566        assert_eq!(second.key, "acct:2");
3567        assert!(stream.poll_next().is_none());
3568    }
3569
3570    #[test]
3571    fn watch_stream_resume_from_lsn_delivers_missed_events_without_duplicates() {
3572        let r = rt();
3573        let ops = super::KvAtomicOps::new(&r);
3574        let mut stream = r.kv_watch_subscribe("kv_default", "resume", None);
3575
3576        let mut last_seen_lsn = 0;
3577        for value in 0..5 {
3578            ops.set(
3579                CollectionModel::Kv,
3580                "kv_default",
3581                "resume",
3582                crate::storage::schema::Value::Integer(value),
3583                None,
3584                false,
3585            )
3586            .unwrap();
3587            last_seen_lsn = stream.poll_next().expect("initial event").lsn;
3588        }
3589        drop(stream);
3590
3591        for value in 5..55 {
3592            ops.set(
3593                CollectionModel::Kv,
3594                "kv_default",
3595                "resume",
3596                crate::storage::schema::Value::Integer(value),
3597                None,
3598                false,
3599            )
3600            .unwrap();
3601        }
3602
3603        let expected_lsns: Vec<u64> = r
3604            .kv_watch_events_since("kv_default", "resume", last_seen_lsn, 200)
3605            .into_iter()
3606            .map(|event| event.lsn)
3607            .collect();
3608        assert!(!expected_lsns.is_empty());
3609
3610        let mut resumed = r.kv_watch_subscribe("kv_default", "resume", Some(last_seen_lsn));
3611        let mut lsns = Vec::new();
3612        while let Some(event) = resumed.poll_next() {
3613            lsns.push(event.lsn);
3614            if lsns.len() == expected_lsns.len() {
3615                break;
3616            }
3617        }
3618
3619        assert_eq!(lsns, expected_lsns);
3620        assert!(lsns.iter().all(|lsn| *lsn > last_seen_lsn));
3621        assert!(lsns.windows(2).all(|pair| pair[0] < pair[1]));
3622        assert!(resumed.poll_next().is_none());
3623    }
3624
3625    #[test]
3626    fn watch_stream_slow_consumer_drops_oldest_buffered_events() {
3627        let r = rt();
3628        let ops = super::KvAtomicOps::new(&r);
3629        let mut stream = r.kv_watch_subscribe("kv_default", "slow", None);
3630
3631        for value in 0..1_200 {
3632            ops.set(
3633                CollectionModel::Kv,
3634                "kv_default",
3635                "slow",
3636                crate::storage::schema::Value::Integer(value),
3637                None,
3638                false,
3639            )
3640            .unwrap();
3641        }
3642
3643        let event = stream.poll_next().expect("tail event after drops");
3644        assert!(event.lsn > 1);
3645        assert!(event.dropped_event_count > 0);
3646        assert_eq!(stream.dropped_event_count(), event.dropped_event_count);
3647        assert_eq!(r.stats().kv.watch_drops, event.dropped_event_count);
3648    }
3649
3650    #[test]
3651    fn watch_stream_idle_timeout_closes_subscription() {
3652        let r = rt();
3653        r.execute_query("SET CONFIG red.config.kv.watch.idle_timeout_ms = 1")
3654            .unwrap();
3655
3656        let mut stream = r.kv_watch_subscribe("kv_default", "idle", None);
3657        assert_eq!(r.stats().kv.watch_streams_active, 1);
3658        std::thread::sleep(std::time::Duration::from_millis(5));
3659
3660        assert!(stream.poll_next().is_none());
3661        assert_eq!(r.stats().kv.watch_streams_active, 0);
3662    }
3663
3664    #[test]
3665    fn watch_stream_does_not_emit_rolled_back_put() {
3666        let r = rt();
3667        let mut stream = r.kv_watch_subscribe("kv_default", "rollback_key", None);
3668
3669        r.execute_query("BEGIN").unwrap();
3670        r.execute_query("KV PUT rollback_key = 'dirty'").unwrap();
3671        r.execute_query("ROLLBACK").unwrap();
3672
3673        assert!(stream.poll_next().is_none());
3674    }
3675}