Skip to main content

uni_store/storage/
delta.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use crate::lancedb::LanceDbStore;
5use crate::storage::arrow_convert::build_timestamp_column;
6use crate::storage::property_builder::PropertyColumnBuilder;
7use crate::storage::value_codec::CrdtDecodeMode;
8use anyhow::{Result, anyhow};
9use arrow_array::types::TimestampNanosecondType;
10use arrow_array::{Array, ArrayRef, PrimitiveArray, RecordBatch, UInt8Array, UInt64Array};
11use arrow_schema::{Field, Schema as ArrowSchema, TimeUnit};
12use futures::TryStreamExt;
13use lance::dataset::Dataset;
14use lancedb::Table;
15use lancedb::index::Index as LanceDbIndex;
16use lancedb::index::scalar::BTreeIndexBuilder;
17use std::collections::HashMap;
18use std::sync::Arc;
19use tracing::info;
20use uni_common::DataType;
21use uni_common::Properties;
22use uni_common::core::id::{Eid, Vid};
23use uni_common::core::schema::Schema;
24
/// Row ceiling applied by default to in-memory compaction scans.
///
/// Five million rows; scans that find more rows than this are rejected by
/// [`check_oom_guard`] instead of being loaded, to avoid running out of memory.
/// Larger datasets need chunked compaction or an explicitly raised limit.
pub const DEFAULT_MAX_COMPACTION_ROWS: usize = 5_000_000;

