1use anyhow::{Result, anyhow};
2use arrow_array::types::Float32Type;
3use arrow_array::{
4 Array, FixedSizeListArray, Float32Array, RecordBatch, RecordBatchIterator,
5 RecordBatchReader, StringArray, UInt8Array,
6};
7use arrow_schema::{DataType, Field, Schema};
8use futures::TryStreamExt;
9use lancedb::connection::Connection;
10use lancedb::query::{ExecutableQuery, QueryBase};
11use lancedb::table::{NewColumnTransform, OptimizeAction, OptimizeStats};
12use lancedb::{Table, connect};
13use serde::{Deserialize, Serialize};
14use serde_json::{Value, json};
15use sha2::{Digest, Sha256};
16use std::collections::HashMap;
17use std::fmt;
18use std::path::{Path, PathBuf};
19use std::str::FromStr;
20use std::sync::Arc;
21use tokio::sync::Mutex;
22use tracing::{debug, error, info, warn};
23use uuid::Uuid;
24
25use crate::rag::SliceLayer;
26
/// Current on-disk schema version written by this build (mirrors `SchemaVersion::V4`).
pub const SCHEMA_VERSION: u32 = 4;
/// Legacy shared table that holds documents from every namespace.
pub const DEFAULT_TABLE_NAME: &str = "mcp_documents";
/// Prefix for per-namespace tables; a sanitized namespace plus a hash suffix follows.
const NAMESPACE_TABLE_PREFIX: &str = "mcp_documents__ns__";
40
/// Supported on-disk schema versions for the Lance document tables.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SchemaVersion {
    /// Older layout: requires `content_hash` but lacks `source_hash` (see `required_columns_for`).
    V3,
    /// Current layout: requires both `content_hash` and `source_hash`.
    V4,
}
46
47impl SchemaVersion {
48 pub fn current() -> Self {
49 Self::V4
50 }
51}
52
53impl fmt::Display for SchemaVersion {
54 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
55 match self {
56 Self::V3 => f.write_str("v3"),
57 Self::V4 => f.write_str("v4"),
58 }
59 }
60}
61
62impl FromStr for SchemaVersion {
63 type Err = anyhow::Error;
64
65 fn from_str(value: &str) -> Result<Self> {
66 match value.trim().to_ascii_lowercase().as_str() {
67 "3" | "v3" => Ok(Self::V3),
68 "4" | "v4" => Ok(Self::V4),
69 other => Err(anyhow!(
70 "unsupported schema target '{other}' (expected v3 or v4)"
71 )),
72 }
73 }
74}
75
/// Outcome of a schema migration attempt (or dry run) against the legacy table.
#[derive(Debug, Clone)]
pub struct SchemaMigrationReport {
    // Schema version the migration aimed for.
    pub target: SchemaVersion,
    // Columns that were absent before the migration ran.
    pub missing_columns: Vec<Field>,
    // True when the columns were actually added (false for check-only runs
    // or when nothing was missing).
    pub applied: bool,
}
82
83impl SchemaMigrationReport {
84 pub fn missing_column_names(&self) -> Vec<&str> {
85 self.missing_columns
86 .iter()
87 .map(|field| field.name().as_str())
88 .collect()
89 }
90}
91
/// Snapshot of how the legacy table's schema compares to the expected version.
#[derive(Debug, Clone)]
pub struct SchemaStatusReport {
    // Version inferred from the table's actual columns.
    pub schema_version: SchemaVersion,
    // Version the caller asked to be checked against.
    pub expected_schema: SchemaVersion,
    // True when any required column is absent.
    pub needs_migration: bool,
    // Names of the absent required columns.
    pub missing_columns: Vec<String>,
    // Highest Lance manifest version, when the listing succeeded.
    pub manifest_version: Option<u64>,
}
100
101pub fn required_columns_for(version: SchemaVersion) -> Vec<Field> {
102 let mut fields = vec![Field::new("content_hash", DataType::Utf8, true)];
103 if matches!(version, SchemaVersion::V4) {
104 fields.push(Field::new("source_hash", DataType::Utf8, true));
105 }
106 fields
107}
108
/// Rich error raised when a write hits a table whose schema is missing
/// required columns; carries everything needed to print a remediation hint.
#[derive(Debug, Clone)]
pub struct SchemaMismatchWriteError {
    // Table that rejected the write.
    table_name: String,
    // Database path, echoed in the remediation command.
    db_path: String,
    // Columns the table lacks.
    missing_columns: Vec<String>,
    // Underlying engine message (or a synthesized description).
    message: String,
}
116
117impl SchemaMismatchWriteError {
118 fn new(
119 table_name: impl Into<String>,
120 db_path: impl Into<String>,
121 missing_columns: Vec<String>,
122 message: impl Into<String>,
123 ) -> Self {
124 Self {
125 table_name: table_name.into(),
126 db_path: db_path.into(),
127 missing_columns,
128 message: message.into(),
129 }
130 }
131
132 pub fn missing_columns(&self) -> &[String] {
133 &self.missing_columns
134 }
135
136 pub fn db_path(&self) -> &str {
137 &self.db_path
138 }
139
140 pub fn table_name(&self) -> &str {
141 &self.table_name
142 }
143
144 pub fn remediation(&self) -> String {
145 format!("rust-memex migrate-schema --db-path {}", self.db_path)
146 }
147}
148
149impl fmt::Display for SchemaMismatchWriteError {
150 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
151 write!(
152 f,
153 "ERROR schema mismatch while writing Lance table '{}': missing columns {:?}. {}. Remediation: {}",
154 self.table_name,
155 self.missing_columns,
156 self.message,
157 self.remediation()
158 )
159 }
160}
161
// Marker impl so the mismatch error can travel through `anyhow` / `Box<dyn Error>`.
impl std::error::Error for SchemaMismatchWriteError {}
163
/// Heuristic: does an engine error message describe a schema mismatch?
///
/// Matches, case-insensitively, the phrases the Lance/Arrow write path is
/// known to emit when an append's schema disagrees with the table's.
fn is_schema_mismatch_message(message: &str) -> bool {
    const MARKERS: [&str; 4] = [
        "schema mismatch",
        "append with different schema",
        "fields did not match",
        "missing=[",
    ];
    let lower = message.to_ascii_lowercase();
    MARKERS.iter().any(|marker| lower.contains(marker))
}
171
172fn extract_missing_columns(message: &str) -> Vec<String> {
173 let mut columns = Vec::new();
174 let mut rest = message;
175
176 while let Some(start) = rest.find("missing=[") {
177 rest = &rest[start + "missing=[".len()..];
178 let Some(end) = rest.find(']') else {
179 break;
180 };
181 let list = &rest[..end];
182 for item in list.split(',') {
183 let column = item
184 .trim()
185 .trim_matches('`')
186 .trim_matches('"')
187 .trim_matches('\'');
188 if !column.is_empty() && !columns.iter().any(|existing| existing == column) {
189 columns.push(column.to_string());
190 }
191 }
192 rest = &rest[end + 1..];
193 }
194
195 if columns.is_empty() {
196 for field in required_columns_for(SchemaVersion::current()) {
197 let name = field.name();
198 if message.contains(name) {
199 columns.push(name.to_string());
200 }
201 }
202 }
203
204 columns
205}
206
/// A document row as stored in (and read back from) the Lance tables.
#[derive(Debug, Serialize, Clone)]
pub struct ChromaDocument {
    // Unique document id within its namespace.
    pub id: String,
    // Logical namespace the document belongs to.
    pub namespace: String,
    // Embedding vector; all documents in a batch must share one dimension.
    pub embedding: Vec<f32>,
    // Arbitrary JSON metadata, serialized to a string column on write.
    pub metadata: serde_json::Value,
    // The document text itself.
    pub document: String,
    // Onion-slice layer; 0 means a flat (non-hierarchical) document.
    pub layer: u8,
    // Parent slice id, when part of a hierarchy.
    pub parent_id: Option<String>,
    // Child slice ids, when part of a hierarchy.
    pub children_ids: Vec<String>,
    // Extracted keywords for the slice.
    pub keywords: Vec<String>,
    // Hash of the document content (schema v3+ column).
    pub content_hash: Option<String>,
    // Hash of the originating source (schema v4 column).
    pub source_hash: Option<String>,
}
252
253impl ChromaDocument {
254 pub fn new_flat(
256 id: String,
257 namespace: String,
258 embedding: Vec<f32>,
259 metadata: serde_json::Value,
260 document: String,
261 ) -> Self {
262 Self {
263 id,
264 namespace,
265 embedding,
266 metadata,
267 document,
268 layer: 0, parent_id: None,
270 children_ids: vec![],
271 keywords: vec![],
272 content_hash: None,
273 source_hash: None,
274 }
275 }
276
277 pub fn new_flat_with_hash(
281 id: String,
282 namespace: String,
283 embedding: Vec<f32>,
284 metadata: serde_json::Value,
285 document: String,
286 content_hash: String,
287 ) -> Self {
288 Self::new_flat_with_hashes(
289 id,
290 namespace,
291 embedding,
292 metadata,
293 document,
294 content_hash,
295 None,
296 )
297 }
298
299 pub fn new_flat_with_hashes(
301 id: String,
302 namespace: String,
303 embedding: Vec<f32>,
304 metadata: serde_json::Value,
305 document: String,
306 content_hash: String,
307 source_hash: Option<String>,
308 ) -> Self {
309 Self {
310 id,
311 namespace,
312 embedding,
313 metadata,
314 document,
315 layer: 0,
316 parent_id: None,
317 children_ids: vec![],
318 keywords: vec![],
319 content_hash: Some(content_hash),
320 source_hash,
321 }
322 }
323
324 pub fn from_onion_slice(
326 slice: &crate::rag::OnionSlice,
327 namespace: String,
328 embedding: Vec<f32>,
329 metadata: serde_json::Value,
330 ) -> Self {
331 Self {
332 id: slice.id.clone(),
333 namespace,
334 embedding,
335 metadata,
336 document: slice.content.clone(),
337 layer: slice.layer.as_u8(),
338 parent_id: slice.parent_id.clone(),
339 children_ids: slice.children_ids.clone(),
340 keywords: slice.keywords.clone(),
341 content_hash: None,
342 source_hash: None,
343 }
344 }
345
346 pub fn from_onion_slice_with_hash(
350 slice: &crate::rag::OnionSlice,
351 namespace: String,
352 embedding: Vec<f32>,
353 metadata: serde_json::Value,
354 content_hash: String,
355 ) -> Self {
356 Self::from_onion_slice_with_hashes(
357 slice,
358 namespace,
359 embedding,
360 metadata,
361 content_hash,
362 None,
363 )
364 }
365
366 pub fn from_onion_slice_with_hashes(
368 slice: &crate::rag::OnionSlice,
369 namespace: String,
370 embedding: Vec<f32>,
371 metadata: serde_json::Value,
372 content_hash: String,
373 source_hash: Option<String>,
374 ) -> Self {
375 Self {
376 id: slice.id.clone(),
377 namespace,
378 embedding,
379 metadata,
380 document: slice.content.clone(),
381 layer: slice.layer.as_u8(),
382 parent_id: slice.parent_id.clone(),
383 children_ids: slice.children_ids.clone(),
384 keywords: slice.keywords.clone(),
385 content_hash: Some(content_hash),
386 source_hash,
387 }
388 }
389
390 pub fn is_flat(&self) -> bool {
392 self.layer == 0
393 }
394
395 pub fn slice_layer(&self) -> Option<SliceLayer> {
397 SliceLayer::from_u8(self.layer)
398 }
399}
400
/// Owns the LanceDB connection plus cached table handles for the legacy
/// shared table and the per-namespace tables.
pub struct StorageManager {
    // Live LanceDB connection.
    lance: Connection,
    // Cached handle to the legacy shared table; None until first open.
    table: Arc<Mutex<Option<Table>>>,
    // Cache of per-namespace table handles, keyed by table name.
    namespace_tables: Arc<Mutex<HashMap<String, Table>>>,
    // Name of the legacy shared table (DEFAULT_TABLE_NAME).
    collection_name: String,
    // Expanded filesystem path of the database.
    lance_path: String,
}
408
/// Lifecycle state of a cross-store recovery batch, persisted as snake_case JSON.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum CrossStoreRecoveryStatus {
    // Batch written but not yet resolved; also the serde default for
    // batches persisted before the status field existed.
    #[default]
    Pending,
    // Batch's documents were rolled back.
    RolledBack,
}
416
/// Minimal pointer to a stored document: enough to find and undo it later.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CrossStoreRecoveryDocumentRef {
    pub namespace: String,
    pub id: String,
}
422
/// Durable record of a write batch, persisted as JSON so a crashed
/// cross-store operation can be rolled back on restart.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CrossStoreRecoveryBatch {
    // Random UUID naming the batch (and its JSON file).
    pub batch_id: String,
    // RFC 3339 creation timestamp; used for sorting listings.
    pub created_at: String,
    // Defaults keep older on-disk batches deserializable.
    #[serde(default)]
    pub status: CrossStoreRecoveryStatus,
    #[serde(default)]
    pub last_error: Option<String>,
    // Documents written as part of this batch.
    pub documents: Vec<CrossStoreRecoveryDocumentRef>,
}
433
434impl CrossStoreRecoveryBatch {
435 pub fn from_documents(documents: &[ChromaDocument]) -> Self {
436 Self {
437 batch_id: Uuid::new_v4().to_string(),
438 created_at: chrono::Utc::now().to_rfc3339(),
439 status: CrossStoreRecoveryStatus::Pending,
440 last_error: None,
441 documents: documents
442 .iter()
443 .map(|document| CrossStoreRecoveryDocumentRef {
444 namespace: document.namespace.clone(),
445 id: document.id.clone(),
446 })
447 .collect(),
448 }
449 }
450}
451
/// Boxed Arrow batch reader in the shape LanceDB's `add` API consumes.
type BatchIter = Box<dyn RecordBatchReader + Send + 'static>;
457
458impl StorageManager {
459 pub async fn new(db_path: &str) -> Result<Self> {
460 let lance_env = std::env::var("LANCEDB_PATH").unwrap_or_else(|_| db_path.to_string());
462 let lance_path = if lance_env.trim().is_empty() {
463 shellexpand::tilde("~/.rmcp-servers/rust-memex/lancedb").to_string()
464 } else {
465 shellexpand::tilde(&lance_env).to_string()
466 };
467
468 let lance = connect(&lance_path).execute().await?;
469
470 Ok(Self {
471 lance,
472 table: Arc::new(Mutex::new(None)),
473 namespace_tables: Arc::new(Mutex::new(HashMap::new())),
474 collection_name: DEFAULT_TABLE_NAME.to_string(),
475 lance_path,
476 })
477 }
478
479 pub async fn new_lance_only(db_path: &str) -> Result<Self> {
482 let lance_path = shellexpand::tilde(db_path).to_string();
483 let lance = connect(&lance_path).execute().await?;
484
485 Ok(Self {
486 lance,
487 table: Arc::new(Mutex::new(None)),
488 namespace_tables: Arc::new(Mutex::new(HashMap::new())),
489 collection_name: DEFAULT_TABLE_NAME.to_string(),
490 lance_path,
491 })
492 }
493
494 pub fn lance_path(&self) -> &str {
495 &self.lance_path
496 }
497
498 pub async fn require_current_schema_for_writes(&self) -> Result<()> {
499 let Some(table) = self.open_table_if_exists().await? else {
500 return Ok(());
501 };
502 self.ensure_hash_schema_columns(&table).await
503 }
504
    /// Report how the legacy table's schema compares to `expected_schema`.
    ///
    /// A missing table reports "no migration needed" because it will be
    /// created with the current schema on first insert.
    pub async fn schema_status(
        &self,
        expected_schema: SchemaVersion,
    ) -> Result<SchemaStatusReport> {
        let Some(table) = self.open_table_if_exists().await? else {
            return Ok(SchemaStatusReport {
                schema_version: expected_schema,
                expected_schema,
                needs_migration: false,
                missing_columns: Vec::new(),
                manifest_version: None,
            });
        };

        let missing_columns = Self::missing_required_columns(&table, expected_schema)
            .await?
            .into_iter()
            .map(|field| field.name().to_string())
            .collect::<Vec<_>>();
        // Highest manifest version, best-effort: listing errors are swallowed.
        let manifest_version = table
            .list_versions()
            .await
            .ok()
            .and_then(|versions| versions.iter().map(|version| version.version).max());

        Ok(SchemaStatusReport {
            // Any missing required column means the table is still on the
            // pre-hash (v3) layout.
            schema_version: if missing_columns.is_empty() {
                expected_schema
            } else {
                SchemaVersion::V3
            },
            expected_schema,
            needs_migration: !missing_columns.is_empty(),
            missing_columns,
            manifest_version,
        })
    }
542
543 pub async fn missing_required_columns(
544 table: &Table,
545 target: SchemaVersion,
546 ) -> Result<Vec<Field>> {
547 let schema = table.schema().await?;
548 Ok(required_columns_for(target)
549 .into_iter()
550 .filter(|field| schema.field_with_name(field.name()).is_err())
551 .collect())
552 }
553
    /// Migrate the legacy table at `db_path` to `target` by adding any
    /// missing required columns as all-NULL columns.
    ///
    /// With `check_only`, reports what is missing without changing anything.
    /// A concurrent migration by another writer is tolerated: if the add
    /// fails but the columns turn out to be present afterwards, the run is
    /// reported as applied.
    pub async fn migrate_lance_schema(
        db_path: &str,
        target: SchemaVersion,
        check_only: bool,
    ) -> Result<SchemaMigrationReport> {
        let lance_path = shellexpand::tilde(db_path).to_string();
        let lance = connect(&lance_path).execute().await?;
        let table = lance.open_table(DEFAULT_TABLE_NAME).execute().await?;
        let missing = Self::missing_required_columns(&table, target).await?;

        if missing.is_empty() || check_only {
            return Ok(SchemaMigrationReport {
                target,
                missing_columns: missing,
                applied: false,
            });
        }

        // Add every missing column in one transform, filled with NULLs.
        let transform = NewColumnTransform::AllNulls(Arc::new(Schema::new(missing.clone())));
        if let Err(error) = table.add_columns(transform, None).await {
            // Re-check against the newest manifest before giving up: a
            // concurrent writer may have completed the same migration.
            let _ = table.checkout_latest().await;
            let remaining = Self::missing_required_columns(&table, target).await?;
            if remaining.is_empty() {
                warn!(
                    "Lance table '{}' schema migration raced with another writer and is already complete",
                    DEFAULT_TABLE_NAME
                );
                return Ok(SchemaMigrationReport {
                    target,
                    missing_columns: missing,
                    applied: true,
                });
            }
            return Err(anyhow!(
                "failed to migrate Lance table '{}' schema to {target}: {error}",
                DEFAULT_TABLE_NAME
            ));
        }

        // Best-effort refresh so this handle sees the new columns.
        let _ = table.checkout_latest().await;
        Ok(SchemaMigrationReport {
            target,
            missing_columns: missing,
            applied: true,
        })
    }
600
601 pub fn cross_store_recovery_dir(&self) -> PathBuf {
602 let db_path = Path::new(&self.lance_path);
603 let parent = db_path.parent().unwrap_or_else(|| Path::new("."));
604 let stem = db_path
605 .file_name()
606 .and_then(|name| name.to_str())
607 .unwrap_or("lancedb");
608 parent.join(format!(".{stem}-cross-store-recovery"))
609 }
610
611 fn cross_store_recovery_batch_path(&self, batch_id: &str) -> PathBuf {
612 self.cross_store_recovery_dir()
613 .join(format!("{batch_id}.json"))
614 }
615
616 fn write_cross_store_recovery_batch(&self, batch: &CrossStoreRecoveryBatch) -> Result<PathBuf> {
617 let dir = self.cross_store_recovery_dir();
618 std::fs::create_dir_all(&dir)?;
619
620 let path = self.cross_store_recovery_batch_path(&batch.batch_id);
621 let tmp_path = path.with_extension("json.tmp");
622 let payload = serde_json::to_vec_pretty(batch)?;
623
624 std::fs::write(&tmp_path, payload)?;
625 std::fs::rename(&tmp_path, &path)?;
626
627 Ok(path)
628 }
629
    /// Persist a newly created recovery batch. Thin public wrapper kept so
    /// call sites read as "create" rather than the shared write helper.
    pub fn persist_cross_store_recovery_batch(
        &self,
        batch: &CrossStoreRecoveryBatch,
    ) -> Result<PathBuf> {
        self.write_cross_store_recovery_batch(batch)
    }
636
    /// Rewrite an existing recovery batch (e.g. after a status change).
    /// Thin public wrapper kept so call sites read as "update".
    pub fn update_cross_store_recovery_batch(
        &self,
        batch: &CrossStoreRecoveryBatch,
    ) -> Result<PathBuf> {
        self.write_cross_store_recovery_batch(batch)
    }
643
644 pub fn clear_cross_store_recovery_batch(&self, batch_id: &str) -> Result<()> {
645 let path = self.cross_store_recovery_batch_path(batch_id);
646 match std::fs::remove_file(path) {
647 Ok(()) => Ok(()),
648 Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(()),
649 Err(error) => Err(error.into()),
650 }
651 }
652
653 pub fn list_cross_store_recovery_batches(&self) -> Result<Vec<CrossStoreRecoveryBatch>> {
654 let dir = self.cross_store_recovery_dir();
655 if !dir.exists() {
656 return Ok(vec![]);
657 }
658
659 let mut batches = Vec::new();
660 for entry in std::fs::read_dir(dir)? {
661 let entry = entry?;
662 let path = entry.path();
663 if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
664 continue;
665 }
666
667 let payload = std::fs::read(&path)?;
668 let batch: CrossStoreRecoveryBatch = serde_json::from_slice(&payload)?;
669 batches.push(batch);
670 }
671
672 batches.sort_by(|left, right| left.created_at.cmp(&right.created_at));
673 Ok(batches)
674 }
675
676 pub async fn refresh(&self) -> Result<()> {
679 let mut guard = self.table.lock().await;
680 *guard = None;
681 self.namespace_tables.lock().await.clear();
682 tracing::info!("LanceDB table cache cleared - will refresh on next query");
683 Ok(())
684 }
685
    /// Warm the cache for the legacy shared table if it exists.
    ///
    /// A missing table is not an error — it is created lazily on first
    /// insert, because the vector dimension is only known then.
    pub async fn ensure_collection(&self) -> Result<()> {
        let mut guard = self.table.lock().await;
        if guard.is_some() {
            return Ok(());
        }
        match self
            .lance
            .open_table(self.collection_name.as_str())
            .execute()
            .await
        {
            Ok(table) => {
                *guard = Some(table);
                info!("Found existing Lance table '{}'", self.collection_name);
            }
            Err(_) => {
                // NOTE(review): all open errors are treated as "table absent"
                // here, unlike open_table_if_exists which distinguishes them.
                info!(
                    "Lance table '{}' will be created on first insert",
                    self.collection_name
                );
            }
        }
        Ok(())
    }
