Skip to main content

uni_store/storage/
edge.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use anyhow::Result;
5use arrow_array::{RecordBatch, RecordBatchIterator};
6use arrow_schema::{Field, Schema as ArrowSchema};
7use lance::dataset::{Dataset, WriteMode, WriteParams};
8use std::sync::Arc;
9use uni_common::core::schema::Schema;
10
11pub struct EdgeDataset {
12    uri: String,
13    edge_type: String,
14}
15
16impl EdgeDataset {
17    pub fn new(base_uri: &str, edge_type: &str, _src_label: &str, _dst_label: &str) -> Self {
18        let uri = format!("{}/edges/{}", base_uri, edge_type);
19        Self {
20            uri,
21            edge_type: edge_type.to_string(),
22        }
23    }
24
25    pub async fn open(&self) -> Result<Arc<Dataset>> {
26        let ds = Dataset::open(&self.uri).await?;
27        Ok(Arc::new(ds))
28    }
29
30    pub async fn write_batch(&self, batch: RecordBatch, mode: WriteMode) -> Result<Arc<Dataset>> {
31        let arrow_schema = batch.schema();
32        let reader = RecordBatchIterator::new(std::iter::once(Ok(batch)), arrow_schema);
33
34        let params = WriteParams {
35            mode,
36            ..Default::default()
37        };
38
39        let ds = Dataset::write(Box::new(reader), &self.uri, Some(params)).await?;
40        Ok(Arc::new(ds))
41    }
42
43    pub fn get_arrow_schema(&self, schema: &Schema) -> Result<Arc<ArrowSchema>> {
44        let mut fields = vec![
45            Field::new("eid", arrow_schema::DataType::UInt64, false),
46            Field::new("src_vid", arrow_schema::DataType::UInt64, false),
47            Field::new("dst_vid", arrow_schema::DataType::UInt64, false),
48            Field::new("_deleted", arrow_schema::DataType::Boolean, false),
49            Field::new("_version", arrow_schema::DataType::UInt64, false),
50        ];
51
52        if let Some(type_props) = schema.properties.get(&self.edge_type) {
53            let mut sorted_props: Vec<_> = type_props.iter().collect();
54            sorted_props.sort_by_key(|(name, _)| *name);
55
56            for (name, meta) in sorted_props {
57                fields.push(Field::new(name, meta.r#type.to_arrow(), meta.nullable));
58            }
59        }
60
61        Ok(Arc::new(ArrowSchema::new(fields)))
62    }
63}