1use anyhow::{Result, anyhow};
2use arrow_array::types::Float32Type;
3use arrow_array::{
4 Array, FixedSizeListArray, Float32Array, RecordBatch, RecordBatchIterator, StringArray,
5 UInt8Array,
6};
7use arrow_schema::{ArrowError, DataType, Field, Schema};
8use futures::TryStreamExt;
9use lancedb::connection::Connection;
10use lancedb::query::{ExecutableQuery, QueryBase};
11use lancedb::table::{NewColumnTransform, OptimizeAction, OptimizeStats};
12use lancedb::{Table, connect};
13use serde::{Deserialize, Serialize};
14use serde_json::{Value, json};
15use sha2::{Digest, Sha256};
16use std::collections::HashMap;
17use std::fmt;
18use std::path::{Path, PathBuf};
19use std::str::FromStr;
20use std::sync::Arc;
21use tokio::sync::Mutex;
22use tracing::{debug, error, info, warn};
23use uuid::Uuid;
24
25use crate::rag::SliceLayer;
26
/// Numeric schema version written by this build (matches [`SchemaVersion::V4`]).
pub const SCHEMA_VERSION: u32 = 4;
/// Name of the legacy shared table that predates per-namespace tables.
pub const DEFAULT_TABLE_NAME: &str = "mcp_documents";
/// Prefix for generated per-namespace table names (see `namespace_table_name`).
const NAMESPACE_TABLE_PREFIX: &str = "mcp_documents__ns__";
40
/// On-disk schema versions understood by this storage layer.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SchemaVersion {
    /// Legacy layout: requires `content_hash` only (see `required_columns_for`).
    V3,
    /// Current layout: additionally requires `source_hash`.
    V4,
}
46
47impl SchemaVersion {
48 pub fn current() -> Self {
49 Self::V4
50 }
51}
52
53impl fmt::Display for SchemaVersion {
54 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
55 match self {
56 Self::V3 => f.write_str("v3"),
57 Self::V4 => f.write_str("v4"),
58 }
59 }
60}
61
62impl FromStr for SchemaVersion {
63 type Err = anyhow::Error;
64
65 fn from_str(value: &str) -> Result<Self> {
66 match value.trim().to_ascii_lowercase().as_str() {
67 "3" | "v3" => Ok(Self::V3),
68 "4" | "v4" => Ok(Self::V4),
69 other => Err(anyhow!(
70 "unsupported schema target '{other}' (expected v3 or v4)"
71 )),
72 }
73 }
74}
75
/// Outcome of a schema migration attempt (see `migrate_lance_schema`).
#[derive(Debug, Clone)]
pub struct SchemaMigrationReport {
    /// Version the migration targeted.
    pub target: SchemaVersion,
    /// Required columns that were absent before the migration ran.
    pub missing_columns: Vec<Field>,
    /// Whether columns were actually added (false for check-only or no-op).
    pub applied: bool,
}
82
83impl SchemaMigrationReport {
84 pub fn missing_column_names(&self) -> Vec<&str> {
85 self.missing_columns
86 .iter()
87 .map(|field| field.name().as_str())
88 .collect()
89 }
90}
91
/// Snapshot of a table's schema relative to what the caller expects.
#[derive(Debug, Clone)]
pub struct SchemaStatusReport {
    /// Detected version (reported as V3 when any required column is missing).
    pub schema_version: SchemaVersion,
    /// Version the caller asked to compare against.
    pub expected_schema: SchemaVersion,
    /// True when `missing_columns` is non-empty.
    pub needs_migration: bool,
    /// Names of required columns absent from the table.
    pub missing_columns: Vec<String>,
    /// Highest Lance manifest version, when the listing succeeds.
    pub manifest_version: Option<u64>,
}
100
101pub fn required_columns_for(version: SchemaVersion) -> Vec<Field> {
102 let mut fields = vec![Field::new("content_hash", DataType::Utf8, true)];
103 if matches!(version, SchemaVersion::V4) {
104 fields.push(Field::new("source_hash", DataType::Utf8, true));
105 }
106 fields
107}
108
/// Error raised when a write targets a table lacking current-schema columns.
#[derive(Debug, Clone)]
pub struct SchemaMismatchWriteError {
    // Table the write was aimed at.
    table_name: String,
    // Database directory, echoed into the remediation command.
    db_path: String,
    // Required columns the table does not have.
    missing_columns: Vec<String>,
    // Underlying engine message or a canned explanation.
    message: String,
}
116
117impl SchemaMismatchWriteError {
118 fn new(
119 table_name: impl Into<String>,
120 db_path: impl Into<String>,
121 missing_columns: Vec<String>,
122 message: impl Into<String>,
123 ) -> Self {
124 Self {
125 table_name: table_name.into(),
126 db_path: db_path.into(),
127 missing_columns,
128 message: message.into(),
129 }
130 }
131
132 pub fn missing_columns(&self) -> &[String] {
133 &self.missing_columns
134 }
135
136 pub fn db_path(&self) -> &str {
137 &self.db_path
138 }
139
140 pub fn table_name(&self) -> &str {
141 &self.table_name
142 }
143
144 pub fn remediation(&self) -> String {
145 format!("rust-memex migrate-schema --db-path {}", self.db_path)
146 }
147}
148
149impl fmt::Display for SchemaMismatchWriteError {
150 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
151 write!(
152 f,
153 "ERROR schema mismatch while writing Lance table '{}': missing columns {:?}. {}. Remediation: {}",
154 self.table_name,
155 self.missing_columns,
156 self.message,
157 self.remediation()
158 )
159 }
160}
161
// Marker impl so the error can flow through `anyhow` / `Box<dyn Error>`.
impl std::error::Error for SchemaMismatchWriteError {}
163
/// Heuristically detects LanceDB/Arrow error text that indicates a
/// writer/table schema mismatch (case-insensitive substring matching).
fn is_schema_mismatch_message(message: &str) -> bool {
    const MARKERS: [&str; 4] = [
        "schema mismatch",
        "append with different schema",
        "fields did not match",
        "missing=[",
    ];
    let lower = message.to_ascii_lowercase();
    MARKERS.iter().any(|marker| lower.contains(marker))
}
171
/// Pulls column names out of a LanceDB schema-mismatch message.
///
/// Scans for every `missing=[...]` group, splits the bracketed list on
/// commas, strips backtick/quote wrapping, and de-duplicates. If no such
/// group is present, falls back to checking whether any currently-required
/// column name appears verbatim anywhere in the message.
fn extract_missing_columns(message: &str) -> Vec<String> {
    let mut columns = Vec::new();
    let mut rest = message;

    // Walk every "missing=[...]" occurrence, advancing past each match.
    while let Some(start) = rest.find("missing=[") {
        rest = &rest[start + "missing=[".len()..];
        let Some(end) = rest.find(']') else {
            // Unterminated bracket: stop scanning rather than mis-slice.
            break;
        };
        let list = &rest[..end];
        for item in list.split(',') {
            // Column names may be wrapped in backticks or quotes.
            let column = item
                .trim()
                .trim_matches('`')
                .trim_matches('"')
                .trim_matches('\'');
            if !column.is_empty() && !columns.iter().any(|existing| existing == column) {
                columns.push(column.to_string());
            }
        }
        rest = &rest[end + 1..];
    }

    // Fallback: look for required column names mentioned in free-form text.
    if columns.is_empty() {
        for field in required_columns_for(SchemaVersion::current()) {
            let name = field.name();
            if message.contains(name) {
                columns.push(name.to_string());
            }
        }
    }

    columns
}
206
/// One stored document row. (The "Chroma" name looks legacy — the backend
/// here is LanceDB; confirm before renaming.)
#[derive(Debug, Serialize, Clone)]
pub struct ChromaDocument {
    pub id: String,
    pub namespace: String,
    /// Embedding vector; all documents in one insert batch must share a length.
    pub embedding: Vec<f32>,
    /// Arbitrary JSON metadata, serialized to a string column on write.
    pub metadata: serde_json::Value,
    /// Raw document text.
    pub document: String,
    /// Onion-slice layer; 0 means a flat (non-hierarchical) document.
    pub layer: u8,
    pub parent_id: Option<String>,
    pub children_ids: Vec<String>,
    pub keywords: Vec<String>,
    /// Content hash, when known (required column from schema v3).
    pub content_hash: Option<String>,
    /// Source hash, when known (required column from schema v4).
    pub source_hash: Option<String>,
}
252
253impl ChromaDocument {
254 pub fn new_flat(
256 id: String,
257 namespace: String,
258 embedding: Vec<f32>,
259 metadata: serde_json::Value,
260 document: String,
261 ) -> Self {
262 Self {
263 id,
264 namespace,
265 embedding,
266 metadata,
267 document,
268 layer: 0, parent_id: None,
270 children_ids: vec![],
271 keywords: vec![],
272 content_hash: None,
273 source_hash: None,
274 }
275 }
276
277 pub fn new_flat_with_hash(
281 id: String,
282 namespace: String,
283 embedding: Vec<f32>,
284 metadata: serde_json::Value,
285 document: String,
286 content_hash: String,
287 ) -> Self {
288 Self::new_flat_with_hashes(
289 id,
290 namespace,
291 embedding,
292 metadata,
293 document,
294 content_hash,
295 None,
296 )
297 }
298
299 pub fn new_flat_with_hashes(
301 id: String,
302 namespace: String,
303 embedding: Vec<f32>,
304 metadata: serde_json::Value,
305 document: String,
306 content_hash: String,
307 source_hash: Option<String>,
308 ) -> Self {
309 Self {
310 id,
311 namespace,
312 embedding,
313 metadata,
314 document,
315 layer: 0,
316 parent_id: None,
317 children_ids: vec![],
318 keywords: vec![],
319 content_hash: Some(content_hash),
320 source_hash,
321 }
322 }
323
324 pub fn from_onion_slice(
326 slice: &crate::rag::OnionSlice,
327 namespace: String,
328 embedding: Vec<f32>,
329 metadata: serde_json::Value,
330 ) -> Self {
331 Self {
332 id: slice.id.clone(),
333 namespace,
334 embedding,
335 metadata,
336 document: slice.content.clone(),
337 layer: slice.layer.as_u8(),
338 parent_id: slice.parent_id.clone(),
339 children_ids: slice.children_ids.clone(),
340 keywords: slice.keywords.clone(),
341 content_hash: None,
342 source_hash: None,
343 }
344 }
345
346 pub fn from_onion_slice_with_hash(
350 slice: &crate::rag::OnionSlice,
351 namespace: String,
352 embedding: Vec<f32>,
353 metadata: serde_json::Value,
354 content_hash: String,
355 ) -> Self {
356 Self::from_onion_slice_with_hashes(
357 slice,
358 namespace,
359 embedding,
360 metadata,
361 content_hash,
362 None,
363 )
364 }
365
366 pub fn from_onion_slice_with_hashes(
368 slice: &crate::rag::OnionSlice,
369 namespace: String,
370 embedding: Vec<f32>,
371 metadata: serde_json::Value,
372 content_hash: String,
373 source_hash: Option<String>,
374 ) -> Self {
375 Self {
376 id: slice.id.clone(),
377 namespace,
378 embedding,
379 metadata,
380 document: slice.content.clone(),
381 layer: slice.layer.as_u8(),
382 parent_id: slice.parent_id.clone(),
383 children_ids: slice.children_ids.clone(),
384 keywords: slice.keywords.clone(),
385 content_hash: Some(content_hash),
386 source_hash,
387 }
388 }
389
390 pub fn is_flat(&self) -> bool {
392 self.layer == 0
393 }
394
395 pub fn slice_layer(&self) -> Option<SliceLayer> {
397 SliceLayer::from_u8(self.layer)
398 }
399}
400
/// Owns the LanceDB connection plus caches of opened table handles.
pub struct StorageManager {
    // Live LanceDB connection.
    lance: Connection,
    // Cached handle for the legacy shared table; None until first opened.
    table: Arc<Mutex<Option<Table>>>,
    // Per-namespace table handles keyed by physical table name.
    namespace_tables: Arc<Mutex<HashMap<String, Table>>>,
    // Logical name of the legacy table (DEFAULT_TABLE_NAME).
    collection_name: String,
    // Tilde-expanded database directory path.
    lance_path: String,
}
408
/// Lifecycle state of a cross-store recovery batch journal.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum CrossStoreRecoveryStatus {
    /// Batch journaled but not yet resolved (the default on creation).
    #[default]
    Pending,
    /// Batch has been rolled back.
    RolledBack,
}
416
/// Identifies one journaled document by (namespace, id).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CrossStoreRecoveryDocumentRef {
    pub namespace: String,
    pub id: String,
}
422
/// Durable journal entry for a batch of documents written to Lance.
/// Presumably consumed by a rollback path when a paired store write
/// fails — confirm against callers of `list_cross_store_recovery_batches`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CrossStoreRecoveryBatch {
    pub batch_id: String,
    /// RFC 3339 creation timestamp; also the sort key when listing batches.
    pub created_at: String,
    #[serde(default)]
    pub status: CrossStoreRecoveryStatus,
    #[serde(default)]
    pub last_error: Option<String>,
    pub documents: Vec<CrossStoreRecoveryDocumentRef>,
}
433
434impl CrossStoreRecoveryBatch {
435 pub fn from_documents(documents: &[ChromaDocument]) -> Self {
436 Self {
437 batch_id: Uuid::new_v4().to_string(),
438 created_at: chrono::Utc::now().to_rfc3339(),
439 status: CrossStoreRecoveryStatus::Pending,
440 last_error: None,
441 documents: documents
442 .iter()
443 .map(|document| CrossStoreRecoveryDocumentRef {
444 namespace: document.namespace.clone(),
445 id: document.id.clone(),
446 })
447 .collect(),
448 }
449 }
450}
451
/// Owned, in-memory iterator of record batches handed to `Table::add`.
type BatchIter =
    RecordBatchIterator<std::vec::IntoIter<std::result::Result<RecordBatch, ArrowError>>>;
