uni-store 2.2.2

Storage layer for Uni graph database - Lance datasets, LSM deltas, and WAL
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
// SPDX-License-Identifier: Apache-2.0
// Copyright 2024-2026 Dragonscale Team

use anyhow::{Result, anyhow};
use arrow_array::builder::{FixedSizeBinaryBuilder, StringBuilder};
use arrow_array::{Array, RecordBatch, RecordBatchIterator, StringArray, UInt64Array};
use arrow_schema::{DataType as ArrowDataType, Field, Schema as ArrowSchema};
use futures::TryStreamExt;
use lance::dataset::{Dataset, WriteMode, WriteParams};
use lance::index::DatasetIndexExt;
use std::collections::HashMap;
use std::sync::Arc;
use uni_common::core::id::{UniId, Vid};

/// Convert a UniId to hex string for filter pushdown.
fn uid_to_hex(uid: &UniId) -> String {
    uid.as_bytes()
        .iter()
        .map(|b| format!("{:02x}", b))
        .collect()
}

pub struct UidIndex {
    uri: String,
}

impl UidIndex {
    pub fn new(base_uri: &str, label: &str) -> Self {
        let uri = format!("{}/indexes/uni_id_to_vid/{}/index.lance", base_uri, label);
        Self { uri }
    }

    pub async fn open(&self) -> Result<Arc<Dataset>> {
        let ds = Dataset::open(&self.uri).await?;
        Ok(Arc::new(ds))
    }

    pub fn get_arrow_schema() -> Arc<ArrowSchema> {
        Arc::new(ArrowSchema::new(vec![
            Field::new("_uid", ArrowDataType::FixedSizeBinary(32), false),
            Field::new("_vid", ArrowDataType::UInt64, false),
            Field::new("_uid_hex", ArrowDataType::Utf8, false), // hex-encoded _uid for filtering
            // MVCC version of the mapping (review C3). A UID can have multiple
            // rows after delete+reinsert (a new vid at a higher version);
            // lookups take the highest `_version` so resolution is deterministic
            // instead of the old non-deterministic `limit(1)` / last-row-wins.
            Field::new("_version", ArrowDataType::UInt64, false),
        ]))
    }

