lunaris_storage_moon/lib.rs
1//! `MoonStorage` — `StoragePort` impl backed by Moon (Redis-compatible RESP).
2//!
3//! RFC 0001 Wave 1C: every `StoragePort` method now routes through per-scope
4//! keyspace helpers (`keyspace::{scope_prefix, ft_index_name, graph_key, mq_topic}`).
5//! Per-scope FT indices, graph keys, and MQ topics are created lazily on first
6//! write via `ensure_scope`.
7//!
8//! Per blueprint §6, every method is a thin pass-through to a Moon native command:
9//!
10//! | trait method | Moon command(s) |
11//! |-------------------|-------------------------------------------------------------------------|
12//! | `atomic_write` | `TXN.BEGIN` + per-op (`HSET` / `FT.UPSERT` / `GRAPH.QUERY MERGE`) + `TXN.COMMIT` |
13//! | `vector_search` | `FT.SEARCH` (with `TEMPORAL.SNAPSHOT_AT` when `as_of` is `Some`) |
14//! | `graph_traverse` | `GRAPH.QUERY` (with `TEMPORAL.SNAPSHOT_AT` when `as_of` is `Some`) |
15//! | `scan_range` | `SCAN ... MATCH <prefix>*` then `HGET` per matched key |
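//! | `read_as_of` | `TEMPORAL.SNAPSHOT_AT` then `HGET <key> v` — note: Moon `HGET` ignores temporal clauses, so this returns current state (see `capabilities`) |
//! | `publish` | `MQ.PUSH` |
//! | `subscribe` | `MQ.POP ... BLOCK` polling stream |
//! | `queue_depth` | `MQ.LENGTH` (raw RESP escape hatch) |
//! | `health_check` | `PING` |
//! | `list_scopes` | `SCAN MATCH lunaris:*` + key parse |
//! | `vector_navigate` | `FT.NAVIGATE` (raw RESP escape hatch) |
//! | `hot_keys` | `HOTKEYS` (raw RESP escape hatch) |
//! | `invalidate_range` | `FT.INVALIDATE_RANGE` (raw RESP escape hatch) |
//! | `capabilities` | no command — static Moon profile, except `queue_native` (probed at connect) and `max_vector_dim` (the client's configured dim) |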
20//!
21//! ## Lazy per-scope init
22//!
23//! On first write under a scope, `ensure_scope` creates:
24//! - `FT.CREATE lunaris_{scope}_{kind}_idx` for each of chunks / entities / facts / communities
25//! - `GRAPH.CREATE lunaris_{scope}_graph`
26//!
//! Subsequent calls for an already-initialized scope skip the Moon round-trips via
//! an in-memory `initialized_scopes` set (a brief `parking_lot::Mutex` check that is
//! never held across an `.await`).
29//!
30//! ## Threat model snapshot (T-01-03-*)
31//!
32//! * `WriteOp::GraphNode { label, ... }` and `WriteOp::GraphEdge { rel, ... }` are
33//! interpolated into Cypher. Callers MUST validate `label` / `rel` against
34//! `^[A-Za-z_][A-Za-z0-9_]*$` — see `crates/lunaris-storage-moon/src/atomic.rs` rustdoc.
35//! Phase 4 (`OPS-04` audit) will move the guard into the trait.
36//! * Connection is cleartext RESP over TCP — Moon is treated as trusted infra inside the
37//! same network boundary as the Lunaris process. TLS lands in Phase 5.
38
39#![deny(rust_2018_idioms, unreachable_pub)]
40#![forbid(unsafe_code)]
41
42pub mod atomic;
43pub mod client;
44pub mod graph;
45// W2-L2 — FT.INVALIDATE_RANGE raw RESP escape hatch (UC-G3 force-push invalidation).
46// pub(crate): only called from `MoonStorage::invalidate_range` in this file.
47pub(crate) mod invalidate;
48pub mod keyspace;
49pub mod keyword;
50pub mod kv;
51// hotkeys-observability — HOTKEYS raw RESP path (typed SDK has no wrapper).
52pub(crate) mod hotkeys;
53// ft-navigate-recall — FT.NAVIGATE raw RESP path (typed SDK lacks a DECAY slot).
54pub(crate) mod navigate;
55pub mod queue;
56pub mod scopes;
57pub mod vector;
58
59pub use client::MoonClient;
60
61use std::collections::HashSet;
62use std::sync::Arc;
63
64use async_trait::async_trait;
65use bytes::Bytes;
66use futures::stream::BoxStream;
67use lunaris_core::{
68 CypherQuery, Filter, GraphDecay, GraphResult, Hlc, KeywordHit, KeywordPort, Lsn, NavigateHit,
69 NavigateSpec, QueueMsg, Row, Scope, ScopePage, StorageCapabilities, StorageError, StoragePort,
70 VectorHit, WriteOp,
71};
72use parking_lot::Mutex;
73
74use crate::keyspace::{ft_index_name, graph_key};
75
76/// `StoragePort` backed by a single Moon RESP connection manager.
77///
78/// `initialized_scopes` tracks which scopes have had their FT indices and graph
79/// key created (lazy init on first write). `Mutex` is held only during the
80/// brief check + insert — never across `.await` points.
81#[derive(Debug, Clone)]
82pub struct MoonStorage {
83 pub(crate) client: MoonClient,
84 queue_native: bool,
85 /// Set of scopes whose FT indices + graph key have been created on Moon.
86 /// `parking_lot::Mutex` (not `std::sync::Mutex`) per CLAUDE.md lock discipline.
87 /// The lock is NEVER held across an `.await` — it is taken, the bool is checked,
88 /// optionally the scope is inserted, and the lock is dropped BEFORE the async
89 /// Moon calls in `ensure_scope`.
90 initialized_scopes: Arc<Mutex<HashSet<String>>>,
91}
92
93impl MoonStorage {
94 /// Open a connection to Moon at `url` (`moon://host:port[?ws=workspace]`),
95 /// creating FT vector indices at the default dimension
96 /// ([`client::DEFAULT_VECTOR_DIM`] = 768, matching EmbeddingGemma-300M).
97 pub async fn connect(url: &str) -> Result<Self, StorageError> {
98 Self::connect_with_dim(url, crate::client::DEFAULT_VECTOR_DIM).await
99 }
100
101 /// Like [`MoonStorage::connect`], but creates the FT vector indices at
102 /// `dim` instead of the default 768. `dim` MUST be `> 0`. Moon's
103 /// `FT.CREATE` has no upper cap, so a 1536-d embedder (OpenAI
104 /// `text-embedding-3`) works against Moon out of the box.
105 ///
106 /// ## Operator footgun — existing index won't auto-resize
107 ///
108 /// Moon's `FT.CREATE` is idempotent and does NOT update an existing
109 /// index's schema. If a Moon instance already holds a 768-d `chunks`
110 /// index from a prior run, reopening with a 1536-d embedder leaves the
111 /// 768-d index in place; the mismatch surfaces only on the first vector
112 /// write. Drop the stale index first (`FT.DROPINDEX <name>`).
113 pub async fn connect_with_dim(url: &str, dim: usize) -> Result<Self, StorageError> {
114 let client = MoonClient::connect_with_dim(url, dim).await?;
115 let queue_native = crate::queue::supports_native_queue(&client).await?;
116 Ok(Self { client, queue_native, initialized_scopes: Arc::new(Mutex::new(HashSet::new())) })
117 }
118
119 /// Borrow the underlying client (used by integration tests).
120 pub fn client(&self) -> &MoonClient {
121 &self.client
122 }
123
124 /// Lazily ensure per-scope FT indices and graph key exist on Moon.
125 ///
126 /// On first call for a given scope, creates:
127 /// - `FT.CREATE lunaris_{scope}_{kind}_idx` for chunks / entities / facts / communities
128 /// - `GRAPH.CREATE lunaris_{scope}_graph`
129 ///
130 /// Idempotent: "already exists" errors from Moon are swallowed. Subsequent calls
131 /// for the same scope return immediately (in-memory set check, no Moon I/O).
132 ///
133 /// ## Lock discipline
134 ///
135 /// The `Mutex` is locked only to read/write the `HashSet<String>` — it is
136 /// dropped BEFORE any `.await` call so it is NEVER held across an await point.
137 async fn ensure_scope(&self, scope: &Scope) -> Result<(), StorageError> {
138 let scope_str = scope.as_str().to_string();
139
140 // Fast path: scope already initialized — lock, check, drop.
141 {
142 let guard = self.initialized_scopes.lock();
143 if guard.contains(&scope_str) {
144 return Ok(());
145 }
146 } // lock dropped here
147
148 // Slow path: create FT indices and graph on Moon.
149 self.create_scope_indexes(scope).await?;
150
151 // Mark initialized — lock, insert, drop.
152 {
153 let mut guard = self.initialized_scopes.lock();
154 guard.insert(scope_str);
155 } // lock dropped here
156
157 Ok(())
158 }
159
160 /// Create per-scope FT indices and graph key. Called at most once per scope
161 /// (guarded by `ensure_scope`'s in-memory set).
162 async fn create_scope_indexes(&self, scope: &Scope) -> Result<(), StorageError> {
163 // Single-sourced: the FT vector dimension is configured once on the
164 // underlying client (`connect`/`connect_with_dim`); per-scope indices
165 // inherit it — and the `?quant=` choice — so engine-level
166 // `Lunaris::open` sizing flows through here. Schema construction is
167 // shared with the legacy global `ensure_indexes` via
168 // `client::create_lunaris_index_named` so the two sites can never
169 // diverge.
170 let dim = self.client.dim;
171 let typed = self.client.typed();
172
173 for kind in &["chunks", "entities", "facts", "communities"] {
174 let idx_name = ft_index_name(scope, kind);
175 // The FT prefix must match the key shape written by `atomic.rs::VectorUpsert`:
176 // `{ft_index_name(scope, kind)}:{id_hex}`.
177 let prefix = format!("{idx_name}:");
178 crate::client::create_lunaris_index_named(
179 &typed,
180 &idx_name,
181 kind,
182 &prefix,
183 dim,
184 self.client.quantization,
185 )
186 .await?;
187 }
188
189 // Create per-scope graph. Moon does not auto-create graphs on first GRAPH.QUERY.
190 let gkey = graph_key(scope);
191 let typed = self.client.typed();
192 match typed.graph().create(&gkey).await {
193 Ok(_) => {}
194 Err(e) => {
195 let msg = e.to_string();
196 if !(msg.contains("already exists") || msg.contains("Graph already exists")) {
197 return Err(crate::client::moon_err(e));
198 }
199 }
200 }
201
202 Ok(())
203 }
204}
205
206#[async_trait]
207impl StoragePort for MoonStorage {
208 /// RFC 0001 Wave 1C: lazy per-scope init before writing, then route all ops
209 /// through scope-prefixed keys / indices.
210 async fn atomic_write(&self, scope: &Scope, ops: &[WriteOp]) -> Result<Lsn, StorageError> {
211 self.ensure_scope(scope).await?;
212 crate::atomic::atomic_write(&self.client, scope, ops).await
213 }
214
215 /// observability-rollout-maturity — override the additive default with a
216 /// real Moon `PING` so a dead/stalled backend surfaces as `Err` on the
217 /// `/healthz` rollout-cutback probe. Bounded by `LUNARIS_MOON_OP_TIMEOUT`.
218 async fn health_check(&self) -> Result<(), StorageError> {
219 self.client.ping().await
220 }
221
222 #[allow(clippy::too_many_arguments)]
223 async fn vector_search(
224 &self,
225 scope: &Scope,
226 index: &str,
227 query: &[f32],
228 k: usize,
229 filter: Option<&Filter>,
230 as_of: Option<Hlc>,
231 rerank: bool,
232 ) -> Result<Vec<VectorHit>, StorageError> {
233 crate::vector::vector_search(&self.client, scope, index, query, k, filter, as_of, rerank)
234 .await
235 }
236
237 async fn graph_traverse(
238 &self,
239 scope: &Scope,
240 query: &CypherQuery,
241 as_of: Option<Hlc>,
242 ) -> Result<GraphResult, StorageError> {
243 crate::graph::graph_traverse(&self.client, scope, query, as_of).await
244 }
245
246 async fn graph_traverse_decayed(
247 &self,
248 scope: &Scope,
249 query: &CypherQuery,
250 as_of: Option<Hlc>,
251 decay: Option<&GraphDecay>,
252 ) -> Result<GraphResult, StorageError> {
253 match decay {
254 None => crate::graph::graph_traverse(&self.client, scope, query, as_of).await,
255 Some(d) => {
256 crate::graph::graph_traverse_decayed(&self.client, scope, query, as_of, d).await
257 }
258 }
259 }
260
261 async fn vector_navigate(
262 &self,
263 scope: &Scope,
264 index: &str,
265 query: &[f32],
266 k: usize,
267 spec: &NavigateSpec,
268 ) -> Result<Vec<NavigateHit>, StorageError> {
269 crate::navigate::vector_navigate(&self.client, scope, index, query, k, spec).await
270 }
271
272 async fn hot_keys(&self, count: usize) -> Result<Vec<lunaris_core::HotKey>, StorageError> {
273 crate::hotkeys::hot_keys(&self.client, count).await
274 }
275
276 async fn scan_range(
277 &self,
278 scope: &Scope,
279 prefix: &[u8],
280 as_of: Option<Hlc>,
281 ) -> Result<BoxStream<'_, Result<(Bytes, Bytes), StorageError>>, StorageError> {
282 crate::kv::scan_range(&self.client, scope, prefix, as_of).await
283 }
284
285 async fn read_as_of(
286 &self,
287 scope: &Scope,
288 key: &[u8],
289 as_of: Hlc,
290 ) -> Result<Option<Row<Bytes>>, StorageError> {
291 crate::kv::read_as_of(&self.client, scope, key, as_of).await
292 }
293
294 async fn publish(
295 &self,
296 scope: &Scope,
297 topic: &str,
298 partition: u16,
299 payload: Bytes,
300 ) -> Result<u64, StorageError> {
301 crate::queue::publish(&self.client, scope, topic, partition, payload).await
302 }
303
304 async fn subscribe(
305 &self,
306 scope: &Scope,
307 group: &str,
308 topic: &str,
309 partition: u16,
310 ) -> Result<BoxStream<'static, Result<QueueMsg, StorageError>>, StorageError> {
311 crate::queue::subscribe(self.client.clone(), scope, group, topic, partition).await
312 }
313
314 /// Plan 04 D-12 — see `crate::queue::queue_length` (private) for the raw
315 /// `MQ.LENGTH` escape hatch rationale.
316 async fn queue_depth(
317 &self,
318 scope: &Scope,
319 topic: &str,
320 partition: u16,
321 ) -> Result<u64, StorageError> {
322 crate::queue::queue_length(&self.client, scope, topic, partition).await
323 }
324
325 /// Cross-scope enumeration via `SCAN MATCH lunaris:*` + key parse.
326 /// Q-U2 lock — lazy SCAN-derived. See `crate::scopes` for the cursor
327 /// model and the Moon-SCAN-cursor vs scope-string-cursor tradeoff.
328 async fn list_scopes(
329 &self,
330 prefix: Option<&str>,
331 limit: usize,
332 cursor: Option<&str>,
333 ) -> Result<ScopePage, StorageError> {
334 crate::scopes::list_scopes(&self.client, prefix, limit, cursor).await
335 }
336
337 /// Bulk-invalidate FT index records via `FT.INVALIDATE_RANGE`.
338 ///
339 /// ## Wire shape
340 ///
341 /// ```text
342 /// FT.INVALIDATE_RANGE <index> <node_id_field> <node_id_value>
343 /// <hlc_wall_field> <hlc_wall_lo> <hlc_wall_hi>
344 /// ```
345 ///
346 /// Returns the integer count of deleted records as `u64`.
347 ///
348 /// ## Escape hatch
349 ///
350 /// `moon-client` v0.1.x does not expose a typed wrapper for
351 /// `FT.INVALIDATE_RANGE`. We reach the underlying
352 /// `redis::aio::MultiplexedConnection` via `MoonClient::inner_mut()` on a
353 /// local clone — the same documented pattern used by the HSCAN escape hatch
354 /// in `kv.rs` (the only other permitted raw-RESP site in this crate per
355 /// Phase 1.5 STORE-09 constraints).
356 ///
357 /// ## Error mapping
358 ///
359 /// - Moon `WRONGTYPE` (index does not exist) → `StorageError::Backend`
360 /// containing `"WRONGTYPE"`. The `Lunaris::invalidate_range` fan-out
361 /// treats this as warn-and-skip (degraded mode).
362 /// - Any other Moon error → `StorageError::Backend`.
363 #[allow(clippy::too_many_arguments)]
364 async fn invalidate_range(
365 &self,
366 scope: &Scope,
367 index: &str,
368 node_id_field: &str,
369 node_id_value: &str,
370 hlc_wall_field: &str,
371 hlc_wall_lo_inclusive: i64,
372 hlc_wall_hi_inclusive: i64,
373 ) -> Result<u64, StorageError> {
374 crate::invalidate::invalidate_range(
375 &self.client,
376 scope,
377 index,
378 node_id_field,
379 node_id_value,
380 hlc_wall_field,
381 hlc_wall_lo_inclusive,
382 hlc_wall_hi_inclusive,
383 )
384 .await
385 }
386
387 fn capabilities(&self) -> StorageCapabilities {
388 StorageCapabilities {
389 // Moon supports AS_OF for FT.SEARCH (vector + keyword) and VALID_AT
390 // for GRAPH.QUERY, but plain HGET does NOT accept temporal clauses.
391 // Per moon/docs/guides/temporal.mdx: "Bi-temporal fields are
392 // currently limited to graph entities (nodes/edges). KV temporal
393 // versioning uses a sparse index" (the sparse index is for
394 // transactional MVCC isolation, NOT for AS_OF reads). Lunaris's
395 // KV `read_as_of` therefore returns current state on Moon —
396 // historical KV reads need a Lunaris-layer versioned-key encoding
397 // (Gap 8 — tracked for follow-up phase). Reporting `false` here
398 // makes downstream consumers route bi-temporal reads to Postgres
399 // (which has native bi-temporal columns) per the dual-backend
400 // contract. Live-measurement gap fix 2026-04-21.
401 bi_temporal_native: false,
402 graph_native: true,
403 rerank_native: true,
404 queue_native: self.queue_native,
405 // Moon's FT.CREATE has no dimension cap — report the dimension the
406 // adapter actually created its indices at (default 768d matching
407 // EmbeddingGemma-300M; `connect_with_dim` / `Lunaris::open` size it
408 // to the embedder). This stays an accurate description of what the
409 // FT `vec` field will accept.
410 max_vector_dim: self.client.dim as u32,
411 // Gap 9 closure (2026-04-21): `ensure_indexes` now declares
412 // `SchemaField::Text("content")` on chunks/entities/facts/communities
413 // and `WriteOp::VectorUpsert` writes the `content` field via
414 // `extract_content_for_index` (mirrors the Postgres
415 // `payload->>'text'/'fact_text'/...` tsvector convention). Moon's
416 // SDK `hybrid_search` (3-weight + sparse_field) therefore resolves
417 // `@content` and `fuse_rrf` opts into `RrfFusion::Moon` for one
418 // round-trip server-side fusion. If the schema regresses (e.g. an
419 // older Moon binary that ignores extra_schema), set this back to
420 // `false` to force the always-correct local fusion path.
421 native_rrf: true,
422 // RFC 0001 §3.6 — Moon's soft FT-index limit is ~512 per node
423 // before recall p99 degrades (Moon docs §6.4). Above this,
424 // operators should consider workspace-level pooling (future RFC).
425 max_scopes_recommended: 512,
426 cypher_dialect: lunaris_core::CypherDialect::Legacy,
427 graph_decay_native: true,
428 graph_navigate_native: true,
429 }
430 }
431}
432
433#[async_trait]
434impl KeywordPort for MoonStorage {
435 /// Wave 2.5A: `KeywordPort::keyword_search` now carries `scope: &Scope`
436 /// (RFC 0001 §3.4 amendment). The Moon backend threads scope through to
437 /// `keyword::keyword_search` which routes to the per-scope FT index
438 /// (`ft_index_name(scope, index)`). Previously this impl used `Scope::dev()`
439 /// as a placeholder — that placeholder is now replaced by the caller-supplied scope.
440 async fn keyword_search(
441 &self,
442 scope: &Scope,
443 index: &str,
444 query: &str,
445 k: usize,
446 filter: Option<&Filter>,
447 as_of: Option<Hlc>,
448 ) -> Result<Vec<KeywordHit>, StorageError> {
449 crate::keyword::keyword_search(&self.client, scope, index, query, k, filter, as_of).await
450 }
451}
452
#[cfg(test)]
mod tests {
    use super::*;

    /// Compile-time checks, never executed:
    /// - `MoonStorage` implements both `StoragePort` and `KeywordPort`;
    /// - the `StoragePort` trait is dyn-compatible (`dyn StoragePort` must be a
    ///   well-formed type for this function to compile).
    // Never called: the assertions are enforced purely by type-checking.
    #[allow(dead_code)]
    fn _moonstorage_is_storage_port() {
        fn assert_storage_port<T: StoragePort + ?Sized>() {}
        fn assert_keyword_port<T: KeywordPort + ?Sized>() {}
        assert_storage_port::<MoonStorage>();
        assert_storage_port::<dyn StoragePort>();
        assert_keyword_port::<MoonStorage>();
    }

    /// Pins the expected Moon capability profile.
    ///
    /// A real `MoonStorage` needs a live connection, so this mirrors the
    /// `capabilities()` body as a literal. Any change there must be reflected
    /// here. `queue_native` and `max_vector_dim` are connection-derived at
    /// runtime; the values below are the defaults (native queue, 768-d).
    #[test]
    fn capabilities_match_moon_profile() {
        let want = StorageCapabilities {
            bi_temporal_native: false,
            graph_native: true,
            rerank_native: true,
            queue_native: true,
            max_vector_dim: 768,
            native_rrf: true,
            max_scopes_recommended: 512,
            cypher_dialect: lunaris_core::CypherDialect::Legacy,
            graph_decay_native: true,
            graph_navigate_native: true,
        };
        assert!(
            !want.bi_temporal_native,
            "Moon does not natively support KV bi-temporal reads (HGET ignores AS_OF); only FT.SEARCH AS_OF + GRAPH.QUERY VALID_AT are temporal — Gap 8 fix 2026-04-21"
        );
        assert!(want.graph_native);
        assert!(want.rerank_native);
        assert!(want.queue_native);
        assert_eq!(want.max_vector_dim, 768);
        assert!(
            want.native_rrf,
            "Moon HYBRID FT.SEARCH now resolves @content via the SchemaField::Text added by ensure_indexes; fuse_rrf opts into RrfFusion::Moon — Gap 9 closure 2026-04-21"
        );
        assert_eq!(
            want.max_scopes_recommended, 512,
            "Moon FT soft limit is ~512 indices per node (RFC 0001 §3.6)"
        );
        assert!(matches!(want.cypher_dialect, lunaris_core::CypherDialect::Legacy));
        assert!(want.graph_decay_native);
        assert!(want.graph_navigate_native);
    }
}
498}