Skip to main content

rust_memex/storage/
mod.rs

1use anyhow::{Result, anyhow};
2use arrow_array::types::Float32Type;
3use arrow_array::{
4    Array, FixedSizeListArray, Float32Array, RecordBatch, RecordBatchIterator, StringArray,
5    UInt8Array,
6};
7use arrow_schema::{ArrowError, DataType, Field, Schema};
8use futures::TryStreamExt;
9use lancedb::connection::Connection;
10use lancedb::query::{ExecutableQuery, QueryBase};
11use lancedb::table::{NewColumnTransform, OptimizeAction, OptimizeStats};
12use lancedb::{Table, connect};
13use serde::{Deserialize, Serialize};
14use serde_json::{Value, json};
15use sha2::{Digest, Sha256};
16use std::collections::HashMap;
17use std::fmt;
18use std::path::{Path, PathBuf};
19use std::str::FromStr;
20use std::sync::Arc;
21use tokio::sync::Mutex;
22use tracing::{debug, error, info, warn};
23use uuid::Uuid;
24
25use crate::rag::SliceLayer;
26
27/// Schema version for LanceDB tables. Increment when changing table structure.
28/// Version 2: Added onion slice fields (layer, parent_id, children_ids, keywords)
29/// Version 3: Added content_hash for exact-match deduplication
30/// Version 4: Split `content_hash` into per-chunk SHA256 (this column) and a
31///            new `source_hash` column for the SHA256 of the source document
32///            text. This enables both layer-aware chunk dedup and pre-index
33///            source-level dedup. Pre-v4 rows store source-text hash in
34///            `content_hash`; backfill via `/admin/backfill-hashes` corrects
35///            this without breaking old data.
36/// See docs/MIGRATION.md for migration procedures.
37pub const SCHEMA_VERSION: u32 = 4;
38pub const DEFAULT_TABLE_NAME: &str = "mcp_documents";
39const NAMESPACE_TABLE_PREFIX: &str = "mcp_documents__ns__";
40
41#[derive(Debug, Clone, Copy, PartialEq, Eq)]
42pub enum SchemaVersion {
43    V3,
44    V4,
45}
46
47impl SchemaVersion {
48    pub fn current() -> Self {
49        Self::V4
50    }
51}
52
53impl fmt::Display for SchemaVersion {
54    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
55        match self {
56            Self::V3 => f.write_str("v3"),
57            Self::V4 => f.write_str("v4"),
58        }
59    }
60}
61
62impl FromStr for SchemaVersion {
63    type Err = anyhow::Error;
64
65    fn from_str(value: &str) -> Result<Self> {
66        match value.trim().to_ascii_lowercase().as_str() {
67            "3" | "v3" => Ok(Self::V3),
68            "4" | "v4" => Ok(Self::V4),
69            other => Err(anyhow!(
70                "unsupported schema target '{other}' (expected v3 or v4)"
71            )),
72        }
73    }
74}
75
76#[derive(Debug, Clone)]
77pub struct SchemaMigrationReport {
78    pub target: SchemaVersion,
79    pub missing_columns: Vec<Field>,
80    pub applied: bool,
81}
82
83impl SchemaMigrationReport {
84    pub fn missing_column_names(&self) -> Vec<&str> {
85        self.missing_columns
86            .iter()
87            .map(|field| field.name().as_str())
88            .collect()
89    }
90}
91
92#[derive(Debug, Clone)]
93pub struct SchemaStatusReport {
94    pub schema_version: SchemaVersion,
95    pub expected_schema: SchemaVersion,
96    pub needs_migration: bool,
97    pub missing_columns: Vec<String>,
98    pub manifest_version: Option<u64>,
99}
100
101pub fn required_columns_for(version: SchemaVersion) -> Vec<Field> {
102    let mut fields = vec![Field::new("content_hash", DataType::Utf8, true)];
103    if matches!(version, SchemaVersion::V4) {
104        fields.push(Field::new("source_hash", DataType::Utf8, true));
105    }
106    fields
107}
108
109#[derive(Debug, Clone)]
110pub struct SchemaMismatchWriteError {
111    table_name: String,
112    db_path: String,
113    missing_columns: Vec<String>,
114    message: String,
115}
116
117impl SchemaMismatchWriteError {
118    fn new(
119        table_name: impl Into<String>,
120        db_path: impl Into<String>,
121        missing_columns: Vec<String>,
122        message: impl Into<String>,
123    ) -> Self {
124        Self {
125            table_name: table_name.into(),
126            db_path: db_path.into(),
127            missing_columns,
128            message: message.into(),
129        }
130    }
131
132    pub fn missing_columns(&self) -> &[String] {
133        &self.missing_columns
134    }
135
136    pub fn db_path(&self) -> &str {
137        &self.db_path
138    }
139
140    pub fn table_name(&self) -> &str {
141        &self.table_name
142    }
143
144    pub fn remediation(&self) -> String {
145        format!("rust-memex migrate-schema --db-path {}", self.db_path)
146    }
147}
148
149impl fmt::Display for SchemaMismatchWriteError {
150    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
151        write!(
152            f,
153            "ERROR schema mismatch while writing Lance table '{}': missing columns {:?}. {}. Remediation: {}",
154            self.table_name,
155            self.missing_columns,
156            self.message,
157            self.remediation()
158        )
159    }
160}
161
162impl std::error::Error for SchemaMismatchWriteError {}
163
164fn is_schema_mismatch_message(message: &str) -> bool {
165    let lower = message.to_ascii_lowercase();
166    lower.contains("schema mismatch")
167        || lower.contains("append with different schema")
168        || lower.contains("fields did not match")
169        || lower.contains("missing=[")
170}
171
172fn extract_missing_columns(message: &str) -> Vec<String> {
173    let mut columns = Vec::new();
174    let mut rest = message;
175
176    while let Some(start) = rest.find("missing=[") {
177        rest = &rest[start + "missing=[".len()..];
178        let Some(end) = rest.find(']') else {
179            break;
180        };
181        let list = &rest[..end];
182        for item in list.split(',') {
183            let column = item
184                .trim()
185                .trim_matches('`')
186                .trim_matches('"')
187                .trim_matches('\'');
188            if !column.is_empty() && !columns.iter().any(|existing| existing == column) {
189                columns.push(column.to_string());
190            }
191        }
192        rest = &rest[end + 1..];
193    }
194
195    if columns.is_empty() {
196        for field in required_columns_for(SchemaVersion::current()) {
197            let name = field.name();
198            if message.contains(name) {
199                columns.push(name.to_string());
200            }
201        }
202    }
203
204    columns
205}
206
207// =============================================================================
208// STORAGE BACKEND INTERFACE
209// =============================================================================
210//
211// To add a new storage backend, implement a struct with the following methods:
212//
213//   async fn add_to_store(&self, documents: Vec<ChromaDocument>) -> Result<()>
214//   async fn get_document(&self, namespace: &str, id: &str) -> Result<Option<ChromaDocument>>
215//   async fn search(&self, namespace: Option<&str>, embedding: &[f32], k: usize) -> Result<Vec<ChromaDocument>>
216//   async fn delete(&self, namespace: &str, id: &str) -> Result<usize>
217//   async fn delete_namespace(&self, namespace: &str) -> Result<usize>
218//
219// Current implementation:
220//   - `StorageManager`: LanceDB embedded vector store
221//
222// Future alternatives to consider:
223//   - Qdrant, Milvus, Pinecone (external vector DBs)
224//   - SQLite with vector extension
225// =============================================================================
226
227#[derive(Debug, Serialize, Clone)]
228pub struct ChromaDocument {
229    pub id: String,
230    pub namespace: String,
231    pub embedding: Vec<f32>,
232    pub metadata: serde_json::Value,
233    pub document: String,
234    /// Onion slice layer (1=Outer, 2=Middle, 3=Inner, 4=Core, 0=legacy flat)
235    pub layer: u8,
236    /// Parent slice ID in the onion hierarchy (None for Core slices)
237    pub parent_id: Option<String>,
238    /// Children slice IDs in the onion hierarchy
239    pub children_ids: Vec<String>,
240    /// Extracted keywords for this slice
241    pub keywords: Vec<String>,
242    /// SHA256 hash of THIS chunk's text. Used for chunk-level deduplication.
243    /// Pre-v4 rows may transitionally store the source-text hash here; the
244    /// `/admin/backfill-hashes` endpoint corrects them.
245    pub content_hash: Option<String>,
246    /// SHA256 hash of the SOURCE document text (same value across all four
247    /// onion layers from one source). Used for pre-index dedup so we never
248    /// re-embed an already-ingested file. Optional for backward compatibility
249    /// with v3 schemas — `None` means "unknown source provenance".
250    pub source_hash: Option<String>,
251}
252
253impl ChromaDocument {
254    /// Create a new document with default (legacy) slice values
255    pub fn new_flat(
256        id: String,
257        namespace: String,
258        embedding: Vec<f32>,
259        metadata: serde_json::Value,
260        document: String,
261    ) -> Self {
262        Self {
263            id,
264            namespace,
265            embedding,
266            metadata,
267            document,
268            layer: 0, // Legacy flat mode
269            parent_id: None,
270            children_ids: vec![],
271            keywords: vec![],
272            content_hash: None,
273            source_hash: None,
274        }
275    }
276
277    /// Create a new document with content hash for deduplication.
278    /// Source hash is left empty — prefer `new_flat_with_hashes` so callers
279    /// supply the source-document hash explicitly when they have it.
280    pub fn new_flat_with_hash(
281        id: String,
282        namespace: String,
283        embedding: Vec<f32>,
284        metadata: serde_json::Value,
285        document: String,
286        content_hash: String,
287    ) -> Self {
288        Self::new_flat_with_hashes(
289            id,
290            namespace,
291            embedding,
292            metadata,
293            document,
294            content_hash,
295            None,
296        )
297    }
298
299    /// Create a flat (legacy) document with both per-chunk and source hashes.
300    pub fn new_flat_with_hashes(
301        id: String,
302        namespace: String,
303        embedding: Vec<f32>,
304        metadata: serde_json::Value,
305        document: String,
306        content_hash: String,
307        source_hash: Option<String>,
308    ) -> Self {
309        Self {
310            id,
311            namespace,
312            embedding,
313            metadata,
314            document,
315            layer: 0,
316            parent_id: None,
317            children_ids: vec![],
318            keywords: vec![],
319            content_hash: Some(content_hash),
320            source_hash,
321        }
322    }
323
324    /// Create a document from an onion slice
325    pub fn from_onion_slice(
326        slice: &crate::rag::OnionSlice,
327        namespace: String,
328        embedding: Vec<f32>,
329        metadata: serde_json::Value,
330    ) -> Self {
331        Self {
332            id: slice.id.clone(),
333            namespace,
334            embedding,
335            metadata,
336            document: slice.content.clone(),
337            layer: slice.layer.as_u8(),
338            parent_id: slice.parent_id.clone(),
339            children_ids: slice.children_ids.clone(),
340            keywords: slice.keywords.clone(),
341            content_hash: None,
342            source_hash: None,
343        }
344    }
345
346    /// Create a document from an onion slice with content hash for deduplication.
347    /// Source hash is left empty — prefer `from_onion_slice_with_hashes` to
348    /// preserve source provenance for pre-index dedup.
349    pub fn from_onion_slice_with_hash(
350        slice: &crate::rag::OnionSlice,
351        namespace: String,
352        embedding: Vec<f32>,
353        metadata: serde_json::Value,
354        content_hash: String,
355    ) -> Self {
356        Self::from_onion_slice_with_hashes(
357            slice,
358            namespace,
359            embedding,
360            metadata,
361            content_hash,
362            None,
363        )
364    }
365
366    /// Create an onion-slice document with both per-chunk and source hashes.
367    pub fn from_onion_slice_with_hashes(
368        slice: &crate::rag::OnionSlice,
369        namespace: String,
370        embedding: Vec<f32>,
371        metadata: serde_json::Value,
372        content_hash: String,
373        source_hash: Option<String>,
374    ) -> Self {
375        Self {
376            id: slice.id.clone(),
377            namespace,
378            embedding,
379            metadata,
380            document: slice.content.clone(),
381            layer: slice.layer.as_u8(),
382            parent_id: slice.parent_id.clone(),
383            children_ids: slice.children_ids.clone(),
384            keywords: slice.keywords.clone(),
385            content_hash: Some(content_hash),
386            source_hash,
387        }
388    }
389
390    /// Check if this is a legacy flat chunk (not an onion slice)
391    pub fn is_flat(&self) -> bool {
392        self.layer == 0
393    }
394
395    /// Get the slice layer if this is an onion slice
396    pub fn slice_layer(&self) -> Option<SliceLayer> {
397        SliceLayer::from_u8(self.layer)
398    }
399}
400
401pub struct StorageManager {
402    lance: Connection,
403    table: Arc<Mutex<Option<Table>>>,
404    namespace_tables: Arc<Mutex<HashMap<String, Table>>>,
405    collection_name: String,
406    lance_path: String,
407}
408
409#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
410#[serde(rename_all = "snake_case")]
411pub enum CrossStoreRecoveryStatus {
412    #[default]
413    Pending,
414    RolledBack,
415}
416
417#[derive(Debug, Clone, Serialize, Deserialize)]
418pub struct CrossStoreRecoveryDocumentRef {
419    pub namespace: String,
420    pub id: String,
421}
422
423#[derive(Debug, Clone, Serialize, Deserialize)]
424pub struct CrossStoreRecoveryBatch {
425    pub batch_id: String,
426    pub created_at: String,
427    #[serde(default)]
428    pub status: CrossStoreRecoveryStatus,
429    #[serde(default)]
430    pub last_error: Option<String>,
431    pub documents: Vec<CrossStoreRecoveryDocumentRef>,
432}
433
434impl CrossStoreRecoveryBatch {
435    pub fn from_documents(documents: &[ChromaDocument]) -> Self {
436        Self {
437            batch_id: Uuid::new_v4().to_string(),
438            created_at: chrono::Utc::now().to_rfc3339(),
439            status: CrossStoreRecoveryStatus::Pending,
440            last_error: None,
441            documents: documents
442                .iter()
443                .map(|document| CrossStoreRecoveryDocumentRef {
444                    namespace: document.namespace.clone(),
445                    id: document.id.clone(),
446                })
447                .collect(),
448        }
449    }
450}
451
452type BatchIter =
453    RecordBatchIterator<std::vec::IntoIter<std::result::Result<RecordBatch, ArrowError>>>;
454
455impl StorageManager {
456    pub async fn new(db_path: &str) -> Result<Self> {
457        // Embedded LanceDB path (expand ~, allow override via env)
458        let lance_env = std::env::var("LANCEDB_PATH").unwrap_or_else(|_| db_path.to_string());
459        let lance_path = if lance_env.trim().is_empty() {
460            shellexpand::tilde("~/.rmcp-servers/rust-memex/lancedb").to_string()
461        } else {
462            shellexpand::tilde(&lance_env).to_string()
463        };
464
465        let lance = connect(&lance_path).execute().await?;
466
467        Ok(Self {
468            lance,
469            table: Arc::new(Mutex::new(None)),
470            namespace_tables: Arc::new(Mutex::new(HashMap::new())),
471            collection_name: DEFAULT_TABLE_NAME.to_string(),
472            lance_path,
473        })
474    }
475
476    /// Create a storage manager for CLI tools.
477    /// Use this for CLI tools that only need vector operations (index/search).
478    pub async fn new_lance_only(db_path: &str) -> Result<Self> {
479        let lance_path = shellexpand::tilde(db_path).to_string();
480        let lance = connect(&lance_path).execute().await?;
481
482        Ok(Self {
483            lance,
484            table: Arc::new(Mutex::new(None)),
485            namespace_tables: Arc::new(Mutex::new(HashMap::new())),
486            collection_name: DEFAULT_TABLE_NAME.to_string(),
487            lance_path,
488        })
489    }
490
491    pub fn lance_path(&self) -> &str {
492        &self.lance_path
493    }
494
495    pub async fn require_current_schema_for_writes(&self) -> Result<()> {
496        let Some(table) = self.open_table_if_exists().await? else {
497            return Ok(());
498        };
499        self.ensure_hash_schema_columns(&table).await
500    }
501
502    pub async fn schema_status(
503        &self,
504        expected_schema: SchemaVersion,
505    ) -> Result<SchemaStatusReport> {
506        let Some(table) = self.open_table_if_exists().await? else {
507            return Ok(SchemaStatusReport {
508                schema_version: expected_schema,
509                expected_schema,
510                needs_migration: false,
511                missing_columns: Vec::new(),
512                manifest_version: None,
513            });
514        };
515
516        let missing_columns = Self::missing_required_columns(&table, expected_schema)
517            .await?
518            .into_iter()
519            .map(|field| field.name().to_string())
520            .collect::<Vec<_>>();
521        let manifest_version = table
522            .list_versions()
523            .await
524            .ok()
525            .and_then(|versions| versions.iter().map(|version| version.version).max());
526
527        Ok(SchemaStatusReport {
528            schema_version: if missing_columns.is_empty() {
529                expected_schema
530            } else {
531                SchemaVersion::V3
532            },
533            expected_schema,
534            needs_migration: !missing_columns.is_empty(),
535            missing_columns,
536            manifest_version,
537        })
538    }
539
540    pub async fn missing_required_columns(
541        table: &Table,
542        target: SchemaVersion,
543    ) -> Result<Vec<Field>> {
544        let schema = table.schema().await?;
545        Ok(required_columns_for(target)
546            .into_iter()
547            .filter(|field| schema.field_with_name(field.name()).is_err())
548            .collect())
549    }
550
551    pub async fn migrate_lance_schema(
552        db_path: &str,
553        target: SchemaVersion,
554        check_only: bool,
555    ) -> Result<SchemaMigrationReport> {
556        let lance_path = shellexpand::tilde(db_path).to_string();
557        let lance = connect(&lance_path).execute().await?;
558        let table = lance.open_table(DEFAULT_TABLE_NAME).execute().await?;
559        let missing = Self::missing_required_columns(&table, target).await?;
560
561        if missing.is_empty() || check_only {
562            return Ok(SchemaMigrationReport {
563                target,
564                missing_columns: missing,
565                applied: false,
566            });
567        }
568
569        let transform = NewColumnTransform::AllNulls(Arc::new(Schema::new(missing.clone())));
570        if let Err(error) = table.add_columns(transform, None).await {
571            let _ = table.checkout_latest().await;
572            let remaining = Self::missing_required_columns(&table, target).await?;
573            if remaining.is_empty() {
574                warn!(
575                    "Lance table '{}' schema migration raced with another writer and is already complete",
576                    DEFAULT_TABLE_NAME
577                );
578                return Ok(SchemaMigrationReport {
579                    target,
580                    missing_columns: missing,
581                    applied: true,
582                });
583            }
584            return Err(anyhow!(
585                "failed to migrate Lance table '{}' schema to {target}: {error}",
586                DEFAULT_TABLE_NAME
587            ));
588        }
589
590        let _ = table.checkout_latest().await;
591        Ok(SchemaMigrationReport {
592            target,
593            missing_columns: missing,
594            applied: true,
595        })
596    }
597
598    pub fn cross_store_recovery_dir(&self) -> PathBuf {
599        let db_path = Path::new(&self.lance_path);
600        let parent = db_path.parent().unwrap_or_else(|| Path::new("."));
601        let stem = db_path
602            .file_name()
603            .and_then(|name| name.to_str())
604            .unwrap_or("lancedb");
605        parent.join(format!(".{stem}-cross-store-recovery"))
606    }
607
608    fn cross_store_recovery_batch_path(&self, batch_id: &str) -> PathBuf {
609        self.cross_store_recovery_dir()
610            .join(format!("{batch_id}.json"))
611    }
612
613    fn write_cross_store_recovery_batch(&self, batch: &CrossStoreRecoveryBatch) -> Result<PathBuf> {
614        let dir = self.cross_store_recovery_dir();
615        std::fs::create_dir_all(&dir)?;
616
617        let path = self.cross_store_recovery_batch_path(&batch.batch_id);
618        let tmp_path = path.with_extension("json.tmp");
619        let payload = serde_json::to_vec_pretty(batch)?;
620
621        std::fs::write(&tmp_path, payload)?;
622        std::fs::rename(&tmp_path, &path)?;
623
624        Ok(path)
625    }
626
627    pub fn persist_cross_store_recovery_batch(
628        &self,
629        batch: &CrossStoreRecoveryBatch,
630    ) -> Result<PathBuf> {
631        self.write_cross_store_recovery_batch(batch)
632    }
633
634    pub fn update_cross_store_recovery_batch(
635        &self,
636        batch: &CrossStoreRecoveryBatch,
637    ) -> Result<PathBuf> {
638        self.write_cross_store_recovery_batch(batch)
639    }
640
641    pub fn clear_cross_store_recovery_batch(&self, batch_id: &str) -> Result<()> {
642        let path = self.cross_store_recovery_batch_path(batch_id);
643        match std::fs::remove_file(path) {
644            Ok(()) => Ok(()),
645            Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(()),
646            Err(error) => Err(error.into()),
647        }
648    }
649
650    pub fn list_cross_store_recovery_batches(&self) -> Result<Vec<CrossStoreRecoveryBatch>> {
651        let dir = self.cross_store_recovery_dir();
652        if !dir.exists() {
653            return Ok(vec![]);
654        }
655
656        let mut batches = Vec::new();
657        for entry in std::fs::read_dir(dir)? {
658            let entry = entry?;
659            let path = entry.path();
660            if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
661                continue;
662            }
663
664            let payload = std::fs::read(&path)?;
665            let batch: CrossStoreRecoveryBatch = serde_json::from_slice(&payload)?;
666            batches.push(batch);
667        }
668
669        batches.sort_by(|left, right| left.created_at.cmp(&right.created_at));
670        Ok(batches)
671    }
672
673    /// Refresh the table connection to see new data written by other processes.
674    /// This clears the cached table reference, forcing it to be re-opened on next query.
675    pub async fn refresh(&self) -> Result<()> {
676        let mut guard = self.table.lock().await;
677        *guard = None;
678        self.namespace_tables.lock().await.clear();
679        tracing::info!("LanceDB table cache cleared - will refresh on next query");
680        Ok(())
681    }
682
683    pub async fn ensure_collection(&self) -> Result<()> {
684        // Attempt to open; if missing, create empty table lazily on first add
685        let mut guard = self.table.lock().await;
686        if guard.is_some() {
687            return Ok(());
688        }
689        match self
690            .lance
691            .open_table(self.collection_name.as_str())
692            .execute()
693            .await
694        {
695            Ok(table) => {
696                *guard = Some(table);
697                info!("Found existing Lance table '{}'", self.collection_name);
698            }
699            Err(_) => {
700                info!(
701                    "Lance table '{}' will be created on first insert",
702                    self.collection_name
703                );
704            }
705        }
706        Ok(())
707    }
708
709    pub async fn add_to_store(&self, documents: Vec<ChromaDocument>) -> Result<()> {
710        if documents.is_empty() {
711            return Ok(());
712        }
713
714        // Pre-validation: check all documents before writing anything
715        let dim = documents
716            .first()
717            .ok_or_else(|| anyhow!("No documents to add"))?
718            .embedding
719            .len();
720        if dim == 0 {
721            return Err(anyhow!("Embedding dimension is zero"));
722        }
723
724        // Validate ALL documents have consistent dimensions and required fields
725        for (i, doc) in documents.iter().enumerate() {
726            if doc.embedding.len() != dim {
727                return Err(anyhow!(
728                    "Document {} has inconsistent embedding dimension: expected {}, got {}. \
729                     Aborting batch to prevent database corruption.",
730                    i,
731                    dim,
732                    doc.embedding.len()
733                ));
734            }
735            if doc.id.is_empty() {
736                return Err(anyhow!("Document {} has empty ID. Aborting batch.", i));
737            }
738            if doc.namespace.is_empty() {
739                return Err(anyhow!(
740                    "Document {} has empty namespace. Aborting batch.",
741                    i
742                ));
743            }
744            // Check for NaN/Inf in embeddings
745            for (j, &val) in doc.embedding.iter().enumerate() {
746                if val.is_nan() || val.is_infinite() {
747                    return Err(anyhow!(
748                        "Document {} has invalid embedding value at index {}: {}. \
749                         Aborting batch to prevent database corruption.",
750                        i,
751                        j,
752                        val
753                    ));
754                }
755            }
756        }
757
758        let mut by_namespace: HashMap<String, Vec<ChromaDocument>> = HashMap::new();
759        for document in documents {
760            by_namespace
761                .entry(document.namespace.clone())
762                .or_default()
763                .push(document);
764        }
765
766        let mut inserted = 0usize;
767        for (namespace, docs) in by_namespace {
768            let table = self.ensure_namespace_table(&namespace, dim).await?;
769            self.ensure_hash_schema_columns(&table).await?;
770            let batch = self.docs_to_batch(&docs, dim)?;
771            if let Err(error) = table.add(batch).execute().await {
772                return Err(self.map_lancedb_write_error(error));
773            }
774            inserted += docs.len();
775        }
776        debug!("Inserted {} documents into Lance (validated)", inserted);
777        Ok(())
778    }
779
780    pub async fn search_store(
781        &self,
782        namespace: Option<&str>,
783        embedding: Vec<f32>,
784        k: usize,
785    ) -> Result<Vec<ChromaDocument>> {
786        if embedding.is_empty() {
787            return Ok(vec![]);
788        }
789        let mut results = Vec::new();
790        if let Some(ns) = namespace {
791            if let Some(table) = self.open_namespace_table_if_exists(ns).await? {
792                let mut stream = table
793                    .query()
794                    .nearest_to(embedding.clone())?
795                    .limit(k)
796                    .execute()
797                    .await?;
798                while let Some(batch) = stream.try_next().await? {
799                    let mut docs = self.batch_to_docs(&batch)?;
800                    results.append(&mut docs);
801                }
802            }
803            if let Some(table) = self.legacy_table_if_exists().await? {
804                let mut stream = table
805                    .query()
806                    .only_if(self.namespace_filter(ns).as_str())
807                    .nearest_to(embedding)?
808                    .limit(k)
809                    .execute()
810                    .await?;
811                while let Some(batch) = stream.try_next().await? {
812                    let mut docs = self.batch_to_docs(&batch)?;
813                    results.append(&mut docs);
814                }
815            }
816            results.truncate(k);
817        } else {
818            for table_name in self.data_table_names().await? {
819                let Some(table) = self.open_named_table_if_exists(&table_name).await? else {
820                    continue;
821                };
822                let mut stream = table
823                    .query()
824                    .nearest_to(embedding.clone())?
825                    .limit(k)
826                    .execute()
827                    .await?;
828                while let Some(batch) = stream.try_next().await? {
829                    let mut docs = self.batch_to_docs(&batch)?;
830                    results.append(&mut docs);
831                }
832            }
833            results.truncate(k);
834        }
835        debug!("Lance returned {} results", results.len());
836        Ok(results)
837    }
838
839    /// Return a single page of documents without running a vector search.
840    ///
841    /// Used by admin/reporting paths that need deterministic limit/offset
842    /// behavior without assuming any embedding dimension or creating a table on
843    /// read.
844    pub async fn all_documents_page(
845        &self,
846        namespace: Option<&str>,
847        offset: usize,
848        limit: usize,
849    ) -> Result<Vec<ChromaDocument>> {
850        let mut results = Vec::new();
851        if let Some(ns) = namespace {
852            if let Some(table) = self.open_namespace_table_if_exists(ns).await? {
853                results.append(
854                    &mut self
855                        .query_table_page(&table, None, 0, offset + limit)
856                        .await?,
857                );
858            }
859            if let Some(table) = self.legacy_table_if_exists().await? {
860                results.append(
861                    &mut self
862                        .query_table_page(
863                            &table,
864                            Some(self.namespace_filter(ns)),
865                            0,
866                            offset + limit,
867                        )
868                        .await?,
869                );
870            }
871        } else {
872            for table_name in self.data_table_names().await? {
873                let Some(table) = self.open_named_table_if_exists(&table_name).await? else {
874                    continue;
875                };
876                results.append(
877                    &mut self
878                        .query_table_page(&table, None, 0, offset + limit)
879                        .await?,
880                );
881            }
882        }
883
884        Ok(results.into_iter().skip(offset).take(limit).collect())
885    }
886
887    /// Return documents without running a vector search.
888    /// Used by admin/reporting paths that need a bounded full-table scan
889    /// starting from the first row.
890    pub async fn all_documents(
891        &self,
892        namespace: Option<&str>,
893        limit: usize,
894    ) -> Result<Vec<ChromaDocument>> {
895        self.all_documents_page(namespace, 0, limit).await
896    }
897
898    pub async fn get_document(&self, namespace: &str, id: &str) -> Result<Option<ChromaDocument>> {
899        let id_filter = self.id_filter(id);
900        if let Some(table) = self.open_namespace_table_if_exists(namespace).await? {
901            let mut stream = table
902                .query()
903                .only_if(id_filter.as_str())
904                .limit(1)
905                .execute()
906                .await?;
907            if let Some(batch) = stream.try_next().await? {
908                let mut docs = self.batch_to_docs(&batch)?;
909                if let Some(doc) = docs.pop() {
910                    return Ok(Some(doc));
911                }
912            }
913        }
914
915        if let Some(table) = self.legacy_table_if_exists().await? {
916            let filter = format!("{} AND {}", self.namespace_filter(namespace), id_filter);
917            let mut stream = table
918                .query()
919                .only_if(filter.as_str())
920                .limit(1)
921                .execute()
922                .await?;
923            if let Some(batch) = stream.try_next().await? {
924                let mut docs = self.batch_to_docs(&batch)?;
925                if let Some(doc) = docs.pop() {
926                    return Ok(Some(doc));
927                }
928            }
929        }
930        Ok(None)
931    }
932
933    pub async fn delete_document(&self, namespace: &str, id: &str) -> Result<usize> {
934        let mut deleted = 0usize;
935        let id_filter = self.id_filter(id);
936
937        if let Some(table) = self.open_namespace_table_if_exists(namespace).await? {
938            let pre_count = table.count_rows(Some(id_filter.clone())).await?;
939            if pre_count > 0 {
940                table.delete(id_filter.as_str()).await?;
941                deleted += pre_count;
942            }
943        }
944
945        if let Some(table) = self.legacy_table_if_exists().await? {
946            let predicate = format!("{} AND {}", self.namespace_filter(namespace), id_filter);
947            let pre_count = table.count_rows(Some(predicate.clone())).await?;
948            if pre_count > 0 {
949                table.delete(predicate.as_str()).await?;
950                deleted += pre_count;
951            }
952        }
953        Ok(deleted)
954    }
955
956    /// Batch delete documents by IDs within a namespace.
957    ///
958    /// Issues a single `DELETE WHERE namespace = X AND id IN (...)` per chunk,
959    /// avoiding the per-document table scan that `delete_document` incurs when
960    /// called in a loop. Predicate is split into 500-id chunks to keep SQL
961    /// length bounded regardless of caller batch size.
962    pub async fn delete_documents(&self, namespace: &str, ids: &[&str]) -> Result<usize> {
963        if ids.is_empty() {
964            return Ok(0);
965        }
966        const CHUNK: usize = 500;
967        let mut total_deleted = 0usize;
968        for batch in ids.chunks(CHUNK) {
969            let id_list = batch
970                .iter()
971                .map(|id| format!("'{}'", id.replace('\'', "''")))
972                .collect::<Vec<_>>()
973                .join(", ");
974            let id_predicate = format!("id IN ({})", id_list);
975            if let Some(table) = self.open_namespace_table_if_exists(namespace).await? {
976                let pre_count = table.count_rows(Some(id_predicate.clone())).await?;
977                if pre_count > 0 {
978                    table.delete(id_predicate.as_str()).await?;
979                    total_deleted += pre_count;
980                }
981            }
982            if let Some(table) = self.legacy_table_if_exists().await? {
983                let predicate =
984                    format!("{} AND {}", self.namespace_filter(namespace), id_predicate);
985                let pre_count = table.count_rows(Some(predicate.clone())).await?;
986                if pre_count > 0 {
987                    table.delete(predicate.as_str()).await?;
988                    total_deleted += pre_count;
989                }
990            }
991        }
992        Ok(total_deleted)
993    }
994
995    pub async fn delete_namespace_documents(&self, namespace: &str) -> Result<usize> {
996        let mut deleted = 0usize;
997        if let Some(table) = self.open_namespace_table_if_exists(namespace).await? {
998            let pre_count = table.count_rows(None).await?;
999            if pre_count > 0 {
1000                table
1001                    .delete(self.namespace_filter(namespace).as_str())
1002                    .await?;
1003                deleted += pre_count;
1004            }
1005        }
1006        if let Some(table) = self.legacy_table_if_exists().await? {
1007            let predicate = self.namespace_filter(namespace);
1008            let pre_count = table.count_rows(Some(predicate.clone())).await?;
1009            if pre_count > 0 {
1010                table.delete(predicate.as_str()).await?;
1011                deleted += pre_count;
1012            }
1013        }
1014        Ok(deleted)
1015    }
1016
1017    pub async fn rename_namespace_atomic(&self, from: &str, to: &str) -> Result<usize> {
1018        if from == to {
1019            return Ok(0);
1020        }
1021
1022        let source_count = self.count_namespace(from).await?;
1023        if source_count == 0 {
1024            return Ok(0);
1025        }
1026
1027        let target_count = self.count_namespace(to).await?;
1028        if target_count > 0 {
1029            return Err(anyhow!(
1030                "Target namespace '{}' already exists with {} rows",
1031                to,
1032                target_count
1033            ));
1034        }
1035
1036        let mut docs = self.all_documents(Some(from), source_count).await?;
1037        for doc in &mut docs {
1038            doc.namespace = to.to_string();
1039        }
1040        self.add_to_store(docs).await?;
1041        let deleted = self.delete_namespace_documents(from).await?;
1042
1043        Ok(deleted)
1044    }
1045
1046    pub fn get_collection_name(&self) -> &str {
1047        &self.collection_name
1048    }
1049
1050    fn namespace_table_name(namespace: &str) -> String {
1051        let mut safe = namespace
1052            .chars()
1053            .map(|ch| {
1054                if ch.is_ascii_alphanumeric() {
1055                    ch.to_ascii_lowercase()
1056                } else {
1057                    '_'
1058                }
1059            })
1060            .collect::<String>();
1061        while safe.contains("__") {
1062            safe = safe.replace("__", "_");
1063        }
1064        let safe = safe.trim_matches('_');
1065        let safe = if safe.is_empty() { "default" } else { safe };
1066        let safe = safe.chars().take(48).collect::<String>();
1067        let hash = Sha256::digest(namespace.as_bytes());
1068        let suffix = hash[..6]
1069            .iter()
1070            .map(|byte| format!("{byte:02x}"))
1071            .collect::<String>();
1072        format!("{NAMESPACE_TABLE_PREFIX}{safe}_{suffix}")
1073    }
1074
1075    fn is_namespace_table_name(table_name: &str) -> bool {
1076        table_name.starts_with(NAMESPACE_TABLE_PREFIX)
1077    }
1078
1079    async fn data_table_names(&self) -> Result<Vec<String>> {
1080        let table_names = self.lance.table_names().execute().await?;
1081        Ok(table_names
1082            .into_iter()
1083            .filter(|name| name == DEFAULT_TABLE_NAME || Self::is_namespace_table_name(name))
1084            .collect())
1085    }
1086
1087    async fn open_named_table_if_exists(&self, table_name: &str) -> Result<Option<Table>> {
1088        match self.lance.open_table(table_name).execute().await {
1089            Ok(table) => Ok(Some(table)),
1090            Err(e) => {
1091                let msg = e.to_string().to_lowercase();
1092                if msg.contains("not found")
1093                    || msg.contains("does not exist")
1094                    || msg.contains("no such file")
1095                {
1096                    Ok(None)
1097                } else {
1098                    Err(anyhow!("LanceDB error on table '{}': {}", table_name, e))
1099                }
1100            }
1101        }
1102    }
1103
1104    async fn open_namespace_table_if_exists(&self, namespace: &str) -> Result<Option<Table>> {
1105        let table_name = Self::namespace_table_name(namespace);
1106        if let Some(table) = self.namespace_tables.lock().await.get(&table_name).cloned() {
1107            return Ok(Some(table));
1108        }
1109        let table = self.open_named_table_if_exists(&table_name).await?;
1110        if let Some(table) = &table {
1111            self.namespace_tables
1112                .lock()
1113                .await
1114                .insert(table_name, table.clone());
1115        }
1116        Ok(table)
1117    }
1118
1119    async fn ensure_namespace_table(&self, namespace: &str, dim: usize) -> Result<Table> {
1120        let table_name = Self::namespace_table_name(namespace);
1121        if let Some(table) = self.namespace_tables.lock().await.get(&table_name).cloned() {
1122            return Ok(table);
1123        }
1124
1125        let table = match self.open_named_table_if_exists(&table_name).await? {
1126            Some(table) => table,
1127            None => {
1128                if dim == 0 {
1129                    return Err(anyhow!(
1130                        "Vector table '{}' not found and dimension is unknown",
1131                        table_name
1132                    ));
1133                }
1134                info!(
1135                    "Creating Lance namespace table '{}' for '{}' with vector dimension {} (schema v{})",
1136                    table_name, namespace, dim, SCHEMA_VERSION
1137                );
1138                let schema = Arc::new(Self::create_schema(dim));
1139                self.lance
1140                    .create_empty_table(table_name.as_str(), schema)
1141                    .execute()
1142                    .await?
1143            }
1144        };
1145
1146        self.namespace_tables
1147            .lock()
1148            .await
1149            .insert(table_name, table.clone());
1150        Ok(table)
1151    }
1152
1153    async fn query_table_page(
1154        &self,
1155        table: &Table,
1156        filter: Option<String>,
1157        offset: usize,
1158        limit: usize,
1159    ) -> Result<Vec<ChromaDocument>> {
1160        let mut query = table.query().limit(limit).offset(offset);
1161        if let Some(filter) = filter {
1162            query = query.only_if(filter.as_str());
1163        }
1164        let mut stream = query.execute().await?;
1165        let mut results = Vec::new();
1166        while let Some(batch) = stream.try_next().await? {
1167            let mut docs = self.batch_to_docs(&batch)?;
1168            results.append(&mut docs);
1169        }
1170        Ok(results)
1171    }
1172
1173    async fn legacy_table_if_exists(&self) -> Result<Option<Table>> {
1174        self.open_table_if_exists().await
1175    }
1176
1177    /// Try to open the table without creating it.
1178    /// Returns `Ok(None)` when the table genuinely does not exist.
1179    /// Propagates real errors (I/O, corruption, permission) as `Err`.
1180    async fn open_table_if_exists(&self) -> Result<Option<Table>> {
1181        let mut guard = self.table.lock().await;
1182        if let Some(table) = guard.as_ref() {
1183            return Ok(Some(table.clone()));
1184        }
1185
1186        match self
1187            .lance
1188            .open_table(self.collection_name.as_str())
1189            .execute()
1190            .await
1191        {
1192            Ok(tbl) => {
1193                *guard = Some(tbl.clone());
1194                Ok(Some(tbl))
1195            }
1196            Err(e) => {
1197                let msg = e.to_string().to_lowercase();
1198                if msg.contains("not found")
1199                    || msg.contains("does not exist")
1200                    || msg.contains("no such file")
1201                {
1202                    Ok(None)
1203                } else {
1204                    tracing::warn!(
1205                        "LanceDB error opening table '{}': {}",
1206                        self.collection_name,
1207                        e
1208                    );
1209                    Err(anyhow!(
1210                        "LanceDB error on table '{}': {}",
1211                        self.collection_name,
1212                        e
1213                    ))
1214                }
1215            }
1216        }
1217    }
1218
1219    /// Create the LanceDB schema with onion slice fields and content hash
1220    fn create_schema(dim: usize) -> Schema {
1221        Schema::new(vec![
1222            Field::new("id", DataType::Utf8, false),
1223            Field::new("namespace", DataType::Utf8, false),
1224            Field::new(
1225                "vector",
1226                DataType::FixedSizeList(
1227                    Arc::new(Field::new("item", DataType::Float32, true)),
1228                    dim as i32,
1229                ),
1230                false,
1231            ),
1232            Field::new("text", DataType::Utf8, true),
1233            Field::new("metadata", DataType::Utf8, true),
1234            // Onion slice fields (v2 schema)
1235            Field::new("layer", DataType::UInt8, true), // 0=flat, 1=outer, 2=middle, 3=inner, 4=core
1236            Field::new("parent_id", DataType::Utf8, true), // Parent slice ID
1237            Field::new("children_ids", DataType::Utf8, true), // JSON array of children IDs
1238            Field::new("keywords", DataType::Utf8, true), // JSON array of keywords
1239            // Per-chunk dedup hash (v3 schema; v4 narrows semantic to chunk text)
1240            Field::new("content_hash", DataType::Utf8, true), // SHA256 of THIS chunk's text
1241            // Source-document hash (v4 schema) — same value across all 4 layers
1242            // from one source. Drives pre-index dedup so we never re-embed an
1243            // already-ingested file.
1244            Field::new("source_hash", DataType::Utf8, true), // SHA256 of source document text
1245        ])
1246    }
1247
1248    async fn ensure_hash_schema_columns(&self, table: &Table) -> Result<()> {
1249        let missing = Self::missing_required_columns(table, SchemaVersion::current()).await?;
1250
1251        if missing.is_empty() {
1252            return Ok(());
1253        }
1254
1255        let missing_columns = missing
1256            .iter()
1257            .map(|field| field.name().to_string())
1258            .collect::<Vec<_>>();
1259        let error = SchemaMismatchWriteError::new(
1260            self.collection_name.clone(),
1261            self.lance_path.clone(),
1262            missing_columns,
1263            "table is older than the current writer schema",
1264        );
1265        self.log_schema_mismatch(&error);
1266        Err(error.into())
1267    }
1268
1269    fn map_lancedb_write_error(&self, error: lancedb::error::Error) -> anyhow::Error {
1270        let message = match &error {
1271            lancedb::error::Error::Lance { source } => source.to_string(),
1272            lancedb::error::Error::Schema { message } => message.clone(),
1273            lancedb::error::Error::Arrow { source } => source.to_string(),
1274            _ => return error.into(),
1275        };
1276
1277        if !is_schema_mismatch_message(&message) {
1278            return error.into();
1279        }
1280
1281        let missing_columns = extract_missing_columns(&message);
1282        let error = SchemaMismatchWriteError::new(
1283            self.collection_name.clone(),
1284            self.lance_path.clone(),
1285            missing_columns,
1286            message,
1287        );
1288        self.log_schema_mismatch(&error);
1289        error.into()
1290    }
1291
1292    fn log_schema_mismatch(&self, error: &SchemaMismatchWriteError) {
1293        error!(
1294            error_kind = "schema_mismatch",
1295            table = %error.table_name(),
1296            db_path = %error.db_path(),
1297            missing_columns = ?error.missing_columns(),
1298            remediation = %error.remediation(),
1299            file = file!(),
1300            line = line!(),
1301            "write-path schema mismatch"
1302        );
1303    }
1304
1305    fn docs_to_batch(&self, documents: &[ChromaDocument], dim: usize) -> Result<BatchIter> {
1306        let ids = documents.iter().map(|d| d.id.as_str()).collect::<Vec<_>>();
1307        let namespaces = documents
1308            .iter()
1309            .map(|d| d.namespace.as_str())
1310            .collect::<Vec<_>>();
1311        let texts = documents
1312            .iter()
1313            .map(|d| d.document.as_str())
1314            .collect::<Vec<_>>();
1315        let metadata_strings = documents
1316            .iter()
1317            .map(|d| serde_json::to_string(&d.metadata).unwrap_or_else(|_| "{}".to_string()))
1318            .collect::<Vec<_>>();
1319
1320        let vectors = documents.iter().map(|d| {
1321            if d.embedding.len() != dim {
1322                None
1323            } else {
1324                Some(d.embedding.iter().map(|v| Some(*v)).collect::<Vec<_>>())
1325            }
1326        });
1327
1328        // Onion slice fields
1329        let layers: Vec<u8> = documents.iter().map(|d| d.layer).collect();
1330        let parent_ids: Vec<Option<&str>> =
1331            documents.iter().map(|d| d.parent_id.as_deref()).collect();
1332        let children_ids_json: Vec<String> = documents
1333            .iter()
1334            .map(|d| serde_json::to_string(&d.children_ids).unwrap_or_else(|_| "[]".to_string()))
1335            .collect();
1336        let keywords_json: Vec<String> = documents
1337            .iter()
1338            .map(|d| serde_json::to_string(&d.keywords).unwrap_or_else(|_| "[]".to_string()))
1339            .collect();
1340        // Per-chunk content hash for chunk-level dedup
1341        let content_hashes: Vec<Option<&str>> = documents
1342            .iter()
1343            .map(|d| d.content_hash.as_deref())
1344            .collect();
1345        // Source-document hash for pre-index source-level dedup (v4 schema)
1346        let source_hashes: Vec<Option<&str>> =
1347            documents.iter().map(|d| d.source_hash.as_deref()).collect();
1348
1349        let schema = Arc::new(Self::create_schema(dim));
1350
1351        let batch = RecordBatch::try_new(
1352            schema.clone(),
1353            vec![
1354                Arc::new(StringArray::from(ids)),
1355                Arc::new(StringArray::from(namespaces)),
1356                Arc::new(
1357                    FixedSizeListArray::from_iter_primitive::<Float32Type, _, _>(
1358                        vectors, dim as i32,
1359                    ),
1360                ),
1361                Arc::new(StringArray::from(texts)),
1362                Arc::new(StringArray::from(metadata_strings)),
1363                // Onion slice fields
1364                Arc::new(UInt8Array::from(layers)),
1365                Arc::new(StringArray::from(parent_ids)),
1366                Arc::new(StringArray::from(
1367                    children_ids_json
1368                        .iter()
1369                        .map(|s| s.as_str())
1370                        .collect::<Vec<_>>(),
1371                )),
1372                Arc::new(StringArray::from(
1373                    keywords_json.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
1374                )),
1375                // Per-chunk content hash for chunk-level dedup
1376                Arc::new(StringArray::from(content_hashes)),
1377                // Source-document hash for source-level dedup (v4 schema)
1378                Arc::new(StringArray::from(source_hashes)),
1379            ],
1380        )?;
1381
1382        Ok(RecordBatchIterator::new(
1383            vec![Ok(batch)].into_iter(),
1384            schema,
1385        ))
1386    }
1387
1388    fn batch_to_docs(&self, batch: &RecordBatch) -> Result<Vec<ChromaDocument>> {
1389        let id_col = batch
1390            .column_by_name("id")
1391            .and_then(|c| c.as_any().downcast_ref::<StringArray>())
1392            .ok_or_else(|| anyhow!("Missing id column"))?;
1393        let ns_col = batch
1394            .column_by_name("namespace")
1395            .and_then(|c| c.as_any().downcast_ref::<StringArray>())
1396            .ok_or_else(|| anyhow!("Missing namespace column"))?;
1397        let text_col = batch
1398            .column_by_name("text")
1399            .and_then(|c| c.as_any().downcast_ref::<StringArray>())
1400            .ok_or_else(|| anyhow!("Missing text column"))?;
1401        let metadata_col = batch
1402            .column_by_name("metadata")
1403            .and_then(|c| c.as_any().downcast_ref::<StringArray>())
1404            .ok_or_else(|| anyhow!("Missing metadata column"))?;
1405        let vector_col = batch
1406            .column_by_name("vector")
1407            .and_then(|c| c.as_any().downcast_ref::<FixedSizeListArray>())
1408            .ok_or_else(|| anyhow!("Missing vector column"))?;
1409
1410        // Onion slice fields (optional for backward compatibility with v1 schema)
1411        let layer_col = batch
1412            .column_by_name("layer")
1413            .and_then(|c| c.as_any().downcast_ref::<UInt8Array>());
1414        let parent_id_col = batch
1415            .column_by_name("parent_id")
1416            .and_then(|c| c.as_any().downcast_ref::<StringArray>());
1417        let children_ids_col = batch
1418            .column_by_name("children_ids")
1419            .and_then(|c| c.as_any().downcast_ref::<StringArray>());
1420        let keywords_col = batch
1421            .column_by_name("keywords")
1422            .and_then(|c| c.as_any().downcast_ref::<StringArray>());
1423        // Content hash field (optional for backward compatibility with v2 schema)
1424        let content_hash_col = batch
1425            .column_by_name("content_hash")
1426            .and_then(|c| c.as_any().downcast_ref::<StringArray>());
1427        // Source hash column (v4 schema) — optional for pre-v4 tables.
1428        let source_hash_col = batch
1429            .column_by_name("source_hash")
1430            .and_then(|c| c.as_any().downcast_ref::<StringArray>());
1431
1432        let dim = vector_col.value_length() as usize;
1433        let values = vector_col
1434            .values()
1435            .as_any()
1436            .downcast_ref::<Float32Array>()
1437            .ok_or_else(|| anyhow!("Vector inner type mismatch"))?;
1438
1439        let mut docs = Vec::new();
1440        for i in 0..batch.num_rows() {
1441            let id = id_col.value(i).to_string();
1442            let text = text_col.value(i).to_string();
1443            let namespace = ns_col.value(i).to_string();
1444            let meta_str = metadata_col.value(i);
1445            let metadata: Value = serde_json::from_str(meta_str).unwrap_or_else(|_| json!({}));
1446
1447            let offset = i * dim;
1448            let mut emb = Vec::with_capacity(dim);
1449            for j in 0..dim {
1450                emb.push(values.value(offset + j));
1451            }
1452
1453            // Read onion slice fields (with v1 schema compatibility)
1454            let layer = layer_col
1455                .and_then(|col| {
1456                    if col.is_null(i) {
1457                        None
1458                    } else {
1459                        Some(col.value(i))
1460                    }
1461                })
1462                .unwrap_or(0);
1463
1464            let parent_id = parent_id_col.and_then(|col| {
1465                if col.is_null(i) {
1466                    None
1467                } else {
1468                    Some(col.value(i).to_string())
1469                }
1470            });
1471
1472            let children_ids: Vec<String> = children_ids_col
1473                .and_then(|col| {
1474                    if col.is_null(i) {
1475                        None
1476                    } else {
1477                        serde_json::from_str(col.value(i)).ok()
1478                    }
1479                })
1480                .unwrap_or_default();
1481
1482            let keywords: Vec<String> = keywords_col
1483                .and_then(|col| {
1484                    if col.is_null(i) {
1485                        None
1486                    } else {
1487                        serde_json::from_str(col.value(i)).ok()
1488                    }
1489                })
1490                .unwrap_or_default();
1491
1492            let content_hash = content_hash_col.and_then(|col| {
1493                if col.is_null(i) {
1494                    None
1495                } else {
1496                    Some(col.value(i).to_string())
1497                }
1498            });
1499
1500            let source_hash = source_hash_col.and_then(|col| {
1501                if col.is_null(i) {
1502                    None
1503                } else {
1504                    Some(col.value(i).to_string())
1505                }
1506            });
1507
1508            docs.push(ChromaDocument {
1509                id,
1510                namespace,
1511                embedding: emb,
1512                metadata,
1513                document: text,
1514                layer,
1515                parent_id,
1516                children_ids,
1517                keywords,
1518                content_hash,
1519                source_hash,
1520            });
1521        }
1522        Ok(docs)
1523    }
1524
1525    pub async fn get_filtered_in_namespace(
1526        &self,
1527        namespace: &str,
1528        filter: &str,
1529    ) -> Result<Vec<ChromaDocument>> {
1530        let mut results = Vec::new();
1531        if let Some(table) = self.open_namespace_table_if_exists(namespace).await? {
1532            let mut stream = table.query().only_if(filter).execute().await?;
1533            while let Some(batch) = stream.try_next().await? {
1534                let mut docs = self.batch_to_docs(&batch)?;
1535                results.append(&mut docs);
1536            }
1537        }
1538        if let Some(table) = self.legacy_table_if_exists().await? {
1539            let combined = format!("{} AND ({})", self.namespace_filter(namespace), filter);
1540            let mut stream = table.query().only_if(combined.as_str()).execute().await?;
1541            while let Some(batch) = stream.try_next().await? {
1542                let mut docs = self.batch_to_docs(&batch)?;
1543                results.append(&mut docs);
1544            }
1545        }
1546        Ok(results)
1547    }
1548
1549    /// Search with optional layer filtering for onion slice architecture
1550    pub async fn search_store_with_layer(
1551        &self,
1552        namespace: Option<&str>,
1553        embedding: Vec<f32>,
1554        k: usize,
1555        layer_filter: Option<SliceLayer>,
1556    ) -> Result<Vec<ChromaDocument>> {
1557        if embedding.is_empty() {
1558            return Ok(vec![]);
1559        }
1560        // Build combined filter
1561        let mut filters = Vec::new();
1562        if let Some(layer) = layer_filter {
1563            filters.push(self.layer_filter(layer));
1564        }
1565        let mut results = Vec::new();
1566
1567        if let Some(ns) = namespace {
1568            if let Some(table) = self.open_namespace_table_if_exists(ns).await? {
1569                let mut query = table.query();
1570                if !filters.is_empty() {
1571                    let combined = filters.join(" AND ");
1572                    query = query.only_if(combined.as_str());
1573                }
1574                let mut stream = query
1575                    .nearest_to(embedding.clone())?
1576                    .limit(k)
1577                    .execute()
1578                    .await?;
1579                while let Some(batch) = stream.try_next().await? {
1580                    let mut docs = self.batch_to_docs(&batch)?;
1581                    results.append(&mut docs);
1582                }
1583            }
1584            if let Some(table) = self.legacy_table_if_exists().await? {
1585                let mut legacy_filters = vec![self.namespace_filter(ns)];
1586                legacy_filters.extend(filters.clone());
1587                let combined = legacy_filters.join(" AND ");
1588                let mut stream = table
1589                    .query()
1590                    .only_if(combined.as_str())
1591                    .nearest_to(embedding)?
1592                    .limit(k)
1593                    .execute()
1594                    .await?;
1595                while let Some(batch) = stream.try_next().await? {
1596                    let mut docs = self.batch_to_docs(&batch)?;
1597                    results.append(&mut docs);
1598                }
1599            }
1600            results.truncate(k);
1601        } else {
1602            for table_name in self.data_table_names().await? {
1603                let Some(table) = self.open_named_table_if_exists(&table_name).await? else {
1604                    continue;
1605                };
1606                let mut query = table.query();
1607                if !filters.is_empty() {
1608                    let combined = filters.join(" AND ");
1609                    query = query.only_if(combined.as_str());
1610                }
1611                let mut stream = query
1612                    .nearest_to(embedding.clone())?
1613                    .limit(k)
1614                    .execute()
1615                    .await?;
1616                while let Some(batch) = stream.try_next().await? {
1617                    let mut docs = self.batch_to_docs(&batch)?;
1618                    results.append(&mut docs);
1619                }
1620            }
1621            results.truncate(k);
1622        }
1623        debug!(
1624            "Lance returned {} results (layer filter: {:?})",
1625            results.len(),
1626            layer_filter
1627        );
1628        Ok(results)
1629    }
1630
1631    /// Get a document by ID and expand to get its children
1632    pub async fn get_children(
1633        &self,
1634        namespace: &str,
1635        parent_id: &str,
1636    ) -> Result<Vec<ChromaDocument>> {
1637        if self.open_table_if_exists().await?.is_none() {
1638            return Ok(vec![]);
1639        }
1640
1641        // First get the parent document to find children IDs
1642        if let Some(parent) = self.get_document(namespace, parent_id).await? {
1643            if parent.children_ids.is_empty() {
1644                return Ok(vec![]);
1645            }
1646
1647            // Query for all children
1648            let mut children = Vec::new();
1649            for child_id in &parent.children_ids {
1650                if let Some(child) = self.get_document(namespace, child_id).await? {
1651                    children.push(child);
1652                }
1653            }
1654            return Ok(children);
1655        }
1656
1657        Ok(vec![])
1658    }
1659
1660    /// Get the parent of a document (drill up in onion hierarchy)
1661    pub async fn get_parent(
1662        &self,
1663        namespace: &str,
1664        child_id: &str,
1665    ) -> Result<Option<ChromaDocument>> {
1666        if let Some(child) = self.get_document(namespace, child_id).await?
1667            && let Some(ref parent_id) = child.parent_id
1668        {
1669            return self.get_document(namespace, parent_id).await;
1670        }
1671        Ok(None)
1672    }
1673
1674    fn namespace_filter(&self, namespace: &str) -> String {
1675        format!("namespace = '{}'", namespace.replace('\'', "''"))
1676    }
1677
1678    fn id_filter(&self, id: &str) -> String {
1679        format!("id = '{}'", id.replace('\'', "''"))
1680    }
1681
1682    fn layer_filter(&self, layer: SliceLayer) -> String {
1683        if layer == SliceLayer::Outer {
1684            // Default search should surface onion summaries while still seeing legacy flat chunks.
1685            "(layer = 0 OR layer = 1)".to_string()
1686        } else {
1687            format!("layer = {}", layer.as_u8())
1688        }
1689    }
1690
1691    fn content_hash_filter(&self, hash: &str) -> String {
1692        format!("content_hash = '{}'", hash.replace('\'', "''"))
1693    }
1694
1695    fn source_hash_filter(&self, hash: &str) -> String {
1696        format!("source_hash = '{}'", hash.replace('\'', "''"))
1697    }
1698
1699    /// Check if the table schema has content_hash column (schema v3+)
1700    async fn table_has_content_hash(table: &Table) -> bool {
1701        table
1702            .schema()
1703            .await
1704            .map(|schema| schema.field_with_name("content_hash").is_ok())
1705            .unwrap_or(false)
1706    }
1707
1708    /// Check if the table schema has source_hash column (schema v4+)
1709    async fn table_has_source_hash(table: &Table) -> bool {
1710        table
1711            .schema()
1712            .await
1713            .map(|schema| schema.field_with_name("source_hash").is_ok())
1714            .unwrap_or(false)
1715    }
1716
1717    /// Check if a content hash already exists in a namespace (for exact-match deduplication)
1718    ///
1719    /// Returns Ok(false) if:
1720    /// - Table doesn't exist yet
1721    /// - Table has old schema without content_hash column (graceful degradation)
1722    pub async fn has_content_hash(&self, namespace: &str, hash: &str) -> Result<bool> {
1723        let hash_filter = self.content_hash_filter(hash);
1724        if let Some(table) = self.open_namespace_table_if_exists(namespace).await?
1725            && Self::table_has_content_hash(&table).await
1726        {
1727            let mut stream = table
1728                .query()
1729                .only_if(hash_filter.as_str())
1730                .limit(1)
1731                .execute()
1732                .await?;
1733            if let Some(batch) = stream.try_next().await? {
1734                return Ok(batch.num_rows() > 0);
1735            }
1736        }
1737
1738        if let Some(table) = self.legacy_table_if_exists().await? {
1739            if !Self::table_has_content_hash(&table).await {
1740                tracing::warn!(
1741                    "Table '{}' has old schema without content_hash column. \
1742                     Deduplication disabled. Consider re-indexing with new schema.",
1743                    self.collection_name
1744                );
1745                return Ok(false); // Can't check for duplicates, treat as new
1746            }
1747
1748            let filter = format!("{} AND {}", self.namespace_filter(namespace), hash_filter);
1749            let mut stream = table
1750                .query()
1751                .only_if(filter.as_str())
1752                .limit(1)
1753                .execute()
1754                .await?;
1755
1756            if let Some(batch) = stream.try_next().await? {
1757                return Ok(batch.num_rows() > 0);
1758            }
1759        }
1760
1761        Ok(false)
1762    }
1763
1764    /// Check if any chunk in `namespace` already references the given source-document
1765    /// hash. Used by the indexing pipeline to skip re-embedding files that were
1766    /// already ingested (P4 — pre-index source-level dedup).
1767    ///
1768    /// Returns Ok(false) if the table doesn't exist yet, or if the table is on a
1769    /// pre-v4 schema without the `source_hash` column (graceful degradation —
1770    /// older namespaces should be backfilled via `/admin/backfill-hashes`).
1771    pub async fn has_source_hash(&self, namespace: &str, hash: &str) -> Result<bool> {
1772        let hash_filter = self.source_hash_filter(hash);
1773        if let Some(table) = self.open_namespace_table_if_exists(namespace).await?
1774            && Self::table_has_source_hash(&table).await
1775        {
1776            let mut stream = table
1777                .query()
1778                .only_if(hash_filter.as_str())
1779                .limit(1)
1780                .execute()
1781                .await?;
1782            if let Some(batch) = stream.try_next().await? {
1783                return Ok(batch.num_rows() > 0);
1784            }
1785        }
1786
1787        if let Some(table) = self.legacy_table_if_exists().await? {
1788            if !Self::table_has_source_hash(&table).await {
1789                tracing::debug!(
1790                    "Table '{}' has pre-v4 schema without source_hash column. \
1791                     Source-level dedup disabled until backfill.",
1792                    self.collection_name
1793                );
1794                return Ok(false);
1795            }
1796
1797            let filter = format!("{} AND {}", self.namespace_filter(namespace), hash_filter);
1798            let mut stream = table
1799                .query()
1800                .only_if(filter.as_str())
1801                .limit(1)
1802                .execute()
1803                .await?;
1804
1805            if let Some(batch) = stream.try_next().await? {
1806                return Ok(batch.num_rows() > 0);
1807            }
1808        }
1809
1810        Ok(false)
1811    }
1812
1813    /// Filter a list of hashes to return only those that don't exist in the namespace.
1814    /// This is more efficient than calling has_content_hash for each hash individually.
1815    ///
1816    /// Returns all hashes as "new" if table has old schema without content_hash column.
1817    pub async fn filter_existing_hashes<'a>(
1818        &self,
1819        namespace: &str,
1820        hashes: &'a [String],
1821    ) -> Result<Vec<&'a String>> {
1822        if hashes.is_empty() {
1823            return Ok(vec![]);
1824        }
1825
1826        // Query for existing hashes in this namespace
1827        // We build a filter with OR conditions for all hashes
1828        let hash_conditions: Vec<String> =
1829            hashes.iter().map(|h| self.content_hash_filter(h)).collect();
1830
1831        let mut existing_hashes = std::collections::HashSet::new();
1832
1833        if let Some(table) = self.open_namespace_table_if_exists(namespace).await?
1834            && Self::table_has_content_hash(&table).await
1835        {
1836            let filter = hash_conditions.join(" OR ");
1837            let mut stream = table
1838                .query()
1839                .only_if(filter.as_str())
1840                .limit(hashes.len())
1841                .execute()
1842                .await?;
1843            while let Some(batch) = stream.try_next().await? {
1844                if let Some(hash_col) = batch
1845                    .column_by_name("content_hash")
1846                    .and_then(|c| c.as_any().downcast_ref::<StringArray>())
1847                {
1848                    for i in 0..batch.num_rows() {
1849                        if !hash_col.is_null(i) {
1850                            existing_hashes.insert(hash_col.value(i).to_string());
1851                        }
1852                    }
1853                }
1854            }
1855        }
1856
1857        if let Some(table) = self.legacy_table_if_exists().await? {
1858            // Graceful handling of old schema without content_hash column
1859            if !Self::table_has_content_hash(&table).await {
1860                tracing::warn!(
1861                    "Table '{}' has old schema without content_hash column. \
1862                     Deduplication disabled. Consider re-indexing with new schema.",
1863                    self.collection_name
1864                );
1865                return Ok(hashes.iter().collect()); // All are "new" since we can't check
1866            }
1867
1868            let filter = format!(
1869                "{} AND ({})",
1870                self.namespace_filter(namespace),
1871                hash_conditions.join(" OR ")
1872            );
1873            let mut stream = table
1874                .query()
1875                .only_if(filter.as_str())
1876                .limit(hashes.len())
1877                .execute()
1878                .await?;
1879            while let Some(batch) = stream.try_next().await? {
1880                if let Some(hash_col) = batch
1881                    .column_by_name("content_hash")
1882                    .and_then(|c| c.as_any().downcast_ref::<StringArray>())
1883                {
1884                    for i in 0..batch.num_rows() {
1885                        if !hash_col.is_null(i) {
1886                            existing_hashes.insert(hash_col.value(i).to_string());
1887                        }
1888                    }
1889                }
1890            }
1891        }
1892
1893        // Return only hashes that don't exist
1894        Ok(hashes
1895            .iter()
1896            .filter(|h| !existing_hashes.contains(h.as_str()))
1897            .collect())
1898    }
1899
1900    // =========================================================================
1901    // MAINTENANCE OPERATIONS
1902    // =========================================================================
1903
1904    /// Run all optimizations (compact + prune old versions)
1905    pub async fn optimize(&self) -> Result<OptimizeStats> {
1906        let mut stats = OptimizeStats {
1907            compaction: None,
1908            prune: None,
1909        };
1910        for table_name in self.data_table_names().await? {
1911            let Some(table) = self.open_named_table_if_exists(&table_name).await? else {
1912                continue;
1913            };
1914            stats = table.optimize(OptimizeAction::All).await?;
1915        }
1916        info!(
1917            "Optimize complete: compaction={:?}, prune={:?}",
1918            stats.compaction, stats.prune
1919        );
1920        Ok(stats)
1921    }
1922
1923    /// Compact small files into larger ones for better performance
1924    pub async fn compact(&self) -> Result<OptimizeStats> {
1925        let mut stats = OptimizeStats {
1926            compaction: None,
1927            prune: None,
1928        };
1929        for table_name in self.data_table_names().await? {
1930            let Some(table) = self.open_named_table_if_exists(&table_name).await? else {
1931                continue;
1932            };
1933            stats = table
1934                .optimize(OptimizeAction::Compact {
1935                    options: Default::default(),
1936                    remap_options: None,
1937                })
1938                .await?;
1939        }
1940        info!("Compaction complete: {:?}", stats.compaction);
1941        Ok(stats)
1942    }
1943
1944    /// Remove old versions older than specified duration (default: 7 days)
1945    pub async fn cleanup(&self, older_than_days: Option<u64>) -> Result<OptimizeStats> {
1946        let days = older_than_days.unwrap_or(7) as i64;
1947        let duration = chrono::TimeDelta::days(days);
1948        let mut stats = OptimizeStats {
1949            compaction: None,
1950            prune: None,
1951        };
1952        for table_name in self.data_table_names().await? {
1953            let Some(table) = self.open_named_table_if_exists(&table_name).await? else {
1954                continue;
1955            };
1956            stats = table
1957                .optimize(OptimizeAction::Prune {
1958                    older_than: Some(duration),
1959                    delete_unverified: Some(false),
1960                    error_if_tagged_old_versions: None,
1961                })
1962                .await?;
1963        }
1964        info!("Cleanup complete: {:?}", stats.prune);
1965        Ok(stats)
1966    }
1967
1968    /// Get table statistics (row count, fragments, etc.)
1969    pub async fn stats(&self) -> Result<TableStats> {
1970        let table_names = self.data_table_names().await?;
1971        let mut row_count = 0usize;
1972        let mut version_count = 0usize;
1973
1974        for table_name in &table_names {
1975            let Some(table) = self.open_named_table_if_exists(table_name).await? else {
1976                continue;
1977            };
1978            row_count += table.count_rows(None).await.unwrap_or(0);
1979            version_count += table.list_versions().await.unwrap_or_default().len();
1980        }
1981
1982        Ok(TableStats {
1983            row_count,
1984            version_count,
1985            table_name: self.collection_name.clone(),
1986            db_path: self.lance_path.clone(),
1987        })
1988    }
1989
1990    /// Count rows in a specific namespace
1991    pub async fn count_namespace(&self, namespace: &str) -> Result<usize> {
1992        let mut count = 0usize;
1993        if let Some(table) = self.open_namespace_table_if_exists(namespace).await? {
1994            count += table.count_rows(None).await?;
1995        }
1996        if let Some(table) = self.legacy_table_if_exists().await? {
1997            let filter = self.namespace_filter(namespace);
1998            count += table.count_rows(Some(filter)).await?;
1999        }
2000        Ok(count)
2001    }
2002
2003    /// Get all documents from a namespace (for migration/export)
2004    ///
2005    /// Note: This uses a full table scan with namespace filter.
2006    /// For very large namespaces, consider batching.
2007    pub async fn get_all_in_namespace(&self, namespace: &str) -> Result<Vec<ChromaDocument>> {
2008        let results = self.all_documents(Some(namespace), 100_000).await?;
2009        debug!(
2010            "Retrieved {} documents from namespace '{}'",
2011            results.len(),
2012            namespace
2013        );
2014        Ok(results)
2015    }
2016
2017    /// Check if a namespace exists (has any documents)
2018    pub async fn namespace_exists(&self, namespace: &str) -> Result<bool> {
2019        let count = self.count_namespace(namespace).await?;
2020        Ok(count > 0)
2021    }
2022}
2023
2024/// Statistics about the LanceDB table
2025#[derive(Debug, Clone, Serialize)]
2026pub struct TableStats {
2027    pub row_count: usize,
2028    pub version_count: usize,
2029    pub table_name: String,
2030    pub db_path: String,
2031}
2032
2033// =============================================================================
2034// GARBAGE COLLECTION
2035// =============================================================================
2036
2037/// Statistics from garbage collection operations
2038#[derive(Debug, Clone, Default, Serialize)]
2039pub struct GcStats {
2040    /// Number of orphan embeddings found (embeddings without valid parent references)
2041    pub orphans_found: usize,
2042    /// Number of orphan embeddings removed
2043    pub orphans_removed: usize,
2044    /// Number of empty namespaces found
2045    pub empty_namespaces_found: usize,
2046    /// Number of empty namespaces removed (documents deleted)
2047    pub empty_namespaces_removed: usize,
2048    /// Number of old documents found (older than threshold)
2049    pub old_docs_found: usize,
2050    /// Number of old documents removed
2051    pub old_docs_removed: usize,
2052    /// Estimated space freed in bytes (if calculable)
2053    pub bytes_freed: Option<u64>,
2054    /// List of namespaces that were empty
2055    pub empty_namespace_names: Vec<String>,
2056    /// List of namespaces affected by old doc cleanup
2057    pub affected_namespaces: Vec<String>,
2058}
2059
2060impl GcStats {
2061    /// Check if any issues were found
2062    pub fn has_issues(&self) -> bool {
2063        self.orphans_found > 0 || self.empty_namespaces_found > 0 || self.old_docs_found > 0
2064    }
2065
2066    /// Check if any deletions occurred
2067    pub fn has_deletions(&self) -> bool {
2068        self.orphans_removed > 0 || self.empty_namespaces_removed > 0 || self.old_docs_removed > 0
2069    }
2070}
2071
2072/// Configuration for garbage collection
2073#[derive(Debug, Clone)]
2074pub struct GcConfig {
2075    /// Remove orphan embeddings (embeddings with no parent document)
2076    pub remove_orphans: bool,
2077    /// Remove empty namespaces (namespaces with 0 documents)
2078    pub remove_empty: bool,
2079    /// Remove documents older than this duration
2080    pub older_than: Option<chrono::Duration>,
2081    /// Dry run mode - only report what would be removed
2082    pub dry_run: bool,
2083    /// Limit to specific namespace (None = all namespaces)
2084    pub namespace: Option<String>,
2085}
2086
2087impl Default for GcConfig {
2088    fn default() -> Self {
2089        Self {
2090            remove_orphans: false,
2091            remove_empty: false,
2092            older_than: None,
2093            dry_run: true,
2094            namespace: None,
2095        }
2096    }
2097}
2098
2099/// Parse a duration string like "30d", "6m", "1y"
2100pub fn parse_duration_string(s: &str) -> Result<chrono::Duration> {
2101    let s = s.trim().to_lowercase();
2102    if s.is_empty() {
2103        return Err(anyhow!("Empty duration string"));
2104    }
2105
2106    // Extract numeric part and unit
2107    let (num_str, unit) = if s.ends_with('d') {
2108        (&s[..s.len() - 1], 'd')
2109    } else if s.ends_with('m') {
2110        (&s[..s.len() - 1], 'm')
2111    } else if s.ends_with('y') {
2112        (&s[..s.len() - 1], 'y')
2113    } else {
2114        return Err(anyhow!(
2115            "Invalid duration format '{}'. Use format like '30d', '6m', or '1y'",
2116            s
2117        ));
2118    };
2119
2120    let num: i64 = num_str.parse().map_err(|_| {
2121        anyhow!(
2122            "Invalid number in duration '{}'. Use format like '30d', '6m', or '1y'",
2123            s
2124        )
2125    })?;
2126
2127    if num <= 0 {
2128        return Err(anyhow!("Duration must be positive, got '{}'", s));
2129    }
2130
2131    match unit {
2132        'd' => Ok(chrono::Duration::days(num)),
2133        'm' => Ok(chrono::Duration::days(num * 30)), // Approximate month
2134        'y' => Ok(chrono::Duration::days(num * 365)), // Approximate year
2135        _ => unreachable!(),
2136    }
2137}
2138
2139impl StorageManager {
2140    // =========================================================================
2141    // GARBAGE COLLECTION OPERATIONS
2142    // =========================================================================
2143
2144    /// Run garbage collection based on configuration
2145    #[doc(alias = "run_gc")]
2146    pub async fn garbage_collect(&self, config: &GcConfig) -> Result<GcStats> {
2147        let mut stats = GcStats::default();
2148
2149        const PAGE_SIZE: usize = 5000;
2150        let mut all_docs: Vec<ChromaDocument> = Vec::new();
2151        let mut offset = 0;
2152        loop {
2153            let page = self
2154                .all_documents_page(config.namespace.as_deref(), offset, PAGE_SIZE)
2155                .await?;
2156            let page_len = page.len();
2157            all_docs.extend(page);
2158            if page_len < PAGE_SIZE {
2159                break;
2160            }
2161            offset += page_len;
2162        }
2163
2164        if all_docs.is_empty() {
2165            return Ok(stats);
2166        }
2167
2168        // Group documents by namespace
2169        let mut by_namespace: std::collections::HashMap<String, Vec<&ChromaDocument>> =
2170            std::collections::HashMap::new();
2171        for doc in &all_docs {
2172            by_namespace
2173                .entry(doc.namespace.clone())
2174                .or_default()
2175                .push(doc);
2176        }
2177
2178        // 1. Find orphan embeddings (documents with parent_id that doesn't exist)
2179        if config.remove_orphans {
2180            let orphan_stats = self
2181                .find_and_remove_orphans(&all_docs, config.dry_run)
2182                .await?;
2183            stats.orphans_found = orphan_stats.0;
2184            stats.orphans_removed = orphan_stats.1;
2185        }
2186
2187        // 2. Find and optionally remove empty namespaces
2188        if config.remove_empty {
2189            let empty_stats = self
2190                .find_and_remove_empty_namespaces(&by_namespace, config.dry_run)
2191                .await?;
2192            stats.empty_namespaces_found = empty_stats.0;
2193            stats.empty_namespaces_removed = empty_stats.1;
2194            stats.empty_namespace_names = empty_stats.2;
2195        }
2196
2197        // 3. Find and optionally remove old documents
2198        if let Some(ref duration) = config.older_than {
2199            let old_stats = self
2200                .find_and_remove_old_docs(&all_docs, duration, config.dry_run)
2201                .await?;
2202            stats.old_docs_found = old_stats.0;
2203            stats.old_docs_removed = old_stats.1;
2204            stats.affected_namespaces = old_stats.2;
2205        }
2206
2207        Ok(stats)
2208    }
2209
2210    #[deprecated(note = "use garbage_collect")]
2211    pub async fn run_gc(&self, config: &GcConfig) -> Result<GcStats> {
2212        self.garbage_collect(config).await
2213    }
2214
2215    /// Find orphan embeddings - documents with parent_id pointing to non-existent documents
2216    async fn find_and_remove_orphans(
2217        &self,
2218        docs: &[ChromaDocument],
2219        dry_run: bool,
2220    ) -> Result<(usize, usize)> {
2221        // Build a set of all document IDs
2222        let all_ids: std::collections::HashSet<&str> = docs.iter().map(|d| d.id.as_str()).collect();
2223
2224        // Find documents with parent_id that doesn't exist in the ID set
2225        let mut orphans: Vec<(&str, &str)> = Vec::new(); // (namespace, id)
2226        for doc in docs {
2227            if let Some(ref parent_id) = doc.parent_id
2228                && !all_ids.contains(parent_id.as_str())
2229            {
2230                orphans.push((&doc.namespace, &doc.id));
2231            }
2232        }
2233
2234        let found = orphans.len();
2235        let mut removed = 0;
2236
2237        if !dry_run && !orphans.is_empty() {
2238            for (namespace, id) in &orphans {
2239                if self.delete_document(namespace, id).await.is_ok() {
2240                    removed += 1;
2241                }
2242            }
2243        }
2244
2245        Ok((found, removed))
2246    }
2247
2248    /// Find empty namespaces - this checks if namespaces have 0 documents
2249    /// Note: In LanceDB, namespaces are implicit (just a column value), so "removing"
2250    /// an empty namespace means there are no documents to delete
2251    async fn find_and_remove_empty_namespaces(
2252        &self,
2253        by_namespace: &std::collections::HashMap<String, Vec<&ChromaDocument>>,
2254        _dry_run: bool,
2255    ) -> Result<(usize, usize, Vec<String>)> {
2256        // Find namespaces with 0 documents
2257        let empty_namespaces: Vec<String> = by_namespace
2258            .iter()
2259            .filter(|(_, docs)| docs.is_empty())
2260            .map(|(ns, _)| ns.clone())
2261            .collect();
2262
2263        let found = empty_namespaces.len();
2264        // Empty namespaces don't need deletion - they have no documents
2265        // Just report them
2266        let removed = 0;
2267
2268        Ok((found, removed, empty_namespaces))
2269    }
2270
2271    /// Find and optionally remove documents older than the specified duration
2272    async fn find_and_remove_old_docs(
2273        &self,
2274        docs: &[ChromaDocument],
2275        older_than: &chrono::Duration,
2276        dry_run: bool,
2277    ) -> Result<(usize, usize, Vec<String>)> {
2278        let cutoff = chrono::Utc::now() - *older_than;
2279
2280        let mut old_docs: Vec<(&str, &str)> = Vec::new(); // (namespace, id)
2281        let mut affected_namespaces: std::collections::HashSet<String> =
2282            std::collections::HashSet::new();
2283
2284        for doc in docs {
2285            // Check for timestamp in metadata
2286            if let Some(obj) = doc.metadata.as_object() {
2287                let mut doc_timestamp: Option<String> = None;
2288
2289                // Look for common timestamp field names
2290                for key in &["timestamp", "created_at", "indexed_at", "date", "time"] {
2291                    if let Some(value) = obj.get(*key)
2292                        && let Some(ts) = value.as_str()
2293                    {
2294                        doc_timestamp = Some(ts.to_string());
2295                        break;
2296                    }
2297                }
2298
2299                // Check if document is older than cutoff
2300                if let Some(ts) = doc_timestamp {
2301                    // Parse the timestamp - try RFC3339 first, then other formats
2302                    let is_old = if let Ok(parsed) = chrono::DateTime::parse_from_rfc3339(&ts) {
2303                        parsed < cutoff
2304                    } else if let Ok(parsed) =
2305                        chrono::NaiveDateTime::parse_from_str(&ts, "%Y-%m-%d %H:%M:%S")
2306                    {
2307                        parsed < cutoff.naive_utc()
2308                    } else if let Ok(parsed) = chrono::NaiveDate::parse_from_str(&ts, "%Y-%m-%d") {
2309                        parsed < cutoff.date_naive()
2310                    } else {
2311                        // Can't parse timestamp, skip this document
2312                        false
2313                    };
2314
2315                    if is_old {
2316                        old_docs.push((&doc.namespace, &doc.id));
2317                        affected_namespaces.insert(doc.namespace.clone());
2318                    }
2319                }
2320            }
2321        }
2322
2323        let found = old_docs.len();
2324        let mut removed = 0;
2325
2326        if !dry_run && !old_docs.is_empty() {
2327            for (namespace, id) in &old_docs {
2328                if self.delete_document(namespace, id).await.is_ok() {
2329                    removed += 1;
2330                }
2331            }
2332        }
2333
2334        Ok((found, removed, affected_namespaces.into_iter().collect()))
2335    }
2336
2337    /// List all unique namespaces in the database
2338    pub async fn list_namespaces(&self) -> Result<Vec<(String, usize)>> {
2339        self.refresh().await?;
2340
2341        let mut namespace_counts: std::collections::HashMap<String, usize> =
2342            std::collections::HashMap::new();
2343
2344        for table_name in self.data_table_names().await? {
2345            let Some(table) = self.open_named_table_if_exists(&table_name).await? else {
2346                continue;
2347            };
2348            const PAGE_SIZE: usize = 5000;
2349            let mut offset = 0;
2350            loop {
2351                let page = self
2352                    .query_table_page(&table, None, offset, PAGE_SIZE)
2353                    .await?;
2354                let page_len = page.len();
2355                for doc in &page {
2356                    *namespace_counts.entry(doc.namespace.clone()).or_insert(0) += 1;
2357                }
2358                if page_len < PAGE_SIZE {
2359                    break;
2360                }
2361                offset += page_len;
2362            }
2363        }
2364
2365        let mut namespaces: Vec<(String, usize)> = namespace_counts.into_iter().collect();
2366        namespaces.sort_by(|a, b| a.0.cmp(&b.0));
2367        Ok(namespaces)
2368    }
2369}
2370
2371#[cfg(test)]
2372mod tests {
2373    use super::*;
2374    use serde_json::json;
2375    use tempfile::TempDir;
2376
2377    #[test]
2378    fn flat_documents_preserve_separate_chunk_and_source_hashes() {
2379        let doc = ChromaDocument::new_flat_with_hashes(
2380            "doc-1".to_string(),
2381            "kb:transcripts".to_string(),
2382            vec![0.0, 1.0],
2383            json!({"path": "sample.md"}),
2384            "outer summary chunk".to_string(),
2385            "chunk-sha256".to_string(),
2386            Some("source-sha256".to_string()),
2387        );
2388
2389        assert_eq!(doc.content_hash.as_deref(), Some("chunk-sha256"));
2390        assert_eq!(doc.source_hash.as_deref(), Some("source-sha256"));
2391        assert_ne!(doc.content_hash, doc.source_hash);
2392    }
2393
2394    #[tokio::test]
2395    async fn namespace_writes_use_separate_lance_tables_and_keep_contracts() {
2396        let tmp = TempDir::new().expect("temp dir");
2397        let db_path = tmp.path().join("lancedb");
2398        let storage = StorageManager::new_lance_only(db_path.to_str().unwrap())
2399            .await
2400            .expect("storage");
2401
2402        let embedding = vec![0.25_f32; 8];
2403        storage
2404            .add_to_store(vec![
2405                ChromaDocument::new_flat(
2406                    "shared-id".to_string(),
2407                    "kb:alpha".to_string(),
2408                    embedding.clone(),
2409                    json!({"ns": "alpha"}),
2410                    "alpha memory".to_string(),
2411                ),
2412                ChromaDocument::new_flat(
2413                    "shared-id".to_string(),
2414                    "kb:beta".to_string(),
2415                    embedding.clone(),
2416                    json!({"ns": "beta"}),
2417                    "beta memory".to_string(),
2418                ),
2419            ])
2420            .await
2421            .expect("write two namespaces");
2422
2423        let table_names = storage.lance.table_names().execute().await.expect("tables");
2424        let namespace_tables = table_names
2425            .iter()
2426            .filter(|name| StorageManager::is_namespace_table_name(name))
2427            .count();
2428        assert_eq!(namespace_tables, 2, "{table_names:?}");
2429        assert!(!table_names.iter().any(|name| name == DEFAULT_TABLE_NAME));
2430
2431        assert_eq!(storage.count_namespace("kb:alpha").await.unwrap(), 1);
2432        assert_eq!(storage.count_namespace("kb:beta").await.unwrap(), 1);
2433        assert_eq!(
2434            storage
2435                .get_document("kb:alpha", "shared-id")
2436                .await
2437                .unwrap()
2438                .unwrap()
2439                .document,
2440            "alpha memory"
2441        );
2442        assert_eq!(
2443            storage
2444                .search_store(Some("kb:beta"), embedding, 10)
2445                .await
2446                .unwrap()
2447                .into_iter()
2448                .map(|doc| doc.document)
2449                .collect::<Vec<_>>(),
2450            vec!["beta memory"]
2451        );
2452    }
2453}