Skip to main content

uni_store/storage/
compaction.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use crate::storage::delta::{ENTRY_SIZE_ESTIMATE, L1Entry, Op};
5use crate::storage::manager::StorageManager;
6use anyhow::{Result, anyhow};
7use arrow_array::Array;
8use arrow_array::builder::{ArrayBuilder, ListBuilder, UInt64Builder};
9use arrow_array::{ListArray, RecordBatch, UInt64Array};
10use metrics;
11use std::collections::{HashMap, HashSet};
12use std::sync::Arc;
13use tracing::{error, info, instrument};
14use uni_common::core::id::{Eid, Vid};
15use uni_common::core::schema::DataType;
16use uni_common::{Properties, Value};
17use uni_crdt::Crdt;
18
/// Background compactor for the storage layer.
///
/// Rewrites vertex tables and L2 adjacency lists, folding accumulated
/// delta rows into single latest-state rows and truncating the L1 delta
/// log afterwards (see `compact_all`).
pub struct Compactor {
    // Shared handle to the storage layer (backend, datasets, schema manager).
    storage: Arc<StorageManager>,
}
22
23impl Compactor {
24    pub fn new(storage: Arc<StorageManager>) -> Self {
25        Self { storage }
26    }
27
28    #[instrument(skip(self), level = "info")]
29    pub async fn compact_all(&self) -> Result<Vec<CompactionInfo>> {
30        let start = std::time::Instant::now();
31        let schema = self.storage.schema_manager().schema();
32        let mut compaction_results = Vec::new();
33
34        // Compact Vertices
35        for label in schema.labels.keys() {
36            info!("Compacting vertices for label {}", label);
37            if let Err(e) = self.compact_vertices(label).await {
38                error!("Failed to compact vertices for {}: {}", label, e);
39            }
40        }
41
42        // Compact Edges
43        for (edge_type, meta) in &schema.edge_types {
44            // Outgoing: src_labels
45            for label in &meta.src_labels {
46                info!("Compacting adjacency {} -> {} (fwd)", label, edge_type);
47                match self.compact_adjacency(edge_type, label, "fwd").await {
48                    Ok(info) => compaction_results.push(info),
49                    Err(e) => {
50                        error!(
51                            "Failed to compact adjacency {} -> {}: {}",
52                            label, edge_type, e
53                        );
54                    }
55                }
56            }
57
58            // Incoming: dst_labels
59            for label in &meta.dst_labels {
60                info!("Compacting adjacency {} <- {} (bwd)", label, edge_type);
61                match self.compact_adjacency(edge_type, label, "bwd").await {
62                    Ok(info) => compaction_results.push(info),
63                    Err(e) => {
64                        error!(
65                            "Failed to compact adjacency {} <- {}: {}",
66                            label, edge_type, e
67                        );
68                    }
69                }
70            }
71        }
72
73        metrics::counter!("uni_compaction_runs_total").increment(1);
74        metrics::histogram!("uni_compaction_duration_seconds")
75            .record(start.elapsed().as_secs_f64());
76
77        Ok(compaction_results)
78    }
79
    /// Compact all persisted rows for one vertex label down to a single
    /// latest-state row per `Vid`, then overwrite the backing table.
    ///
    /// Merge semantics (see `merge_row_into_state`): CRDT-typed properties
    /// are merged commutatively across all versions; everything else is
    /// last-writer-wins by `_version`. Tombstoned vertices are dropped
    /// from the rewritten table entirely.
    ///
    /// # Errors
    /// Returns an error if `label` is unknown to the schema, the OOM guard
    /// rejects the row count, or any backend scan/decode/write step fails.
    #[instrument(skip(self), fields(rows_processed, duration_ms), level = "info")]
    pub async fn compact_vertices(&self, label: &str) -> Result<()> {
        let start = std::time::Instant::now();
        let schema_manager = self.storage.schema_manager();
        let schema = schema_manager.schema();

        let label_props = schema
            .properties
            .get(label)
            .ok_or_else(|| anyhow!("Label not found"))?;

        // Identify CRDT properties (these merge commutatively instead of LWW).
        let crdt_props: HashSet<String> = label_props
            .iter()
            .filter(|(_, meta)| matches!(meta.r#type, DataType::Crdt(_)))
            .map(|(name, _)| name.clone())
            .collect();

        let dataset = self.storage.vertex_dataset(label)?;
        let backend = self.storage.backend();
        let table_name = dataset.table_name();

        // Check if table exists — labels with no data yet are a no-op.
        if !backend.table_exists(&table_name).await.unwrap_or(false) {
            info!("No vertex data to compact for label '{}'", label);
            return Ok(());
        }

        // In-memory compaction for now (MVP).
        // For large datasets, this needs to be streaming/chunked with external sort.
        // Current approach: Read ALL, merge in map, write NEW.
        // TODO(perf): This accumulates ALL vertices in memory, causing OOM for large
        // labels (millions of vertices). Refactor to use streaming merge-sort with
        // constant memory usage (e.g., external sort or Lance fragment-by-fragment merge).

        // Hard cap on rows loaded at once; errors out rather than OOM-ing.
        let row_count = backend.count_rows(&table_name, None).await?;
        crate::storage::delta::check_oom_guard(
            row_count,
            self.storage.config.max_compaction_rows,
            label,
            "vertices",
        )?;

        info!(
            label = %label,
            row_count,
            // NOTE(review): ~200 bytes/row is a rough heuristic for logging only —
            // confirm against actual row sizes if this estimate matters.
            estimated_bytes = row_count * 200,
            "Starting vertex compaction"
        );

        use crate::backend::types::ScanRequest;
        let batches: Vec<RecordBatch> = backend.scan(ScanRequest::all(&table_name)).await?;

        // Vid -> (Properties, Deleted)
        let mut vertex_state: HashMap<Vid, (Properties, bool)> = HashMap::new();
        // Vid -> highest `_version` seen so far (LWW tie-break state).
        let mut vertex_versions: HashMap<Vid, u64> = HashMap::new();
        // Vid -> labels taken from the newest row carrying a non-empty `_labels` list.
        let mut vertex_labels: HashMap<Vid, Vec<String>> = HashMap::new();

        let mut rows_processed = 0;

        for batch in batches {
            rows_processed += batch.num_rows();
            // Required system columns; a missing/mistyped column is a schema bug,
            // hence the unwraps.
            let vid_col = batch
                .column_by_name("_vid")
                .unwrap()
                .as_any()
                .downcast_ref::<UInt64Array>()
                .unwrap();
            let ver_col = batch
                .column_by_name("_version")
                .unwrap()
                .as_any()
                .downcast_ref::<UInt64Array>()
                .unwrap();
            let del_col = batch
                .column_by_name("_deleted")
                .unwrap()
                .as_any()
                .downcast_ref::<arrow_array::BooleanArray>()
                .unwrap();

            // Read _labels column (List<Utf8>) if present
            let labels_col = batch
                .column_by_name("_labels")
                .and_then(|c| c.as_any().downcast_ref::<arrow_array::ListArray>());

            for i in 0..batch.num_rows() {
                let vid = Vid::from(vid_col.value(i));
                let version = ver_col.value(i);
                let deleted = del_col.value(i);

                // Extract labels from the _labels column (keep latest version's labels)
                if let Some(list_arr) = labels_col
                    && version >= *vertex_versions.entry(vid).or_insert(0)
                {
                    let labels = crate::storage::arrow_convert::labels_from_list_array(list_arr, i);
                    if !labels.is_empty() {
                        vertex_labels.insert(vid, labels);
                    }
                }

                let current_entry = vertex_state
                    .entry(vid)
                    .or_insert((Properties::new(), false));
                let current_version = vertex_versions.entry(vid).or_insert(0);

                // If this row is newer than what we've seen (or same), we apply logic.
                // Wait, if we process unordered, we need to be careful.
                // For CRDTs, we MERGE regardless of version (commutative).
                // For LWW, we take MAX version.

                // If it's a deletion, and it's newer, it wins.
                // (>= so that a delete sharing a version with an update wins the tie
                // when the delete is scanned second.)
                if deleted {
                    if version >= *current_version {
                        current_entry.1 = true;
                        current_entry.0.clear(); // Clear properties on delete
                        *current_version = version;
                    }
                    continue;
                }

                // It's an update/insert
                // Extract props and track NULLs (property removals)
                let mut row_props = Properties::new();
                let mut null_props = Vec::new(); // Track explicitly NULL properties
                for (name, meta) in label_props {
                    if let Some(col) = batch.column_by_name(name) {
                        if col.is_null(i) {
                            // Property was explicitly removed (set to NULL)
                            null_props.push(name.clone());
                        } else {
                            let val = crate::storage::value_codec::decode_column_value(
                                col.as_ref(),
                                &meta.r#type,
                                i,
                                crate::storage::value_codec::CrdtDecodeMode::Strict,
                            )?;
                            row_props.insert(name.clone(), val);
                        }
                    }
                }

                Self::merge_row_into_state(
                    row_props,
                    null_props,
                    version,
                    current_entry,
                    current_version,
                    &crdt_props,
                )?;
            }
        }

        // Convert state to RecordBatch and write OVERWRITE
        let mut valid_vertices = Vec::new();
        let mut valid_versions = Vec::new();
        let mut valid_deleted = Vec::new(); // Should be all false if we filter out tombstones?
        // Or we keep tombstones if they are recent?
        // Compaction usually removes tombstones.

        // Tombstoned vertices are dropped here, so the rewritten table holds
        // only live rows.
        for (vid, (props, deleted)) in vertex_state {
            if !deleted {
                let labels = vertex_labels.remove(&vid).unwrap_or_default();
                valid_vertices.push((vid, labels, props));
                valid_versions.push(vertex_versions[&vid]);
                valid_deleted.push(false);
            }
        }

        if !valid_vertices.is_empty() {
            let batch = dataset.build_record_batch(
                &valid_vertices,
                &valid_deleted,
                &valid_versions,
                &schema,
            )?;
            dataset
                .replace(self.storage.backend(), batch, &schema)
                .await?;
        }

        let duration = start.elapsed();
        // Rows reclaimed = input rows minus surviving unique vertices
        // (cannot underflow: each surviving vertex came from >= 1 input row).
        let rows_reclaimed = rows_processed as u64 - valid_vertices.len() as u64;
        metrics::counter!("uni_compaction_rows_reclaimed_total", "type" => "vertex")
            .increment(rows_reclaimed);

        tracing::Span::current().record("rows_processed", rows_processed);
        tracing::Span::current().record("duration_ms", duration.as_millis());
        info!(
            rows = rows_processed,
            duration_ms = duration.as_millis(),
            "Vertex compaction completed"
        );

        metrics::histogram!("uni_compaction_duration_seconds", "type" => "vertex")
            .record(duration.as_secs_f64());

        Ok(())
    }
279
280    fn merge_crdt_values(a: &Value, b: &Value) -> Result<Value> {
281        if a.is_null() {
282            return Ok(b.clone());
283        }
284        if b.is_null() {
285            return Ok(a.clone());
286        }
287        let mut crdt_a: Crdt = serde_json::from_value(a.clone().into())?;
288        let crdt_b: Crdt = serde_json::from_value(b.clone().into())?;
289        crdt_a
290            .try_merge(&crdt_b)
291            .map_err(|e| anyhow::anyhow!("{e}"))?;
292        Ok(Value::from(serde_json::to_value(crdt_a)?))
293    }
294
295    /// Merge row properties into state based on version comparison.
296    fn merge_row_into_state(
297        row_props: Properties,
298        null_props: Vec<String>,
299        version: u64,
300        current_entry: &mut (Properties, bool),
301        current_version: &mut u64,
302        crdt_props: &HashSet<String>,
303    ) -> Result<()> {
304        if version > *current_version {
305            // New version wins for LWW, merge for CRDTs
306            *current_version = version;
307            current_entry.1 = false;
308
309            for (k, v) in row_props {
310                if crdt_props.contains(&k) {
311                    let existing = current_entry.0.entry(k.clone()).or_insert(Value::Null);
312                    *existing = Self::merge_crdt_values(existing, &v)?;
313                } else {
314                    current_entry.0.insert(k, v);
315                }
316            }
317
318            // Remove properties explicitly set to NULL in the newer version
319            for null_prop in &null_props {
320                if !crdt_props.contains(null_prop) {
321                    current_entry.0.remove(null_prop);
322                }
323            }
324        } else if version == *current_version {
325            // Same version: merge all
326            current_entry.1 = false;
327            for (k, v) in row_props {
328                if crdt_props.contains(&k) {
329                    let existing = current_entry.0.entry(k.clone()).or_insert(Value::Null);
330                    *existing = Self::merge_crdt_values(existing, &v)?;
331                } else {
332                    current_entry.0.insert(k, v);
333                }
334            }
335        } else {
336            // Older version: only merge CRDTs
337            if !current_entry.1 {
338                for (k, v) in row_props {
339                    if crdt_props.contains(&k) {
340                        let existing = current_entry.0.entry(k.clone()).or_insert(Value::Null);
341                        *existing = Self::merge_crdt_values(existing, &v)?;
342                    }
343                }
344            }
345        }
346        Ok(())
347    }
348
    /// Merge the L1 delta log for (`edge_type`, `direction`) into the L2
    /// adjacency table for `label`, then truncate the delta log.
    ///
    /// Pipeline: load all deltas; group them per vertex (src for "fwd",
    /// dst for "bwd") and sort each group by version; replay the matching
    /// deltas onto every existing L2 row; emit vertices that exist only in
    /// the deltas; overwrite the adjacency table; finally clear the delta
    /// table so stale topology ops cannot be re-read.
    ///
    /// # Errors
    /// Fails on dataset/schema lookup errors, the OOM guard, or any backend
    /// scan/write failure.
    #[instrument(skip(self), fields(delta_count, duration_ms), level = "info")]
    pub async fn compact_adjacency(
        &self,
        edge_type: &str,
        label: &str,
        direction: &str,
    ) -> Result<CompactionInfo> {
        let start = std::time::Instant::now();
        let schema = self.storage.schema_manager().schema();

        // 1. Load all L1 Deltas sorted by key
        let delta_ds = self.storage.delta_dataset(edge_type, direction)?;
        let deltas = delta_ds
            .scan_all_backend(self.storage.backend(), &schema)
            .await?;

        let delta_count = deltas.len();
        tracing::Span::current().record("delta_count", delta_count);

        if deltas.is_empty() {
            // Nothing to compact, return info anyway
            return Ok(CompactionInfo {
                edge_type: edge_type.to_string(),
                direction: direction.to_string(),
            });
        }

        // Group deltas by src_vid (if fwd) or dst_vid (if bwd)
        // We'll use a HashMap for now since we loaded all into memory.
        // Value is list of ops for that vertex.
        let mut delta_map: HashMap<Vid, Vec<L1Entry>> = HashMap::new();
        for entry in &deltas {
            let key = if direction == "fwd" {
                entry.src_vid
            } else {
                entry.dst_vid
            };
            delta_map.entry(key).or_default().push(entry.clone());
        }

        // Sort each VID's ops by version to ensure correct ordering
        // This guarantees Delete(v=2) beats Insert(v=1) regardless of scan order
        for ops in delta_map.values_mut() {
            ops.sort_by_key(|e| e.version);
        }

        // 2. Open L2 Adjacency stream
        let adj_ds = self
            .storage
            .adjacency_dataset(edge_type, label, direction)?;

        // We need to write a NEW version.
        // Strategy:
        // - Read L2 batch by batch.
        // - For each row (vertex), check if we have deltas.
        // - Apply deltas.
        // - Write to new batch.
        // - Track which vertices from deltas we've processed.
        // - After L2 stream ends, process remaining "new" vertices from deltas.

        // Output Builders
        let mut src_vid_builder = UInt64Builder::new();
        let mut neighbors_builder = ListBuilder::new(UInt64Builder::new());
        let mut edge_ids_builder = ListBuilder::new(UInt64Builder::new());

        // VIDs already emitted from the L2 scan; used below to detect
        // delta-only ("new") vertices.
        let mut processed_vids = HashSet::new();

        // Try to read from backend (canonical storage)
        let backend = self.storage.backend();
        let adj_table_name = adj_ds.table_name();
        if backend.table_exists(&adj_table_name).await.unwrap_or(false) {
            let adj_row_count = backend.count_rows(&adj_table_name, None).await?;
            crate::storage::delta::check_oom_guard(
                adj_row_count,
                self.storage.config.max_compaction_rows,
                &format!("{}_{}", edge_type, label),
                direction,
            )?;

            info!(
                edge_type = %edge_type,
                label = %label,
                direction = %direction,
                adj_row_count,
                delta_count,
                // Rough logging heuristic (~100 bytes/adjacency row) — not a budget.
                estimated_bytes = adj_row_count * 100 + delta_count * ENTRY_SIZE_ESTIMATE,
                "Starting adjacency compaction"
            );

            use crate::backend::types::ScanRequest;
            let batches: Vec<RecordBatch> = backend.scan(ScanRequest::all(&adj_table_name)).await?;

            for batch in batches {
                let src_col = batch
                    .column_by_name("src_vid")
                    .ok_or(anyhow!("Missing src_vid"))?
                    .as_any()
                    .downcast_ref::<UInt64Array>()
                    .ok_or(anyhow!("Invalid src_vid"))?;
                let neighbors_col = batch
                    .column_by_name("neighbors")
                    .ok_or(anyhow!("Missing neighbors"))?
                    .as_any()
                    .downcast_ref::<ListArray>()
                    .ok_or(anyhow!("Invalid neighbors"))?;
                let edge_ids_col = batch
                    .column_by_name("edge_ids")
                    .ok_or(anyhow!("Missing edge_ids"))?
                    .as_any()
                    .downcast_ref::<ListArray>()
                    .ok_or(anyhow!("Invalid edge_ids"))?;

                for i in 0..batch.num_rows() {
                    let vid = Vid::from(src_col.value(i));
                    processed_vids.insert(vid);

                    // Reconstruct current adjacency list
                    let n_list = neighbors_col.value(i);
                    let n_array = n_list.as_any().downcast_ref::<UInt64Array>().unwrap();
                    let e_list = edge_ids_col.value(i);
                    let e_array = e_list.as_any().downcast_ref::<UInt64Array>().unwrap();

                    // neighbors and edge_ids are parallel lists: index j of each
                    // describes one edge.
                    let mut current_edges: HashMap<Eid, Vid> = HashMap::new();
                    for j in 0..n_array.len() {
                        current_edges
                            .insert(Eid::from(e_array.value(j)), Vid::from(n_array.value(j)));
                    }

                    if let Some(ops) = delta_map.get(&vid) {
                        apply_deltas_to_edges(&mut current_edges, ops, direction);
                    }

                    append_edges_to_builders(
                        vid,
                        &current_edges,
                        &mut src_vid_builder,
                        &mut neighbors_builder,
                        &mut edge_ids_builder,
                    );
                }
            }
        }

        // Process new vertices (in deltas but not in L2)
        for (vid, ops) in delta_map {
            if processed_vids.contains(&vid) {
                continue;
            }

            let mut current_edges: HashMap<Eid, Vid> = HashMap::new();
            apply_deltas_to_edges(&mut current_edges, &ops, direction);

            append_edges_to_builders(
                vid,
                &current_edges,
                &mut src_vid_builder,
                &mut neighbors_builder,
                &mut edge_ids_builder,
            );
        }

        // Final Flush
        if src_vid_builder.len() > 0 {
            let src_arr = Arc::new(src_vid_builder.finish());
            let neighbors_arr = Arc::new(neighbors_builder.finish());
            let edge_ids_arr = Arc::new(edge_ids_builder.finish());

            let schema = adj_ds.get_arrow_schema();
            let batch = RecordBatch::try_new(schema, vec![src_arr, neighbors_arr, edge_ids_arr])?;

            // Replace the table with compacted data
            adj_ds.replace(self.storage.backend(), batch).await?;
        }

        // CRITICAL: Clear Delta L1 after compaction
        // Topology ops from Delta L1 are now incorporated into L2 adjacency.
        // Edge properties survive in main_edges (dual-written during flush).
        // Clearing Delta L1 prevents stale topology data from being read.
        if !deltas.is_empty() {
            info!(
                "Clearing Delta L1 for edge_type={} direction={} after compaction (incorporated {} ops)",
                edge_type,
                direction,
                deltas.len()
            );

            // Invariant: Every EID in Delta L1 must have a corresponding entry in
            // main_edges, because Writer::flush_to_l1 performs a dual-write.
            // Tests that create delta entries directly (for schema/overflow testing)
            // must not call compact_adjacency without also populating main_edges.
            // Debug builds verify the invariant below; release builds skip it.
            #[cfg(debug_assertions)]
            {
                use crate::storage::main_edge::MainEdgeDataset;

                let delta_eids: std::collections::HashSet<Eid> =
                    deltas.iter().map(|e| e.eid).collect();

                for eid in delta_eids {
                    let main_edge_exists =
                        MainEdgeDataset::exists_by_eid(self.storage.backend(), eid)
                            .await
                            .unwrap_or(false);

                    debug_assert!(
                        main_edge_exists,
                        "EID {} from Delta L1 not found in main_edges after compaction. \
                        This indicates edge properties were not dual-written during flush.",
                        eid.as_u64()
                    );
                }
            }

            // Clear the Delta L1 table by replacing with empty batch
            let delta_ds = self.storage.delta_dataset(edge_type, direction)?;
            let delta_schema = delta_ds.get_arrow_schema(&schema)?;
            let empty_batch = RecordBatch::new_empty(delta_schema);
            delta_ds
                .replace(self.storage.backend(), empty_batch)
                .await?;
        }

        let duration = start.elapsed();
        tracing::Span::current().record("duration_ms", duration.as_millis());
        info!(
            delta_count,
            duration_ms = duration.as_millis(),
            "Adjacency compaction completed"
        );

        metrics::histogram!("uni_compaction_duration_seconds", "type" => "adjacency")
            .record(duration.as_secs_f64());

        Ok(CompactionInfo {
            edge_type: edge_type.to_string(),
            direction: direction.to_string(),
        })
    }
586}
587
588/// Apply delta operations to an edge map, returning the resolved neighbor for the direction.
589fn apply_deltas_to_edges(current_edges: &mut HashMap<Eid, Vid>, ops: &[L1Entry], direction: &str) {
590    for op in ops {
591        match op.op {
592            Op::Insert => {
593                let neighbor = if direction == "fwd" {
594                    op.dst_vid
595                } else {
596                    op.src_vid
597                };
598                current_edges.insert(op.eid, neighbor);
599            }
600            Op::Delete => {
601                current_edges.remove(&op.eid);
602            }
603        }
604    }
605}
606
607/// Write sorted edges from a HashMap into adjacency list builders.
608fn append_edges_to_builders(
609    vid: Vid,
610    current_edges: &HashMap<Eid, Vid>,
611    src_vid_builder: &mut UInt64Builder,
612    neighbors_builder: &mut ListBuilder<UInt64Builder>,
613    edge_ids_builder: &mut ListBuilder<UInt64Builder>,
614) {
615    if current_edges.is_empty() {
616        return;
617    }
618    src_vid_builder.append_value(vid.as_u64());
619
620    let mut sorted_eids: Vec<_> = current_edges.keys().cloned().collect();
621    sorted_eids.sort();
622
623    for eid in sorted_eids {
624        let neighbor = current_edges[&eid];
625        neighbors_builder.values().append_value(neighbor.as_u64());
626        edge_ids_builder.values().append_value(eid.as_u64());
627    }
628    neighbors_builder.append(true);
629    edge_ids_builder.append(true);
630}
631
/// Information returned by adjacency compaction about what was compacted.
/// Used to coordinate in-memory CSR re-warm after storage compaction.
#[derive(Debug, Clone)]
pub struct CompactionInfo {
    // Edge type whose adjacency table was compacted.
    pub edge_type: String,
    // Direction of the compacted list ("fwd" or "bwd", matching compact_adjacency's argument).
    pub direction: String,
}