711
    /// Validate and insert a batch of documents, one sub-batch per namespace.
    ///
    /// The whole batch is rejected up front if any document has a mismatched
    /// embedding dimension, an empty id/namespace, or a NaN/infinite
    /// embedding value — partial writes would corrupt the store.
    pub async fn add_to_store(&self, documents: Vec<ChromaDocument>) -> Result<()> {
        if documents.is_empty() {
            return Ok(());
        }

        // Dimension of the first document is the required dimension for all.
        let dim = documents
            .first()
            .ok_or_else(|| anyhow!("No documents to add"))?
            .embedding
            .len();
        if dim == 0 {
            return Err(anyhow!("Embedding dimension is zero"));
        }

        // Full validation pass before any write is attempted.
        for (i, doc) in documents.iter().enumerate() {
            if doc.embedding.len() != dim {
                return Err(anyhow!(
                    "Document {} has inconsistent embedding dimension: expected {}, got {}. \
 Aborting batch to prevent database corruption.",
                    i,
                    dim,
                    doc.embedding.len()
                ));
            }
            if doc.id.is_empty() {
                return Err(anyhow!("Document {} has empty ID. Aborting batch.", i));
            }
            if doc.namespace.is_empty() {
                return Err(anyhow!(
                    "Document {} has empty namespace. Aborting batch.",
                    i
                ));
            }
            for (j, &val) in doc.embedding.iter().enumerate() {
                if val.is_nan() || val.is_infinite() {
                    return Err(anyhow!(
                        "Document {} has invalid embedding value at index {}: {}. \
 Aborting batch to prevent database corruption.",
                        i,
                        j,
                        val
                    ));
                }
            }
        }

        // Group by namespace; each namespace gets its own table.
        let mut by_namespace: HashMap<String, Vec<ChromaDocument>> = HashMap::new();
        for document in documents {
            by_namespace
                .entry(document.namespace.clone())
                .or_default()
                .push(document);
        }

        let mut inserted = 0usize;
        for (namespace, docs) in by_namespace {
            let table = self.ensure_namespace_table(&namespace, dim).await?;
            // Refuse to write into a table that predates the hash columns.
            self.ensure_hash_schema_columns(&table).await?;
            let batch = self.docs_to_batch(&docs, dim)?;
            if let Err(error) = table.add(batch).execute().await {
                // Translate engine schema errors into SchemaMismatchWriteError.
                return Err(self.map_lancedb_write_error(error));
            }
            inserted += docs.len();
        }
        debug!("Inserted {} documents into Lance (validated)", inserted);
        Ok(())
    }
