Skip to main content

rust_memex/storage/
mod.rs

1use anyhow::{Result, anyhow};
2use arrow_array::types::Float32Type;
3use arrow_array::{
4    Array, FixedSizeListArray, Float32Array, RecordBatch, RecordBatchIterator,
5    RecordBatchReader, StringArray, UInt8Array,
6};
7use arrow_schema::{DataType, Field, Schema};
8use futures::TryStreamExt;
9use lancedb::connection::Connection;
10use lancedb::query::{ExecutableQuery, QueryBase};
11use lancedb::table::{NewColumnTransform, OptimizeAction, OptimizeStats};
12use lancedb::{Table, connect};
13use serde::{Deserialize, Serialize};
14use serde_json::{Value, json};
15use sha2::{Digest, Sha256};
16use std::collections::HashMap;
17use std::fmt;
18use std::path::{Path, PathBuf};
19use std::str::FromStr;
20use std::sync::Arc;
21use tokio::sync::Mutex;
22use tracing::{debug, error, info, warn};
23use uuid::Uuid;
24
25use crate::rag::SliceLayer;
26
/// Schema version for LanceDB tables. Increment when changing table structure.
/// Version 2: Added onion slice fields (layer, parent_id, children_ids, keywords)
/// Version 3: Added content_hash for exact-match deduplication
/// Version 4: Split `content_hash` into per-chunk SHA256 (this column) and a
///            new `source_hash` column for the SHA256 of the source document
///            text. This enables both layer-aware chunk dedup and pre-index
///            source-level dedup. Pre-v4 rows store source-text hash in
///            `content_hash`; backfill via `/admin/backfill-hashes` corrects
///            this without breaking old data.
/// See docs/MIGRATION.md for migration procedures.
pub const SCHEMA_VERSION: u32 = 4;
/// Name of the default Lance table. `StorageManager` constructors use it as
/// the collection name, and `StorageManager::migrate_lance_schema` opens the
/// table with this name.
pub const DEFAULT_TABLE_NAME: &str = "mcp_documents";
/// Table-name prefix for per-namespace tables.
/// NOTE(review): presumably a namespace-derived suffix is appended to this
/// prefix by a helper outside this view — confirm.
const NAMESPACE_TABLE_PREFIX: &str = "mcp_documents__ns__";
40
/// Table schema generations that migration and status tooling can target.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SchemaVersion {
    /// Schema that requires the `content_hash` column.
    V3,
    /// Schema that additionally requires the `source_hash` column.
    V4,
}

impl SchemaVersion {
    /// The schema generation new writes are expected to use.
    pub fn current() -> Self {
        SchemaVersion::V4
    }
}

impl fmt::Display for SchemaVersion {
    /// Renders the version as its lowercase tag (`v3` / `v4`).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            Self::V3 => "v3",
            Self::V4 => "v4",
        };
        f.write_str(label)
    }
}
61
62impl FromStr for SchemaVersion {
63    type Err = anyhow::Error;
64
65    fn from_str(value: &str) -> Result<Self> {
66        match value.trim().to_ascii_lowercase().as_str() {
67            "3" | "v3" => Ok(Self::V3),
68            "4" | "v4" => Ok(Self::V4),
69            other => Err(anyhow!(
70                "unsupported schema target '{other}' (expected v3 or v4)"
71            )),
72        }
73    }
74}
75
76#[derive(Debug, Clone)]
77pub struct SchemaMigrationReport {
78    pub target: SchemaVersion,
79    pub missing_columns: Vec<Field>,
80    pub applied: bool,
81}
82
83impl SchemaMigrationReport {
84    pub fn missing_column_names(&self) -> Vec<&str> {
85        self.missing_columns
86            .iter()
87            .map(|field| field.name().as_str())
88            .collect()
89    }
90}
91
/// Snapshot of how a table's schema compares with an expected schema version.
#[derive(Debug, Clone)]
pub struct SchemaStatusReport {
    /// Schema version attributed to the table.
    pub schema_version: SchemaVersion,
    /// Schema version the caller asked to compare against.
    pub expected_schema: SchemaVersion,
    /// `true` when at least one required column is missing.
    pub needs_migration: bool,
    /// Names of the required columns that the table does not have.
    pub missing_columns: Vec<String>,
    /// Highest version number reported by the table's version listing, when
    /// available.
    pub manifest_version: Option<u64>,
}
100
101pub fn required_columns_for(version: SchemaVersion) -> Vec<Field> {
102    let mut fields = vec![Field::new("content_hash", DataType::Utf8, true)];
103    if matches!(version, SchemaVersion::V4) {
104        fields.push(Field::new("source_hash", DataType::Utf8, true));
105    }
106    fields
107}
108
/// Error describing a write that failed because the Lance table lacks
/// columns the writer expected.
#[derive(Debug, Clone)]
pub struct SchemaMismatchWriteError {
    /// Name of the table the write targeted.
    table_name: String,
    /// Database path, used to build the remediation command.
    db_path: String,
    /// Columns identified as missing from the table.
    missing_columns: Vec<String>,
    /// Underlying error text included in the rendered message.
    message: String,
}

impl SchemaMismatchWriteError {
    /// Builds the error from anything convertible into owned strings.
    fn new(
        table_name: impl Into<String>,
        db_path: impl Into<String>,
        missing_columns: Vec<String>,
        message: impl Into<String>,
    ) -> Self {
        let table_name: String = table_name.into();
        let db_path: String = db_path.into();
        let message: String = message.into();
        Self {
            table_name,
            db_path,
            missing_columns,
            message,
        }
    }

    /// Columns identified as missing.
    pub fn missing_columns(&self) -> &[String] {
        self.missing_columns.as_slice()
    }

    /// Database path the failed write was issued against.
    pub fn db_path(&self) -> &str {
        self.db_path.as_str()
    }

    /// Table the failed write targeted.
    pub fn table_name(&self) -> &str {
        self.table_name.as_str()
    }

    /// CLI command that migrates the schema of the affected database.
    pub fn remediation(&self) -> String {
        format!("rust-memex migrate-schema --db-path {}", self.db_path)
    }
}

impl fmt::Display for SchemaMismatchWriteError {
    /// Renders the table name, missing columns, underlying message and the
    /// remediation command on a single line.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let remediation = self.remediation();
        write!(
            f,
            "ERROR schema mismatch while writing Lance table '{}': missing columns {:?}. {}. Remediation: {}",
            self.table_name, self.missing_columns, self.message, remediation
        )
    }
}

