// uni_store/storage/delta.rs
1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! LSM-style delta dataset for accumulating edge mutations before compaction.
5
6use crate::backend::StorageBackend;
7use crate::backend::table_names;
8use crate::backend::types::{ScalarIndexType, ScanRequest, WriteMode};
9use crate::storage::arrow_convert::build_timestamp_column;
10use crate::storage::property_builder::PropertyColumnBuilder;
11use crate::storage::value_codec::CrdtDecodeMode;
12use anyhow::{Result, anyhow};
13use arrow_array::types::TimestampNanosecondType;
14use arrow_array::{Array, ArrayRef, PrimitiveArray, RecordBatch, UInt8Array, UInt64Array};
15use arrow_schema::{Field, Schema as ArrowSchema, TimeUnit};
16#[cfg(feature = "lance-backend")]
17use futures::TryStreamExt;
18#[cfg(feature = "lance-backend")]
19use lance::dataset::Dataset;
20use std::collections::HashMap;
21use std::sync::Arc;
22use tracing::info;
23use uni_common::DataType;
24use uni_common::Properties;
25use uni_common::core::id::{Eid, Vid};
26use uni_common::core::schema::Schema;
27
28/// Default maximum number of rows allowed in in-memory compaction operations.
29/// Set to 5 million rows to prevent OOM. For larger datasets, use chunked compaction.
30pub const DEFAULT_MAX_COMPACTION_ROWS: usize = 5_000_000;
31
32/// Estimated memory footprint per L1Entry in bytes (conservative estimate).
33/// Each entry has: src_vid (8), dst_vid (8), eid (8), op (1), version (8),
34/// properties (variable, ~64 avg), timestamps (16), overhead (~32) = ~145 bytes.
35pub const ENTRY_SIZE_ESTIMATE: usize = 145;
36
37/// Check whether loading `row_count` rows into memory would exceed `max_rows`.
38///
39/// Returns an error with a human-readable message including the estimated memory
40/// footprint. Used by both Lance and LanceDB scan paths.
41pub fn check_oom_guard(
42    row_count: usize,
43    max_rows: usize,
44    entity_name: &str,
45    qualifier: &str,
46) -> Result<()> {
47    if row_count > max_rows {
48        let estimated_bytes = row_count * ENTRY_SIZE_ESTIMATE;
49        return Err(anyhow!(
50            "Table for {}_{} has {} rows (estimated {:.2} GB in memory), exceeding max_compaction_rows limit of {}. \
51            Use chunked compaction or increase the limit. See issue #143.",
52            entity_name,
53            qualifier,
54            row_count,
55            estimated_bytes as f64 / (1024.0 * 1024.0 * 1024.0),
56            max_rows
57        ));
58    }
59    Ok(())
60}
61
/// Operation type stored in the delta (L1) log.
///
/// The discriminants are fixed (`Insert = 0`, `Delete = 1`) because the value
/// is persisted verbatim in the `op` UInt8 Arrow column; do not reorder.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Op {
    /// Edge was inserted.
    Insert = 0,
    /// Edge was soft-deleted (a tombstone; property columns hold defaults).
    Delete = 1,
}
70
/// A single entry in the L1 (sorted run) delta dataset.
///
/// One entry corresponds to one row of the delta table (see
/// `DeltaDataset::get_arrow_schema` for the on-disk column layout).
#[derive(Clone, Debug)]
pub struct L1Entry {
    /// Source vertex of the edge (key column for "fwd" direction).
    pub src_vid: Vid,
    /// Destination vertex of the edge (key column for "bwd" direction).
    pub dst_vid: Vid,
    /// Unique edge identifier.
    pub eid: Eid,
    /// Insert or delete (tombstone) marker.
    pub op: Op,
    /// Monotonic version used for MVCC filtering and sort order within a key.
    pub version: u64,
    /// Edge properties; schema properties become columns, the rest overflow to JSON.
    pub properties: Properties,
    /// Timestamp when the edge was created (nanoseconds since epoch).
    pub created_at: Option<i64>,
    /// Timestamp when the edge was last updated (nanoseconds since epoch).
    pub updated_at: Option<i64>,
}
85
/// LSM-style delta dataset for a single edge type and direction.
///
/// Stores L1 sorted runs that accumulate edge mutations before compaction
/// merges them into the base CSR.
#[derive(Debug)]
pub struct DeltaDataset {
    // Lance dataset URI ({base}/deltas/{edge_type}_{direction}); only the
    // lance-backend code paths read it, hence the conditional dead_code allow.
    #[cfg_attr(not(feature = "lance-backend"), allow(dead_code))]
    uri: String,
    // Edge type this delta belongs to; keys into `Schema::properties`.
    edge_type: String,
    // Selects the key/filter column: "fwd" => src_vid, "bwd" => dst_vid.
    direction: String, // "fwd" or "bwd"
}
97
impl DeltaDataset {
    /// Create a new `DeltaDataset` rooted at `base_uri` for the given edge type and direction.
    ///
    /// The backing URI is `{base_uri}/deltas/{edge_type}_{direction}`;
    /// `direction` is expected to be `"fwd"` or `"bwd"` (see `filter_column`).
    pub fn new(base_uri: &str, edge_type: &str, direction: &str) -> Self {
        let uri = format!("{}/deltas/{}_{}", base_uri, edge_type, direction);
        Self {
            uri,
            edge_type: edge_type.to_string(),
            direction: direction.to_string(),
        }
    }