/// Conservative per-entry memory estimate, in bytes, for one [`L1Entry`].
///
/// Breakdown: `src_vid` (8) + `dst_vid` (8) + `eid` (8) + `op` (1) + `version` (8)
/// + properties (variable, ~64 on average) + timestamps (16) + overhead (~32)
/// = ~145 bytes.
pub const ENTRY_SIZE_ESTIMATE: usize = 145;
33
34/// Check whether loading `row_count` rows into memory would exceed `max_rows`.
35///
36/// Returns an error with a human-readable message including the estimated memory
37/// footprint. Used by both Lance and LanceDB scan paths.
38pub fn check_oom_guard(
39    row_count: usize,
40    max_rows: usize,
41    entity_name: &str,
42    qualifier: &str,
43) -> Result<()> {
44    if row_count > max_rows {
45        let estimated_bytes = row_count * ENTRY_SIZE_ESTIMATE;
46        return Err(anyhow!(
47            "Table for {}_{} has {} rows (estimated {:.2} GB in memory), exceeding max_compaction_rows limit of {}. \
48            Use chunked compaction or increase the limit. See issue #143.",
49            entity_name,
50            qualifier,
51            row_count,
52            estimated_bytes as f64 / (1024.0 * 1024.0 * 1024.0),
53            max_rows
54        ));
55    }
56    Ok(())
57}
58
/// Kind of change recorded by a delta entry.
///
/// The explicit discriminants are the values stored in the `op` column
/// (`0` = insert, `1` = delete), so they must not be renumbered.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Op {
    /// The edge was inserted (stored as `0`).
    Insert = 0,
    /// The edge was deleted; the entry is a tombstone (stored as `1`).
    Delete = 1,
}
64
65#[derive(Clone, Debug)]
66pub struct L1Entry {
67    pub src_vid: Vid,
68    pub dst_vid: Vid,
69    pub eid: Eid,
70    pub op: Op,
71    pub version: u64,
72    pub properties: Properties,
73    /// Timestamp when the edge was created (nanoseconds since epoch).
74    pub created_at: Option<i64>,
75    /// Timestamp when the edge was last updated (nanoseconds since epoch).
76    pub updated_at: Option<i64>,
77}
78
/// Handle for the delta table of one edge type in one direction.
pub struct DeltaDataset {
    /// Location of the underlying Lance dataset.
    uri: String,
    /// Name of the edge type this delta table belongs to.
    edge_type: String,
    /// Adjacency direction: `"fwd"` or `"bwd"`.
    direction: String,
}
84
85impl DeltaDataset {
86    pub fn new(base_uri: &str, edge_type: &str, direction: &str) -> Self {
87        let uri = format!("{}/deltas/{}_{}", base_uri, edge_type, direction);
88        Self {
89            uri,
90            edge_type: edge_type.to_string(),
91            direction: direction.to_string(),
92        }
93    }
94
95    pub async fn open(&self) -> Result<Arc<Dataset>> {
96        self.open_at(None).await
97    }
98
99    pub async fn open_at(&self, version: Option<u64>) -> Result<Arc<Dataset>> {
100        let mut ds = Dataset::open(&self.uri).await?;
101        if let Some(v) = version {
102            ds = ds.checkout_version(v).await?;
103        }
104        Ok(Arc::new(ds))
105    }
106
107    pub fn get_arrow_schema(&self, schema: &Schema) -> Result<Arc<ArrowSchema>> {
108        let mut fields = vec![
109            Field::new("src_vid", arrow_schema::DataType::UInt64, false),
110            Field::new("dst_vid", arrow_schema::DataType::UInt64, false),
111            Field::new("eid", arrow_schema::DataType::UInt64, false),
112            Field::new("op", arrow_schema::DataType::UInt8, false), // 0=INSERT, 1=DELETE
113            Field::new("_version", arrow_schema::DataType::UInt64, false),
114            // New timestamp columns per STORAGE_DESIGN.md
115            Field::new(
116                "_created_at",
117                arrow_schema::DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
118                true,
119            ),
120            Field::new(
121                "_updated_at",
122                arrow_schema::DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
123                true,
124            ),
125        ];
126
127        if let Some(type_props) = schema.properties.get(&self.edge_type) {
128            let mut sorted_props: Vec<_> = type_props.iter().collect();
129            sorted_props.sort_by_key(|(name, _)| *name);
130
131            for (name, meta) in sorted_props {
132                fields.push(Field::new(name, meta.r#type.to_arrow(), meta.nullable));
133            }
134        }
135
136        // Add overflow_json column for non-schema properties (JSONB binary format)
137        fields.push(Field::new(
138            "overflow_json",
139            arrow_schema::DataType::LargeBinary,
140            true,
141        ));
142
143        Ok(Arc::new(ArrowSchema::new(fields)))
144    }
145
146    pub fn build_record_batch(&self, entries: &[L1Entry], schema: &Schema) -> Result<RecordBatch> {
147        let arrow_schema = self.get_arrow_schema(schema)?;
148
149        let mut src_vids = Vec::with_capacity(entries.len());
150        let mut dst_vids = Vec::with_capacity(entries.len());
151        let mut eids = Vec::with_capacity(entries.len());
152        let mut ops = Vec::with_capacity(entries.len());
153        let mut versions = Vec::with_capacity(entries.len());
154
155        for entry in entries {
156            src_vids.push(entry.src_vid.as_u64());
157            dst_vids.push(entry.dst_vid.as_u64());
158            eids.push(entry.eid.as_u64());
159            ops.push(entry.op.clone() as u8);
160            versions.push(entry.version);
161        }
162
163        let mut columns: Vec<ArrayRef> = vec![
164            Arc::new(UInt64Array::from(src_vids)),
165            Arc::new(UInt64Array::from(dst_vids)),
166            Arc::new(UInt64Array::from(eids)),
167            Arc::new(UInt8Array::from(ops)),
168            Arc::new(UInt64Array::from(versions)),
169        ];
170
171        // Build _created_at and _updated_at columns using shared builder
172        columns.push(build_timestamp_column(entries.iter().map(|e| e.created_at)));
173        columns.push(build_timestamp_column(entries.iter().map(|e| e.updated_at)));
174
175        // Derive deleted flags from Op for property column building
176        // Tombstones (Op::Delete) are logically deleted and should use default values
177        let deleted_flags: Vec<bool> = entries.iter().map(|e| e.op == Op::Delete).collect();
178
179        // Build property columns using shared builder
180        let prop_columns = PropertyColumnBuilder::new(schema, &self.edge_type, entries.len())
181            .with_deleted(&deleted_flags)
182            .build(|i| &entries[i].properties)?;
183
184        columns.extend(prop_columns);
185
186        // Build overflow_json column for non-schema properties
187        let overflow_column = self.build_overflow_json_column(entries, schema)?;
188        columns.push(overflow_column);
189
190        RecordBatch::try_new(arrow_schema, columns).map_err(|e| anyhow!(e))
191    }
192
193    /// Build the overflow_json column containing properties not in schema.
194    fn build_overflow_json_column(&self, entries: &[L1Entry], schema: &Schema) -> Result<ArrayRef> {
195        crate::storage::property_builder::build_overflow_json_column(
196            entries.len(),
197            &self.edge_type,
198            schema,
199            |i| &entries[i].properties,
200            &[],
201        )
202    }
203
204    pub async fn scan_all(&self, schema: &Schema) -> Result<Vec<L1Entry>> {
205        self.scan_all_with_limit(schema, DEFAULT_MAX_COMPACTION_ROWS)
206            .await
207    }
208
209    /// Scan all entries with a configurable row limit to prevent OOM.
210    pub async fn scan_all_with_limit(
211        &self,
212        schema: &Schema,
213        max_rows: usize,
214    ) -> Result<Vec<L1Entry>> {
215        let ds = match self.open().await {
216            Ok(ds) => ds,
217            Err(_) => return Ok(vec![]),
218        };
219
220        let row_count = ds.count_rows(None).await?;
221        check_oom_guard(row_count, max_rows, &self.edge_type, &self.direction)?;
222
223        info!(
224            edge_type = %self.edge_type,
225            direction = %self.direction,
226            row_count,
227            estimated_bytes = row_count * ENTRY_SIZE_ESTIMATE,
228            "Starting delta scan for compaction"
229        );
230
231        let mut stream = ds.scan().try_into_stream().await?;
232
233        let mut entries = Vec::new();
234
235        while let Some(batch) = stream.try_next().await? {
236            let mut batch_entries = self.parse_batch(&batch, schema)?;
237            entries.append(&mut batch_entries);
238        }
239
240        self.sort_entries(&mut entries);
241
242        Ok(entries)
243    }
244
245    /// Sort entries by direction key (src_vid for fwd, dst_vid for bwd) then by version.
246    fn sort_entries(&self, entries: &mut [L1Entry]) {
247        let is_fwd = self.direction == "fwd";
248        entries.sort_by(|a, b| {
249            let key_a = if is_fwd { a.src_vid } else { a.dst_vid };
250            let key_b = if is_fwd { b.src_vid } else { b.dst_vid };
251            key_a.cmp(&key_b).then(a.version.cmp(&b.version))
252        });
253    }
254
255    fn parse_batch(&self, batch: &RecordBatch, schema: &Schema) -> Result<Vec<L1Entry>> {
256        let src_vids = batch
257            .column_by_name("src_vid")
258            .ok_or(anyhow!("Missing src_vid"))?
259            .as_any()
260            .downcast_ref::<UInt64Array>()
261            .ok_or(anyhow!("Invalid src_vid type"))?;
262        let dst_vids = batch
263            .column_by_name("dst_vid")
264            .ok_or(anyhow!("Missing dst_vid"))?
265            .as_any()
266            .downcast_ref::<UInt64Array>()
267            .ok_or(anyhow!("Invalid dst_vid type"))?;
268        let eids = batch
269            .column_by_name("eid")
270            .ok_or(anyhow!("Missing eid"))?
271            .as_any()
272            .downcast_ref::<UInt64Array>()
273            .ok_or(anyhow!("Invalid eid type"))?;
274        let ops = batch
275            .column_by_name("op")
276            .ok_or(anyhow!("Missing op"))?
277            .as_any()
278            .downcast_ref::<UInt8Array>()
279            .ok_or(anyhow!("Invalid op type"))?;
280        let versions = batch
281            .column_by_name("_version")
282            .ok_or(anyhow!("Missing _version"))?
283            .as_any()
284            .downcast_ref::<UInt64Array>()
285            .ok_or(anyhow!("Invalid _version type"))?;
286
287        // Try to read timestamp columns (may not exist in old data)
288        let created_at_col = batch.column_by_name("_created_at").and_then(|c| {
289            c.as_any()
290                .downcast_ref::<PrimitiveArray<TimestampNanosecondType>>()
291        });
292        let updated_at_col = batch.column_by_name("_updated_at").and_then(|c| {
293            c.as_any()
294                .downcast_ref::<PrimitiveArray<TimestampNanosecondType>>()
295        });
296
297        // Prepare property columns
298        let mut prop_cols = Vec::new();
299        if let Some(type_props) = schema.properties.get(&self.edge_type) {
300            for (name, meta) in type_props {
301                if let Some(col) = batch.column_by_name(name) {
302                    prop_cols.push((name, meta.r#type.clone(), col));
303                }
304            }
305        }
306
307        let mut entries = Vec::with_capacity(batch.num_rows());
308
309        for i in 0..batch.num_rows() {
310            let op = match ops.value(i) {
311                0 => Op::Insert,
312                1 => Op::Delete,
313                _ => continue, // Unknown op
314            };
315
316            let properties = self.extract_properties(&prop_cols, i)?;
317
318            // Read timestamps if present
319            let read_ts = |col: Option<&PrimitiveArray<TimestampNanosecondType>>| {
320                col.and_then(|c| (!c.is_null(i)).then(|| c.value(i)))
321            };
322            let created_at = read_ts(created_at_col);
323            let updated_at = read_ts(updated_at_col);
324
325            entries.push(L1Entry {
326                src_vid: Vid::from(src_vids.value(i)),
327                dst_vid: Vid::from(dst_vids.value(i)),
328                eid: Eid::from(eids.value(i)),
329                op,
330                version: versions.value(i),
331                properties,
332                created_at,
333                updated_at,
334            });
335        }
336        Ok(entries)
337    }
338
339    /// Extract properties from columns for a single row.
340    fn extract_properties(
341        &self,
342        prop_cols: &[(&String, DataType, &ArrayRef)],
343        row: usize,
344    ) -> Result<Properties> {
345        let mut properties = Properties::new();
346        for (name, dtype, col) in prop_cols {
347            if col.is_null(row) {
348                continue;
349            }
350            let val = Self::value_from_column(col.as_ref(), dtype, row)?;
351            properties.insert(name.to_string(), uni_common::Value::from(val));
352        }
353        Ok(properties)
354    }
355
356    /// Decode an Arrow column value to JSON with lenient CRDT error handling.
357    fn value_from_column(
358        col: &dyn arrow_array::Array,
359        dtype: &uni_common::DataType,
360        row: usize,
361    ) -> Result<serde_json::Value> {
362        crate::storage::value_codec::value_from_column(col, dtype, row, CrdtDecodeMode::Lenient)
363    }
364
365    /// Returns the filter column name based on direction ("src_vid" for fwd, "dst_vid" for bwd).
366    fn filter_column(&self) -> &'static str {
367        if self.direction == "fwd" {
368            "src_vid"
369        } else {
370            "dst_vid"
371        }
372    }
373
374    // ========================================================================
375    // LanceDB-based Methods
376    // ========================================================================
377
378    /// Open a delta table using LanceDB.
379    pub async fn open_lancedb(&self, store: &LanceDbStore) -> Result<Table> {
380        store
381            .open_delta_table(&self.edge_type, &self.direction)
382            .await
383    }
384
385    /// Open or create a delta table using LanceDB.
386    pub async fn open_or_create_lancedb(
387        &self,
388        store: &LanceDbStore,
389        schema: &Schema,
390    ) -> Result<Table> {
391        let arrow_schema = self.get_arrow_schema(schema)?;
392        store
393            .open_or_create_delta_table(&self.edge_type, &self.direction, arrow_schema)
394            .await
395    }
396
397    /// Write a run to a LanceDB delta table.
398    ///
399    /// Creates the table if it doesn't exist, otherwise appends to it.
400    pub async fn write_run_lancedb(
401        &self,
402        store: &LanceDbStore,
403        batch: RecordBatch,
404    ) -> Result<Table> {
405        let table_name = LanceDbStore::delta_table_name(&self.edge_type, &self.direction);
406
407        if store.table_exists(&table_name).await? {
408            let table = store.open_table(&table_name).await?;
409            store.append_to_table(&table, vec![batch]).await?;
410            Ok(table)
411        } else {
412            store.create_table(&table_name, vec![batch]).await
413        }
414    }
415
416    /// Ensure a BTree index exists on the 'eid' column using LanceDB.
417    pub async fn ensure_eid_index_lancedb(&self, table: &Table) -> Result<()> {
418        let indices = table
419            .list_indices()
420            .await
421            .map_err(|e| anyhow!("Failed to list indices: {}", e))?;
422
423        if !indices
424            .iter()
425            .any(|idx| idx.columns.contains(&"eid".to_string()))
426        {
427            log::info!(
428                "Creating eid BTree index for edge type '{}' via LanceDB",
429                self.edge_type
430            );
431            if let Err(e) = table
432                .create_index(&["eid"], LanceDbIndex::BTree(BTreeIndexBuilder::default()))
433                .execute()
434                .await
435            {
436                log::warn!(
437                    "Failed to create eid index for '{}' via LanceDB: {}",
438                    self.edge_type,
439                    e
440                );
441            }
442        }
443
444        Ok(())
445    }
446
447    /// Get the LanceDB table name for this delta dataset.
448    pub fn lancedb_table_name(&self) -> String {
449        LanceDbStore::delta_table_name(&self.edge_type, &self.direction)
450    }
451
452    /// Scan all entries from LanceDB table.
453    ///
454    /// Returns an empty vector if the table doesn't exist.
455    pub async fn scan_all_lancedb(
456        &self,
457        store: &LanceDbStore,
458        schema: &Schema,
459    ) -> Result<Vec<L1Entry>> {
460        self.scan_all_lancedb_with_limit(store, schema, DEFAULT_MAX_COMPACTION_ROWS)
461            .await
462    }
463
464    /// Scan all entries from LanceDB table with a configurable row limit to prevent OOM.
465    pub async fn scan_all_lancedb_with_limit(
466        &self,
467        store: &LanceDbStore,
468        schema: &Schema,
469        max_rows: usize,
470    ) -> Result<Vec<L1Entry>> {
471        let table = match self.open_lancedb(store).await {
472            Ok(t) => t,
473            Err(_) => return Ok(vec![]),
474        };
475
476        let row_count = table.count_rows(None).await?;
477        check_oom_guard(row_count, max_rows, &self.edge_type, &self.direction)?;
478
479        info!(
480            edge_type = %self.edge_type,
481            direction = %self.direction,
482            row_count,
483            estimated_bytes = row_count * ENTRY_SIZE_ESTIMATE,
484            "Starting delta scan for compaction (LanceDB)"
485        );
486
487        use lancedb::query::ExecutableQuery;
488        let stream = table.query().execute().await?;
489        let batches: Vec<arrow_array::RecordBatch> = stream.try_collect().await?;
490
491        let mut entries = Vec::new();
492        for batch in batches {
493            let mut batch_entries = self.parse_batch(&batch, schema)?;
494            entries.append(&mut batch_entries);
495        }
496
497        self.sort_entries(&mut entries);
498
499        Ok(entries)
500    }
501
502    /// Replace the delta table with a new batch (atomic replacement).
503    ///
504    /// This is used during compaction to clear the delta table after merging into L2.
505    pub async fn replace_lancedb(&self, store: &LanceDbStore, batch: RecordBatch) -> Result<Table> {
506        let table_name = self.lancedb_table_name();
507        let arrow_schema = batch.schema();
508        store
509            .replace_table_atomic(&table_name, vec![batch], arrow_schema)
510            .await
511    }
512
513    /// Read delta entries for a specific vertex ID from LanceDB.
514    ///
515    /// Returns an empty vector if the table doesn't exist or no entries match.
516    pub async fn read_deltas_lancedb(
517        &self,
518        store: &LanceDbStore,
519        vid: Vid,
520        schema: &Schema,
521        version_hwm: Option<u64>,
522    ) -> Result<Vec<L1Entry>> {
523        let table = match self.open_lancedb(store).await {
524            Ok(t) => t,
525            Err(_) => return Ok(vec![]),
526        };
527
528        use lancedb::query::{ExecutableQuery, QueryBase};
529
530        let base_filter = format!("{} = {}", self.filter_column(), vid.as_u64());
531
532        // Add version filtering if snapshot is active
533        let final_filter = if let Some(hwm) = version_hwm {
534            format!("({}) AND (_version <= {})", base_filter, hwm)
535        } else {
536            base_filter
537        };
538
539        let query = table.query().only_if(final_filter);
540        let stream = query.execute().await?;
541        let batches: Vec<arrow_array::RecordBatch> = stream.try_collect().await?;
542
543        let mut entries = Vec::new();
544        for batch in batches {
545            let mut batch_entries = self.parse_batch(&batch, schema)?;
546            entries.append(&mut batch_entries);
547        }
548
549        Ok(entries)
550    }
551
552    /// Read delta entries for multiple vertex IDs in a single batch query from LanceDB.
553    ///
554    /// Returns a HashMap mapping each vid to its delta entries.
555    /// VIDs with no delta entries will not be in the map.
556    pub async fn read_deltas_lancedb_batch(
557        &self,
558        store: &LanceDbStore,
559        vids: &[Vid],
560        schema: &Schema,
561        version_hwm: Option<u64>,
562    ) -> Result<HashMap<Vid, Vec<L1Entry>>> {
563        if vids.is_empty() {
564            return Ok(HashMap::new());
565        }
566
567        let table = match self.open_lancedb(store).await {
568            Ok(t) => t,
569            Err(_) => return Ok(HashMap::new()),
570        };
571
572        use lancedb::query::{ExecutableQuery, QueryBase};
573
574        // Build IN filter for batch query
575        let vid_list = vids
576            .iter()
577            .map(|v| v.as_u64().to_string())
578            .collect::<Vec<_>>()
579            .join(", ");
580        let mut filter = format!("{} IN ({})", self.filter_column(), vid_list);
581
582        // Add version filtering if snapshot is active
583        if let Some(hwm) = version_hwm {
584            filter = format!("({}) AND (_version <= {})", filter, hwm);
585        }
586
587        let query = table.query().only_if(filter);
588        let stream = query.execute().await?;
589        let batches: Vec<arrow_array::RecordBatch> = stream.try_collect().await?;
590
591        // Parse all batches and group by direction key VID
592        let is_fwd = self.direction == "fwd";
593        let mut result: HashMap<Vid, Vec<L1Entry>> = HashMap::new();
594        for batch in batches {
595            let entries = self.parse_batch(&batch, schema)?;
596            for entry in entries {
597                let vid = if is_fwd { entry.src_vid } else { entry.dst_vid };
598                result.entry(vid).or_default().push(entry);
599            }
600        }
601
602        Ok(result)
603    }
604}
605
#[cfg(test)]
mod tests {
    use super::*;

    /// Number of bytes in one GB as used by the memory estimates (1024^3).
    const BYTES_PER_GB: f64 = 1024.0 * 1024.0 * 1024.0;

    /// Estimated in-memory size, in GB, of `rows` entries.
    fn estimated_gb_for(rows: usize) -> f64 {
        (rows * ENTRY_SIZE_ESTIMATE) as f64 / BYTES_PER_GB
    }

    #[test]
    #[allow(clippy::assertions_on_constants)] // Validating configuration constants
    fn test_constants_are_reasonable() {
        // The default limit is pinned at 5 million rows.
        assert_eq!(DEFAULT_MAX_COMPACTION_ROWS, 5_000_000);

        // The per-entry estimate must stay within a plausible 100-300 byte window.
        assert!(ENTRY_SIZE_ESTIMATE >= 100, "Entry size estimate too low");
        assert!(ENTRY_SIZE_ESTIMATE <= 300, "Entry size estimate too high");

        // At the default limit the estimated footprint must stay under 1 GB.
        let estimated_gb = estimated_gb_for(DEFAULT_MAX_COMPACTION_ROWS);
        assert!(
            estimated_gb < 1.0,
            "5M entries should fit in under 1GB with current estimate"
        );
    }

    #[test]
    fn test_memory_estimate_formatting() {
        // 10M rows at the current estimate come to roughly 1.35 GB.
        let estimated_gb = estimated_gb_for(10_000_000);

        assert!(
            estimated_gb > 1.0 && estimated_gb < 2.0,
            "10M rows should be 1-2 GB"
        );
    }
}