// uni_store/storage/delta.rs
1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! LSM-style delta dataset for accumulating edge mutations before compaction.
5
6use crate::lancedb::LanceDbStore;
7use crate::storage::arrow_convert::build_timestamp_column;
8use crate::storage::property_builder::PropertyColumnBuilder;
9use crate::storage::value_codec::CrdtDecodeMode;
10use anyhow::{Result, anyhow};
11use arrow_array::types::TimestampNanosecondType;
12use arrow_array::{Array, ArrayRef, PrimitiveArray, RecordBatch, UInt8Array, UInt64Array};
13use arrow_schema::{Field, Schema as ArrowSchema, TimeUnit};
14use futures::TryStreamExt;
15use lance::dataset::Dataset;
16use lancedb::Table;
17use lancedb::index::Index as LanceDbIndex;
18use lancedb::index::scalar::BTreeIndexBuilder;
19use std::collections::HashMap;
20use std::sync::Arc;
21use tracing::info;
22use uni_common::DataType;
23use uni_common::Properties;
24use uni_common::core::id::{Eid, Vid};
25use uni_common::core::schema::Schema;
26
/// Default maximum number of rows allowed in in-memory compaction operations.
/// Set to 5 million rows to prevent OOM. For larger datasets, use chunked compaction.
/// Enforced by [`check_oom_guard`] on both the Lance and LanceDB scan paths.
pub const DEFAULT_MAX_COMPACTION_ROWS: usize = 5_000_000;