782
    /// Vector search for the `k` nearest documents.
    ///
    /// With a namespace: queries that namespace's table and the legacy shared
    /// table (filtered to the namespace). Without one: queries every data
    /// table. NOTE(review): per-table result sets are concatenated in table
    /// order and then truncated to `k`; they are not re-merged by distance —
    /// confirm callers re-rank if global nearest-k ordering matters.
    pub async fn search_store(
        &self,
        namespace: Option<&str>,
        embedding: Vec<f32>,
        k: usize,
    ) -> Result<Vec<ChromaDocument>> {
        if embedding.is_empty() {
            return Ok(vec![]);
        }
        let mut results = Vec::new();
        if let Some(ns) = namespace {
            if let Some(table) = self.open_namespace_table_if_exists(ns).await? {
                let mut stream = table
                    .query()
                    .nearest_to(embedding.clone())?
                    .limit(k)
                    .execute()
                    .await?;
                while let Some(batch) = stream.try_next().await? {
                    let mut docs = self.batch_to_docs(&batch)?;
                    results.append(&mut docs);
                }
            }
            // Legacy shared table may still hold rows for this namespace.
            if let Some(table) = self.legacy_table_if_exists().await? {
                let mut stream = table
                    .query()
                    .only_if(self.namespace_filter(ns).as_str())
                    .nearest_to(embedding)?
                    .limit(k)
                    .execute()
                    .await?;
                while let Some(batch) = stream.try_next().await? {
                    let mut docs = self.batch_to_docs(&batch)?;
                    results.append(&mut docs);
                }
            }
            results.truncate(k);
        } else {
            // No namespace: fan out across every known data table.
            for table_name in self.data_table_names().await? {
                let Some(table) = self.open_named_table_if_exists(&table_name).await? else {
                    continue;
                };
                let mut stream = table
                    .query()
                    .nearest_to(embedding.clone())?
                    .limit(k)
                    .execute()
                    .await?;
                while let Some(batch) = stream.try_next().await? {
                    let mut docs = self.batch_to_docs(&batch)?;
                    results.append(&mut docs);
                }
            }
            results.truncate(k);
        }
        debug!("Lance returned {} results", results.len());
        Ok(results)
    }
841
    /// Page through stored documents.
    ///
    /// Each contributing table is asked for the first `offset + limit` rows;
    /// the skip/take is then applied to the concatenated results. NOTE(review):
    /// with multiple tables the page boundary therefore depends on table
    /// enumeration order, not on any global sort — confirm this matches
    /// callers' pagination expectations.
    pub async fn all_documents_page(
        &self,
        namespace: Option<&str>,
        offset: usize,
        limit: usize,
    ) -> Result<Vec<ChromaDocument>> {
        let mut results = Vec::new();
        if let Some(ns) = namespace {
            if let Some(table) = self.open_namespace_table_if_exists(ns).await? {
                results.append(
                    &mut self
                        .query_table_page(&table, None, 0, offset + limit)
                        .await?,
                );
            }
            // Legacy shared table may still hold rows for this namespace.
            if let Some(table) = self.legacy_table_if_exists().await? {
                results.append(
                    &mut self
                        .query_table_page(
                            &table,
                            Some(self.namespace_filter(ns)),
                            0,
                            offset + limit,
                        )
                        .await?,
                );
            }
        } else {
            for table_name in self.data_table_names().await? {
                let Some(table) = self.open_named_table_if_exists(&table_name).await? else {
                    continue;
                };
                results.append(
                    &mut self
                        .query_table_page(&table, None, 0, offset + limit)
                        .await?,
                );
            }
        }

        Ok(results.into_iter().skip(offset).take(limit).collect())
    }
889
    /// Convenience wrapper: the first `limit` documents (page at offset 0).
    pub async fn all_documents(
        &self,
        namespace: Option<&str>,
        limit: usize,
    ) -> Result<Vec<ChromaDocument>> {
        self.all_documents_page(namespace, 0, limit).await
    }
900
    /// Fetch a single document by id, checking the namespace table first and
    /// falling back to the legacy shared table (namespace-filtered).
    pub async fn get_document(&self, namespace: &str, id: &str) -> Result<Option<ChromaDocument>> {
        let id_filter = self.id_filter(id);
        if let Some(table) = self.open_namespace_table_if_exists(namespace).await? {
            let mut stream = table
                .query()
                .only_if(id_filter.as_str())
                .limit(1)
                .execute()
                .await?;
            if let Some(batch) = stream.try_next().await? {
                let mut docs = self.batch_to_docs(&batch)?;
                if let Some(doc) = docs.pop() {
                    return Ok(Some(doc));
                }
            }
        }

        // Not in the per-namespace table; the row may predate table splitting.
        if let Some(table) = self.legacy_table_if_exists().await? {
            let filter = format!("{} AND {}", self.namespace_filter(namespace), id_filter);
            let mut stream = table
                .query()
                .only_if(filter.as_str())
                .limit(1)
                .execute()
                .await?;
            if let Some(batch) = stream.try_next().await? {
                let mut docs = self.batch_to_docs(&batch)?;
                if let Some(doc) = docs.pop() {
                    return Ok(Some(doc));
                }
            }
        }
        Ok(None)
    }
935
    /// Delete one document by id from both the namespace table and the
    /// legacy shared table; returns the number of rows removed.
    pub async fn delete_document(&self, namespace: &str, id: &str) -> Result<usize> {
        let mut deleted = 0usize;
        let id_filter = self.id_filter(id);

        if let Some(table) = self.open_namespace_table_if_exists(namespace).await? {
            // Count first so the return value reflects rows actually present.
            let pre_count = table.count_rows(Some(id_filter.clone())).await?;
            if pre_count > 0 {
                table.delete(id_filter.as_str()).await?;
                deleted += pre_count;
            }
        }

        if let Some(table) = self.legacy_table_if_exists().await? {
            let predicate = format!("{} AND {}", self.namespace_filter(namespace), id_filter);
            let pre_count = table.count_rows(Some(predicate.clone())).await?;
            if pre_count > 0 {
                table.delete(predicate.as_str()).await?;
                deleted += pre_count;
            }
        }
        Ok(deleted)
    }