    /// Open the delta dataset at its latest version.
    #[cfg(feature = "lance-backend")]
    pub async fn open(&self) -> Result<Arc<Dataset>> {
        self.open_at(None).await
    }

    /// Open the delta dataset, optionally pinned to a specific Lance version.
    ///
    /// # Errors
    /// Fails if the dataset cannot be opened at `self.uri` or the requested
    /// version cannot be checked out.
    #[cfg(feature = "lance-backend")]
    pub async fn open_at(&self, version: Option<u64>) -> Result<Arc<Dataset>> {
        let mut ds = Dataset::open(&self.uri).await?;
        if let Some(v) = version {
            ds = ds.checkout_version(v).await?;
        }
        Ok(Arc::new(ds))
    }

    /// Build the Arrow schema for this delta table using the given graph schema.
    ///
    /// Column layout: fixed key/metadata columns first (`src_vid`, `dst_vid`,
    /// `eid`, `op`, `_version`, `_created_at`, `_updated_at`), then the edge
    /// type's schema properties sorted by name, then a trailing `overflow_json`
    /// column for non-schema properties.
    pub fn get_arrow_schema(&self, schema: &Schema) -> Result<Arc<ArrowSchema>> {
        let mut fields = vec![
            Field::new("src_vid", arrow_schema::DataType::UInt64, false),
            Field::new("dst_vid", arrow_schema::DataType::UInt64, false),
            Field::new("eid", arrow_schema::DataType::UInt64, false),
            Field::new("op", arrow_schema::DataType::UInt8, false), // 0=INSERT, 1=DELETE
            Field::new("_version", arrow_schema::DataType::UInt64, false),
            // New timestamp columns per STORAGE_DESIGN.md
            Field::new(
                "_created_at",
                arrow_schema::DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
                true,
            ),
            Field::new(
                "_updated_at",
                arrow_schema::DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
                true,
            ),
        ];

        if let Some(type_props) = schema.properties.get(&self.edge_type) {
            // Sort by property name so column order is deterministic
            // regardless of the map's iteration order.
            let mut sorted_props: Vec<_> = type_props.iter().collect();
            sorted_props.sort_by_key(|(name, _)| *name);

            for (name, meta) in sorted_props {
                fields.push(Field::new(name, meta.r#type.to_arrow(), meta.nullable));
            }
        }

        // Add overflow_json column for non-schema properties (JSONB binary format)
        fields.push(Field::new(
            "overflow_json",
            arrow_schema::DataType::LargeBinary,
            true,
        ));

        Ok(Arc::new(ArrowSchema::new(fields)))
    }

    /// Serialize `entries` into an Arrow `RecordBatch` using the given graph schema.
    ///
    /// Column order must match `get_arrow_schema`, which is also used to
    /// construct the batch schema here.
    pub fn build_record_batch(&self, entries: &[L1Entry], schema: &Schema) -> Result<RecordBatch> {
        let arrow_schema = self.get_arrow_schema(schema)?;

        let mut src_vids = Vec::with_capacity(entries.len());
        let mut dst_vids = Vec::with_capacity(entries.len());
        let mut eids = Vec::with_capacity(entries.len());
        let mut ops = Vec::with_capacity(entries.len());
        let mut versions = Vec::with_capacity(entries.len());

        for entry in entries {
            src_vids.push(entry.src_vid.as_u64());
            dst_vids.push(entry.dst_vid.as_u64());
            eids.push(entry.eid.as_u64());
            ops.push(entry.op as u8);
            versions.push(entry.version);
        }

        let mut columns: Vec<ArrayRef> = vec![
            Arc::new(UInt64Array::from(src_vids)),
            Arc::new(UInt64Array::from(dst_vids)),
            Arc::new(UInt64Array::from(eids)),
            Arc::new(UInt8Array::from(ops)),
            Arc::new(UInt64Array::from(versions)),
        ];

        // Build _created_at and _updated_at columns using shared builder
        columns.push(build_timestamp_column(entries.iter().map(|e| e.created_at)));
        columns.push(build_timestamp_column(entries.iter().map(|e| e.updated_at)));

        // Derive deleted flags from Op for property column building
        // Tombstones (Op::Delete) are logically deleted and should use default values
        let deleted_flags: Vec<bool> = entries.iter().map(|e| e.op == Op::Delete).collect();

        // Build property columns using shared builder
        let prop_columns = PropertyColumnBuilder::new(schema, &self.edge_type, entries.len())
            .with_deleted(&deleted_flags)
            .build(|i| &entries[i].properties)?;

        columns.extend(prop_columns);

        // Build overflow_json column for non-schema properties
        let overflow_column = self.build_overflow_json_column(entries, schema)?;
        columns.push(overflow_column);

        RecordBatch::try_new(arrow_schema, columns).map_err(|e| anyhow!(e))
    }

    /// Build the overflow_json column containing properties not in schema.
    ///
    /// Thin wrapper around the shared `property_builder` helper; the empty
    /// slice argument means no extra excluded property names.
    fn build_overflow_json_column(&self, entries: &[L1Entry], schema: &Schema) -> Result<ArrayRef> {
        crate::storage::property_builder::build_overflow_json_column(
            entries.len(),
            &self.edge_type,
            schema,
            |i| &entries[i].properties,
            &[],
        )
    }

    /// Scan and return all L1 entries, sorted by direction key and version.
    ///
    /// Uses `DEFAULT_MAX_COMPACTION_ROWS` as the OOM guard limit.
    #[cfg(feature = "lance-backend")]
    pub async fn scan_all(&self, schema: &Schema) -> Result<Vec<L1Entry>> {
        self.scan_all_with_limit(schema, DEFAULT_MAX_COMPACTION_ROWS)
            .await
    }

    /// Scan all entries with a configurable row limit to prevent OOM.
    ///
    /// # Errors
    /// Returns an error if the row count exceeds `max_rows` (see
    /// `check_oom_guard`) or if streaming/parsing a batch fails.
    #[cfg(feature = "lance-backend")]
    pub async fn scan_all_with_limit(
        &self,
        schema: &Schema,
        max_rows: usize,
    ) -> Result<Vec<L1Entry>> {
        // NOTE(review): ANY open failure (not just a missing dataset) is
        // treated as "no deltas" here — confirm that real I/O errors should
        // also yield an empty result rather than propagate.
        let ds = match self.open().await {
            Ok(ds) => ds,
            Err(_) => return Ok(vec![]),
        };

        let row_count = ds.count_rows(None).await?;
        check_oom_guard(row_count, max_rows, &self.edge_type, &self.direction)?;

        info!(
            edge_type = %self.edge_type,
            direction = %self.direction,
            row_count,
            estimated_bytes = row_count * ENTRY_SIZE_ESTIMATE,
            "Starting delta scan for compaction"
        );

        let mut stream = ds.scan().try_into_stream().await?;

        let mut entries = Vec::new();

        while let Some(batch) = stream.try_next().await? {
            let mut batch_entries = self.parse_batch(&batch, schema)?;
            entries.append(&mut batch_entries);
        }

        self.sort_entries(&mut entries);

        Ok(entries)
    }

    /// Sort entries by direction key (src_vid for fwd, dst_vid for bwd) then by version.
    ///
    /// The secondary version ordering means later mutations for the same key
    /// appear after earlier ones.
    fn sort_entries(&self, entries: &mut [L1Entry]) {
        let is_fwd = self.direction == "fwd";
        entries.sort_by(|a, b| {
            let key_a = if is_fwd { a.src_vid } else { a.dst_vid };
            let key_b = if is_fwd { b.src_vid } else { b.dst_vid };
            key_a.cmp(&key_b).then(a.version.cmp(&b.version))
        });
    }

    /// Decode one Arrow `RecordBatch` of the delta table into `L1Entry` values.
    ///
    /// Required columns (`src_vid`, `dst_vid`, `eid`, `op`, `_version`) produce
    /// an error when missing or mistyped; timestamp and property columns are
    /// optional for backward compatibility with older data.
    fn parse_batch(&self, batch: &RecordBatch, schema: &Schema) -> Result<Vec<L1Entry>> {
        let src_vids = batch
            .column_by_name("src_vid")
            .ok_or(anyhow!("Missing src_vid"))?
            .as_any()
            .downcast_ref::<UInt64Array>()
            .ok_or(anyhow!("Invalid src_vid type"))?;
        let dst_vids = batch
            .column_by_name("dst_vid")
            .ok_or(anyhow!("Missing dst_vid"))?
            .as_any()
            .downcast_ref::<UInt64Array>()
            .ok_or(anyhow!("Invalid dst_vid type"))?;
        let eids = batch
            .column_by_name("eid")
            .ok_or(anyhow!("Missing eid"))?
            .as_any()
            .downcast_ref::<UInt64Array>()
            .ok_or(anyhow!("Invalid eid type"))?;
        let ops = batch
            .column_by_name("op")
            .ok_or(anyhow!("Missing op"))?
            .as_any()
            .downcast_ref::<UInt8Array>()
            .ok_or(anyhow!("Invalid op type"))?;
        let versions = batch
            .column_by_name("_version")
            .ok_or(anyhow!("Missing _version"))?
            .as_any()
            .downcast_ref::<UInt64Array>()
            .ok_or(anyhow!("Invalid _version type"))?;

        // Try to read timestamp columns (may not exist in old data).
        // A mistyped column also yields None here (downcast_ref fails silently).
        let created_at_col = batch.column_by_name("_created_at").and_then(|c| {
            c.as_any()
                .downcast_ref::<PrimitiveArray<TimestampNanosecondType>>()
        });
        let updated_at_col = batch.column_by_name("_updated_at").and_then(|c| {
            c.as_any()
                .downcast_ref::<PrimitiveArray<TimestampNanosecondType>>()
        });

        // Prepare property columns: only schema properties that are actually
        // present in this batch are decoded.
        let mut prop_cols = Vec::new();
        if let Some(type_props) = schema.properties.get(&self.edge_type) {
            for (name, meta) in type_props {
                if let Some(col) = batch.column_by_name(name) {
                    prop_cols.push((name, meta.r#type.clone(), col));
                }
            }
        }

        let mut entries = Vec::with_capacity(batch.num_rows());

        for i in 0..batch.num_rows() {
            let op = match ops.value(i) {
                0 => Op::Insert,
                1 => Op::Delete,
                _ => continue, // Unknown op value: skip the row silently
            };

            let properties = self.extract_properties(&prop_cols, i)?;

            // Read timestamps if present (null cells map to None)
            let read_ts = |col: Option<&PrimitiveArray<TimestampNanosecondType>>| {
                col.and_then(|c| (!c.is_null(i)).then(|| c.value(i)))
            };
            let created_at = read_ts(created_at_col);
            let updated_at = read_ts(updated_at_col);

            entries.push(L1Entry {
                src_vid: Vid::from(src_vids.value(i)),
                dst_vid: Vid::from(dst_vids.value(i)),
                eid: Eid::from(eids.value(i)),
                op,
                version: versions.value(i),
                properties,
                created_at,
                updated_at,
            });
        }
        Ok(entries)
    }

    /// Extract properties from columns for a single row.
    ///
    /// Null cells are skipped (the property is simply absent from the result).
    fn extract_properties(
        &self,
        prop_cols: &[(&String, DataType, &ArrayRef)],
        row: usize,
    ) -> Result<Properties> {
        let mut properties = Properties::new();
        for (name, dtype, col) in prop_cols {
            if col.is_null(row) {
                continue;
            }
            let val = Self::value_from_column(col.as_ref(), dtype, row)?;
            properties.insert(name.to_string(), uni_common::Value::from(val));
        }
        Ok(properties)
    }

    /// Decode an Arrow column value to JSON with lenient CRDT error handling.
    fn value_from_column(
        col: &dyn arrow_array::Array,
        dtype: &uni_common::DataType,
        row: usize,
    ) -> Result<serde_json::Value> {
        crate::storage::value_codec::value_from_column(col, dtype, row, CrdtDecodeMode::Lenient)
    }

    /// Returns the filter column name based on direction ("src_vid" for fwd, "dst_vid" for bwd).
    fn filter_column(&self) -> &'static str {
        if self.direction == "fwd" {
            "src_vid"
        } else {
            "dst_vid"
        }
    }

    // ========================================================================
    // Backend-agnostic Methods
    // ========================================================================

    /// Open or create a delta table via the storage backend.
    pub async fn open_or_create(
        &self,
        backend: &dyn StorageBackend,
        schema: &Schema,
    ) -> Result<()> {
        let table_name = table_names::delta_table_name(&self.edge_type, &self.direction);
        let arrow_schema = self.get_arrow_schema(schema)?;
        backend
            .open_or_create_table(&table_name, arrow_schema)
            .await
    }

    /// Write a run to a delta table.
    ///
    /// Creates the table if it doesn't exist, otherwise appends to it.
    pub async fn write_run(&self, backend: &dyn StorageBackend, batch: RecordBatch) -> Result<()> {
        let table_name = table_names::delta_table_name(&self.edge_type, &self.direction);
        if backend.table_exists(&table_name).await? {
            backend
                .write(&table_name, vec![batch], WriteMode::Append)
                .await
        } else {
            backend.create_table(&table_name, vec![batch]).await
        }
    }

    /// Ensure a BTree index exists on the 'eid' column.
    ///
    /// Index creation failures are logged and swallowed: the index is a
    /// performance optimization, not a correctness requirement.
    pub async fn ensure_eid_index(&self, backend: &dyn StorageBackend) -> Result<()> {
        let table_name = table_names::delta_table_name(&self.edge_type, &self.direction);
        let indices = backend.list_indexes(&table_name).await?;

        if !indices
            .iter()
            .any(|idx| idx.columns.contains(&"eid".to_string()))
        {
            // NOTE(review): this method uses `log::` macros while the rest of
            // the file uses `tracing` — consider unifying on one facade.
            log::info!(
                "Creating eid BTree index for edge type '{}'",
                self.edge_type
            );
            if let Err(e) = backend
                .create_scalar_index(&table_name, "eid", ScalarIndexType::BTree)
                .await
            {
                log::warn!("Failed to create eid index for '{}': {}", self.edge_type, e);
            }
        }

        Ok(())
    }

    /// Get the table name for this delta dataset.
    pub fn table_name(&self) -> String {
        table_names::delta_table_name(&self.edge_type, &self.direction)
    }

    /// Scan all entries from the backend table.
    ///
    /// Returns an empty vector if the table doesn't exist.
    pub async fn scan_all_backend(
        &self,
        backend: &dyn StorageBackend,
        schema: &Schema,
    ) -> Result<Vec<L1Entry>> {
        self.scan_all_backend_with_limit(backend, schema, DEFAULT_MAX_COMPACTION_ROWS)
            .await
    }

    /// Scan all entries from the backend table with a configurable row limit to prevent OOM.
    ///
    /// Backend-agnostic counterpart of `scan_all_with_limit`: same OOM guard,
    /// same sort order (direction key, then version).
    pub async fn scan_all_backend_with_limit(
        &self,
        backend: &dyn StorageBackend,
        schema: &Schema,
        max_rows: usize,
    ) -> Result<Vec<L1Entry>> {
        let table_name = table_names::delta_table_name(&self.edge_type, &self.direction);

        if !backend.table_exists(&table_name).await? {
            return Ok(vec![]);
        }

        let row_count = backend.count_rows(&table_name, None).await?;
        check_oom_guard(row_count, max_rows, &self.edge_type, &self.direction)?;

        info!(
            edge_type = %self.edge_type,
            direction = %self.direction,
            row_count,
            estimated_bytes = row_count * ENTRY_SIZE_ESTIMATE,
            "Starting delta scan for compaction (backend)"
        );

        let batches = backend.scan(ScanRequest::all(&table_name)).await?;

        let mut entries = Vec::new();
        for batch in batches {
            let mut batch_entries = self.parse_batch(&batch, schema)?;
            entries.append(&mut batch_entries);
        }

        self.sort_entries(&mut entries);

        Ok(entries)
    }

    /// Replace the delta table with a new batch (atomic replacement).
    ///
    /// This is used during compaction to clear the delta table after merging into L2.
    pub async fn replace(&self, backend: &dyn StorageBackend, batch: RecordBatch) -> Result<()> {
        let table_name = self.table_name();
        let arrow_schema = batch.schema();
        backend
            .replace_table_atomic(&table_name, vec![batch], arrow_schema)
            .await
    }

    /// Read delta entries for a specific vertex ID.
    ///
    /// `version_hwm` (high-water mark), when set, restricts results to rows
    /// with `_version <= hwm` for snapshot reads.
    ///
    /// Returns an empty vector if the table doesn't exist or no entries match.
    pub async fn read_deltas(
        &self,
        backend: &dyn StorageBackend,
        vid: Vid,
        schema: &Schema,
        version_hwm: Option<u64>,
    ) -> Result<Vec<L1Entry>> {
        let table_name = table_names::delta_table_name(&self.edge_type, &self.direction);

        if !backend.table_exists(&table_name).await? {
            return Ok(vec![]);
        }

        let base_filter = format!("{} = {}", self.filter_column(), vid.as_u64());

        // Add version filtering if snapshot is active
        let final_filter = if let Some(hwm) = version_hwm {
            format!("({}) AND (_version <= {})", base_filter, hwm)
        } else {
            base_filter
        };

        let batches = backend
            .scan(ScanRequest::all(&table_name).with_filter(final_filter))
            .await?;

        let mut entries = Vec::new();
        for batch in batches {
            let mut batch_entries = self.parse_batch(&batch, schema)?;
            entries.append(&mut batch_entries);
        }

        Ok(entries)
    }

    /// Read delta entries for multiple vertex IDs in a single batch query.
    ///
    /// Returns a HashMap mapping each vid to its delta entries.
    /// VIDs with no delta entries will not be in the map.
    pub async fn read_deltas_batch(
        &self,
        backend: &dyn StorageBackend,
        vids: &[Vid],
        schema: &Schema,
        version_hwm: Option<u64>,
    ) -> Result<HashMap<Vid, Vec<L1Entry>>> {
        if vids.is_empty() {
            return Ok(HashMap::new());
        }

        let table_name = table_names::delta_table_name(&self.edge_type, &self.direction);

        if !backend.table_exists(&table_name).await? {
            return Ok(HashMap::new());
        }

        // Build IN filter for batch query
        let vid_list = vids
            .iter()
            .map(|v| v.as_u64().to_string())
            .collect::<Vec<_>>()
            .join(", ");
        let mut filter = format!("{} IN ({})", self.filter_column(), vid_list);

        // Add version filtering if snapshot is active
        if let Some(hwm) = version_hwm {
            filter = format!("({}) AND (_version <= {})", filter, hwm);
        }

        let batches = backend
            .scan(ScanRequest::all(&table_name).with_filter(filter))
            .await?;

        // Parse all batches and group by direction key VID
        let is_fwd = self.direction == "fwd";
        let mut result: HashMap<Vid, Vec<L1Entry>> = HashMap::new();
        for batch in batches {
            let entries = self.parse_batch(&batch, schema)?;
            for entry in entries {
                let vid = if is_fwd { entry.src_vid } else { entry.dst_vid };
                result.entry(vid).or_default().push(entry);
            }
        }

        Ok(result)
    }
}
607
#[cfg(test)]
mod tests {
    use super::*;

    /// Convert a byte count to gibibytes for the memory-budget assertions.
    fn bytes_to_gib(bytes: usize) -> f64 {
        bytes as f64 / (1024.0 * 1024.0 * 1024.0)
    }

    #[test]
    #[expect(
        clippy::assertions_on_constants,
        reason = "Validating configuration constants intentionally"
    )]
    fn test_constants_are_reasonable() {
        // The compaction row cap must stay at 5 million.
        assert_eq!(DEFAULT_MAX_COMPACTION_ROWS, 5_000_000);

        // The per-entry estimate should stay within a plausible 100-300 byte band.
        assert!(ENTRY_SIZE_ESTIMATE >= 100, "Entry size estimate too low");
        assert!(ENTRY_SIZE_ESTIMATE <= 300, "Entry size estimate too high");

        // A full 5M-entry compaction must stay under 1 GB with this estimate.
        let budget_gb = bytes_to_gib(DEFAULT_MAX_COMPACTION_ROWS * ENTRY_SIZE_ESTIMATE);
        assert!(
            budget_gb < 1.0,
            "5M entries should fit in under 1GB with current estimate"
        );
    }

    #[test]
    fn test_memory_estimate_formatting() {
        // Sanity-check the GB conversion used in OOM guard messages:
        // 10M rows at the current estimate lands between 1 and 2 GB.
        let rows = 10_000_000;
        let gib = bytes_to_gib(rows * ENTRY_SIZE_ESTIMATE);
        assert!(
            gib > 1.0 && gib < 2.0,
            "10M rows should be 1-2 GB"
        );
    }
}