Skip to main content

lunaris_storage_moon/
lib.rs

1//! `MoonStorage` — `StoragePort` impl backed by Moon (Redis-compatible RESP).
2//!
3//! RFC 0001 Wave 1C: every `StoragePort` method now routes through per-scope
4//! keyspace helpers (`keyspace::{scope_prefix, ft_index_name, graph_key, mq_topic}`).
5//! Per-scope FT indices, graph keys, and MQ topics are created lazily on first
6//! write via `ensure_scope`.
7//!
8//! Per blueprint §6, every method is a thin pass-through to a Moon native command:
9//!
10//! | trait method      | Moon command(s)                                                         |
11//! |-------------------|-------------------------------------------------------------------------|
12//! | `atomic_write`    | `TXN.BEGIN` + per-op (`HSET` / `FT.UPSERT` / `GRAPH.QUERY MERGE`) + `TXN.COMMIT` |
13//! | `vector_search`   | `FT.SEARCH` (with `TEMPORAL.SNAPSHOT_AT` when `as_of` is `Some`)         |
14//! | `graph_traverse`  | `GRAPH.QUERY` (with `TEMPORAL.SNAPSHOT_AT` when `as_of` is `Some`)       |
15//! | `scan_range`      | `SCAN ... MATCH <prefix>*` then `HGET` per matched key                   |
16//! | `read_as_of`      | `TEMPORAL.SNAPSHOT_AT` then `HGET <key> v`                              |
17//! | `publish`         | `MQ.PUSH`                                                               |
18//! | `subscribe`       | `MQ.POP ... BLOCK` polling stream                                       |
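//! | `capabilities`    | mostly constant — `queue_native` is probed at connect, `max_vector_dim` mirrors the client dim, KV bi-temporal is reported as non-native |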
20//!
21//! ## Lazy per-scope init
22//!
23//! On first write under a scope, `ensure_scope` creates:
24//! - `FT.CREATE lunaris_{scope}_{kind}_idx` for each of chunks / entities / facts / communities
25//! - `GRAPH.CREATE lunaris_{scope}_graph`
26//!
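//! Subsequent calls for an already-initialized scope skip the Moon round-trips via
//! an in-memory `initialized_scopes` set. The check takes a brief
//! `parking_lot::Mutex` lock that is never held across an `.await`, and performs
//! no Moon I/O once the scope is initialized.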
29//!
30//! ## Threat model snapshot (T-01-03-*)
31//!
32//! * `WriteOp::GraphNode { label, ... }` and `WriteOp::GraphEdge { rel, ... }` are
33//!   interpolated into Cypher. Callers MUST validate `label` / `rel` against
34//!   `^[A-Za-z_][A-Za-z0-9_]*$` — see `crates/lunaris-storage-moon/src/atomic.rs` rustdoc.
35//!   Phase 4 (`OPS-04` audit) will move the guard into the trait.
36//! * Connection is cleartext RESP over TCP — Moon is treated as trusted infra inside the
37//!   same network boundary as the Lunaris process. TLS lands in Phase 5.
38
39#![deny(rust_2018_idioms, unreachable_pub)]
40#![forbid(unsafe_code)]
41
42pub mod atomic;
43pub mod client;
44pub mod graph;
45// W2-L2 — FT.INVALIDATE_RANGE raw RESP escape hatch (UC-G3 force-push invalidation).
46// pub(crate): only called from `MoonStorage::invalidate_range` in this file.
47pub(crate) mod invalidate;
48pub mod keyspace;
49pub mod keyword;
50pub mod kv;
51// hotkeys-observability — HOTKEYS raw RESP path (typed SDK has no wrapper).
52pub(crate) mod hotkeys;
53// ft-navigate-recall — FT.NAVIGATE raw RESP path (typed SDK lacks a DECAY slot).
54pub(crate) mod navigate;
55pub mod queue;
56pub mod scopes;
57pub mod vector;
58
59pub use client::MoonClient;
60
61use std::collections::HashSet;
62use std::sync::Arc;
63
64use async_trait::async_trait;
65use bytes::Bytes;
66use futures::stream::BoxStream;
67use lunaris_core::{
68    CypherQuery, Filter, GraphDecay, GraphResult, Hlc, KeywordHit, KeywordPort, Lsn, NavigateHit,
69    NavigateSpec, QueueMsg, Row, Scope, ScopePage, StorageCapabilities, StorageError, StoragePort,
70    VectorHit, WriteOp,
71};
72use parking_lot::Mutex;
73
74use crate::keyspace::{ft_index_name, graph_key};
75
76/// `StoragePort` backed by a single Moon RESP connection manager.
77///
78/// `initialized_scopes` tracks which scopes have had their FT indices and graph
79/// key created (lazy init on first write). `Mutex` is held only during the
80/// brief check + insert — never across `.await` points.
81#[derive(Debug, Clone)]
82pub struct MoonStorage {
83    pub(crate) client: MoonClient,
84    queue_native: bool,
85    /// Set of scopes whose FT indices + graph key have been created on Moon.
86    /// `parking_lot::Mutex` (not `std::sync::Mutex`) per CLAUDE.md lock discipline.
87    /// The lock is NEVER held across an `.await` — it is taken, the bool is checked,
88    /// optionally the scope is inserted, and the lock is dropped BEFORE the async
89    /// Moon calls in `ensure_scope`.
90    initialized_scopes: Arc<Mutex<HashSet<String>>>,
91}
92
93impl MoonStorage {
94    /// Open a connection to Moon at `url` (`moon://host:port[?ws=workspace]`),
95    /// creating FT vector indices at the default dimension
96    /// ([`client::DEFAULT_VECTOR_DIM`] = 768, matching EmbeddingGemma-300M).
97    pub async fn connect(url: &str) -> Result<Self, StorageError> {
98        Self::connect_with_dim(url, crate::client::DEFAULT_VECTOR_DIM).await
99    }
100
101    /// Like [`MoonStorage::connect`], but creates the FT vector indices at
102    /// `dim` instead of the default 768. `dim` MUST be `> 0`. Moon's
103    /// `FT.CREATE` has no upper cap, so a 1536-d embedder (OpenAI
104    /// `text-embedding-3`) works against Moon out of the box.
105    ///
106    /// ## Operator footgun — existing index won't auto-resize
107    ///
108    /// Moon's `FT.CREATE` is idempotent and does NOT update an existing
109    /// index's schema. If a Moon instance already holds a 768-d `chunks`
110    /// index from a prior run, reopening with a 1536-d embedder leaves the
111    /// 768-d index in place; the mismatch surfaces only on the first vector
112    /// write. Drop the stale index first (`FT.DROPINDEX <name>`).
113    pub async fn connect_with_dim(url: &str, dim: usize) -> Result<Self, StorageError> {
114        let client = MoonClient::connect_with_dim(url, dim).await?;
115        let queue_native = crate::queue::supports_native_queue(&client).await?;
116        Ok(Self { client, queue_native, initialized_scopes: Arc::new(Mutex::new(HashSet::new())) })
117    }
118
119    /// Borrow the underlying client (used by integration tests).
120    pub fn client(&self) -> &MoonClient {
121        &self.client
122    }
123
124    /// Lazily ensure per-scope FT indices and graph key exist on Moon.
125    ///
126    /// On first call for a given scope, creates:
127    /// - `FT.CREATE lunaris_{scope}_{kind}_idx` for chunks / entities / facts / communities
128    /// - `GRAPH.CREATE lunaris_{scope}_graph`
129    ///
130    /// Idempotent: "already exists" errors from Moon are swallowed. Subsequent calls
131    /// for the same scope return immediately (in-memory set check, no Moon I/O).
132    ///
133    /// ## Lock discipline
134    ///
135    /// The `Mutex` is locked only to read/write the `HashSet<String>` — it is
136    /// dropped BEFORE any `.await` call so it is NEVER held across an await point.
137    async fn ensure_scope(&self, scope: &Scope) -> Result<(), StorageError> {
138        let scope_str = scope.as_str().to_string();
139
140        // Fast path: scope already initialized — lock, check, drop.
141        {
142            let guard = self.initialized_scopes.lock();
143            if guard.contains(&scope_str) {
144                return Ok(());
145            }
146        } // lock dropped here
147
148        // Slow path: create FT indices and graph on Moon.
149        self.create_scope_indexes(scope).await?;
150
151        // Mark initialized — lock, insert, drop.
152        {
153            let mut guard = self.initialized_scopes.lock();
154            guard.insert(scope_str);
155        } // lock dropped here
156
157        Ok(())
158    }
159
160    /// Create per-scope FT indices and graph key. Called at most once per scope
161    /// (guarded by `ensure_scope`'s in-memory set).
162    async fn create_scope_indexes(&self, scope: &Scope) -> Result<(), StorageError> {
163        // Single-sourced: the FT vector dimension is configured once on the
164        // underlying client (`connect`/`connect_with_dim`); per-scope indices
165        // inherit it — and the `?quant=` choice — so engine-level
166        // `Lunaris::open` sizing flows through here. Schema construction is
167        // shared with the legacy global `ensure_indexes` via
168        // `client::create_lunaris_index_named` so the two sites can never
169        // diverge.
170        let dim = self.client.dim;
171        let typed = self.client.typed();
172
173        for kind in &["chunks", "entities", "facts", "communities"] {
174            let idx_name = ft_index_name(scope, kind);
175            // The FT prefix must match the key shape written by `atomic.rs::VectorUpsert`:
176            // `{ft_index_name(scope, kind)}:{id_hex}`.
177            let prefix = format!("{idx_name}:");
178            crate::client::create_lunaris_index_named(
179                &typed,
180                &idx_name,
181                kind,
182                &prefix,
183                dim,
184                self.client.quantization,
185            )
186            .await?;
187        }
188
189        // Create per-scope graph. Moon does not auto-create graphs on first GRAPH.QUERY.
190        let gkey = graph_key(scope);
191        let typed = self.client.typed();
192        match typed.graph().create(&gkey).await {
193            Ok(_) => {}
194            Err(e) => {
195                let msg = e.to_string();
196                if !(msg.contains("already exists") || msg.contains("Graph already exists")) {
197                    return Err(crate::client::moon_err(e));
198                }
199            }
200        }
201
202        Ok(())
203    }
204}
205
206#[async_trait]
207impl StoragePort for MoonStorage {
208    /// RFC 0001 Wave 1C: lazy per-scope init before writing, then route all ops
209    /// through scope-prefixed keys / indices.
210    async fn atomic_write(&self, scope: &Scope, ops: &[WriteOp]) -> Result<Lsn, StorageError> {
211        self.ensure_scope(scope).await?;
212        crate::atomic::atomic_write(&self.client, scope, ops).await
213    }
214
215    /// observability-rollout-maturity — override the additive default with a
216    /// real Moon `PING` so a dead/stalled backend surfaces as `Err` on the
217    /// `/healthz` rollout-cutback probe. Bounded by `LUNARIS_MOON_OP_TIMEOUT`.
218    async fn health_check(&self) -> Result<(), StorageError> {
219        self.client.ping().await
220    }
221
222    #[allow(clippy::too_many_arguments)]
223    async fn vector_search(
224        &self,
225        scope: &Scope,
226        index: &str,
227        query: &[f32],
228        k: usize,
229        filter: Option<&Filter>,
230        as_of: Option<Hlc>,
231        rerank: bool,
232    ) -> Result<Vec<VectorHit>, StorageError> {
233        crate::vector::vector_search(&self.client, scope, index, query, k, filter, as_of, rerank)
234            .await
235    }
236
237    async fn graph_traverse(
238        &self,
239        scope: &Scope,
240        query: &CypherQuery,
241        as_of: Option<Hlc>,
242    ) -> Result<GraphResult, StorageError> {
243        crate::graph::graph_traverse(&self.client, scope, query, as_of).await
244    }
245
246    async fn graph_traverse_decayed(
247        &self,
248        scope: &Scope,
249        query: &CypherQuery,
250        as_of: Option<Hlc>,
251        decay: Option<&GraphDecay>,
252    ) -> Result<GraphResult, StorageError> {
253        match decay {
254            None => crate::graph::graph_traverse(&self.client, scope, query, as_of).await,
255            Some(d) => {
256                crate::graph::graph_traverse_decayed(&self.client, scope, query, as_of, d).await
257            }
258        }
259    }
260
261    async fn vector_navigate(
262        &self,
263        scope: &Scope,
264        index: &str,
265        query: &[f32],
266        k: usize,
267        spec: &NavigateSpec,
268    ) -> Result<Vec<NavigateHit>, StorageError> {
269        crate::navigate::vector_navigate(&self.client, scope, index, query, k, spec).await
270    }
271
272    async fn hot_keys(&self, count: usize) -> Result<Vec<lunaris_core::HotKey>, StorageError> {
273        crate::hotkeys::hot_keys(&self.client, count).await
274    }
275
276    async fn scan_range(
277        &self,
278        scope: &Scope,
279        prefix: &[u8],
280        as_of: Option<Hlc>,
281    ) -> Result<BoxStream<'_, Result<(Bytes, Bytes), StorageError>>, StorageError> {
282        crate::kv::scan_range(&self.client, scope, prefix, as_of).await
283    }
284
285    async fn read_as_of(
286        &self,
287        scope: &Scope,
288        key: &[u8],
289        as_of: Hlc,
290    ) -> Result<Option<Row<Bytes>>, StorageError> {
291        crate::kv::read_as_of(&self.client, scope, key, as_of).await
292    }
293
294    async fn publish(
295        &self,
296        scope: &Scope,
297        topic: &str,
298        partition: u16,
299        payload: Bytes,
300    ) -> Result<u64, StorageError> {
301        crate::queue::publish(&self.client, scope, topic, partition, payload).await
302    }
303
304    async fn subscribe(
305        &self,
306        scope: &Scope,
307        group: &str,
308        topic: &str,
309        partition: u16,
310    ) -> Result<BoxStream<'static, Result<QueueMsg, StorageError>>, StorageError> {
311        crate::queue::subscribe(self.client.clone(), scope, group, topic, partition).await
312    }
313
314    /// Plan 04 D-12 — see `crate::queue::queue_length` (private) for the raw
315    /// `MQ.LENGTH` escape hatch rationale.
316    async fn queue_depth(
317        &self,
318        scope: &Scope,
319        topic: &str,
320        partition: u16,
321    ) -> Result<u64, StorageError> {
322        crate::queue::queue_length(&self.client, scope, topic, partition).await
323    }
324
325    /// Cross-scope enumeration via `SCAN MATCH lunaris:*` + key parse.
326    /// Q-U2 lock — lazy SCAN-derived. See `crate::scopes` for the cursor
327    /// model and the Moon-SCAN-cursor vs scope-string-cursor tradeoff.
328    async fn list_scopes(
329        &self,
330        prefix: Option<&str>,
331        limit: usize,
332        cursor: Option<&str>,
333    ) -> Result<ScopePage, StorageError> {
334        crate::scopes::list_scopes(&self.client, prefix, limit, cursor).await
335    }
336
337    /// Bulk-invalidate FT index records via `FT.INVALIDATE_RANGE`.
338    ///
339    /// ## Wire shape
340    ///
341    /// ```text
342    /// FT.INVALIDATE_RANGE <index> <node_id_field> <node_id_value>
343    ///                     <hlc_wall_field> <hlc_wall_lo> <hlc_wall_hi>
344    /// ```
345    ///
346    /// Returns the integer count of deleted records as `u64`.
347    ///
348    /// ## Escape hatch
349    ///
350    /// `moon-client` v0.1.x does not expose a typed wrapper for
351    /// `FT.INVALIDATE_RANGE`. We reach the underlying
352    /// `redis::aio::MultiplexedConnection` via `MoonClient::inner_mut()` on a
353    /// local clone — the same documented pattern used by the HSCAN escape hatch
354    /// in `kv.rs` (the only other permitted raw-RESP site in this crate per
355    /// Phase 1.5 STORE-09 constraints).
356    ///
357    /// ## Error mapping
358    ///
359    /// - Moon `WRONGTYPE` (index does not exist) → `StorageError::Backend`
360    ///   containing `"WRONGTYPE"`. The `Lunaris::invalidate_range` fan-out
361    ///   treats this as warn-and-skip (degraded mode).
362    /// - Any other Moon error → `StorageError::Backend`.
363    #[allow(clippy::too_many_arguments)]
364    async fn invalidate_range(
365        &self,
366        scope: &Scope,
367        index: &str,
368        node_id_field: &str,
369        node_id_value: &str,
370        hlc_wall_field: &str,
371        hlc_wall_lo_inclusive: i64,
372        hlc_wall_hi_inclusive: i64,
373    ) -> Result<u64, StorageError> {
374        crate::invalidate::invalidate_range(
375            &self.client,
376            scope,
377            index,
378            node_id_field,
379            node_id_value,
380            hlc_wall_field,
381            hlc_wall_lo_inclusive,
382            hlc_wall_hi_inclusive,
383        )
384        .await
385    }
386
387    fn capabilities(&self) -> StorageCapabilities {
388        StorageCapabilities {
389            // Moon supports AS_OF for FT.SEARCH (vector + keyword) and VALID_AT
390            // for GRAPH.QUERY, but plain HGET does NOT accept temporal clauses.
391            // Per moon/docs/guides/temporal.mdx: "Bi-temporal fields are
392            // currently limited to graph entities (nodes/edges). KV temporal
393            // versioning uses a sparse index" (the sparse index is for
394            // transactional MVCC isolation, NOT for AS_OF reads). Lunaris's
395            // KV `read_as_of` therefore returns current state on Moon —
396            // historical KV reads need a Lunaris-layer versioned-key encoding
397            // (Gap 8 — tracked for follow-up phase). Reporting `false` here
398            // makes downstream consumers route bi-temporal reads to Postgres
399            // (which has native bi-temporal columns) per the dual-backend
400            // contract. Live-measurement gap fix 2026-04-21.
401            bi_temporal_native: false,
402            graph_native: true,
403            rerank_native: true,
404            queue_native: self.queue_native,
405            // Moon's FT.CREATE has no dimension cap — report the dimension the
406            // adapter actually created its indices at (default 768d matching
407            // EmbeddingGemma-300M; `connect_with_dim` / `Lunaris::open` size it
408            // to the embedder). This stays an accurate description of what the
409            // FT `vec` field will accept.
410            max_vector_dim: self.client.dim as u32,
411            // Gap 9 closure (2026-04-21): `ensure_indexes` now declares
412            // `SchemaField::Text("content")` on chunks/entities/facts/communities
413            // and `WriteOp::VectorUpsert` writes the `content` field via
414            // `extract_content_for_index` (mirrors the Postgres
415            // `payload->>'text'/'fact_text'/...` tsvector convention). Moon's
416            // SDK `hybrid_search` (3-weight + sparse_field) therefore resolves
417            // `@content` and `fuse_rrf` opts into `RrfFusion::Moon` for one
418            // round-trip server-side fusion. If the schema regresses (e.g. an
419            // older Moon binary that ignores extra_schema), set this back to
420            // `false` to force the always-correct local fusion path.
421            native_rrf: true,
422            // RFC 0001 §3.6 — Moon's soft FT-index limit is ~512 per node
423            // before recall p99 degrades (Moon docs §6.4). Above this,
424            // operators should consider workspace-level pooling (future RFC).
425            max_scopes_recommended: 512,
426            cypher_dialect: lunaris_core::CypherDialect::Legacy,
427            graph_decay_native: true,
428            graph_navigate_native: true,
429        }
430    }
431}
432
433#[async_trait]
434impl KeywordPort for MoonStorage {
435    /// Wave 2.5A: `KeywordPort::keyword_search` now carries `scope: &Scope`
436    /// (RFC 0001 §3.4 amendment). The Moon backend threads scope through to
437    /// `keyword::keyword_search` which routes to the per-scope FT index
438    /// (`ft_index_name(scope, index)`). Previously this impl used `Scope::dev()`
439    /// as a placeholder — that placeholder is now replaced by the caller-supplied scope.
440    async fn keyword_search(
441        &self,
442        scope: &Scope,
443        index: &str,
444        query: &str,
445        k: usize,
446        filter: Option<&Filter>,
447        as_of: Option<Hlc>,
448    ) -> Result<Vec<KeywordHit>, StorageError> {
449        crate::keyword::keyword_search(&self.client, scope, index, query, k, filter, as_of).await
450    }
451}
452
#[cfg(test)]
mod tests {
    use super::*;

    /// Type-level check only; never invoked. It compiles iff `MoonStorage`
    /// implements `StoragePort` and `dyn StoragePort` is a nameable type.
    #[allow(dead_code)]
    fn _moonstorage_is_storage_port() {
        fn requires_storage_port<T: StoragePort + ?Sized>() {}
        requires_storage_port::<MoonStorage>();
        requires_storage_port::<dyn StoragePort>();
    }

    #[test]
    fn capabilities_match_moon_profile() {
        // A real `MoonStorage` needs a live connection, so this test mirrors
        // the `capabilities()` body as a hand-built literal and pins the
        // expected Moon profile on that literal (it does not call
        // `MoonStorage::capabilities` itself).
        let profile = StorageCapabilities {
            bi_temporal_native: false,
            graph_native: true,
            rerank_native: true,
            queue_native: true,
            max_vector_dim: 768,
            native_rrf: true,
            max_scopes_recommended: 512,
            cypher_dialect: lunaris_core::CypherDialect::Legacy,
            graph_decay_native: true,
            graph_navigate_native: true,
        };

        // Pull out just the fields that are asserted on below.
        let StorageCapabilities {
            bi_temporal_native,
            graph_native,
            rerank_native,
            queue_native,
            max_vector_dim,
            native_rrf,
            max_scopes_recommended,
            ..
        } = profile;

        assert!(
            !bi_temporal_native,
            "Moon does not natively support KV bi-temporal reads (HGET ignores AS_OF); only FT.SEARCH AS_OF + GRAPH.QUERY VALID_AT are temporal — Gap 8 fix 2026-04-21"
        );
        assert!(graph_native);
        assert!(rerank_native);
        assert!(queue_native);
        assert_eq!(max_vector_dim, 768);
        assert!(
            native_rrf,
            "Moon HYBRID FT.SEARCH now resolves @content via the SchemaField::Text added by ensure_indexes; fuse_rrf opts into RrfFusion::Moon — Gap 9 closure 2026-04-21"
        );
        assert_eq!(
            max_scopes_recommended, 512,
            "Moon FT soft limit is ~512 indices per node (RFC 0001 §3.6)"
        );
    }
}