958
    /// Delete many documents by id within one namespace.
    ///
    /// Ids are chunked (500 per predicate) to bound the SQL-style `IN` list;
    /// each chunk is applied to both the per-namespace table and the legacy
    /// shared table. Returns the total number of rows removed.
    pub async fn delete_documents(&self, namespace: &str, ids: &[&str]) -> Result<usize> {
        if ids.is_empty() {
            return Ok(0);
        }
        const CHUNK: usize = 500;
        let mut total_deleted = 0usize;
        for batch in ids.chunks(CHUNK) {
            // '' is the SQL-style escape for a single quote inside a literal.
            let id_list = batch
                .iter()
                .map(|id| format!("'{}'", id.replace('\'', "''")))
                .collect::<Vec<_>>()
                .join(", ");
            let id_predicate = format!("id IN ({})", id_list);
            if let Some(table) = self.open_namespace_table_if_exists(namespace).await? {
                // Count first so the return value reflects rows actually present.
                let pre_count = table.count_rows(Some(id_predicate.clone())).await?;
                if pre_count > 0 {
                    table.delete(id_predicate.as_str()).await?;
                    total_deleted += pre_count;
                }
            }
            if let Some(table) = self.legacy_table_if_exists().await? {
                let predicate =
                    format!("{} AND {}", self.namespace_filter(namespace), id_predicate);
                let pre_count = table.count_rows(Some(predicate.clone())).await?;
                if pre_count > 0 {
                    table.delete(predicate.as_str()).await?;
                    total_deleted += pre_count;
                }
            }
        }
        Ok(total_deleted)
    }
997
998 pub async fn delete_namespace_documents(&self, namespace: &str) -> Result<usize> {
999 let mut deleted = 0usize;
1000 if let Some(table) = self.open_namespace_table_if_exists(namespace).await? {
1001 let pre_count = table.count_rows(None).await?;
1002 if pre_count > 0 {
1003 table
1004 .delete(self.namespace_filter(namespace).as_str())
1005 .await?;
1006 deleted += pre_count;
1007 }
1008 }
1009 if let Some(table) = self.legacy_table_if_exists().await? {
1010 let predicate = self.namespace_filter(namespace);
1011 let pre_count = table.count_rows(Some(predicate.clone())).await?;
1012 if pre_count > 0 {
1013 table.delete(predicate.as_str()).await?;
1014 deleted += pre_count;
1015 }
1016 }
1017 Ok(deleted)
1018 }
1019
    /// Rename a namespace by copying every document to the target namespace
    /// and then deleting the source. Returns the number of source rows deleted
    /// (0 when the source is empty or the names are identical). Fails if the
    /// target namespace already has rows.
    ///
    /// NOTE(review): despite the name this is copy-then-delete, not a
    /// transaction — a failure after `add_to_store` leaves the documents
    /// present under BOTH namespaces. Confirm callers tolerate that, or pair
    /// this with the cross-store recovery batches.
    pub async fn rename_namespace_atomic(&self, from: &str, to: &str) -> Result<usize> {
        if from == to {
            return Ok(0);
        }

        let source_count = self.count_namespace(from).await?;
        if source_count == 0 {
            return Ok(0);
        }

        let target_count = self.count_namespace(to).await?;
        if target_count > 0 {
            return Err(anyhow!(
                "Target namespace '{}' already exists with {} rows",
                to,
                target_count
            ));
        }

        // Copy: rewrite the namespace field and insert under the new name.
        let mut docs = self.all_documents(Some(from), source_count).await?;
        for doc in &mut docs {
            doc.namespace = to.to_string();
        }
        self.add_to_store(docs).await?;
        // Delete the originals only after the copy succeeded.
        let deleted = self.delete_namespace_documents(from).await?;

        Ok(deleted)
    }
1048
1049 pub fn get_collection_name(&self) -> &str {
1050 &self.collection_name
1051 }
1052
1053 fn namespace_table_name(namespace: &str) -> String {
1054 let mut safe = namespace
1055 .chars()
1056 .map(|ch| {
1057 if ch.is_ascii_alphanumeric() {
1058 ch.to_ascii_lowercase()
1059 } else {
1060 '_'
1061 }
1062 })
1063 .collect::<String>();
1064 while safe.contains("__") {
1065 safe = safe.replace("__", "_");
1066 }
1067 let safe = safe.trim_matches('_');
1068 let safe = if safe.is_empty() { "default" } else { safe };
1069 let safe = safe.chars().take(48).collect::<String>();
1070 let hash = Sha256::digest(namespace.as_bytes());
1071 let suffix = hash[..6]
1072 .iter()
1073 .map(|byte| format!("{byte:02x}"))
1074 .collect::<String>();
1075 format!("{NAMESPACE_TABLE_PREFIX}{safe}_{suffix}")
1076 }
1077
1078 fn is_namespace_table_name(table_name: &str) -> bool {
1079 table_name.starts_with(NAMESPACE_TABLE_PREFIX)
1080 }
1081
1082 async fn data_table_names(&self) -> Result<Vec<String>> {
1083 let table_names = self.lance.table_names().execute().await?;
1084 Ok(table_names
1085 .into_iter()
1086 .filter(|name| name == DEFAULT_TABLE_NAME || Self::is_namespace_table_name(name))
1087 .collect())
1088 }
1089
1090 async fn open_named_table_if_exists(&self, table_name: &str) -> Result<Option<Table>> {
1091 match self.lance.open_table(table_name).execute().await {
1092 Ok(table) => Ok(Some(table)),
1093 Err(e) => {
1094 let msg = e.to_string().to_lowercase();
1095 if msg.contains("not found")
1096 || msg.contains("does not exist")
1097 || msg.contains("no such file")
1098 {
1099 Ok(None)
1100 } else {
1101 Err(anyhow!("LanceDB error on table '{}': {}", table_name, e))
1102 }
1103 }
1104 }
1105 }
1106
1107 async fn open_namespace_table_if_exists(&self, namespace: &str) -> Result<Option<Table>> {
1108 let table_name = Self::namespace_table_name(namespace);
1109 if let Some(table) = self.namespace_tables.lock().await.get(&table_name).cloned() {
1110 return Ok(Some(table));
1111 }
1112 let table = self.open_named_table_if_exists(&table_name).await?;
1113 if let Some(table) = &table {
1114 self.namespace_tables
1115 .lock()
1116 .await
1117 .insert(table_name, table.clone());
1118 }
1119 Ok(table)
1120 }
1121
    /// Get the table for `namespace`, creating it (empty, current schema,
    /// `dim`-wide vectors) when it does not exist yet. Caches the handle.
    ///
    /// NOTE(review): the open-then-create sequence is not guarded against a
    /// concurrent creator; confirm create_empty_table failing on "already
    /// exists" is acceptable or handled upstream.
    async fn ensure_namespace_table(&self, namespace: &str, dim: usize) -> Result<Table> {
        let table_name = Self::namespace_table_name(namespace);
        if let Some(table) = self.namespace_tables.lock().await.get(&table_name).cloned() {
            return Ok(table);
        }

        let table = match self.open_named_table_if_exists(&table_name).await? {
            Some(table) => table,
            None => {
                // Cannot create a vector table without knowing the dimension.
                if dim == 0 {
                    return Err(anyhow!(
                        "Vector table '{}' not found and dimension is unknown",
                        table_name
                    ));
                }
                info!(
                    "Creating Lance namespace table '{}' for '{}' with vector dimension {} (schema v{})",
                    table_name, namespace, dim, SCHEMA_VERSION
                );
                let schema = Arc::new(Self::create_schema(dim));
                self.lance
                    .create_empty_table(table_name.as_str(), schema)
                    .execute()
                    .await?
            }
        };

        self.namespace_tables
            .lock()
            .await
            .insert(table_name, table.clone());
        Ok(table)
    }
1155
1156 async fn query_table_page(
1157 &self,
1158 table: &Table,
1159 filter: Option<String>,
1160 offset: usize,
1161 limit: usize,
1162 ) -> Result<Vec<ChromaDocument>> {
1163 let mut query = table.query().limit(limit).offset(offset);
1164 if let Some(filter) = filter {
1165 query = query.only_if(filter.as_str());
1166 }
1167 let mut stream = query.execute().await?;
1168 let mut results = Vec::new();
1169 while let Some(batch) = stream.try_next().await? {
1170 let mut docs = self.batch_to_docs(&batch)?;
1171 results.append(&mut docs);
1172 }
1173 Ok(results)
1174 }
1175
    /// Alias for the legacy shared table; named to make call sites read as
    /// "also check the pre-namespace-split table".
    async fn legacy_table_if_exists(&self) -> Result<Option<Table>> {
        self.open_table_if_exists().await
    }
1179
    /// Open the legacy shared table, caching the handle; `Ok(None)` when the
    /// table does not exist, `Err` for any other LanceDB failure.
    async fn open_table_if_exists(&self) -> Result<Option<Table>> {
        let mut guard = self.table.lock().await;
        if let Some(table) = guard.as_ref() {
            return Ok(Some(table.clone()));
        }

        match self
            .lance
            .open_table(self.collection_name.as_str())
            .execute()
            .await
        {
            Ok(tbl) => {
                *guard = Some(tbl.clone());
                Ok(Some(tbl))
            }
            Err(e) => {
                // Absence is detected by message substring; the driver does
                // not expose a dedicated not-found variant here.
                let msg = e.to_string().to_lowercase();
                if msg.contains("not found")
                    || msg.contains("does not exist")
                    || msg.contains("no such file")
                {
                    Ok(None)
                } else {
                    tracing::warn!(
                        "LanceDB error opening table '{}': {}",
                        self.collection_name,
                        e
                    );
                    Err(anyhow!(
                        "LanceDB error on table '{}': {}",
                        self.collection_name,
                        e
                    ))
                }
            }
        }
    }
1221
1222 fn create_schema(dim: usize) -> Schema {
1224 Schema::new(vec![
1225 Field::new("id", DataType::Utf8, false),
1226 Field::new("namespace", DataType::Utf8, false),
1227 Field::new(
1228 "vector",
1229 DataType::FixedSizeList(
1230 Arc::new(Field::new("item", DataType::Float32, true)),
1231 dim as i32,
1232 ),
1233 false,
1234 ),
1235 Field::new("text", DataType::Utf8, true),
1236 Field::new("metadata", DataType::Utf8, true),
1237 Field::new("layer", DataType::UInt8, true), Field::new("parent_id", DataType::Utf8, true), Field::new("children_ids", DataType::Utf8, true), Field::new("keywords", DataType::Utf8, true), Field::new("content_hash", DataType::Utf8, true), Field::new("source_hash", DataType::Utf8, true), ])
1249 }
1250
1251 async fn ensure_hash_schema_columns(&self, table: &Table) -> Result<()> {
1252 let missing = Self::missing_required_columns(table, SchemaVersion::current()).await?;
1253
1254 if missing.is_empty() {
1255 return Ok(());
1256 }
1257
1258 let missing_columns = missing
1259 .iter()
1260 .map(|field| field.name().to_string())
1261 .collect::<Vec<_>>();
1262 let error = SchemaMismatchWriteError::new(
1263 self.collection_name.clone(),
1264 self.lance_path.clone(),
1265 missing_columns,
1266 "table is older than the current writer schema",
1267 );
1268 self.log_schema_mismatch(&error);
1269 Err(error.into())
1270 }
1271
    /// Translate a LanceDB write failure into a `SchemaMismatchWriteError`
    /// when its message describes a schema mismatch; all other errors pass
    /// through unchanged.
    fn map_lancedb_write_error(&self, error: lancedb::error::Error) -> anyhow::Error {
        // Only these variants carry messages the mismatch heuristic understands.
        let message = match &error {
            lancedb::error::Error::Lance { source } => source.to_string(),
            lancedb::error::Error::Schema { message } => message.clone(),
            lancedb::error::Error::Arrow { source } => source.to_string(),
            _ => return error.into(),
        };

        if !is_schema_mismatch_message(&message) {
            return error.into();
        }

        let missing_columns = extract_missing_columns(&message);
        let error = SchemaMismatchWriteError::new(
            self.collection_name.clone(),
            self.lance_path.clone(),
            missing_columns,
            message,
        );
        self.log_schema_mismatch(&error);
        error.into()
    }