454
455impl StorageManager {
456 pub async fn new(db_path: &str) -> Result<Self> {
457 let lance_env = std::env::var("LANCEDB_PATH").unwrap_or_else(|_| db_path.to_string());
459 let lance_path = if lance_env.trim().is_empty() {
460 shellexpand::tilde("~/.rmcp-servers/rust-memex/lancedb").to_string()
461 } else {
462 shellexpand::tilde(&lance_env).to_string()
463 };
464
465 let lance = connect(&lance_path).execute().await?;
466
467 Ok(Self {
468 lance,
469 table: Arc::new(Mutex::new(None)),
470 namespace_tables: Arc::new(Mutex::new(HashMap::new())),
471 collection_name: DEFAULT_TABLE_NAME.to_string(),
472 lance_path,
473 })
474 }
475
476 pub async fn new_lance_only(db_path: &str) -> Result<Self> {
479 let lance_path = shellexpand::tilde(db_path).to_string();
480 let lance = connect(&lance_path).execute().await?;
481
482 Ok(Self {
483 lance,
484 table: Arc::new(Mutex::new(None)),
485 namespace_tables: Arc::new(Mutex::new(HashMap::new())),
486 collection_name: DEFAULT_TABLE_NAME.to_string(),
487 lance_path,
488 })
489 }
490
    /// Expanded filesystem path of the LanceDB database directory.
    pub fn lance_path(&self) -> &str {
        &self.lance_path
    }
494
    /// Fails fast if the legacy table exists but lacks columns required by the
    /// current schema. A missing table is fine: it is created with the full
    /// schema on first insert.
    pub async fn require_current_schema_for_writes(&self) -> Result<()> {
        let Some(table) = self.open_table_if_exists().await? else {
            return Ok(());
        };
        self.ensure_hash_schema_columns(&table).await
    }
501
    /// Compares the legacy table's schema against `expected_schema`.
    ///
    /// When the table does not exist yet there is nothing to migrate, so the
    /// report claims the expected version with `needs_migration: false`.
    pub async fn schema_status(
        &self,
        expected_schema: SchemaVersion,
    ) -> Result<SchemaStatusReport> {
        let Some(table) = self.open_table_if_exists().await? else {
            return Ok(SchemaStatusReport {
                schema_version: expected_schema,
                expected_schema,
                needs_migration: false,
                missing_columns: Vec::new(),
                manifest_version: None,
            });
        };

        let missing_columns = Self::missing_required_columns(&table, expected_schema)
            .await?
            .into_iter()
            .map(|field| field.name().to_string())
            .collect::<Vec<_>>();
        // Best-effort: highest Lance manifest version if listing succeeds.
        let manifest_version = table
            .list_versions()
            .await
            .ok()
            .and_then(|versions| versions.iter().map(|version| version.version).max());

        Ok(SchemaStatusReport {
            // Any missing required column is reported as the legacy v3 layout.
            schema_version: if missing_columns.is_empty() {
                expected_schema
            } else {
                SchemaVersion::V3
            },
            expected_schema,
            needs_migration: !missing_columns.is_empty(),
            missing_columns,
            manifest_version,
        })
    }
539
540 pub async fn missing_required_columns(
541 table: &Table,
542 target: SchemaVersion,
543 ) -> Result<Vec<Field>> {
544 let schema = table.schema().await?;
545 Ok(required_columns_for(target)
546 .into_iter()
547 .filter(|field| schema.field_with_name(field.name()).is_err())
548 .collect())
549 }
550
    /// Adds any missing required columns (as all-NULL columns) to bring the
    /// default table at `db_path` up to `target`.
    ///
    /// With `check_only` set, reports what would change without applying it.
    /// If `add_columns` fails but a concurrent writer already completed the
    /// same migration, the race is treated as success.
    pub async fn migrate_lance_schema(
        db_path: &str,
        target: SchemaVersion,
        check_only: bool,
    ) -> Result<SchemaMigrationReport> {
        let lance_path = shellexpand::tilde(db_path).to_string();
        let lance = connect(&lance_path).execute().await?;
        let table = lance.open_table(DEFAULT_TABLE_NAME).execute().await?;
        let missing = Self::missing_required_columns(&table, target).await?;

        if missing.is_empty() || check_only {
            return Ok(SchemaMigrationReport {
                target,
                missing_columns: missing,
                applied: false,
            });
        }

        // New columns are added as all-NULLs so existing rows stay valid.
        let transform = NewColumnTransform::AllNulls(Arc::new(Schema::new(missing.clone())));
        if let Err(error) = table.add_columns(transform, None).await {
            // Re-check against the latest manifest: another writer may have
            // applied the migration between our check and the add.
            let _ = table.checkout_latest().await;
            let remaining = Self::missing_required_columns(&table, target).await?;
            if remaining.is_empty() {
                warn!(
                    "Lance table '{}' schema migration raced with another writer and is already complete",
                    DEFAULT_TABLE_NAME
                );
                return Ok(SchemaMigrationReport {
                    target,
                    missing_columns: missing,
                    applied: true,
                });
            }
            return Err(anyhow!(
                "failed to migrate Lance table '{}' schema to {target}: {error}",
                DEFAULT_TABLE_NAME
            ));
        }

        let _ = table.checkout_latest().await;
        Ok(SchemaMigrationReport {
            target,
            missing_columns: missing,
            applied: true,
        })
    }
597
598 pub fn cross_store_recovery_dir(&self) -> PathBuf {
599 let db_path = Path::new(&self.lance_path);
600 let parent = db_path.parent().unwrap_or_else(|| Path::new("."));
601 let stem = db_path
602 .file_name()
603 .and_then(|name| name.to_str())
604 .unwrap_or("lancedb");
605 parent.join(format!(".{stem}-cross-store-recovery"))
606 }
607
    /// Journal file path for `batch_id`: `<recovery-dir>/<batch_id>.json`.
    fn cross_store_recovery_batch_path(&self, batch_id: &str) -> PathBuf {
        self.cross_store_recovery_dir()
            .join(format!("{batch_id}.json"))
    }
612
613 fn write_cross_store_recovery_batch(&self, batch: &CrossStoreRecoveryBatch) -> Result<PathBuf> {
614 let dir = self.cross_store_recovery_dir();
615 std::fs::create_dir_all(&dir)?;
616
617 let path = self.cross_store_recovery_batch_path(&batch.batch_id);
618 let tmp_path = path.with_extension("json.tmp");
619 let payload = serde_json::to_vec_pretty(batch)?;
620
621 std::fs::write(&tmp_path, payload)?;
622 std::fs::rename(&tmp_path, &path)?;
623
624 Ok(path)
625 }
626
    /// Writes the journal for a newly-started cross-store batch.
    pub fn persist_cross_store_recovery_batch(
        &self,
        batch: &CrossStoreRecoveryBatch,
    ) -> Result<PathBuf> {
        self.write_cross_store_recovery_batch(batch)
    }
633
    /// Rewrites an existing batch journal (e.g. after a status change).
    pub fn update_cross_store_recovery_batch(
        &self,
        batch: &CrossStoreRecoveryBatch,
    ) -> Result<PathBuf> {
        self.write_cross_store_recovery_batch(batch)
    }
640
641 pub fn clear_cross_store_recovery_batch(&self, batch_id: &str) -> Result<()> {
642 let path = self.cross_store_recovery_batch_path(batch_id);
643 match std::fs::remove_file(path) {
644 Ok(()) => Ok(()),
645 Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(()),
646 Err(error) => Err(error.into()),
647 }
648 }
649
650 pub fn list_cross_store_recovery_batches(&self) -> Result<Vec<CrossStoreRecoveryBatch>> {
651 let dir = self.cross_store_recovery_dir();
652 if !dir.exists() {
653 return Ok(vec![]);
654 }
655
656 let mut batches = Vec::new();
657 for entry in std::fs::read_dir(dir)? {
658 let entry = entry?;
659 let path = entry.path();
660 if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
661 continue;
662 }
663
664 let payload = std::fs::read(&path)?;
665 let batch: CrossStoreRecoveryBatch = serde_json::from_slice(&payload)?;
666 batches.push(batch);
667 }
668
669 batches.sort_by(|left, right| left.created_at.cmp(&right.created_at));
670 Ok(batches)
671 }
672
673 pub async fn refresh(&self) -> Result<()> {
676 let mut guard = self.table.lock().await;
677 *guard = None;
678 self.namespace_tables.lock().await.clear();
679 tracing::info!("LanceDB table cache cleared - will refresh on next query");
680 Ok(())
681 }
682
683 pub async fn ensure_collection(&self) -> Result<()> {
684 let mut guard = self.table.lock().await;
686 if guard.is_some() {
687 return Ok(());
688 }
689 match self
690 .lance
691 .open_table(self.collection_name.as_str())
692 .execute()
693 .await
694 {
695 Ok(table) => {
696 *guard = Some(table);
697 info!("Found existing Lance table '{}'", self.collection_name);
698 }
699 Err(_) => {
700 info!(
701 "Lance table '{}' will be created on first insert",
702 self.collection_name
703 );
704 }
705 }
706 Ok(())
707 }
708
    /// Validates and inserts `documents`, grouped into per-namespace tables.
    ///
    /// The whole batch is rejected up front if any document has an empty
    /// id/namespace, an embedding length differing from the first document's,
    /// or a NaN/infinite embedding value. Inserts then run table-by-table, so
    /// a failure mid-loop can leave earlier namespaces written — callers
    /// should treat an error as a partial write.
    pub async fn add_to_store(&self, documents: Vec<ChromaDocument>) -> Result<()> {
        if documents.is_empty() {
            return Ok(());
        }

        // The first document's embedding length defines the batch dimension.
        let dim = documents
            .first()
            .ok_or_else(|| anyhow!("No documents to add"))?
            .embedding
            .len();
        if dim == 0 {
            return Err(anyhow!("Embedding dimension is zero"));
        }

        // Validate the entire batch before touching any table.
        for (i, doc) in documents.iter().enumerate() {
            if doc.embedding.len() != dim {
                return Err(anyhow!(
                    "Document {} has inconsistent embedding dimension: expected {}, got {}. \
 Aborting batch to prevent database corruption.",
                    i,
                    dim,
                    doc.embedding.len()
                ));
            }
            if doc.id.is_empty() {
                return Err(anyhow!("Document {} has empty ID. Aborting batch.", i));
            }
            if doc.namespace.is_empty() {
                return Err(anyhow!(
                    "Document {} has empty namespace. Aborting batch.",
                    i
                ));
            }
            for (j, &val) in doc.embedding.iter().enumerate() {
                if val.is_nan() || val.is_infinite() {
                    return Err(anyhow!(
                        "Document {} has invalid embedding value at index {}: {}. \
 Aborting batch to prevent database corruption.",
                        i,
                        j,
                        val
                    ));
                }
            }
        }

        // Group documents by namespace; each namespace gets its own table.
        let mut by_namespace: HashMap<String, Vec<ChromaDocument>> = HashMap::new();
        for document in documents {
            by_namespace
                .entry(document.namespace.clone())
                .or_default()
                .push(document);
        }

        let mut inserted = 0usize;
        for (namespace, docs) in by_namespace {
            let table = self.ensure_namespace_table(&namespace, dim).await?;
            // Refuse to write into a table missing current-schema columns.
            self.ensure_hash_schema_columns(&table).await?;
            let batch = self.docs_to_batch(&docs, dim)?;
            if let Err(error) = table.add(batch).execute().await {
                return Err(self.map_lancedb_write_error(error));
            }
            inserted += docs.len();
        }
        debug!("Inserted {} documents into Lance (validated)", inserted);
        Ok(())
    }
