Skip to main content

uni_store/lancedb/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! LanceDB integration module.
5//!
6//! This module provides a wrapper around LanceDB for Uni's storage layer.
7//! LanceDB provides:
8//! - Built-in DataFusion query engine
9//! - Automatic scalar indexing (BTREE, BITMAP)
10//! - Vector search with IVF_PQ
11//! - Full-text search with BM25
12
13use anyhow::{Result, anyhow};
14use arrow_array::RecordBatch;
15use arrow_schema::Schema as ArrowSchema;
16use futures::TryStreamExt;
17use lancedb::Table;
18use lancedb::connection::Connection;
19use std::collections::HashMap;
20use std::sync::Arc;
21
22/// Wrapper around LanceDB connection for Uni storage.
23///
24/// This provides a unified interface for all table operations,
25/// replacing direct Lance Dataset usage.
26pub struct LanceDbStore {
27    connection: Connection,
28    base_uri: String,
29}
30
31impl LanceDbStore {
32    /// Connect to a LanceDB database at the given URI.
33    ///
34    /// Supported URIs:
35    /// - `/path/to/database` - local filesystem
36    /// - `s3://bucket/path` - AWS S3
37    /// - `gs://bucket/path` - Google Cloud Storage
38    pub async fn connect(uri: &str) -> Result<Self> {
39        Self::connect_with_storage_options(uri, None).await
40    }
41
42    /// Connect to a LanceDB database with explicit storage options.
43    pub async fn connect_with_storage_options(
44        uri: &str,
45        storage_options: Option<HashMap<String, String>>,
46    ) -> Result<Self> {
47        let mut builder = lancedb::connect(uri);
48        if let Some(opts) = storage_options {
49            builder = builder.storage_options(opts);
50        }
51        let connection = builder
52            .execute()
53            .await
54            .map_err(|e| anyhow!("Failed to connect to LanceDB at {}: {}", uri, e))?;
55
56        Ok(Self {
57            connection,
58            base_uri: uri.to_string(),
59        })
60    }
61
62    /// Get the base URI for this store.
63    pub fn base_uri(&self) -> &str {
64        &self.base_uri
65    }
66
67    /// List all table names in the database.
68    pub async fn table_names(&self) -> Result<Vec<String>> {
69        self.connection
70            .table_names()
71            .execute()
72            .await
73            .map_err(|e| anyhow!("Failed to list tables: {}", e))
74    }
75
76    /// Check if a table exists.
77    pub async fn table_exists(&self, name: &str) -> Result<bool> {
78        let tables = self.table_names().await?;
79        Ok(tables.contains(&name.to_string()))
80    }
81
82    /// Open an existing table.
83    pub async fn open_table(&self, name: &str) -> Result<Table> {
84        self.connection
85            .open_table(name)
86            .execute()
87            .await
88            .map_err(|e| anyhow!("Failed to open table '{}': {}", name, e))
89    }
90
91    /// Create a new table with initial data.
92    ///
93    /// If the table already exists, this will fail.
94    pub async fn create_table(&self, name: &str, batches: Vec<RecordBatch>) -> Result<Table> {
95        if batches.is_empty() {
96            return Err(anyhow!(
97                "Cannot create table '{}' with empty data. Use create_empty_table instead.",
98                name
99            ));
100        }
101
102        self.connection
103            .create_table(name, batches)
104            .execute()
105            .await
106            .map_err(|e| anyhow!("Failed to create table '{}': {}", name, e))
107    }
108
109    /// Create a new empty table with a schema.
110    pub async fn create_empty_table(&self, name: &str, schema: Arc<ArrowSchema>) -> Result<Table> {
111        self.connection
112            .create_empty_table(name, schema)
113            .execute()
114            .await
115            .map_err(|e| anyhow!("Failed to create empty table '{}': {}", name, e))
116    }
117
118    /// Open a table, creating it if it doesn't exist.
119    pub async fn open_or_create_table(
120        &self,
121        name: &str,
122        schema: Arc<ArrowSchema>,
123    ) -> Result<Table> {
124        if self.table_exists(name).await? {
125            self.open_table(name).await
126        } else {
127            self.create_empty_table(name, schema).await
128        }
129    }
130
131    /// Drop a table by name.
132    pub async fn drop_table(&self, name: &str) -> Result<()> {
133        self.connection
134            .drop_table(name, &[])
135            .await
136            .map_err(|e| anyhow!("Failed to drop table '{}': {}", name, e))
137    }
138
139    /// Drop all tables in the database.
140    pub async fn drop_all_tables(&self) -> Result<()> {
141        self.connection
142            .drop_all_tables(&[])
143            .await
144            .map_err(|e| anyhow!("Failed to drop all tables: {}", e))
145    }
146
147    /// Append data to an existing table.
148    pub async fn append_to_table(&self, table: &Table, batches: Vec<RecordBatch>) -> Result<()> {
149        if batches.is_empty() {
150            return Ok(());
151        }
152
153        table
154            .add(batches)
155            .execute()
156            .await
157            .map_err(|e| anyhow!("Failed to append to table: {}", e))?;
158
159        Ok(())
160    }
161
162    // ========================================================================
163    // Main Unified Table Operations
164    // ========================================================================
165
166    /// Get the table name for the main vertices table.
167    ///
168    /// The main vertices table contains all vertices regardless of label,
169    /// enabling fast ID-based lookups without knowing the label.
170    pub fn main_vertex_table_name() -> &'static str {
171        "vertices"
172    }
173
174    /// Get the table name for the main edges table.
175    ///
176    /// The main edges table contains all edges regardless of type,
177    /// enabling fast ID-based lookups without knowing the edge type.
178    pub fn main_edge_table_name() -> &'static str {
179        "edges"
180    }
181
182    /// Open the main vertices table.
183    ///
184    /// # Errors
185    ///
186    /// Returns an error if the table does not exist.
187    pub async fn open_main_vertex_table(&self) -> Result<Table> {
188        self.open_table(Self::main_vertex_table_name()).await
189    }
190
191    /// Open the main edges table.
192    ///
193    /// # Errors
194    ///
195    /// Returns an error if the table does not exist.
196    pub async fn open_main_edge_table(&self) -> Result<Table> {
197        self.open_table(Self::main_edge_table_name()).await
198    }
199
200    /// Check if the main vertices table exists.
201    pub async fn main_vertex_table_exists(&self) -> Result<bool> {
202        self.table_exists(Self::main_vertex_table_name()).await
203    }
204
205    /// Check if the main edges table exists.
206    pub async fn main_edge_table_exists(&self) -> Result<bool> {
207        self.table_exists(Self::main_edge_table_name()).await
208    }
209
210    // ========================================================================
211    // Per-Label Vertex Table Operations
212    // ========================================================================
213
214    /// Get the table name for a vertex label.
215    pub fn vertex_table_name(label: &str) -> String {
216        format!("vertices_{}", label)
217    }
218
219    /// Open or create a vertex table for a label.
220    pub async fn open_or_create_vertex_table(
221        &self,
222        label: &str,
223        schema: Arc<ArrowSchema>,
224    ) -> Result<Table> {
225        let table_name = Self::vertex_table_name(label);
226        self.open_or_create_table(&table_name, schema).await
227    }
228
229    /// Open a vertex table for a label.
230    pub async fn open_vertex_table(&self, label: &str) -> Result<Table> {
231        let table_name = Self::vertex_table_name(label);
232        self.open_table(&table_name).await
233    }
234
235    /// Check if a vertex table exists for a label.
236    pub async fn vertex_table_exists(&self, label: &str) -> Result<bool> {
237        let table_name = Self::vertex_table_name(label);
238        self.table_exists(&table_name).await
239    }
240
241    // ========================================================================
242    // Delta Table Operations (Edge Mutations)
243    // ========================================================================
244
245    /// Get the table name for edge deltas.
246    pub fn delta_table_name(edge_type: &str, direction: &str) -> String {
247        format!("deltas_{}_{}", edge_type, direction)
248    }
249
250    /// Open or create a delta table.
251    pub async fn open_or_create_delta_table(
252        &self,
253        edge_type: &str,
254        direction: &str,
255        schema: Arc<ArrowSchema>,
256    ) -> Result<Table> {
257        let table_name = Self::delta_table_name(edge_type, direction);
258        self.open_or_create_table(&table_name, schema).await
259    }
260
261    /// Open a delta table.
262    pub async fn open_delta_table(&self, edge_type: &str, direction: &str) -> Result<Table> {
263        let table_name = Self::delta_table_name(edge_type, direction);
264        self.open_table(&table_name).await
265    }
266
267    // ========================================================================
268    // Adjacency Table Operations
269    // ========================================================================
270
271    /// Get the table name for adjacency data.
272    pub fn adjacency_table_name(edge_type: &str, direction: &str) -> String {
273        format!("adjacency_{}_{}", edge_type, direction)
274    }
275
276    /// Open or create an adjacency table.
277    pub async fn open_or_create_adjacency_table(
278        &self,
279        edge_type: &str,
280        direction: &str,
281        schema: Arc<ArrowSchema>,
282    ) -> Result<Table> {
283        let table_name = Self::adjacency_table_name(edge_type, direction);
284        self.open_or_create_table(&table_name, schema).await
285    }
286
287    /// Open an adjacency table.
288    pub async fn open_adjacency_table(&self, edge_type: &str, direction: &str) -> Result<Table> {
289        let table_name = Self::adjacency_table_name(edge_type, direction);
290        self.open_table(&table_name).await
291    }
292
293    // ========================================================================
294    // Version/Rollback Operations (for bulk load abort)
295    // ========================================================================
296
297    /// Get the current version of a table.
298    ///
299    /// Returns `None` if the table does not exist.
300    ///
301    /// # Errors
302    ///
303    /// Returns an error if the table exists but version query fails.
304    pub async fn get_table_version(&self, table_name: &str) -> Result<Option<u64>> {
305        if !self.table_exists(table_name).await? {
306            return Ok(None);
307        }
308        let table = self.open_table(table_name).await?;
309        let version = table
310            .version()
311            .await
312            .map_err(|e| anyhow!("Failed to get version for table '{}': {}", table_name, e))?;
313        Ok(Some(version))
314    }
315
316    /// Roll back a table to a specific version.
317    ///
318    /// This uses LanceDB's checkout and restore APIs to create a new version
319    /// that matches the state at `target_version`.
320    ///
321    /// # Errors
322    ///
323    /// Returns an error if the table cannot be opened or rollback fails.
324    pub async fn rollback_table(&self, table_name: &str, target_version: u64) -> Result<()> {
325        let table = self.open_table(table_name).await?;
326        table.checkout(target_version).await.map_err(|e| {
327            anyhow!(
328                "Failed to checkout version {} for '{}': {}",
329                target_version,
330                table_name,
331                e
332            )
333        })?;
334        table.restore().await.map_err(|e| {
335            anyhow!(
336                "Failed to restore table '{}' to version {}: {}",
337                table_name,
338                target_version,
339                e
340            )
341        })?;
342        Ok(())
343    }
344
345    /// Replace a table's contents atomically using Lance's overwrite mode.
346    ///
347    /// When the table already exists, this uses `add().mode(Overwrite)` to create
348    /// a new dataset version without dropping the table. This is critical for
349    /// concurrency safety: the old data files remain on disk (referenced by older
350    /// versions) until `Prune` runs, so concurrent readers can finish without
351    /// hitting "file not found" errors.
352    ///
353    /// When the table does not exist, it creates a new table normally.
354    ///
355    /// # Arguments
356    /// * `name` - The table name to replace
357    /// * `batches` - The new data (can be empty)
358    /// * `schema` - The Arrow schema for the table
359    ///
360    /// # Errors
361    ///
362    /// Returns an error if table operations fail.
363    pub async fn replace_table_atomic(
364        &self,
365        name: &str,
366        batches: Vec<RecordBatch>,
367        schema: Arc<ArrowSchema>,
368    ) -> Result<Table> {
369        // Clean up any leftover staging table from pre-overwrite-mode code
370        let staging_name = format!("{}_staging", name);
371        if self.table_exists(&staging_name).await? {
372            self.drop_table(&staging_name).await?;
373        }
374
375        if self.table_exists(name).await? {
376            // Table exists: use overwrite mode to create a new version.
377            // Old data files stay on disk until Lance Prune removes them,
378            // so concurrent readers are safe.
379            let table = self.open_table(name).await?;
380            if batches.is_empty() {
381                // Delete all rows to produce an empty table version
382                table
383                    .delete("true")
384                    .await
385                    .map_err(|e| anyhow!("Failed to clear table '{}': {}", name, e))?;
386            } else {
387                use lancedb::table::AddDataMode;
388                table
389                    .add(batches)
390                    .mode(AddDataMode::Overwrite)
391                    .execute()
392                    .await
393                    .map_err(|e| anyhow!("Failed to overwrite table '{}': {}", name, e))?;
394            }
395            Ok(table)
396        } else {
397            // Table doesn't exist: create it
398            if batches.is_empty() {
399                self.create_empty_table(name, schema).await
400            } else {
401                self.create_table(name, batches).await
402            }
403        }
404    }
405
406    /// Recover a table from its staging table if needed.
407    ///
408    /// This method handles crash recovery scenarios:
409    /// - If `{name}_staging` exists AND `{name}` exists → drop staging (leftover)
410    /// - If `{name}_staging` exists AND `{name}` missing → restore main from staging
411    ///
412    /// Call this on startup for all known table patterns to recover from crashes.
413    ///
414    /// # Arguments
415    /// * `name` - The table name to recover
416    ///
417    /// # Errors
418    ///
419    /// Returns an error if recovery operations fail.
420    pub async fn recover_staging(&self, name: &str) -> Result<()> {
421        let staging_name = format!("{}_staging", name);
422
423        // Check if staging table exists
424        if !self.table_exists(&staging_name).await? {
425            return Ok(()); // No staging table, nothing to recover
426        }
427
428        // Check if main table exists
429        let main_exists = self.table_exists(name).await?;
430
431        if main_exists {
432            // Case 1: Both staging and main exist → drop staging (leftover from completed operation)
433            log::info!("Cleaning up leftover staging table: {}", staging_name);
434            self.drop_table(&staging_name).await?;
435        } else {
436            // Case 2: Staging exists but main missing → restore main from staging
437            log::warn!("Recovering table '{}' from staging after crash", name);
438
439            // Read staging data
440            let staging_table = self.open_table(&staging_name).await?;
441            let schema = staging_table.schema().await?;
442
443            // Read all batches from staging
444            use lancedb::query::ExecutableQuery;
445            let stream = staging_table.query().execute().await?;
446            let batches: Vec<RecordBatch> = stream.try_collect().await?;
447
448            // Create main table from staging data
449            if batches.is_empty() {
450                self.create_empty_table(name, schema).await?;
451            } else {
452                self.create_table(name, batches).await?;
453            }
454
455            // Drop staging after successful recovery
456            self.drop_table(&staging_name).await?;
457
458            log::info!("Successfully recovered table '{}' from staging", name);
459        }
460
461        Ok(())
462    }
463}
464
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::{Int64Array, StringArray, UInt64Array};
    use arrow_schema::{DataType, Field};
    use tempfile::TempDir;

    // ------------------------------------------------------------------
    // Fixtures
    // ------------------------------------------------------------------

    /// Connect a store rooted in a fresh temporary directory.
    ///
    /// The `TempDir` is returned so the caller keeps the directory alive for
    /// the whole test; bind it to a named variable (e.g. `_dir`), not `_`.
    async fn temp_store() -> (TempDir, LanceDbStore) {
        let dir = TempDir::new().unwrap();
        let uri = dir.path().to_str().unwrap().to_string();
        let store = LanceDbStore::connect(&uri).await.unwrap();
        (dir, store)
    }

    /// Schema `(id: Int64 NOT NULL, name: Utf8 NULLABLE)`.
    fn id_name_schema() -> Arc<ArrowSchema> {
        Arc::new(ArrowSchema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
        ]))
    }

    /// Schema `(id: UInt64 NOT NULL, value: Int64 NOT NULL)`.
    fn id_value_schema() -> Arc<ArrowSchema> {
        Arc::new(ArrowSchema::new(vec![
            Field::new("id", DataType::UInt64, false),
            Field::new("value", DataType::Int64, false),
        ]))
    }

    /// Build a single batch matching [`id_value_schema`].
    fn id_value_batch(ids: Vec<u64>, values: Vec<i64>) -> RecordBatch {
        RecordBatch::try_new(
            id_value_schema(),
            vec![
                Arc::new(UInt64Array::from(ids)),
                Arc::new(Int64Array::from(values)),
            ],
        )
        .unwrap()
    }

    // ------------------------------------------------------------------
    // Basic table operations
    // ------------------------------------------------------------------

    #[tokio::test]
    async fn test_connect_and_create_table() {
        let (_dir, store) = temp_store().await;

        store
            .create_empty_table("test_table", id_name_schema())
            .await
            .unwrap();

        assert!(store.table_exists("test_table").await.unwrap());
        let tables = store.table_names().await.unwrap();
        assert!(tables.contains(&"test_table".to_string()));
    }

    #[tokio::test]
    async fn test_create_table_with_data() {
        let (_dir, store) = temp_store().await;

        let batch = RecordBatch::try_new(
            id_name_schema(),
            vec![
                Arc::new(Int64Array::from(vec![1, 2, 3])),
                Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie"])),
            ],
        )
        .unwrap();

        let table = store.create_table("users", vec![batch]).await.unwrap();
        assert_eq!(table.count_rows(None).await.unwrap(), 3);
    }

    #[tokio::test]
    async fn test_vertex_table_operations() {
        let (_dir, store) = temp_store().await;

        // Verify table naming convention
        assert_eq!(LanceDbStore::vertex_table_name("Person"), "vertices_Person");

        // Vertex-like schema
        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new("_vid", DataType::UInt64, false),
            Field::new("_deleted", DataType::Boolean, false),
            Field::new("_version", DataType::UInt64, false),
            Field::new("name", DataType::Utf8, true),
        ]));

        let table = store
            .open_or_create_vertex_table("Person", schema)
            .await
            .unwrap();

        assert!(store.vertex_table_exists("Person").await.unwrap());
        assert_eq!(table.count_rows(None).await.unwrap(), 0);
    }

    #[tokio::test]
    async fn test_open_or_create_table_reopens_existing() {
        let (_dir, store) = temp_store().await;

        store
            .create_table("test", vec![id_value_batch(vec![1, 2], vec![10, 20])])
            .await
            .unwrap();

        // Must open the existing table (keeping its rows), not recreate it.
        let table = store
            .open_or_create_table("test", id_value_schema())
            .await
            .unwrap();
        assert_eq!(table.count_rows(None).await.unwrap(), 2);
    }

    #[tokio::test]
    async fn test_append_to_table() {
        let (_dir, store) = temp_store().await;

        let table = store
            .create_table("test", vec![id_value_batch(vec![1, 2], vec![100, 200])])
            .await
            .unwrap();
        assert_eq!(table.count_rows(None).await.unwrap(), 2);

        store
            .append_to_table(
                &table,
                vec![id_value_batch(vec![3, 4, 5], vec![300, 400, 500])],
            )
            .await
            .unwrap();
        assert_eq!(table.count_rows(None).await.unwrap(), 5);

        // Appending nothing is a no-op.
        store.append_to_table(&table, vec![]).await.unwrap();
        assert_eq!(table.count_rows(None).await.unwrap(), 5);
    }

    // ------------------------------------------------------------------
    // Versioning / rollback
    // ------------------------------------------------------------------

    #[tokio::test]
    async fn test_get_table_version_and_rollback() {
        let (_dir, store) = temp_store().await;

        // Missing table reports no version rather than an error.
        assert_eq!(store.get_table_version("test").await.unwrap(), None);

        let table = store
            .create_table("test", vec![id_value_batch(vec![1, 2], vec![10, 20])])
            .await
            .unwrap();
        let before = store
            .get_table_version("test")
            .await
            .unwrap()
            .expect("table was just created");

        store
            .append_to_table(&table, vec![id_value_batch(vec![3], vec![30])])
            .await
            .unwrap();
        assert_eq!(table.count_rows(None).await.unwrap(), 3);

        store.rollback_table("test", before).await.unwrap();

        let reopened = store.open_table("test").await.unwrap();
        assert_eq!(reopened.count_rows(None).await.unwrap(), 2);
    }

    // ------------------------------------------------------------------
    // Atomic replace
    // ------------------------------------------------------------------

    #[tokio::test]
    async fn test_replace_table_atomic_success() {
        let (_dir, store) = temp_store().await;

        store
            .create_table(
                "test",
                vec![id_value_batch(vec![1, 2, 3], vec![100, 200, 300])],
            )
            .await
            .unwrap();

        let table = store
            .replace_table_atomic(
                "test",
                vec![id_value_batch(vec![4, 5], vec![400, 500])],
                id_value_schema(),
            )
            .await
            .unwrap();

        // New data replaced old data
        assert_eq!(table.count_rows(None).await.unwrap(), 2);
        // Overwrite mode doesn't use a staging table
        assert!(!store.table_exists("test_staging").await.unwrap());
    }

    #[tokio::test]
    async fn test_replace_table_atomic_empty_data() {
        let (_dir, store) = temp_store().await;

        // Table doesn't exist yet: created empty from the schema
        let table = store
            .replace_table_atomic("test", vec![], id_name_schema())
            .await
            .unwrap();

        assert_eq!(table.count_rows(None).await.unwrap(), 0);
        assert!(!store.table_exists("test_staging").await.unwrap());
    }

    #[tokio::test]
    async fn test_replace_table_atomic_overwrite_existing_with_empty() {
        let (_dir, store) = temp_store().await;

        store
            .create_table(
                "test",
                vec![id_value_batch(vec![1, 2, 3], vec![100, 200, 300])],
            )
            .await
            .unwrap();

        // Replacing an existing table with no data clears all rows
        let table = store
            .replace_table_atomic("test", vec![], id_value_schema())
            .await
            .unwrap();

        assert_eq!(table.count_rows(None).await.unwrap(), 0);
    }

    #[tokio::test]
    async fn test_replace_table_atomic_drops_leftover_staging() {
        let (_dir, store) = temp_store().await;

        store
            .create_table("test", vec![id_value_batch(vec![1], vec![10])])
            .await
            .unwrap();
        store
            .create_table("test_staging", vec![id_value_batch(vec![2], vec![20])])
            .await
            .unwrap();

        let table = store
            .replace_table_atomic(
                "test",
                vec![id_value_batch(vec![7, 8, 9], vec![70, 80, 90])],
                id_value_schema(),
            )
            .await
            .unwrap();

        assert_eq!(table.count_rows(None).await.unwrap(), 3);
        assert!(!store.table_exists("test_staging").await.unwrap());
    }

    #[tokio::test]
    async fn test_replace_table_atomic_main_missing_with_staging() {
        let (_dir, store) = temp_store().await;

        // Only a legacy staging table exists (crash between drop and create)
        store
            .create_table("test_staging", vec![id_value_batch(vec![1], vec![10])])
            .await
            .unwrap();

        let table = store
            .replace_table_atomic(
                "test",
                vec![id_value_batch(vec![4, 5], vec![40, 50])],
                id_value_schema(),
            )
            .await
            .unwrap();

        assert_eq!(table.count_rows(None).await.unwrap(), 2);
        assert!(store.table_exists("test").await.unwrap());
        assert!(!store.table_exists("test_staging").await.unwrap());
    }

    // ------------------------------------------------------------------
    // Staging recovery
    // ------------------------------------------------------------------

    #[tokio::test]
    async fn test_recover_staging_no_staging() {
        let (_dir, store) = temp_store().await;

        // Nothing to recover: must not create any tables
        store.recover_staging("test").await.unwrap();

        assert!(!store.table_exists("test").await.unwrap());
        assert!(!store.table_exists("test_staging").await.unwrap());
    }

    #[tokio::test]
    async fn test_recover_staging_both_exist() {
        let (_dir, store) = temp_store().await;

        // Completed operation with a leftover staging table
        store
            .create_table("test", vec![id_value_batch(vec![1, 2], vec![100, 200])])
            .await
            .unwrap();
        store
            .create_table(
                "test_staging",
                vec![id_value_batch(vec![3, 4], vec![300, 400])],
            )
            .await
            .unwrap();

        // Should drop staging and keep main untouched
        store.recover_staging("test").await.unwrap();

        let table = store.open_table("test").await.unwrap();
        assert_eq!(table.count_rows(None).await.unwrap(), 2);
        assert!(!store.table_exists("test_staging").await.unwrap());
    }

    #[tokio::test]
    async fn test_recover_staging_main_missing() {
        let (_dir, store) = temp_store().await;

        // Crash between drop and create: only staging exists
        store
            .create_table(
                "test_staging",
                vec![id_value_batch(vec![1, 2, 3], vec![100, 200, 300])],
            )
            .await
            .unwrap();

        // Should restore main from staging
        store.recover_staging("test").await.unwrap();

        let table = store.open_table("test").await.unwrap();
        assert_eq!(table.count_rows(None).await.unwrap(), 3);
        assert!(!store.table_exists("test_staging").await.unwrap());
    }

    #[tokio::test]
    async fn test_recover_staging_empty_staging() {
        let (_dir, store) = temp_store().await;

        // Crash with empty data: only an empty staging table exists
        store
            .create_empty_table("test_staging", id_name_schema())
            .await
            .unwrap();

        // Should restore an empty main table from staging
        store.recover_staging("test").await.unwrap();

        let table = store.open_table("test").await.unwrap();
        assert_eq!(table.count_rows(None).await.unwrap(), 0);
        assert!(!store.table_exists("test_staging").await.unwrap());
    }
}