// uni_store/storage/vertex.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use crate::lancedb::LanceDbStore;
5use crate::storage::arrow_convert::build_timestamp_column_from_vid_map;
6use crate::storage::property_builder::PropertyColumnBuilder;
7use anyhow::{Result, anyhow};
8use arrow_array::builder::{FixedSizeBinaryBuilder, ListBuilder, StringBuilder};
9use arrow_array::{ArrayRef, BooleanArray, RecordBatch, UInt64Array};
10use arrow_schema::{Field, Schema as ArrowSchema, TimeUnit};
11use lance::dataset::Dataset;
12use lancedb::Table;
13use lancedb::index::Index as LanceDbIndex;
14use lancedb::index::scalar::BTreeIndexBuilder;
15use sha3::{Digest, Sha3_256};
16use std::collections::HashMap;
17use std::sync::Arc;
18use uni_common::Properties;
19use uni_common::core::id::{UniId, Vid};
20use uni_common::core::schema::Schema;
21
/// Per-label vertex dataset: wraps the storage location for all vertices
/// of a single label, backed by Lance / LanceDB.
pub struct VertexDataset {
    // Dataset URI, formatted as "<base_uri>/vertices_<label>" (see `new`).
    uri: String,
    // The vertex label this dataset stores.
    label: String,
    // Numeric label id; currently unused (underscore prefix silences the warning).
    _label_id: u16,
}
27
28impl VertexDataset {
29    pub fn new(base_uri: &str, label: &str, label_id: u16) -> Self {
30        let uri = format!("{}/vertices_{}", base_uri, label);
31        Self {
32            uri,
33            label: label.to_string(),
34            _label_id: label_id,
35        }
36    }
37
    /// Compute a content-derived `UniId` for a vertex.
    ///
    /// The id is a SHA3-256 hash over three `\x00`-terminated segments:
    /// the label, the external id (empty segment when absent), and the
    /// properties sorted by key, each rendered as `key=value\x00` using the
    /// value's `Display` output. Sorting makes the id deterministic
    /// regardless of map iteration order.
    ///
    /// NOTE(review): `ext_id = None` and `ext_id = Some("")` hash to the
    /// same id, as do distinct values with identical string renderings —
    /// confirm this is acceptable before relying on uniqueness. Changing
    /// the scheme would invalidate all previously stored UIDs.
    pub fn compute_vertex_uid(label: &str, ext_id: Option<&str>, properties: &Properties) -> UniId {
        let mut hasher = Sha3_256::new();

        // Segment 1: label.
        hasher.update(label.as_bytes());
        hasher.update(b"\x00"); // separator

        // Segment 2: ext_id, empty when absent.
        if let Some(eid) = ext_id {
            hasher.update(eid.as_bytes());
        }
        hasher.update(b"\x00");

        // Segment 3: properties, sorted by key for determinism.
        let mut sorted_props: Vec<_> = properties.iter().collect();
        sorted_props.sort_by_key(|(k, _)| *k);
        for (key, value) in sorted_props {
            hasher.update(key.as_bytes());
            hasher.update(b"=");
            hasher.update(value.to_string().as_bytes());
            hasher.update(b"\x00");
        }

        let hash: [u8; 32] = hasher.finalize().into();
        UniId::from_bytes(hash)
    }
66
    /// Open the Lance dataset at its latest version.
    pub async fn open(&self) -> Result<Arc<Dataset>> {
        self.open_at(None).await
    }
70
71    pub async fn open_at(&self, version: Option<u64>) -> Result<Arc<Dataset>> {
72        let mut ds = Dataset::open(&self.uri).await?;
73        if let Some(v) = version {
74            ds = ds.checkout_version(v).await?;
75        }
76        Ok(Arc::new(ds))
77    }
78
79    pub async fn open_raw(&self) -> Result<Dataset> {
80        let ds = Dataset::open(&self.uri).await?;
81        Ok(ds)
82    }
83
    /// Build a record batch from vertices with optional timestamp metadata.
    ///
    /// Convenience wrapper over `build_record_batch_with_timestamps` with no
    /// timestamp maps, so `_created_at`/`_updated_at` columns are all null.
    pub fn build_record_batch(
        &self,
        vertices: &[(Vid, Vec<String>, Properties)],
        deleted: &[bool],
        versions: &[u64],
        schema: &Schema,
    ) -> Result<RecordBatch> {
        self.build_record_batch_with_timestamps(vertices, deleted, versions, schema, None, None)
    }
96
97    /// Build a record batch with explicit timestamp metadata.
98    ///
99    /// # Arguments
100    /// * `vertices` - Vertex ID, labels, and properties triples
101    /// * `deleted` - Deletion flags per vertex
102    /// * `versions` - Version numbers per vertex
103    /// * `schema` - Database schema
104    /// * `created_at` - Optional map of Vid -> nanoseconds since epoch
105    /// * `updated_at` - Optional map of Vid -> nanoseconds since epoch
106    pub fn build_record_batch_with_timestamps(
107        &self,
108        vertices: &[(Vid, Vec<String>, Properties)],
109        deleted: &[bool],
110        versions: &[u64],
111        schema: &Schema,
112        created_at: Option<&HashMap<Vid, i64>>,
113        updated_at: Option<&HashMap<Vid, i64>>,
114    ) -> Result<RecordBatch> {
115        let arrow_schema = self.get_arrow_schema(schema)?;
116        let mut columns: Vec<ArrayRef> = Vec::with_capacity(arrow_schema.fields().len());
117
118        let vids: Vec<u64> = vertices.iter().map(|(v, _, _)| v.as_u64()).collect();
119        columns.push(Arc::new(UInt64Array::from(vids)));
120
121        let mut uid_builder = FixedSizeBinaryBuilder::new(32);
122        for (_vid, _labels, props) in vertices.iter() {
123            let ext_id = props.get("ext_id").and_then(|v| v.as_str());
124            let uid = Self::compute_vertex_uid(&self.label, ext_id, props);
125            uid_builder.append_value(uid.as_bytes())?;
126        }
127        columns.push(Arc::new(uid_builder.finish()));
128
129        columns.push(Arc::new(BooleanArray::from(deleted.to_vec())));
130        columns.push(Arc::new(UInt64Array::from(versions.to_vec())));
131
132        // Build ext_id column (extracted from properties as dedicated column)
133        let mut ext_id_builder = StringBuilder::new();
134        for (_vid, _labels, props) in vertices.iter() {
135            if let Some(ext_id_val) = props.get("ext_id").and_then(|v| v.as_str()) {
136                ext_id_builder.append_value(ext_id_val);
137            } else {
138                ext_id_builder.append_null();
139            }
140        }
141        columns.push(Arc::new(ext_id_builder.finish()));
142
143        // Build _labels column (List<Utf8>)
144        let mut labels_builder = ListBuilder::new(StringBuilder::new());
145        for (_vid, labels, _props) in vertices.iter() {
146            let values = labels_builder.values();
147            for lbl in labels {
148                values.append_value(lbl);
149            }
150            labels_builder.append(true);
151        }
152        columns.push(Arc::new(labels_builder.finish()));
153
154        // Build _created_at and _updated_at columns using shared builder
155        let vids = vertices.iter().map(|(v, _, _)| *v);
156        columns.push(build_timestamp_column_from_vid_map(
157            vids.clone(),
158            created_at,
159        ));
160        columns.push(build_timestamp_column_from_vid_map(vids, updated_at));
161
162        // Build property columns using shared builder
163        let prop_columns = PropertyColumnBuilder::new(schema, &self.label, vertices.len())
164            .with_deleted(deleted)
165            .build(|i| &vertices[i].2)?;
166
167        columns.extend(prop_columns);
168
169        // Build overflow_json column for non-schema properties
170        let overflow_column = self.build_overflow_json_column(vertices, schema)?;
171        columns.push(overflow_column);
172
173        RecordBatch::try_new(arrow_schema, columns).map_err(|e| anyhow!(e))
174    }
175
176    /// Build the overflow_json column containing properties not in schema.
177    fn build_overflow_json_column(
178        &self,
179        vertices: &[(Vid, Vec<String>, Properties)],
180        schema: &Schema,
181    ) -> Result<ArrayRef> {
182        crate::storage::property_builder::build_overflow_json_column(
183            vertices.len(),
184            &self.label,
185            schema,
186            |i| &vertices[i].2,
187            &["ext_id"],
188        )
189    }
190
191    pub fn get_arrow_schema(&self, schema: &Schema) -> Result<Arc<ArrowSchema>> {
192        let mut fields = vec![
193            Field::new("_vid", arrow_schema::DataType::UInt64, false),
194            Field::new("_uid", arrow_schema::DataType::FixedSizeBinary(32), true),
195            Field::new("_deleted", arrow_schema::DataType::Boolean, false),
196            Field::new("_version", arrow_schema::DataType::UInt64, false),
197            // New metadata columns per STORAGE_DESIGN.md
198            Field::new("ext_id", arrow_schema::DataType::Utf8, true),
199            Field::new(
200                "_labels",
201                arrow_schema::DataType::List(Arc::new(Field::new(
202                    "item",
203                    arrow_schema::DataType::Utf8,
204                    true,
205                ))),
206                true,
207            ),
208            Field::new(
209                "_created_at",
210                arrow_schema::DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
211                true,
212            ),
213            Field::new(
214                "_updated_at",
215                arrow_schema::DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
216                true,
217            ),
218        ];
219
220        if let Some(label_props) = schema.properties.get(&self.label) {
221            let mut sorted_props: Vec<_> = label_props.iter().collect();
222            sorted_props.sort_by_key(|(name, _)| *name);
223
224            for (name, meta) in sorted_props {
225                fields.push(Field::new(name, meta.r#type.to_arrow(), meta.nullable));
226            }
227        }
228
229        // Add overflow_json column for non-schema properties (JSONB binary format)
230        fields.push(Field::new(
231            "overflow_json",
232            arrow_schema::DataType::LargeBinary,
233            true,
234        ));
235
236        Ok(Arc::new(ArrowSchema::new(fields)))
237    }
238
239    // ========================================================================
240    // LanceDB-based Methods
241    // ========================================================================
242
    /// Open a vertex table using LanceDB.
    ///
    /// This is the preferred method for new code as it enables DataFusion
    /// queries. Thin delegation to `LanceDbStore::open_vertex_table`.
    pub async fn open_lancedb(&self, store: &LanceDbStore) -> Result<Table> {
        store.open_vertex_table(&self.label).await
    }
249
    /// Open or create a vertex table using LanceDB.
    ///
    /// The table, if created, uses the Arrow schema derived from `schema`
    /// via `get_arrow_schema`.
    pub async fn open_or_create_lancedb(
        &self,
        store: &LanceDbStore,
        schema: &Schema,
    ) -> Result<Table> {
        let arrow_schema = self.get_arrow_schema(schema)?;
        store
            .open_or_create_vertex_table(&self.label, arrow_schema)
            .await
    }
261
262    /// Write a batch to a LanceDB vertex table.
263    ///
264    /// Creates the table if it doesn't exist, otherwise appends to it.
265    pub async fn write_batch_lancedb(
266        &self,
267        store: &LanceDbStore,
268        batch: RecordBatch,
269        _schema: &Schema,
270    ) -> Result<Table> {
271        let table_name = LanceDbStore::vertex_table_name(&self.label);
272
273        if store.table_exists(&table_name).await? {
274            let table = store.open_table(&table_name).await?;
275            store.append_to_table(&table, vec![batch]).await?;
276            Ok(table)
277        } else {
278            store.create_table(&table_name, vec![batch]).await
279        }
280    }
281
282    /// Ensure default scalar indexes exist on system columns (_vid, _uid, ext_id) using LanceDB.
283    ///
284    /// LanceDB uses BTree indexes for scalar columns.
285    pub async fn ensure_default_indexes_lancedb(&self, table: &Table) -> Result<()> {
286        let indices = table
287            .list_indices()
288            .await
289            .map_err(|e| anyhow!("Failed to list indices: {}", e))?;
290
291        let has_index = |col: &str| {
292            indices
293                .iter()
294                .any(|idx| idx.columns.contains(&col.to_string()))
295        };
296
297        for column in &["_vid", "_uid", "ext_id"] {
298            if has_index(column) {
299                continue;
300            }
301            log::info!(
302                "Creating {} BTree index for label '{}' via LanceDB",
303                column,
304                self.label
305            );
306            if let Err(e) = table
307                .create_index(&[column], LanceDbIndex::BTree(BTreeIndexBuilder::default()))
308                .execute()
309                .await
310            {
311                log::warn!(
312                    "Failed to create {} index for '{}' via LanceDB: {}",
313                    column,
314                    self.label,
315                    e
316                );
317            }
318        }
319
320        Ok(())
321    }
322
    /// Get the LanceDB table name for this vertex dataset.
    ///
    /// Delegates to `LanceDbStore::vertex_table_name` so the naming scheme
    /// stays in one place.
    pub fn lancedb_table_name(&self) -> String {
        LanceDbStore::vertex_table_name(&self.label)
    }
327
328    /// Replace a vertex table's contents using LanceDB atomically.
329    ///
330    /// This uses a staging table for crash-safe replacement. Used by compaction
331    /// to rewrite the table with merged data.
332    pub async fn replace_lancedb(
333        &self,
334        store: &LanceDbStore,
335        batch: RecordBatch,
336        schema: &Schema,
337    ) -> Result<Table> {
338        let table_name = self.lancedb_table_name();
339        let arrow_schema = self.get_arrow_schema(schema)?;
340        store
341            .replace_table_atomic(&table_name, vec![batch], arrow_schema)
342            .await
343    }
344}