// genegraph_storage/traits/lance.rs

1use crate::{StorageError, StorageResult};
2use arrow_array::RecordBatch;
3use log::{debug, info};
4
5use arrow_array::RecordBatchIterator;
6use futures::StreamExt;
7use lance::dataset::{Dataset, WriteMode, WriteParams};
8
9pub trait LanceStorage {
10    /// Async helper: write a RecordBatch to a Lance dataset.
11    async fn write_lance_batch_async(&self, uri: String, batch: RecordBatch) -> StorageResult<()> {
12        info!("Writing Lance dataset to {}", uri);
13
14        let schema = batch.schema();
15        let batches = vec![batch];
16        let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema);
17
18        let params = WriteParams {
19            mode: WriteMode::Create,
20            ..WriteParams::default()
21        };
22
23        Dataset::write(reader, &uri, Some(params))
24            .await
25            .map_err(|e| StorageError::Lance(e.to_string()))?;
26
27        info!("Successfully wrote Lance dataset to {}", uri);
28        Ok(())
29    }
30
31    /// Async helper: read and concatenate all RecordBatches from a Lance dataset.
32    async fn read_lance_all_batches_async(&self, uri: String) -> StorageResult<RecordBatch> {
33        info!("Reading Lance dataset from {}", uri);
34
35        let dataset = Dataset::open(&uri)
36            .await
37            .map_err(|e| StorageError::Lance(e.to_string()))?;
38        let scanner = dataset.scan();
39        let mut stream = scanner
40            .try_into_stream()
41            .await
42            .map_err(|e| StorageError::Lance(e.to_string()))?;
43
44        let mut batches = Vec::new();
45        while let Some(batch_result) = stream.next().await {
46            let batch = batch_result.map_err(|e| StorageError::Lance(e.to_string()))?;
47            batches.push(batch);
48        }
49
50        if batches.is_empty() {
51            return Err(StorageError::Invalid("Empty Lance dataset".into()));
52        }
53
54        let schema = batches[0].schema();
55        let combined = arrow::compute::concat_batches(&schema, &batches)
56            .map_err(|e| StorageError::Lance(format!("Failed to concatenate batches: {}", e)))?;
57
58        debug!(
59            "Combined Lance batch for {:?} has {} rows",
60            uri,
61            combined.num_rows()
62        );
63        Ok(combined)
64    }
65
66    /// Async helper: read the first RecordBatch from a Lance dataset.
67    async fn read_lance_first_batch_async(&self, uri: String) -> StorageResult<RecordBatch> {
68        info!("Reading first batch from Lance dataset {}", uri);
69
70        let dataset = Dataset::open(&uri)
71            .await
72            .map_err(|e| StorageError::Lance(e.to_string()))?;
73        let scanner = dataset.scan();
74        let mut stream = scanner
75            .try_into_stream()
76            .await
77            .map_err(|e| StorageError::Lance(e.to_string()))?;
78
79        let batch = stream
80            .next()
81            .await
82            .ok_or_else(|| StorageError::Lance("empty Lance dataset".to_string()))?
83            .map_err(|e| StorageError::Lance(e.to_string()))?;
84
85        debug!(
86            "Read first RecordBatch for path {:?} with {} rows",
87            uri,
88            batch.num_rows()
89        );
90        Ok(batch)
91    }
92}