1use anyhow::{Result, anyhow};
2use arrow_array::types::Float32Type;
3use arrow_array::{
4 Array, FixedSizeListArray, Float32Array, RecordBatch, RecordBatchIterator, StringArray,
5 UInt8Array,
6};
7use arrow_schema::{ArrowError, DataType, Field, Schema};
8use futures::TryStreamExt;
9use lancedb::connection::Connection;
10use lancedb::query::{ExecutableQuery, QueryBase};
11use lancedb::table::{OptimizeAction, OptimizeStats};
12use lancedb::{Table, connect};
13use serde::Serialize;
14use serde_json::{Value, json};
15use std::sync::Arc;
16use tokio::sync::Mutex;
17use tracing::{debug, info};
18
19use crate::rag::SliceLayer;
20
/// Version number of the documents-table layout built by `create_schema`.
///
/// Currently it is only reported in the log message emitted when `ensure_table`
/// creates a new table.
pub const SCHEMA_VERSION: u32 = 3;
26
/// A single stored document: text, embedding vector, metadata and onion-slice
/// hierarchy information.
///
/// Field order is also the `serde` serialization order. In the Lance table,
/// `metadata`, `children_ids` and `keywords` are stored as JSON strings.
#[derive(Debug, Serialize, Clone)]
pub struct ChromaDocument {
    /// Document identifier; must be non-empty for `add_to_store` to accept it.
    pub id: String,
    /// Namespace the document belongs to; must be non-empty for `add_to_store`.
    pub namespace: String,
    /// Embedding vector; all documents in one insert batch must share its length.
    pub embedding: Vec<f32>,
    /// Free-form JSON metadata (read back as `{}` if the stored string fails to parse).
    pub metadata: serde_json::Value,
    /// Document text, stored in the table's `text` column.
    pub document: String,
    /// Onion-slice layer number; `0` marks a flat (non-hierarchical) document.
    pub layer: u8,
    /// Id of the parent slice, if any (looked up in the same namespace).
    pub parent_id: Option<String>,
    /// Ids of the child slices.
    pub children_ids: Vec<String>,
    /// Keywords copied from the originating onion slice (empty for flat documents).
    pub keywords: Vec<String>,
    /// Content hash used by the deduplication helpers, if one was computed.
    pub content_hash: Option<String>,
}
65
66impl ChromaDocument {
67 pub fn new_flat(
69 id: String,
70 namespace: String,
71 embedding: Vec<f32>,
72 metadata: serde_json::Value,
73 document: String,
74 ) -> Self {
75 Self {
76 id,
77 namespace,
78 embedding,
79 metadata,
80 document,
81 layer: 0, parent_id: None,
83 children_ids: vec![],
84 keywords: vec![],
85 content_hash: None,
86 }
87 }
88
89 pub fn new_flat_with_hash(
91 id: String,
92 namespace: String,
93 embedding: Vec<f32>,
94 metadata: serde_json::Value,
95 document: String,
96 content_hash: String,
97 ) -> Self {
98 Self {
99 id,
100 namespace,
101 embedding,
102 metadata,
103 document,
104 layer: 0,
105 parent_id: None,
106 children_ids: vec![],
107 keywords: vec![],
108 content_hash: Some(content_hash),
109 }
110 }
111
112 pub fn from_onion_slice(
114 slice: &crate::rag::OnionSlice,
115 namespace: String,
116 embedding: Vec<f32>,
117 metadata: serde_json::Value,
118 ) -> Self {
119 Self {
120 id: slice.id.clone(),
121 namespace,
122 embedding,
123 metadata,
124 document: slice.content.clone(),
125 layer: slice.layer.as_u8(),
126 parent_id: slice.parent_id.clone(),
127 children_ids: slice.children_ids.clone(),
128 keywords: slice.keywords.clone(),
129 content_hash: None,
130 }
131 }
132
133 pub fn from_onion_slice_with_hash(
135 slice: &crate::rag::OnionSlice,
136 namespace: String,
137 embedding: Vec<f32>,
138 metadata: serde_json::Value,
139 content_hash: String,
140 ) -> Self {
141 Self {
142 id: slice.id.clone(),
143 namespace,
144 embedding,
145 metadata,
146 document: slice.content.clone(),
147 layer: slice.layer.as_u8(),
148 parent_id: slice.parent_id.clone(),
149 children_ids: slice.children_ids.clone(),
150 keywords: slice.keywords.clone(),
151 content_hash: Some(content_hash),
152 }
153 }
154
155 pub fn is_flat(&self) -> bool {
157 self.layer == 0
158 }
159
160 pub fn slice_layer(&self) -> Option<SliceLayer> {
162 SliceLayer::from_u8(self.layer)
163 }
164}
165
/// LanceDB-backed document store operating on a single table.
pub struct StorageManager {
    /// Open LanceDB connection.
    lance: Connection,
    /// Cached table handle: `None` until `ensure_table`/`ensure_collection` opens or
    /// creates the table; reset to `None` by `refresh`.
    table: Arc<Mutex<Option<Table>>>,
    /// Name of the backing table (the constructors always use `"mcp_documents"`).
    collection_name: String,
    /// Tilde-expanded path/URI the connection was opened with.
    lance_path: String,
}
172
/// Record-batch iterator produced by `docs_to_batch` and passed to `Table::add`.
type BatchIter =
    RecordBatchIterator<std::vec::IntoIter<std::result::Result<RecordBatch, ArrowError>>>;