779
    /// Vector search returning up to `k` documents.
    ///
    /// With a namespace, queries that namespace's table and then the legacy
    /// shared table (filtered by namespace); without one, queries every data
    /// table. Results are concatenated in table order and truncated to `k`.
    /// NOTE(review): hits are not re-merged by distance across tables, so the
    /// kept `k` may not be the globally nearest — confirm callers re-rank.
    pub async fn search_store(
        &self,
        namespace: Option<&str>,
        embedding: Vec<f32>,
        k: usize,
    ) -> Result<Vec<ChromaDocument>> {
        if embedding.is_empty() {
            return Ok(vec![]);
        }
        let mut results = Vec::new();
        if let Some(ns) = namespace {
            if let Some(table) = self.open_namespace_table_if_exists(ns).await? {
                let mut stream = table
                    .query()
                    .nearest_to(embedding.clone())?
                    .limit(k)
                    .execute()
                    .await?;
                while let Some(batch) = stream.try_next().await? {
                    let mut docs = self.batch_to_docs(&batch)?;
                    results.append(&mut docs);
                }
            }
            // Also scan the legacy shared table for pre-migration rows.
            if let Some(table) = self.legacy_table_if_exists().await? {
                let mut stream = table
                    .query()
                    .only_if(self.namespace_filter(ns).as_str())
                    .nearest_to(embedding)?
                    .limit(k)
                    .execute()
                    .await?;
                while let Some(batch) = stream.try_next().await? {
                    let mut docs = self.batch_to_docs(&batch)?;
                    results.append(&mut docs);
                }
            }
            results.truncate(k);
        } else {
            // No namespace: fan out over every data table.
            for table_name in self.data_table_names().await? {
                let Some(table) = self.open_named_table_if_exists(&table_name).await? else {
                    continue;
                };
                let mut stream = table
                    .query()
                    .nearest_to(embedding.clone())?
                    .limit(k)
                    .execute()
                    .await?;
                while let Some(batch) = stream.try_next().await? {
                    let mut docs = self.batch_to_docs(&batch)?;
                    results.append(&mut docs);
                }
            }
            results.truncate(k);
        }
        debug!("Lance returned {} results", results.len());
        Ok(results)
    }
838
    /// Non-vector paged scan over documents.
    ///
    /// Each candidate table is asked for its first `offset + limit` rows and
    /// the global offset/limit is applied to the concatenation, so ordering
    /// across tables follows table iteration order, not any column sort.
    pub async fn all_documents_page(
        &self,
        namespace: Option<&str>,
        offset: usize,
        limit: usize,
    ) -> Result<Vec<ChromaDocument>> {
        let mut results = Vec::new();
        if let Some(ns) = namespace {
            if let Some(table) = self.open_namespace_table_if_exists(ns).await? {
                results.append(
                    &mut self
                        .query_table_page(&table, None, 0, offset + limit)
                        .await?,
                );
            }
            // Include pre-migration rows still in the legacy shared table.
            if let Some(table) = self.legacy_table_if_exists().await? {
                results.append(
                    &mut self
                        .query_table_page(
                            &table,
                            Some(self.namespace_filter(ns)),
                            0,
                            offset + limit,
                        )
                        .await?,
                );
            }
        } else {
            for table_name in self.data_table_names().await? {
                let Some(table) = self.open_named_table_if_exists(&table_name).await? else {
                    continue;
                };
                results.append(
                    &mut self
                        .query_table_page(&table, None, 0, offset + limit)
                        .await?,
                );
            }
        }

        // Global paging over the concatenated per-table results.
        Ok(results.into_iter().skip(offset).take(limit).collect())
    }
886
    /// Convenience wrapper over [`Self::all_documents_page`] starting at offset 0.
    pub async fn all_documents(
        &self,
        namespace: Option<&str>,
        limit: usize,
    ) -> Result<Vec<ChromaDocument>> {
        self.all_documents_page(namespace, 0, limit).await
    }
897
    /// Fetches one document by id, preferring the per-namespace table and
    /// falling back to the legacy shared table. `Ok(None)` when absent.
    pub async fn get_document(&self, namespace: &str, id: &str) -> Result<Option<ChromaDocument>> {
        let id_filter = self.id_filter(id);
        if let Some(table) = self.open_namespace_table_if_exists(namespace).await? {
            let mut stream = table
                .query()
                .only_if(id_filter.as_str())
                .limit(1)
                .execute()
                .await?;
            if let Some(batch) = stream.try_next().await? {
                let mut docs = self.batch_to_docs(&batch)?;
                if let Some(doc) = docs.pop() {
                    return Ok(Some(doc));
                }
            }
        }

        // Legacy table scan needs the namespace predicate as well as the id.
        if let Some(table) = self.legacy_table_if_exists().await? {
            let filter = format!("{} AND {}", self.namespace_filter(namespace), id_filter);
            let mut stream = table
                .query()
                .only_if(filter.as_str())
                .limit(1)
                .execute()
                .await?;
            if let Some(batch) = stream.try_next().await? {
                let mut docs = self.batch_to_docs(&batch)?;
                if let Some(doc) = docs.pop() {
                    return Ok(Some(doc));
                }
            }
        }
        Ok(None)
    }