1294
    /// Emit a structured error event for a write-path schema mismatch.
    ///
    /// The logged fields mirror `SchemaMismatchWriteError`'s accessors
    /// (table, db path, missing columns, remediation hint) plus the source
    /// location, so operators can diagnose without reading the code.
    fn log_schema_mismatch(&self, error: &SchemaMismatchWriteError) {
        error!(
            error_kind = "schema_mismatch",
            table = %error.table_name(),
            db_path = %error.db_path(),
            missing_columns = ?error.missing_columns(),
            remediation = %error.remediation(),
            file = file!(),
            line = line!(),
            "write-path schema mismatch"
        );
    }
1307
    /// Convert `documents` into a single-`RecordBatch` reader conforming to
    /// `create_schema(dim)`, ready to hand to LanceDB for writing.
    ///
    /// Column order in the `RecordBatch` must match the field order declared
    /// in `create_schema`.
    fn docs_to_batch(&self, documents: &[ChromaDocument], dim: usize) -> Result<BatchIter> {
        // Column-major projections of the document fields.
        let ids = documents.iter().map(|d| d.id.as_str()).collect::<Vec<_>>();
        let namespaces = documents
            .iter()
            .map(|d| d.namespace.as_str())
            .collect::<Vec<_>>();
        let texts = documents
            .iter()
            .map(|d| d.document.as_str())
            .collect::<Vec<_>>();
        // Metadata is persisted as a JSON string column; serialization
        // failures degrade to an empty object rather than aborting the batch.
        let metadata_strings = documents
            .iter()
            .map(|d| serde_json::to_string(&d.metadata).unwrap_or_else(|_| "{}".to_string()))
            .collect::<Vec<_>>();

        // An embedding whose length differs from `dim` becomes a null list
        // entry. NOTE(review): the schema declares `vector` non-nullable, so
        // such a null presumably makes `RecordBatch::try_new` below fail —
        // confirm this is the intended way to surface dimension mismatches.
        let vectors = documents.iter().map(|d| {
            if d.embedding.len() != dim {
                None
            } else {
                Some(d.embedding.iter().map(|v| Some(*v)).collect::<Vec<_>>())
            }
        });

        let layers: Vec<u8> = documents.iter().map(|d| d.layer).collect();
        let parent_ids: Vec<Option<&str>> =
            documents.iter().map(|d| d.parent_id.as_deref()).collect();
        // Lists are flattened into JSON strings; "[]" on serialization error.
        let children_ids_json: Vec<String> = documents
            .iter()
            .map(|d| serde_json::to_string(&d.children_ids).unwrap_or_else(|_| "[]".to_string()))
            .collect();
        let keywords_json: Vec<String> = documents
            .iter()
            .map(|d| serde_json::to_string(&d.keywords).unwrap_or_else(|_| "[]".to_string()))
            .collect();
        let content_hashes: Vec<Option<&str>> = documents
            .iter()
            .map(|d| d.content_hash.as_deref())
            .collect();
        let source_hashes: Vec<Option<&str>> =
            documents.iter().map(|d| d.source_hash.as_deref()).collect();

        let schema = Arc::new(Self::create_schema(dim));

        // Columns appear here in exactly the schema's declared order.
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(ids)),
                Arc::new(StringArray::from(namespaces)),
                Arc::new(
                    FixedSizeListArray::from_iter_primitive::<Float32Type, _, _>(
                        vectors, dim as i32,
                    ),
                ),
                Arc::new(StringArray::from(texts)),
                Arc::new(StringArray::from(metadata_strings)),
                Arc::new(UInt8Array::from(layers)),
                Arc::new(StringArray::from(parent_ids)),
                Arc::new(StringArray::from(
                    children_ids_json
                        .iter()
                        .map(|s| s.as_str())
                        .collect::<Vec<_>>(),
                )),
                Arc::new(StringArray::from(
                    keywords_json.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
                )),
                Arc::new(StringArray::from(content_hashes)),
                Arc::new(StringArray::from(source_hashes)),
            ],
        )?;

        // A reader over exactly one batch satisfies LanceDB's write API.
        Ok(Box::new(RecordBatchIterator::new(
            vec![Ok(batch)].into_iter(),
            schema,
        )))
    }
1390
    /// Decode a `RecordBatch` back into `ChromaDocument`s.
    ///
    /// Columns `id`, `namespace`, `text`, `metadata`, and `vector` are
    /// required and produce an error when absent. The v4 columns (`layer`,
    /// `parent_id`, `children_ids`, `keywords`, `content_hash`,
    /// `source_hash`) are optional so batches written under older schemas
    /// still decode, with defaults (layer 0, empty lists, `None` hashes).
    fn batch_to_docs(&self, batch: &RecordBatch) -> Result<Vec<ChromaDocument>> {
        let id_col = batch
            .column_by_name("id")
            .and_then(|c| c.as_any().downcast_ref::<StringArray>())
            .ok_or_else(|| anyhow!("Missing id column"))?;
        let ns_col = batch
            .column_by_name("namespace")
            .and_then(|c| c.as_any().downcast_ref::<StringArray>())
            .ok_or_else(|| anyhow!("Missing namespace column"))?;
        let text_col = batch
            .column_by_name("text")
            .and_then(|c| c.as_any().downcast_ref::<StringArray>())
            .ok_or_else(|| anyhow!("Missing text column"))?;
        let metadata_col = batch
            .column_by_name("metadata")
            .and_then(|c| c.as_any().downcast_ref::<StringArray>())
            .ok_or_else(|| anyhow!("Missing metadata column"))?;
        let vector_col = batch
            .column_by_name("vector")
            .and_then(|c| c.as_any().downcast_ref::<FixedSizeListArray>())
            .ok_or_else(|| anyhow!("Missing vector column"))?;

        // Optional (schema v4) columns: `None` when the column is absent or
        // has an unexpected type.
        let layer_col = batch
            .column_by_name("layer")
            .and_then(|c| c.as_any().downcast_ref::<UInt8Array>());
        let parent_id_col = batch
            .column_by_name("parent_id")
            .and_then(|c| c.as_any().downcast_ref::<StringArray>());
        let children_ids_col = batch
            .column_by_name("children_ids")
            .and_then(|c| c.as_any().downcast_ref::<StringArray>());
        let keywords_col = batch
            .column_by_name("keywords")
            .and_then(|c| c.as_any().downcast_ref::<StringArray>());
        let content_hash_col = batch
            .column_by_name("content_hash")
            .and_then(|c| c.as_any().downcast_ref::<StringArray>());
        let source_hash_col = batch
            .column_by_name("source_hash")
            .and_then(|c| c.as_any().downcast_ref::<StringArray>());

        // Embeddings are read from the flat child values buffer.
        let dim = vector_col.value_length() as usize;
        let values = vector_col
            .values()
            .as_any()
            .downcast_ref::<Float32Array>()
            .ok_or_else(|| anyhow!("Vector inner type mismatch"))?;

        let mut docs = Vec::new();
        for i in 0..batch.num_rows() {
            let id = id_col.value(i).to_string();
            // NOTE(review): `text` is nullable in the schema but read
            // unconditionally here — confirm no null texts are ever written.
            let text = text_col.value(i).to_string();
            let namespace = ns_col.value(i).to_string();
            let meta_str = metadata_col.value(i);
            // Unparseable metadata degrades to an empty JSON object.
            let metadata: Value = serde_json::from_str(meta_str).unwrap_or_else(|_| json!({}));

            // NOTE(review): `i * dim` assumes the list array starts at value
            // offset 0 (i.e. the batch was never sliced) — confirm upstream
            // never hands us a sliced RecordBatch.
            let offset = i * dim;
            let mut emb = Vec::with_capacity(dim);
            for j in 0..dim {
                emb.push(values.value(offset + j));
            }

            // Missing/null layer defaults to 0.
            let layer = layer_col
                .and_then(|col| {
                    if col.is_null(i) {
                        None
                    } else {
                        Some(col.value(i))
                    }
                })
                .unwrap_or(0);

            let parent_id = parent_id_col.and_then(|col| {
                if col.is_null(i) {
                    None
                } else {
                    Some(col.value(i).to_string())
                }
            });

            // JSON-encoded string lists; malformed JSON yields an empty list.
            let children_ids: Vec<String> = children_ids_col
                .and_then(|col| {
                    if col.is_null(i) {
                        None
                    } else {
                        serde_json::from_str(col.value(i)).ok()
                    }
                })
                .unwrap_or_default();

            let keywords: Vec<String> = keywords_col
                .and_then(|col| {
                    if col.is_null(i) {
                        None
                    } else {
                        serde_json::from_str(col.value(i)).ok()
                    }
                })
                .unwrap_or_default();

            let content_hash = content_hash_col.and_then(|col| {
                if col.is_null(i) {
                    None
                } else {
                    Some(col.value(i).to_string())
                }
            });

            let source_hash = source_hash_col.and_then(|col| {
                if col.is_null(i) {
                    None
                } else {
                    Some(col.value(i).to_string())
                }
            });

            docs.push(ChromaDocument {
                id,
                namespace,
                embedding: emb,
                metadata,
                document: text,
                layer,
                parent_id,
                children_ids,
                keywords,
                content_hash,
                source_hash,
            });
        }
        Ok(docs)
    }
