// uni_store/storage/vertex.rs
// SPDX-License-Identifier: Apache-2.0
// Copyright 2024-2026 Dragonscale Team
4use crate::backend::StorageBackend;
5use crate::backend::table_names;
6use crate::backend::types::{ScalarIndexType, WriteMode};
7use crate::storage::arrow_convert::build_timestamp_column_from_vid_map;
8use crate::storage::property_builder::PropertyColumnBuilder;
9use anyhow::{Result, anyhow};
10use arrow_array::builder::{FixedSizeBinaryBuilder, ListBuilder, StringBuilder};
11use arrow_array::{ArrayRef, BooleanArray, RecordBatch, UInt64Array};
12use arrow_schema::{Field, Schema as ArrowSchema, TimeUnit};
13#[cfg(feature = "lance-backend")]
14use lance::dataset::Dataset;
15use sha3::{Digest, Sha3_256};
16use std::collections::HashMap;
17use std::sync::Arc;
18use uni_common::Properties;
19use uni_common::core::id::{UniId, Vid};
20use uni_common::core::schema::Schema;
21
/// Handle to the backing table that stores all vertices of one label.
///
/// Construction only derives the dataset URI; no I/O happens until one of
/// the `open*` / backend methods is called.
pub struct VertexDataset {
    /// Dataset URI, derived as `{base_uri}/vertices_{label}` in `new`.
    /// Only read by the lance-backend `open*` methods, hence the
    /// `dead_code` allowance when that feature is disabled.
    #[cfg_attr(not(feature = "lance-backend"), allow(dead_code))]
    uri: String,
    /// Vertex label served by this dataset; used for table naming,
    /// schema lookup, and content-hash (UID) computation.
    label: String,
    /// Numeric label id; stored but currently unused (leading underscore).
    _label_id: u16,
}
28
29impl VertexDataset {
30    pub fn new(base_uri: &str, label: &str, label_id: u16) -> Self {
31        let uri = format!("{}/vertices_{}", base_uri, label);
32        Self {
33            uri,
34            label: label.to_string(),
35            _label_id: label_id,
36        }
37    }
38
39    /// Compute UniId from vertex content.
40    /// Canonical form: sorted JSON of (label, ext_id, properties)
41    pub fn compute_vertex_uid(label: &str, ext_id: Option<&str>, properties: &Properties) -> UniId {
42        let mut hasher = Sha3_256::new();
43
44        // Include label
45        hasher.update(label.as_bytes());
46        hasher.update(b"\x00"); // separator
47
48        // Include ext_id if present
49        if let Some(eid) = ext_id {
50            hasher.update(eid.as_bytes());
51        }
52        hasher.update(b"\x00");
53
54        // Include sorted properties for determinism
55        let mut sorted_props: Vec<_> = properties.iter().collect();
56        sorted_props.sort_by_key(|(k, _)| *k);
57        for (key, value) in sorted_props {
58            hasher.update(key.as_bytes());
59            hasher.update(b"=");
60            hasher.update(value.to_string().as_bytes());
61            hasher.update(b"\x00");
62        }
63
64        let hash: [u8; 32] = hasher.finalize().into();
65        UniId::from_bytes(hash)
66    }
67
68    #[cfg(feature = "lance-backend")]
69    pub async fn open(&self) -> Result<Arc<Dataset>> {
70        self.open_at(None).await
71    }
72
73    #[cfg(feature = "lance-backend")]
74    pub async fn open_at(&self, version: Option<u64>) -> Result<Arc<Dataset>> {
75        let mut ds = Dataset::open(&self.uri).await?;
76        if let Some(v) = version {
77            ds = ds.checkout_version(v).await?;
78        }
79        Ok(Arc::new(ds))
80    }
81
82    #[cfg(feature = "lance-backend")]
83    pub async fn open_raw(&self) -> Result<Dataset> {
84        let ds = Dataset::open(&self.uri).await?;
85        Ok(ds)
86    }
87
88    /// Build a record batch from vertices with optional timestamp metadata.
89    ///
90    /// If timestamps are not provided, they default to None (null).
91    pub fn build_record_batch(
92        &self,
93        vertices: &[(Vid, Vec<String>, Properties)],
94        deleted: &[bool],
95        versions: &[u64],
96        schema: &Schema,
97    ) -> Result<RecordBatch> {
98        self.build_record_batch_with_timestamps(vertices, deleted, versions, schema, None, None)
99    }
100
101    /// Build a record batch with explicit timestamp metadata.
102    ///
103    /// # Arguments
104    /// * `vertices` - Vertex ID, labels, and properties triples
105    /// * `deleted` - Deletion flags per vertex
106    /// * `versions` - Version numbers per vertex
107    /// * `schema` - Database schema
108    /// * `created_at` - Optional map of Vid -> nanoseconds since epoch
109    /// * `updated_at` - Optional map of Vid -> nanoseconds since epoch
110    pub fn build_record_batch_with_timestamps(
111        &self,
112        vertices: &[(Vid, Vec<String>, Properties)],
113        deleted: &[bool],
114        versions: &[u64],
115        schema: &Schema,
116        created_at: Option<&HashMap<Vid, i64>>,
117        updated_at: Option<&HashMap<Vid, i64>>,
118    ) -> Result<RecordBatch> {
119        let arrow_schema = self.get_arrow_schema(schema)?;
120        let mut columns: Vec<ArrayRef> = Vec::with_capacity(arrow_schema.fields().len());
121
122        let vids: Vec<u64> = vertices.iter().map(|(v, _, _)| v.as_u64()).collect();
123        columns.push(Arc::new(UInt64Array::from(vids)));
124
125        let mut uid_builder = FixedSizeBinaryBuilder::new(32);
126        for (_vid, _labels, props) in vertices.iter() {
127            let ext_id = props.get("ext_id").and_then(|v| v.as_str());
128            let uid = Self::compute_vertex_uid(&self.label, ext_id, props);
129            uid_builder.append_value(uid.as_bytes())?;
130        }
131        columns.push(Arc::new(uid_builder.finish()));
132
133        columns.push(Arc::new(BooleanArray::from(deleted.to_vec())));
134        columns.push(Arc::new(UInt64Array::from(versions.to_vec())));
135
136        // Build ext_id column (extracted from properties as dedicated column)
137        let mut ext_id_builder = StringBuilder::new();
138        for (_vid, _labels, props) in vertices.iter() {
139            if let Some(ext_id_val) = props.get("ext_id").and_then(|v| v.as_str()) {
140                ext_id_builder.append_value(ext_id_val);
141            } else {
142                ext_id_builder.append_null();
143            }
144        }
145        columns.push(Arc::new(ext_id_builder.finish()));
146
147        // Build _labels column (List<Utf8>)
148        let mut labels_builder = ListBuilder::new(StringBuilder::new());
149        for (_vid, labels, _props) in vertices.iter() {
150            let values = labels_builder.values();
151            for lbl in labels {
152                values.append_value(lbl);
153            }
154            labels_builder.append(true);
155        }
156        columns.push(Arc::new(labels_builder.finish()));
157
158        // Build _created_at and _updated_at columns using shared builder
159        let vids = vertices.iter().map(|(v, _, _)| *v);
160        columns.push(build_timestamp_column_from_vid_map(
161            vids.clone(),
162            created_at,
163        ));
164        columns.push(build_timestamp_column_from_vid_map(vids, updated_at));
165
166        // Build property columns using shared builder
167        let prop_columns = PropertyColumnBuilder::new(schema, &self.label, vertices.len())
168            .with_deleted(deleted)
169            .build(|i| &vertices[i].2)?;
170
171        columns.extend(prop_columns);
172
173        // Build overflow_json column for non-schema properties
174        let overflow_column = self.build_overflow_json_column(vertices, schema)?;
175        columns.push(overflow_column);
176
177        RecordBatch::try_new(arrow_schema, columns).map_err(|e| anyhow!(e))
178    }
179
180    /// Build the overflow_json column containing properties not in schema.
181    fn build_overflow_json_column(
182        &self,
183        vertices: &[(Vid, Vec<String>, Properties)],
184        schema: &Schema,
185    ) -> Result<ArrayRef> {
186        crate::storage::property_builder::build_overflow_json_column(
187            vertices.len(),
188            &self.label,
189            schema,
190            |i| &vertices[i].2,
191            &["ext_id"],
192        )
193    }
194
195    pub fn get_arrow_schema(&self, schema: &Schema) -> Result<Arc<ArrowSchema>> {
196        let mut fields = vec![
197            Field::new("_vid", arrow_schema::DataType::UInt64, false),
198            Field::new("_uid", arrow_schema::DataType::FixedSizeBinary(32), true),
199            Field::new("_deleted", arrow_schema::DataType::Boolean, false),
200            Field::new("_version", arrow_schema::DataType::UInt64, false),
201            // New metadata columns per STORAGE_DESIGN.md
202            Field::new("ext_id", arrow_schema::DataType::Utf8, true),
203            Field::new(
204                "_labels",
205                arrow_schema::DataType::List(Arc::new(Field::new(
206                    "item",
207                    arrow_schema::DataType::Utf8,
208                    true,
209                ))),
210                true,
211            ),
212            Field::new(
213                "_created_at",
214                arrow_schema::DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
215                true,
216            ),
217            Field::new(
218                "_updated_at",
219                arrow_schema::DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
220                true,
221            ),
222        ];
223
224        if let Some(label_props) = schema.properties.get(&self.label) {
225            let mut sorted_props: Vec<_> = label_props.iter().collect();
226            sorted_props.sort_by_key(|(name, _)| *name);
227
228            for (name, meta) in sorted_props {
229                fields.push(Field::new(name, meta.r#type.to_arrow(), meta.nullable));
230            }
231        }
232
233        // Add overflow_json column for non-schema properties (JSONB binary format)
234        fields.push(Field::new(
235            "overflow_json",
236            arrow_schema::DataType::LargeBinary,
237            true,
238        ));
239
240        Ok(Arc::new(ArrowSchema::new(fields)))
241    }
242
243    // ========================================================================
244    // Backend-agnostic Methods
245    // ========================================================================
246
247    /// Open or create a vertex table via the storage backend.
248    pub async fn open_or_create(
249        &self,
250        backend: &dyn StorageBackend,
251        schema: &Schema,
252    ) -> Result<()> {
253        let table_name = table_names::vertex_table_name(&self.label);
254        let arrow_schema = self.get_arrow_schema(schema)?;
255        backend
256            .open_or_create_table(&table_name, arrow_schema)
257            .await
258    }
259
260    /// Write a batch to a vertex table.
261    ///
262    /// Creates the table if it doesn't exist, otherwise appends to it.
263    pub async fn write_batch(
264        &self,
265        backend: &dyn StorageBackend,
266        batch: RecordBatch,
267        _schema: &Schema,
268    ) -> Result<()> {
269        let table_name = table_names::vertex_table_name(&self.label);
270        if backend.table_exists(&table_name).await? {
271            backend
272                .write(&table_name, vec![batch], WriteMode::Append)
273                .await
274        } else {
275            backend.create_table(&table_name, vec![batch]).await
276        }
277    }
278
279    /// Ensure default scalar indexes exist on system columns (_vid, _uid, ext_id).
280    pub async fn ensure_default_indexes(&self, backend: &dyn StorageBackend) -> Result<()> {
281        let table_name = table_names::vertex_table_name(&self.label);
282        let indices = backend.list_indexes(&table_name).await?;
283
284        let has_index = |col: &str| {
285            indices
286                .iter()
287                .any(|idx| idx.columns.contains(&col.to_string()))
288        };
289
290        for column in &["_vid", "_uid", "ext_id"] {
291            if has_index(column) {
292                continue;
293            }
294            log::info!("Creating {} BTree index for label '{}'", column, self.label);
295            if let Err(e) = backend
296                .create_scalar_index(&table_name, column, ScalarIndexType::BTree)
297                .await
298            {
299                log::warn!(
300                    "Failed to create {} index for '{}': {}",
301                    column,
302                    self.label,
303                    e
304                );
305            }
306        }
307
308        Ok(())
309    }
310
311    /// Get the table name for this vertex dataset.
312    pub fn table_name(&self) -> String {
313        table_names::vertex_table_name(&self.label)
314    }
315
316    /// Replace a vertex table's contents atomically.
317    ///
318    /// Used by compaction to rewrite the table with merged data.
319    pub async fn replace(
320        &self,
321        backend: &dyn StorageBackend,
322        batch: RecordBatch,
323        schema: &Schema,
324    ) -> Result<()> {
325        let table_name = self.table_name();
326        let arrow_schema = self.get_arrow_schema(schema)?;
327        backend
328            .replace_table_atomic(&table_name, vec![batch], arrow_schema)
329            .await
330    }
331}