932
933 pub async fn delete_document(&self, namespace: &str, id: &str) -> Result<usize> {
934 let mut deleted = 0usize;
935 let id_filter = self.id_filter(id);
936
937 if let Some(table) = self.open_namespace_table_if_exists(namespace).await? {
938 let pre_count = table.count_rows(Some(id_filter.clone())).await?;
939 if pre_count > 0 {
940 table.delete(id_filter.as_str()).await?;
941 deleted += pre_count;
942 }
943 }
944
945 if let Some(table) = self.legacy_table_if_exists().await? {
946 let predicate = format!("{} AND {}", self.namespace_filter(namespace), id_filter);
947 let pre_count = table.count_rows(Some(predicate.clone())).await?;
948 if pre_count > 0 {
949 table.delete(predicate.as_str()).await?;
950 deleted += pre_count;
951 }
952 }
953 Ok(deleted)
954 }
955
    /// Deletes the given ids from both the per-namespace and legacy tables.
    ///
    /// Ids are batched into `IN (...)` predicates of at most 500 entries to
    /// keep the filter string bounded; single quotes inside ids are escaped
    /// SQL-style (`''`). Returns the summed pre-delete match counts.
    pub async fn delete_documents(&self, namespace: &str, ids: &[&str]) -> Result<usize> {
        if ids.is_empty() {
            return Ok(0);
        }
        const CHUNK: usize = 500;
        let mut total_deleted = 0usize;
        for batch in ids.chunks(CHUNK) {
            // Build a quoted, escaped id list for the IN predicate.
            let id_list = batch
                .iter()
                .map(|id| format!("'{}'", id.replace('\'', "''")))
                .collect::<Vec<_>>()
                .join(", ");
            let id_predicate = format!("id IN ({})", id_list);
            if let Some(table) = self.open_namespace_table_if_exists(namespace).await? {
                let pre_count = table.count_rows(Some(id_predicate.clone())).await?;
                if pre_count > 0 {
                    table.delete(id_predicate.as_str()).await?;
                    total_deleted += pre_count;
                }
            }
            if let Some(table) = self.legacy_table_if_exists().await? {
                let predicate =
                    format!("{} AND {}", self.namespace_filter(namespace), id_predicate);
                let pre_count = table.count_rows(Some(predicate.clone())).await?;
                if pre_count > 0 {
                    table.delete(predicate.as_str()).await?;
                    total_deleted += pre_count;
                }
            }
        }
        Ok(total_deleted)
    }
994
995 pub async fn delete_namespace_documents(&self, namespace: &str) -> Result<usize> {
996 let mut deleted = 0usize;
997 if let Some(table) = self.open_namespace_table_if_exists(namespace).await? {
998 let pre_count = table.count_rows(None).await?;
999 if pre_count > 0 {
1000 table
1001 .delete(self.namespace_filter(namespace).as_str())
1002 .await?;
1003 deleted += pre_count;
1004 }
1005 }
1006 if let Some(table) = self.legacy_table_if_exists().await? {
1007 let predicate = self.namespace_filter(namespace);
1008 let pre_count = table.count_rows(Some(predicate.clone())).await?;
1009 if pre_count > 0 {
1010 table.delete(predicate.as_str()).await?;
1011 deleted += pre_count;
1012 }
1013 }
1014 Ok(deleted)
1015 }
1016
    /// Moves every document from namespace `from` to `to`.
    ///
    /// No-ops when `from == to` or the source is empty; refuses to overwrite
    /// a non-empty target. Returns the number of rows deleted from `from`.
    ///
    /// NOTE(review): despite the name this is copy-then-delete, not a
    /// transaction — a failure after `add_to_store` leaves the documents
    /// present in both namespaces. Confirm callers tolerate that.
    pub async fn rename_namespace_atomic(&self, from: &str, to: &str) -> Result<usize> {
        if from == to {
            return Ok(0);
        }

        let source_count = self.count_namespace(from).await?;
        if source_count == 0 {
            return Ok(0);
        }

        let target_count = self.count_namespace(to).await?;
        if target_count > 0 {
            return Err(anyhow!(
                "Target namespace '{}' already exists with {} rows",
                to,
                target_count
            ));
        }

        // Copy all rows under the new namespace, then remove the originals.
        let mut docs = self.all_documents(Some(from), source_count).await?;
        for doc in &mut docs {
            doc.namespace = to.to_string();
        }
        self.add_to_store(docs).await?;
        let deleted = self.delete_namespace_documents(from).await?;

        Ok(deleted)
    }
1045
    /// Name of the legacy shared collection/table (default `mcp_documents`).
    pub fn get_collection_name(&self) -> &str {
        &self.collection_name
    }
1049
    /// Derives a stable physical table name for `namespace`.
    ///
    /// The namespace is lowercased with non-alphanumerics mapped to `_`,
    /// runs of `_` collapsed, edges trimmed, and the result capped at 48
    /// chars (empty falls back to "default"). A 6-byte SHA-256 suffix of the
    /// full namespace keeps distinct namespaces that sanitize identically on
    /// distinct tables.
    fn namespace_table_name(namespace: &str) -> String {
        let mut safe = namespace
            .chars()
            .map(|ch| {
                if ch.is_ascii_alphanumeric() {
                    ch.to_ascii_lowercase()
                } else {
                    '_'
                }
            })
            .collect::<String>();
        // Collapse runs of underscores produced by consecutive specials.
        while safe.contains("__") {
            safe = safe.replace("__", "_");
        }
        let safe = safe.trim_matches('_');
        let safe = if safe.is_empty() { "default" } else { safe };
        let safe = safe.chars().take(48).collect::<String>();
        let hash = Sha256::digest(namespace.as_bytes());
        let suffix = hash[..6]
            .iter()
            .map(|byte| format!("{byte:02x}"))
            .collect::<String>();
        format!("{NAMESPACE_TABLE_PREFIX}{safe}_{suffix}")
    }
1074
1075 fn is_namespace_table_name(table_name: &str) -> bool {
1076 table_name.starts_with(NAMESPACE_TABLE_PREFIX)
1077 }
1078
1079 async fn data_table_names(&self) -> Result<Vec<String>> {
1080 let table_names = self.lance.table_names().execute().await?;
1081 Ok(table_names
1082 .into_iter()
1083 .filter(|name| name == DEFAULT_TABLE_NAME || Self::is_namespace_table_name(name))
1084 .collect())
1085 }
1086
1087 async fn open_named_table_if_exists(&self, table_name: &str) -> Result<Option<Table>> {
1088 match self.lance.open_table(table_name).execute().await {
1089 Ok(table) => Ok(Some(table)),
1090 Err(e) => {
1091 let msg = e.to_string().to_lowercase();
1092 if msg.contains("not found")
1093 || msg.contains("does not exist")
1094 || msg.contains("no such file")
1095 {
1096 Ok(None)
1097 } else {
1098 Err(anyhow!("LanceDB error on table '{}': {}", table_name, e))
1099 }
1100 }
1101 }
1102 }
1103
1104 async fn open_namespace_table_if_exists(&self, namespace: &str) -> Result<Option<Table>> {
1105 let table_name = Self::namespace_table_name(namespace);
1106 if let Some(table) = self.namespace_tables.lock().await.get(&table_name).cloned() {
1107 return Ok(Some(table));
1108 }
1109 let table = self.open_named_table_if_exists(&table_name).await?;
1110 if let Some(table) = &table {
1111 self.namespace_tables
1112 .lock()
1113 .await
1114 .insert(table_name, table.clone());
1115 }
1116 Ok(table)
1117 }
1118
    /// Returns (creating if necessary) the table backing `namespace`,
    /// caching the handle for reuse. `dim` is required when the table must
    /// be created; zero is rejected in that case.
    ///
    /// NOTE(review): two concurrent callers can both miss the cache and race
    /// `create_empty_table`; the loser surfaces the creation error — confirm
    /// whether a retry-on-exists is needed.
    async fn ensure_namespace_table(&self, namespace: &str, dim: usize) -> Result<Table> {
        let table_name = Self::namespace_table_name(namespace);
        if let Some(table) = self.namespace_tables.lock().await.get(&table_name).cloned() {
            return Ok(table);
        }

        let table = match self.open_named_table_if_exists(&table_name).await? {
            Some(table) => table,
            None => {
                if dim == 0 {
                    return Err(anyhow!(
                        "Vector table '{}' not found and dimension is unknown",
                        table_name
                    ));
                }
                info!(
                    "Creating Lance namespace table '{}' for '{}' with vector dimension {} (schema v{})",
                    table_name, namespace, dim, SCHEMA_VERSION
                );
                let schema = Arc::new(Self::create_schema(dim));
                self.lance
                    .create_empty_table(table_name.as_str(), schema)
                    .execute()
                    .await?
            }
        };

        self.namespace_tables
            .lock()
            .await
            .insert(table_name, table.clone());
        Ok(table)
    }
1152
1153 async fn query_table_page(
1154 &self,
1155 table: &Table,
1156 filter: Option<String>,
1157 offset: usize,
1158 limit: usize,
1159 ) -> Result<Vec<ChromaDocument>> {
1160 let mut query = table.query().limit(limit).offset(offset);
1161 if let Some(filter) = filter {
1162 query = query.only_if(filter.as_str());
1163 }
1164 let mut stream = query.execute().await?;
1165 let mut results = Vec::new();
1166 while let Some(batch) = stream.try_next().await? {
1167 let mut docs = self.batch_to_docs(&batch)?;
1168 results.append(&mut docs);
1169 }
1170 Ok(results)
1171 }
1172
    /// Alias for `open_table_if_exists`: the pre-namespace shared table is
    /// still consulted for reads and deletes alongside per-namespace tables.
    async fn legacy_table_if_exists(&self) -> Result<Option<Table>> {
        self.open_table_if_exists().await
    }
1176
    /// Opens (and caches) the legacy shared table.
    ///
    /// `Ok(None)` when the table has never been created; "not found"-style
    /// failures are detected by substring-matching the error text, anything
    /// else is logged and propagated.
    async fn open_table_if_exists(&self) -> Result<Option<Table>> {
        let mut guard = self.table.lock().await;
        if let Some(table) = guard.as_ref() {
            return Ok(Some(table.clone()));
        }

        match self
            .lance
            .open_table(self.collection_name.as_str())
            .execute()
            .await
        {
            Ok(tbl) => {
                *guard = Some(tbl.clone());
                Ok(Some(tbl))
            }
            Err(e) => {
                let msg = e.to_string().to_lowercase();
                if msg.contains("not found")
                    || msg.contains("does not exist")
                    || msg.contains("no such file")
                {
                    Ok(None)
                } else {
                    tracing::warn!(
                        "LanceDB error opening table '{}': {}",
                        self.collection_name,
                        e
                    );
                    Err(anyhow!(
                        "LanceDB error on table '{}': {}",
                        self.collection_name,
                        e
                    ))
                }
            }
        }
    }