1527
1528 pub async fn get_filtered_in_namespace(
1529 &self,
1530 namespace: &str,
1531 filter: &str,
1532 ) -> Result<Vec<ChromaDocument>> {
1533 let mut results = Vec::new();
1534 if let Some(table) = self.open_namespace_table_if_exists(namespace).await? {
1535 let mut stream = table.query().only_if(filter).execute().await?;
1536 while let Some(batch) = stream.try_next().await? {
1537 let mut docs = self.batch_to_docs(&batch)?;
1538 results.append(&mut docs);
1539 }
1540 }
1541 if let Some(table) = self.legacy_table_if_exists().await? {
1542 let combined = format!("{} AND ({})", self.namespace_filter(namespace), filter);
1543 let mut stream = table.query().only_if(combined.as_str()).execute().await?;
1544 while let Some(batch) = stream.try_next().await? {
1545 let mut docs = self.batch_to_docs(&batch)?;
1546 results.append(&mut docs);
1547 }
1548 }
1549 Ok(results)
1550 }
1551
    /// Vector search with an optional namespace scope and optional layer
    /// filter.
    ///
    /// With a namespace, both the per-namespace table and the legacy shared
    /// table are queried for up to `k` nearest rows each; without one, every
    /// data table is queried. In both cases results are concatenated and
    /// truncated to `k`.
    ///
    /// NOTE(review): the concatenation keeps table order, not distance
    /// order, so `truncate(k)` can keep farther hits from the first table
    /// over nearer hits from a later one — confirm this ranking is intended.
    pub async fn search_store_with_layer(
        &self,
        namespace: Option<&str>,
        embedding: Vec<f32>,
        k: usize,
        layer_filter: Option<SliceLayer>,
    ) -> Result<Vec<ChromaDocument>> {
        // An empty query vector cannot be searched; return nothing.
        if embedding.is_empty() {
            return Ok(vec![]);
        }
        let mut filters = Vec::new();
        if let Some(layer) = layer_filter {
            filters.push(self.layer_filter(layer));
        }
        let mut results = Vec::new();

        if let Some(ns) = namespace {
            if let Some(table) = self.open_namespace_table_if_exists(ns).await? {
                let mut query = table.query();
                if !filters.is_empty() {
                    let combined = filters.join(" AND ");
                    query = query.only_if(combined.as_str());
                }
                let mut stream = query
                    .nearest_to(embedding.clone())?
                    .limit(k)
                    .execute()
                    .await?;
                while let Some(batch) = stream.try_next().await? {
                    let mut docs = self.batch_to_docs(&batch)?;
                    results.append(&mut docs);
                }
            }
            if let Some(table) = self.legacy_table_if_exists().await? {
                // Legacy rows share one table; always scope by namespace.
                let mut legacy_filters = vec![self.namespace_filter(ns)];
                legacy_filters.extend(filters.clone());
                let combined = legacy_filters.join(" AND ");
                let mut stream = table
                    .query()
                    .only_if(combined.as_str())
                    .nearest_to(embedding)?
                    .limit(k)
                    .execute()
                    .await?;
                while let Some(batch) = stream.try_next().await? {
                    let mut docs = self.batch_to_docs(&batch)?;
                    results.append(&mut docs);
                }
            }
            results.truncate(k);
        } else {
            // No namespace: fan out over every data table.
            for table_name in self.data_table_names().await? {
                let Some(table) = self.open_named_table_if_exists(&table_name).await? else {
                    continue;
                };
                let mut query = table.query();
                if !filters.is_empty() {
                    let combined = filters.join(" AND ");
                    query = query.only_if(combined.as_str());
                }
                let mut stream = query
                    .nearest_to(embedding.clone())?
                    .limit(k)
                    .execute()
                    .await?;
                while let Some(batch) = stream.try_next().await? {
                    let mut docs = self.batch_to_docs(&batch)?;
                    results.append(&mut docs);
                }
            }
            results.truncate(k);
        }
        debug!(
            "Lance returned {} results (layer filter: {:?})",
            results.len(),
            layer_filter
        );
        Ok(results)
    }
1633
1634 pub async fn get_children(
1636 &self,
1637 namespace: &str,
1638 parent_id: &str,
1639 ) -> Result<Vec<ChromaDocument>> {
1640 if self.open_table_if_exists().await?.is_none() {
1641 return Ok(vec![]);
1642 }
1643
1644 if let Some(parent) = self.get_document(namespace, parent_id).await? {
1646 if parent.children_ids.is_empty() {
1647 return Ok(vec![]);
1648 }
1649
1650 let mut children = Vec::new();
1652 for child_id in &parent.children_ids {
1653 if let Some(child) = self.get_document(namespace, child_id).await? {
1654 children.push(child);
1655 }
1656 }
1657 return Ok(children);
1658 }
1659
1660 Ok(vec![])
1661 }
1662
1663 pub async fn get_parent(
1665 &self,
1666 namespace: &str,
1667 child_id: &str,
1668 ) -> Result<Option<ChromaDocument>> {
1669 if let Some(child) = self.get_document(namespace, child_id).await?
1670 && let Some(ref parent_id) = child.parent_id
1671 {
1672 return self.get_document(namespace, parent_id).await;
1673 }
1674 Ok(None)
1675 }
1676
1677 fn namespace_filter(&self, namespace: &str) -> String {
1678 format!("namespace = '{}'", namespace.replace('\'', "''"))
1679 }
1680
1681 fn id_filter(&self, id: &str) -> String {
1682 format!("id = '{}'", id.replace('\'', "''"))
1683 }
1684
1685 fn layer_filter(&self, layer: SliceLayer) -> String {
1686 if layer == SliceLayer::Outer {
1687 "(layer = 0 OR layer = 1)".to_string()
1689 } else {
1690 format!("layer = {}", layer.as_u8())
1691 }
1692 }
1693
1694 fn content_hash_filter(&self, hash: &str) -> String {
1695 format!("content_hash = '{}'", hash.replace('\'', "''"))
1696 }
1697
1698 fn source_hash_filter(&self, hash: &str) -> String {
1699 format!("source_hash = '{}'", hash.replace('\'', "''"))
1700 }
1701
1702 async fn table_has_content_hash(table: &Table) -> bool {
1704 table
1705 .schema()
1706 .await
1707 .map(|schema| schema.field_with_name("content_hash").is_ok())
1708 .unwrap_or(false)
1709 }
1710
1711 async fn table_has_source_hash(table: &Table) -> bool {
1713 table
1714 .schema()
1715 .await
1716 .map(|schema| schema.field_with_name("source_hash").is_ok())
1717 .unwrap_or(false)
1718 }
1719
    /// Whether any document in `namespace` already carries this chunk-level
    /// `content_hash` (the write-path dedup check).
    ///
    /// Checks the per-namespace table first, then the legacy shared table.
    /// Legacy tables predating the `content_hash` column disable dedup with
    /// a warning.
    ///
    /// NOTE(review): if the namespace-table query yields a first batch with
    /// zero rows, this returns `false` without consulting the legacy table —
    /// confirm the stream never yields empty batches, or that skipping the
    /// fallback is acceptable.
    pub async fn has_content_hash(&self, namespace: &str, hash: &str) -> Result<bool> {
        let hash_filter = self.content_hash_filter(hash);
        if let Some(table) = self.open_namespace_table_if_exists(namespace).await?
            && Self::table_has_content_hash(&table).await
        {
            let mut stream = table
                .query()
                .only_if(hash_filter.as_str())
                .limit(1)
                .execute()
                .await?;
            if let Some(batch) = stream.try_next().await? {
                return Ok(batch.num_rows() > 0);
            }
        }

        if let Some(table) = self.legacy_table_if_exists().await? {
            if !Self::table_has_content_hash(&table).await {
                tracing::warn!(
                    "Table '{}' has old schema without content_hash column. \
                     Deduplication disabled. Consider re-indexing with new schema.",
                    self.collection_name
                );
                // Pre-v4 schema: treat everything as new rather than erroring.
                return Ok(false);
            }

            let filter = format!("{} AND {}", self.namespace_filter(namespace), hash_filter);
            let mut stream = table
                .query()
                .only_if(filter.as_str())
                .limit(1)
                .execute()
                .await?;

            if let Some(batch) = stream.try_next().await? {
                return Ok(batch.num_rows() > 0);
            }
        }

        Ok(false)
    }
1766
    /// Whether any document in `namespace` already carries this source-level
    /// `source_hash` (skips re-indexing an unchanged source file).
    ///
    /// Mirrors `has_content_hash`: per-namespace table first, then the
    /// legacy shared table; pre-v4 legacy tables without the column disable
    /// source-level dedup (debug log only, since this is expected before a
    /// backfill).
    ///
    /// NOTE(review): as in `has_content_hash`, an empty first batch from the
    /// namespace-table query returns `false` without the legacy fallback —
    /// confirm that is acceptable.
    pub async fn has_source_hash(&self, namespace: &str, hash: &str) -> Result<bool> {
        let hash_filter = self.source_hash_filter(hash);
        if let Some(table) = self.open_namespace_table_if_exists(namespace).await?
            && Self::table_has_source_hash(&table).await
        {
            let mut stream = table
                .query()
                .only_if(hash_filter.as_str())
                .limit(1)
                .execute()
                .await?;
            if let Some(batch) = stream.try_next().await? {
                return Ok(batch.num_rows() > 0);
            }
        }

        if let Some(table) = self.legacy_table_if_exists().await? {
            if !Self::table_has_source_hash(&table).await {
                tracing::debug!(
                    "Table '{}' has pre-v4 schema without source_hash column. \
                     Source-level dedup disabled until backfill.",
                    self.collection_name
                );
                return Ok(false);
            }

            let filter = format!("{} AND {}", self.namespace_filter(namespace), hash_filter);
            let mut stream = table
                .query()
                .only_if(filter.as_str())
                .limit(1)
                .execute()
                .await?;

            if let Some(batch) = stream.try_next().await? {
                return Ok(batch.num_rows() > 0);
            }
        }

        Ok(false)
    }