/// Estimated memory footprint per L1Entry in bytes (conservative estimate).
/// Each entry has: src_vid (8), dst_vid (8), eid (8), op (1), version (8),
/// properties (variable, ~64 avg), timestamps (16), overhead (~32) = ~145 bytes.
/// Used only for logging/error-message estimates, never for allocation sizing.
pub const ENTRY_SIZE_ESTIMATE: usize = 145;
35
36/// Check whether loading `row_count` rows into memory would exceed `max_rows`.
37///
38/// Returns an error with a human-readable message including the estimated memory
39/// footprint. Used by both Lance and LanceDB scan paths.
40pub fn check_oom_guard(
41    row_count: usize,
42    max_rows: usize,
43    entity_name: &str,
44    qualifier: &str,
45) -> Result<()> {
46    if row_count > max_rows {
47        let estimated_bytes = row_count * ENTRY_SIZE_ESTIMATE;
48        return Err(anyhow!(
49            "Table for {}_{} has {} rows (estimated {:.2} GB in memory), exceeding max_compaction_rows limit of {}. \
50            Use chunked compaction or increase the limit. See issue #143.",
51            entity_name,
52            qualifier,
53            row_count,
54            estimated_bytes as f64 / (1024.0 * 1024.0 * 1024.0),
55            max_rows
56        ));
57    }
58    Ok(())
59}
60
/// Operation type stored in the delta (L1) log.
///
/// The discriminant values (0/1) are written verbatim into the `op` column
/// by `build_record_batch` and decoded back in `parse_batch`, so they must
/// remain stable across releases.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Op {
    /// Edge was inserted.
    Insert = 0,
    /// Edge was soft-deleted (tombstone).
    Delete = 1,
}
69
/// A single entry in the L1 (sorted run) delta dataset.
///
/// One entry records a single edge mutation (insert or tombstone) together
/// with its version and optional audit timestamps.
#[derive(Clone, Debug)]
pub struct L1Entry {
    /// Source vertex id of the edge.
    pub src_vid: Vid,
    /// Destination vertex id of the edge.
    pub dst_vid: Vid,
    /// Edge id.
    pub eid: Eid,
    /// Whether this entry is an insert or a soft delete.
    pub op: Op,
    /// Version used for ordering and snapshot filtering (stored as `_version`).
    pub version: u64,
    /// Edge properties; tombstones are encoded with default column values.
    pub properties: Properties,
    /// Timestamp when the edge was created (nanoseconds since epoch).
    pub created_at: Option<i64>,
    /// Timestamp when the edge was last updated (nanoseconds since epoch).
    pub updated_at: Option<i64>,
}
84
/// LSM-style delta dataset for a single edge type and direction.
///
/// Stores L1 sorted runs that accumulate edge mutations before compaction
/// merges them into the base CSR.
#[derive(Debug)]
pub struct DeltaDataset {
    // Lance dataset URI: "{base_uri}/deltas/{edge_type}_{direction}".
    uri: String,
    // Edge type name; also the key into the graph schema's property map.
    edge_type: String,
    direction: String, // "fwd" or "bwd"
}
95
96impl DeltaDataset {
97    /// Create a new `DeltaDataset` rooted at `base_uri` for the given edge type and direction.
98    pub fn new(base_uri: &str, edge_type: &str, direction: &str) -> Self {
99        let uri = format!("{}/deltas/{}_{}", base_uri, edge_type, direction);
100        Self {
101            uri,
102            edge_type: edge_type.to_string(),
103            direction: direction.to_string(),
104        }
105    }
106
107    /// Open the delta dataset at its latest version.
108    pub async fn open(&self) -> Result<Arc<Dataset>> {
109        self.open_at(None).await
110    }
111
112    /// Open the delta dataset, optionally pinned to a specific Lance version.
113    pub async fn open_at(&self, version: Option<u64>) -> Result<Arc<Dataset>> {
114        let mut ds = Dataset::open(&self.uri).await?;
115        if let Some(v) = version {
116            ds = ds.checkout_version(v).await?;
117        }
118        Ok(Arc::new(ds))
119    }
120
121    /// Build the Arrow schema for this delta table using the given graph schema.
122    pub fn get_arrow_schema(&self, schema: &Schema) -> Result<Arc<ArrowSchema>> {
123        let mut fields = vec![
124            Field::new("src_vid", arrow_schema::DataType::UInt64, false),
125            Field::new("dst_vid", arrow_schema::DataType::UInt64, false),
126            Field::new("eid", arrow_schema::DataType::UInt64, false),
127            Field::new("op", arrow_schema::DataType::UInt8, false), // 0=INSERT, 1=DELETE
128            Field::new("_version", arrow_schema::DataType::UInt64, false),
129            // New timestamp columns per STORAGE_DESIGN.md
130            Field::new(
131                "_created_at",
132                arrow_schema::DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
133                true,
134            ),
135            Field::new(
136                "_updated_at",
137                arrow_schema::DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
138                true,
139            ),
140        ];
141
142        if let Some(type_props) = schema.properties.get(&self.edge_type) {
143            let mut sorted_props: Vec<_> = type_props.iter().collect();
144            sorted_props.sort_by_key(|(name, _)| *name);
145
146            for (name, meta) in sorted_props {
147                fields.push(Field::new(name, meta.r#type.to_arrow(), meta.nullable));
148            }
149        }
150
151        // Add overflow_json column for non-schema properties (JSONB binary format)
152        fields.push(Field::new(
153            "overflow_json",
154            arrow_schema::DataType::LargeBinary,
155            true,
156        ));
157
158        Ok(Arc::new(ArrowSchema::new(fields)))
159    }
160
161    /// Serialize `entries` into an Arrow `RecordBatch` using the given graph schema.
162    pub fn build_record_batch(&self, entries: &[L1Entry], schema: &Schema) -> Result<RecordBatch> {
163        let arrow_schema = self.get_arrow_schema(schema)?;
164
165        let mut src_vids = Vec::with_capacity(entries.len());
166        let mut dst_vids = Vec::with_capacity(entries.len());
167        let mut eids = Vec::with_capacity(entries.len());
168        let mut ops = Vec::with_capacity(entries.len());
169        let mut versions = Vec::with_capacity(entries.len());
170
171        for entry in entries {
172            src_vids.push(entry.src_vid.as_u64());
173            dst_vids.push(entry.dst_vid.as_u64());
174            eids.push(entry.eid.as_u64());
175            ops.push(entry.op as u8);
176            versions.push(entry.version);
177        }
178
179        let mut columns: Vec<ArrayRef> = vec![
180            Arc::new(UInt64Array::from(src_vids)),
181            Arc::new(UInt64Array::from(dst_vids)),
182            Arc::new(UInt64Array::from(eids)),
183            Arc::new(UInt8Array::from(ops)),
184            Arc::new(UInt64Array::from(versions)),
185        ];
186
187        // Build _created_at and _updated_at columns using shared builder
188        columns.push(build_timestamp_column(entries.iter().map(|e| e.created_at)));
189        columns.push(build_timestamp_column(entries.iter().map(|e| e.updated_at)));
190
191        // Derive deleted flags from Op for property column building
192        // Tombstones (Op::Delete) are logically deleted and should use default values
193        let deleted_flags: Vec<bool> = entries.iter().map(|e| e.op == Op::Delete).collect();
194
195        // Build property columns using shared builder
196        let prop_columns = PropertyColumnBuilder::new(schema, &self.edge_type, entries.len())
197            .with_deleted(&deleted_flags)
198            .build(|i| &entries[i].properties)?;
199
200        columns.extend(prop_columns);
201
202        // Build overflow_json column for non-schema properties
203        let overflow_column = self.build_overflow_json_column(entries, schema)?;
204        columns.push(overflow_column);
205
206        RecordBatch::try_new(arrow_schema, columns).map_err(|e| anyhow!(e))
207    }
208
    /// Build the overflow_json column containing properties not in schema.
    ///
    /// Delegates to the shared helper in `property_builder`.
    /// NOTE(review): the semantics of the trailing `&[]` argument are not
    /// visible here — presumably an extra exclusion list; confirm against
    /// `property_builder::build_overflow_json_column`.
    fn build_overflow_json_column(&self, entries: &[L1Entry], schema: &Schema) -> Result<ArrayRef> {
        crate::storage::property_builder::build_overflow_json_column(
            entries.len(),
            &self.edge_type,
            schema,
            |i| &entries[i].properties,
            &[],
        )
    }
219
220    /// Scan and return all L1 entries, sorted by direction key and version.
221    pub async fn scan_all(&self, schema: &Schema) -> Result<Vec<L1Entry>> {
222        self.scan_all_with_limit(schema, DEFAULT_MAX_COMPACTION_ROWS)
223            .await
224    }
225
226    /// Scan all entries with a configurable row limit to prevent OOM.
227    pub async fn scan_all_with_limit(
228        &self,
229        schema: &Schema,
230        max_rows: usize,
231    ) -> Result<Vec<L1Entry>> {
232        let ds = match self.open().await {
233            Ok(ds) => ds,
234            Err(_) => return Ok(vec![]),
235        };
236
237        let row_count = ds.count_rows(None).await?;
238        check_oom_guard(row_count, max_rows, &self.edge_type, &self.direction)?;
239
240        info!(
241            edge_type = %self.edge_type,
242            direction = %self.direction,
243            row_count,
244            estimated_bytes = row_count * ENTRY_SIZE_ESTIMATE,
245            "Starting delta scan for compaction"
246        );
247
248        let mut stream = ds.scan().try_into_stream().await?;
249
250        let mut entries = Vec::new();
251
252        while let Some(batch) = stream.try_next().await? {
253            let mut batch_entries = self.parse_batch(&batch, schema)?;
254            entries.append(&mut batch_entries);
255        }
256
257        self.sort_entries(&mut entries);
258
259        Ok(entries)
260    }
261
262    /// Sort entries by direction key (src_vid for fwd, dst_vid for bwd) then by version.
263    fn sort_entries(&self, entries: &mut [L1Entry]) {
264        let is_fwd = self.direction == "fwd";
265        entries.sort_by(|a, b| {
266            let key_a = if is_fwd { a.src_vid } else { a.dst_vid };
267            let key_b = if is_fwd { b.src_vid } else { b.dst_vid };
268            key_a.cmp(&key_b).then(a.version.cmp(&b.version))
269        });
270    }
271
272    fn parse_batch(&self, batch: &RecordBatch, schema: &Schema) -> Result<Vec<L1Entry>> {
273        let src_vids = batch
274            .column_by_name("src_vid")
275            .ok_or(anyhow!("Missing src_vid"))?
276            .as_any()
277            .downcast_ref::<UInt64Array>()
278            .ok_or(anyhow!("Invalid src_vid type"))?;
279        let dst_vids = batch
280            .column_by_name("dst_vid")
281            .ok_or(anyhow!("Missing dst_vid"))?
282            .as_any()
283            .downcast_ref::<UInt64Array>()
284            .ok_or(anyhow!("Invalid dst_vid type"))?;
285        let eids = batch
286            .column_by_name("eid")
287            .ok_or(anyhow!("Missing eid"))?
288            .as_any()
289            .downcast_ref::<UInt64Array>()
290            .ok_or(anyhow!("Invalid eid type"))?;
291        let ops = batch
292            .column_by_name("op")
293            .ok_or(anyhow!("Missing op"))?
294            .as_any()
295            .downcast_ref::<UInt8Array>()
296            .ok_or(anyhow!("Invalid op type"))?;
297        let versions = batch
298            .column_by_name("_version")
299            .ok_or(anyhow!("Missing _version"))?
300            .as_any()
301            .downcast_ref::<UInt64Array>()
302            .ok_or(anyhow!("Invalid _version type"))?;
303
304        // Try to read timestamp columns (may not exist in old data)
305        let created_at_col = batch.column_by_name("_created_at").and_then(|c| {
306            c.as_any()
307                .downcast_ref::<PrimitiveArray<TimestampNanosecondType>>()
308        });
309        let updated_at_col = batch.column_by_name("_updated_at").and_then(|c| {
310            c.as_any()
311                .downcast_ref::<PrimitiveArray<TimestampNanosecondType>>()
312        });
313
314        // Prepare property columns
315        let mut prop_cols = Vec::new();
316        if let Some(type_props) = schema.properties.get(&self.edge_type) {
317            for (name, meta) in type_props {
318                if let Some(col) = batch.column_by_name(name) {
319                    prop_cols.push((name, meta.r#type.clone(), col));
320                }
321            }
322        }
323
324        let mut entries = Vec::with_capacity(batch.num_rows());
325
326        for i in 0..batch.num_rows() {
327            let op = match ops.value(i) {
328                0 => Op::Insert,
329                1 => Op::Delete,
330                _ => continue, // Unknown op
331            };
332
333            let properties = self.extract_properties(&prop_cols, i)?;
334
335            // Read timestamps if present
336            let read_ts = |col: Option<&PrimitiveArray<TimestampNanosecondType>>| {
337                col.and_then(|c| (!c.is_null(i)).then(|| c.value(i)))
338            };
339            let created_at = read_ts(created_at_col);
340            let updated_at = read_ts(updated_at_col);
341
342            entries.push(L1Entry {
343                src_vid: Vid::from(src_vids.value(i)),
344                dst_vid: Vid::from(dst_vids.value(i)),
345                eid: Eid::from(eids.value(i)),
346                op,
347                version: versions.value(i),
348                properties,
349                created_at,
350                updated_at,
351            });
352        }
353        Ok(entries)
354    }
355
356    /// Extract properties from columns for a single row.
357    fn extract_properties(
358        &self,
359        prop_cols: &[(&String, DataType, &ArrayRef)],
360        row: usize,
361    ) -> Result<Properties> {
362        let mut properties = Properties::new();
363        for (name, dtype, col) in prop_cols {
364            if col.is_null(row) {
365                continue;
366            }
367            let val = Self::value_from_column(col.as_ref(), dtype, row)?;
368            properties.insert(name.to_string(), uni_common::Value::from(val));
369        }
370        Ok(properties)
371    }
372
373    /// Decode an Arrow column value to JSON with lenient CRDT error handling.
374    fn value_from_column(
375        col: &dyn arrow_array::Array,
376        dtype: &uni_common::DataType,
377        row: usize,
378    ) -> Result<serde_json::Value> {
379        crate::storage::value_codec::value_from_column(col, dtype, row, CrdtDecodeMode::Lenient)
380    }
381
382    /// Returns the filter column name based on direction ("src_vid" for fwd, "dst_vid" for bwd).
383    fn filter_column(&self) -> &'static str {
384        if self.direction == "fwd" {
385            "src_vid"
386        } else {
387            "dst_vid"
388        }
389    }
390
    // ========================================================================
    // LanceDB-based Methods
    // ========================================================================

    /// Open a delta table using LanceDB.
    ///
    /// Errors from this call are treated as "table missing" by the scan and
    /// read helpers in this module, which return empty results instead.
    pub async fn open_lancedb(&self, store: &LanceDbStore) -> Result<Table> {
        store
            .open_delta_table(&self.edge_type, &self.direction)
            .await
    }

    /// Open or create a delta table using LanceDB.
    ///
    /// A newly created table gets the Arrow schema derived from the graph
    /// `schema` via `get_arrow_schema`.
    pub async fn open_or_create_lancedb(
        &self,
        store: &LanceDbStore,
        schema: &Schema,
    ) -> Result<Table> {
        let arrow_schema = self.get_arrow_schema(schema)?;
        store
            .open_or_create_delta_table(&self.edge_type, &self.direction, arrow_schema)
            .await
    }
413
    /// Write a run to a LanceDB delta table.
    ///
    /// Creates the table if it doesn't exist, otherwise appends to it.
    ///
    /// NOTE(review): the exists-check and the create/append are separate
    /// calls, so two concurrent writers could race between them — confirm
    /// that callers serialize writes to a given delta table.
    pub async fn write_run_lancedb(
        &self,
        store: &LanceDbStore,
        batch: RecordBatch,
    ) -> Result<Table> {
        let table_name = LanceDbStore::delta_table_name(&self.edge_type, &self.direction);

        if store.table_exists(&table_name).await? {
            // Existing table: append the new run.
            let table = store.open_table(&table_name).await?;
            store.append_to_table(&table, vec![batch]).await?;
            Ok(table)
        } else {
            // First run for this edge type/direction: create the table.
            store.create_table(&table_name, vec![batch]).await
        }
    }
432
433    /// Ensure a BTree index exists on the 'eid' column using LanceDB.
434    pub async fn ensure_eid_index_lancedb(&self, table: &Table) -> Result<()> {
435        let indices = table
436            .list_indices()
437            .await
438            .map_err(|e| anyhow!("Failed to list indices: {}", e))?;
439
440        if !indices
441            .iter()
442            .any(|idx| idx.columns.contains(&"eid".to_string()))
443        {
444            log::info!(
445                "Creating eid BTree index for edge type '{}' via LanceDB",
446                self.edge_type
447            );
448            if let Err(e) = table
449                .create_index(&["eid"], LanceDbIndex::BTree(BTreeIndexBuilder::default()))
450                .execute()
451                .await
452            {
453                log::warn!(
454                    "Failed to create eid index for '{}' via LanceDB: {}",
455                    self.edge_type,
456                    e
457                );
458            }
459        }
460
461        Ok(())
462    }
463
    /// Get the LanceDB table name for this delta dataset.
    ///
    /// Derived from the edge type and direction, matching the name used by
    /// `write_run_lancedb` and `replace_lancedb`.
    pub fn lancedb_table_name(&self) -> String {
        LanceDbStore::delta_table_name(&self.edge_type, &self.direction)
    }

    /// Scan all entries from LanceDB table.
    ///
    /// Returns an empty vector if the table doesn't exist.
    /// Uses `DEFAULT_MAX_COMPACTION_ROWS` as the OOM-guard limit.
    pub async fn scan_all_lancedb(
        &self,
        store: &LanceDbStore,
        schema: &Schema,
    ) -> Result<Vec<L1Entry>> {
        self.scan_all_lancedb_with_limit(store, schema, DEFAULT_MAX_COMPACTION_ROWS)
            .await
    }
480
481    /// Scan all entries from LanceDB table with a configurable row limit to prevent OOM.
482    pub async fn scan_all_lancedb_with_limit(
483        &self,
484        store: &LanceDbStore,
485        schema: &Schema,
486        max_rows: usize,
487    ) -> Result<Vec<L1Entry>> {
488        let table = match self.open_lancedb(store).await {
489            Ok(t) => t,
490            Err(_) => return Ok(vec![]),
491        };
492
493        let row_count = table.count_rows(None).await?;
494        check_oom_guard(row_count, max_rows, &self.edge_type, &self.direction)?;
495
496        info!(
497            edge_type = %self.edge_type,
498            direction = %self.direction,
499            row_count,
500            estimated_bytes = row_count * ENTRY_SIZE_ESTIMATE,
501            "Starting delta scan for compaction (LanceDB)"
502        );
503
504        use lancedb::query::ExecutableQuery;
505        let stream = table.query().execute().await?;
506        let batches: Vec<arrow_array::RecordBatch> = stream.try_collect().await?;
507
508        let mut entries = Vec::new();
509        for batch in batches {
510            let mut batch_entries = self.parse_batch(&batch, schema)?;
511            entries.append(&mut batch_entries);
512        }
513
514        self.sort_entries(&mut entries);
515
516        Ok(entries)
517    }
518
    /// Replace the delta table with a new batch (atomic replacement).
    ///
    /// This is used during compaction to clear the delta table after merging into L2.
    /// The replacement batch's own schema is reused for the recreated table.
    pub async fn replace_lancedb(&self, store: &LanceDbStore, batch: RecordBatch) -> Result<Table> {
        let table_name = self.lancedb_table_name();
        let arrow_schema = batch.schema();
        store
            .replace_table_atomic(&table_name, vec![batch], arrow_schema)
            .await
    }
529
530    /// Read delta entries for a specific vertex ID from LanceDB.
531    ///
532    /// Returns an empty vector if the table doesn't exist or no entries match.
533    pub async fn read_deltas_lancedb(
534        &self,
535        store: &LanceDbStore,
536        vid: Vid,
537        schema: &Schema,
538        version_hwm: Option<u64>,
539    ) -> Result<Vec<L1Entry>> {
540        let table = match self.open_lancedb(store).await {
541            Ok(t) => t,
542            Err(_) => return Ok(vec![]),
543        };
544
545        use lancedb::query::{ExecutableQuery, QueryBase};
546
547        let base_filter = format!("{} = {}", self.filter_column(), vid.as_u64());
548
549        // Add version filtering if snapshot is active
550        let final_filter = if let Some(hwm) = version_hwm {
551            format!("({}) AND (_version <= {})", base_filter, hwm)
552        } else {
553            base_filter
554        };
555
556        let query = table.query().only_if(final_filter);
557        let stream = query.execute().await?;
558        let batches: Vec<arrow_array::RecordBatch> = stream.try_collect().await?;
559
560        let mut entries = Vec::new();
561        for batch in batches {
562            let mut batch_entries = self.parse_batch(&batch, schema)?;
563            entries.append(&mut batch_entries);
564        }
565
566        Ok(entries)
567    }
568
569    /// Read delta entries for multiple vertex IDs in a single batch query from LanceDB.
570    ///
571    /// Returns a HashMap mapping each vid to its delta entries.
572    /// VIDs with no delta entries will not be in the map.
573    pub async fn read_deltas_lancedb_batch(
574        &self,
575        store: &LanceDbStore,
576        vids: &[Vid],
577        schema: &Schema,
578        version_hwm: Option<u64>,
579    ) -> Result<HashMap<Vid, Vec<L1Entry>>> {
580        if vids.is_empty() {
581            return Ok(HashMap::new());
582        }
583
584        let table = match self.open_lancedb(store).await {
585            Ok(t) => t,
586            Err(_) => return Ok(HashMap::new()),
587        };
588
589        use lancedb::query::{ExecutableQuery, QueryBase};
590
591        // Build IN filter for batch query
592        let vid_list = vids
593            .iter()
594            .map(|v| v.as_u64().to_string())
595            .collect::<Vec<_>>()
596            .join(", ");
597        let mut filter = format!("{} IN ({})", self.filter_column(), vid_list);
598
599        // Add version filtering if snapshot is active
600        if let Some(hwm) = version_hwm {
601            filter = format!("({}) AND (_version <= {})", filter, hwm);
602        }
603
604        let query = table.query().only_if(filter);
605        let stream = query.execute().await?;
606        let batches: Vec<arrow_array::RecordBatch> = stream.try_collect().await?;
607
608        // Parse all batches and group by direction key VID
609        let is_fwd = self.direction == "fwd";
610        let mut result: HashMap<Vid, Vec<L1Entry>> = HashMap::new();
611        for batch in batches {
612            let entries = self.parse_batch(&batch, schema)?;
613            for entry in entries {
614                let vid = if is_fwd { entry.src_vid } else { entry.dst_vid };
615                result.entry(vid).or_default().push(entry);
616            }
617        }
618
619        Ok(result)
620    }
621}
622
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    #[expect(
        clippy::assertions_on_constants,
        reason = "Validating configuration constants intentionally"
    )]
    fn test_constants_are_reasonable() {
        // Verify DEFAULT_MAX_COMPACTION_ROWS is set to 5 million
        assert_eq!(DEFAULT_MAX_COMPACTION_ROWS, 5_000_000);

        // Verify ENTRY_SIZE_ESTIMATE is reasonable (should be between 100-300 bytes)
        assert!(ENTRY_SIZE_ESTIMATE >= 100, "Entry size estimate too low");
        assert!(ENTRY_SIZE_ESTIMATE <= 300, "Entry size estimate too high");

        // Verify that 5M entries at the estimated size fits in reasonable memory
        // (5M * 145 bytes is roughly 0.68 GiB, under the 1 GiB bound below).
        let estimated_gb =
            (DEFAULT_MAX_COMPACTION_ROWS * ENTRY_SIZE_ESTIMATE) as f64 / (1024.0 * 1024.0 * 1024.0);
        assert!(
            estimated_gb < 1.0,
            "5M entries should fit in under 1GB with current estimate"
        );
    }

    #[test]
    fn test_memory_estimate_formatting() {
        // Test that our GB formatting works correctly
        // (mirrors the computation used in `check_oom_guard`'s error message).
        let row_count = 10_000_000;
        let estimated_bytes = row_count * ENTRY_SIZE_ESTIMATE;
        let estimated_gb = estimated_bytes as f64 / (1024.0 * 1024.0 * 1024.0);

        // Should be around 1.35 GB for 10M rows (10M * 145 bytes ≈ 1.35 GiB)
        assert!(
            estimated_gb > 1.0 && estimated_gb < 2.0,
            "10M rows should be 1-2 GB"
        );
    }
}