1use crate::storage::delta::{ENTRY_SIZE_ESTIMATE, L1Entry, Op};
5use crate::storage::manager::StorageManager;
6use anyhow::{Result, anyhow};
7use arrow_array::Array;
8use arrow_array::builder::{ArrayBuilder, ListBuilder, UInt64Builder};
9use arrow_array::{ListArray, RecordBatch, UInt64Array};
10use metrics;
11use std::collections::{HashMap, HashSet};
12use std::sync::Arc;
13use tracing::{error, info, instrument};
14use uni_common::core::id::{Eid, Vid};
15use uni_common::core::schema::DataType;
16use uni_common::{Properties, Value};
17use uni_crdt::Crdt;
18
/// Background compactor: folds accumulated delta/tombstone rows into compact
/// latest-state tables for vertices and adjacency lists, then clears the
/// incorporated deltas.
pub struct Compactor {
    /// Shared handle to datasets, schema, config, and the storage backend.
    storage: Arc<StorageManager>,
}
22
impl Compactor {
    /// Creates a compactor over the given storage manager.
    pub fn new(storage: Arc<StorageManager>) -> Self {
        Self { storage }
    }
27
    /// Runs a full compaction pass over every vertex label and every
    /// adjacency table (both directions) defined in the current schema.
    ///
    /// Per-item failures are logged and skipped so one bad label/edge type
    /// cannot abort the whole pass. Only adjacency compactions contribute to
    /// the returned `Vec<CompactionInfo>`; vertex compaction reports nothing
    /// because `compact_vertices` returns `()`.
    #[instrument(skip(self), level = "info")]
    pub async fn compact_all(&self) -> Result<Vec<CompactionInfo>> {
        let start = std::time::Instant::now();
        let schema = self.storage.schema_manager().schema();
        let mut compaction_results = Vec::new();

        // Vertex tables first: best-effort, errors are swallowed after logging.
        for label in schema.labels.keys() {
            info!("Compacting vertices for label {}", label);
            if let Err(e) = self.compact_vertices(label).await {
                error!("Failed to compact vertices for {}: {}", label, e);
            }
        }

        // Adjacency tables: forward direction is keyed by source labels...
        for (edge_type, meta) in &schema.edge_types {
            for label in &meta.src_labels {
                info!("Compacting adjacency {} -> {} (fwd)", label, edge_type);
                match self.compact_adjacency(edge_type, label, "fwd").await {
                    Ok(info) => compaction_results.push(info),
                    Err(e) => {
                        error!(
                            "Failed to compact adjacency {} -> {}: {}",
                            label, edge_type, e
                        );
                    }
                }
            }

            // ...and backward direction is keyed by destination labels.
            for label in &meta.dst_labels {
                info!("Compacting adjacency {} <- {} (bwd)", label, edge_type);
                match self.compact_adjacency(edge_type, label, "bwd").await {
                    Ok(info) => compaction_results.push(info),
                    Err(e) => {
                        error!(
                            "Failed to compact adjacency {} <- {}: {}",
                            label, edge_type, e
                        );
                    }
                }
            }
        }

        // Pass-level metrics are recorded even when individual items failed.
        metrics::counter!("uni_compaction_runs_total").increment(1);
        metrics::histogram!("uni_compaction_duration_seconds")
            .record(start.elapsed().as_secs_f64());

        Ok(compaction_results)
    }
79
80 #[instrument(skip(self), fields(rows_processed, duration_ms), level = "info")]
81 pub async fn compact_vertices(&self, label: &str) -> Result<()> {
82 let start = std::time::Instant::now();
83 let schema_manager = self.storage.schema_manager();
84 let schema = schema_manager.schema();
85
86 let label_props = schema
87 .properties
88 .get(label)
89 .ok_or_else(|| anyhow!("Label not found"))?;
90
91 let crdt_props: HashSet<String> = label_props
93 .iter()
94 .filter(|(_, meta)| matches!(meta.r#type, DataType::Crdt(_)))
95 .map(|(name, _)| name.clone())
96 .collect();
97
98 let dataset = self.storage.vertex_dataset(label)?;
99 let backend = self.storage.backend();
100 let table_name = dataset.table_name();
101
102 if !backend.table_exists(&table_name).await.unwrap_or(false) {
104 info!("No vertex data to compact for label '{}'", label);
105 return Ok(());
106 }
107
108 let row_count = backend.count_rows(&table_name, None).await?;
116 crate::storage::delta::check_oom_guard(
117 row_count,
118 self.storage.config.max_compaction_rows,
119 label,
120 "vertices",
121 )?;
122
123 info!(
124 label = %label,
125 row_count,
126 estimated_bytes = row_count * 200,
127 "Starting vertex compaction"
128 );
129
130 use crate::backend::types::ScanRequest;
131 let batches: Vec<RecordBatch> = backend.scan(ScanRequest::all(&table_name)).await?;
132
133 let mut vertex_state: HashMap<Vid, (Properties, bool)> = HashMap::new();
135 let mut vertex_versions: HashMap<Vid, u64> = HashMap::new();
136 let mut vertex_labels: HashMap<Vid, Vec<String>> = HashMap::new();
137
138 let mut rows_processed = 0;
139
140 for batch in batches {
141 rows_processed += batch.num_rows();
142 let vid_col = batch
143 .column_by_name("_vid")
144 .unwrap()
145 .as_any()
146 .downcast_ref::<UInt64Array>()
147 .unwrap();
148 let ver_col = batch
149 .column_by_name("_version")
150 .unwrap()
151 .as_any()
152 .downcast_ref::<UInt64Array>()
153 .unwrap();
154 let del_col = batch
155 .column_by_name("_deleted")
156 .unwrap()
157 .as_any()
158 .downcast_ref::<arrow_array::BooleanArray>()
159 .unwrap();
160
161 let labels_col = batch
163 .column_by_name("_labels")
164 .and_then(|c| c.as_any().downcast_ref::<arrow_array::ListArray>());
165
166 for i in 0..batch.num_rows() {
167 let vid = Vid::from(vid_col.value(i));
168 let version = ver_col.value(i);
169 let deleted = del_col.value(i);
170
171 if let Some(list_arr) = labels_col
173 && version >= *vertex_versions.entry(vid).or_insert(0)
174 {
175 let labels = crate::storage::arrow_convert::labels_from_list_array(list_arr, i);
176 if !labels.is_empty() {
177 vertex_labels.insert(vid, labels);
178 }
179 }
180
181 let current_entry = vertex_state
182 .entry(vid)
183 .or_insert((Properties::new(), false));
184 let current_version = vertex_versions.entry(vid).or_insert(0);
185
186 if deleted {
193 if version >= *current_version {
194 current_entry.1 = true;
195 current_entry.0.clear(); *current_version = version;
197 }
198 continue;
199 }
200
201 let mut row_props = Properties::new();
204 let mut null_props = Vec::new(); for (name, meta) in label_props {
206 if let Some(col) = batch.column_by_name(name) {
207 if col.is_null(i) {
208 null_props.push(name.clone());
210 } else {
211 let val = crate::storage::value_codec::decode_column_value(
212 col.as_ref(),
213 &meta.r#type,
214 i,
215 crate::storage::value_codec::CrdtDecodeMode::Strict,
216 )?;
217 row_props.insert(name.clone(), val);
218 }
219 }
220 }
221
222 Self::merge_row_into_state(
223 row_props,
224 null_props,
225 version,
226 current_entry,
227 current_version,
228 &crdt_props,
229 )?;
230 }
231 }
232
233 let mut valid_vertices = Vec::new();
235 let mut valid_versions = Vec::new();
236 let mut valid_deleted = Vec::new(); for (vid, (props, deleted)) in vertex_state {
241 if !deleted {
242 let labels = vertex_labels.remove(&vid).unwrap_or_default();
243 valid_vertices.push((vid, labels, props));
244 valid_versions.push(vertex_versions[&vid]);
245 valid_deleted.push(false);
246 }
247 }
248
249 if !valid_vertices.is_empty() {
250 let batch = dataset.build_record_batch(
251 &valid_vertices,
252 &valid_deleted,
253 &valid_versions,
254 &schema,
255 )?;
256 dataset
257 .replace(self.storage.backend(), batch, &schema)
258 .await?;
259 }
260
261 let duration = start.elapsed();
262 let rows_reclaimed = rows_processed as u64 - valid_vertices.len() as u64;
263 metrics::counter!("uni_compaction_rows_reclaimed_total", "type" => "vertex")
264 .increment(rows_reclaimed);
265
266 tracing::Span::current().record("rows_processed", rows_processed);
267 tracing::Span::current().record("duration_ms", duration.as_millis());
268 info!(
269 rows = rows_processed,
270 duration_ms = duration.as_millis(),
271 "Vertex compaction completed"
272 );
273
274 metrics::histogram!("uni_compaction_duration_seconds", "type" => "vertex")
275 .record(duration.as_secs_f64());
276
277 Ok(())
278 }
279
280 fn merge_crdt_values(a: &Value, b: &Value) -> Result<Value> {
281 if a.is_null() {
282 return Ok(b.clone());
283 }
284 if b.is_null() {
285 return Ok(a.clone());
286 }
287 let mut crdt_a: Crdt = serde_json::from_value(a.clone().into())?;
288 let crdt_b: Crdt = serde_json::from_value(b.clone().into())?;
289 crdt_a
290 .try_merge(&crdt_b)
291 .map_err(|e| anyhow::anyhow!("{e}"))?;
292 Ok(Value::from(serde_json::to_value(crdt_a)?))
293 }
294
295 fn merge_row_into_state(
297 row_props: Properties,
298 null_props: Vec<String>,
299 version: u64,
300 current_entry: &mut (Properties, bool),
301 current_version: &mut u64,
302 crdt_props: &HashSet<String>,
303 ) -> Result<()> {
304 if version > *current_version {
305 *current_version = version;
307 current_entry.1 = false;
308
309 for (k, v) in row_props {
310 if crdt_props.contains(&k) {
311 let existing = current_entry.0.entry(k.clone()).or_insert(Value::Null);
312 *existing = Self::merge_crdt_values(existing, &v)?;
313 } else {
314 current_entry.0.insert(k, v);
315 }
316 }
317
318 for null_prop in &null_props {
320 if !crdt_props.contains(null_prop) {
321 current_entry.0.remove(null_prop);
322 }
323 }
324 } else if version == *current_version {
325 current_entry.1 = false;
327 for (k, v) in row_props {
328 if crdt_props.contains(&k) {
329 let existing = current_entry.0.entry(k.clone()).or_insert(Value::Null);
330 *existing = Self::merge_crdt_values(existing, &v)?;
331 } else {
332 current_entry.0.insert(k, v);
333 }
334 }
335 } else {
336 if !current_entry.1 {
338 for (k, v) in row_props {
339 if crdt_props.contains(&k) {
340 let existing = current_entry.0.entry(k.clone()).or_insert(Value::Null);
341 *existing = Self::merge_crdt_values(existing, &v)?;
342 }
343 }
344 }
345 }
346 Ok(())
347 }
348
349 #[instrument(skip(self), fields(delta_count, duration_ms), level = "info")]
350 pub async fn compact_adjacency(
351 &self,
352 edge_type: &str,
353 label: &str,
354 direction: &str,
355 ) -> Result<CompactionInfo> {
356 let start = std::time::Instant::now();
357 let schema = self.storage.schema_manager().schema();
358
359 let delta_ds = self.storage.delta_dataset(edge_type, direction)?;
361 let deltas = delta_ds
362 .scan_all_backend(self.storage.backend(), &schema)
363 .await?;
364
365 let delta_count = deltas.len();
366 tracing::Span::current().record("delta_count", delta_count);
367
368 if deltas.is_empty() {
369 return Ok(CompactionInfo {
371 edge_type: edge_type.to_string(),
372 direction: direction.to_string(),
373 });
374 }
375
376 let mut delta_map: HashMap<Vid, Vec<L1Entry>> = HashMap::new();
380 for entry in &deltas {
381 let key = if direction == "fwd" {
382 entry.src_vid
383 } else {
384 entry.dst_vid
385 };
386 delta_map.entry(key).or_default().push(entry.clone());
387 }
388
389 for ops in delta_map.values_mut() {
392 ops.sort_by_key(|e| e.version);
393 }
394
395 let adj_ds = self
397 .storage
398 .adjacency_dataset(edge_type, label, direction)?;
399
400 let mut src_vid_builder = UInt64Builder::new();
411 let mut neighbors_builder = ListBuilder::new(UInt64Builder::new());
412 let mut edge_ids_builder = ListBuilder::new(UInt64Builder::new());
413
414 let mut processed_vids = HashSet::new();
415
416 let backend = self.storage.backend();
418 let adj_table_name = adj_ds.table_name();
419 if backend.table_exists(&adj_table_name).await.unwrap_or(false) {
420 let adj_row_count = backend.count_rows(&adj_table_name, None).await?;
421 crate::storage::delta::check_oom_guard(
422 adj_row_count,
423 self.storage.config.max_compaction_rows,
424 &format!("{}_{}", edge_type, label),
425 direction,
426 )?;
427
428 info!(
429 edge_type = %edge_type,
430 label = %label,
431 direction = %direction,
432 adj_row_count,
433 delta_count,
434 estimated_bytes = adj_row_count * 100 + delta_count * ENTRY_SIZE_ESTIMATE,
435 "Starting adjacency compaction"
436 );
437
438 use crate::backend::types::ScanRequest;
439 let batches: Vec<RecordBatch> = backend.scan(ScanRequest::all(&adj_table_name)).await?;
440
441 for batch in batches {
442 let src_col = batch
443 .column_by_name("src_vid")
444 .ok_or(anyhow!("Missing src_vid"))?
445 .as_any()
446 .downcast_ref::<UInt64Array>()
447 .ok_or(anyhow!("Invalid src_vid"))?;
448 let neighbors_col = batch
449 .column_by_name("neighbors")
450 .ok_or(anyhow!("Missing neighbors"))?
451 .as_any()
452 .downcast_ref::<ListArray>()
453 .ok_or(anyhow!("Invalid neighbors"))?;
454 let edge_ids_col = batch
455 .column_by_name("edge_ids")
456 .ok_or(anyhow!("Missing edge_ids"))?
457 .as_any()
458 .downcast_ref::<ListArray>()
459 .ok_or(anyhow!("Invalid edge_ids"))?;
460
461 for i in 0..batch.num_rows() {
462 let vid = Vid::from(src_col.value(i));
463 processed_vids.insert(vid);
464
465 let n_list = neighbors_col.value(i);
467 let n_array = n_list.as_any().downcast_ref::<UInt64Array>().unwrap();
468 let e_list = edge_ids_col.value(i);
469 let e_array = e_list.as_any().downcast_ref::<UInt64Array>().unwrap();
470
471 let mut current_edges: HashMap<Eid, Vid> = HashMap::new();
472 for j in 0..n_array.len() {
473 current_edges
474 .insert(Eid::from(e_array.value(j)), Vid::from(n_array.value(j)));
475 }
476
477 if let Some(ops) = delta_map.get(&vid) {
478 apply_deltas_to_edges(&mut current_edges, ops, direction);
479 }
480
481 append_edges_to_builders(
482 vid,
483 ¤t_edges,
484 &mut src_vid_builder,
485 &mut neighbors_builder,
486 &mut edge_ids_builder,
487 );
488 }
489 }
490 }
491
492 for (vid, ops) in delta_map {
494 if processed_vids.contains(&vid) {
495 continue;
496 }
497
498 let mut current_edges: HashMap<Eid, Vid> = HashMap::new();
499 apply_deltas_to_edges(&mut current_edges, &ops, direction);
500
501 append_edges_to_builders(
502 vid,
503 ¤t_edges,
504 &mut src_vid_builder,
505 &mut neighbors_builder,
506 &mut edge_ids_builder,
507 );
508 }
509
510 if src_vid_builder.len() > 0 {
512 let src_arr = Arc::new(src_vid_builder.finish());
513 let neighbors_arr = Arc::new(neighbors_builder.finish());
514 let edge_ids_arr = Arc::new(edge_ids_builder.finish());
515
516 let schema = adj_ds.get_arrow_schema();
517 let batch = RecordBatch::try_new(schema, vec![src_arr, neighbors_arr, edge_ids_arr])?;
518
519 adj_ds.replace(self.storage.backend(), batch).await?;
521 }
522
523 if !deltas.is_empty() {
528 info!(
529 "Clearing Delta L1 for edge_type={} direction={} after compaction (incorporated {} ops)",
530 edge_type,
531 direction,
532 deltas.len()
533 );
534
535 #[cfg(debug_assertions)]
540 {
541 use crate::storage::main_edge::MainEdgeDataset;
542
543 let delta_eids: std::collections::HashSet<Eid> =
544 deltas.iter().map(|e| e.eid).collect();
545
546 for eid in delta_eids {
547 let main_edge_exists =
548 MainEdgeDataset::exists_by_eid(self.storage.backend(), eid)
549 .await
550 .unwrap_or(false);
551
552 debug_assert!(
553 main_edge_exists,
554 "EID {} from Delta L1 not found in main_edges after compaction. \
555 This indicates edge properties were not dual-written during flush.",
556 eid.as_u64()
557 );
558 }
559 }
560
561 let delta_ds = self.storage.delta_dataset(edge_type, direction)?;
563 let delta_schema = delta_ds.get_arrow_schema(&schema)?;
564 let empty_batch = RecordBatch::new_empty(delta_schema);
565 delta_ds
566 .replace(self.storage.backend(), empty_batch)
567 .await?;
568 }
569
570 let duration = start.elapsed();
571 tracing::Span::current().record("duration_ms", duration.as_millis());
572 info!(
573 delta_count,
574 duration_ms = duration.as_millis(),
575 "Adjacency compaction completed"
576 );
577
578 metrics::histogram!("uni_compaction_duration_seconds", "type" => "adjacency")
579 .record(duration.as_secs_f64());
580
581 Ok(CompactionInfo {
582 edge_type: edge_type.to_string(),
583 direction: direction.to_string(),
584 })
585 }
586}
587
588fn apply_deltas_to_edges(current_edges: &mut HashMap<Eid, Vid>, ops: &[L1Entry], direction: &str) {
590 for op in ops {
591 match op.op {
592 Op::Insert => {
593 let neighbor = if direction == "fwd" {
594 op.dst_vid
595 } else {
596 op.src_vid
597 };
598 current_edges.insert(op.eid, neighbor);
599 }
600 Op::Delete => {
601 current_edges.remove(&op.eid);
602 }
603 }
604 }
605}
606
607fn append_edges_to_builders(
609 vid: Vid,
610 current_edges: &HashMap<Eid, Vid>,
611 src_vid_builder: &mut UInt64Builder,
612 neighbors_builder: &mut ListBuilder<UInt64Builder>,
613 edge_ids_builder: &mut ListBuilder<UInt64Builder>,
614) {
615 if current_edges.is_empty() {
616 return;
617 }
618 src_vid_builder.append_value(vid.as_u64());
619
620 let mut sorted_eids: Vec<_> = current_edges.keys().cloned().collect();
621 sorted_eids.sort();
622
623 for eid in sorted_eids {
624 let neighbor = current_edges[&eid];
625 neighbors_builder.values().append_value(neighbor.as_u64());
626 edge_ids_builder.values().append_value(eid.as_u64());
627 }
628 neighbors_builder.append(true);
629 edge_ids_builder.append(true);
630}
631
/// Summary of one completed adjacency compaction, returned by
/// `compact_adjacency` and aggregated by `compact_all`.
#[derive(Debug, Clone)]
pub struct CompactionInfo {
    /// Edge type whose adjacency table was compacted.
    pub edge_type: String,
    /// Direction of the compacted table: `"fwd"` or `"bwd"`.
    pub direction: String,
}