    /// Append UID→vid mappings stamped with `version` (review C3).
    ///
    /// `version` should be the flush's MVCC version so that, across flushes, a
    /// re-created vertex (new vid, same content-derived UID) writes a row that
    /// deterministically outranks the stale mapping — [`Self::get_vid`] /
    /// [`Self::resolve_uids`] take the highest `_version`.
    ///
    /// Note on deletes: a vertex deleted without re-creation leaves its old
    /// mapping in the index (the flush delete path carries only `(vid, version)`,
    /// not the props/ext_id needed to recompute the UID, so it cannot write a
    /// UID-keyed tombstone). This is sound because the index is NOT the
    /// authoritative liveness gate — every consumer re-verifies the resolved vid
    /// against live storage (MERGE scans the vertex table with a tombstone-aware
    /// two-pass check; fork-promote re-validates via a Cypher liveness MATCH).
    pub async fn write_mapping_versioned(
        &self,
        mappings: &[(UniId, Vid)],
        version: u64,
    ) -> Result<()> {
        // Upgrade a pre-`_version` table in place so the append schema matches.
        self.migrate_schema_if_legacy().await?;

        let schema = Self::get_arrow_schema();

        let mut uid_builder = FixedSizeBinaryBuilder::new(32);
        let mut vids = Vec::with_capacity(mappings.len());
        let mut uid_hex_builder = StringBuilder::new();

        for (uid, vid) in mappings {
            uid_builder.append_value(uid.as_bytes()).unwrap();
            vids.push(vid.as_u64());
            uid_hex_builder.append_value(uid_to_hex(uid));
        }

        let uid_array = uid_builder.finish();
        let vid_array = UInt64Array::from(vids);
        let uid_hex_array = uid_hex_builder.finish();
        let version_array = UInt64Array::from(vec![version; mappings.len()]);

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(uid_array),
                Arc::new(vid_array),
                Arc::new(uid_hex_array),
                Arc::new(version_array),
            ],
        )?;

        let reader = RecordBatchIterator::new(std::iter::once(Ok(batch)), schema);

        let params = WriteParams {
            mode: WriteMode::Append,
            ..Default::default()
        };

        Dataset::write(Box::new(reader), &self.uri, Some(params)).await?;
        self.ensure_uid_hex_index().await?;
        Ok(())
    }

    /// Back-compat shim for ad-hoc single inserts that don't carry a flush
    /// version. Such inserts write one mapping per UID and so are not subject to
    /// the multi-row-per-UID determinism problem; version 0 is sufficient (and
    /// is outranked by any real flush write of the same UID).
    pub async fn write_mapping(&self, mappings: &[(UniId, Vid)]) -> Result<()> {
        self.write_mapping_versioned(mappings, 0).await
    }

    /// Upgrade a legacy (pre-`_version`) index table — the original
    /// `_uid`/`_vid`/`_uid_hex` schema — to include `_version`, defaulting
    /// existing rows to `0`. One-time and cheap (one small table per label).
    /// A no-op when the table is absent (a fresh write creates the new schema)
    /// or already migrated. The BTree `_uid_hex` index is recreated by the
    /// `ensure_uid_hex_index` call that follows the append in the write path.
    async fn migrate_schema_if_legacy(&self) -> Result<()> {
        let ds = match Dataset::open(&self.uri).await {
            Ok(ds) => ds,
            Err(_) => return Ok(()), // doesn't exist yet — created fresh with the new schema
        };
        if ds.schema().field("_version").is_some() {
            return Ok(()); // already migrated
        }

        // Read every legacy row and re-emit it under the new schema with
        // `_version = 0`, then overwrite the table.
        let mut stream = ds.scan().try_into_stream().await?;
        let new_schema = Self::get_arrow_schema();
        let mut migrated = Vec::new();
        while let Some(b) = stream.try_next().await? {
            let n = b.num_rows();
            let uid = b
                .column_by_name("_uid")
                .ok_or(anyhow!("legacy uid index missing _uid"))?
                .clone();
            let vid = b
                .column_by_name("_vid")
                .ok_or(anyhow!("legacy uid index missing _vid"))?
                .clone();
            let hex = b
                .column_by_name("_uid_hex")
                .ok_or(anyhow!("legacy uid index missing _uid_hex"))?
                .clone();
            let versions = Arc::new(UInt64Array::from(vec![0u64; n]));
            migrated.push(RecordBatch::try_new(
                new_schema.clone(),
                vec![uid, vid, hex, versions],
            )?);
        }

        let reader = RecordBatchIterator::new(migrated.into_iter().map(Ok), new_schema);
        let params = WriteParams {
            mode: WriteMode::Overwrite,
            ..Default::default()
        };
        Dataset::write(Box::new(reader), &self.uri, Some(params)).await?;
        Ok(())
    }

    /// Create a BTree scalar index on _uid_hex for O(log N) lookups.
    /// Non-fatal: if index creation fails, filter pushdown still works without the index.
    pub async fn ensure_uid_hex_index(&self) -> Result<()> {
        let mut ds = match Dataset::open(&self.uri).await {
            Ok(ds) => ds,
            Err(_) => return Ok(()), // Index doesn't exist yet, skip
        };

        // Create BTree index on _uid_hex column for faster lookups
        ds.create_index(
            &["_uid_hex"],
            lance_index::IndexType::Scalar,
            Some("idx_uid_hex".to_string()),
            &lance_index::scalar::ScalarIndexParams::default(),
            true, // replace if exists
        )
        .await
        .ok(); // Non-fatal: filter pushdown works without index

        Ok(())
    }

    pub async fn get_vid(&self, uid: &UniId) -> Result<Option<Vid>> {
        let ds = match self.open().await {
            Ok(ds) => ds,
            Err(_) => return Ok(None),
        };

        // Use filter pushdown on _uid_hex for O(log N) or better lookup
        let hex = uid_to_hex(uid);
        let filter = format!("_uid_hex = '{}'", hex);

        // Deterministic resolution (review C3): a UID may have multiple rows
        // after delete+reinsert. Take the highest `_version` instead of the old
        // non-deterministic `limit(1)`. `_version` is absent on a not-yet-
        // migrated legacy table, in which case every row is treated as version 0.
        let has_version = ds.schema().field("_version").is_some();
        let project: &[&str] = if has_version {
            &["_vid", "_version"]
        } else {
            &["_vid"]
        };
        let mut scanner = ds.scan();
        scanner.filter(&filter)?;
        scanner.project(project)?;
        let mut stream = scanner.try_into_stream().await?;

        let mut best_vid: Option<Vid> = None;
        let mut best_version: u64 = 0;

        while let Some(batch) = stream.try_next().await? {
            let vid_col = batch
                .column_by_name("_vid")
                .ok_or(anyhow!("Missing _vid column"))?
                .as_any()
                .downcast_ref::<UInt64Array>()
                .ok_or(anyhow!("Invalid _vid column type"))?;
            let ver_col = batch
                .column_by_name("_version")
                .and_then(|c| c.as_any().downcast_ref::<UInt64Array>());

            for i in 0..batch.num_rows() {
                let v = ver_col.map_or(0, |c| if c.is_null(i) { 0 } else { c.value(i) });
                if best_vid.is_none() || v >= best_version {
                    best_version = v;
                    best_vid = Some(Vid::from(vid_col.value(i)));
                }
            }
        }

        Ok(best_vid)
    }

    pub async fn resolve_uids(&self, uids: &[UniId]) -> Result<HashMap<UniId, Vid>> {
        if uids.is_empty() {
            return Ok(HashMap::new());
        }

        let ds = match self.open().await {
            Ok(ds) => ds,
            Err(_) => return Ok(HashMap::new()),
        };

        // Batch scan using IN filter on _uid_hex
        let hex_values: Vec<String> = uids.iter().map(uid_to_hex).collect();
        let filter = format!(
            "_uid_hex IN ({})",
            hex_values
                .iter()
                .map(|h| format!("'{}'", h))
                .collect::<Vec<_>>()
                .join(", ")
        );

        // Deterministic resolution (review C3): keep the highest-`_version` vid
        // per UID rather than the old last-row-wins. `_version` is absent on a
        // not-yet-migrated legacy table (every row treated as version 0).
        let has_version = ds.schema().field("_version").is_some();
        let project: &[&str] = if has_version {
            &["_uid_hex", "_vid", "_version"]
        } else {
            &["_uid_hex", "_vid"]
        };
        let mut scanner = ds.scan();
        scanner.filter(&filter)?;
        scanner.project(project)?;
        let mut stream = scanner.try_into_stream().await?;

        // Build reverse map: hex -> UniId for fast lookup
        let hex_to_uid: HashMap<String, UniId> =
            uids.iter().map(|uid| (uid_to_hex(uid), *uid)).collect();

        // Per-UID best (version, vid).
        let mut best: HashMap<UniId, (u64, Vid)> = HashMap::new();

        while let Some(batch) = stream.try_next().await? {
            let uid_hex_col = batch
                .column_by_name("_uid_hex")
                .ok_or(anyhow!("Missing _uid_hex column"))?
                .as_any()
                .downcast_ref::<StringArray>()
                .ok_or(anyhow!("Invalid _uid_hex column type"))?;

            let vid_col = batch
                .column_by_name("_vid")
                .ok_or(anyhow!("Missing _vid column"))?
                .as_any()
                .downcast_ref::<UInt64Array>()
                .ok_or(anyhow!("Invalid _vid column type"))?;
            let ver_col = batch
                .column_by_name("_version")
                .and_then(|c| c.as_any().downcast_ref::<UInt64Array>());

            for i in 0..batch.num_rows() {
                if !uid_hex_col.is_null(i) {
                    let hex = uid_hex_col.value(i);
                    if let Some(&uid) = hex_to_uid.get(hex) {
                        let v = ver_col.map_or(0, |c| if c.is_null(i) { 0 } else { c.value(i) });
                        let vid = Vid::from(vid_col.value(i));
                        match best.get(&uid) {
                            Some(&(bv, _)) if bv >= v => {}
                            _ => {
                                best.insert(uid, (v, vid));
                            }
                        }
                    }
                }
            }
        }

        let result = best.into_iter().map(|(uid, (_, vid))| (uid, vid)).collect();
        Ok(result)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    fn test_uid(counter: u8) -> UniId {
        let mut bytes = [0u8; 32];
        bytes[0] = counter;
        UniId::from_bytes(bytes)
    }

    /// Deterministic resolution after delete+reinsert: the highest-`_version`
    /// mapping wins regardless of write order (review C3). The old `limit(1)` /
    /// last-row-wins could return the stale vid.
    #[tokio::test]
    async fn test_get_vid_picks_highest_version() {
        let dir = TempDir::new().unwrap();
        let index = UidIndex::new(dir.path().to_str().unwrap(), "Person");

        // Original mapping (low version), then a re-create at a higher version.
        let uid = test_uid(1);
        index
            .write_mapping_versioned(&[(uid, Vid::new(100))], 5)
            .await
            .unwrap();
        index
            .write_mapping_versioned(&[(uid, Vid::new(200))], 9)
            .await
            .unwrap();
        assert_eq!(index.get_vid(&uid).await.unwrap(), Some(Vid::new(200)));

        // Write-order independence: higher version written first, lower second.
        let uid2 = test_uid(2);
        index
            .write_mapping_versioned(&[(uid2, Vid::new(11))], 20)
            .await
            .unwrap();
        index
            .write_mapping_versioned(&[(uid2, Vid::new(10))], 12)
            .await
            .unwrap();
        assert_eq!(index.get_vid(&uid2).await.unwrap(), Some(Vid::new(11)));

        // resolve_uids agrees with get_vid.
        let map = index.resolve_uids(&[uid, uid2]).await.unwrap();
        assert_eq!(map.get(&uid), Some(&Vid::new(200)));
        assert_eq!(map.get(&uid2), Some(&Vid::new(11)));
    }

    /// A pre-`_version` (3-column) index table is read correctly as-is and is
    /// migrated in place on the next write, after which the higher-version
    /// re-create wins (review C3).
    #[tokio::test]
    async fn test_legacy_table_migrates_on_write() {
        let dir = TempDir::new().unwrap();
        let index = UidIndex::new(dir.path().to_str().unwrap(), "Person");

        // Hand-build the OLD 3-column schema and write a legacy row.
        let legacy_schema = Arc::new(ArrowSchema::new(vec![
            Field::new("_uid", ArrowDataType::FixedSizeBinary(32), false),
            Field::new("_vid", ArrowDataType::UInt64, false),
            Field::new("_uid_hex", ArrowDataType::Utf8, false),
        ]));
        let uid = test_uid(7);
        let mut uid_b = FixedSizeBinaryBuilder::new(32);
        uid_b.append_value(uid.as_bytes()).unwrap();
        let mut hex_b = StringBuilder::new();
        hex_b.append_value(uid_to_hex(&uid));
        let legacy_batch = RecordBatch::try_new(
            legacy_schema.clone(),
            vec![
                Arc::new(uid_b.finish()),
                Arc::new(UInt64Array::from(vec![100u64])),
                Arc::new(hex_b.finish()),
            ],
        )
        .unwrap();
        let reader = RecordBatchIterator::new(std::iter::once(Ok(legacy_batch)), legacy_schema);
        Dataset::write(
            Box::new(reader),
            &index.uri,
            Some(WriteParams {
                mode: WriteMode::Append,
                ..Default::default()
            }),
        )
        .await
        .unwrap();

        // Reads against the legacy (no `_version`) table still work (treated as
        // version 0).
        assert_eq!(index.get_vid(&uid).await.unwrap(), Some(Vid::new(100)));

        // A re-create at a higher version triggers migration and then wins.
        index
            .write_mapping_versioned(&[(uid, Vid::new(200))], 9)
            .await
            .unwrap();
        let ds = Dataset::open(&index.uri).await.unwrap();
        assert!(
            ds.schema().field("_version").is_some(),
            "table should have been migrated to include _version"
        );
        assert_eq!(index.get_vid(&uid).await.unwrap(), Some(Vid::new(200)));
    }
}