1815
    /// Return the subset of `hashes` NOT already stored in `namespace`,
    /// i.e. the chunks that still need indexing.
    ///
    /// Queries the per-namespace table and the legacy shared table with one
    /// OR-of-equalities filter each. If the legacy table predates the
    /// `content_hash` column, dedup is impossible and every hash is treated
    /// as new (with a warning).
    ///
    /// NOTE(review): `limit(hashes.len())` can be exhausted by multiple rows
    /// sharing one hash before every existing hash is observed, letting some
    /// duplicates through — confirm hashes are unique per namespace or raise
    /// the limit. Also, a very large `hashes` slice produces a very large
    /// filter expression; confirm LanceDB handles that size.
    pub async fn filter_existing_hashes<'a>(
        &self,
        namespace: &str,
        hashes: &'a [String],
    ) -> Result<Vec<&'a String>> {
        if hashes.is_empty() {
            return Ok(vec![]);
        }

        // One equality predicate per candidate hash, OR-joined below.
        let hash_conditions: Vec<String> =
            hashes.iter().map(|h| self.content_hash_filter(h)).collect();

        let mut existing_hashes = std::collections::HashSet::new();

        if let Some(table) = self.open_namespace_table_if_exists(namespace).await?
            && Self::table_has_content_hash(&table).await
        {
            let filter = hash_conditions.join(" OR ");
            let mut stream = table
                .query()
                .only_if(filter.as_str())
                .limit(hashes.len())
                .execute()
                .await?;
            while let Some(batch) = stream.try_next().await? {
                if let Some(hash_col) = batch
                    .column_by_name("content_hash")
                    .and_then(|c| c.as_any().downcast_ref::<StringArray>())
                {
                    for i in 0..batch.num_rows() {
                        if !hash_col.is_null(i) {
                            existing_hashes.insert(hash_col.value(i).to_string());
                        }
                    }
                }
            }
        }

        if let Some(table) = self.legacy_table_if_exists().await? {
            if !Self::table_has_content_hash(&table).await {
                tracing::warn!(
                    "Table '{}' has old schema without content_hash column. \
                     Deduplication disabled. Consider re-indexing with new schema.",
                    self.collection_name
                );
                // Without the column we cannot dedup: report all hashes new.
                return Ok(hashes.iter().collect());
            }

            let filter = format!(
                "{} AND ({})",
                self.namespace_filter(namespace),
                hash_conditions.join(" OR ")
            );
            let mut stream = table
                .query()
                .only_if(filter.as_str())
                .limit(hashes.len())
                .execute()
                .await?;
            while let Some(batch) = stream.try_next().await? {
                if let Some(hash_col) = batch
                    .column_by_name("content_hash")
                    .and_then(|c| c.as_any().downcast_ref::<StringArray>())
                {
                    for i in 0..batch.num_rows() {
                        if !hash_col.is_null(i) {
                            existing_hashes.insert(hash_col.value(i).to_string());
                        }
                    }
                }
            }
        }

        // Keep only the hashes never seen in either table.
        Ok(hashes
            .iter()
            .filter(|h| !existing_hashes.contains(h.as_str()))
            .collect())
    }
1902
    /// Run LanceDB's full optimization (`OptimizeAction::All`) over every
    /// data table.
    ///
    /// NOTE(review): `stats` is overwritten on each loop iteration, so the
    /// returned/logged value reflects only the LAST table processed —
    /// consider aggregating or logging per table.
    pub async fn optimize(&self) -> Result<OptimizeStats> {
        let mut stats = OptimizeStats {
            compaction: None,
            prune: None,
        };
        for table_name in self.data_table_names().await? {
            let Some(table) = self.open_named_table_if_exists(&table_name).await? else {
                continue;
            };
            stats = table.optimize(OptimizeAction::All).await?;
        }
        info!(
            "Optimize complete: compaction={:?}, prune={:?}",
            stats.compaction, stats.prune
        );
        Ok(stats)
    }
1925
    /// Compact fragments in every data table (default compaction options,
    /// no index remapping).
    ///
    /// NOTE(review): like `optimize`, the returned stats cover only the last
    /// table in the loop.
    pub async fn compact(&self) -> Result<OptimizeStats> {
        let mut stats = OptimizeStats {
            compaction: None,
            prune: None,
        };
        for table_name in self.data_table_names().await? {
            let Some(table) = self.open_named_table_if_exists(&table_name).await? else {
                continue;
            };
            stats = table
                .optimize(OptimizeAction::Compact {
                    options: Default::default(),
                    remap_options: None,
                })
                .await?;
        }
        info!("Compaction complete: {:?}", stats.compaction);
        Ok(stats)
    }
1946
    /// Prune old table versions from every data table.
    ///
    /// `older_than_days` defaults to 7 when `None`. Unverified fragments are
    /// kept (`delete_unverified: Some(false)`).
    ///
    /// NOTE(review): as with `optimize`/`compact`, the returned stats cover
    /// only the last table processed. The `u64 as i64` cast could wrap for
    /// absurdly large inputs — presumably unreachable; confirm callers pass
    /// sane day counts.
    pub async fn cleanup(&self, older_than_days: Option<u64>) -> Result<OptimizeStats> {
        let days = older_than_days.unwrap_or(7) as i64;
        let duration = chrono::TimeDelta::days(days);
        let mut stats = OptimizeStats {
            compaction: None,
            prune: None,
        };
        for table_name in self.data_table_names().await? {
            let Some(table) = self.open_named_table_if_exists(&table_name).await? else {
                continue;
            };
            stats = table
                .optimize(OptimizeAction::Prune {
                    older_than: Some(duration),
                    delete_unverified: Some(false),
                    error_if_tagged_old_versions: None,
                })
                .await?;
        }
        info!("Cleanup complete: {:?}", stats.prune);
        Ok(stats)
    }
