// uni_store/storage/compaction.rs
// SPDX-License-Identifier: Apache-2.0
// Copyright 2024-2026 Dragonscale Team

4use crate::storage::delta::{ENTRY_SIZE_ESTIMATE, L1Entry, Op};
5use crate::storage::manager::StorageManager;
6use anyhow::{Result, anyhow};
7use arrow_array::Array;
8use arrow_array::builder::{ArrayBuilder, ListBuilder, UInt64Builder};
9use arrow_array::{ListArray, RecordBatch, UInt64Array};
10use futures::TryStreamExt;
11use metrics;
12use std::collections::{HashMap, HashSet};
13use std::sync::Arc;
14use tracing::{error, info, instrument};
15use uni_common::core::id::{Eid, Vid};
16use uni_common::core::schema::DataType;
17use uni_common::{Properties, Value};
18use uni_crdt::Crdt;
19
/// Background compactor for the storage layer.
///
/// Merges multi-version vertex rows into their latest state and folds
/// Delta L1 topology operations into L2 adjacency lists, rewriting the
/// corresponding LanceDB tables.
pub struct Compactor {
    // Shared handle to the storage layer (datasets, LanceDB store, config).
    storage: Arc<StorageManager>,
}
23
24impl Compactor {
25    pub fn new(storage: Arc<StorageManager>) -> Self {
26        Self { storage }
27    }
28
29    #[instrument(skip(self), level = "info")]
30    pub async fn compact_all(&self) -> Result<Vec<CompactionInfo>> {
31        let start = std::time::Instant::now();
32        let schema = self.storage.schema_manager().schema();
33        let mut compaction_results = Vec::new();
34
35        // Compact Vertices
36        for label in schema.labels.keys() {
37            info!("Compacting vertices for label {}", label);
38            if let Err(e) = self.compact_vertices(label).await {
39                error!("Failed to compact vertices for {}: {}", label, e);
40            }
41        }
42
43        // Compact Edges
44        for (edge_type, meta) in &schema.edge_types {
45            // Outgoing: src_labels
46            for label in &meta.src_labels {
47                info!("Compacting adjacency {} -> {} (fwd)", label, edge_type);
48                match self.compact_adjacency(edge_type, label, "fwd").await {
49                    Ok(info) => compaction_results.push(info),
50                    Err(e) => {
51                        error!(
52                            "Failed to compact adjacency {} -> {}: {}",
53                            label, edge_type, e
54                        );
55                    }
56                }
57            }
58
59            // Incoming: dst_labels
60            for label in &meta.dst_labels {
61                info!("Compacting adjacency {} <- {} (bwd)", label, edge_type);
62                match self.compact_adjacency(edge_type, label, "bwd").await {
63                    Ok(info) => compaction_results.push(info),
64                    Err(e) => {
65                        error!(
66                            "Failed to compact adjacency {} <- {}: {}",
67                            label, edge_type, e
68                        );
69                    }
70                }
71            }
72        }
73
74        metrics::counter!("uni_compaction_runs_total").increment(1);
75        metrics::histogram!("uni_compaction_duration_seconds")
76            .record(start.elapsed().as_secs_f64());
77
78        Ok(compaction_results)
79    }
80
    /// Compact all rows for vertex `label` into one latest-state row per Vid.
    ///
    /// Reads the label's entire LanceDB table into memory, merges rows per
    /// vertex (last-writer-wins by `_version` for plain properties,
    /// commutative merge for CRDT-typed properties), drops tombstoned
    /// vertices, and rewrites the table with the merged result.
    ///
    /// # Errors
    /// Returns an error if the label is unknown to the schema, if the row
    /// count trips the configured OOM guard, or if any read/decode/write
    /// step fails. A missing table is not an error (nothing to compact).
    #[instrument(skip(self), fields(rows_processed, duration_ms), level = "info")]
    pub async fn compact_vertices(&self, label: &str) -> Result<()> {
        let start = std::time::Instant::now();
        let schema_manager = self.storage.schema_manager();
        let schema = schema_manager.schema();

        let label_props = schema
            .properties
            .get(label)
            .ok_or_else(|| anyhow!("Label not found"))?;

        // Identify CRDT properties — these merge commutatively across
        // versions instead of following last-writer-wins.
        let crdt_props: HashSet<String> = label_props
            .iter()
            .filter(|(_, meta)| matches!(meta.r#type, DataType::Crdt(_)))
            .map(|(name, _)| name.clone())
            .collect();

        let dataset = self.storage.vertex_dataset(label)?;
        let lancedb_store = self.storage.lancedb_store();

        // Try to open from LanceDB first (canonical storage)
        let table = match dataset.open_lancedb(lancedb_store).await {
            Ok(t) => t,
            Err(_) => {
                // Table doesn't exist - nothing to compact
                info!("No vertex data to compact for label '{}'", label);
                return Ok(());
            }
        };

        // In-memory compaction for now (MVP).
        // For large datasets, this needs to be streaming/chunked with external sort.
        // Current approach: Read ALL, merge in map, write NEW.
        // TODO(perf): This accumulates ALL vertices in memory, causing OOM for large
        // labels (millions of vertices). Refactor to use streaming merge-sort with
        // constant memory usage (e.g., external sort or Lance fragment-by-fragment merge).

        // Guard against unbounded memory use before loading anything.
        let row_count = table.count_rows(None).await?;
        crate::storage::delta::check_oom_guard(
            row_count,
            self.storage.config.max_compaction_rows,
            label,
            "vertices",
        )?;

        info!(
            label = %label,
            row_count,
            // Rough ~200-bytes-per-row heuristic, for observability only.
            estimated_bytes = row_count * 200,
            "Starting vertex compaction"
        );

        use lancedb::query::ExecutableQuery;
        let stream = table.query().execute().await?;
        let batches: Vec<RecordBatch> = stream.try_collect().await?;

        // Vid -> (Properties, Deleted)
        let mut vertex_state: HashMap<Vid, (Properties, bool)> = HashMap::new();
        // Vid -> highest _version seen so far.
        let mut vertex_versions: HashMap<Vid, u64> = HashMap::new();
        // Vid -> labels from the newest row that carried a non-empty _labels list.
        let mut vertex_labels: HashMap<Vid, Vec<String>> = HashMap::new();

        let mut rows_processed = 0;

        for batch in batches {
            rows_processed += batch.num_rows();
            // System columns are unwrapped directly: every row written by the
            // flush path is assumed to carry _vid/_version/_deleted.
            // NOTE(review): a schema drift here would panic — confirm upstream
            // writers always emit these columns.
            let vid_col = batch
                .column_by_name("_vid")
                .unwrap()
                .as_any()
                .downcast_ref::<UInt64Array>()
                .unwrap();
            let ver_col = batch
                .column_by_name("_version")
                .unwrap()
                .as_any()
                .downcast_ref::<UInt64Array>()
                .unwrap();
            let del_col = batch
                .column_by_name("_deleted")
                .unwrap()
                .as_any()
                .downcast_ref::<arrow_array::BooleanArray>()
                .unwrap();

            // Read _labels column (List<Utf8>) if present
            let labels_col = batch
                .column_by_name("_labels")
                .and_then(|c| c.as_any().downcast_ref::<arrow_array::ListArray>());

            for i in 0..batch.num_rows() {
                let vid = Vid::from(vid_col.value(i));
                let version = ver_col.value(i);
                let deleted = del_col.value(i);

                // Extract labels from the _labels column (keep latest version's labels)
                if let Some(list_arr) = labels_col
                    && version >= *vertex_versions.entry(vid).or_insert(0)
                {
                    let labels = crate::storage::arrow_convert::labels_from_list_array(list_arr, i);
                    if !labels.is_empty() {
                        vertex_labels.insert(vid, labels);
                    }
                }

                let current_entry = vertex_state
                    .entry(vid)
                    .or_insert((Properties::new(), false));
                let current_version = vertex_versions.entry(vid).or_insert(0);

                // Rows arrive in no particular order, so merging is
                // version-aware: CRDT properties merge regardless of version
                // (the merge is commutative), while LWW properties only take
                // effect from rows at or above the highest version seen.

                // A deletion at the newest (or equal) version wins and wipes
                // accumulated properties.
                if deleted {
                    if version >= *current_version {
                        current_entry.1 = true;
                        current_entry.0.clear(); // Clear properties on delete
                        *current_version = version;
                    }
                    continue;
                }

                // It's an update/insert.
                // Extract props and track NULLs (property removals).
                let mut row_props = Properties::new();
                let mut null_props = Vec::new(); // Track explicitly NULL properties
                for (name, meta) in label_props {
                    if let Some(col) = batch.column_by_name(name) {
                        if col.is_null(i) {
                            // Property was explicitly removed (set to NULL)
                            null_props.push(name.clone());
                        } else {
                            // TODO: value_from_column is borrowed from the
                            // property manager to avoid a larger refactor;
                            // consider moving it to a shared util.
                            let val = crate::runtime::property_manager::PropertyManager::value_from_column(col.as_ref(), &meta.r#type, i)?;
                            row_props.insert(name.clone(), val);
                        }
                    }
                }

                Self::merge_row_into_state(
                    row_props,
                    null_props,
                    version,
                    current_entry,
                    current_version,
                    &crdt_props,
                )?;
            }
        }

        // Convert state to RecordBatch and write OVERWRITE.
        // Tombstoned vertices are dropped entirely: compaction removes
        // tombstones, so every surviving row has _deleted = false.
        let mut valid_vertices = Vec::new();
        let mut valid_versions = Vec::new();
        let mut valid_deleted = Vec::new();

        for (vid, (props, deleted)) in vertex_state {
            if !deleted {
                let labels = vertex_labels.remove(&vid).unwrap_or_default();
                valid_vertices.push((vid, labels, props));
                valid_versions.push(vertex_versions[&vid]);
                valid_deleted.push(false);
            }
        }

        // NOTE(review): if ALL vertices were tombstoned this skips the write,
        // leaving the old table contents in place — confirm that is intended.
        if !valid_vertices.is_empty() {
            let batch = dataset.build_record_batch(
                &valid_vertices,
                &valid_deleted,
                &valid_versions,
                &schema,
            )?;
            let lancedb_store = self.storage.lancedb_store();
            dataset
                .replace_lancedb(lancedb_store, batch, &schema)
                .await?;
        }

        let duration = start.elapsed();
        // Reclaimed = input rows minus surviving unique vertices (duplicates
        // and tombstones); survivors can never exceed rows_processed.
        let rows_reclaimed = rows_processed as u64 - valid_vertices.len() as u64;
        metrics::counter!("uni_compaction_rows_reclaimed_total", "type" => "vertex")
            .increment(rows_reclaimed);

        tracing::Span::current().record("rows_processed", rows_processed);
        tracing::Span::current().record("duration_ms", duration.as_millis());
        info!(
            rows = rows_processed,
            duration_ms = duration.as_millis(),
            "Vertex compaction completed"
        );

        metrics::histogram!("uni_compaction_duration_seconds", "type" => "vertex")
            .record(duration.as_secs_f64());

        Ok(())
    }
285
286    fn merge_crdt_values(a: &Value, b: &Value) -> Result<Value> {
287        if a.is_null() {
288            return Ok(b.clone());
289        }
290        if b.is_null() {
291            return Ok(a.clone());
292        }
293        let mut crdt_a: Crdt = serde_json::from_value(a.clone().into())?;
294        let crdt_b: Crdt = serde_json::from_value(b.clone().into())?;
295        crdt_a
296            .try_merge(&crdt_b)
297            .map_err(|e| anyhow::anyhow!("{e}"))?;
298        Ok(Value::from(serde_json::to_value(crdt_a)?))
299    }
300
301    /// Merge row properties into state based on version comparison.
302    fn merge_row_into_state(
303        row_props: Properties,
304        null_props: Vec<String>,
305        version: u64,
306        current_entry: &mut (Properties, bool),
307        current_version: &mut u64,
308        crdt_props: &HashSet<String>,
309    ) -> Result<()> {
310        if version > *current_version {
311            // New version wins for LWW, merge for CRDTs
312            *current_version = version;
313            current_entry.1 = false;
314
315            for (k, v) in row_props {
316                if crdt_props.contains(&k) {
317                    let existing = current_entry.0.entry(k.clone()).or_insert(Value::Null);
318                    *existing = Self::merge_crdt_values(existing, &v)?;
319                } else {
320                    current_entry.0.insert(k, v);
321                }
322            }
323
324            // Remove properties explicitly set to NULL in the newer version
325            for null_prop in &null_props {
326                if !crdt_props.contains(null_prop) {
327                    current_entry.0.remove(null_prop);
328                }
329            }
330        } else if version == *current_version {
331            // Same version: merge all
332            current_entry.1 = false;
333            for (k, v) in row_props {
334                if crdt_props.contains(&k) {
335                    let existing = current_entry.0.entry(k.clone()).or_insert(Value::Null);
336                    *existing = Self::merge_crdt_values(existing, &v)?;
337                } else {
338                    current_entry.0.insert(k, v);
339                }
340            }
341        } else {
342            // Older version: only merge CRDTs
343            if !current_entry.1 {
344                for (k, v) in row_props {
345                    if crdt_props.contains(&k) {
346                        let existing = current_entry.0.entry(k.clone()).or_insert(Value::Null);
347                        *existing = Self::merge_crdt_values(existing, &v)?;
348                    }
349                }
350            }
351        }
352        Ok(())
353    }
354
    /// Fold Delta L1 topology operations into the L2 adjacency list for
    /// (`edge_type`, `label`, `direction`) and clear the delta table.
    ///
    /// `direction` is "fwd" (lists keyed by src vertex) or "bwd" (keyed by
    /// dst vertex). Existing L2 rows are read, patched with their deltas,
    /// and rewritten; vertices that appear only in the deltas get fresh rows.
    ///
    /// # Errors
    /// Returns an error if the delta scan fails, the adjacency row count
    /// trips the OOM guard, or any LanceDB read/write step fails.
    #[instrument(skip(self), fields(delta_count, duration_ms), level = "info")]
    pub async fn compact_adjacency(
        &self,
        edge_type: &str,
        label: &str,
        direction: &str,
    ) -> Result<CompactionInfo> {
        let start = std::time::Instant::now();
        let schema = self.storage.schema_manager().schema();

        // 1. Load all L1 Deltas sorted by key from LanceDB
        let delta_ds = self.storage.delta_dataset(edge_type, direction)?;
        let lancedb_store = self.storage.lancedb_store();
        let deltas = delta_ds.scan_all_lancedb(lancedb_store, &schema).await?;

        let delta_count = deltas.len();
        tracing::Span::current().record("delta_count", delta_count);

        if deltas.is_empty() {
            // Nothing to compact, return info anyway
            return Ok(CompactionInfo {
                edge_type: edge_type.to_string(),
                direction: direction.to_string(),
            });
        }

        // Group deltas by src_vid (if fwd) or dst_vid (if bwd)
        // We'll use a HashMap for now since we loaded all into memory.
        // Value is list of ops for that vertex.
        let mut delta_map: HashMap<Vid, Vec<L1Entry>> = HashMap::new();
        for entry in &deltas {
            let key = if direction == "fwd" {
                entry.src_vid
            } else {
                entry.dst_vid
            };
            delta_map.entry(key).or_default().push(entry.clone());
        }

        // Sort each VID's ops by version to ensure correct ordering
        // This guarantees Delete(v=2) beats Insert(v=1) regardless of scan order
        for ops in delta_map.values_mut() {
            ops.sort_by_key(|e| e.version);
        }

        // 2. Open L2 Adjacency stream
        let adj_ds = self
            .storage
            .adjacency_dataset(edge_type, label, direction)?;

        // We need to write a NEW version.
        // Strategy:
        // - Read L2 batch by batch.
        // - For each row (vertex), check if we have deltas.
        // - Apply deltas.
        // - Write to new batch.
        // - Track which vertices from deltas we've processed.
        // - After L2 stream ends, process remaining "new" vertices from deltas.

        // Output Builders
        let mut src_vid_builder = UInt64Builder::new();
        let mut neighbors_builder = ListBuilder::new(UInt64Builder::new());
        let mut edge_ids_builder = ListBuilder::new(UInt64Builder::new());

        // Vids already emitted from the L2 pass; deltas for these vids must
        // not produce a second row in the pass below.
        let mut processed_vids = HashSet::new();

        // Try to read from LanceDB (canonical storage).
        // NOTE(review): any open error is treated as "no existing L2 data",
        // not just table-missing — confirm that transient open failures
        // cannot silently drop existing adjacency rows here.
        let lancedb_store = self.storage.lancedb_store();
        if let Ok(table) = adj_ds.open_lancedb(lancedb_store).await {
            let adj_row_count = table.count_rows(None).await?;
            crate::storage::delta::check_oom_guard(
                adj_row_count,
                self.storage.config.max_compaction_rows,
                &format!("{}_{}", edge_type, label),
                direction,
            )?;

            info!(
                edge_type = %edge_type,
                label = %label,
                direction = %direction,
                adj_row_count,
                delta_count,
                // Rough sizing heuristic for observability only.
                estimated_bytes = adj_row_count * 100 + delta_count * ENTRY_SIZE_ESTIMATE,
                "Starting adjacency compaction"
            );

            use lancedb::query::ExecutableQuery;
            let stream = table.query().execute().await?;
            let batches: Vec<RecordBatch> = stream.try_collect().await?;

            for batch in batches {
                let src_col = batch
                    .column_by_name("src_vid")
                    .ok_or(anyhow!("Missing src_vid"))?
                    .as_any()
                    .downcast_ref::<UInt64Array>()
                    .ok_or(anyhow!("Invalid src_vid"))?;
                let neighbors_col = batch
                    .column_by_name("neighbors")
                    .ok_or(anyhow!("Missing neighbors"))?
                    .as_any()
                    .downcast_ref::<ListArray>()
                    .ok_or(anyhow!("Invalid neighbors"))?;
                let edge_ids_col = batch
                    .column_by_name("edge_ids")
                    .ok_or(anyhow!("Missing edge_ids"))?
                    .as_any()
                    .downcast_ref::<ListArray>()
                    .ok_or(anyhow!("Invalid edge_ids"))?;

                for i in 0..batch.num_rows() {
                    let vid = Vid::from(src_col.value(i));
                    processed_vids.insert(vid);

                    // Reconstruct current adjacency list as Eid -> neighbor
                    // Vid, so delta inserts/deletes can patch it by edge id.
                    let n_list = neighbors_col.value(i);
                    let n_array = n_list.as_any().downcast_ref::<UInt64Array>().unwrap();
                    let e_list = edge_ids_col.value(i);
                    let e_array = e_list.as_any().downcast_ref::<UInt64Array>().unwrap();

                    let mut current_edges: HashMap<Eid, Vid> = HashMap::new();
                    for j in 0..n_array.len() {
                        current_edges
                            .insert(Eid::from(e_array.value(j)), Vid::from(n_array.value(j)));
                    }

                    if let Some(ops) = delta_map.get(&vid) {
                        apply_deltas_to_edges(&mut current_edges, ops, direction);
                    }

                    // Emits nothing if all edges for this vid were deleted.
                    append_edges_to_builders(
                        vid,
                        &current_edges,
                        &mut src_vid_builder,
                        &mut neighbors_builder,
                        &mut edge_ids_builder,
                    );
                }
            }
        }

        // Process new vertices (in deltas but not in L2)
        for (vid, ops) in delta_map {
            if processed_vids.contains(&vid) {
                continue;
            }

            let mut current_edges: HashMap<Eid, Vid> = HashMap::new();
            apply_deltas_to_edges(&mut current_edges, &ops, direction);

            append_edges_to_builders(
                vid,
                &current_edges,
                &mut src_vid_builder,
                &mut neighbors_builder,
                &mut edge_ids_builder,
            );
        }

        // Final Flush: replace the L2 table only if anything survived.
        if src_vid_builder.len() > 0 {
            let src_arr = Arc::new(src_vid_builder.finish());
            let neighbors_arr = Arc::new(neighbors_builder.finish());
            let edge_ids_arr = Arc::new(edge_ids_builder.finish());

            let schema = adj_ds.get_arrow_schema();
            let batch = RecordBatch::try_new(schema, vec![src_arr, neighbors_arr, edge_ids_arr])?;

            // Replace the table with compacted data using LanceDB
            let lancedb_store = self.storage.lancedb_store();
            adj_ds.replace_lancedb(lancedb_store, batch).await?;
        }

        // CRITICAL: Clear Delta L1 after compaction
        // Topology ops from Delta L1 are now incorporated into L2 adjacency.
        // Edge properties survive in main_edges (dual-written during flush).
        // Clearing Delta L1 prevents stale topology data from being read.
        if !deltas.is_empty() {
            info!(
                "Clearing Delta L1 for edge_type={} direction={} after compaction (incorporated {} ops)",
                edge_type,
                direction,
                deltas.len()
            );

            // TODO: Re-enable this assertion once test fixtures properly create main_edges entries
            // Debug assertion: Verify all edge EIDs in delta have entries in main_edges
            // Currently disabled because some tests create delta entries directly without main_edges
            /*
            #[cfg(debug_assertions)]
            {
                use crate::storage::main_edge::MainEdgeDataset;
                let lancedb_store = self.storage.lancedb_store();

                // Collect unique EIDs from deltas
                let delta_eids: std::collections::HashSet<Eid> =
                    deltas.iter().map(|e| e.eid).collect();

                for eid in delta_eids {
                    // Check if EID exists in main_edges
                    let main_edge_exists = MainEdgeDataset::find_props_by_eid(lancedb_store, eid)
                        .await
                        .unwrap_or(None)
                        .is_some();

                    debug_assert!(
                        main_edge_exists,
                        "EID {} from Delta L1 not found in main_edges after compaction. \
                        This indicates edge properties were not dual-written during flush.",
                        eid.as_u64()
                    );
                }
            }
            */

            // Clear the Delta L1 table by replacing with empty batch
            let delta_ds = self.storage.delta_dataset(edge_type, direction)?;
            let lancedb_store = self.storage.lancedb_store();
            let delta_schema = delta_ds.get_arrow_schema(&schema)?;
            let empty_batch = RecordBatch::new_empty(delta_schema);
            delta_ds.replace_lancedb(lancedb_store, empty_batch).await?;
        }

        let duration = start.elapsed();
        tracing::Span::current().record("duration_ms", duration.as_millis());
        info!(
            delta_count,
            duration_ms = duration.as_millis(),
            "Adjacency compaction completed"
        );

        metrics::histogram!("uni_compaction_duration_seconds", "type" => "adjacency")
            .record(duration.as_secs_f64());

        Ok(CompactionInfo {
            edge_type: edge_type.to_string(),
            direction: direction.to_string(),
        })
    }
595}
596
597/// Apply delta operations to an edge map, returning the resolved neighbor for the direction.
598fn apply_deltas_to_edges(current_edges: &mut HashMap<Eid, Vid>, ops: &[L1Entry], direction: &str) {
599    for op in ops {
600        match op.op {
601            Op::Insert => {
602                let neighbor = if direction == "fwd" {
603                    op.dst_vid
604                } else {
605                    op.src_vid
606                };
607                current_edges.insert(op.eid, neighbor);
608            }
609            Op::Delete => {
610                current_edges.remove(&op.eid);
611            }
612        }
613    }
614}
615
616/// Write sorted edges from a HashMap into adjacency list builders.
617fn append_edges_to_builders(
618    vid: Vid,
619    current_edges: &HashMap<Eid, Vid>,
620    src_vid_builder: &mut UInt64Builder,
621    neighbors_builder: &mut ListBuilder<UInt64Builder>,
622    edge_ids_builder: &mut ListBuilder<UInt64Builder>,
623) {
624    if current_edges.is_empty() {
625        return;
626    }
627    src_vid_builder.append_value(vid.as_u64());
628
629    let mut sorted_eids: Vec<_> = current_edges.keys().cloned().collect();
630    sorted_eids.sort();
631
632    for eid in sorted_eids {
633        let neighbor = current_edges[&eid];
634        neighbors_builder.values().append_value(neighbor.as_u64());
635        edge_ids_builder.values().append_value(eid.as_u64());
636    }
637    neighbors_builder.append(true);
638    edge_ids_builder.append(true);
639}
640
/// Information returned by adjacency compaction about what was compacted.
/// Used to coordinate in-memory CSR re-warm after storage compaction.
#[derive(Debug, Clone)]
pub struct CompactionInfo {
    /// Edge type whose adjacency dataset was compacted.
    pub edge_type: String,
    /// Direction of the compacted dataset ("fwd" or "bwd").
    pub direction: String,
}