1use anyhow::{Result, anyhow};
14use arrow_array::RecordBatch;
15use arrow_schema::Schema as ArrowSchema;
16use futures::TryStreamExt;
17use lancedb::Table;
18use lancedb::connection::Connection;
19use std::collections::HashMap;
20use std::sync::Arc;
21
22pub struct LanceDbStore {
27 connection: Connection,
28 base_uri: String,
29}
30
31impl LanceDbStore {
32 pub async fn connect(uri: &str) -> Result<Self> {
39 Self::connect_with_storage_options(uri, None).await
40 }
41
42 pub async fn connect_with_storage_options(
44 uri: &str,
45 storage_options: Option<HashMap<String, String>>,
46 ) -> Result<Self> {
47 let mut builder = lancedb::connect(uri);
48 if let Some(opts) = storage_options {
49 builder = builder.storage_options(opts);
50 }
51 let connection = builder
52 .execute()
53 .await
54 .map_err(|e| anyhow!("Failed to connect to LanceDB at {}: {}", uri, e))?;
55
56 Ok(Self {
57 connection,
58 base_uri: uri.to_string(),
59 })
60 }
61
62 pub fn base_uri(&self) -> &str {
64 &self.base_uri
65 }
66
67 pub async fn table_names(&self) -> Result<Vec<String>> {
69 self.connection
70 .table_names()
71 .execute()
72 .await
73 .map_err(|e| anyhow!("Failed to list tables: {}", e))
74 }
75
76 pub async fn table_exists(&self, name: &str) -> Result<bool> {
78 let tables = self.table_names().await?;
79 Ok(tables.contains(&name.to_string()))
80 }
81
82 pub async fn open_table(&self, name: &str) -> Result<Table> {
84 self.connection
85 .open_table(name)
86 .execute()
87 .await
88 .map_err(|e| anyhow!("Failed to open table '{}': {}", name, e))
89 }
90
91 pub async fn create_table(&self, name: &str, batches: Vec<RecordBatch>) -> Result<Table> {
95 if batches.is_empty() {
96 return Err(anyhow!(
97 "Cannot create table '{}' with empty data. Use create_empty_table instead.",
98 name
99 ));
100 }
101
102 self.connection
103 .create_table(name, batches)
104 .execute()
105 .await
106 .map_err(|e| anyhow!("Failed to create table '{}': {}", name, e))
107 }
108
109 pub async fn create_empty_table(&self, name: &str, schema: Arc<ArrowSchema>) -> Result<Table> {
111 self.connection
112 .create_empty_table(name, schema)
113 .execute()
114 .await
115 .map_err(|e| anyhow!("Failed to create empty table '{}': {}", name, e))
116 }
117
118 pub async fn open_or_create_table(
120 &self,
121 name: &str,
122 schema: Arc<ArrowSchema>,
123 ) -> Result<Table> {
124 if self.table_exists(name).await? {
125 self.open_table(name).await
126 } else {
127 self.create_empty_table(name, schema).await
128 }
129 }
130
131 pub async fn drop_table(&self, name: &str) -> Result<()> {
133 self.connection
134 .drop_table(name, &[])
135 .await
136 .map_err(|e| anyhow!("Failed to drop table '{}': {}", name, e))
137 }
138
139 pub async fn drop_all_tables(&self) -> Result<()> {
141 self.connection
142 .drop_all_tables(&[])
143 .await
144 .map_err(|e| anyhow!("Failed to drop all tables: {}", e))
145 }
146
147 pub async fn append_to_table(&self, table: &Table, batches: Vec<RecordBatch>) -> Result<()> {
149 if batches.is_empty() {
150 return Ok(());
151 }
152
153 table
154 .add(batches)
155 .execute()
156 .await
157 .map_err(|e| anyhow!("Failed to append to table: {}", e))?;
158
159 Ok(())
160 }
161
162 pub fn main_vertex_table_name() -> &'static str {
171 "vertices"
172 }
173
174 pub fn main_edge_table_name() -> &'static str {
179 "edges"
180 }
181
182 pub async fn open_main_vertex_table(&self) -> Result<Table> {
188 self.open_table(Self::main_vertex_table_name()).await
189 }
190
191 pub async fn open_main_edge_table(&self) -> Result<Table> {
197 self.open_table(Self::main_edge_table_name()).await
198 }
199
200 pub async fn main_vertex_table_exists(&self) -> Result<bool> {
202 self.table_exists(Self::main_vertex_table_name()).await
203 }
204
205 pub async fn main_edge_table_exists(&self) -> Result<bool> {
207 self.table_exists(Self::main_edge_table_name()).await
208 }
209
210 pub fn vertex_table_name(label: &str) -> String {
216 format!("vertices_{}", label)
217 }
218
219 pub async fn open_or_create_vertex_table(
221 &self,
222 label: &str,
223 schema: Arc<ArrowSchema>,
224 ) -> Result<Table> {
225 let table_name = Self::vertex_table_name(label);
226 self.open_or_create_table(&table_name, schema).await
227 }
228
229 pub async fn open_vertex_table(&self, label: &str) -> Result<Table> {
231 let table_name = Self::vertex_table_name(label);
232 self.open_table(&table_name).await
233 }
234
235 pub async fn vertex_table_exists(&self, label: &str) -> Result<bool> {
237 let table_name = Self::vertex_table_name(label);
238 self.table_exists(&table_name).await
239 }
240
241 pub fn delta_table_name(edge_type: &str, direction: &str) -> String {
247 format!("deltas_{}_{}", edge_type, direction)
248 }
249
250 pub async fn open_or_create_delta_table(
252 &self,
253 edge_type: &str,
254 direction: &str,
255 schema: Arc<ArrowSchema>,
256 ) -> Result<Table> {
257 let table_name = Self::delta_table_name(edge_type, direction);
258 self.open_or_create_table(&table_name, schema).await
259 }
260
261 pub async fn open_delta_table(&self, edge_type: &str, direction: &str) -> Result<Table> {
263 let table_name = Self::delta_table_name(edge_type, direction);
264 self.open_table(&table_name).await
265 }
266
267 pub fn adjacency_table_name(edge_type: &str, direction: &str) -> String {
273 format!("adjacency_{}_{}", edge_type, direction)
274 }
275
276 pub async fn open_or_create_adjacency_table(
278 &self,
279 edge_type: &str,
280 direction: &str,
281 schema: Arc<ArrowSchema>,
282 ) -> Result<Table> {
283 let table_name = Self::adjacency_table_name(edge_type, direction);
284 self.open_or_create_table(&table_name, schema).await
285 }
286
287 pub async fn open_adjacency_table(&self, edge_type: &str, direction: &str) -> Result<Table> {
289 let table_name = Self::adjacency_table_name(edge_type, direction);
290 self.open_table(&table_name).await
291 }
292
293 pub async fn get_table_version(&self, table_name: &str) -> Result<Option<u64>> {
305 if !self.table_exists(table_name).await? {
306 return Ok(None);
307 }
308 let table = self.open_table(table_name).await?;
309 let version = table
310 .version()
311 .await
312 .map_err(|e| anyhow!("Failed to get version for table '{}': {}", table_name, e))?;
313 Ok(Some(version))
314 }
315
316 pub async fn rollback_table(&self, table_name: &str, target_version: u64) -> Result<()> {
325 let table = self.open_table(table_name).await?;
326 table.checkout(target_version).await.map_err(|e| {
327 anyhow!(
328 "Failed to checkout version {} for '{}': {}",
329 target_version,
330 table_name,
331 e
332 )
333 })?;
334 table.restore().await.map_err(|e| {
335 anyhow!(
336 "Failed to restore table '{}' to version {}: {}",
337 table_name,
338 target_version,
339 e
340 )
341 })?;
342 Ok(())
343 }
344
345 pub async fn replace_table_atomic(
364 &self,
365 name: &str,
366 batches: Vec<RecordBatch>,
367 schema: Arc<ArrowSchema>,
368 ) -> Result<Table> {
369 let staging_name = format!("{}_staging", name);
371 if self.table_exists(&staging_name).await? {
372 self.drop_table(&staging_name).await?;
373 }
374
375 if self.table_exists(name).await? {
376 let table = self.open_table(name).await?;
380 if batches.is_empty() {
381 table
383 .delete("true")
384 .await
385 .map_err(|e| anyhow!("Failed to clear table '{}': {}", name, e))?;
386 } else {
387 use lancedb::table::AddDataMode;
388 table
389 .add(batches)
390 .mode(AddDataMode::Overwrite)
391 .execute()
392 .await
393 .map_err(|e| anyhow!("Failed to overwrite table '{}': {}", name, e))?;
394 }
395 Ok(table)
396 } else {
397 if batches.is_empty() {
399 self.create_empty_table(name, schema).await
400 } else {
401 self.create_table(name, batches).await
402 }
403 }
404 }
405
406 pub async fn recover_staging(&self, name: &str) -> Result<()> {
421 let staging_name = format!("{}_staging", name);
422
423 if !self.table_exists(&staging_name).await? {
425 return Ok(()); }
427
428 let main_exists = self.table_exists(name).await?;
430
431 if main_exists {
432 log::info!("Cleaning up leftover staging table: {}", staging_name);
434 self.drop_table(&staging_name).await?;
435 } else {
436 log::warn!("Recovering table '{}' from staging after crash", name);
438
439 let staging_table = self.open_table(&staging_name).await?;
441 let schema = staging_table.schema().await?;
442
443 use lancedb::query::ExecutableQuery;
445 let stream = staging_table.query().execute().await?;
446 let batches: Vec<RecordBatch> = stream.try_collect().await?;
447
448 if batches.is_empty() {
450 self.create_empty_table(name, schema).await?;
451 } else {
452 self.create_table(name, batches).await?;
453 }
454
455 self.drop_table(&staging_name).await?;
457
458 log::info!("Successfully recovered table '{}' from staging", name);
459 }
460
461 Ok(())
462 }
463}
464
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::{Int64Array, StringArray, UInt64Array};
    use arrow_schema::{DataType, Field};
    use tempfile::TempDir;

    /// Connects a store to a fresh temporary directory.
    ///
    /// The `TempDir` guard is returned so the caller keeps the directory alive
    /// for the whole test. It is removed when the guard is dropped, so bind it
    /// to a named variable such as `_temp_dir`, not `_`.
    async fn temp_store() -> (TempDir, LanceDbStore) {
        let temp_dir = TempDir::new().unwrap();
        let uri = temp_dir.path().to_str().unwrap();
        let store = LanceDbStore::connect(uri).await.unwrap();
        (temp_dir, store)
    }

    /// Schema `(id: Int64 NOT NULL, name: Utf8 NULL)`.
    fn id_name_schema() -> Arc<ArrowSchema> {
        Arc::new(ArrowSchema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
        ]))
    }

    /// Schema `(id: UInt64 NOT NULL, value: Int64 NOT NULL)`.
    fn id_value_schema() -> Arc<ArrowSchema> {
        Arc::new(ArrowSchema::new(vec![
            Field::new("id", DataType::UInt64, false),
            Field::new("value", DataType::Int64, false),
        ]))
    }

    /// Builds one `id_value_schema` batch from parallel `ids` / `values`.
    fn id_value_batch(ids: Vec<u64>, values: Vec<i64>) -> RecordBatch {
        RecordBatch::try_new(
            id_value_schema(),
            vec![
                Arc::new(UInt64Array::from(ids)),
                Arc::new(Int64Array::from(values)),
            ],
        )
        .unwrap()
    }

    #[tokio::test]
    async fn test_connect_and_create_table() {
        let (_temp_dir, store) = temp_store().await;

        let _table = store
            .create_empty_table("test_table", id_name_schema())
            .await
            .unwrap();

        assert!(store.table_exists("test_table").await.unwrap());

        let tables = store.table_names().await.unwrap();
        assert!(tables.contains(&"test_table".to_string()));
    }

    #[tokio::test]
    async fn test_create_table_with_data() {
        let (_temp_dir, store) = temp_store().await;

        let batch = RecordBatch::try_new(
            id_name_schema(),
            vec![
                Arc::new(Int64Array::from(vec![1, 2, 3])),
                Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie"])),
            ],
        )
        .unwrap();

        let table = store.create_table("users", vec![batch]).await.unwrap();
        assert_eq!(table.count_rows(None).await.unwrap(), 3);
    }

    #[test]
    fn test_table_name_helpers() {
        assert_eq!(LanceDbStore::main_vertex_table_name(), "vertices");
        assert_eq!(LanceDbStore::main_edge_table_name(), "edges");
        assert_eq!(LanceDbStore::vertex_table_name("Person"), "vertices_Person");
        assert_eq!(
            LanceDbStore::delta_table_name("KNOWS", "fwd"),
            "deltas_KNOWS_fwd"
        );
        assert_eq!(
            LanceDbStore::adjacency_table_name("KNOWS", "bwd"),
            "adjacency_KNOWS_bwd"
        );
    }

    #[tokio::test]
    async fn test_vertex_table_operations() {
        let (_temp_dir, store) = temp_store().await;

        assert_eq!(LanceDbStore::vertex_table_name("Person"), "vertices_Person");

        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new("_vid", DataType::UInt64, false),
            Field::new("_deleted", DataType::Boolean, false),
            Field::new("_version", DataType::UInt64, false),
            Field::new("name", DataType::Utf8, true),
        ]));

        let table = store
            .open_or_create_vertex_table("Person", schema)
            .await
            .unwrap();

        assert!(store.vertex_table_exists("Person").await.unwrap());
        assert_eq!(table.count_rows(None).await.unwrap(), 0);
    }

    #[tokio::test]
    async fn test_append_to_table() {
        let (_temp_dir, store) = temp_store().await;

        let table = store
            .create_table("test", vec![id_value_batch(vec![1, 2], vec![100, 200])])
            .await
            .unwrap();
        assert_eq!(table.count_rows(None).await.unwrap(), 2);

        let more = id_value_batch(vec![3, 4, 5], vec![300, 400, 500]);
        store.append_to_table(&table, vec![more]).await.unwrap();

        assert_eq!(table.count_rows(None).await.unwrap(), 5);
    }

    #[tokio::test]
    async fn test_append_empty_is_noop() {
        let (_temp_dir, store) = temp_store().await;

        let table = store
            .create_table("test", vec![id_value_batch(vec![1, 2], vec![100, 200])])
            .await
            .unwrap();

        store.append_to_table(&table, vec![]).await.unwrap();
        assert_eq!(table.count_rows(None).await.unwrap(), 2);
    }

    #[tokio::test]
    async fn test_drop_table() {
        let (_temp_dir, store) = temp_store().await;

        store
            .create_empty_table("test", id_value_schema())
            .await
            .unwrap();
        assert!(store.table_exists("test").await.unwrap());

        store.drop_table("test").await.unwrap();
        assert!(!store.table_exists("test").await.unwrap());
    }

    #[tokio::test]
    async fn test_get_table_version_missing_table() {
        let (_temp_dir, store) = temp_store().await;

        assert_eq!(store.get_table_version("missing").await.unwrap(), None);
    }

    #[tokio::test]
    async fn test_rollback_table() {
        let (_temp_dir, store) = temp_store().await;

        let table = store
            .create_table("test", vec![id_value_batch(vec![1, 2, 3], vec![100, 200, 300])])
            .await
            .unwrap();
        let version_before = store
            .get_table_version("test")
            .await
            .unwrap()
            .expect("table was just created");

        let more = id_value_batch(vec![4, 5], vec![400, 500]);
        store.append_to_table(&table, vec![more]).await.unwrap();
        assert_eq!(table.count_rows(None).await.unwrap(), 5);

        store.rollback_table("test", version_before).await.unwrap();

        let reopened = store.open_table("test").await.unwrap();
        assert_eq!(reopened.count_rows(None).await.unwrap(), 3);
    }

    #[tokio::test]
    async fn test_replace_table_atomic_success() {
        let (_temp_dir, store) = temp_store().await;

        store
            .create_table("test", vec![id_value_batch(vec![1, 2, 3], vec![100, 200, 300])])
            .await
            .unwrap();

        let table = store
            .replace_table_atomic(
                "test",
                vec![id_value_batch(vec![4, 5], vec![400, 500])],
                id_value_schema(),
            )
            .await
            .unwrap();

        assert_eq!(table.count_rows(None).await.unwrap(), 2);
        assert!(!store.table_exists("test_staging").await.unwrap());
    }

    #[tokio::test]
    async fn test_replace_table_atomic_empty_data() {
        let (_temp_dir, store) = temp_store().await;

        let table = store
            .replace_table_atomic("test", vec![], id_name_schema())
            .await
            .unwrap();

        assert_eq!(table.count_rows(None).await.unwrap(), 0);
        assert!(!store.table_exists("test_staging").await.unwrap());
    }

    #[tokio::test]
    async fn test_replace_table_atomic_overwrite_existing_with_empty() {
        let (_temp_dir, store) = temp_store().await;

        store
            .create_table("test", vec![id_value_batch(vec![1, 2, 3], vec![100, 200, 300])])
            .await
            .unwrap();

        let table = store
            .replace_table_atomic("test", vec![], id_value_schema())
            .await
            .unwrap();

        assert_eq!(table.count_rows(None).await.unwrap(), 0);
    }

    #[tokio::test]
    async fn test_recover_staging_no_staging() {
        let (_temp_dir, store) = temp_store().await;

        store.recover_staging("test").await.unwrap();

        assert!(!store.table_exists("test").await.unwrap());
        assert!(!store.table_exists("test_staging").await.unwrap());
    }

    #[tokio::test]
    async fn test_recover_staging_both_exist() {
        let (_temp_dir, store) = temp_store().await;

        store
            .create_table("test", vec![id_value_batch(vec![1, 2], vec![100, 200])])
            .await
            .unwrap();
        store
            .create_table(
                "test_staging",
                vec![id_value_batch(vec![3, 4], vec![300, 400])],
            )
            .await
            .unwrap();

        store.recover_staging("test").await.unwrap();

        // The main table wins; staging is discarded.
        let table = store.open_table("test").await.unwrap();
        assert_eq!(table.count_rows(None).await.unwrap(), 2);
        assert!(!store.table_exists("test_staging").await.unwrap());
    }

    #[tokio::test]
    async fn test_recover_staging_main_missing() {
        let (_temp_dir, store) = temp_store().await;

        store
            .create_table(
                "test_staging",
                vec![id_value_batch(vec![1, 2, 3], vec![100, 200, 300])],
            )
            .await
            .unwrap();

        store.recover_staging("test").await.unwrap();

        let table = store.open_table("test").await.unwrap();
        assert_eq!(table.count_rows(None).await.unwrap(), 3);
        assert!(!store.table_exists("test_staging").await.unwrap());
    }

    #[tokio::test]
    async fn test_recover_staging_empty_staging() {
        let (_temp_dir, store) = temp_store().await;

        store
            .create_empty_table("test_staging", id_name_schema())
            .await
            .unwrap();

        store.recover_staging("test").await.unwrap();

        let table = store.open_table("test").await.unwrap();
        assert_eq!(table.count_rows(None).await.unwrap(), 0);
        assert!(!store.table_exists("test_staging").await.unwrap());
    }
}