1218
1219 fn create_schema(dim: usize) -> Schema {
1221 Schema::new(vec![
1222 Field::new("id", DataType::Utf8, false),
1223 Field::new("namespace", DataType::Utf8, false),
1224 Field::new(
1225 "vector",
1226 DataType::FixedSizeList(
1227 Arc::new(Field::new("item", DataType::Float32, true)),
1228 dim as i32,
1229 ),
1230 false,
1231 ),
1232 Field::new("text", DataType::Utf8, true),
1233 Field::new("metadata", DataType::Utf8, true),
1234 Field::new("layer", DataType::UInt8, true), Field::new("parent_id", DataType::Utf8, true), Field::new("children_ids", DataType::Utf8, true), Field::new("keywords", DataType::Utf8, true), Field::new("content_hash", DataType::Utf8, true), Field::new("source_hash", DataType::Utf8, true), ])
1246 }
1247
1248 async fn ensure_hash_schema_columns(&self, table: &Table) -> Result<()> {
1249 let missing = Self::missing_required_columns(table, SchemaVersion::current()).await?;
1250
1251 if missing.is_empty() {
1252 return Ok(());
1253 }
1254
1255 let missing_columns = missing
1256 .iter()
1257 .map(|field| field.name().to_string())
1258 .collect::<Vec<_>>();
1259 let error = SchemaMismatchWriteError::new(
1260 self.collection_name.clone(),
1261 self.lance_path.clone(),
1262 missing_columns,
1263 "table is older than the current writer schema",
1264 );
1265 self.log_schema_mismatch(&error);
1266 Err(error.into())
1267 }
1268
1269 fn map_lancedb_write_error(&self, error: lancedb::error::Error) -> anyhow::Error {
1270 let message = match &error {
1271 lancedb::error::Error::Lance { source } => source.to_string(),
1272 lancedb::error::Error::Schema { message } => message.clone(),
1273 lancedb::error::Error::Arrow { source } => source.to_string(),
1274 _ => return error.into(),
1275 };
1276
1277 if !is_schema_mismatch_message(&message) {
1278 return error.into();
1279 }
1280
1281 let missing_columns = extract_missing_columns(&message);
1282 let error = SchemaMismatchWriteError::new(
1283 self.collection_name.clone(),
1284 self.lance_path.clone(),
1285 missing_columns,
1286 message,
1287 );
1288 self.log_schema_mismatch(&error);
1289 error.into()
1290 }
1291
    /// Emit a structured error log for a write-path schema mismatch.
    ///
    /// Structured fields (`error_kind`, `table`, `db_path`, …) let log
    /// tooling filter on `error_kind = "schema_mismatch"`; `file!()` and
    /// `line!()` pin the call site.
    fn log_schema_mismatch(&self, error: &SchemaMismatchWriteError) {
        error!(
            error_kind = "schema_mismatch",
            table = %error.table_name(),
            db_path = %error.db_path(),
            missing_columns = ?error.missing_columns(),
            remediation = %error.remediation(),
            file = file!(),
            line = line!(),
            "write-path schema mismatch"
        );
    }
1304
    /// Convert documents into a single-batch iterator whose columns match
    /// [`Self::create_schema`] exactly (order matters: Arrow binds columns
    /// positionally in `RecordBatch::try_new`).
    ///
    /// Metadata, children_ids, and keywords are JSON-encoded into Utf8
    /// columns; serialization failures degrade to `{}` / `[]` rather than
    /// aborting the batch.
    fn docs_to_batch(&self, documents: &[ChromaDocument], dim: usize) -> Result<BatchIter> {
        let ids = documents.iter().map(|d| d.id.as_str()).collect::<Vec<_>>();
        let namespaces = documents
            .iter()
            .map(|d| d.namespace.as_str())
            .collect::<Vec<_>>();
        let texts = documents
            .iter()
            .map(|d| d.document.as_str())
            .collect::<Vec<_>>();
        let metadata_strings = documents
            .iter()
            .map(|d| serde_json::to_string(&d.metadata).unwrap_or_else(|_| "{}".to_string()))
            .collect::<Vec<_>>();

        // Embeddings with the wrong length become a NULL list entry instead
        // of failing the whole write.
        let vectors = documents.iter().map(|d| {
            if d.embedding.len() != dim {
                None
            } else {
                Some(d.embedding.iter().map(|v| Some(*v)).collect::<Vec<_>>())
            }
        });

        let layers: Vec<u8> = documents.iter().map(|d| d.layer).collect();
        let parent_ids: Vec<Option<&str>> =
            documents.iter().map(|d| d.parent_id.as_deref()).collect();
        let children_ids_json: Vec<String> = documents
            .iter()
            .map(|d| serde_json::to_string(&d.children_ids).unwrap_or_else(|_| "[]".to_string()))
            .collect();
        let keywords_json: Vec<String> = documents
            .iter()
            .map(|d| serde_json::to_string(&d.keywords).unwrap_or_else(|_| "[]".to_string()))
            .collect();
        let content_hashes: Vec<Option<&str>> = documents
            .iter()
            .map(|d| d.content_hash.as_deref())
            .collect();
        let source_hashes: Vec<Option<&str>> =
            documents.iter().map(|d| d.source_hash.as_deref()).collect();

        let schema = Arc::new(Self::create_schema(dim));

        // Column order below must mirror create_schema's field order.
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(ids)),
                Arc::new(StringArray::from(namespaces)),
                Arc::new(
                    FixedSizeListArray::from_iter_primitive::<Float32Type, _, _>(
                        vectors, dim as i32,
                    ),
                ),
                Arc::new(StringArray::from(texts)),
                Arc::new(StringArray::from(metadata_strings)),
                Arc::new(UInt8Array::from(layers)),
                Arc::new(StringArray::from(parent_ids)),
                Arc::new(StringArray::from(
                    children_ids_json
                        .iter()
                        .map(|s| s.as_str())
                        .collect::<Vec<_>>(),
                )),
                Arc::new(StringArray::from(
                    keywords_json.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
                )),
                Arc::new(StringArray::from(content_hashes)),
                Arc::new(StringArray::from(source_hashes)),
            ],
        )?;

        Ok(RecordBatchIterator::new(
            vec![Ok(batch)].into_iter(),
            schema,
        ))
    }
1387
    /// Reconstruct `ChromaDocument`s from an Arrow record batch.
    ///
    /// `id`, `namespace`, `text`, `metadata`, and `vector` are required and
    /// error out if absent; the hierarchy/hash columns are optional so
    /// batches from pre-v4 tables still decode (missing or NULL values fall
    /// back to defaults: layer 0, empty lists, `None` hashes).
    fn batch_to_docs(&self, batch: &RecordBatch) -> Result<Vec<ChromaDocument>> {
        let id_col = batch
            .column_by_name("id")
            .and_then(|c| c.as_any().downcast_ref::<StringArray>())
            .ok_or_else(|| anyhow!("Missing id column"))?;
        let ns_col = batch
            .column_by_name("namespace")
            .and_then(|c| c.as_any().downcast_ref::<StringArray>())
            .ok_or_else(|| anyhow!("Missing namespace column"))?;
        let text_col = batch
            .column_by_name("text")
            .and_then(|c| c.as_any().downcast_ref::<StringArray>())
            .ok_or_else(|| anyhow!("Missing text column"))?;
        let metadata_col = batch
            .column_by_name("metadata")
            .and_then(|c| c.as_any().downcast_ref::<StringArray>())
            .ok_or_else(|| anyhow!("Missing metadata column"))?;
        let vector_col = batch
            .column_by_name("vector")
            .and_then(|c| c.as_any().downcast_ref::<FixedSizeListArray>())
            .ok_or_else(|| anyhow!("Missing vector column"))?;

        // Optional columns: None when the column is absent (older schema).
        let layer_col = batch
            .column_by_name("layer")
            .and_then(|c| c.as_any().downcast_ref::<UInt8Array>());
        let parent_id_col = batch
            .column_by_name("parent_id")
            .and_then(|c| c.as_any().downcast_ref::<StringArray>());
        let children_ids_col = batch
            .column_by_name("children_ids")
            .and_then(|c| c.as_any().downcast_ref::<StringArray>());
        let keywords_col = batch
            .column_by_name("keywords")
            .and_then(|c| c.as_any().downcast_ref::<StringArray>());
        let content_hash_col = batch
            .column_by_name("content_hash")
            .and_then(|c| c.as_any().downcast_ref::<StringArray>());
        let source_hash_col = batch
            .column_by_name("source_hash")
            .and_then(|c| c.as_any().downcast_ref::<StringArray>());

        let dim = vector_col.value_length() as usize;
        let values = vector_col
            .values()
            .as_any()
            .downcast_ref::<Float32Array>()
            .ok_or_else(|| anyhow!("Vector inner type mismatch"))?;

        let mut docs = Vec::new();
        for i in 0..batch.num_rows() {
            let id = id_col.value(i).to_string();
            let text = text_col.value(i).to_string();
            let namespace = ns_col.value(i).to_string();
            let meta_str = metadata_col.value(i);
            // Unparseable metadata degrades to an empty JSON object.
            let metadata: Value = serde_json::from_str(meta_str).unwrap_or_else(|_| json!({}));

            // Flattened fixed-size-list layout: row i spans values
            // [i*dim, (i+1)*dim). NOTE(review): this assumes the list array
            // carries no slice offset — confirm batches are never sliced
            // before reaching here.
            let offset = i * dim;
            let mut emb = Vec::with_capacity(dim);
            for j in 0..dim {
                emb.push(values.value(offset + j));
            }

            let layer = layer_col
                .and_then(|col| {
                    if col.is_null(i) {
                        None
                    } else {
                        Some(col.value(i))
                    }
                })
                .unwrap_or(0);

            let parent_id = parent_id_col.and_then(|col| {
                if col.is_null(i) {
                    None
                } else {
                    Some(col.value(i).to_string())
                }
            });

            // children_ids / keywords are stored as JSON arrays of strings.
            let children_ids: Vec<String> = children_ids_col
                .and_then(|col| {
                    if col.is_null(i) {
                        None
                    } else {
                        serde_json::from_str(col.value(i)).ok()
                    }
                })
                .unwrap_or_default();

            let keywords: Vec<String> = keywords_col
                .and_then(|col| {
                    if col.is_null(i) {
                        None
                    } else {
                        serde_json::from_str(col.value(i)).ok()
                    }
                })
                .unwrap_or_default();

            let content_hash = content_hash_col.and_then(|col| {
                if col.is_null(i) {
                    None
                } else {
                    Some(col.value(i).to_string())
                }
            });

            let source_hash = source_hash_col.and_then(|col| {
                if col.is_null(i) {
                    None
                } else {
                    Some(col.value(i).to_string())
                }
            });

            docs.push(ChromaDocument {
                id,
                namespace,
                embedding: emb,
                metadata,
                document: text,
                layer,
                parent_id,
                children_ids,
                keywords,
                content_hash,
                source_hash,
            });
        }
        Ok(docs)
    }