impl std::error::Error for SchemaMismatchWriteError {}
163
/// Returns `true` when `message` contains one of the known schema-mismatch
/// phrases. Matching is ASCII case-insensitive.
fn is_schema_mismatch_message(message: &str) -> bool {
    const MARKERS: [&str; 4] = [
        "schema mismatch",
        "append with different schema",
        "fields did not match",
        "missing=[",
    ];
    let haystack = message.to_ascii_lowercase();
    MARKERS.iter().any(|marker| haystack.contains(*marker))
}
171
172fn extract_missing_columns(message: &str) -> Vec<String> {
173    let mut columns = Vec::new();
174    let mut rest = message;
175
176    while let Some(start) = rest.find("missing=[") {
177        rest = &rest[start + "missing=[".len()..];
178        let Some(end) = rest.find(']') else {
179            break;
180        };
181        let list = &rest[..end];
182        for item in list.split(',') {
183            let column = item
184                .trim()
185                .trim_matches('`')
186                .trim_matches('"')
187                .trim_matches('\'');
188            if !column.is_empty() && !columns.iter().any(|existing| existing == column) {
189                columns.push(column.to_string());
190            }
191        }
192        rest = &rest[end + 1..];
193    }
194
195    if columns.is_empty() {
196        for field in required_columns_for(SchemaVersion::current()) {
197            let name = field.name();
198            if message.contains(name) {
199                columns.push(name.to_string());
200            }
201        }
202    }
203
204    columns
205}
206
207// =============================================================================
208// STORAGE BACKEND INTERFACE
209// =============================================================================
210//
211// To add a new storage backend, implement a struct with the following methods:
212//
213//   async fn add_to_store(&self, documents: Vec<ChromaDocument>) -> Result<()>
214//   async fn get_document(&self, namespace: &str, id: &str) -> Result<Option<ChromaDocument>>
//   async fn search_store(&self, namespace: Option<&str>, embedding: Vec<f32>, k: usize) -> Result<Vec<ChromaDocument>>
//   async fn delete_document(&self, namespace: &str, id: &str) -> Result<usize>
217//   async fn delete_namespace(&self, namespace: &str) -> Result<usize>
218//
219// Current implementation:
220//   - `StorageManager`: LanceDB embedded vector store
221//
222// Future alternatives to consider:
223//   - Qdrant, Milvus, Pinecone (external vector DBs)
224//   - SQLite with vector extension
225// =============================================================================
226
/// A stored document (or onion slice) together with its embedding vector and
/// bookkeeping fields.
#[derive(Debug, Serialize, Clone)]
pub struct ChromaDocument {
    /// Document identifier.
    pub id: String,
    /// Namespace the document belongs to.
    pub namespace: String,
    /// Embedding vector for this document.
    pub embedding: Vec<f32>,
    /// Arbitrary JSON metadata attached to the document.
    pub metadata: serde_json::Value,
    /// The document (or slice) text.
    pub document: String,
    /// Onion slice layer (1=Outer, 2=Middle, 3=Inner, 4=Core, 0=legacy flat)
    pub layer: u8,
    /// Parent slice ID in the onion hierarchy (None for Core slices)
    pub parent_id: Option<String>,
    /// Children slice IDs in the onion hierarchy
    pub children_ids: Vec<String>,
    /// Extracted keywords for this slice
    pub keywords: Vec<String>,
    /// SHA256 hash of THIS chunk's text. Used for chunk-level deduplication.
    /// Pre-v4 rows may transitionally store the source-text hash here; the
    /// `/admin/backfill-hashes` endpoint corrects them.
    pub content_hash: Option<String>,
    /// SHA256 hash of the SOURCE document text (same value across all four
    /// onion layers from one source). Used for pre-index dedup so we never
    /// re-embed an already-ingested file. Optional for backward compatibility
    /// with v3 schemas — `None` means "unknown source provenance".
    pub source_hash: Option<String>,
}
252
253impl ChromaDocument {
254    /// Create a new document with default (legacy) slice values
255    pub fn new_flat(
256        id: String,
257        namespace: String,
258        embedding: Vec<f32>,
259        metadata: serde_json::Value,
260        document: String,
261    ) -> Self {
262        Self {
263            id,
264            namespace,
265            embedding,
266            metadata,
267            document,
268            layer: 0, // Legacy flat mode
269            parent_id: None,
270            children_ids: vec![],
271            keywords: vec![],
272            content_hash: None,
273            source_hash: None,
274        }
275    }
276
277    /// Create a new document with content hash for deduplication.
278    /// Source hash is left empty — prefer `new_flat_with_hashes` so callers
279    /// supply the source-document hash explicitly when they have it.
280    pub fn new_flat_with_hash(
281        id: String,
282        namespace: String,
283        embedding: Vec<f32>,
284        metadata: serde_json::Value,
285        document: String,
286        content_hash: String,
287    ) -> Self {
288        Self::new_flat_with_hashes(
289            id,
290            namespace,
291            embedding,
292            metadata,
293            document,
294            content_hash,
295            None,
296        )
297    }
298
299    /// Create a flat (legacy) document with both per-chunk and source hashes.
300    pub fn new_flat_with_hashes(
301        id: String,
302        namespace: String,
303        embedding: Vec<f32>,
304        metadata: serde_json::Value,
305        document: String,
306        content_hash: String,
307        source_hash: Option<String>,
308    ) -> Self {
309        Self {
310            id,
311            namespace,
312            embedding,
313            metadata,
314            document,
315            layer: 0,
316            parent_id: None,
317            children_ids: vec![],
318            keywords: vec![],
319            content_hash: Some(content_hash),
320            source_hash,
321        }
322    }
323
324    /// Create a document from an onion slice
325    pub fn from_onion_slice(
326        slice: &crate::rag::OnionSlice,
327        namespace: String,
328        embedding: Vec<f32>,
329        metadata: serde_json::Value,
330    ) -> Self {
331        Self {
332            id: slice.id.clone(),
333            namespace,
334            embedding,
335            metadata,
336            document: slice.content.clone(),
337            layer: slice.layer.as_u8(),
338            parent_id: slice.parent_id.clone(),
339            children_ids: slice.children_ids.clone(),
340            keywords: slice.keywords.clone(),
341            content_hash: None,
342            source_hash: None,
343        }
344    }
345
346    /// Create a document from an onion slice with content hash for deduplication.
347    /// Source hash is left empty — prefer `from_onion_slice_with_hashes` to
348    /// preserve source provenance for pre-index dedup.
349    pub fn from_onion_slice_with_hash(
350        slice: &crate::rag::OnionSlice,
351        namespace: String,
352        embedding: Vec<f32>,
353        metadata: serde_json::Value,
354        content_hash: String,
355    ) -> Self {
356        Self::from_onion_slice_with_hashes(
357            slice,
358            namespace,
359            embedding,
360            metadata,
361            content_hash,
362            None,
363        )
364    }
365
366    /// Create an onion-slice document with both per-chunk and source hashes.
367    pub fn from_onion_slice_with_hashes(
368        slice: &crate::rag::OnionSlice,
369        namespace: String,
370        embedding: Vec<f32>,
371        metadata: serde_json::Value,
372        content_hash: String,
373        source_hash: Option<String>,
374    ) -> Self {
375        Self {
376            id: slice.id.clone(),
377            namespace,
378            embedding,
379            metadata,
380            document: slice.content.clone(),
381            layer: slice.layer.as_u8(),
382            parent_id: slice.parent_id.clone(),
383            children_ids: slice.children_ids.clone(),
384            keywords: slice.keywords.clone(),
385            content_hash: Some(content_hash),
386            source_hash,
387        }
388    }
389
390    /// Check if this is a legacy flat chunk (not an onion slice)
391    pub fn is_flat(&self) -> bool {
392        self.layer == 0
393    }
394
395    /// Get the slice layer if this is an onion slice
396    pub fn slice_layer(&self) -> Option<SliceLayer> {
397        SliceLayer::from_u8(self.layer)
398    }
399}
400
/// LanceDB-backed document store.
pub struct StorageManager {
    /// Open LanceDB connection.
    lance: Connection,
    /// Cached handle for the default table; `None` until it has been opened
    /// and again after `refresh`.
    table: Arc<Mutex<Option<Table>>>,
    /// Cache of additional table handles; emptied by `refresh`.
    /// NOTE(review): presumably keyed by namespace — confirm against the
    /// helper that populates it.
    namespace_tables: Arc<Mutex<HashMap<String, Table>>>,
    /// Name of the default table (constructors set `DEFAULT_TABLE_NAME`).
    collection_name: String,
    /// Tilde-expanded database path the connection was opened with.
    lance_path: String,
}
408
/// State of a cross-store recovery batch. Serialized in `snake_case`;
/// `Pending` is the default.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum CrossStoreRecoveryStatus {
    /// Initial state assigned to a newly built batch.
    #[default]
    Pending,
    /// NOTE(review): presumably set once the batch's documents were rolled
    /// back — the transition is not visible in this part of the file.
    RolledBack,
}
416
/// Reference to one document inside a recovery batch, identified by its
/// namespace and id.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CrossStoreRecoveryDocumentRef {
    /// Namespace of the referenced document.
    pub namespace: String,
    /// Id of the referenced document.
    pub id: String,
}
422
423#[derive(Debug, Clone, Serialize, Deserialize)]
424pub struct CrossStoreRecoveryBatch {
425    pub batch_id: String,
426    pub created_at: String,
427    #[serde(default)]
428    pub status: CrossStoreRecoveryStatus,
429    #[serde(default)]
430    pub last_error: Option<String>,
431    pub documents: Vec<CrossStoreRecoveryDocumentRef>,
432}
433
434impl CrossStoreRecoveryBatch {
435    pub fn from_documents(documents: &[ChromaDocument]) -> Self {
436        Self {
437            batch_id: Uuid::new_v4().to_string(),
438            created_at: chrono::Utc::now().to_rfc3339(),
439            status: CrossStoreRecoveryStatus::Pending,
440            last_error: None,
441            documents: documents
442                .iter()
443                .map(|document| CrossStoreRecoveryDocumentRef {
444                    namespace: document.namespace.clone(),
445                    id: document.id.clone(),
446                })
447                .collect(),
448        }
449    }
450}
451
// lancedb 0.27 `Table::add` requires `T: Scannable`. `RecordBatchIterator`
// impls `RecordBatchReader`, and `Box<dyn RecordBatchReader + Send + 'static>`
// has a blanket `Scannable` impl — boxing satisfies the bound without changing
// the producer side.
/// Boxed, sendable record-batch reader.
type BatchIter = Box<dyn RecordBatchReader + Send + 'static>;
457
458impl StorageManager {
459    pub async fn new(db_path: &str) -> Result<Self> {
460        // Embedded LanceDB path (expand ~, allow override via env)
461        let lance_env = std::env::var("LANCEDB_PATH").unwrap_or_else(|_| db_path.to_string());
462        let lance_path = if lance_env.trim().is_empty() {
463            shellexpand::tilde("~/.rmcp-servers/rust-memex/lancedb").to_string()
464        } else {
465            shellexpand::tilde(&lance_env).to_string()
466        };
467
468        let lance = connect(&lance_path).execute().await?;
469
470        Ok(Self {
471            lance,
472            table: Arc::new(Mutex::new(None)),
473            namespace_tables: Arc::new(Mutex::new(HashMap::new())),
474            collection_name: DEFAULT_TABLE_NAME.to_string(),
475            lance_path,
476        })
477    }
478
479    /// Create a storage manager for CLI tools.
480    /// Use this for CLI tools that only need vector operations (index/search).
481    pub async fn new_lance_only(db_path: &str) -> Result<Self> {
482        let lance_path = shellexpand::tilde(db_path).to_string();
483        let lance = connect(&lance_path).execute().await?;
484
485        Ok(Self {
486            lance,
487            table: Arc::new(Mutex::new(None)),
488            namespace_tables: Arc::new(Mutex::new(HashMap::new())),
489            collection_name: DEFAULT_TABLE_NAME.to_string(),
490            lance_path,
491        })
492    }
493
494    pub fn lance_path(&self) -> &str {
495        &self.lance_path
496    }
497
498    pub async fn require_current_schema_for_writes(&self) -> Result<()> {
499        let Some(table) = self.open_table_if_exists().await? else {
500            return Ok(());
501        };
502        self.ensure_hash_schema_columns(&table).await
503    }
504
505    pub async fn schema_status(
506        &self,
507        expected_schema: SchemaVersion,
508    ) -> Result<SchemaStatusReport> {
509        let Some(table) = self.open_table_if_exists().await? else {
510            return Ok(SchemaStatusReport {
511                schema_version: expected_schema,
512                expected_schema,
513                needs_migration: false,
514                missing_columns: Vec::new(),
515                manifest_version: None,
516            });
517        };
518
519        let missing_columns = Self::missing_required_columns(&table, expected_schema)
520            .await?
521            .into_iter()
522            .map(|field| field.name().to_string())
523            .collect::<Vec<_>>();
524        let manifest_version = table
525            .list_versions()
526            .await
527            .ok()
528            .and_then(|versions| versions.iter().map(|version| version.version).max());
529
530        Ok(SchemaStatusReport {
531            schema_version: if missing_columns.is_empty() {
532                expected_schema
533            } else {
534                SchemaVersion::V3
535            },
536            expected_schema,
537            needs_migration: !missing_columns.is_empty(),
538            missing_columns,
539            manifest_version,
540        })
541    }
542
543    pub async fn missing_required_columns(
544        table: &Table,
545        target: SchemaVersion,
546    ) -> Result<Vec<Field>> {
547        let schema = table.schema().await?;
548        Ok(required_columns_for(target)
549            .into_iter()
550            .filter(|field| schema.field_with_name(field.name()).is_err())
551            .collect())
552    }
553
554    pub async fn migrate_lance_schema(
555        db_path: &str,
556        target: SchemaVersion,
557        check_only: bool,
558    ) -> Result<SchemaMigrationReport> {
559        let lance_path = shellexpand::tilde(db_path).to_string();
560        let lance = connect(&lance_path).execute().await?;
561        let table = lance.open_table(DEFAULT_TABLE_NAME).execute().await?;
562        let missing = Self::missing_required_columns(&table, target).await?;
563
564        if missing.is_empty() || check_only {
565            return Ok(SchemaMigrationReport {
566                target,
567                missing_columns: missing,
568                applied: false,
569            });
570        }
571
572        let transform = NewColumnTransform::AllNulls(Arc::new(Schema::new(missing.clone())));
573        if let Err(error) = table.add_columns(transform, None).await {
574            let _ = table.checkout_latest().await;
575            let remaining = Self::missing_required_columns(&table, target).await?;
576            if remaining.is_empty() {
577                warn!(
578                    "Lance table '{}' schema migration raced with another writer and is already complete",
579                    DEFAULT_TABLE_NAME
580                );
581                return Ok(SchemaMigrationReport {
582                    target,
583                    missing_columns: missing,
584                    applied: true,
585                });
586            }
587            return Err(anyhow!(
588                "failed to migrate Lance table '{}' schema to {target}: {error}",
589                DEFAULT_TABLE_NAME
590            ));
591        }
592
593        let _ = table.checkout_latest().await;
594        Ok(SchemaMigrationReport {
595            target,
596            missing_columns: missing,
597            applied: true,
598        })
599    }
600
601    pub fn cross_store_recovery_dir(&self) -> PathBuf {
602        let db_path = Path::new(&self.lance_path);
603        let parent = db_path.parent().unwrap_or_else(|| Path::new("."));
604        let stem = db_path
605            .file_name()
606            .and_then(|name| name.to_str())
607            .unwrap_or("lancedb");
608        parent.join(format!(".{stem}-cross-store-recovery"))
609    }
610
611    fn cross_store_recovery_batch_path(&self, batch_id: &str) -> PathBuf {
612        self.cross_store_recovery_dir()
613            .join(format!("{batch_id}.json"))
614    }
615
616    fn write_cross_store_recovery_batch(&self, batch: &CrossStoreRecoveryBatch) -> Result<PathBuf> {
617        let dir = self.cross_store_recovery_dir();
618        std::fs::create_dir_all(&dir)?;
619
620        let path = self.cross_store_recovery_batch_path(&batch.batch_id);
621        let tmp_path = path.with_extension("json.tmp");
622        let payload = serde_json::to_vec_pretty(batch)?;
623
624        std::fs::write(&tmp_path, payload)?;
625        std::fs::rename(&tmp_path, &path)?;
626
627        Ok(path)
628    }
629
630    pub fn persist_cross_store_recovery_batch(
631        &self,
632        batch: &CrossStoreRecoveryBatch,
633    ) -> Result<PathBuf> {
634        self.write_cross_store_recovery_batch(batch)
635    }
636
637    pub fn update_cross_store_recovery_batch(
638        &self,
639        batch: &CrossStoreRecoveryBatch,
640    ) -> Result<PathBuf> {
641        self.write_cross_store_recovery_batch(batch)
642    }
643
644    pub fn clear_cross_store_recovery_batch(&self, batch_id: &str) -> Result<()> {
645        let path = self.cross_store_recovery_batch_path(batch_id);
646        match std::fs::remove_file(path) {
647            Ok(()) => Ok(()),
648            Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(()),
649            Err(error) => Err(error.into()),
650        }
651    }
652
653    pub fn list_cross_store_recovery_batches(&self) -> Result<Vec<CrossStoreRecoveryBatch>> {
654        let dir = self.cross_store_recovery_dir();
655        if !dir.exists() {
656            return Ok(vec![]);
657        }
658
659        let mut batches = Vec::new();
660        for entry in std::fs::read_dir(dir)? {
661            let entry = entry?;
662            let path = entry.path();
663            if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
664                continue;
665            }
666
667            let payload = std::fs::read(&path)?;
668            let batch: CrossStoreRecoveryBatch = serde_json::from_slice(&payload)?;
669            batches.push(batch);
670        }
671
672        batches.sort_by(|left, right| left.created_at.cmp(&right.created_at));
673        Ok(batches)
674    }
675
676    /// Refresh the table connection to see new data written by other processes.
677    /// This clears the cached table reference, forcing it to be re-opened on next query.
678    pub async fn refresh(&self) -> Result<()> {
679        let mut guard = self.table.lock().await;
680        *guard = None;
681        self.namespace_tables.lock().await.clear();
682        tracing::info!("LanceDB table cache cleared - will refresh on next query");
683        Ok(())
684    }
685
686    pub async fn ensure_collection(&self) -> Result<()> {
687        // Attempt to open; if missing, create empty table lazily on first add
688        let mut guard = self.table.lock().await;
689        if guard.is_some() {
690            return Ok(());
691        }
692        match self
693            .lance
694            .open_table(self.collection_name.as_str())
695            .execute()
696            .await
697        {
698            Ok(table) => {
699                *guard = Some(table);
700                info!("Found existing Lance table '{}'", self.collection_name);
701            }
702            Err(_) => {
703                info!(
704                    "Lance table '{}' will be created on first insert",
705                    self.collection_name
706                );
707            }
708        }
709        Ok(())
710    }
711
712    pub async fn add_to_store(&self, documents: Vec<ChromaDocument>) -> Result<()> {
713        if documents.is_empty() {
714            return Ok(());
715        }
716
717        // Pre-validation: check all documents before writing anything
718        let dim = documents
719            .first()
720            .ok_or_else(|| anyhow!("No documents to add"))?
721            .embedding
722            .len();
723        if dim == 0 {
724            return Err(anyhow!("Embedding dimension is zero"));
725        }
726
727        // Validate ALL documents have consistent dimensions and required fields
728        for (i, doc) in documents.iter().enumerate() {
729            if doc.embedding.len() != dim {
730                return Err(anyhow!(
731                    "Document {} has inconsistent embedding dimension: expected {}, got {}. \
732                     Aborting batch to prevent database corruption.",
733                    i,
734                    dim,
735                    doc.embedding.len()
736                ));
737            }
738            if doc.id.is_empty() {
739                return Err(anyhow!("Document {} has empty ID. Aborting batch.", i));
740            }
741            if doc.namespace.is_empty() {
742                return Err(anyhow!(
743                    "Document {} has empty namespace. Aborting batch.",
744                    i
745                ));
746            }
747            // Check for NaN/Inf in embeddings
748            for (j, &val) in doc.embedding.iter().enumerate() {
749                if val.is_nan() || val.is_infinite() {
750                    return Err(anyhow!(
751                        "Document {} has invalid embedding value at index {}: {}. \
752                         Aborting batch to prevent database corruption.",
753                        i,
754                        j,
755                        val
756                    ));
757                }
758            }
759        }
760
761        let mut by_namespace: HashMap<String, Vec<ChromaDocument>> = HashMap::new();
762        for document in documents {
763            by_namespace
764                .entry(document.namespace.clone())
765                .or_default()
766                .push(document);
767        }
768
769        let mut inserted = 0usize;
770        for (namespace, docs) in by_namespace {
771            let table = self.ensure_namespace_table(&namespace, dim).await?;
772            self.ensure_hash_schema_columns(&table).await?;
773            let batch = self.docs_to_batch(&docs, dim)?;
774            if let Err(error) = table.add(batch).execute().await {
775                return Err(self.map_lancedb_write_error(error));
776            }
777            inserted += docs.len();
778        }
779        debug!("Inserted {} documents into Lance (validated)", inserted);
780        Ok(())
781    }
782
783    pub async fn search_store(
784        &self,
785        namespace: Option<&str>,
786        embedding: Vec<f32>,
787        k: usize,
788    ) -> Result<Vec<ChromaDocument>> {
789        if embedding.is_empty() {
790            return Ok(vec![]);
791        }
792        let mut results = Vec::new();
793        if let Some(ns) = namespace {
794            if let Some(table) = self.open_namespace_table_if_exists(ns).await? {
795                let mut stream = table
796                    .query()
797                    .nearest_to(embedding.clone())?
798                    .limit(k)
799                    .execute()
800                    .await?;
801                while let Some(batch) = stream.try_next().await? {
802                    let mut docs = self.batch_to_docs(&batch)?;
803                    results.append(&mut docs);
804                }
805            }
806            if let Some(table) = self.legacy_table_if_exists().await? {
807                let mut stream = table
808                    .query()
809                    .only_if(self.namespace_filter(ns).as_str())
810                    .nearest_to(embedding)?
811                    .limit(k)
812                    .execute()
813                    .await?;
814                while let Some(batch) = stream.try_next().await? {
815                    let mut docs = self.batch_to_docs(&batch)?;
816                    results.append(&mut docs);
817                }
818            }
819            results.truncate(k);
820        } else {
821            for table_name in self.data_table_names().await? {
822                let Some(table) = self.open_named_table_if_exists(&table_name).await? else {
823                    continue;
824                };
825                let mut stream = table
826                    .query()
827                    .nearest_to(embedding.clone())?
828                    .limit(k)
829                    .execute()
830                    .await?;
831                while let Some(batch) = stream.try_next().await? {
832                    let mut docs = self.batch_to_docs(&batch)?;
833                    results.append(&mut docs);
834                }
835            }
836            results.truncate(k);
837        }
838        debug!("Lance returned {} results", results.len());
839        Ok(results)
840    }
841
842    /// Return a single page of documents without running a vector search.
843    ///
844    /// Used by admin/reporting paths that need deterministic limit/offset
845    /// behavior without assuming any embedding dimension or creating a table on
846    /// read.
847    pub async fn all_documents_page(
848        &self,
849        namespace: Option<&str>,
850        offset: usize,
851        limit: usize,
852    ) -> Result<Vec<ChromaDocument>> {
853        let mut results = Vec::new();
854        if let Some(ns) = namespace {
855            if let Some(table) = self.open_namespace_table_if_exists(ns).await? {
856                results.append(
857                    &mut self
858                        .query_table_page(&table, None, 0, offset + limit)
859                        .await?,
860                );
861            }
862            if let Some(table) = self.legacy_table_if_exists().await? {
863                results.append(
864                    &mut self
865                        .query_table_page(
866                            &table,
867                            Some(self.namespace_filter(ns)),
868                            0,
869                            offset + limit,
870                        )
871                        .await?,
872                );
873            }
874        } else {
875            for table_name in self.data_table_names().await? {
876                let Some(table) = self.open_named_table_if_exists(&table_name).await? else {
877                    continue;
878                };
879                results.append(
880                    &mut self
881                        .query_table_page(&table, None, 0, offset + limit)
882                        .await?,
883                );
884            }
885        }
886
887        Ok(results.into_iter().skip(offset).take(limit).collect())
888    }
889
890    /// Return documents without running a vector search.
891    /// Used by admin/reporting paths that need a bounded full-table scan
892    /// starting from the first row.
893    pub async fn all_documents(
894        &self,
895        namespace: Option<&str>,
896        limit: usize,
897    ) -> Result<Vec<ChromaDocument>> {
898        self.all_documents_page(namespace, 0, limit).await
899    }
900
901    pub async fn get_document(&self, namespace: &str, id: &str) -> Result<Option<ChromaDocument>> {
902        let id_filter = self.id_filter(id);
903        if let Some(table) = self.open_namespace_table_if_exists(namespace).await? {
904            let mut stream = table
905                .query()
906                .only_if(id_filter.as_str())
907                .limit(1)
908                .execute()
909                .await?;
910            if let Some(batch) = stream.try_next().await? {
911                let mut docs = self.batch_to_docs(&batch)?;
912                if let Some(doc) = docs.pop() {
913                    return Ok(Some(doc));
914                }
915            }
916        }
917
918        if let Some(table) = self.legacy_table_if_exists().await? {
919            let filter = format!("{} AND {}", self.namespace_filter(namespace), id_filter);
920            let mut stream = table
921                .query()
922                .only_if(filter.as_str())
923                .limit(1)
924                .execute()
925                .await?;
926            if let Some(batch) = stream.try_next().await? {
927                let mut docs = self.batch_to_docs(&batch)?;
928                if let Some(doc) = docs.pop() {
929                    return Ok(Some(doc));
930                }
931            }
932        }
933        Ok(None)
934    }
935
936    pub async fn delete_document(&self, namespace: &str, id: &str) -> Result<usize> {
937        let mut deleted = 0usize;
938        let id_filter = self.id_filter(id);
939
940        if let Some(table) = self.open_namespace_table_if_exists(namespace).await? {
941            let pre_count = table.count_rows(Some(id_filter.clone())).await?;
942            if pre_count > 0 {
943                table.delete(id_filter.as_str()).await?;
944                deleted += pre_count;
945            }
946        }
947
948        if let Some(table) = self.legacy_table_if_exists().await? {
949            let predicate = format!("{} AND {}", self.namespace_filter(namespace), id_filter);
950            let pre_count = table.count_rows(Some(predicate.clone())).await?;
951            if pre_count > 0 {
952                table.delete(predicate.as_str()).await?;
953                deleted += pre_count;
954            }
955        }
956        Ok(deleted)
957    }
958
959    /// Batch delete documents by IDs within a namespace.
960    ///
961    /// Issues a single `DELETE WHERE namespace = X AND id IN (...)` per chunk,
962    /// avoiding the per-document table scan that `delete_document` incurs when
963    /// called in a loop. Predicate is split into 500-id chunks to keep SQL
964    /// length bounded regardless of caller batch size.
965    pub async fn delete_documents(&self, namespace: &str, ids: &[&str]) -> Result<usize> {
966        if ids.is_empty() {
967            return Ok(0);
968        }
969        const CHUNK: usize = 500;
970        let mut total_deleted = 0usize;
971        for batch in ids.chunks(CHUNK) {
972            let id_list = batch
973                .iter()
974                .map(|id| format!("'{}'", id.replace('\'', "''")))
975                .collect::<Vec<_>>()
976                .join(", ");
977            let id_predicate = format!("id IN ({})", id_list);
978            if let Some(table) = self.open_namespace_table_if_exists(namespace).await? {
979                let pre_count = table.count_rows(Some(id_predicate.clone())).await?;
980                if pre_count > 0 {
981                    table.delete(id_predicate.as_str()).await?;
982                    total_deleted += pre_count;
983                }
984            }
985            if let Some(table) = self.legacy_table_if_exists().await? {
986                let predicate =
987                    format!("{} AND {}", self.namespace_filter(namespace), id_predicate);
988                let pre_count = table.count_rows(Some(predicate.clone())).await?;
989                if pre_count > 0 {
990                    table.delete(predicate.as_str()).await?;
991                    total_deleted += pre_count;
992                }
993            }
994        }
995        Ok(total_deleted)
996    }
997
998    pub async fn delete_namespace_documents(&self, namespace: &str) -> Result<usize> {
999        let mut deleted = 0usize;
1000        if let Some(table) = self.open_namespace_table_if_exists(namespace).await? {
1001            let pre_count = table.count_rows(None).await?;
1002            if pre_count > 0 {
1003                table
1004                    .delete(self.namespace_filter(namespace).as_str())
1005                    .await?;
1006                deleted += pre_count;
1007            }
1008        }
1009        if let Some(table) = self.legacy_table_if_exists().await? {
1010            let predicate = self.namespace_filter(namespace);
1011            let pre_count = table.count_rows(Some(predicate.clone())).await?;
1012            if pre_count > 0 {
1013                table.delete(predicate.as_str()).await?;
1014                deleted += pre_count;
1015            }
1016        }
1017        Ok(deleted)
1018    }
1019
1020    pub async fn rename_namespace_atomic(&self, from: &str, to: &str) -> Result<usize> {
1021        if from == to {
1022            return Ok(0);
1023        }
1024
1025        let source_count = self.count_namespace(from).await?;
1026        if source_count == 0 {
1027            return Ok(0);
1028        }
1029
1030        let target_count = self.count_namespace(to).await?;
1031        if target_count > 0 {
1032            return Err(anyhow!(
1033                "Target namespace '{}' already exists with {} rows",
1034                to,
1035                target_count
1036            ));
1037        }
1038
1039        let mut docs = self.all_documents(Some(from), source_count).await?;
1040        for doc in &mut docs {
1041            doc.namespace = to.to_string();
1042        }
1043        self.add_to_store(docs).await?;
1044        let deleted = self.delete_namespace_documents(from).await?;
1045
1046        Ok(deleted)
1047    }
1048
1049    pub fn get_collection_name(&self) -> &str {
1050        &self.collection_name
1051    }
1052
1053    fn namespace_table_name(namespace: &str) -> String {
1054        let mut safe = namespace
1055            .chars()
1056            .map(|ch| {
1057                if ch.is_ascii_alphanumeric() {
1058                    ch.to_ascii_lowercase()
1059                } else {
1060                    '_'
1061                }
1062            })
1063            .collect::<String>();
1064        while safe.contains("__") {
1065            safe = safe.replace("__", "_");
1066        }
1067        let safe = safe.trim_matches('_');
1068        let safe = if safe.is_empty() { "default" } else { safe };
1069        let safe = safe.chars().take(48).collect::<String>();
1070        let hash = Sha256::digest(namespace.as_bytes());
1071        let suffix = hash[..6]
1072            .iter()
1073            .map(|byte| format!("{byte:02x}"))
1074            .collect::<String>();
1075        format!("{NAMESPACE_TABLE_PREFIX}{safe}_{suffix}")
1076    }
1077
1078    fn is_namespace_table_name(table_name: &str) -> bool {
1079        table_name.starts_with(NAMESPACE_TABLE_PREFIX)
1080    }
1081
1082    async fn data_table_names(&self) -> Result<Vec<String>> {
1083        let table_names = self.lance.table_names().execute().await?;
1084        Ok(table_names
1085            .into_iter()
1086            .filter(|name| name == DEFAULT_TABLE_NAME || Self::is_namespace_table_name(name))
1087            .collect())
1088    }
1089
1090    async fn open_named_table_if_exists(&self, table_name: &str) -> Result<Option<Table>> {
1091        match self.lance.open_table(table_name).execute().await {
1092            Ok(table) => Ok(Some(table)),
1093            Err(e) => {
1094                let msg = e.to_string().to_lowercase();
1095                if msg.contains("not found")
1096                    || msg.contains("does not exist")
1097                    || msg.contains("no such file")
1098                {
1099                    Ok(None)
1100                } else {
1101                    Err(anyhow!("LanceDB error on table '{}': {}", table_name, e))
1102                }
1103            }
1104        }
1105    }
1106
1107    async fn open_namespace_table_if_exists(&self, namespace: &str) -> Result<Option<Table>> {
1108        let table_name = Self::namespace_table_name(namespace);
1109        if let Some(table) = self.namespace_tables.lock().await.get(&table_name).cloned() {
1110            return Ok(Some(table));
1111        }
1112        let table = self.open_named_table_if_exists(&table_name).await?;
1113        if let Some(table) = &table {
1114            self.namespace_tables
1115                .lock()
1116                .await
1117                .insert(table_name, table.clone());
1118        }
1119        Ok(table)
1120    }
1121
1122    async fn ensure_namespace_table(&self, namespace: &str, dim: usize) -> Result<Table> {
1123        let table_name = Self::namespace_table_name(namespace);
1124        if let Some(table) = self.namespace_tables.lock().await.get(&table_name).cloned() {
1125            return Ok(table);
1126        }
1127
1128        let table = match self.open_named_table_if_exists(&table_name).await? {
1129            Some(table) => table,
1130            None => {
1131                if dim == 0 {
1132                    return Err(anyhow!(
1133                        "Vector table '{}' not found and dimension is unknown",
1134                        table_name
1135                    ));
1136                }
1137                info!(
1138                    "Creating Lance namespace table '{}' for '{}' with vector dimension {} (schema v{})",
1139                    table_name, namespace, dim, SCHEMA_VERSION
1140                );
1141                let schema = Arc::new(Self::create_schema(dim));
1142                self.lance
1143                    .create_empty_table(table_name.as_str(), schema)
1144                    .execute()
1145                    .await?
1146            }
1147        };
1148
1149        self.namespace_tables
1150            .lock()
1151            .await
1152            .insert(table_name, table.clone());
1153        Ok(table)
1154    }
1155
1156    async fn query_table_page(
1157        &self,
1158        table: &Table,
1159        filter: Option<String>,
1160        offset: usize,
1161        limit: usize,
1162    ) -> Result<Vec<ChromaDocument>> {
1163        let mut query = table.query().limit(limit).offset(offset);
1164        if let Some(filter) = filter {
1165            query = query.only_if(filter.as_str());
1166        }
1167        let mut stream = query.execute().await?;
1168        let mut results = Vec::new();
1169        while let Some(batch) = stream.try_next().await? {
1170            let mut docs = self.batch_to_docs(&batch)?;
1171            results.append(&mut docs);
1172        }
1173        Ok(results)
1174    }
1175
    /// Open the legacy shared table if it exists.
    ///
    /// Thin alias for `open_table_if_exists`, which opens the table named by
    /// `self.collection_name`; callers in this file combine it with a
    /// `namespace = ...` predicate.
    async fn legacy_table_if_exists(&self) -> Result<Option<Table>> {
        self.open_table_if_exists().await
    }
