// uni_store/storage/vertex.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use crate::lancedb::LanceDbStore;
5use crate::storage::arrow_convert::build_timestamp_column_from_vid_map;
6use crate::storage::property_builder::PropertyColumnBuilder;
7use anyhow::{Result, anyhow};
8use arrow_array::builder::{FixedSizeBinaryBuilder, ListBuilder, StringBuilder};
9use arrow_array::{ArrayRef, BooleanArray, RecordBatch, UInt64Array};
10use arrow_schema::{Field, Schema as ArrowSchema, TimeUnit};
11use lance::dataset::Dataset;
12use lancedb::Table;
13use lancedb::index::Index as LanceDbIndex;
14use lancedb::index::scalar::BTreeIndexBuilder;
15use sha3::{Digest, Sha3_256};
16use std::sync::Arc;
17use uni_common::Properties;
18use uni_common::core::id::{UniId, Vid};
19use uni_common::core::schema::Schema;
20
/// Lance-backed dataset handle for vertices of a single label.
///
/// A thin descriptor holding the dataset URI and label; the underlying
/// Lance/LanceDB resources are opened lazily by the accessor methods.
pub struct VertexDataset {
    // Dataset location, "{base_uri}/vertices/{label}" (built in `new`).
    uri: String,
    // The vertex label this dataset stores.
    label: String,
    // Numeric label id; currently unused (underscore keeps the field for future use).
    _label_id: u16,
}
26
27impl VertexDataset {
28    pub fn new(base_uri: &str, label: &str, label_id: u16) -> Self {
29        let uri = format!("{}/vertices/{}", base_uri, label);
30        Self {
31            uri,
32            label: label.to_string(),
33            _label_id: label_id,
34        }
35    }
36
    /// Compute a content-addressed `UniId` for a vertex.
    ///
    /// The id is a SHA3-256 digest over a canonical byte stream:
    /// `label \x00 [ext_id] \x00 (key "=" value \x00)*`, with properties
    /// sorted by key so the digest is deterministic regardless of map
    /// iteration order.
    ///
    /// # Stability
    /// The exact byte layout is load-bearing: persisted `_uid` values depend
    /// on it, so any change here silently orphans existing rows.
    ///
    /// NOTE(review): `ext_id == None` and `ext_id == Some("")` hash to the
    /// same digest, and `\x00`/`=` appearing inside keys or stringified
    /// values could alias distinct inputs — confirm these cases cannot occur.
    pub fn compute_vertex_uid(label: &str, ext_id: Option<&str>, properties: &Properties) -> UniId {
        let mut hasher = Sha3_256::new();

        // Include label
        hasher.update(label.as_bytes());
        hasher.update(b"\x00"); // separator

        // Include ext_id if present
        if let Some(eid) = ext_id {
            hasher.update(eid.as_bytes());
        }
        hasher.update(b"\x00");

        // Include sorted properties for determinism
        let mut sorted_props: Vec<_> = properties.iter().collect();
        sorted_props.sort_by_key(|(k, _)| *k);
        for (key, value) in sorted_props {
            hasher.update(key.as_bytes());
            hasher.update(b"=");
            // Values are hashed via their string form (`to_string`).
            hasher.update(value.to_string().as_bytes());
            hasher.update(b"\x00");
        }

        let hash: [u8; 32] = hasher.finalize().into();
        UniId::from_bytes(hash)
    }
65
66    pub async fn open(&self) -> Result<Arc<Dataset>> {
67        self.open_at(None).await
68    }
69
70    pub async fn open_at(&self, version: Option<u64>) -> Result<Arc<Dataset>> {
71        let mut ds = Dataset::open(&self.uri).await?;
72        if let Some(v) = version {
73            ds = ds.checkout_version(v).await?;
74        }
75        Ok(Arc::new(ds))
76    }
77
78    pub async fn open_raw(&self) -> Result<Dataset> {
79        let ds = Dataset::open(&self.uri).await?;
80        Ok(ds)
81    }
82
83    /// Build a record batch from vertices with optional timestamp metadata.
84    ///
85    /// If timestamps are not provided, they default to None (null).
86    pub fn build_record_batch(
87        &self,
88        vertices: &[(Vid, Vec<String>, Properties)],
89        deleted: &[bool],
90        versions: &[u64],
91        schema: &Schema,
92    ) -> Result<RecordBatch> {
93        self.build_record_batch_with_timestamps(vertices, deleted, versions, schema, None, None)
94    }
95
96    /// Build a record batch with explicit timestamp metadata.
97    ///
98    /// # Arguments
99    /// * `vertices` - Vertex ID, labels, and properties triples
100    /// * `deleted` - Deletion flags per vertex
101    /// * `versions` - Version numbers per vertex
102    /// * `schema` - Database schema
103    /// * `created_at` - Optional map of Vid -> nanoseconds since epoch
104    /// * `updated_at` - Optional map of Vid -> nanoseconds since epoch
105    pub fn build_record_batch_with_timestamps(
106        &self,
107        vertices: &[(Vid, Vec<String>, Properties)],
108        deleted: &[bool],
109        versions: &[u64],
110        schema: &Schema,
111        created_at: Option<&std::collections::HashMap<uni_common::core::id::Vid, i64>>,
112        updated_at: Option<&std::collections::HashMap<uni_common::core::id::Vid, i64>>,
113    ) -> Result<RecordBatch> {
114        let arrow_schema = self.get_arrow_schema(schema)?;
115        let mut columns: Vec<ArrayRef> = Vec::with_capacity(arrow_schema.fields().len());
116
117        let vids: Vec<u64> = vertices.iter().map(|(v, _, _)| v.as_u64()).collect();
118        columns.push(Arc::new(UInt64Array::from(vids)));
119
120        let mut uid_builder = FixedSizeBinaryBuilder::new(32);
121        for (_vid, _labels, props) in vertices.iter() {
122            let ext_id = props.get("ext_id").and_then(|v| v.as_str());
123            let uid = Self::compute_vertex_uid(&self.label, ext_id, props);
124            uid_builder.append_value(uid.as_bytes())?;
125        }
126        columns.push(Arc::new(uid_builder.finish()));
127
128        columns.push(Arc::new(BooleanArray::from(deleted.to_vec())));
129        columns.push(Arc::new(UInt64Array::from(versions.to_vec())));
130
131        // Build ext_id column (extracted from properties as dedicated column)
132        let mut ext_id_builder = StringBuilder::new();
133        for (_vid, _labels, props) in vertices.iter() {
134            if let Some(ext_id_val) = props.get("ext_id").and_then(|v| v.as_str()) {
135                ext_id_builder.append_value(ext_id_val);
136            } else {
137                ext_id_builder.append_null();
138            }
139        }
140        columns.push(Arc::new(ext_id_builder.finish()));
141
142        // Build _labels column (List<Utf8>)
143        let mut labels_builder = ListBuilder::new(StringBuilder::new());
144        for (_vid, labels, _props) in vertices.iter() {
145            let values = labels_builder.values();
146            for lbl in labels {
147                values.append_value(lbl);
148            }
149            labels_builder.append(true);
150        }
151        columns.push(Arc::new(labels_builder.finish()));
152
153        // Build _created_at and _updated_at columns using shared builder
154        let vids = vertices.iter().map(|(v, _, _)| *v);
155        columns.push(build_timestamp_column_from_vid_map(
156            vids.clone(),
157            created_at,
158        ));
159        columns.push(build_timestamp_column_from_vid_map(vids, updated_at));
160
161        // Build property columns using shared builder
162        let prop_columns = PropertyColumnBuilder::new(schema, &self.label, vertices.len())
163            .with_deleted(deleted)
164            .build(|i| &vertices[i].2)?;
165
166        columns.extend(prop_columns);
167
168        // Build overflow_json column for non-schema properties
169        let overflow_column = self.build_overflow_json_column(vertices, schema)?;
170        columns.push(overflow_column);
171
172        RecordBatch::try_new(arrow_schema, columns).map_err(|e| anyhow!(e))
173    }
174
175    /// Build the overflow_json column containing properties not in schema.
176    fn build_overflow_json_column(
177        &self,
178        vertices: &[(Vid, Vec<String>, Properties)],
179        schema: &Schema,
180    ) -> Result<ArrayRef> {
181        crate::storage::property_builder::build_overflow_json_column(
182            vertices.len(),
183            &self.label,
184            schema,
185            |i| &vertices[i].2,
186            &["ext_id"],
187        )
188    }
189
190    pub fn get_arrow_schema(&self, schema: &Schema) -> Result<Arc<ArrowSchema>> {
191        let mut fields = vec![
192            Field::new("_vid", arrow_schema::DataType::UInt64, false),
193            Field::new("_uid", arrow_schema::DataType::FixedSizeBinary(32), true),
194            Field::new("_deleted", arrow_schema::DataType::Boolean, false),
195            Field::new("_version", arrow_schema::DataType::UInt64, false),
196            // New metadata columns per STORAGE_DESIGN.md
197            Field::new("ext_id", arrow_schema::DataType::Utf8, true),
198            Field::new(
199                "_labels",
200                arrow_schema::DataType::List(Arc::new(Field::new(
201                    "item",
202                    arrow_schema::DataType::Utf8,
203                    true,
204                ))),
205                true,
206            ),
207            Field::new(
208                "_created_at",
209                arrow_schema::DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
210                true,
211            ),
212            Field::new(
213                "_updated_at",
214                arrow_schema::DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
215                true,
216            ),
217        ];
218
219        if let Some(label_props) = schema.properties.get(&self.label) {
220            let mut sorted_props: Vec<_> = label_props.iter().collect();
221            sorted_props.sort_by_key(|(name, _)| *name);
222
223            for (name, meta) in sorted_props {
224                fields.push(Field::new(name, meta.r#type.to_arrow(), meta.nullable));
225            }
226        }
227
228        // Add overflow_json column for non-schema properties (JSONB binary format)
229        fields.push(Field::new(
230            "overflow_json",
231            arrow_schema::DataType::LargeBinary,
232            true,
233        ));
234
235        Ok(Arc::new(ArrowSchema::new(fields)))
236    }
237
238    // ========================================================================
239    // LanceDB-based Methods
240    // ========================================================================
241
242    /// Open a vertex table using LanceDB.
243    ///
244    /// This is the preferred method for new code as it enables DataFusion queries.
245    pub async fn open_lancedb(&self, store: &LanceDbStore) -> Result<Table> {
246        store.open_vertex_table(&self.label).await
247    }
248
249    /// Open or create a vertex table using LanceDB.
250    pub async fn open_or_create_lancedb(
251        &self,
252        store: &LanceDbStore,
253        schema: &Schema,
254    ) -> Result<Table> {
255        let arrow_schema = self.get_arrow_schema(schema)?;
256        store
257            .open_or_create_vertex_table(&self.label, arrow_schema)
258            .await
259    }
260
261    /// Write a batch to a LanceDB vertex table.
262    ///
263    /// Creates the table if it doesn't exist, otherwise appends to it.
264    pub async fn write_batch_lancedb(
265        &self,
266        store: &LanceDbStore,
267        batch: RecordBatch,
268        _schema: &Schema,
269    ) -> Result<Table> {
270        let table_name = LanceDbStore::vertex_table_name(&self.label);
271
272        if store.table_exists(&table_name).await? {
273            let table = store.open_table(&table_name).await?;
274            store.append_to_table(&table, vec![batch]).await?;
275            Ok(table)
276        } else {
277            store.create_table(&table_name, vec![batch]).await
278        }
279    }
280
281    /// Ensure default scalar indexes exist on system columns (_vid, _uid, ext_id) using LanceDB.
282    ///
283    /// LanceDB uses BTree indexes for scalar columns.
284    pub async fn ensure_default_indexes_lancedb(&self, table: &Table) -> Result<()> {
285        let indices = table
286            .list_indices()
287            .await
288            .map_err(|e| anyhow!("Failed to list indices: {}", e))?;
289
290        let has_index = |col: &str| {
291            indices
292                .iter()
293                .any(|idx| idx.columns.contains(&col.to_string()))
294        };
295
296        for column in &["_vid", "_uid", "ext_id"] {
297            if has_index(column) {
298                continue;
299            }
300            log::info!(
301                "Creating {} BTree index for label '{}' via LanceDB",
302                column,
303                self.label
304            );
305            if let Err(e) = table
306                .create_index(&[column], LanceDbIndex::BTree(BTreeIndexBuilder::default()))
307                .execute()
308                .await
309            {
310                log::warn!(
311                    "Failed to create {} index for '{}' via LanceDB: {}",
312                    column,
313                    self.label,
314                    e
315                );
316            }
317        }
318
319        Ok(())
320    }
321
322    /// Get the LanceDB table name for this vertex dataset.
323    pub fn lancedb_table_name(&self) -> String {
324        LanceDbStore::vertex_table_name(&self.label)
325    }
326
327    /// Replace a vertex table's contents using LanceDB atomically.
328    ///
329    /// This uses a staging table for crash-safe replacement. Used by compaction
330    /// to rewrite the table with merged data.
331    pub async fn replace_lancedb(
332        &self,
333        store: &LanceDbStore,
334        batch: RecordBatch,
335        schema: &Schema,
336    ) -> Result<Table> {
337        let table_name = self.lancedb_table_name();
338        let arrow_schema = self.get_arrow_schema(schema)?;
339        store
340            .replace_table_atomic(&table_name, vec![batch], arrow_schema)
341            .await
342    }
343}