1524
1525 pub async fn get_filtered_in_namespace(
1526 &self,
1527 namespace: &str,
1528 filter: &str,
1529 ) -> Result<Vec<ChromaDocument>> {
1530 let mut results = Vec::new();
1531 if let Some(table) = self.open_namespace_table_if_exists(namespace).await? {
1532 let mut stream = table.query().only_if(filter).execute().await?;
1533 while let Some(batch) = stream.try_next().await? {
1534 let mut docs = self.batch_to_docs(&batch)?;
1535 results.append(&mut docs);
1536 }
1537 }
1538 if let Some(table) = self.legacy_table_if_exists().await? {
1539 let combined = format!("{} AND ({})", self.namespace_filter(namespace), filter);
1540 let mut stream = table.query().only_if(combined.as_str()).execute().await?;
1541 while let Some(batch) = stream.try_next().await? {
1542 let mut docs = self.batch_to_docs(&batch)?;
1543 results.append(&mut docs);
1544 }
1545 }
1546 Ok(results)
1547 }
1548
    /// Vector search returning up to `k` documents, optionally restricted to
    /// one namespace and/or one slice layer.
    ///
    /// With a namespace, the per-namespace table and the legacy shared table
    /// are both queried; without one, every data table is queried.
    ///
    /// NOTE(review): results from multiple tables are concatenated in
    /// arrival order and then `truncate(k)`-ed — when more than one table
    /// contributes, the surviving k are not necessarily the globally nearest
    /// neighbors. Confirm whether re-ranking by distance is required.
    pub async fn search_store_with_layer(
        &self,
        namespace: Option<&str>,
        embedding: Vec<f32>,
        k: usize,
        layer_filter: Option<SliceLayer>,
    ) -> Result<Vec<ChromaDocument>> {
        // An empty query vector cannot be searched against.
        if embedding.is_empty() {
            return Ok(vec![]);
        }
        let mut filters = Vec::new();
        if let Some(layer) = layer_filter {
            filters.push(self.layer_filter(layer));
        }
        let mut results = Vec::new();

        if let Some(ns) = namespace {
            // 1) Per-namespace table: only the layer filter applies.
            if let Some(table) = self.open_namespace_table_if_exists(ns).await? {
                let mut query = table.query();
                if !filters.is_empty() {
                    let combined = filters.join(" AND ");
                    query = query.only_if(combined.as_str());
                }
                let mut stream = query
                    .nearest_to(embedding.clone())?
                    .limit(k)
                    .execute()
                    .await?;
                while let Some(batch) = stream.try_next().await? {
                    let mut docs = self.batch_to_docs(&batch)?;
                    results.append(&mut docs);
                }
            }
            // 2) Legacy shared table: also scope rows to the namespace.
            if let Some(table) = self.legacy_table_if_exists().await? {
                let mut legacy_filters = vec![self.namespace_filter(ns)];
                legacy_filters.extend(filters.clone());
                let combined = legacy_filters.join(" AND ");
                let mut stream = table
                    .query()
                    .only_if(combined.as_str())
                    .nearest_to(embedding)?
                    .limit(k)
                    .execute()
                    .await?;
                while let Some(batch) = stream.try_next().await? {
                    let mut docs = self.batch_to_docs(&batch)?;
                    results.append(&mut docs);
                }
            }
            results.truncate(k);
        } else {
            // No namespace: search every data table.
            for table_name in self.data_table_names().await? {
                let Some(table) = self.open_named_table_if_exists(&table_name).await? else {
                    continue;
                };
                let mut query = table.query();
                if !filters.is_empty() {
                    let combined = filters.join(" AND ");
                    query = query.only_if(combined.as_str());
                }
                let mut stream = query
                    .nearest_to(embedding.clone())?
                    .limit(k)
                    .execute()
                    .await?;
                while let Some(batch) = stream.try_next().await? {
                    let mut docs = self.batch_to_docs(&batch)?;
                    results.append(&mut docs);
                }
            }
            results.truncate(k);
        }
        debug!(
            "Lance returned {} results (layer filter: {:?})",
            results.len(),
            layer_filter
        );
        Ok(results)
    }
1630
1631 pub async fn get_children(
1633 &self,
1634 namespace: &str,
1635 parent_id: &str,
1636 ) -> Result<Vec<ChromaDocument>> {
1637 if self.open_table_if_exists().await?.is_none() {
1638 return Ok(vec![]);
1639 }
1640
1641 if let Some(parent) = self.get_document(namespace, parent_id).await? {
1643 if parent.children_ids.is_empty() {
1644 return Ok(vec![]);
1645 }
1646
1647 let mut children = Vec::new();
1649 for child_id in &parent.children_ids {
1650 if let Some(child) = self.get_document(namespace, child_id).await? {
1651 children.push(child);
1652 }
1653 }
1654 return Ok(children);
1655 }
1656
1657 Ok(vec![])
1658 }
1659
1660 pub async fn get_parent(
1662 &self,
1663 namespace: &str,
1664 child_id: &str,
1665 ) -> Result<Option<ChromaDocument>> {
1666 if let Some(child) = self.get_document(namespace, child_id).await?
1667 && let Some(ref parent_id) = child.parent_id
1668 {
1669 return self.get_document(namespace, parent_id).await;
1670 }
1671 Ok(None)
1672 }
1673
1674 fn namespace_filter(&self, namespace: &str) -> String {
1675 format!("namespace = '{}'", namespace.replace('\'', "''"))
1676 }
1677
1678 fn id_filter(&self, id: &str) -> String {
1679 format!("id = '{}'", id.replace('\'', "''"))
1680 }
1681
1682 fn layer_filter(&self, layer: SliceLayer) -> String {
1683 if layer == SliceLayer::Outer {
1684 "(layer = 0 OR layer = 1)".to_string()
1686 } else {
1687 format!("layer = {}", layer.as_u8())
1688 }
1689 }
1690
1691 fn content_hash_filter(&self, hash: &str) -> String {
1692 format!("content_hash = '{}'", hash.replace('\'', "''"))
1693 }
1694
1695 fn source_hash_filter(&self, hash: &str) -> String {
1696 format!("source_hash = '{}'", hash.replace('\'', "''"))
1697 }
1698
1699 async fn table_has_content_hash(table: &Table) -> bool {
1701 table
1702 .schema()
1703 .await
1704 .map(|schema| schema.field_with_name("content_hash").is_ok())
1705 .unwrap_or(false)
1706 }
1707
1708 async fn table_has_source_hash(table: &Table) -> bool {
1710 table
1711 .schema()
1712 .await
1713 .map(|schema| schema.field_with_name("source_hash").is_ok())
1714 .unwrap_or(false)
1715 }
1716
1717 pub async fn has_content_hash(&self, namespace: &str, hash: &str) -> Result<bool> {
1723 let hash_filter = self.content_hash_filter(hash);
1724 if let Some(table) = self.open_namespace_table_if_exists(namespace).await?
1725 && Self::table_has_content_hash(&table).await
1726 {
1727 let mut stream = table
1728 .query()
1729 .only_if(hash_filter.as_str())
1730 .limit(1)
1731 .execute()
1732 .await?;
1733 if let Some(batch) = stream.try_next().await? {
1734 return Ok(batch.num_rows() > 0);
1735 }
1736 }
1737
1738 if let Some(table) = self.legacy_table_if_exists().await? {
1739 if !Self::table_has_content_hash(&table).await {
1740 tracing::warn!(
1741 "Table '{}' has old schema without content_hash column. \
1742 Deduplication disabled. Consider re-indexing with new schema.",
1743 self.collection_name
1744 );
1745 return Ok(false); }
1747
1748 let filter = format!("{} AND {}", self.namespace_filter(namespace), hash_filter);
1749 let mut stream = table
1750 .query()
1751 .only_if(filter.as_str())
1752 .limit(1)
1753 .execute()
1754 .await?;
1755
1756 if let Some(batch) = stream.try_next().await? {
1757 return Ok(batch.num_rows() > 0);
1758 }
1759 }
1760
1761 Ok(false)
1762 }
1763
1764 pub async fn has_source_hash(&self, namespace: &str, hash: &str) -> Result<bool> {
1772 let hash_filter = self.source_hash_filter(hash);
1773 if let Some(table) = self.open_namespace_table_if_exists(namespace).await?
1774 && Self::table_has_source_hash(&table).await
1775 {
1776 let mut stream = table
1777 .query()
1778 .only_if(hash_filter.as_str())
1779 .limit(1)
1780 .execute()
1781 .await?;
1782 if let Some(batch) = stream.try_next().await? {
1783 return Ok(batch.num_rows() > 0);
1784 }
1785 }
1786
1787 if let Some(table) = self.legacy_table_if_exists().await? {
1788 if !Self::table_has_source_hash(&table).await {
1789 tracing::debug!(
1790 "Table '{}' has pre-v4 schema without source_hash column. \
1791 Source-level dedup disabled until backfill.",
1792 self.collection_name
1793 );
1794 return Ok(false);
1795 }
1796
1797 let filter = format!("{} AND {}", self.namespace_filter(namespace), hash_filter);
1798 let mut stream = table
1799 .query()
1800 .only_if(filter.as_str())
1801 .limit(1)
1802 .execute()
1803 .await?;
1804
1805 if let Some(batch) = stream.try_next().await? {
1806 return Ok(batch.num_rows() > 0);
1807 }
1808 }
1809
1810 Ok(false)
1811 }
1812
    /// Return the subset of `hashes` that are NOT already present in
    /// `namespace` (i.e. the chunks that still need indexing).
    ///
    /// Queries both the per-namespace table and the legacy shared table for
    /// matching `content_hash` values, then filters the input list against
    /// what was found. A legacy table lacking the hash column disables
    /// dedup entirely: every input hash is returned as "new".
    ///
    /// NOTE(review): `limit(hashes.len())` caps scanned rows, so if several
    /// stored rows share one hash the scan could stop before seeing every
    /// distinct hash — confirm hashes are unique per row.
    pub async fn filter_existing_hashes<'a>(
        &self,
        namespace: &str,
        hashes: &'a [String],
    ) -> Result<Vec<&'a String>> {
        if hashes.is_empty() {
            return Ok(vec![]);
        }

        // One equality predicate per hash, OR-ed together below.
        let hash_conditions: Vec<String> =
            hashes.iter().map(|h| self.content_hash_filter(h)).collect();

        let mut existing_hashes = std::collections::HashSet::new();

        if let Some(table) = self.open_namespace_table_if_exists(namespace).await?
            && Self::table_has_content_hash(&table).await
        {
            let filter = hash_conditions.join(" OR ");
            let mut stream = table
                .query()
                .only_if(filter.as_str())
                .limit(hashes.len())
                .execute()
                .await?;
            while let Some(batch) = stream.try_next().await? {
                if let Some(hash_col) = batch
                    .column_by_name("content_hash")
                    .and_then(|c| c.as_any().downcast_ref::<StringArray>())
                {
                    for i in 0..batch.num_rows() {
                        if !hash_col.is_null(i) {
                            existing_hashes.insert(hash_col.value(i).to_string());
                        }
                    }
                }
            }
        }

        if let Some(table) = self.legacy_table_if_exists().await? {
            // No hash column in the legacy schema: dedup cannot work, so
            // report every hash as new.
            if !Self::table_has_content_hash(&table).await {
                tracing::warn!(
                    "Table '{}' has old schema without content_hash column. \
                     Deduplication disabled. Consider re-indexing with new schema.",
                    self.collection_name
                );
                return Ok(hashes.iter().collect()); }

            let filter = format!(
                "{} AND ({})",
                self.namespace_filter(namespace),
                hash_conditions.join(" OR ")
            );
            let mut stream = table
                .query()
                .only_if(filter.as_str())
                .limit(hashes.len())
                .execute()
                .await?;
            while let Some(batch) = stream.try_next().await? {
                if let Some(hash_col) = batch
                    .column_by_name("content_hash")
                    .and_then(|c| c.as_any().downcast_ref::<StringArray>())
                {
                    for i in 0..batch.num_rows() {
                        if !hash_col.is_null(i) {
                            existing_hashes.insert(hash_col.value(i).to_string());
                        }
                    }
                }
            }
        }

        // Keep only hashes that no table already contains.
        Ok(hashes
            .iter()
            .filter(|h| !existing_hashes.contains(h.as_str()))
            .collect())
    }