175
176impl StorageManager {
177 pub async fn new(db_path: &str) -> Result<Self> {
178 let lance_env = std::env::var("LANCEDB_PATH").unwrap_or_else(|_| db_path.to_string());
180 let lance_path = if lance_env.trim().is_empty() {
181 shellexpand::tilde("~/.rmcp-servers/rmcp-memex/lancedb").to_string()
182 } else {
183 shellexpand::tilde(&lance_env).to_string()
184 };
185
186 let lance = connect(&lance_path).execute().await?;
187
188 Ok(Self {
189 lance,
190 table: Arc::new(Mutex::new(None)),
191 collection_name: "mcp_documents".to_string(),
192 lance_path,
193 })
194 }
195
196 pub async fn new_lance_only(db_path: &str) -> Result<Self> {
199 let lance_path = shellexpand::tilde(db_path).to_string();
200 let lance = connect(&lance_path).execute().await?;
201
202 Ok(Self {
203 lance,
204 table: Arc::new(Mutex::new(None)),
205 collection_name: "mcp_documents".to_string(),
206 lance_path,
207 })
208 }
209
210 pub fn lance_path(&self) -> &str {
211 &self.lance_path
212 }
213
214 pub async fn refresh(&self) -> Result<()> {
217 let mut guard = self.table.lock().await;
218 *guard = None;
219 tracing::info!("LanceDB table cache cleared - will refresh on next query");
220 Ok(())
221 }
222
223 pub async fn ensure_collection(&self) -> Result<()> {
224 let mut guard = self.table.lock().await;
226 if guard.is_some() {
227 return Ok(());
228 }
229 match self
230 .lance
231 .open_table(self.collection_name.as_str())
232 .execute()
233 .await
234 {
235 Ok(table) => {
236 *guard = Some(table);
237 info!("Found existing Lance table '{}'", self.collection_name);
238 }
239 Err(_) => {
240 info!(
241 "Lance table '{}' will be created on first insert",
242 self.collection_name
243 );
244 }
245 }
246 Ok(())
247 }
248
249 pub async fn add_to_store(&self, documents: Vec<ChromaDocument>) -> Result<()> {
250 if documents.is_empty() {
251 return Ok(());
252 }
253
254 let dim = documents
256 .first()
257 .ok_or_else(|| anyhow!("No documents to add"))?
258 .embedding
259 .len();
260 if dim == 0 {
261 return Err(anyhow!("Embedding dimension is zero"));
262 }
263
264 for (i, doc) in documents.iter().enumerate() {
266 if doc.embedding.len() != dim {
267 return Err(anyhow!(
268 "Document {} has inconsistent embedding dimension: expected {}, got {}. \
269 Aborting batch to prevent database corruption.",
270 i,
271 dim,
272 doc.embedding.len()
273 ));
274 }
275 if doc.id.is_empty() {
276 return Err(anyhow!("Document {} has empty ID. Aborting batch.", i));
277 }
278 if doc.namespace.is_empty() {
279 return Err(anyhow!(
280 "Document {} has empty namespace. Aborting batch.",
281 i
282 ));
283 }
284 for (j, &val) in doc.embedding.iter().enumerate() {
286 if val.is_nan() || val.is_infinite() {
287 return Err(anyhow!(
288 "Document {} has invalid embedding value at index {}: {}. \
289 Aborting batch to prevent database corruption.",
290 i,
291 j,
292 val
293 ));
294 }
295 }
296 }
297
298 let table = self.ensure_table(dim).await?;
299 let batch = self.docs_to_batch(&documents, dim)?;
300 table.add(batch).execute().await?;
301 debug!(
302 "Inserted {} documents into Lance (validated)",
303 documents.len()
304 );
305 Ok(())
306 }
307
308 pub async fn search_store(
309 &self,
310 namespace: Option<&str>,
311 embedding: Vec<f32>,
312 k: usize,
313 ) -> Result<Vec<ChromaDocument>> {
314 if embedding.is_empty() {
315 return Ok(vec![]);
316 }
317 let dim = embedding.len();
318 let table = self.ensure_table(dim).await?;
319
320 let mut query = table.query();
321 if let Some(ns) = namespace {
322 query = query.only_if(self.namespace_filter(ns).as_str());
323 }
324 let mut stream = query.nearest_to(embedding)?.limit(k).execute().await?;
325
326 let mut results = Vec::new();
327 while let Some(batch) = stream.try_next().await? {
328 let mut docs = self.batch_to_docs(&batch)?;
329 results.append(&mut docs);
330 }
331 debug!("Lance returned {} results", results.len());
332 Ok(results)
333 }
334
335 pub async fn all_documents(
339 &self,
340 namespace: Option<&str>,
341 limit: usize,
342 ) -> Result<Vec<ChromaDocument>> {
343 let table = match self.ensure_table(0).await {
344 Ok(t) => t,
345 Err(_) => return Ok(vec![]),
346 };
347
348 let mut query = table.query().limit(limit);
349 if let Some(ns) = namespace {
350 query = query.only_if(self.namespace_filter(ns).as_str());
351 }
352 let mut stream = query.execute().await?;
353
354 let mut results = Vec::new();
355 while let Some(batch) = stream.try_next().await? {
356 let mut docs = self.batch_to_docs(&batch)?;
357 results.append(&mut docs);
358 }
359
360 Ok(results)
361 }
362
363 pub async fn get_document(&self, namespace: &str, id: &str) -> Result<Option<ChromaDocument>> {
364 let table = match self.ensure_table(0).await {
365 Ok(t) => t,
366 Err(_) => return Ok(None),
367 };
368 let filter = format!(
369 "{} AND {}",
370 self.namespace_filter(namespace),
371 self.id_filter(id)
372 );
373 let mut stream = table
374 .query()
375 .only_if(filter.as_str())
376 .limit(1)
377 .execute()
378 .await?;
379 if let Some(batch) = stream.try_next().await? {
380 let mut docs = self.batch_to_docs(&batch)?;
381 if let Some(doc) = docs.pop() {
382 return Ok(Some(doc));
383 }
384 }
385 Ok(None)
386 }
387
388 pub async fn delete_document(&self, namespace: &str, id: &str) -> Result<usize> {
389 let table = match self.ensure_table(0).await {
390 Ok(t) => t,
391 Err(_) => return Ok(0),
392 };
393 let predicate = format!(
394 "{} AND {}",
395 self.namespace_filter(namespace),
396 self.id_filter(id)
397 );
398 let deleted = table.delete(predicate.as_str()).await?;
399 Ok(deleted.version as usize)
400 }
401
402 pub async fn delete_namespace_documents(&self, namespace: &str) -> Result<usize> {
403 let table = match self.ensure_table(0).await {
404 Ok(t) => t,
405 Err(_) => return Ok(0),
406 };
407 let predicate = self.namespace_filter(namespace);
408 let deleted = table.delete(predicate.as_str()).await?;
409 Ok(deleted.version as usize)
410 }
411
412 pub fn get_collection_name(&self) -> &str {
413 &self.collection_name
414 }
415
416 async fn ensure_table(&self, dim: usize) -> Result<Table> {
417 let mut guard = self.table.lock().await;
418 if let Some(table) = guard.as_ref() {
419 return Ok(table.clone());
420 }
421
422 let maybe_table = self
423 .lance
424 .open_table(self.collection_name.as_str())
425 .execute()
426 .await;
427
428 let table = if let Ok(tbl) = maybe_table {
429 tbl
430 } else {
431 if dim == 0 {
432 return Err(anyhow!(
433 "Vector table '{}' not found and dimension is unknown",
434 self.collection_name
435 ));
436 }
437 info!(
438 "Creating Lance table '{}' with vector dimension {} (schema v{})",
439 self.collection_name, dim, SCHEMA_VERSION
440 );
441 let schema = Arc::new(Self::create_schema(dim));
442 self.lance
443 .create_empty_table(self.collection_name.as_str(), schema)
444 .execute()
445 .await?
446 };
447
448 *guard = Some(table.clone());
449 Ok(table)
450 }
451
452 async fn open_existing_table(&self) -> Result<Table> {
453 self.ensure_table(0).await.map_err(|_| {
454 anyhow!(
455 "Vector table '{}' not found at {}. Index data first so rmcp-memex can use the stored embedding dimension instead of guessing.",
456 self.collection_name,
457 self.lance_path
458 )
459 })
460 }
461
462 fn create_schema(dim: usize) -> Schema {
464 Schema::new(vec![
465 Field::new("id", DataType::Utf8, false),
466 Field::new("namespace", DataType::Utf8, false),
467 Field::new(
468 "vector",
469 DataType::FixedSizeList(
470 Arc::new(Field::new("item", DataType::Float32, true)),
471 dim as i32,
472 ),
473 false,
474 ),
475 Field::new("text", DataType::Utf8, true),
476 Field::new("metadata", DataType::Utf8, true),
477 Field::new("layer", DataType::UInt8, true), Field::new("parent_id", DataType::Utf8, true), Field::new("children_ids", DataType::Utf8, true), Field::new("keywords", DataType::Utf8, true), Field::new("content_hash", DataType::Utf8, true), ])
485 }
486
487 fn docs_to_batch(&self, documents: &[ChromaDocument], dim: usize) -> Result<BatchIter> {
488 let ids = documents.iter().map(|d| d.id.as_str()).collect::<Vec<_>>();
489 let namespaces = documents
490 .iter()
491 .map(|d| d.namespace.as_str())
492 .collect::<Vec<_>>();
493 let texts = documents
494 .iter()
495 .map(|d| d.document.as_str())
496 .collect::<Vec<_>>();
497 let metadata_strings = documents
498 .iter()
499 .map(|d| serde_json::to_string(&d.metadata).unwrap_or_else(|_| "{}".to_string()))
500 .collect::<Vec<_>>();
501
502 let vectors = documents.iter().map(|d| {
503 if d.embedding.len() != dim {
504 None
505 } else {
506 Some(d.embedding.iter().map(|v| Some(*v)).collect::<Vec<_>>())
507 }
508 });
509
510 let layers: Vec<u8> = documents.iter().map(|d| d.layer).collect();
512 let parent_ids: Vec<Option<&str>> =
513 documents.iter().map(|d| d.parent_id.as_deref()).collect();
514 let children_ids_json: Vec<String> = documents
515 .iter()
516 .map(|d| serde_json::to_string(&d.children_ids).unwrap_or_else(|_| "[]".to_string()))
517 .collect();
518 let keywords_json: Vec<String> = documents
519 .iter()
520 .map(|d| serde_json::to_string(&d.keywords).unwrap_or_else(|_| "[]".to_string()))
521 .collect();
522 let content_hashes: Vec<Option<&str>> = documents
524 .iter()
525 .map(|d| d.content_hash.as_deref())
526 .collect();
527
528 let schema = Arc::new(Self::create_schema(dim));
529
530 let batch = RecordBatch::try_new(
531 schema.clone(),
532 vec![
533 Arc::new(StringArray::from(ids)),
534 Arc::new(StringArray::from(namespaces)),
535 Arc::new(
536 FixedSizeListArray::from_iter_primitive::<Float32Type, _, _>(
537 vectors, dim as i32,
538 ),
539 ),
540 Arc::new(StringArray::from(texts)),
541 Arc::new(StringArray::from(metadata_strings)),
542 Arc::new(UInt8Array::from(layers)),
544 Arc::new(StringArray::from(parent_ids)),
545 Arc::new(StringArray::from(
546 children_ids_json
547 .iter()
548 .map(|s| s.as_str())
549 .collect::<Vec<_>>(),
550 )),
551 Arc::new(StringArray::from(
552 keywords_json.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
553 )),
554 Arc::new(StringArray::from(content_hashes)),
556 ],
557 )?;
558
559 Ok(RecordBatchIterator::new(
560 vec![Ok(batch)].into_iter(),
561 schema,
562 ))
563 }
564
565 fn batch_to_docs(&self, batch: &RecordBatch) -> Result<Vec<ChromaDocument>> {
566 let id_col = batch
567 .column_by_name("id")
568 .and_then(|c| c.as_any().downcast_ref::<StringArray>())
569 .ok_or_else(|| anyhow!("Missing id column"))?;
570 let ns_col = batch
571 .column_by_name("namespace")
572 .and_then(|c| c.as_any().downcast_ref::<StringArray>())
573 .ok_or_else(|| anyhow!("Missing namespace column"))?;
574 let text_col = batch
575 .column_by_name("text")
576 .and_then(|c| c.as_any().downcast_ref::<StringArray>())
577 .ok_or_else(|| anyhow!("Missing text column"))?;
578 let metadata_col = batch
579 .column_by_name("metadata")
580 .and_then(|c| c.as_any().downcast_ref::<StringArray>())
581 .ok_or_else(|| anyhow!("Missing metadata column"))?;
582 let vector_col = batch
583 .column_by_name("vector")
584 .and_then(|c| c.as_any().downcast_ref::<FixedSizeListArray>())
585 .ok_or_else(|| anyhow!("Missing vector column"))?;
586
587 let layer_col = batch
589 .column_by_name("layer")
590 .and_then(|c| c.as_any().downcast_ref::<UInt8Array>());
591 let parent_id_col = batch
592 .column_by_name("parent_id")
593 .and_then(|c| c.as_any().downcast_ref::<StringArray>());
594 let children_ids_col = batch
595 .column_by_name("children_ids")
596 .and_then(|c| c.as_any().downcast_ref::<StringArray>());
597 let keywords_col = batch
598 .column_by_name("keywords")
599 .and_then(|c| c.as_any().downcast_ref::<StringArray>());
600 let content_hash_col = batch
602 .column_by_name("content_hash")
603 .and_then(|c| c.as_any().downcast_ref::<StringArray>());
604
605 let dim = vector_col.value_length() as usize;
606 let values = vector_col
607 .values()
608 .as_any()
609 .downcast_ref::<Float32Array>()
610 .ok_or_else(|| anyhow!("Vector inner type mismatch"))?;
611
612 let mut docs = Vec::new();
613 for i in 0..batch.num_rows() {
614 let id = id_col.value(i).to_string();
615 let text = text_col.value(i).to_string();
616 let namespace = ns_col.value(i).to_string();
617 let meta_str = metadata_col.value(i);
618 let metadata: Value = serde_json::from_str(meta_str).unwrap_or_else(|_| json!({}));
619
620 let offset = i * dim;
621 let mut emb = Vec::with_capacity(dim);
622 for j in 0..dim {
623 emb.push(values.value(offset + j));
624 }
625
626 let layer = layer_col
628 .and_then(|col| {
629 if col.is_null(i) {
630 None
631 } else {
632 Some(col.value(i))
633 }
634 })
635 .unwrap_or(0);
636
637 let parent_id = parent_id_col.and_then(|col| {
638 if col.is_null(i) {
639 None
640 } else {
641 Some(col.value(i).to_string())
642 }
643 });
644
645 let children_ids: Vec<String> = children_ids_col
646 .and_then(|col| {
647 if col.is_null(i) {
648 None
649 } else {
650 serde_json::from_str(col.value(i)).ok()
651 }
652 })
653 .unwrap_or_default();
654
655 let keywords: Vec<String> = keywords_col
656 .and_then(|col| {
657 if col.is_null(i) {
658 None
659 } else {
660 serde_json::from_str(col.value(i)).ok()
661 }
662 })
663 .unwrap_or_default();
664
665 let content_hash = content_hash_col.and_then(|col| {
666 if col.is_null(i) {
667 None
668 } else {
669 Some(col.value(i).to_string())
670 }
671 });
672
673 docs.push(ChromaDocument {
674 id,
675 namespace,
676 embedding: emb,
677 metadata,
678 document: text,
679 layer,
680 parent_id,
681 children_ids,
682 keywords,
683 content_hash,
684 });
685 }
686 Ok(docs)
687 }
688
689 pub async fn get_filtered_in_namespace(
690 &self,
691 namespace: &str,
692 filter: &str,
693 ) -> Result<Vec<ChromaDocument>> {
694 let table = match self.ensure_table(0).await {
695 Ok(t) => t,
696 Err(_) => return Ok(vec![]),
697 };
698 let combined = format!("{} AND ({})", self.namespace_filter(namespace), filter);
699 let mut stream = table.query().only_if(combined.as_str()).execute().await?;
700 let mut results = Vec::new();
701 while let Some(batch) = stream.try_next().await? {
702 let mut docs = self.batch_to_docs(&batch)?;
703 results.append(&mut docs);
704 }
705 Ok(results)
706 }
707
708 pub async fn search_store_with_layer(
710 &self,
711 namespace: Option<&str>,
712 embedding: Vec<f32>,
713 k: usize,
714 layer_filter: Option<SliceLayer>,
715 ) -> Result<Vec<ChromaDocument>> {
716 if embedding.is_empty() {
717 return Ok(vec![]);
718 }
719 let dim = embedding.len();
720 let table = self.ensure_table(dim).await?;
721
722 let mut query = table.query();
723
724 let mut filters = Vec::new();
726 if let Some(ns) = namespace {
727 filters.push(self.namespace_filter(ns));
728 }
729 if let Some(layer) = layer_filter {
730 filters.push(self.layer_filter(layer));
731 }
732
733 if !filters.is_empty() {
734 let combined = filters.join(" AND ");
735 query = query.only_if(combined.as_str());
736 }
737
738 let mut stream = query.nearest_to(embedding)?.limit(k).execute().await?;
739
740 let mut results = Vec::new();
741 while let Some(batch) = stream.try_next().await? {
742 let mut docs = self.batch_to_docs(&batch)?;
743 results.append(&mut docs);
744 }
745 debug!(
746 "Lance returned {} results (layer filter: {:?})",
747 results.len(),
748 layer_filter
749 );
750 Ok(results)
751 }
752
753 pub async fn get_children(
755 &self,
756 namespace: &str,
757 parent_id: &str,
758 ) -> Result<Vec<ChromaDocument>> {
759 let _ = match self.ensure_table(0).await {
761 Ok(t) => t,
762 Err(_) => return Ok(vec![]),
763 };
764
765 if let Some(parent) = self.get_document(namespace, parent_id).await? {
767 if parent.children_ids.is_empty() {
768 return Ok(vec![]);
769 }
770
771 let mut children = Vec::new();
773 for child_id in &parent.children_ids {
774 if let Some(child) = self.get_document(namespace, child_id).await? {
775 children.push(child);
776 }
777 }
778 return Ok(children);
779 }
780
781 Ok(vec![])
782 }
783
784 pub async fn get_parent(
786 &self,
787 namespace: &str,
788 child_id: &str,
789 ) -> Result<Option<ChromaDocument>> {
790 if let Some(child) = self.get_document(namespace, child_id).await?
791 && let Some(ref parent_id) = child.parent_id
792 {
793 return self.get_document(namespace, parent_id).await;
794 }
795 Ok(None)
796 }
797
798 fn namespace_filter(&self, namespace: &str) -> String {
799 format!("namespace = '{}'", namespace.replace('\'', "''"))
800 }
801
802 fn id_filter(&self, id: &str) -> String {
803 format!("id = '{}'", id.replace('\'', "''"))
804 }
805
806 fn layer_filter(&self, layer: SliceLayer) -> String {
807 if layer == SliceLayer::Outer {
808 "(layer = 0 OR layer = 1)".to_string()
810 } else {
811 format!("layer = {}", layer.as_u8())
812 }
813 }
814
815 fn content_hash_filter(&self, hash: &str) -> String {
816 format!("content_hash = '{}'", hash.replace('\'', "''"))
817 }
818
819 async fn table_has_content_hash(table: &Table) -> bool {
821 table
822 .schema()
823 .await
824 .map(|schema| schema.field_with_name("content_hash").is_ok())
825 .unwrap_or(false)
826 }
827
828 pub async fn has_content_hash(&self, namespace: &str, hash: &str) -> Result<bool> {
834 let table = match self.ensure_table(0).await {
835 Ok(t) => t,
836 Err(_) => return Ok(false), };
838
839 if !Self::table_has_content_hash(&table).await {
841 tracing::warn!(
842 "Table '{}' has old schema without content_hash column. \
843 Deduplication disabled. Consider re-indexing with new schema.",
844 self.collection_name
845 );
846 return Ok(false); }
848
849 let filter = format!(
850 "{} AND {}",
851 self.namespace_filter(namespace),
852 self.content_hash_filter(hash)
853 );
854
855 let mut stream = table
856 .query()
857 .only_if(filter.as_str())
858 .limit(1)
859 .execute()
860 .await?;
861
862 if let Some(batch) = stream.try_next().await? {
863 return Ok(batch.num_rows() > 0);
864 }
865
866 Ok(false)
867 }
868
869 pub async fn filter_existing_hashes<'a>(
874 &self,
875 namespace: &str,
876 hashes: &'a [String],
877 ) -> Result<Vec<&'a String>> {
878 if hashes.is_empty() {
879 return Ok(vec![]);
880 }
881
882 let table = match self.ensure_table(0).await {
883 Ok(t) => t,
884 Err(_) => return Ok(hashes.iter().collect()), };
886
887 if !Self::table_has_content_hash(&table).await {
889 tracing::warn!(
890 "Table '{}' has old schema without content_hash column. \
891 Deduplication disabled. Consider re-indexing with new schema.",
892 self.collection_name
893 );
894 return Ok(hashes.iter().collect()); }
896
897 let hash_conditions: Vec<String> =
900 hashes.iter().map(|h| self.content_hash_filter(h)).collect();
901
902 let filter = format!(
903 "{} AND ({})",
904 self.namespace_filter(namespace),
905 hash_conditions.join(" OR ")
906 );
907
908 let mut stream = table
909 .query()
910 .only_if(filter.as_str())
911 .limit(hashes.len())
912 .execute()
913 .await?;
914
915 let mut existing_hashes = std::collections::HashSet::new();
917 while let Some(batch) = stream.try_next().await? {
918 if let Some(hash_col) = batch
919 .column_by_name("content_hash")
920 .and_then(|c| c.as_any().downcast_ref::<StringArray>())
921 {
922 for i in 0..batch.num_rows() {
923 if !hash_col.is_null(i) {
924 existing_hashes.insert(hash_col.value(i).to_string());
925 }
926 }
927 }
928 }
929
930 Ok(hashes
932 .iter()
933 .filter(|h| !existing_hashes.contains(h.as_str()))
934 .collect())
935 }
936
937 pub async fn optimize(&self) -> Result<OptimizeStats> {
943 let table = self.open_existing_table().await?;
944 let stats = table.optimize(OptimizeAction::All).await?;
945 info!(
946 "Optimize complete: compaction={:?}, prune={:?}",
947 stats.compaction, stats.prune
948 );
949 Ok(stats)
950 }
951
952 pub async fn compact(&self) -> Result<OptimizeStats> {
954 let table = self.open_existing_table().await?;
955 let stats = table
956 .optimize(OptimizeAction::Compact {
957 options: Default::default(),
958 remap_options: None,
959 })
960 .await?;
961 info!("Compaction complete: {:?}", stats.compaction);
962 Ok(stats)
963 }
964
965 pub async fn cleanup(&self, older_than_days: Option<u64>) -> Result<OptimizeStats> {
967 let table = self.open_existing_table().await?;
968 let days = older_than_days.unwrap_or(7) as i64;
969 let duration = chrono::TimeDelta::days(days);
970 let stats = table
971 .optimize(OptimizeAction::Prune {
972 older_than: Some(duration),
973 delete_unverified: Some(false),
974 error_if_tagged_old_versions: None,
975 })
976 .await?;
977 info!("Cleanup complete: {:?}", stats.prune);
978 Ok(stats)
979 }
980
981 pub async fn stats(&self) -> Result<TableStats> {
983 let table = self.open_existing_table().await?;
984 let row_count = table.count_rows(None).await?;
985
986 let versions = table.list_versions().await.unwrap_or_default();
988 let version_count = versions.len();
989
990 Ok(TableStats {
991 row_count,
992 version_count,
993 table_name: self.collection_name.clone(),
994 db_path: self.lance_path.clone(),
995 })
996 }
997
998 pub async fn count_namespace(&self, namespace: &str) -> Result<usize> {
1000 let table = match self.ensure_table(0).await {
1001 Ok(table) => table,
1002 Err(_) => return Ok(0),
1003 };
1004 let filter = self.namespace_filter(namespace);
1005 let count = table.count_rows(Some(filter)).await?;
1006 Ok(count)
1007 }
1008
1009 pub async fn get_all_in_namespace(&self, namespace: &str) -> Result<Vec<ChromaDocument>> {
1014 let table = match self.ensure_table(0).await {
1015 Ok(t) => t,
1016 Err(_) => return Ok(vec![]), };
1018
1019 let filter = self.namespace_filter(namespace);
1020 let mut stream = table.query().only_if(filter.as_str()).execute().await?;
1021
1022 let mut results = Vec::new();
1023 while let Some(batch) = stream.try_next().await? {
1024 let mut docs = self.batch_to_docs(&batch)?;
1025 results.append(&mut docs);
1026 }
1027
1028 debug!(
1029 "Retrieved {} documents from namespace '{}'",
1030 results.len(),
1031 namespace
1032 );
1033 Ok(results)
1034 }
1035
1036 pub async fn namespace_exists(&self, namespace: &str) -> Result<bool> {
1038 let count = self.count_namespace(namespace).await?;
1039 Ok(count > 0)
1040 }
1041}
1042
/// Summary of the documents table, as returned by `StorageManager::stats`.
#[derive(Debug, Clone, Serialize)]
pub struct TableStats {
    /// Total number of rows in the table.
    pub row_count: usize,
    /// Number of versions from `list_versions` (0 if listing failed).
    pub version_count: usize,
    /// Name of the backing table.
    pub table_name: String,
    /// Path/URI the store was opened with.
    pub db_path: String,
}
1051
/// Results of `StorageManager::garbage_collect`.
///
/// `*_found` fields count candidates; `*_removed` fields count deletions and stay
/// `0` in dry-run mode.
#[derive(Debug, Clone, Default, Serialize)]
pub struct GcStats {
    /// Documents whose `parent_id` refers to a missing parent.
    pub orphans_found: usize,
    /// Orphans that were deleted.
    pub orphans_removed: usize,
    /// Namespaces detected as empty.
    pub empty_namespaces_found: usize,
    /// Empty namespaces removed (the current GC never removes any, so this stays 0).
    pub empty_namespaces_removed: usize,
    /// Documents whose metadata timestamp is older than the configured age.
    pub old_docs_found: usize,
    /// Old documents that were deleted.
    pub old_docs_removed: usize,
    /// Bytes reclaimed, when known (not populated by `garbage_collect`).
    pub bytes_freed: Option<u64>,
    /// Names of the empty namespaces that were found.
    pub empty_namespace_names: Vec<String>,
    /// Namespaces containing old documents.
    pub affected_namespaces: Vec<String>,
}
1078
1079impl GcStats {
1080 pub fn has_issues(&self) -> bool {
1082 self.orphans_found > 0 || self.empty_namespaces_found > 0 || self.old_docs_found > 0
1083 }
1084
1085 pub fn has_deletions(&self) -> bool {
1087 self.orphans_removed > 0 || self.empty_namespaces_removed > 0 || self.old_docs_removed > 0
1088 }
1089}
1090
/// Options controlling `StorageManager::garbage_collect`.
#[derive(Debug, Clone)]
pub struct GcConfig {
    /// Look for documents whose parent is missing (and delete them unless `dry_run`).
    pub remove_orphans: bool,
    /// Look for empty namespaces.
    pub remove_empty: bool,
    /// Flag documents whose metadata timestamp is older than this age.
    pub older_than: Option<chrono::Duration>,
    /// Only report findings without deleting anything (defaults to `true`).
    pub dry_run: bool,
    /// Restrict the scan to one namespace; `None` scans all namespaces.
    pub namespace: Option<String>,
}
1105
1106impl Default for GcConfig {
1107 fn default() -> Self {
1108 Self {
1109 remove_orphans: false,
1110 remove_empty: false,
1111 older_than: None,
1112 dry_run: true,
1113 namespace: None,
1114 }
1115 }
1116}
1117
1118pub fn parse_duration_string(s: &str) -> Result<chrono::Duration> {
1120 let s = s.trim().to_lowercase();
1121 if s.is_empty() {
1122 return Err(anyhow!("Empty duration string"));
1123 }
1124
1125 let (num_str, unit) = if s.ends_with('d') {
1127 (&s[..s.len() - 1], 'd')
1128 } else if s.ends_with('m') {
1129 (&s[..s.len() - 1], 'm')
1130 } else if s.ends_with('y') {
1131 (&s[..s.len() - 1], 'y')
1132 } else {
1133 return Err(anyhow!(
1134 "Invalid duration format '{}'. Use format like '30d', '6m', or '1y'",
1135 s
1136 ));
1137 };
1138
1139 let num: i64 = num_str.parse().map_err(|_| {
1140 anyhow!(
1141 "Invalid number in duration '{}'. Use format like '30d', '6m', or '1y'",
1142 s
1143 )
1144 })?;
1145
1146 if num <= 0 {
1147 return Err(anyhow!("Duration must be positive, got '{}'", s));
1148 }
1149
1150 match unit {
1151 'd' => Ok(chrono::Duration::days(num)),
1152 'm' => Ok(chrono::Duration::days(num * 30)), 'y' => Ok(chrono::Duration::days(num * 365)), _ => unreachable!(),
1155 }
1156}
1157
1158impl StorageManager {
1159 #[doc(alias = "run_gc")]
1165 pub async fn garbage_collect(&self, config: &GcConfig) -> Result<GcStats> {
1166 let mut stats = GcStats::default();
1167
1168 let all_docs = self
1170 .all_documents(config.namespace.as_deref(), 1_000_000)
1171 .await?;
1172
1173 if all_docs.is_empty() {
1174 return Ok(stats);
1175 }
1176
1177 let mut by_namespace: std::collections::HashMap<String, Vec<&ChromaDocument>> =
1179 std::collections::HashMap::new();
1180 for doc in &all_docs {
1181 by_namespace
1182 .entry(doc.namespace.clone())
1183 .or_default()
1184 .push(doc);
1185 }
1186
1187 if config.remove_orphans {
1189 let orphan_stats = self
1190 .find_and_remove_orphans(&all_docs, config.dry_run)
1191 .await?;
1192 stats.orphans_found = orphan_stats.0;
1193 stats.orphans_removed = orphan_stats.1;
1194 }
1195
1196 if config.remove_empty {
1198 let empty_stats = self
1199 .find_and_remove_empty_namespaces(&by_namespace, config.dry_run)
1200 .await?;
1201 stats.empty_namespaces_found = empty_stats.0;
1202 stats.empty_namespaces_removed = empty_stats.1;
1203 stats.empty_namespace_names = empty_stats.2;
1204 }
1205
1206 if let Some(ref duration) = config.older_than {
1208 let old_stats = self
1209 .find_and_remove_old_docs(&all_docs, duration, config.dry_run)
1210 .await?;
1211 stats.old_docs_found = old_stats.0;
1212 stats.old_docs_removed = old_stats.1;
1213 stats.affected_namespaces = old_stats.2;
1214 }
1215
1216 Ok(stats)
1217 }
1218
1219 #[deprecated(note = "use garbage_collect")]
1220 pub async fn run_gc(&self, config: &GcConfig) -> Result<GcStats> {
1221 self.garbage_collect(config).await
1222 }
1223
1224 async fn find_and_remove_orphans(
1226 &self,
1227 docs: &[ChromaDocument],
1228 dry_run: bool,
1229 ) -> Result<(usize, usize)> {
1230 let all_ids: std::collections::HashSet<&str> = docs.iter().map(|d| d.id.as_str()).collect();
1232
1233 let mut orphans: Vec<(&str, &str)> = Vec::new(); for doc in docs {
1236 if let Some(ref parent_id) = doc.parent_id
1237 && !all_ids.contains(parent_id.as_str())
1238 {
1239 orphans.push((&doc.namespace, &doc.id));
1240 }
1241 }
1242
1243 let found = orphans.len();
1244 let mut removed = 0;
1245
1246 if !dry_run && !orphans.is_empty() {
1247 for (namespace, id) in &orphans {
1248 if self.delete_document(namespace, id).await.is_ok() {
1249 removed += 1;
1250 }
1251 }
1252 }
1253
1254 Ok((found, removed))
1255 }
1256
1257 async fn find_and_remove_empty_namespaces(
1261 &self,
1262 by_namespace: &std::collections::HashMap<String, Vec<&ChromaDocument>>,
1263 _dry_run: bool,
1264 ) -> Result<(usize, usize, Vec<String>)> {
1265 let empty_namespaces: Vec<String> = by_namespace
1267 .iter()
1268 .filter(|(_, docs)| docs.is_empty())
1269 .map(|(ns, _)| ns.clone())
1270 .collect();
1271
1272 let found = empty_namespaces.len();
1273 let removed = 0;
1276
1277 Ok((found, removed, empty_namespaces))
1278 }
1279
1280 async fn find_and_remove_old_docs(
1282 &self,
1283 docs: &[ChromaDocument],
1284 older_than: &chrono::Duration,
1285 dry_run: bool,
1286 ) -> Result<(usize, usize, Vec<String>)> {
1287 let cutoff = chrono::Utc::now() - *older_than;
1288
1289 let mut old_docs: Vec<(&str, &str)> = Vec::new(); let mut affected_namespaces: std::collections::HashSet<String> =
1291 std::collections::HashSet::new();
1292
1293 for doc in docs {
1294 if let Some(obj) = doc.metadata.as_object() {
1296 let mut doc_timestamp: Option<String> = None;
1297
1298 for key in &["timestamp", "created_at", "indexed_at", "date", "time"] {
1300 if let Some(value) = obj.get(*key)
1301 && let Some(ts) = value.as_str()
1302 {
1303 doc_timestamp = Some(ts.to_string());
1304 break;
1305 }
1306 }
1307
1308 if let Some(ts) = doc_timestamp {
1310 let is_old = if let Ok(parsed) = chrono::DateTime::parse_from_rfc3339(&ts) {
1312 parsed < cutoff
1313 } else if let Ok(parsed) =
1314 chrono::NaiveDateTime::parse_from_str(&ts, "%Y-%m-%d %H:%M:%S")
1315 {
1316 parsed < cutoff.naive_utc()
1317 } else if let Ok(parsed) = chrono::NaiveDate::parse_from_str(&ts, "%Y-%m-%d") {
1318 parsed < cutoff.date_naive()
1319 } else {
1320 false
1322 };
1323
1324 if is_old {
1325 old_docs.push((&doc.namespace, &doc.id));
1326 affected_namespaces.insert(doc.namespace.clone());
1327 }
1328 }
1329 }
1330 }
1331
1332 let found = old_docs.len();
1333 let mut removed = 0;
1334
1335 if !dry_run && !old_docs.is_empty() {
1336 for (namespace, id) in &old_docs {
1337 if self.delete_document(namespace, id).await.is_ok() {
1338 removed += 1;
1339 }
1340 }
1341 }
1342
1343 Ok((found, removed, affected_namespaces.into_iter().collect()))
1344 }
1345
1346 pub async fn list_namespaces(&self) -> Result<Vec<(String, usize)>> {
1348 let all_docs = self.all_documents(None, 1_000_000).await?;
1349
1350 let mut namespace_counts: std::collections::HashMap<String, usize> =
1351 std::collections::HashMap::new();
1352 for doc in &all_docs {
1353 *namespace_counts.entry(doc.namespace.clone()).or_insert(0) += 1;
1354 }
1355
1356 let mut namespaces: Vec<(String, usize)> = namespace_counts.into_iter().collect();
1357 namespaces.sort_by(|a, b| a.0.cmp(&b.0));
1358 Ok(namespaces)
1359 }
1360}