Skip to main content

rmcp_memex/storage/
mod.rs

1use anyhow::{Result, anyhow};
2use arrow_array::types::Float32Type;
3use arrow_array::{
4    Array, FixedSizeListArray, Float32Array, RecordBatch, RecordBatchIterator, StringArray,
5    UInt8Array,
6};
7use arrow_schema::{ArrowError, DataType, Field, Schema};
8use futures::TryStreamExt;
9use lancedb::connection::Connection;
10use lancedb::query::{ExecutableQuery, QueryBase};
11use lancedb::table::{OptimizeAction, OptimizeStats};
12use lancedb::{Table, connect};
13use serde::Serialize;
14use serde_json::{Value, json};
15use std::sync::Arc;
16use tokio::sync::Mutex;
17use tracing::{debug, info};
18
19use crate::rag::SliceLayer;
20
21/// Schema version for LanceDB tables. Increment when changing table structure.
22/// Version 2: Added onion slice fields (layer, parent_id, children_ids, keywords)
23/// Version 3: Added content_hash for exact-match deduplication
24/// See docs/MIGRATION.md for migration procedures.
25pub const SCHEMA_VERSION: u32 = 3;
26
27// =============================================================================
28// STORAGE BACKEND INTERFACE
29// =============================================================================
30//
31// To add a new storage backend, implement a struct with the following methods:
32//
33//   async fn add_to_store(&self, documents: Vec<ChromaDocument>) -> Result<()>
34//   async fn get_document(&self, namespace: &str, id: &str) -> Result<Option<ChromaDocument>>
35//   async fn search(&self, namespace: Option<&str>, embedding: &[f32], k: usize) -> Result<Vec<ChromaDocument>>
36//   async fn delete(&self, namespace: &str, id: &str) -> Result<usize>
37//   async fn delete_namespace(&self, namespace: &str) -> Result<usize>
38//
39// Current implementation:
40//   - `StorageManager`: LanceDB embedded vector store
41//
42// Future alternatives to consider:
43//   - Qdrant, Milvus, Pinecone (external vector DBs)
44//   - SQLite with vector extension
45// =============================================================================
46
47#[derive(Debug, Serialize, Clone)]
48pub struct ChromaDocument {
49    pub id: String,
50    pub namespace: String,
51    pub embedding: Vec<f32>,
52    pub metadata: serde_json::Value,
53    pub document: String,
54    /// Onion slice layer (1=Outer, 2=Middle, 3=Inner, 4=Core, 0=legacy flat)
55    pub layer: u8,
56    /// Parent slice ID in the onion hierarchy (None for Core slices)
57    pub parent_id: Option<String>,
58    /// Children slice IDs in the onion hierarchy
59    pub children_ids: Vec<String>,
60    /// Extracted keywords for this slice
61    pub keywords: Vec<String>,
62    /// SHA256 hash of original content for exact-match deduplication
63    pub content_hash: Option<String>,
64}
65
66impl ChromaDocument {
67    /// Create a new document with default (legacy) slice values
68    pub fn new_flat(
69        id: String,
70        namespace: String,
71        embedding: Vec<f32>,
72        metadata: serde_json::Value,
73        document: String,
74    ) -> Self {
75        Self {
76            id,
77            namespace,
78            embedding,
79            metadata,
80            document,
81            layer: 0, // Legacy flat mode
82            parent_id: None,
83            children_ids: vec![],
84            keywords: vec![],
85            content_hash: None,
86        }
87    }
88
89    /// Create a new document with content hash for deduplication
90    pub fn new_flat_with_hash(
91        id: String,
92        namespace: String,
93        embedding: Vec<f32>,
94        metadata: serde_json::Value,
95        document: String,
96        content_hash: String,
97    ) -> Self {
98        Self {
99            id,
100            namespace,
101            embedding,
102            metadata,
103            document,
104            layer: 0,
105            parent_id: None,
106            children_ids: vec![],
107            keywords: vec![],
108            content_hash: Some(content_hash),
109        }
110    }
111
112    /// Create a document from an onion slice
113    pub fn from_onion_slice(
114        slice: &crate::rag::OnionSlice,
115        namespace: String,
116        embedding: Vec<f32>,
117        metadata: serde_json::Value,
118    ) -> Self {
119        Self {
120            id: slice.id.clone(),
121            namespace,
122            embedding,
123            metadata,
124            document: slice.content.clone(),
125            layer: slice.layer.as_u8(),
126            parent_id: slice.parent_id.clone(),
127            children_ids: slice.children_ids.clone(),
128            keywords: slice.keywords.clone(),
129            content_hash: None,
130        }
131    }
132
133    /// Create a document from an onion slice with content hash for deduplication
134    pub fn from_onion_slice_with_hash(
135        slice: &crate::rag::OnionSlice,
136        namespace: String,
137        embedding: Vec<f32>,
138        metadata: serde_json::Value,
139        content_hash: String,
140    ) -> Self {
141        Self {
142            id: slice.id.clone(),
143            namespace,
144            embedding,
145            metadata,
146            document: slice.content.clone(),
147            layer: slice.layer.as_u8(),
148            parent_id: slice.parent_id.clone(),
149            children_ids: slice.children_ids.clone(),
150            keywords: slice.keywords.clone(),
151            content_hash: Some(content_hash),
152        }
153    }
154
155    /// Check if this is a legacy flat chunk (not an onion slice)
156    pub fn is_flat(&self) -> bool {
157        self.layer == 0
158    }
159
160    /// Get the slice layer if this is an onion slice
161    pub fn slice_layer(&self) -> Option<SliceLayer> {
162        SliceLayer::from_u8(self.layer)
163    }
164}
165
166pub struct StorageManager {
167    lance: Connection,
168    table: Arc<Mutex<Option<Table>>>,
169    collection_name: String,
170    lance_path: String,
171}
172
173type BatchIter =
174    RecordBatchIterator<std::vec::IntoIter<std::result::Result<RecordBatch, ArrowError>>>;
175
176impl StorageManager {
177    pub async fn new(db_path: &str) -> Result<Self> {
178        // Embedded LanceDB path (expand ~, allow override via env)
179        let lance_env = std::env::var("LANCEDB_PATH").unwrap_or_else(|_| db_path.to_string());
180        let lance_path = if lance_env.trim().is_empty() {
181            shellexpand::tilde("~/.rmcp-servers/rmcp-memex/lancedb").to_string()
182        } else {
183            shellexpand::tilde(&lance_env).to_string()
184        };
185
186        let lance = connect(&lance_path).execute().await?;
187
188        Ok(Self {
189            lance,
190            table: Arc::new(Mutex::new(None)),
191            collection_name: "mcp_documents".to_string(),
192            lance_path,
193        })
194    }
195
196    /// Create a storage manager for CLI tools.
197    /// Use this for CLI tools that only need vector operations (index/search).
198    pub async fn new_lance_only(db_path: &str) -> Result<Self> {
199        let lance_path = shellexpand::tilde(db_path).to_string();
200        let lance = connect(&lance_path).execute().await?;
201
202        Ok(Self {
203            lance,
204            table: Arc::new(Mutex::new(None)),
205            collection_name: "mcp_documents".to_string(),
206            lance_path,
207        })
208    }
209
210    pub fn lance_path(&self) -> &str {
211        &self.lance_path
212    }
213
214    /// Refresh the table connection to see new data written by other processes.
215    /// This clears the cached table reference, forcing it to be re-opened on next query.
216    pub async fn refresh(&self) -> Result<()> {
217        let mut guard = self.table.lock().await;
218        *guard = None;
219        tracing::info!("LanceDB table cache cleared - will refresh on next query");
220        Ok(())
221    }
222
223    pub async fn ensure_collection(&self) -> Result<()> {
224        // Attempt to open; if missing, create empty table lazily on first add
225        let mut guard = self.table.lock().await;
226        if guard.is_some() {
227            return Ok(());
228        }
229        match self
230            .lance
231            .open_table(self.collection_name.as_str())
232            .execute()
233            .await
234        {
235            Ok(table) => {
236                *guard = Some(table);
237                info!("Found existing Lance table '{}'", self.collection_name);
238            }
239            Err(_) => {
240                info!(
241                    "Lance table '{}' will be created on first insert",
242                    self.collection_name
243                );
244            }
245        }
246        Ok(())
247    }
248
249    pub async fn add_to_store(&self, documents: Vec<ChromaDocument>) -> Result<()> {
250        if documents.is_empty() {
251            return Ok(());
252        }
253
254        // Pre-validation: check all documents before writing anything
255        let dim = documents
256            .first()
257            .ok_or_else(|| anyhow!("No documents to add"))?
258            .embedding
259            .len();
260        if dim == 0 {
261            return Err(anyhow!("Embedding dimension is zero"));
262        }
263
264        // Validate ALL documents have consistent dimensions and required fields
265        for (i, doc) in documents.iter().enumerate() {
266            if doc.embedding.len() != dim {
267                return Err(anyhow!(
268                    "Document {} has inconsistent embedding dimension: expected {}, got {}. \
269                     Aborting batch to prevent database corruption.",
270                    i,
271                    dim,
272                    doc.embedding.len()
273                ));
274            }
275            if doc.id.is_empty() {
276                return Err(anyhow!("Document {} has empty ID. Aborting batch.", i));
277            }
278            if doc.namespace.is_empty() {
279                return Err(anyhow!(
280                    "Document {} has empty namespace. Aborting batch.",
281                    i
282                ));
283            }
284            // Check for NaN/Inf in embeddings
285            for (j, &val) in doc.embedding.iter().enumerate() {
286                if val.is_nan() || val.is_infinite() {
287                    return Err(anyhow!(
288                        "Document {} has invalid embedding value at index {}: {}. \
289                         Aborting batch to prevent database corruption.",
290                        i,
291                        j,
292                        val
293                    ));
294                }
295            }
296        }
297
298        let table = self.ensure_table(dim).await?;
299        let batch = self.docs_to_batch(&documents, dim)?;
300        table.add(batch).execute().await?;
301        debug!(
302            "Inserted {} documents into Lance (validated)",
303            documents.len()
304        );
305        Ok(())
306    }
307
308    pub async fn search_store(
309        &self,
310        namespace: Option<&str>,
311        embedding: Vec<f32>,
312        k: usize,
313    ) -> Result<Vec<ChromaDocument>> {
314        if embedding.is_empty() {
315            return Ok(vec![]);
316        }
317        let dim = embedding.len();
318        let table = self.ensure_table(dim).await?;
319
320        let mut query = table.query();
321        if let Some(ns) = namespace {
322            query = query.only_if(self.namespace_filter(ns).as_str());
323        }
324        let mut stream = query.nearest_to(embedding)?.limit(k).execute().await?;
325
326        let mut results = Vec::new();
327        while let Some(batch) = stream.try_next().await? {
328            let mut docs = self.batch_to_docs(&batch)?;
329            results.append(&mut docs);
330        }
331        debug!("Lance returned {} results", results.len());
332        Ok(results)
333    }
334
335    /// Return documents without running a vector search.
336    /// Used by admin/reporting paths that need a full table scan without
337    /// assuming any embedding dimension or creating a table on read.
338    pub async fn all_documents(
339        &self,
340        namespace: Option<&str>,
341        limit: usize,
342    ) -> Result<Vec<ChromaDocument>> {
343        let table = match self.ensure_table(0).await {
344            Ok(t) => t,
345            Err(_) => return Ok(vec![]),
346        };
347
348        let mut query = table.query().limit(limit);
349        if let Some(ns) = namespace {
350            query = query.only_if(self.namespace_filter(ns).as_str());
351        }
352        let mut stream = query.execute().await?;
353
354        let mut results = Vec::new();
355        while let Some(batch) = stream.try_next().await? {
356            let mut docs = self.batch_to_docs(&batch)?;
357            results.append(&mut docs);
358        }
359
360        Ok(results)
361    }
362
363    pub async fn get_document(&self, namespace: &str, id: &str) -> Result<Option<ChromaDocument>> {
364        let table = match self.ensure_table(0).await {
365            Ok(t) => t,
366            Err(_) => return Ok(None),
367        };
368        let filter = format!(
369            "{} AND {}",
370            self.namespace_filter(namespace),
371            self.id_filter(id)
372        );
373        let mut stream = table
374            .query()
375            .only_if(filter.as_str())
376            .limit(1)
377            .execute()
378            .await?;
379        if let Some(batch) = stream.try_next().await? {
380            let mut docs = self.batch_to_docs(&batch)?;
381            if let Some(doc) = docs.pop() {
382                return Ok(Some(doc));
383            }
384        }
385        Ok(None)
386    }
387
388    pub async fn delete_document(&self, namespace: &str, id: &str) -> Result<usize> {
389        let table = match self.ensure_table(0).await {
390            Ok(t) => t,
391            Err(_) => return Ok(0),
392        };
393        let predicate = format!(
394            "{} AND {}",
395            self.namespace_filter(namespace),
396            self.id_filter(id)
397        );
398        let deleted = table.delete(predicate.as_str()).await?;
399        Ok(deleted.version as usize)
400    }
401
402    pub async fn delete_namespace_documents(&self, namespace: &str) -> Result<usize> {
403        let table = match self.ensure_table(0).await {
404            Ok(t) => t,
405            Err(_) => return Ok(0),
406        };
407        let predicate = self.namespace_filter(namespace);
408        let deleted = table.delete(predicate.as_str()).await?;
409        Ok(deleted.version as usize)
410    }
411
412    pub fn get_collection_name(&self) -> &str {
413        &self.collection_name
414    }
415
416    async fn ensure_table(&self, dim: usize) -> Result<Table> {
417        let mut guard = self.table.lock().await;
418        if let Some(table) = guard.as_ref() {
419            return Ok(table.clone());
420        }
421
422        let maybe_table = self
423            .lance
424            .open_table(self.collection_name.as_str())
425            .execute()
426            .await;
427
428        let table = if let Ok(tbl) = maybe_table {
429            tbl
430        } else {
431            if dim == 0 {
432                return Err(anyhow!(
433                    "Vector table '{}' not found and dimension is unknown",
434                    self.collection_name
435                ));
436            }
437            info!(
438                "Creating Lance table '{}' with vector dimension {} (schema v{})",
439                self.collection_name, dim, SCHEMA_VERSION
440            );
441            let schema = Arc::new(Self::create_schema(dim));
442            self.lance
443                .create_empty_table(self.collection_name.as_str(), schema)
444                .execute()
445                .await?
446        };
447
448        *guard = Some(table.clone());
449        Ok(table)
450    }
451
452    async fn open_existing_table(&self) -> Result<Table> {
453        self.ensure_table(0).await.map_err(|_| {
454            anyhow!(
455                "Vector table '{}' not found at {}. Index data first so rmcp-memex can use the stored embedding dimension instead of guessing.",
456                self.collection_name,
457                self.lance_path
458            )
459        })
460    }
461
462    /// Create the LanceDB schema with onion slice fields and content hash
463    fn create_schema(dim: usize) -> Schema {
464        Schema::new(vec![
465            Field::new("id", DataType::Utf8, false),
466            Field::new("namespace", DataType::Utf8, false),
467            Field::new(
468                "vector",
469                DataType::FixedSizeList(
470                    Arc::new(Field::new("item", DataType::Float32, true)),
471                    dim as i32,
472                ),
473                false,
474            ),
475            Field::new("text", DataType::Utf8, true),
476            Field::new("metadata", DataType::Utf8, true),
477            // Onion slice fields (v2 schema)
478            Field::new("layer", DataType::UInt8, true), // 0=flat, 1=outer, 2=middle, 3=inner, 4=core
479            Field::new("parent_id", DataType::Utf8, true), // Parent slice ID
480            Field::new("children_ids", DataType::Utf8, true), // JSON array of children IDs
481            Field::new("keywords", DataType::Utf8, true), // JSON array of keywords
482            // Deduplication field (v3 schema)
483            Field::new("content_hash", DataType::Utf8, true), // SHA256 hash for exact-match dedup
484        ])
485    }
486
487    fn docs_to_batch(&self, documents: &[ChromaDocument], dim: usize) -> Result<BatchIter> {
488        let ids = documents.iter().map(|d| d.id.as_str()).collect::<Vec<_>>();
489        let namespaces = documents
490            .iter()
491            .map(|d| d.namespace.as_str())
492            .collect::<Vec<_>>();
493        let texts = documents
494            .iter()
495            .map(|d| d.document.as_str())
496            .collect::<Vec<_>>();
497        let metadata_strings = documents
498            .iter()
499            .map(|d| serde_json::to_string(&d.metadata).unwrap_or_else(|_| "{}".to_string()))
500            .collect::<Vec<_>>();
501
502        let vectors = documents.iter().map(|d| {
503            if d.embedding.len() != dim {
504                None
505            } else {
506                Some(d.embedding.iter().map(|v| Some(*v)).collect::<Vec<_>>())
507            }
508        });
509
510        // Onion slice fields
511        let layers: Vec<u8> = documents.iter().map(|d| d.layer).collect();
512        let parent_ids: Vec<Option<&str>> =
513            documents.iter().map(|d| d.parent_id.as_deref()).collect();
514        let children_ids_json: Vec<String> = documents
515            .iter()
516            .map(|d| serde_json::to_string(&d.children_ids).unwrap_or_else(|_| "[]".to_string()))
517            .collect();
518        let keywords_json: Vec<String> = documents
519            .iter()
520            .map(|d| serde_json::to_string(&d.keywords).unwrap_or_else(|_| "[]".to_string()))
521            .collect();
522        // Content hash for deduplication
523        let content_hashes: Vec<Option<&str>> = documents
524            .iter()
525            .map(|d| d.content_hash.as_deref())
526            .collect();
527
528        let schema = Arc::new(Self::create_schema(dim));
529
530        let batch = RecordBatch::try_new(
531            schema.clone(),
532            vec![
533                Arc::new(StringArray::from(ids)),
534                Arc::new(StringArray::from(namespaces)),
535                Arc::new(
536                    FixedSizeListArray::from_iter_primitive::<Float32Type, _, _>(
537                        vectors, dim as i32,
538                    ),
539                ),
540                Arc::new(StringArray::from(texts)),
541                Arc::new(StringArray::from(metadata_strings)),
542                // Onion slice fields
543                Arc::new(UInt8Array::from(layers)),
544                Arc::new(StringArray::from(parent_ids)),
545                Arc::new(StringArray::from(
546                    children_ids_json
547                        .iter()
548                        .map(|s| s.as_str())
549                        .collect::<Vec<_>>(),
550                )),
551                Arc::new(StringArray::from(
552                    keywords_json.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
553                )),
554                // Content hash for deduplication
555                Arc::new(StringArray::from(content_hashes)),
556            ],
557        )?;
558
559        Ok(RecordBatchIterator::new(
560            vec![Ok(batch)].into_iter(),
561            schema,
562        ))
563    }
564
565    fn batch_to_docs(&self, batch: &RecordBatch) -> Result<Vec<ChromaDocument>> {
566        let id_col = batch
567            .column_by_name("id")
568            .and_then(|c| c.as_any().downcast_ref::<StringArray>())
569            .ok_or_else(|| anyhow!("Missing id column"))?;
570        let ns_col = batch
571            .column_by_name("namespace")
572            .and_then(|c| c.as_any().downcast_ref::<StringArray>())
573            .ok_or_else(|| anyhow!("Missing namespace column"))?;
574        let text_col = batch
575            .column_by_name("text")
576            .and_then(|c| c.as_any().downcast_ref::<StringArray>())
577            .ok_or_else(|| anyhow!("Missing text column"))?;
578        let metadata_col = batch
579            .column_by_name("metadata")
580            .and_then(|c| c.as_any().downcast_ref::<StringArray>())
581            .ok_or_else(|| anyhow!("Missing metadata column"))?;
582        let vector_col = batch
583            .column_by_name("vector")
584            .and_then(|c| c.as_any().downcast_ref::<FixedSizeListArray>())
585            .ok_or_else(|| anyhow!("Missing vector column"))?;
586
587        // Onion slice fields (optional for backward compatibility with v1 schema)
588        let layer_col = batch
589            .column_by_name("layer")
590            .and_then(|c| c.as_any().downcast_ref::<UInt8Array>());
591        let parent_id_col = batch
592            .column_by_name("parent_id")
593            .and_then(|c| c.as_any().downcast_ref::<StringArray>());
594        let children_ids_col = batch
595            .column_by_name("children_ids")
596            .and_then(|c| c.as_any().downcast_ref::<StringArray>());
597        let keywords_col = batch
598            .column_by_name("keywords")
599            .and_then(|c| c.as_any().downcast_ref::<StringArray>());
600        // Content hash field (optional for backward compatibility with v2 schema)
601        let content_hash_col = batch
602            .column_by_name("content_hash")
603            .and_then(|c| c.as_any().downcast_ref::<StringArray>());
604
605        let dim = vector_col.value_length() as usize;
606        let values = vector_col
607            .values()
608            .as_any()
609            .downcast_ref::<Float32Array>()
610            .ok_or_else(|| anyhow!("Vector inner type mismatch"))?;
611
612        let mut docs = Vec::new();
613        for i in 0..batch.num_rows() {
614            let id = id_col.value(i).to_string();
615            let text = text_col.value(i).to_string();
616            let namespace = ns_col.value(i).to_string();
617            let meta_str = metadata_col.value(i);
618            let metadata: Value = serde_json::from_str(meta_str).unwrap_or_else(|_| json!({}));
619
620            let offset = i * dim;
621            let mut emb = Vec::with_capacity(dim);
622            for j in 0..dim {
623                emb.push(values.value(offset + j));
624            }
625
626            // Read onion slice fields (with v1 schema compatibility)
627            let layer = layer_col
628                .and_then(|col| {
629                    if col.is_null(i) {
630                        None
631                    } else {
632                        Some(col.value(i))
633                    }
634                })
635                .unwrap_or(0);
636
637            let parent_id = parent_id_col.and_then(|col| {
638                if col.is_null(i) {
639                    None
640                } else {
641                    Some(col.value(i).to_string())
642                }
643            });
644
645            let children_ids: Vec<String> = children_ids_col
646                .and_then(|col| {
647                    if col.is_null(i) {
648                        None
649                    } else {
650                        serde_json::from_str(col.value(i)).ok()
651                    }
652                })
653                .unwrap_or_default();
654
655            let keywords: Vec<String> = keywords_col
656                .and_then(|col| {
657                    if col.is_null(i) {
658                        None
659                    } else {
660                        serde_json::from_str(col.value(i)).ok()
661                    }
662                })
663                .unwrap_or_default();
664
665            let content_hash = content_hash_col.and_then(|col| {
666                if col.is_null(i) {
667                    None
668                } else {
669                    Some(col.value(i).to_string())
670                }
671            });
672
673            docs.push(ChromaDocument {
674                id,
675                namespace,
676                embedding: emb,
677                metadata,
678                document: text,
679                layer,
680                parent_id,
681                children_ids,
682                keywords,
683                content_hash,
684            });
685        }
686        Ok(docs)
687    }
688
689    pub async fn get_filtered_in_namespace(
690        &self,
691        namespace: &str,
692        filter: &str,
693    ) -> Result<Vec<ChromaDocument>> {
694        let table = match self.ensure_table(0).await {
695            Ok(t) => t,
696            Err(_) => return Ok(vec![]),
697        };
698        let combined = format!("{} AND ({})", self.namespace_filter(namespace), filter);
699        let mut stream = table.query().only_if(combined.as_str()).execute().await?;
700        let mut results = Vec::new();
701        while let Some(batch) = stream.try_next().await? {
702            let mut docs = self.batch_to_docs(&batch)?;
703            results.append(&mut docs);
704        }
705        Ok(results)
706    }
707
708    /// Search with optional layer filtering for onion slice architecture
709    pub async fn search_store_with_layer(
710        &self,
711        namespace: Option<&str>,
712        embedding: Vec<f32>,
713        k: usize,
714        layer_filter: Option<SliceLayer>,
715    ) -> Result<Vec<ChromaDocument>> {
716        if embedding.is_empty() {
717            return Ok(vec![]);
718        }
719        let dim = embedding.len();
720        let table = self.ensure_table(dim).await?;
721
722        let mut query = table.query();
723
724        // Build combined filter
725        let mut filters = Vec::new();
726        if let Some(ns) = namespace {
727            filters.push(self.namespace_filter(ns));
728        }
729        if let Some(layer) = layer_filter {
730            filters.push(self.layer_filter(layer));
731        }
732
733        if !filters.is_empty() {
734            let combined = filters.join(" AND ");
735            query = query.only_if(combined.as_str());
736        }
737
738        let mut stream = query.nearest_to(embedding)?.limit(k).execute().await?;
739
740        let mut results = Vec::new();
741        while let Some(batch) = stream.try_next().await? {
742            let mut docs = self.batch_to_docs(&batch)?;
743            results.append(&mut docs);
744        }
745        debug!(
746            "Lance returned {} results (layer filter: {:?})",
747            results.len(),
748            layer_filter
749        );
750        Ok(results)
751    }
752
753    /// Get a document by ID and expand to get its children
754    pub async fn get_children(
755        &self,
756        namespace: &str,
757        parent_id: &str,
758    ) -> Result<Vec<ChromaDocument>> {
759        // Ensure table exists
760        let _ = match self.ensure_table(0).await {
761            Ok(t) => t,
762            Err(_) => return Ok(vec![]),
763        };
764
765        // First get the parent document to find children IDs
766        if let Some(parent) = self.get_document(namespace, parent_id).await? {
767            if parent.children_ids.is_empty() {
768                return Ok(vec![]);
769            }
770
771            // Query for all children
772            let mut children = Vec::new();
773            for child_id in &parent.children_ids {
774                if let Some(child) = self.get_document(namespace, child_id).await? {
775                    children.push(child);
776                }
777            }
778            return Ok(children);
779        }
780
781        Ok(vec![])
782    }
783
784    /// Get the parent of a document (drill up in onion hierarchy)
785    pub async fn get_parent(
786        &self,
787        namespace: &str,
788        child_id: &str,
789    ) -> Result<Option<ChromaDocument>> {
790        if let Some(child) = self.get_document(namespace, child_id).await?
791            && let Some(ref parent_id) = child.parent_id
792        {
793            return self.get_document(namespace, parent_id).await;
794        }
795        Ok(None)
796    }
797
798    fn namespace_filter(&self, namespace: &str) -> String {
799        format!("namespace = '{}'", namespace.replace('\'', "''"))
800    }
801
802    fn id_filter(&self, id: &str) -> String {
803        format!("id = '{}'", id.replace('\'', "''"))
804    }
805
806    fn layer_filter(&self, layer: SliceLayer) -> String {
807        if layer == SliceLayer::Outer {
808            // Default search should surface onion summaries while still seeing legacy flat chunks.
809            "(layer = 0 OR layer = 1)".to_string()
810        } else {
811            format!("layer = {}", layer.as_u8())
812        }
813    }
814
815    fn content_hash_filter(&self, hash: &str) -> String {
816        format!("content_hash = '{}'", hash.replace('\'', "''"))
817    }
818
819    /// Check if the table schema has content_hash column (schema v3+)
820    async fn table_has_content_hash(table: &Table) -> bool {
821        table
822            .schema()
823            .await
824            .map(|schema| schema.field_with_name("content_hash").is_ok())
825            .unwrap_or(false)
826    }
827
828    /// Check if a content hash already exists in a namespace (for exact-match deduplication)
829    ///
830    /// Returns Ok(false) if:
831    /// - Table doesn't exist yet
832    /// - Table has old schema without content_hash column (graceful degradation)
833    pub async fn has_content_hash(&self, namespace: &str, hash: &str) -> Result<bool> {
834        let table = match self.ensure_table(0).await {
835            Ok(t) => t,
836            Err(_) => return Ok(false), // Table doesn't exist yet, no duplicates possible
837        };
838
839        // Graceful handling of old schema without content_hash column
840        if !Self::table_has_content_hash(&table).await {
841            tracing::warn!(
842                "Table '{}' has old schema without content_hash column. \
843                 Deduplication disabled. Consider re-indexing with new schema.",
844                self.collection_name
845            );
846            return Ok(false); // Can't check for duplicates, treat as new
847        }
848
849        let filter = format!(
850            "{} AND {}",
851            self.namespace_filter(namespace),
852            self.content_hash_filter(hash)
853        );
854
855        let mut stream = table
856            .query()
857            .only_if(filter.as_str())
858            .limit(1)
859            .execute()
860            .await?;
861
862        if let Some(batch) = stream.try_next().await? {
863            return Ok(batch.num_rows() > 0);
864        }
865
866        Ok(false)
867    }
868
869    /// Filter a list of hashes to return only those that don't exist in the namespace.
870    /// This is more efficient than calling has_content_hash for each hash individually.
871    ///
872    /// Returns all hashes as "new" if table has old schema without content_hash column.
873    pub async fn filter_existing_hashes<'a>(
874        &self,
875        namespace: &str,
876        hashes: &'a [String],
877    ) -> Result<Vec<&'a String>> {
878        if hashes.is_empty() {
879            return Ok(vec![]);
880        }
881
882        let table = match self.ensure_table(0).await {
883            Ok(t) => t,
884            Err(_) => return Ok(hashes.iter().collect()), // Table doesn't exist, all are new
885        };
886
887        // Graceful handling of old schema without content_hash column
888        if !Self::table_has_content_hash(&table).await {
889            tracing::warn!(
890                "Table '{}' has old schema without content_hash column. \
891                 Deduplication disabled. Consider re-indexing with new schema.",
892                self.collection_name
893            );
894            return Ok(hashes.iter().collect()); // All are "new" since we can't check
895        }
896
897        // Query for existing hashes in this namespace
898        // We build a filter with OR conditions for all hashes
899        let hash_conditions: Vec<String> =
900            hashes.iter().map(|h| self.content_hash_filter(h)).collect();
901
902        let filter = format!(
903            "{} AND ({})",
904            self.namespace_filter(namespace),
905            hash_conditions.join(" OR ")
906        );
907
908        let mut stream = table
909            .query()
910            .only_if(filter.as_str())
911            .limit(hashes.len())
912            .execute()
913            .await?;
914
915        // Collect existing hashes from results
916        let mut existing_hashes = std::collections::HashSet::new();
917        while let Some(batch) = stream.try_next().await? {
918            if let Some(hash_col) = batch
919                .column_by_name("content_hash")
920                .and_then(|c| c.as_any().downcast_ref::<StringArray>())
921            {
922                for i in 0..batch.num_rows() {
923                    if !hash_col.is_null(i) {
924                        existing_hashes.insert(hash_col.value(i).to_string());
925                    }
926                }
927            }
928        }
929
930        // Return only hashes that don't exist
931        Ok(hashes
932            .iter()
933            .filter(|h| !existing_hashes.contains(h.as_str()))
934            .collect())
935    }
936
937    // =========================================================================
938    // MAINTENANCE OPERATIONS
939    // =========================================================================
940
941    /// Run all optimizations (compact + prune old versions)
942    pub async fn optimize(&self) -> Result<OptimizeStats> {
943        let table = self.open_existing_table().await?;
944        let stats = table.optimize(OptimizeAction::All).await?;
945        info!(
946            "Optimize complete: compaction={:?}, prune={:?}",
947            stats.compaction, stats.prune
948        );
949        Ok(stats)
950    }
951
952    /// Compact small files into larger ones for better performance
953    pub async fn compact(&self) -> Result<OptimizeStats> {
954        let table = self.open_existing_table().await?;
955        let stats = table
956            .optimize(OptimizeAction::Compact {
957                options: Default::default(),
958                remap_options: None,
959            })
960            .await?;
961        info!("Compaction complete: {:?}", stats.compaction);
962        Ok(stats)
963    }
964
965    /// Remove old versions older than specified duration (default: 7 days)
966    pub async fn cleanup(&self, older_than_days: Option<u64>) -> Result<OptimizeStats> {
967        let table = self.open_existing_table().await?;
968        let days = older_than_days.unwrap_or(7) as i64;
969        let duration = chrono::TimeDelta::days(days);
970        let stats = table
971            .optimize(OptimizeAction::Prune {
972                older_than: Some(duration),
973                delete_unverified: Some(false),
974                error_if_tagged_old_versions: None,
975            })
976            .await?;
977        info!("Cleanup complete: {:?}", stats.prune);
978        Ok(stats)
979    }
980
981    /// Get table statistics (row count, fragments, etc.)
982    pub async fn stats(&self) -> Result<TableStats> {
983        let table = self.open_existing_table().await?;
984        let row_count = table.count_rows(None).await?;
985
986        // Get version count
987        let versions = table.list_versions().await.unwrap_or_default();
988        let version_count = versions.len();
989
990        Ok(TableStats {
991            row_count,
992            version_count,
993            table_name: self.collection_name.clone(),
994            db_path: self.lance_path.clone(),
995        })
996    }
997
998    /// Count rows in a specific namespace
999    pub async fn count_namespace(&self, namespace: &str) -> Result<usize> {
1000        let table = match self.ensure_table(0).await {
1001            Ok(table) => table,
1002            Err(_) => return Ok(0),
1003        };
1004        let filter = self.namespace_filter(namespace);
1005        let count = table.count_rows(Some(filter)).await?;
1006        Ok(count)
1007    }
1008
1009    /// Get all documents from a namespace (for migration/export)
1010    ///
1011    /// Note: This uses a full table scan with namespace filter.
1012    /// For very large namespaces, consider batching.
1013    pub async fn get_all_in_namespace(&self, namespace: &str) -> Result<Vec<ChromaDocument>> {
1014        let table = match self.ensure_table(0).await {
1015            Ok(t) => t,
1016            Err(_) => return Ok(vec![]), // Table doesn't exist
1017        };
1018
1019        let filter = self.namespace_filter(namespace);
1020        let mut stream = table.query().only_if(filter.as_str()).execute().await?;
1021
1022        let mut results = Vec::new();
1023        while let Some(batch) = stream.try_next().await? {
1024            let mut docs = self.batch_to_docs(&batch)?;
1025            results.append(&mut docs);
1026        }
1027
1028        debug!(
1029            "Retrieved {} documents from namespace '{}'",
1030            results.len(),
1031            namespace
1032        );
1033        Ok(results)
1034    }
1035
1036    /// Check if a namespace exists (has any documents)
1037    pub async fn namespace_exists(&self, namespace: &str) -> Result<bool> {
1038        let count = self.count_namespace(namespace).await?;
1039        Ok(count > 0)
1040    }
1041}
1042
1043/// Statistics about the LanceDB table
1044#[derive(Debug, Clone, Serialize)]
1045pub struct TableStats {
1046    pub row_count: usize,
1047    pub version_count: usize,
1048    pub table_name: String,
1049    pub db_path: String,
1050}
1051
1052// =============================================================================
1053// GARBAGE COLLECTION
1054// =============================================================================
1055
1056/// Statistics from garbage collection operations
1057#[derive(Debug, Clone, Default, Serialize)]
1058pub struct GcStats {
1059    /// Number of orphan embeddings found (embeddings without valid parent references)
1060    pub orphans_found: usize,
1061    /// Number of orphan embeddings removed
1062    pub orphans_removed: usize,
1063    /// Number of empty namespaces found
1064    pub empty_namespaces_found: usize,
1065    /// Number of empty namespaces removed (documents deleted)
1066    pub empty_namespaces_removed: usize,
1067    /// Number of old documents found (older than threshold)
1068    pub old_docs_found: usize,
1069    /// Number of old documents removed
1070    pub old_docs_removed: usize,
1071    /// Estimated space freed in bytes (if calculable)
1072    pub bytes_freed: Option<u64>,
1073    /// List of namespaces that were empty
1074    pub empty_namespace_names: Vec<String>,
1075    /// List of namespaces affected by old doc cleanup
1076    pub affected_namespaces: Vec<String>,
1077}
1078
1079impl GcStats {
1080    /// Check if any issues were found
1081    pub fn has_issues(&self) -> bool {
1082        self.orphans_found > 0 || self.empty_namespaces_found > 0 || self.old_docs_found > 0
1083    }
1084
1085    /// Check if any deletions occurred
1086    pub fn has_deletions(&self) -> bool {
1087        self.orphans_removed > 0 || self.empty_namespaces_removed > 0 || self.old_docs_removed > 0
1088    }
1089}
1090
1091/// Configuration for garbage collection
1092#[derive(Debug, Clone)]
1093pub struct GcConfig {
1094    /// Remove orphan embeddings (embeddings with no parent document)
1095    pub remove_orphans: bool,
1096    /// Remove empty namespaces (namespaces with 0 documents)
1097    pub remove_empty: bool,
1098    /// Remove documents older than this duration
1099    pub older_than: Option<chrono::Duration>,
1100    /// Dry run mode - only report what would be removed
1101    pub dry_run: bool,
1102    /// Limit to specific namespace (None = all namespaces)
1103    pub namespace: Option<String>,
1104}
1105
1106impl Default for GcConfig {
1107    fn default() -> Self {
1108        Self {
1109            remove_orphans: false,
1110            remove_empty: false,
1111            older_than: None,
1112            dry_run: true,
1113            namespace: None,
1114        }
1115    }
1116}
1117
1118/// Parse a duration string like "30d", "6m", "1y"
1119pub fn parse_duration_string(s: &str) -> Result<chrono::Duration> {
1120    let s = s.trim().to_lowercase();
1121    if s.is_empty() {
1122        return Err(anyhow!("Empty duration string"));
1123    }
1124
1125    // Extract numeric part and unit
1126    let (num_str, unit) = if s.ends_with('d') {
1127        (&s[..s.len() - 1], 'd')
1128    } else if s.ends_with('m') {
1129        (&s[..s.len() - 1], 'm')
1130    } else if s.ends_with('y') {
1131        (&s[..s.len() - 1], 'y')
1132    } else {
1133        return Err(anyhow!(
1134            "Invalid duration format '{}'. Use format like '30d', '6m', or '1y'",
1135            s
1136        ));
1137    };
1138
1139    let num: i64 = num_str.parse().map_err(|_| {
1140        anyhow!(
1141            "Invalid number in duration '{}'. Use format like '30d', '6m', or '1y'",
1142            s
1143        )
1144    })?;
1145
1146    if num <= 0 {
1147        return Err(anyhow!("Duration must be positive, got '{}'", s));
1148    }
1149
1150    match unit {
1151        'd' => Ok(chrono::Duration::days(num)),
1152        'm' => Ok(chrono::Duration::days(num * 30)), // Approximate month
1153        'y' => Ok(chrono::Duration::days(num * 365)), // Approximate year
1154        _ => unreachable!(),
1155    }
1156}
1157
1158impl StorageManager {
1159    // =========================================================================
1160    // GARBAGE COLLECTION OPERATIONS
1161    // =========================================================================
1162
1163    /// Run garbage collection based on configuration
1164    #[doc(alias = "run_gc")]
1165    pub async fn garbage_collect(&self, config: &GcConfig) -> Result<GcStats> {
1166        let mut stats = GcStats::default();
1167
1168        // Get all documents for analysis
1169        let all_docs = self
1170            .all_documents(config.namespace.as_deref(), 1_000_000)
1171            .await?;
1172
1173        if all_docs.is_empty() {
1174            return Ok(stats);
1175        }
1176
1177        // Group documents by namespace
1178        let mut by_namespace: std::collections::HashMap<String, Vec<&ChromaDocument>> =
1179            std::collections::HashMap::new();
1180        for doc in &all_docs {
1181            by_namespace
1182                .entry(doc.namespace.clone())
1183                .or_default()
1184                .push(doc);
1185        }
1186
1187        // 1. Find orphan embeddings (documents with parent_id that doesn't exist)
1188        if config.remove_orphans {
1189            let orphan_stats = self
1190                .find_and_remove_orphans(&all_docs, config.dry_run)
1191                .await?;
1192            stats.orphans_found = orphan_stats.0;
1193            stats.orphans_removed = orphan_stats.1;
1194        }
1195
1196        // 2. Find and optionally remove empty namespaces
1197        if config.remove_empty {
1198            let empty_stats = self
1199                .find_and_remove_empty_namespaces(&by_namespace, config.dry_run)
1200                .await?;
1201            stats.empty_namespaces_found = empty_stats.0;
1202            stats.empty_namespaces_removed = empty_stats.1;
1203            stats.empty_namespace_names = empty_stats.2;
1204        }
1205
1206        // 3. Find and optionally remove old documents
1207        if let Some(ref duration) = config.older_than {
1208            let old_stats = self
1209                .find_and_remove_old_docs(&all_docs, duration, config.dry_run)
1210                .await?;
1211            stats.old_docs_found = old_stats.0;
1212            stats.old_docs_removed = old_stats.1;
1213            stats.affected_namespaces = old_stats.2;
1214        }
1215
1216        Ok(stats)
1217    }
1218
1219    #[deprecated(note = "use garbage_collect")]
1220    pub async fn run_gc(&self, config: &GcConfig) -> Result<GcStats> {
1221        self.garbage_collect(config).await
1222    }
1223
1224    /// Find orphan embeddings - documents with parent_id pointing to non-existent documents
1225    async fn find_and_remove_orphans(
1226        &self,
1227        docs: &[ChromaDocument],
1228        dry_run: bool,
1229    ) -> Result<(usize, usize)> {
1230        // Build a set of all document IDs
1231        let all_ids: std::collections::HashSet<&str> = docs.iter().map(|d| d.id.as_str()).collect();
1232
1233        // Find documents with parent_id that doesn't exist in the ID set
1234        let mut orphans: Vec<(&str, &str)> = Vec::new(); // (namespace, id)
1235        for doc in docs {
1236            if let Some(ref parent_id) = doc.parent_id
1237                && !all_ids.contains(parent_id.as_str())
1238            {
1239                orphans.push((&doc.namespace, &doc.id));
1240            }
1241        }
1242
1243        let found = orphans.len();
1244        let mut removed = 0;
1245
1246        if !dry_run && !orphans.is_empty() {
1247            for (namespace, id) in &orphans {
1248                if self.delete_document(namespace, id).await.is_ok() {
1249                    removed += 1;
1250                }
1251            }
1252        }
1253
1254        Ok((found, removed))
1255    }
1256
1257    /// Find empty namespaces - this checks if namespaces have 0 documents
1258    /// Note: In LanceDB, namespaces are implicit (just a column value), so "removing"
1259    /// an empty namespace means there are no documents to delete
1260    async fn find_and_remove_empty_namespaces(
1261        &self,
1262        by_namespace: &std::collections::HashMap<String, Vec<&ChromaDocument>>,
1263        _dry_run: bool,
1264    ) -> Result<(usize, usize, Vec<String>)> {
1265        // Find namespaces with 0 documents
1266        let empty_namespaces: Vec<String> = by_namespace
1267            .iter()
1268            .filter(|(_, docs)| docs.is_empty())
1269            .map(|(ns, _)| ns.clone())
1270            .collect();
1271
1272        let found = empty_namespaces.len();
1273        // Empty namespaces don't need deletion - they have no documents
1274        // Just report them
1275        let removed = 0;
1276
1277        Ok((found, removed, empty_namespaces))
1278    }
1279
1280    /// Find and optionally remove documents older than the specified duration
1281    async fn find_and_remove_old_docs(
1282        &self,
1283        docs: &[ChromaDocument],
1284        older_than: &chrono::Duration,
1285        dry_run: bool,
1286    ) -> Result<(usize, usize, Vec<String>)> {
1287        let cutoff = chrono::Utc::now() - *older_than;
1288
1289        let mut old_docs: Vec<(&str, &str)> = Vec::new(); // (namespace, id)
1290        let mut affected_namespaces: std::collections::HashSet<String> =
1291            std::collections::HashSet::new();
1292
1293        for doc in docs {
1294            // Check for timestamp in metadata
1295            if let Some(obj) = doc.metadata.as_object() {
1296                let mut doc_timestamp: Option<String> = None;
1297
1298                // Look for common timestamp field names
1299                for key in &["timestamp", "created_at", "indexed_at", "date", "time"] {
1300                    if let Some(value) = obj.get(*key)
1301                        && let Some(ts) = value.as_str()
1302                    {
1303                        doc_timestamp = Some(ts.to_string());
1304                        break;
1305                    }
1306                }
1307
1308                // Check if document is older than cutoff
1309                if let Some(ts) = doc_timestamp {
1310                    // Parse the timestamp - try RFC3339 first, then other formats
1311                    let is_old = if let Ok(parsed) = chrono::DateTime::parse_from_rfc3339(&ts) {
1312                        parsed < cutoff
1313                    } else if let Ok(parsed) =
1314                        chrono::NaiveDateTime::parse_from_str(&ts, "%Y-%m-%d %H:%M:%S")
1315                    {
1316                        parsed < cutoff.naive_utc()
1317                    } else if let Ok(parsed) = chrono::NaiveDate::parse_from_str(&ts, "%Y-%m-%d") {
1318                        parsed < cutoff.date_naive()
1319                    } else {
1320                        // Can't parse timestamp, skip this document
1321                        false
1322                    };
1323
1324                    if is_old {
1325                        old_docs.push((&doc.namespace, &doc.id));
1326                        affected_namespaces.insert(doc.namespace.clone());
1327                    }
1328                }
1329            }
1330        }
1331
1332        let found = old_docs.len();
1333        let mut removed = 0;
1334
1335        if !dry_run && !old_docs.is_empty() {
1336            for (namespace, id) in &old_docs {
1337                if self.delete_document(namespace, id).await.is_ok() {
1338                    removed += 1;
1339                }
1340            }
1341        }
1342
1343        Ok((found, removed, affected_namespaces.into_iter().collect()))
1344    }
1345
1346    /// List all unique namespaces in the database
1347    pub async fn list_namespaces(&self) -> Result<Vec<(String, usize)>> {
1348        let all_docs = self.all_documents(None, 1_000_000).await?;
1349
1350        let mut namespace_counts: std::collections::HashMap<String, usize> =
1351            std::collections::HashMap::new();
1352        for doc in &all_docs {
1353            *namespace_counts.entry(doc.namespace.clone()).or_insert(0) += 1;
1354        }
1355
1356        let mut namespaces: Vec<(String, usize)> = namespace_counts.into_iter().collect();
1357        namespaces.sort_by(|a, b| a.0.cmp(&b.0));
1358        Ok(namespaces)
1359    }
1360}