Skip to main content

uni_store/storage/
edge.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use anyhow::Result;
5use arrow_array::{RecordBatch, RecordBatchIterator};
6use arrow_schema::{Field, Schema as ArrowSchema};
7use lance::dataset::{Dataset, WriteMode, WriteParams};
8use std::sync::Arc;
9use uni_common::core::schema::Schema;
10
/// Handle to the Lance dataset holding every edge of a single edge type.
///
/// Constructing a handle only records strings; storage is accessed by
/// [`EdgeDataset::open`] and [`EdgeDataset::write_batch`].
#[derive(Debug, Clone)]
pub struct EdgeDataset {
    /// Dataset location; `EdgeDataset::new` builds it as `{base_uri}/edges_{edge_type}`.
    uri: String,
    /// Edge type name; also the key used to look up this type's properties in the schema.
    edge_type: String,
    /// Lance branch for branched reads. `None` = primary.
    branch: Option<String>,
}
17
18impl EdgeDataset {
19    pub fn new(base_uri: &str, edge_type: &str, _src_label: &str, _dst_label: &str) -> Self {
20        let uri = format!("{}/edges_{}", base_uri, edge_type);
21        Self {
22            uri,
23            edge_type: edge_type.to_string(),
24            branch: None,
25        }
26    }
27
28    /// Construct an edge dataset that reads from a Lance branch.
29    pub fn new_branched(
30        base_uri: &str,
31        edge_type: &str,
32        src_label: &str,
33        dst_label: &str,
34        branch: impl Into<String>,
35    ) -> Self {
36        let mut ds = Self::new(base_uri, edge_type, src_label, dst_label);
37        ds.branch = Some(branch.into());
38        ds
39    }
40
41    pub async fn open(&self) -> Result<Arc<Dataset>> {
42        let ds = match &self.branch {
43            Some(branch) => crate::backend::lance_branch::open_branch(&self.uri, branch).await?,
44            None => Dataset::open(&self.uri).await?,
45        };
46        Ok(Arc::new(ds))
47    }
48
49    pub async fn write_batch(&self, batch: RecordBatch, mode: WriteMode) -> Result<Arc<Dataset>> {
50        let arrow_schema = batch.schema();
51        let reader = RecordBatchIterator::new(std::iter::once(Ok(batch)), arrow_schema);
52
53        let params = WriteParams {
54            mode,
55            ..Default::default()
56        };
57
58        let ds = Dataset::write(Box::new(reader), &self.uri, Some(params)).await?;
59        Ok(Arc::new(ds))
60    }
61
62    pub fn get_arrow_schema(&self, schema: &Schema) -> Result<Arc<ArrowSchema>> {
63        let mut fields = vec![
64            Field::new("eid", arrow_schema::DataType::UInt64, false),
65            Field::new("src_vid", arrow_schema::DataType::UInt64, false),
66            Field::new("dst_vid", arrow_schema::DataType::UInt64, false),
67            Field::new("_deleted", arrow_schema::DataType::Boolean, false),
68            Field::new("_version", arrow_schema::DataType::UInt64, false),
69        ];
70
71        if let Some(type_props) = schema.properties.get(&self.edge_type) {
72            let mut sorted_props: Vec<_> = type_props.iter().collect();
73            sorted_props.sort_by_key(|(name, _)| *name);
74
75            for (name, meta) in sorted_props {
76                fields.push(Field::new(name, meta.r#type.to_arrow(), meta.nullable));
77            }
78        }
79
80        Ok(Arc::new(ArrowSchema::new(fields)))
81    }
82}