1179
1180    /// Try to open the table without creating it.
1181    /// Returns `Ok(None)` when the table genuinely does not exist.
1182    /// Propagates real errors (I/O, corruption, permission) as `Err`.
1183    async fn open_table_if_exists(&self) -> Result<Option<Table>> {
1184        let mut guard = self.table.lock().await;
1185        if let Some(table) = guard.as_ref() {
1186            return Ok(Some(table.clone()));
1187        }
1188
1189        match self
1190            .lance
1191            .open_table(self.collection_name.as_str())
1192            .execute()
1193            .await
1194        {
1195            Ok(tbl) => {
1196                *guard = Some(tbl.clone());
1197                Ok(Some(tbl))
1198            }
1199            Err(e) => {
1200                let msg = e.to_string().to_lowercase();
1201                if msg.contains("not found")
1202                    || msg.contains("does not exist")
1203                    || msg.contains("no such file")
1204                {
1205                    Ok(None)
1206                } else {
1207                    tracing::warn!(
1208                        "LanceDB error opening table '{}': {}",
1209                        self.collection_name,
1210                        e
1211                    );
1212                    Err(anyhow!(
1213                        "LanceDB error on table '{}': {}",
1214                        self.collection_name,
1215                        e
1216                    ))
1217                }
1218            }
1219        }
1220    }
1221
1222    /// Create the LanceDB schema with onion slice fields and content hash
1223    fn create_schema(dim: usize) -> Schema {
1224        Schema::new(vec![
1225            Field::new("id", DataType::Utf8, false),
1226            Field::new("namespace", DataType::Utf8, false),
1227            Field::new(
1228                "vector",
1229                DataType::FixedSizeList(
1230                    Arc::new(Field::new("item", DataType::Float32, true)),
1231                    dim as i32,
1232                ),
1233                false,
1234            ),
1235            Field::new("text", DataType::Utf8, true),
1236            Field::new("metadata", DataType::Utf8, true),
1237            // Onion slice fields (v2 schema)
1238            Field::new("layer", DataType::UInt8, true), // 0=flat, 1=outer, 2=middle, 3=inner, 4=core
1239            Field::new("parent_id", DataType::Utf8, true), // Parent slice ID
1240            Field::new("children_ids", DataType::Utf8, true), // JSON array of children IDs
1241            Field::new("keywords", DataType::Utf8, true), // JSON array of keywords
1242            // Per-chunk dedup hash (v3 schema; v4 narrows semantic to chunk text)
1243            Field::new("content_hash", DataType::Utf8, true), // SHA256 of THIS chunk's text
1244            // Source-document hash (v4 schema) — same value across all 4 layers
1245            // from one source. Drives pre-index dedup so we never re-embed an
1246            // already-ingested file.
1247            Field::new("source_hash", DataType::Utf8, true), // SHA256 of source document text
1248        ])
1249    }
1250
1251    async fn ensure_hash_schema_columns(&self, table: &Table) -> Result<()> {
1252        let missing = Self::missing_required_columns(table, SchemaVersion::current()).await?;
1253
1254        if missing.is_empty() {
1255            return Ok(());
1256        }
1257
1258        let missing_columns = missing
1259            .iter()
1260            .map(|field| field.name().to_string())
1261            .collect::<Vec<_>>();
1262        let error = SchemaMismatchWriteError::new(
1263            self.collection_name.clone(),
1264            self.lance_path.clone(),
1265            missing_columns,
1266            "table is older than the current writer schema",
1267        );
1268        self.log_schema_mismatch(&error);
1269        Err(error.into())
1270    }
1271
1272    fn map_lancedb_write_error(&self, error: lancedb::error::Error) -> anyhow::Error {
1273        let message = match &error {
1274            lancedb::error::Error::Lance { source } => source.to_string(),
1275            lancedb::error::Error::Schema { message } => message.clone(),
1276            lancedb::error::Error::Arrow { source } => source.to_string(),
1277            _ => return error.into(),
1278        };
1279
1280        if !is_schema_mismatch_message(&message) {
1281            return error.into();
1282        }
1283
1284        let missing_columns = extract_missing_columns(&message);
1285        let error = SchemaMismatchWriteError::new(
1286            self.collection_name.clone(),
1287            self.lance_path.clone(),
1288            missing_columns,
1289            message,
1290        );
1291        self.log_schema_mismatch(&error);
1292        error.into()
1293    }
1294
    /// Emit a structured `error!` event for a write-path schema mismatch.
    ///
    /// The event carries the table name, database path, missing columns, and
    /// remediation text read from `error`, plus the source file and line of
    /// this call site.
    fn log_schema_mismatch(&self, error: &SchemaMismatchWriteError) {
        error!(
            error_kind = "schema_mismatch",
            table = %error.table_name(),
            db_path = %error.db_path(),
            missing_columns = ?error.missing_columns(),
            remediation = %error.remediation(),
            file = file!(),
            line = line!(),
            "write-path schema mismatch"
        );
    }