1970
1971 pub async fn stats(&self) -> Result<TableStats> {
1973 let table_names = self.data_table_names().await?;
1974 let mut row_count = 0usize;
1975 let mut version_count = 0usize;
1976
1977 for table_name in &table_names {
1978 let Some(table) = self.open_named_table_if_exists(table_name).await? else {
1979 continue;
1980 };
1981 row_count += table.count_rows(None).await.unwrap_or(0);
1982 version_count += table.list_versions().await.unwrap_or_default().len();
1983 }
1984
1985 Ok(TableStats {
1986 row_count,
1987 version_count,
1988 table_name: self.collection_name.clone(),
1989 db_path: self.lance_path.clone(),
1990 })
1991 }
1992
1993 pub async fn count_namespace(&self, namespace: &str) -> Result<usize> {
1995 let mut count = 0usize;
1996 if let Some(table) = self.open_namespace_table_if_exists(namespace).await? {
1997 count += table.count_rows(None).await?;
1998 }
1999 if let Some(table) = self.legacy_table_if_exists().await? {
2000 let filter = self.namespace_filter(namespace);
2001 count += table.count_rows(Some(filter)).await?;
2002 }
2003 Ok(count)
2004 }
2005
2006 pub async fn get_all_in_namespace(&self, namespace: &str) -> Result<Vec<ChromaDocument>> {
2011 let results = self.all_documents(Some(namespace), 100_000).await?;
2012 debug!(
2013 "Retrieved {} documents from namespace '{}'",
2014 results.len(),
2015 namespace
2016 );
2017 Ok(results)
2018 }
2019
2020 pub async fn namespace_exists(&self, namespace: &str) -> Result<bool> {
2022 let count = self.count_namespace(namespace).await?;
2023 Ok(count > 0)
2024 }
2025}
2026
/// Aggregate statistics reported by `StorageManager::stats`.
#[derive(Debug, Clone, Serialize)]
pub struct TableStats {
    /// Total rows summed across all data tables.
    pub row_count: usize,
    /// Total table versions summed across all data tables.
    pub version_count: usize,
    /// Logical collection name of the store.
    pub table_name: String,
    /// Filesystem path of the LanceDB database.
    pub db_path: String,
}
2035
/// Outcome counters produced by a garbage-collection run.
///
/// `*_found` counters are populated even on a dry run; `*_removed`
/// counters reflect actual deletions.
#[derive(Debug, Clone, Default, Serialize)]
pub struct GcStats {
    // Docs whose parent_id no longer resolves.
    pub orphans_found: usize,
    pub orphans_removed: usize,
    // Namespaces with no documents.
    pub empty_namespaces_found: usize,
    pub empty_namespaces_removed: usize,
    // Docs older than the configured retention cutoff.
    pub old_docs_found: usize,
    pub old_docs_removed: usize,
    // NOTE(review): never populated in the visible code paths — confirm.
    pub bytes_freed: Option<u64>,
    // Names of namespaces detected as empty.
    pub empty_namespace_names: Vec<String>,
    // Namespaces touched by age-based removal.
    pub affected_namespaces: Vec<String>,
}
2062
2063impl GcStats {
2064 pub fn has_issues(&self) -> bool {
2066 self.orphans_found > 0 || self.empty_namespaces_found > 0 || self.old_docs_found > 0
2067 }
2068
2069 pub fn has_deletions(&self) -> bool {
2071 self.orphans_removed > 0 || self.empty_namespaces_removed > 0 || self.old_docs_removed > 0
2072 }
2073}
2074
/// Knobs controlling a garbage-collection run (see `garbage_collect`).
#[derive(Debug, Clone)]
pub struct GcConfig {
    /// Delete docs whose parent_id no longer resolves.
    pub remove_orphans: bool,
    /// Report (and eventually remove) empty namespaces.
    pub remove_empty: bool,
    /// Remove docs whose metadata timestamp is older than this.
    pub older_than: Option<chrono::Duration>,
    /// When true, count issues but delete nothing.
    pub dry_run: bool,
    /// Restrict the run to one namespace; `None` scans everything.
    pub namespace: Option<String>,
}
2089
2090impl Default for GcConfig {
2091 fn default() -> Self {
2092 Self {
2093 remove_orphans: false,
2094 remove_empty: false,
2095 older_than: None,
2096 dry_run: true,
2097 namespace: None,
2098 }
2099 }
2100}
2101
2102pub fn parse_duration_string(s: &str) -> Result<chrono::Duration> {
2104 let s = s.trim().to_lowercase();
2105 if s.is_empty() {
2106 return Err(anyhow!("Empty duration string"));
2107 }
2108
2109 let (num_str, unit) = if s.ends_with('d') {
2111 (&s[..s.len() - 1], 'd')
2112 } else if s.ends_with('m') {
2113 (&s[..s.len() - 1], 'm')
2114 } else if s.ends_with('y') {
2115 (&s[..s.len() - 1], 'y')
2116 } else {
2117 return Err(anyhow!(
2118 "Invalid duration format '{}'. Use format like '30d', '6m', or '1y'",
2119 s
2120 ));
2121 };
2122
2123 let num: i64 = num_str.parse().map_err(|_| {
2124 anyhow!(
2125 "Invalid number in duration '{}'. Use format like '30d', '6m', or '1y'",
2126 s
2127 )
2128 })?;
2129
2130 if num <= 0 {
2131 return Err(anyhow!("Duration must be positive, got '{}'", s));
2132 }
2133
2134 match unit {
2135 'd' => Ok(chrono::Duration::days(num)),
2136 'm' => Ok(chrono::Duration::days(num * 30)), 'y' => Ok(chrono::Duration::days(num * 365)), _ => unreachable!(),
2139 }
2140}
2141
2142impl StorageManager {
2143 #[doc(alias = "run_gc")]
2149 pub async fn garbage_collect(&self, config: &GcConfig) -> Result<GcStats> {
2150 let mut stats = GcStats::default();
2151
2152 const PAGE_SIZE: usize = 5000;
2153 let mut all_docs: Vec<ChromaDocument> = Vec::new();
2154 let mut offset = 0;
2155 loop {
2156 let page = self
2157 .all_documents_page(config.namespace.as_deref(), offset, PAGE_SIZE)
2158 .await?;
2159 let page_len = page.len();
2160 all_docs.extend(page);
2161 if page_len < PAGE_SIZE {
2162 break;
2163 }
2164 offset += page_len;
2165 }
2166
2167 if all_docs.is_empty() {
2168 return Ok(stats);
2169 }
2170
2171 let mut by_namespace: std::collections::HashMap<String, Vec<&ChromaDocument>> =
2173 std::collections::HashMap::new();
2174 for doc in &all_docs {
2175 by_namespace
2176 .entry(doc.namespace.clone())
2177 .or_default()
2178 .push(doc);
2179 }
2180
2181 if config.remove_orphans {
2183 let orphan_stats = self
2184 .find_and_remove_orphans(&all_docs, config.dry_run)
2185 .await?;
2186 stats.orphans_found = orphan_stats.0;
2187 stats.orphans_removed = orphan_stats.1;
2188 }
2189
2190 if config.remove_empty {
2192 let empty_stats = self
2193 .find_and_remove_empty_namespaces(&by_namespace, config.dry_run)
2194 .await?;
2195 stats.empty_namespaces_found = empty_stats.0;
2196 stats.empty_namespaces_removed = empty_stats.1;
2197 stats.empty_namespace_names = empty_stats.2;
2198 }
2199
2200 if let Some(ref duration) = config.older_than {
2202 let old_stats = self
2203 .find_and_remove_old_docs(&all_docs, duration, config.dry_run)
2204 .await?;
2205 stats.old_docs_found = old_stats.0;
2206 stats.old_docs_removed = old_stats.1;
2207 stats.affected_namespaces = old_stats.2;
2208 }
2209
2210 Ok(stats)
2211 }
2212
    /// Deprecated alias for `garbage_collect`; forwards the call unchanged.
    #[deprecated(note = "use garbage_collect")]
    pub async fn run_gc(&self, config: &GcConfig) -> Result<GcStats> {
        self.garbage_collect(config).await
    }
2217
2218 async fn find_and_remove_orphans(
2220 &self,
2221 docs: &[ChromaDocument],
2222 dry_run: bool,
2223 ) -> Result<(usize, usize)> {
2224 let all_ids: std::collections::HashSet<&str> = docs.iter().map(|d| d.id.as_str()).collect();
2226
2227 let mut orphans: Vec<(&str, &str)> = Vec::new(); for doc in docs {
2230 if let Some(ref parent_id) = doc.parent_id
2231 && !all_ids.contains(parent_id.as_str())
2232 {
2233 orphans.push((&doc.namespace, &doc.id));
2234 }
2235 }
2236
2237 let found = orphans.len();
2238 let mut removed = 0;
2239
2240 if !dry_run && !orphans.is_empty() {
2241 for (namespace, id) in &orphans {
2242 if self.delete_document(namespace, id).await.is_ok() {
2243 removed += 1;
2244 }
2245 }
2246 }
2247
2248 Ok((found, removed))
2249 }
2250
    /// Report namespaces whose document list is empty.
    ///
    /// NOTE(review): removal is not implemented — `removed` is always 0 and
    /// `_dry_run` is unused, so this pass is detection-only. Additionally,
    /// the caller builds `by_namespace` by pushing each doc into its own
    /// entry, so entries can never be empty in practice; verify whether this
    /// pass should enumerate namespace tables directly instead.
    async fn find_and_remove_empty_namespaces(
        &self,
        by_namespace: &std::collections::HashMap<String, Vec<&ChromaDocument>>,
        _dry_run: bool,
    ) -> Result<(usize, usize, Vec<String>)> {
        let empty_namespaces: Vec<String> = by_namespace
            .iter()
            .filter(|(_, docs)| docs.is_empty())
            .map(|(ns, _)| ns.clone())
            .collect();

        let found = empty_namespaces.len();
        // Deletion intentionally skipped; see the note above.
        let removed = 0;

        Ok((found, removed, empty_namespaces))
    }
2273
    /// Find docs whose metadata timestamp predates `now - older_than` and,
    /// unless `dry_run`, delete them.
    ///
    /// The timestamp is taken from the first present metadata key among
    /// `timestamp`, `created_at`, `indexed_at`, `date`, `time`, and parsed
    /// as RFC 3339, `%Y-%m-%d %H:%M:%S`, or `%Y-%m-%d` (in that order).
    /// Docs with no recognizable timestamp are never considered old.
    ///
    /// Returns `(found, removed, affected_namespaces)`.
    async fn find_and_remove_old_docs(
        &self,
        docs: &[ChromaDocument],
        older_than: &chrono::Duration,
        dry_run: bool,
    ) -> Result<(usize, usize, Vec<String>)> {
        let cutoff = chrono::Utc::now() - *older_than;

        let mut old_docs: Vec<(&str, &str)> = Vec::new();
        let mut affected_namespaces: std::collections::HashSet<String> =
            std::collections::HashSet::new();

        for doc in docs {
            if let Some(obj) = doc.metadata.as_object() {
                let mut doc_timestamp: Option<String> = None;

                // First matching key wins.
                for key in &["timestamp", "created_at", "indexed_at", "date", "time"] {
                    if let Some(value) = obj.get(*key)
                        && let Some(ts) = value.as_str()
                    {
                        doc_timestamp = Some(ts.to_string());
                        break;
                    }
                }

                if let Some(ts) = doc_timestamp {
                    // Try progressively looser formats; NOTE(review): the
                    // naive formats are compared against the UTC cutoff, so
                    // local-time stamps will be off by the zone offset —
                    // presumably acceptable at day granularity; confirm.
                    let is_old = if let Ok(parsed) = chrono::DateTime::parse_from_rfc3339(&ts) {
                        parsed < cutoff
                    } else if let Ok(parsed) =
                        chrono::NaiveDateTime::parse_from_str(&ts, "%Y-%m-%d %H:%M:%S")
                    {
                        parsed < cutoff.naive_utc()
                    } else if let Ok(parsed) = chrono::NaiveDate::parse_from_str(&ts, "%Y-%m-%d") {
                        parsed < cutoff.date_naive()
                    } else {
                        // Unparseable timestamps are treated as "not old".
                        false
                    };

                    if is_old {
                        old_docs.push((&doc.namespace, &doc.id));
                        affected_namespaces.insert(doc.namespace.clone());
                    }
                }
            }
        }

        let found = old_docs.len();
        let mut removed = 0;

        if !dry_run && !old_docs.is_empty() {
            for (namespace, id) in &old_docs {
                if self.delete_document(namespace, id).await.is_ok() {
                    removed += 1;
                }
            }
        }

        Ok((found, removed, affected_namespaces.into_iter().collect()))
    }
2339
2340 pub async fn list_namespaces(&self) -> Result<Vec<(String, usize)>> {
2342 self.refresh().await?;
2343
2344 let mut namespace_counts: std::collections::HashMap<String, usize> =
2345 std::collections::HashMap::new();
2346
2347 for table_name in self.data_table_names().await? {
2348 let Some(table) = self.open_named_table_if_exists(&table_name).await? else {
2349 continue;
2350 };
2351 const PAGE_SIZE: usize = 5000;
2352 let mut offset = 0;
2353 loop {
2354 let page = self
2355 .query_table_page(&table, None, offset, PAGE_SIZE)
2356 .await?;
2357 let page_len = page.len();
2358 for doc in &page {
2359 *namespace_counts.entry(doc.namespace.clone()).or_insert(0) += 1;
2360 }
2361 if page_len < PAGE_SIZE {
2362 break;
2363 }
2364 offset += page_len;
2365 }
2366 }
2367
2368 let mut namespaces: Vec<(String, usize)> = namespace_counts.into_iter().collect();
2369 namespaces.sort_by(|a, b| a.0.cmp(&b.0));
2370 Ok(namespaces)
2371 }
2372}
2373
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use tempfile::TempDir;

    /// Chunk-level and source-level hashes must be stored independently so
    /// source-level dedup cannot be confused with chunk-level dedup.
    #[test]
    fn flat_documents_preserve_separate_chunk_and_source_hashes() {
        let doc = ChromaDocument::new_flat_with_hashes(
            "doc-1".to_string(),
            "kb:transcripts".to_string(),
            vec![0.0, 1.0],
            json!({"path": "sample.md"}),
            "outer summary chunk".to_string(),
            "chunk-sha256".to_string(),
            Some("source-sha256".to_string()),
        );

        assert_eq!(doc.content_hash.as_deref(), Some("chunk-sha256"));
        assert_eq!(doc.source_hash.as_deref(), Some("source-sha256"));
        assert_ne!(doc.content_hash, doc.source_hash);
    }

    /// Writing the same document id into two namespaces must create two
    /// per-namespace tables (no shared legacy table) while counts, point
    /// lookups, and vector search stay namespace-scoped.
    #[tokio::test]
    async fn namespace_writes_use_separate_lance_tables_and_keep_contracts() {
        let tmp = TempDir::new().expect("temp dir");
        let db_path = tmp.path().join("lancedb");
        let storage = StorageManager::new_lance_only(db_path.to_str().unwrap())
            .await
            .expect("storage");

        // Same id in two namespaces — must not collide.
        let embedding = vec![0.25_f32; 8];
        storage
            .add_to_store(vec![
                ChromaDocument::new_flat(
                    "shared-id".to_string(),
                    "kb:alpha".to_string(),
                    embedding.clone(),
                    json!({"ns": "alpha"}),
                    "alpha memory".to_string(),
                ),
                ChromaDocument::new_flat(
                    "shared-id".to_string(),
                    "kb:beta".to_string(),
                    embedding.clone(),
                    json!({"ns": "beta"}),
                    "beta memory".to_string(),
                ),
            ])
            .await
            .expect("write two namespaces");

        // Exactly two namespace tables; the legacy shared table is absent.
        let table_names = storage.lance.table_names().execute().await.expect("tables");
        let namespace_tables = table_names
            .iter()
            .filter(|name| StorageManager::is_namespace_table_name(name))
            .count();
        assert_eq!(namespace_tables, 2, "{table_names:?}");
        assert!(!table_names.iter().any(|name| name == DEFAULT_TABLE_NAME));

        // Counts, lookups, and search must all honor the namespace boundary.
        assert_eq!(storage.count_namespace("kb:alpha").await.unwrap(), 1);
        assert_eq!(storage.count_namespace("kb:beta").await.unwrap(), 1);
        assert_eq!(
            storage
                .get_document("kb:alpha", "shared-id")
                .await
                .unwrap()
                .unwrap()
                .document,
            "alpha memory"
        );
        assert_eq!(
            storage
                .search_store(Some("kb:beta"), embedding, 10)
                .await
                .unwrap()
                .into_iter()
                .map(|doc| doc.document)
                .collect::<Vec<_>>(),
            vec!["beta memory"]
        );
    }
}