genegraph_storage/traits/
lance.rs1use crate::{StorageError, StorageResult};
2use arrow_array::RecordBatch;
3use log::{debug, info};
4
5use arrow_array::RecordBatchIterator;
6use futures::StreamExt;
7use lance::dataset::{Dataset, WriteMode, WriteParams};
8
9pub trait LanceStorage {
10 async fn write_lance_batch_async(&self, uri: String, batch: RecordBatch) -> StorageResult<()> {
12 info!("Writing Lance dataset to {}", uri);
13
14 let schema = batch.schema();
15 let batches = vec![batch];
16 let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema);
17
18 let params = WriteParams {
19 mode: WriteMode::Create,
20 ..WriteParams::default()
21 };
22
23 Dataset::write(reader, &uri, Some(params))
24 .await
25 .map_err(|e| StorageError::Lance(e.to_string()))?;
26
27 info!("Successfully wrote Lance dataset to {}", uri);
28 Ok(())
29 }
30
31 async fn read_lance_all_batches_async(&self, uri: String) -> StorageResult<RecordBatch> {
33 info!("Reading Lance dataset from {}", uri);
34
35 let dataset = Dataset::open(&uri)
36 .await
37 .map_err(|e| StorageError::Lance(e.to_string()))?;
38 let scanner = dataset.scan();
39 let mut stream = scanner
40 .try_into_stream()
41 .await
42 .map_err(|e| StorageError::Lance(e.to_string()))?;
43
44 let mut batches = Vec::new();
45 while let Some(batch_result) = stream.next().await {
46 let batch = batch_result.map_err(|e| StorageError::Lance(e.to_string()))?;
47 batches.push(batch);
48 }
49
50 if batches.is_empty() {
51 return Err(StorageError::Invalid("Empty Lance dataset".into()));
52 }
53
54 let schema = batches[0].schema();
55 let combined = arrow::compute::concat_batches(&schema, &batches)
56 .map_err(|e| StorageError::Lance(format!("Failed to concatenate batches: {}", e)))?;
57
58 debug!(
59 "Combined Lance batch for {:?} has {} rows",
60 uri,
61 combined.num_rows()
62 );
63 Ok(combined)
64 }
65
66 async fn read_lance_first_batch_async(&self, uri: String) -> StorageResult<RecordBatch> {
68 info!("Reading first batch from Lance dataset {}", uri);
69
70 let dataset = Dataset::open(&uri)
71 .await
72 .map_err(|e| StorageError::Lance(e.to_string()))?;
73 let scanner = dataset.scan();
74 let mut stream = scanner
75 .try_into_stream()
76 .await
77 .map_err(|e| StorageError::Lance(e.to_string()))?;
78
79 let batch = stream
80 .next()
81 .await
82 .ok_or_else(|| StorageError::Lance("empty Lance dataset".to_string()))?
83 .map_err(|e| StorageError::Lance(e.to_string()))?;
84
85 debug!(
86 "Read first RecordBatch for path {:?} with {} rows",
87 uri,
88 batch.num_rows()
89 );
90 Ok(batch)
91 }
92}