1307
1308    fn docs_to_batch(&self, documents: &[ChromaDocument], dim: usize) -> Result<BatchIter> {
1309        let ids = documents.iter().map(|d| d.id.as_str()).collect::<Vec<_>>();
1310        let namespaces = documents
1311            .iter()
1312            .map(|d| d.namespace.as_str())
1313            .collect::<Vec<_>>();
1314        let texts = documents
1315            .iter()
1316            .map(|d| d.document.as_str())
1317            .collect::<Vec<_>>();
1318        let metadata_strings = documents
1319            .iter()
1320            .map(|d| serde_json::to_string(&d.metadata).unwrap_or_else(|_| "{}".to_string()))
1321            .collect::<Vec<_>>();
1322
1323        let vectors = documents.iter().map(|d| {
1324            if d.embedding.len() != dim {
1325                None
1326            } else {
1327                Some(d.embedding.iter().map(|v| Some(*v)).collect::<Vec<_>>())
1328            }
1329        });
1330
1331        // Onion slice fields
1332        let layers: Vec<u8> = documents.iter().map(|d| d.layer).collect();
1333        let parent_ids: Vec<Option<&str>> =
1334            documents.iter().map(|d| d.parent_id.as_deref()).collect();
1335        let children_ids_json: Vec<String> = documents
1336            .iter()
1337            .map(|d| serde_json::to_string(&d.children_ids).unwrap_or_else(|_| "[]".to_string()))
1338            .collect();
1339        let keywords_json: Vec<String> = documents
1340            .iter()
1341            .map(|d| serde_json::to_string(&d.keywords).unwrap_or_else(|_| "[]".to_string()))
1342            .collect();
1343        // Per-chunk content hash for chunk-level dedup
1344        let content_hashes: Vec<Option<&str>> = documents
1345            .iter()
1346            .map(|d| d.content_hash.as_deref())
1347            .collect();
1348        // Source-document hash for pre-index source-level dedup (v4 schema)
1349        let source_hashes: Vec<Option<&str>> =
1350            documents.iter().map(|d| d.source_hash.as_deref()).collect();
1351
1352        let schema = Arc::new(Self::create_schema(dim));
1353
1354        let batch = RecordBatch::try_new(
1355            schema.clone(),
1356            vec![
1357                Arc::new(StringArray::from(ids)),
1358                Arc::new(StringArray::from(namespaces)),
1359                Arc::new(
1360                    FixedSizeListArray::from_iter_primitive::<Float32Type, _, _>(
1361                        vectors, dim as i32,
1362                    ),
1363                ),
1364                Arc::new(StringArray::from(texts)),
1365                Arc::new(StringArray::from(metadata_strings)),
1366                // Onion slice fields
1367                Arc::new(UInt8Array::from(layers)),
1368                Arc::new(StringArray::from(parent_ids)),
1369                Arc::new(StringArray::from(
1370                    children_ids_json
1371                        .iter()
1372                        .map(|s| s.as_str())
1373                        .collect::<Vec<_>>(),
1374                )),
1375                Arc::new(StringArray::from(
1376                    keywords_json.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
1377                )),
1378                // Per-chunk content hash for chunk-level dedup
1379                Arc::new(StringArray::from(content_hashes)),
1380                // Source-document hash for source-level dedup (v4 schema)
1381                Arc::new(StringArray::from(source_hashes)),
1382            ],
1383        )?;
1384
1385        Ok(Box::new(RecordBatchIterator::new(
1386            vec![Ok(batch)].into_iter(),
1387            schema,
1388        )))
1389    }
1390
1391    fn batch_to_docs(&self, batch: &RecordBatch) -> Result<Vec<ChromaDocument>> {
1392        let id_col = batch
1393            .column_by_name("id")
1394            .and_then(|c| c.as_any().downcast_ref::<StringArray>())
1395            .ok_or_else(|| anyhow!("Missing id column"))?;
1396        let ns_col = batch
1397            .column_by_name("namespace")
1398            .and_then(|c| c.as_any().downcast_ref::<StringArray>())
1399            .ok_or_else(|| anyhow!("Missing namespace column"))?;
1400        let text_col = batch
1401            .column_by_name("text")
1402            .and_then(|c| c.as_any().downcast_ref::<StringArray>())
1403            .ok_or_else(|| anyhow!("Missing text column"))?;
1404        let metadata_col = batch
1405            .column_by_name("metadata")
1406            .and_then(|c| c.as_any().downcast_ref::<StringArray>())
1407            .ok_or_else(|| anyhow!("Missing metadata column"))?;
1408        let vector_col = batch
1409            .column_by_name("vector")
1410            .and_then(|c| c.as_any().downcast_ref::<FixedSizeListArray>())
1411            .ok_or_else(|| anyhow!("Missing vector column"))?;
1412
1413        // Onion slice fields (optional for backward compatibility with v1 schema)
1414        let layer_col = batch
1415            .column_by_name("layer")
1416            .and_then(|c| c.as_any().downcast_ref::<UInt8Array>());
1417        let parent_id_col = batch
1418            .column_by_name("parent_id")
1419            .and_then(|c| c.as_any().downcast_ref::<StringArray>());
1420        let children_ids_col = batch
1421            .column_by_name("children_ids")
1422            .and_then(|c| c.as_any().downcast_ref::<StringArray>());
1423        let keywords_col = batch
1424            .column_by_name("keywords")
1425            .and_then(|c| c.as_any().downcast_ref::<StringArray>());
1426        // Content hash field (optional for backward compatibility with v2 schema)
1427        let content_hash_col = batch
1428            .column_by_name("content_hash")
1429            .and_then(|c| c.as_any().downcast_ref::<StringArray>());
1430        // Source hash column (v4 schema) — optional for pre-v4 tables.
1431        let source_hash_col = batch
1432            .column_by_name("source_hash")
1433            .and_then(|c| c.as_any().downcast_ref::<StringArray>());
1434
1435        let dim = vector_col.value_length() as usize;
1436        let values = vector_col
1437            .values()
1438            .as_any()
1439            .downcast_ref::<Float32Array>()
1440            .ok_or_else(|| anyhow!("Vector inner type mismatch"))?;
1441
1442        let mut docs = Vec::new();
1443        for i in 0..batch.num_rows() {
1444            let id = id_col.value(i).to_string();
1445            let text = text_col.value(i).to_string();
1446            let namespace = ns_col.value(i).to_string();
1447            let meta_str = metadata_col.value(i);
1448            let metadata: Value = serde_json::from_str(meta_str).unwrap_or_else(|_| json!({}));
1449
1450            let offset = i * dim;
1451            let mut emb = Vec::with_capacity(dim);
1452            for j in 0..dim {
1453                emb.push(values.value(offset + j));
1454            }
1455
1456            // Read onion slice fields (with v1 schema compatibility)
1457            let layer = layer_col
1458                .and_then(|col| {
1459                    if col.is_null(i) {
1460                        None
1461                    } else {
1462                        Some(col.value(i))
1463                    }
1464                })
1465                .unwrap_or(0);
1466
1467            let parent_id = parent_id_col.and_then(|col| {
1468                if col.is_null(i) {
1469                    None
1470                } else {
1471                    Some(col.value(i).to_string())
1472                }
1473            });
1474
1475            let children_ids: Vec<String> = children_ids_col
1476                .and_then(|col| {
1477                    if col.is_null(i) {
1478                        None
1479                    } else {
1480                        serde_json::from_str(col.value(i)).ok()
1481                    }
1482                })
1483                .unwrap_or_default();
1484
1485            let keywords: Vec<String> = keywords_col
1486                .and_then(|col| {
1487                    if col.is_null(i) {
1488                        None
1489                    } else {
1490                        serde_json::from_str(col.value(i)).ok()
1491                    }
1492                })
1493                .unwrap_or_default();
1494
1495            let content_hash = content_hash_col.and_then(|col| {
1496                if col.is_null(i) {
1497                    None
1498                } else {
1499                    Some(col.value(i).to_string())
1500                }
1501            });
1502
1503            let source_hash = source_hash_col.and_then(|col| {
1504                if col.is_null(i) {
1505                    None
1506                } else {
1507                    Some(col.value(i).to_string())
1508                }
1509            });
1510
1511            docs.push(ChromaDocument {
1512                id,
1513                namespace,
1514                embedding: emb,
1515                metadata,
1516                document: text,
1517                layer,
1518                parent_id,
1519                children_ids,
1520                keywords,
1521                content_hash,
1522                source_hash,
1523            });
1524        }
1525        Ok(docs)
1526    }
1527
1528    pub async fn get_filtered_in_namespace(
1529        &self,
1530        namespace: &str,
1531        filter: &str,
1532    ) -> Result<Vec<ChromaDocument>> {
1533        let mut results = Vec::new();
1534        if let Some(table) = self.open_namespace_table_if_exists(namespace).await? {
1535            let mut stream = table.query().only_if(filter).execute().await?;
1536            while let Some(batch) = stream.try_next().await? {
1537                let mut docs = self.batch_to_docs(&batch)?;
1538                results.append(&mut docs);
1539            }
1540        }
1541        if let Some(table) = self.legacy_table_if_exists().await? {
1542            let combined = format!("{} AND ({})", self.namespace_filter(namespace), filter);
1543            let mut stream = table.query().only_if(combined.as_str()).execute().await?;
1544            while let Some(batch) = stream.try_next().await? {
1545                let mut docs = self.batch_to_docs(&batch)?;
1546                results.append(&mut docs);
1547            }
1548        }
1549        Ok(results)
1550    }
1551
1552    /// Search with optional layer filtering for onion slice architecture
1553    pub async fn search_store_with_layer(
1554        &self,
1555        namespace: Option<&str>,
1556        embedding: Vec<f32>,
1557        k: usize,
1558        layer_filter: Option<SliceLayer>,
1559    ) -> Result<Vec<ChromaDocument>> {
1560        if embedding.is_empty() {
1561            return Ok(vec![]);
1562        }
1563        // Build combined filter
1564        let mut filters = Vec::new();
1565        if let Some(layer) = layer_filter {
1566            filters.push(self.layer_filter(layer));
1567        }
1568        let mut results = Vec::new();
1569
1570        if let Some(ns) = namespace {
1571            if let Some(table) = self.open_namespace_table_if_exists(ns).await? {
1572                let mut query = table.query();
1573                if !filters.is_empty() {
1574                    let combined = filters.join(" AND ");
1575                    query = query.only_if(combined.as_str());
1576                }
1577                let mut stream = query
1578                    .nearest_to(embedding.clone())?
1579                    .limit(k)
1580                    .execute()
1581                    .await?;
1582                while let Some(batch) = stream.try_next().await? {
1583                    let mut docs = self.batch_to_docs(&batch)?;
1584                    results.append(&mut docs);
1585                }
1586            }
1587            if let Some(table) = self.legacy_table_if_exists().await? {
1588                let mut legacy_filters = vec![self.namespace_filter(ns)];
1589                legacy_filters.extend(filters.clone());
1590                let combined = legacy_filters.join(" AND ");
1591                let mut stream = table
1592                    .query()
1593                    .only_if(combined.as_str())
1594                    .nearest_to(embedding)?
1595                    .limit(k)
1596                    .execute()
1597                    .await?;
1598                while let Some(batch) = stream.try_next().await? {
1599                    let mut docs = self.batch_to_docs(&batch)?;
1600                    results.append(&mut docs);
1601                }
1602            }
1603            results.truncate(k);
1604        } else {
1605            for table_name in self.data_table_names().await? {
1606                let Some(table) = self.open_named_table_if_exists(&table_name).await? else {
1607                    continue;
1608                };
1609                let mut query = table.query();
1610                if !filters.is_empty() {
1611                    let combined = filters.join(" AND ");
1612                    query = query.only_if(combined.as_str());
1613                }
1614                let mut stream = query
1615                    .nearest_to(embedding.clone())?
1616                    .limit(k)
1617                    .execute()
1618                    .await?;
1619                while let Some(batch) = stream.try_next().await? {
1620                    let mut docs = self.batch_to_docs(&batch)?;
1621                    results.append(&mut docs);
1622                }
1623            }
1624            results.truncate(k);
1625        }
1626        debug!(
1627            "Lance returned {} results (layer filter: {:?})",
1628            results.len(),
1629            layer_filter
1630        );
1631        Ok(results)
1632    }
1633
1634    /// Get a document by ID and expand to get its children
1635    pub async fn get_children(
1636        &self,
1637        namespace: &str,
1638        parent_id: &str,
1639    ) -> Result<Vec<ChromaDocument>> {
1640        if self.open_table_if_exists().await?.is_none() {
1641            return Ok(vec![]);
1642        }
1643
1644        // First get the parent document to find children IDs
1645        if let Some(parent) = self.get_document(namespace, parent_id).await? {
1646            if parent.children_ids.is_empty() {
1647                return Ok(vec![]);
1648            }
1649
1650            // Query for all children
1651            let mut children = Vec::new();
1652            for child_id in &parent.children_ids {
1653                if let Some(child) = self.get_document(namespace, child_id).await? {
1654                    children.push(child);
1655                }
1656            }
1657            return Ok(children);
1658        }
1659
1660        Ok(vec![])
1661    }
1662
1663    /// Get the parent of a document (drill up in onion hierarchy)
1664    pub async fn get_parent(
1665        &self,
1666        namespace: &str,
1667        child_id: &str,
1668    ) -> Result<Option<ChromaDocument>> {
1669        if let Some(child) = self.get_document(namespace, child_id).await?
1670            && let Some(ref parent_id) = child.parent_id
1671        {
1672            return self.get_document(namespace, parent_id).await;
1673        }
1674        Ok(None)
1675    }
1676
1677    fn namespace_filter(&self, namespace: &str) -> String {
1678        format!("namespace = '{}'", namespace.replace('\'', "''"))
1679    }
1680
1681    fn id_filter(&self, id: &str) -> String {
1682        format!("id = '{}'", id.replace('\'', "''"))
1683    }
1684
1685    fn layer_filter(&self, layer: SliceLayer) -> String {
1686        if layer == SliceLayer::Outer {
1687            // Default search should surface onion summaries while still seeing legacy flat chunks.
1688            "(layer = 0 OR layer = 1)".to_string()
1689        } else {
1690            format!("layer = {}", layer.as_u8())
1691        }
1692    }
1693
1694    fn content_hash_filter(&self, hash: &str) -> String {
1695        format!("content_hash = '{}'", hash.replace('\'', "''"))
1696    }
1697
1698    fn source_hash_filter(&self, hash: &str) -> String {
1699        format!("source_hash = '{}'", hash.replace('\'', "''"))
1700    }
1701
1702    /// Check if the table schema has content_hash column (schema v3+)
1703    async fn table_has_content_hash(table: &Table) -> bool {
1704        table
1705            .schema()
1706            .await
1707            .map(|schema| schema.field_with_name("content_hash").is_ok())
1708            .unwrap_or(false)
1709    }
1710
1711    /// Check if the table schema has source_hash column (schema v4+)
1712    async fn table_has_source_hash(table: &Table) -> bool {
1713        table
1714            .schema()
1715            .await
1716            .map(|schema| schema.field_with_name("source_hash").is_ok())
1717            .unwrap_or(false)
1718    }
1719
1720    /// Check if a content hash already exists in a namespace (for exact-match deduplication)
1721    ///
1722    /// Returns Ok(false) if:
1723    /// - Table doesn't exist yet
1724    /// - Table has old schema without content_hash column (graceful degradation)
1725    pub async fn has_content_hash(&self, namespace: &str, hash: &str) -> Result<bool> {
1726        let hash_filter = self.content_hash_filter(hash);
1727        if let Some(table) = self.open_namespace_table_if_exists(namespace).await?
1728            && Self::table_has_content_hash(&table).await
1729        {
1730            let mut stream = table
1731                .query()
1732                .only_if(hash_filter.as_str())
1733                .limit(1)
1734                .execute()
1735                .await?;
1736            if let Some(batch) = stream.try_next().await? {
1737                return Ok(batch.num_rows() > 0);
1738            }
1739        }
1740
1741        if let Some(table) = self.legacy_table_if_exists().await? {
1742            if !Self::table_has_content_hash(&table).await {
1743                tracing::warn!(
1744                    "Table '{}' has old schema without content_hash column. \
1745                     Deduplication disabled. Consider re-indexing with new schema.",
1746                    self.collection_name
1747                );
1748                return Ok(false); // Can't check for duplicates, treat as new
1749            }
1750
1751            let filter = format!("{} AND {}", self.namespace_filter(namespace), hash_filter);
1752            let mut stream = table
1753                .query()
1754                .only_if(filter.as_str())
1755                .limit(1)
1756                .execute()
1757                .await?;
1758
1759            if let Some(batch) = stream.try_next().await? {
1760                return Ok(batch.num_rows() > 0);
1761            }
1762        }
1763
1764        Ok(false)
1765    }
1766
1767    /// Check if any chunk in `namespace` already references the given source-document
1768    /// hash. Used by the indexing pipeline to skip re-embedding files that were
1769    /// already ingested (P4 — pre-index source-level dedup).
1770    ///
1771    /// Returns Ok(false) if the table doesn't exist yet, or if the table is on a
1772    /// pre-v4 schema without the `source_hash` column (graceful degradation —
1773    /// older namespaces should be backfilled via `/admin/backfill-hashes`).
1774    pub async fn has_source_hash(&self, namespace: &str, hash: &str) -> Result<bool> {
1775        let hash_filter = self.source_hash_filter(hash);
1776        if let Some(table) = self.open_namespace_table_if_exists(namespace).await?
1777            && Self::table_has_source_hash(&table).await
1778        {
1779            let mut stream = table
1780                .query()
1781                .only_if(hash_filter.as_str())
1782                .limit(1)
1783                .execute()
1784                .await?;
1785            if let Some(batch) = stream.try_next().await? {
1786                return Ok(batch.num_rows() > 0);
1787            }
1788        }
1789
1790        if let Some(table) = self.legacy_table_if_exists().await? {
1791            if !Self::table_has_source_hash(&table).await {
1792                tracing::debug!(
1793                    "Table '{}' has pre-v4 schema without source_hash column. \
1794                     Source-level dedup disabled until backfill.",
1795                    self.collection_name
1796                );
1797                return Ok(false);
1798            }
1799
1800            let filter = format!("{} AND {}", self.namespace_filter(namespace), hash_filter);
1801            let mut stream = table
1802                .query()
1803                .only_if(filter.as_str())
1804                .limit(1)
1805                .execute()
1806                .await?;
1807
1808            if let Some(batch) = stream.try_next().await? {
1809                return Ok(batch.num_rows() > 0);
1810            }
1811        }
1812
1813        Ok(false)
1814    }
1815
1816    /// Filter a list of hashes to return only those that don't exist in the namespace.
1817    /// This is more efficient than calling has_content_hash for each hash individually.
1818    ///
1819    /// Returns all hashes as "new" if table has old schema without content_hash column.
1820    pub async fn filter_existing_hashes<'a>(
1821        &self,
1822        namespace: &str,
1823        hashes: &'a [String],
1824    ) -> Result<Vec<&'a String>> {
1825        if hashes.is_empty() {
1826            return Ok(vec![]);
1827        }
1828
1829        // Query for existing hashes in this namespace
1830        // We build a filter with OR conditions for all hashes
1831        let hash_conditions: Vec<String> =
1832            hashes.iter().map(|h| self.content_hash_filter(h)).collect();
1833
1834        let mut existing_hashes = std::collections::HashSet::new();
1835
1836        if let Some(table) = self.open_namespace_table_if_exists(namespace).await?
1837            && Self::table_has_content_hash(&table).await
1838        {
1839            let filter = hash_conditions.join(" OR ");
1840            let mut stream = table
1841                .query()
1842                .only_if(filter.as_str())
1843                .limit(hashes.len())
1844                .execute()
1845                .await?;
1846            while let Some(batch) = stream.try_next().await? {
1847                if let Some(hash_col) = batch
1848                    .column_by_name("content_hash")
1849                    .and_then(|c| c.as_any().downcast_ref::<StringArray>())
1850                {
1851                    for i in 0..batch.num_rows() {
1852                        if !hash_col.is_null(i) {
1853                            existing_hashes.insert(hash_col.value(i).to_string());
1854                        }
1855                    }
1856                }
1857            }
1858        }
1859
1860        if let Some(table) = self.legacy_table_if_exists().await? {
1861            // Graceful handling of old schema without content_hash column
1862            if !Self::table_has_content_hash(&table).await {
1863                tracing::warn!(
1864                    "Table '{}' has old schema without content_hash column. \
1865                     Deduplication disabled. Consider re-indexing with new schema.",
1866                    self.collection_name
1867                );
1868                return Ok(hashes.iter().collect()); // All are "new" since we can't check
1869            }
1870
1871            let filter = format!(
1872                "{} AND ({})",
1873                self.namespace_filter(namespace),
1874                hash_conditions.join(" OR ")
1875            );
1876            let mut stream = table
1877                .query()
1878                .only_if(filter.as_str())
1879                .limit(hashes.len())
1880                .execute()
1881                .await?;
1882            while let Some(batch) = stream.try_next().await? {
1883                if let Some(hash_col) = batch
1884                    .column_by_name("content_hash")
1885                    .and_then(|c| c.as_any().downcast_ref::<StringArray>())
1886                {
1887                    for i in 0..batch.num_rows() {
1888                        if !hash_col.is_null(i) {
1889                            existing_hashes.insert(hash_col.value(i).to_string());
1890                        }
1891                    }
1892                }
1893            }
1894        }
1895
1896        // Return only hashes that don't exist
1897        Ok(hashes
1898            .iter()
1899            .filter(|h| !existing_hashes.contains(h.as_str()))
1900            .collect())
1901    }
1902
1903    // =========================================================================
1904    // MAINTENANCE OPERATIONS
1905    // =========================================================================
1906
1907    /// Run all optimizations (compact + prune old versions)
1908    pub async fn optimize(&self) -> Result<OptimizeStats> {
1909        let mut stats = OptimizeStats {
1910            compaction: None,
1911            prune: None,
1912        };
1913        for table_name in self.data_table_names().await? {
1914            let Some(table) = self.open_named_table_if_exists(&table_name).await? else {
1915                continue;
1916            };
1917            stats = table.optimize(OptimizeAction::All).await?;
1918        }
1919        info!(
1920            "Optimize complete: compaction={:?}, prune={:?}",
1921            stats.compaction, stats.prune
1922        );
1923        Ok(stats)
1924    }
1925
1926    /// Compact small files into larger ones for better performance
1927    pub async fn compact(&self) -> Result<OptimizeStats> {
1928        let mut stats = OptimizeStats {
1929            compaction: None,
1930            prune: None,
1931        };
1932        for table_name in self.data_table_names().await? {
1933            let Some(table) = self.open_named_table_if_exists(&table_name).await? else {
1934                continue;
1935            };
1936            stats = table
1937                .optimize(OptimizeAction::Compact {
1938                    options: Default::default(),
1939                    remap_options: None,
1940                })
1941                .await?;
1942        }
1943        info!("Compaction complete: {:?}", stats.compaction);
1944        Ok(stats)
1945    }
1946
1947    /// Remove old versions older than specified duration (default: 7 days)
1948    pub async fn cleanup(&self, older_than_days: Option<u64>) -> Result<OptimizeStats> {
1949        let days = older_than_days.unwrap_or(7) as i64;
1950        let duration = chrono::TimeDelta::days(days);
1951        let mut stats = OptimizeStats {
1952            compaction: None,
1953            prune: None,
1954        };
1955        for table_name in self.data_table_names().await? {
1956            let Some(table) = self.open_named_table_if_exists(&table_name).await? else {
1957                continue;
1958            };
1959            stats = table
1960                .optimize(OptimizeAction::Prune {
1961                    older_than: Some(duration),
1962                    delete_unverified: Some(false),
1963                    error_if_tagged_old_versions: None,
1964                })
1965                .await?;
1966        }
1967        info!("Cleanup complete: {:?}", stats.prune);
1968        Ok(stats)
1969    }
1970
1971    /// Get table statistics (row count, fragments, etc.)
1972    pub async fn stats(&self) -> Result<TableStats> {
1973        let table_names = self.data_table_names().await?;
1974        let mut row_count = 0usize;
1975        let mut version_count = 0usize;
1976
1977        for table_name in &table_names {
1978            let Some(table) = self.open_named_table_if_exists(table_name).await? else {
1979                continue;
1980            };
1981            row_count += table.count_rows(None).await.unwrap_or(0);
1982            version_count += table.list_versions().await.unwrap_or_default().len();
1983        }
1984
1985        Ok(TableStats {
1986            row_count,
1987            version_count,
1988            table_name: self.collection_name.clone(),
1989            db_path: self.lance_path.clone(),
1990        })
1991    }
1992
1993    /// Count rows in a specific namespace
1994    pub async fn count_namespace(&self, namespace: &str) -> Result<usize> {
1995        let mut count = 0usize;
1996        if let Some(table) = self.open_namespace_table_if_exists(namespace).await? {
1997            count += table.count_rows(None).await?;
1998        }
1999        if let Some(table) = self.legacy_table_if_exists().await? {
2000            let filter = self.namespace_filter(namespace);
2001            count += table.count_rows(Some(filter)).await?;
2002        }
2003        Ok(count)
2004    }
2005
2006    /// Get all documents from a namespace (for migration/export)
2007    ///
2008    /// Note: This uses a full table scan with namespace filter.
2009    /// For very large namespaces, consider batching.
2010    pub async fn get_all_in_namespace(&self, namespace: &str) -> Result<Vec<ChromaDocument>> {
2011        let results = self.all_documents(Some(namespace), 100_000).await?;
2012        debug!(
2013            "Retrieved {} documents from namespace '{}'",
2014            results.len(),
2015            namespace
2016        );
2017        Ok(results)
2018    }
2019
2020    /// Check if a namespace exists (has any documents)
2021    pub async fn namespace_exists(&self, namespace: &str) -> Result<bool> {
2022        let count = self.count_namespace(namespace).await?;
2023        Ok(count > 0)
2024    }
2025}
2026
/// Statistics about the LanceDB table(s) managed by a `StorageManager`,
/// as produced by `StorageManager::stats`.
#[derive(Debug, Clone, Serialize)]
pub struct TableStats {
    /// Total number of rows, summed over every data table.
    pub row_count: usize,
    /// Total number of table versions, summed over every data table.
    pub version_count: usize,
    /// Collection name the manager was configured with.
    pub table_name: String,
    /// Path of the LanceDB database the manager was configured with.
    pub db_path: String,
}
2035
2036// =============================================================================
2037// GARBAGE COLLECTION
2038// =============================================================================
2039
/// Statistics from garbage collection operations.
///
/// "Found" counters describe what a pass detected; "removed" counters
/// describe what was actually deleted (they stay at zero in dry-run mode).
#[derive(Debug, Clone, Default, Serialize)]
pub struct GcStats {
    /// Number of orphan embeddings found (documents whose `parent_id` does not
    /// resolve to a loaded document)
    pub orphans_found: usize,
    /// Number of orphan embeddings successfully removed
    pub orphans_removed: usize,
    /// Number of empty namespaces found
    pub empty_namespaces_found: usize,
    /// Number of empty namespaces removed (documents deleted).
    /// The empty-namespace pass only reports, so it always sets this to 0.
    pub empty_namespaces_removed: usize,
    /// Number of old documents found (older than threshold)
    pub old_docs_found: usize,
    /// Number of old documents successfully removed
    pub old_docs_removed: usize,
    /// Estimated space freed in bytes (if calculable).
    /// NOTE(review): no garbage-collection code visible here assigns this
    /// field — confirm whether it is populated elsewhere.
    pub bytes_freed: Option<u64>,
    /// List of namespaces that were empty
    pub empty_namespace_names: Vec<String>,
    /// List of namespaces affected by old doc cleanup
    pub affected_namespaces: Vec<String>,
}
2062
2063impl GcStats {
2064    /// Check if any issues were found
2065    pub fn has_issues(&self) -> bool {
2066        self.orphans_found > 0 || self.empty_namespaces_found > 0 || self.old_docs_found > 0
2067    }
2068
2069    /// Check if any deletions occurred
2070    pub fn has_deletions(&self) -> bool {
2071        self.orphans_removed > 0 || self.empty_namespaces_removed > 0 || self.old_docs_removed > 0
2072    }
2073}
2074
/// Configuration for garbage collection.
///
/// Each pass is opt-in, and the `Default` implementation enables dry-run
/// mode, so a default configuration never deletes anything.
#[derive(Debug, Clone)]
pub struct GcConfig {
    /// Remove orphan embeddings (embeddings with no parent document)
    pub remove_orphans: bool,
    /// Remove empty namespaces (namespaces with 0 documents)
    pub remove_empty: bool,
    /// Remove documents older than this duration (`None` skips the age pass)
    pub older_than: Option<chrono::Duration>,
    /// Dry run mode - only report what would be removed
    pub dry_run: bool,
    /// Limit to specific namespace (None = all namespaces)
    pub namespace: Option<String>,
}
2089
2090impl Default for GcConfig {
2091    fn default() -> Self {
2092        Self {
2093            remove_orphans: false,
2094            remove_empty: false,
2095            older_than: None,
2096            dry_run: true,
2097            namespace: None,
2098        }
2099    }
2100}
2101
2102/// Parse a duration string like "30d", "6m", "1y"
2103pub fn parse_duration_string(s: &str) -> Result<chrono::Duration> {
2104    let s = s.trim().to_lowercase();
2105    if s.is_empty() {
2106        return Err(anyhow!("Empty duration string"));
2107    }
2108
2109    // Extract numeric part and unit
2110    let (num_str, unit) = if s.ends_with('d') {
2111        (&s[..s.len() - 1], 'd')
2112    } else if s.ends_with('m') {
2113        (&s[..s.len() - 1], 'm')
2114    } else if s.ends_with('y') {
2115        (&s[..s.len() - 1], 'y')
2116    } else {
2117        return Err(anyhow!(
2118            "Invalid duration format '{}'. Use format like '30d', '6m', or '1y'",
2119            s
2120        ));
2121    };
2122
2123    let num: i64 = num_str.parse().map_err(|_| {
2124        anyhow!(
2125            "Invalid number in duration '{}'. Use format like '30d', '6m', or '1y'",
2126            s
2127        )
2128    })?;
2129
2130    if num <= 0 {
2131        return Err(anyhow!("Duration must be positive, got '{}'", s));
2132    }
2133
2134    match unit {
2135        'd' => Ok(chrono::Duration::days(num)),
2136        'm' => Ok(chrono::Duration::days(num * 30)), // Approximate month
2137        'y' => Ok(chrono::Duration::days(num * 365)), // Approximate year
2138        _ => unreachable!(),
2139    }
2140}
2141
2142impl StorageManager {
2143    // =========================================================================
2144    // GARBAGE COLLECTION OPERATIONS
2145    // =========================================================================
2146
2147    /// Run garbage collection based on configuration
2148    #[doc(alias = "run_gc")]
2149    pub async fn garbage_collect(&self, config: &GcConfig) -> Result<GcStats> {
2150        let mut stats = GcStats::default();
2151
2152        const PAGE_SIZE: usize = 5000;
2153        let mut all_docs: Vec<ChromaDocument> = Vec::new();
2154        let mut offset = 0;
2155        loop {
2156            let page = self
2157                .all_documents_page(config.namespace.as_deref(), offset, PAGE_SIZE)
2158                .await?;
2159            let page_len = page.len();
2160            all_docs.extend(page);
2161            if page_len < PAGE_SIZE {
2162                break;
2163            }
2164            offset += page_len;
2165        }
2166
2167        if all_docs.is_empty() {
2168            return Ok(stats);
2169        }
2170
2171        // Group documents by namespace
2172        let mut by_namespace: std::collections::HashMap<String, Vec<&ChromaDocument>> =
2173            std::collections::HashMap::new();
2174        for doc in &all_docs {
2175            by_namespace
2176                .entry(doc.namespace.clone())
2177                .or_default()
2178                .push(doc);
2179        }
2180
2181        // 1. Find orphan embeddings (documents with parent_id that doesn't exist)
2182        if config.remove_orphans {
2183            let orphan_stats = self
2184                .find_and_remove_orphans(&all_docs, config.dry_run)
2185                .await?;
2186            stats.orphans_found = orphan_stats.0;
2187            stats.orphans_removed = orphan_stats.1;
2188        }
2189
2190        // 2. Find and optionally remove empty namespaces
2191        if config.remove_empty {
2192            let empty_stats = self
2193                .find_and_remove_empty_namespaces(&by_namespace, config.dry_run)
2194                .await?;
2195            stats.empty_namespaces_found = empty_stats.0;
2196            stats.empty_namespaces_removed = empty_stats.1;
2197            stats.empty_namespace_names = empty_stats.2;
2198        }
2199
2200        // 3. Find and optionally remove old documents
2201        if let Some(ref duration) = config.older_than {
2202            let old_stats = self
2203                .find_and_remove_old_docs(&all_docs, duration, config.dry_run)
2204                .await?;
2205            stats.old_docs_found = old_stats.0;
2206            stats.old_docs_removed = old_stats.1;
2207            stats.affected_namespaces = old_stats.2;
2208        }
2209
2210        Ok(stats)
2211    }
2212
    /// Deprecated alias for `garbage_collect`; forwards `config` unchanged and
    /// returns its result.
    #[deprecated(note = "use garbage_collect")]
    pub async fn run_gc(&self, config: &GcConfig) -> Result<GcStats> {
        self.garbage_collect(config).await
    }
2217
2218    /// Find orphan embeddings - documents with parent_id pointing to non-existent documents
2219    async fn find_and_remove_orphans(
2220        &self,
2221        docs: &[ChromaDocument],
2222        dry_run: bool,
2223    ) -> Result<(usize, usize)> {
2224        // Build a set of all document IDs
2225        let all_ids: std::collections::HashSet<&str> = docs.iter().map(|d| d.id.as_str()).collect();
2226
2227        // Find documents with parent_id that doesn't exist in the ID set
2228        let mut orphans: Vec<(&str, &str)> = Vec::new(); // (namespace, id)
2229        for doc in docs {
2230            if let Some(ref parent_id) = doc.parent_id
2231                && !all_ids.contains(parent_id.as_str())
2232            {
2233                orphans.push((&doc.namespace, &doc.id));
2234            }
2235        }
2236
2237        let found = orphans.len();
2238        let mut removed = 0;
2239
2240        if !dry_run && !orphans.is_empty() {
2241            for (namespace, id) in &orphans {
2242                if self.delete_document(namespace, id).await.is_ok() {
2243                    removed += 1;
2244                }
2245            }
2246        }
2247
2248        Ok((found, removed))
2249    }
2250
2251    /// Find empty namespaces - this checks if namespaces have 0 documents
2252    /// Note: In LanceDB, namespaces are implicit (just a column value), so "removing"
2253    /// an empty namespace means there are no documents to delete
2254    async fn find_and_remove_empty_namespaces(
2255        &self,
2256        by_namespace: &std::collections::HashMap<String, Vec<&ChromaDocument>>,
2257        _dry_run: bool,
2258    ) -> Result<(usize, usize, Vec<String>)> {
2259        // Find namespaces with 0 documents
2260        let empty_namespaces: Vec<String> = by_namespace
2261            .iter()
2262            .filter(|(_, docs)| docs.is_empty())
2263            .map(|(ns, _)| ns.clone())
2264            .collect();
2265
2266        let found = empty_namespaces.len();
2267        // Empty namespaces don't need deletion - they have no documents
2268        // Just report them
2269        let removed = 0;
2270
2271        Ok((found, removed, empty_namespaces))
2272    }
2273
2274    /// Find and optionally remove documents older than the specified duration
2275    async fn find_and_remove_old_docs(
2276        &self,
2277        docs: &[ChromaDocument],
2278        older_than: &chrono::Duration,
2279        dry_run: bool,
2280    ) -> Result<(usize, usize, Vec<String>)> {
2281        let cutoff = chrono::Utc::now() - *older_than;
2282
2283        let mut old_docs: Vec<(&str, &str)> = Vec::new(); // (namespace, id)
2284        let mut affected_namespaces: std::collections::HashSet<String> =
2285            std::collections::HashSet::new();
2286
2287        for doc in docs {
2288            // Check for timestamp in metadata
2289            if let Some(obj) = doc.metadata.as_object() {
2290                let mut doc_timestamp: Option<String> = None;
2291
2292                // Look for common timestamp field names
2293                for key in &["timestamp", "created_at", "indexed_at", "date", "time"] {
2294                    if let Some(value) = obj.get(*key)
2295                        && let Some(ts) = value.as_str()
2296                    {
2297                        doc_timestamp = Some(ts.to_string());
2298                        break;
2299                    }
2300                }
2301
2302                // Check if document is older than cutoff
2303                if let Some(ts) = doc_timestamp {
2304                    // Parse the timestamp - try RFC3339 first, then other formats
2305                    let is_old = if let Ok(parsed) = chrono::DateTime::parse_from_rfc3339(&ts) {
2306                        parsed < cutoff
2307                    } else if let Ok(parsed) =
2308                        chrono::NaiveDateTime::parse_from_str(&ts, "%Y-%m-%d %H:%M:%S")
2309                    {
2310                        parsed < cutoff.naive_utc()
2311                    } else if let Ok(parsed) = chrono::NaiveDate::parse_from_str(&ts, "%Y-%m-%d") {
2312                        parsed < cutoff.date_naive()
2313                    } else {
2314                        // Can't parse timestamp, skip this document
2315                        false
2316                    };
2317
2318                    if is_old {
2319                        old_docs.push((&doc.namespace, &doc.id));
2320                        affected_namespaces.insert(doc.namespace.clone());
2321                    }
2322                }
2323            }
2324        }
2325
2326        let found = old_docs.len();
2327        let mut removed = 0;
2328
2329        if !dry_run && !old_docs.is_empty() {
2330            for (namespace, id) in &old_docs {
2331                if self.delete_document(namespace, id).await.is_ok() {
2332                    removed += 1;
2333                }
2334            }
2335        }
2336
2337        Ok((found, removed, affected_namespaces.into_iter().collect()))
2338    }
2339
2340    /// List all unique namespaces in the database
2341    pub async fn list_namespaces(&self) -> Result<Vec<(String, usize)>> {
2342        self.refresh().await?;
2343
2344        let mut namespace_counts: std::collections::HashMap<String, usize> =
2345            std::collections::HashMap::new();
2346
2347        for table_name in self.data_table_names().await? {
2348            let Some(table) = self.open_named_table_if_exists(&table_name).await? else {
2349                continue;
2350            };
2351            const PAGE_SIZE: usize = 5000;
2352            let mut offset = 0;
2353            loop {
2354                let page = self
2355                    .query_table_page(&table, None, offset, PAGE_SIZE)
2356                    .await?;
2357                let page_len = page.len();
2358                for doc in &page {
2359                    *namespace_counts.entry(doc.namespace.clone()).or_insert(0) += 1;
2360                }
2361                if page_len < PAGE_SIZE {
2362                    break;
2363                }
2364                offset += page_len;
2365            }
2366        }
2367
2368        let mut namespaces: Vec<(String, usize)> = namespace_counts.into_iter().collect();
2369        namespaces.sort_by(|a, b| a.0.cmp(&b.0));
2370        Ok(namespaces)
2371    }
2372}
2373
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use tempfile::TempDir;

    /// A flat document built with explicit hashes keeps the chunk hash and the
    /// source hash in their own fields.
    #[test]
    fn flat_documents_preserve_separate_chunk_and_source_hashes() {
        let doc = ChromaDocument::new_flat_with_hashes(
            "doc-1".to_string(),
            "kb:transcripts".to_string(),
            vec![0.0, 1.0],
            json!({"path": "sample.md"}),
            "outer summary chunk".to_string(),
            "chunk-sha256".to_string(),
            Some("source-sha256".to_string()),
        );

        assert_eq!(doc.content_hash.as_deref(), Some("chunk-sha256"));
        assert_eq!(doc.source_hash.as_deref(), Some("source-sha256"));
        assert_ne!(doc.content_hash, doc.source_hash);
    }

    /// Writing the same document ID into two namespaces creates one table per
    /// namespace (and no legacy default table), while count, lookup and search
    /// stay scoped to the requested namespace.
    #[tokio::test]
    async fn namespace_writes_use_separate_lance_tables_and_keep_contracts() {
        // Fresh on-disk database that is removed when `tmp` is dropped.
        let tmp = TempDir::new().expect("temp dir");
        let db_path = tmp.path().join("lancedb");
        let storage = StorageManager::new_lance_only(db_path.to_str().unwrap())
            .await
            .expect("storage");

        // Same ID and embedding in both namespaces; only namespace, metadata
        // and text differ.
        let embedding = vec![0.25_f32; 8];
        storage
            .add_to_store(vec![
                ChromaDocument::new_flat(
                    "shared-id".to_string(),
                    "kb:alpha".to_string(),
                    embedding.clone(),
                    json!({"ns": "alpha"}),
                    "alpha memory".to_string(),
                ),
                ChromaDocument::new_flat(
                    "shared-id".to_string(),
                    "kb:beta".to_string(),
                    embedding.clone(),
                    json!({"ns": "beta"}),
                    "beta memory".to_string(),
                ),
            ])
            .await
            .expect("write two namespaces");

        // Exactly two namespace tables, and the default table was not created.
        let table_names = storage.lance.table_names().execute().await.expect("tables");
        let namespace_tables = table_names
            .iter()
            .filter(|name| StorageManager::is_namespace_table_name(name))
            .count();
        assert_eq!(namespace_tables, 2, "{table_names:?}");
        assert!(!table_names.iter().any(|name| name == DEFAULT_TABLE_NAME));

        // Counts and lookups see only the namespace they were asked about.
        assert_eq!(storage.count_namespace("kb:alpha").await.unwrap(), 1);
        assert_eq!(storage.count_namespace("kb:beta").await.unwrap(), 1);
        assert_eq!(
            storage
                .get_document("kb:alpha", "shared-id")
                .await
                .unwrap()
                .unwrap()
                .document,
            "alpha memory"
        );
        // Vector search restricted to one namespace returns only its document.
        assert_eq!(
            storage
                .search_store(Some("kb:beta"), embedding, 10)
                .await
                .unwrap()
                .into_iter()
                .map(|doc| doc.document)
                .collect::<Vec<_>>(),
            vec!["beta memory"]
        );
    }
}