1899
    /// Run the full LanceDB optimize pass (`OptimizeAction::All`) on every
    /// data table.
    ///
    /// NOTE(review): `stats` is overwritten on each loop iteration, so the
    /// logged/returned figures reflect only the LAST table processed —
    /// confirm whether aggregated stats are intended.
    pub async fn optimize(&self) -> Result<OptimizeStats> {
        let mut stats = OptimizeStats {
            compaction: None,
            prune: None,
        };
        for table_name in self.data_table_names().await? {
            let Some(table) = self.open_named_table_if_exists(&table_name).await? else {
                continue;
            };
            stats = table.optimize(OptimizeAction::All).await?;
        }
        info!(
            "Optimize complete: compaction={:?}, prune={:?}",
            stats.compaction, stats.prune
        );
        Ok(stats)
    }
1922
    /// Compact fragments on every data table (no version pruning).
    ///
    /// NOTE(review): as in `optimize`, only the last table's stats survive
    /// the loop.
    pub async fn compact(&self) -> Result<OptimizeStats> {
        let mut stats = OptimizeStats {
            compaction: None,
            prune: None,
        };
        for table_name in self.data_table_names().await? {
            let Some(table) = self.open_named_table_if_exists(&table_name).await? else {
                continue;
            };
            stats = table
                .optimize(OptimizeAction::Compact {
                    options: Default::default(),
                    remap_options: None,
                })
                .await?;
        }
        info!("Compaction complete: {:?}", stats.compaction);
        Ok(stats)
    }
1943
    /// Prune old table versions on every data table, keeping versions newer
    /// than `older_than_days` (default: 7 days).
    ///
    /// NOTE(review): as in `optimize`, only the last table's stats survive
    /// the loop.
    pub async fn cleanup(&self, older_than_days: Option<u64>) -> Result<OptimizeStats> {
        let days = older_than_days.unwrap_or(7) as i64;
        let duration = chrono::TimeDelta::days(days);
        let mut stats = OptimizeStats {
            compaction: None,
            prune: None,
        };
        for table_name in self.data_table_names().await? {
            let Some(table) = self.open_named_table_if_exists(&table_name).await? else {
                continue;
            };
            stats = table
                .optimize(OptimizeAction::Prune {
                    older_than: Some(duration),
                    delete_unverified: Some(false),
                    error_if_tagged_old_versions: None,
                })
                .await?;
        }
        info!("Cleanup complete: {:?}", stats.prune);
        Ok(stats)
    }
1967
1968 pub async fn stats(&self) -> Result<TableStats> {
1970 let table_names = self.data_table_names().await?;
1971 let mut row_count = 0usize;
1972 let mut version_count = 0usize;
1973
1974 for table_name in &table_names {
1975 let Some(table) = self.open_named_table_if_exists(table_name).await? else {
1976 continue;
1977 };
1978 row_count += table.count_rows(None).await.unwrap_or(0);
1979 version_count += table.list_versions().await.unwrap_or_default().len();
1980 }
1981
1982 Ok(TableStats {
1983 row_count,
1984 version_count,
1985 table_name: self.collection_name.clone(),
1986 db_path: self.lance_path.clone(),
1987 })
1988 }
1989
1990 pub async fn count_namespace(&self, namespace: &str) -> Result<usize> {
1992 let mut count = 0usize;
1993 if let Some(table) = self.open_namespace_table_if_exists(namespace).await? {
1994 count += table.count_rows(None).await?;
1995 }
1996 if let Some(table) = self.legacy_table_if_exists().await? {
1997 let filter = self.namespace_filter(namespace);
1998 count += table.count_rows(Some(filter)).await?;
1999 }
2000 Ok(count)
2001 }
2002
2003 pub async fn get_all_in_namespace(&self, namespace: &str) -> Result<Vec<ChromaDocument>> {
2008 let results = self.all_documents(Some(namespace), 100_000).await?;
2009 debug!(
2010 "Retrieved {} documents from namespace '{}'",
2011 results.len(),
2012 namespace
2013 );
2014 Ok(results)
2015 }
2016
2017 pub async fn namespace_exists(&self, namespace: &str) -> Result<bool> {
2019 let count = self.count_namespace(namespace).await?;
2020 Ok(count > 0)
2021 }
2022}
2023
/// Aggregate statistics for the LanceDB tables backing this store.
#[derive(Debug, Clone, Serialize)]
pub struct TableStats {
    /// Total rows across all data tables.
    pub row_count: usize,
    /// Total LanceDB table versions across all data tables.
    pub version_count: usize,
    /// Logical collection (base table) name.
    pub table_name: String,
    /// Filesystem path of the LanceDB database.
    pub db_path: String,
}
2032
/// Counters produced by a garbage-collection run.
#[derive(Debug, Clone, Default, Serialize)]
pub struct GcStats {
    // Documents whose parent_id no longer resolves to an existing document.
    pub orphans_found: usize,
    // Orphans actually deleted (stays 0 on dry runs).
    pub orphans_removed: usize,
    // Namespaces that contained no documents.
    pub empty_namespaces_found: usize,
    // Empty namespaces actually removed.
    pub empty_namespaces_removed: usize,
    // Documents older than the configured cutoff.
    pub old_docs_found: usize,
    // Old documents actually deleted (stays 0 on dry runs).
    pub old_docs_removed: usize,
    // Bytes reclaimed, when known. NOTE(review): not populated anywhere in
    // this file — confirm whether a caller fills it in.
    pub bytes_freed: Option<u64>,
    // Names of the empty namespaces that were found.
    pub empty_namespace_names: Vec<String>,
    // Namespaces that contained old documents.
    pub affected_namespaces: Vec<String>,
}
2059
2060impl GcStats {
2061 pub fn has_issues(&self) -> bool {
2063 self.orphans_found > 0 || self.empty_namespaces_found > 0 || self.old_docs_found > 0
2064 }
2065
2066 pub fn has_deletions(&self) -> bool {
2068 self.orphans_removed > 0 || self.empty_namespaces_removed > 0 || self.old_docs_removed > 0
2069 }
2070}
2071
/// Options controlling a garbage-collection run.
#[derive(Debug, Clone)]
pub struct GcConfig {
    // Delete documents whose parent no longer exists.
    pub remove_orphans: bool,
    // Remove namespaces that contain no documents.
    pub remove_empty: bool,
    // Delete documents older than this age, when set.
    pub older_than: Option<chrono::Duration>,
    // Report what would be deleted without deleting anything.
    pub dry_run: bool,
    // Restrict the run to a single namespace, when set.
    pub namespace: Option<String>,
}
2086
impl Default for GcConfig {
    /// Conservative defaults: a dry-run scan over all namespaces with no
    /// removal passes enabled.
    fn default() -> Self {
        Self {
            remove_orphans: false,
            remove_empty: false,
            older_than: None,
            dry_run: true,
            namespace: None,
        }
    }
}
2098
2099pub fn parse_duration_string(s: &str) -> Result<chrono::Duration> {
2101 let s = s.trim().to_lowercase();
2102 if s.is_empty() {
2103 return Err(anyhow!("Empty duration string"));
2104 }
2105
2106 let (num_str, unit) = if s.ends_with('d') {
2108 (&s[..s.len() - 1], 'd')
2109 } else if s.ends_with('m') {
2110 (&s[..s.len() - 1], 'm')
2111 } else if s.ends_with('y') {
2112 (&s[..s.len() - 1], 'y')
2113 } else {
2114 return Err(anyhow!(
2115 "Invalid duration format '{}'. Use format like '30d', '6m', or '1y'",
2116 s
2117 ));
2118 };
2119
2120 let num: i64 = num_str.parse().map_err(|_| {
2121 anyhow!(
2122 "Invalid number in duration '{}'. Use format like '30d', '6m', or '1y'",
2123 s
2124 )
2125 })?;
2126
2127 if num <= 0 {
2128 return Err(anyhow!("Duration must be positive, got '{}'", s));
2129 }
2130
2131 match unit {
2132 'd' => Ok(chrono::Duration::days(num)),
2133 'm' => Ok(chrono::Duration::days(num * 30)), 'y' => Ok(chrono::Duration::days(num * 365)), _ => unreachable!(),
2136 }
2137}
2138
impl StorageManager {
    /// Run garbage collection according to `config`.
    ///
    /// Pages through all documents (optionally restricted to one namespace),
    /// then applies each enabled pass in order: orphan removal,
    /// empty-namespace removal, and age-based removal. With
    /// `config.dry_run`, counts are reported but nothing is deleted.
    #[doc(alias = "run_gc")]
    pub async fn garbage_collect(&self, config: &GcConfig) -> Result<GcStats> {
        let mut stats = GcStats::default();

        // Page through the corpus so a large store is never loaded in one
        // query.
        const PAGE_SIZE: usize = 5000;
        let mut all_docs: Vec<ChromaDocument> = Vec::new();
        let mut offset = 0;
        loop {
            let page = self
                .all_documents_page(config.namespace.as_deref(), offset, PAGE_SIZE)
                .await?;
            let page_len = page.len();
            all_docs.extend(page);
            // A short page means we've reached the end.
            if page_len < PAGE_SIZE {
                break;
            }
            offset += page_len;
        }

        if all_docs.is_empty() {
            return Ok(stats);
        }

        // Group documents by namespace for the empty-namespace pass.
        let mut by_namespace: std::collections::HashMap<String, Vec<&ChromaDocument>> =
            std::collections::HashMap::new();
        for doc in &all_docs {
            by_namespace
                .entry(doc.namespace.clone())
                .or_default()
                .push(doc);
        }

        if config.remove_orphans {
            let orphan_stats = self
                .find_and_remove_orphans(&all_docs, config.dry_run)
                .await?;
            stats.orphans_found = orphan_stats.0;
            stats.orphans_removed = orphan_stats.1;
        }

        if config.remove_empty {
            let empty_stats = self
                .find_and_remove_empty_namespaces(&by_namespace, config.dry_run)
                .await?;
            stats.empty_namespaces_found = empty_stats.0;
            stats.empty_namespaces_removed = empty_stats.1;
            stats.empty_namespace_names = empty_stats.2;
        }

        if let Some(ref duration) = config.older_than {
            let old_stats = self
                .find_and_remove_old_docs(&all_docs, duration, config.dry_run)
                .await?;
            stats.old_docs_found = old_stats.0;
            stats.old_docs_removed = old_stats.1;
            stats.affected_namespaces = old_stats.2;
        }

        Ok(stats)
    }

