// uni_store/storage/edge.rs

use std::sync::Arc;

use anyhow::{Context, Result};
use arrow_array::{RecordBatch, RecordBatchIterator};
use arrow_schema::{DataType, Field, Schema as ArrowSchema};
use lance::dataset::{Dataset, WriteMode, WriteParams};
use uni_common::core::schema::Schema;

/// Handle to the Lance dataset that stores all edges of a single edge type.
///
/// Constructing the handle only records where the dataset lives; nothing is
/// opened or written until [`EdgeDataset::open`] or
/// [`EdgeDataset::write_batch`] is called.
#[derive(Debug, Clone)]
pub struct EdgeDataset {
    /// Dataset location, built by `new` as `{base_uri}/edges/{edge_type}`.
    uri: String,
    /// Edge type name; also the key used to look up this type's property
    /// definitions in the schema.
    edge_type: String,
}

16impl EdgeDataset {
17 pub fn new(base_uri: &str, edge_type: &str, _src_label: &str, _dst_label: &str) -> Self {
18 let uri = format!("{}/edges/{}", base_uri, edge_type);
19 Self {
20 uri,
21 edge_type: edge_type.to_string(),
22 }
23 }
24
25 pub async fn open(&self) -> Result<Arc<Dataset>> {
26 let ds = Dataset::open(&self.uri).await?;
27 Ok(Arc::new(ds))
28 }
29
30 pub async fn write_batch(&self, batch: RecordBatch, mode: WriteMode) -> Result<Arc<Dataset>> {
31 let arrow_schema = batch.schema();
32 let reader = RecordBatchIterator::new(std::iter::once(Ok(batch)), arrow_schema);
33
34 let params = WriteParams {
35 mode,
36 ..Default::default()
37 };
38
39 let ds = Dataset::write(Box::new(reader), &self.uri, Some(params)).await?;
40 Ok(Arc::new(ds))
41 }
42
43 pub fn get_arrow_schema(&self, schema: &Schema) -> Result<Arc<ArrowSchema>> {
44 let mut fields = vec![
45 Field::new("eid", arrow_schema::DataType::UInt64, false),
46 Field::new("src_vid", arrow_schema::DataType::UInt64, false),
47 Field::new("dst_vid", arrow_schema::DataType::UInt64, false),
48 Field::new("_deleted", arrow_schema::DataType::Boolean, false),
49 Field::new("_version", arrow_schema::DataType::UInt64, false),
50 ];
51
52 if let Some(type_props) = schema.properties.get(&self.edge_type) {
53 let mut sorted_props: Vec<_> = type_props.iter().collect();
54 sorted_props.sort_by_key(|(name, _)| *name);
55
56 for (name, meta) in sorted_props {
57 fields.push(Field::new(name, meta.r#type.to_arrow(), meta.nullable));
58 }
59 }
60
61 Ok(Arc::new(ArrowSchema::new(fields)))
62 }
63}