    /// Backwards-compatible alias for [`Self::garbage_collect`].
    #[deprecated(note = "use garbage_collect")]
    pub async fn run_gc(&self, config: &GcConfig) -> Result<GcStats> {
        self.garbage_collect(config).await
    }

    /// Find documents whose `parent_id` points at a document absent from
    /// `docs`, deleting them unless `dry_run`. Returns `(found, removed)`.
    async fn find_and_remove_orphans(
        &self,
        docs: &[ChromaDocument],
        dry_run: bool,
    ) -> Result<(usize, usize)> {
        let all_ids: std::collections::HashSet<&str> = docs.iter().map(|d| d.id.as_str()).collect();

        // (namespace, id) pairs of documents with a dangling parent link.
        let mut orphans: Vec<(&str, &str)> = Vec::new(); for doc in docs {
            if let Some(ref parent_id) = doc.parent_id
                && !all_ids.contains(parent_id.as_str())
            {
                orphans.push((&doc.namespace, &doc.id));
            }
        }

        let found = orphans.len();
        let mut removed = 0;

        if !dry_run && !orphans.is_empty() {
            for (namespace, id) in &orphans {
                // Best-effort: a failed delete simply isn't counted.
                if self.delete_document(namespace, id).await.is_ok() {
                    removed += 1;
                }
            }
        }

        Ok((found, removed))
    }

    /// Report namespaces that contain no documents.
    ///
    /// NOTE(review): `by_namespace` is built FROM existing documents, so no
    /// entry can ever hold an empty Vec — `found` is effectively always 0,
    /// and `removed` is hard-coded to 0 regardless of `_dry_run`. Detecting
    /// truly empty namespaces would require listing namespace tables
    /// directly; confirm intent.
    async fn find_and_remove_empty_namespaces(
        &self,
        by_namespace: &std::collections::HashMap<String, Vec<&ChromaDocument>>,
        _dry_run: bool,
    ) -> Result<(usize, usize, Vec<String>)> {
        let empty_namespaces: Vec<String> = by_namespace
            .iter()
            .filter(|(_, docs)| docs.is_empty())
            .map(|(ns, _)| ns.clone())
            .collect();

        let found = empty_namespaces.len();
        let removed = 0;

        Ok((found, removed, empty_namespaces))
    }

    /// Find documents whose metadata timestamp predates `now - older_than`,
    /// deleting them unless `dry_run`.
    ///
    /// Returns `(found, removed, affected_namespaces)`. Documents without a
    /// recognizable timestamp key, or with an unparseable value, are never
    /// considered old.
    async fn find_and_remove_old_docs(
        &self,
        docs: &[ChromaDocument],
        older_than: &chrono::Duration,
        dry_run: bool,
    ) -> Result<(usize, usize, Vec<String>)> {
        let cutoff = chrono::Utc::now() - *older_than;

        let mut old_docs: Vec<(&str, &str)> = Vec::new(); let mut affected_namespaces: std::collections::HashSet<String> =
            std::collections::HashSet::new();

        for doc in docs {
            if let Some(obj) = doc.metadata.as_object() {
                let mut doc_timestamp: Option<String> = None;

                // First matching key wins, in this priority order.
                for key in &["timestamp", "created_at", "indexed_at", "date", "time"] {
                    if let Some(value) = obj.get(*key)
                        && let Some(ts) = value.as_str()
                    {
                        doc_timestamp = Some(ts.to_string());
                        break;
                    }
                }

                if let Some(ts) = doc_timestamp {
                    // Accept RFC 3339, "YYYY-MM-DD HH:MM:SS", or bare
                    // "YYYY-MM-DD"; anything else is treated as not old.
                    let is_old = if let Ok(parsed) = chrono::DateTime::parse_from_rfc3339(&ts) {
                        parsed < cutoff
                    } else if let Ok(parsed) =
                        chrono::NaiveDateTime::parse_from_str(&ts, "%Y-%m-%d %H:%M:%S")
                    {
                        parsed < cutoff.naive_utc()
                    } else if let Ok(parsed) = chrono::NaiveDate::parse_from_str(&ts, "%Y-%m-%d") {
                        parsed < cutoff.date_naive()
                    } else {
                        false
                    };

                    if is_old {
                        old_docs.push((&doc.namespace, &doc.id));
                        affected_namespaces.insert(doc.namespace.clone());
                    }
                }
            }
        }

        let found = old_docs.len();
        let mut removed = 0;

        if !dry_run && !old_docs.is_empty() {
            for (namespace, id) in &old_docs {
                // Best-effort: a failed delete simply isn't counted.
                if self.delete_document(namespace, id).await.is_ok() {
                    removed += 1;
                }
            }
        }

        Ok((found, removed, affected_namespaces.into_iter().collect()))
    }

    /// List every namespace with its document count, sorted by name.
    ///
    /// Counts are tallied client-side by paging through each data table.
    pub async fn list_namespaces(&self) -> Result<Vec<(String, usize)>> {
        // Refresh handles so freshly created tables are visible.
        self.refresh().await?;

        let mut namespace_counts: std::collections::HashMap<String, usize> =
            std::collections::HashMap::new();

        for table_name in self.data_table_names().await? {
            let Some(table) = self.open_named_table_if_exists(&table_name).await? else {
                continue;
            };
            const PAGE_SIZE: usize = 5000;
            let mut offset = 0;
            loop {
                let page = self
                    .query_table_page(&table, None, offset, PAGE_SIZE)
                    .await?;
                let page_len = page.len();
                for doc in &page {
                    *namespace_counts.entry(doc.namespace.clone()).or_insert(0) += 1;
                }
                // A short page signals the final page.
                if page_len < PAGE_SIZE {
                    break;
                }
                offset += page_len;
            }
        }

        let mut namespaces: Vec<(String, usize)> = namespace_counts.into_iter().collect();
        namespaces.sort_by(|a, b| a.0.cmp(&b.0));
        Ok(namespaces)
    }
}
2370
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use tempfile::TempDir;

    // Chunk-level and source-level hashes must stay independent fields:
    // one deduplicates individual chunks, the other whole sources.
    #[test]
    fn flat_documents_preserve_separate_chunk_and_source_hashes() {
        let doc = ChromaDocument::new_flat_with_hashes(
            "doc-1".to_string(),
            "kb:transcripts".to_string(),
            vec![0.0, 1.0],
            json!({"path": "sample.md"}),
            "outer summary chunk".to_string(),
            "chunk-sha256".to_string(),
            Some("source-sha256".to_string()),
        );

        assert_eq!(doc.content_hash.as_deref(), Some("chunk-sha256"));
        assert_eq!(doc.source_hash.as_deref(), Some("source-sha256"));
        assert_ne!(doc.content_hash, doc.source_hash);
    }

    // Writing the same document id into two namespaces must land in two
    // distinct per-namespace tables (no legacy shared table), while reads,
    // counts, and vector search stay namespace-scoped.
    #[tokio::test]
    async fn namespace_writes_use_separate_lance_tables_and_keep_contracts() {
        let tmp = TempDir::new().expect("temp dir");
        let db_path = tmp.path().join("lancedb");
        let storage = StorageManager::new_lance_only(db_path.to_str().unwrap())
            .await
            .expect("storage");

        let embedding = vec![0.25_f32; 8];
        storage
            .add_to_store(vec![
                ChromaDocument::new_flat(
                    "shared-id".to_string(),
                    "kb:alpha".to_string(),
                    embedding.clone(),
                    json!({"ns": "alpha"}),
                    "alpha memory".to_string(),
                ),
                ChromaDocument::new_flat(
                    "shared-id".to_string(),
                    "kb:beta".to_string(),
                    embedding.clone(),
                    json!({"ns": "beta"}),
                    "beta memory".to_string(),
                ),
            ])
            .await
            .expect("write two namespaces");

        // Two namespace tables, and no fallback to the default table.
        let table_names = storage.lance.table_names().execute().await.expect("tables");
        let namespace_tables = table_names
            .iter()
            .filter(|name| StorageManager::is_namespace_table_name(name))
            .count();
        assert_eq!(namespace_tables, 2, "{table_names:?}");
        assert!(!table_names.iter().any(|name| name == DEFAULT_TABLE_NAME));

        assert_eq!(storage.count_namespace("kb:alpha").await.unwrap(), 1);
        assert_eq!(storage.count_namespace("kb:beta").await.unwrap(), 1);
        assert_eq!(
            storage
                .get_document("kb:alpha", "shared-id")
                .await
                .unwrap()
                .unwrap()
                .document,
            "alpha memory"
        );
        assert_eq!(
            storage
                .search_store(Some("kb:beta"), embedding, 10)
                .await
                .unwrap()
                .into_iter()
                .map(|doc| doc.document)
                .collect::<Vec<_>>(),
            vec!["beta memory"]
        );
    }
}