1use crate::storage::delta::{ENTRY_SIZE_ESTIMATE, L1Entry, Op};
5use crate::storage::manager::StorageManager;
6use anyhow::{Result, anyhow};
7use arrow_array::Array;
8use arrow_array::builder::{ArrayBuilder, ListBuilder, UInt64Builder};
9use arrow_array::{ListArray, RecordBatch, UInt64Array};
10use futures::TryStreamExt;
11use metrics;
12use std::collections::{HashMap, HashSet};
13use std::sync::Arc;
14use tracing::{error, info, instrument};
15use uni_common::core::id::{Eid, Vid};
16use uni_common::core::schema::DataType;
17use uni_common::{Properties, Value};
18use uni_crdt::Crdt;
19
/// Background compactor: folds accumulated vertex row versions and L1
/// adjacency deltas back into their base LanceDB tables via the shared
/// `StorageManager`.
pub struct Compactor {
    // Shared handle to datasets, the schema manager and the LanceDB store.
    storage: Arc<StorageManager>,
}
23
24impl Compactor {
25 pub fn new(storage: Arc<StorageManager>) -> Self {
26 Self { storage }
27 }
28
29 #[instrument(skip(self), level = "info")]
30 pub async fn compact_all(&self) -> Result<Vec<CompactionInfo>> {
31 let start = std::time::Instant::now();
32 let schema = self.storage.schema_manager().schema();
33 let mut compaction_results = Vec::new();
34
35 for label in schema.labels.keys() {
37 info!("Compacting vertices for label {}", label);
38 if let Err(e) = self.compact_vertices(label).await {
39 error!("Failed to compact vertices for {}: {}", label, e);
40 }
41 }
42
43 for (edge_type, meta) in &schema.edge_types {
45 for label in &meta.src_labels {
47 info!("Compacting adjacency {} -> {} (fwd)", label, edge_type);
48 match self.compact_adjacency(edge_type, label, "fwd").await {
49 Ok(info) => compaction_results.push(info),
50 Err(e) => {
51 error!(
52 "Failed to compact adjacency {} -> {}: {}",
53 label, edge_type, e
54 );
55 }
56 }
57 }
58
59 for label in &meta.dst_labels {
61 info!("Compacting adjacency {} <- {} (bwd)", label, edge_type);
62 match self.compact_adjacency(edge_type, label, "bwd").await {
63 Ok(info) => compaction_results.push(info),
64 Err(e) => {
65 error!(
66 "Failed to compact adjacency {} <- {}: {}",
67 label, edge_type, e
68 );
69 }
70 }
71 }
72 }
73
74 metrics::counter!("uni_compaction_runs_total").increment(1);
75 metrics::histogram!("uni_compaction_duration_seconds")
76 .record(start.elapsed().as_secs_f64());
77
78 Ok(compaction_results)
79 }
80
81 #[instrument(skip(self), fields(rows_processed, duration_ms), level = "info")]
82 pub async fn compact_vertices(&self, label: &str) -> Result<()> {
83 let start = std::time::Instant::now();
84 let schema_manager = self.storage.schema_manager();
85 let schema = schema_manager.schema();
86
87 let label_props = schema
88 .properties
89 .get(label)
90 .ok_or_else(|| anyhow!("Label not found"))?;
91
92 let crdt_props: HashSet<String> = label_props
94 .iter()
95 .filter(|(_, meta)| matches!(meta.r#type, DataType::Crdt(_)))
96 .map(|(name, _)| name.clone())
97 .collect();
98
99 let dataset = self.storage.vertex_dataset(label)?;
100 let lancedb_store = self.storage.lancedb_store();
101
102 let table = match dataset.open_lancedb(lancedb_store).await {
104 Ok(t) => t,
105 Err(_) => {
106 info!("No vertex data to compact for label '{}'", label);
108 return Ok(());
109 }
110 };
111
112 let row_count = table.count_rows(None).await?;
120 crate::storage::delta::check_oom_guard(
121 row_count,
122 self.storage.config.max_compaction_rows,
123 label,
124 "vertices",
125 )?;
126
127 info!(
128 label = %label,
129 row_count,
130 estimated_bytes = row_count * 200,
131 "Starting vertex compaction"
132 );
133
134 use lancedb::query::ExecutableQuery;
135 let stream = table.query().execute().await?;
136 let batches: Vec<RecordBatch> = stream.try_collect().await?;
137
138 let mut vertex_state: HashMap<Vid, (Properties, bool)> = HashMap::new();
140 let mut vertex_versions: HashMap<Vid, u64> = HashMap::new();
141 let mut vertex_labels: HashMap<Vid, Vec<String>> = HashMap::new();
142
143 let mut rows_processed = 0;
144
145 for batch in batches {
146 rows_processed += batch.num_rows();
147 let vid_col = batch
148 .column_by_name("_vid")
149 .unwrap()
150 .as_any()
151 .downcast_ref::<UInt64Array>()
152 .unwrap();
153 let ver_col = batch
154 .column_by_name("_version")
155 .unwrap()
156 .as_any()
157 .downcast_ref::<UInt64Array>()
158 .unwrap();
159 let del_col = batch
160 .column_by_name("_deleted")
161 .unwrap()
162 .as_any()
163 .downcast_ref::<arrow_array::BooleanArray>()
164 .unwrap();
165
166 let labels_col = batch
168 .column_by_name("_labels")
169 .and_then(|c| c.as_any().downcast_ref::<arrow_array::ListArray>());
170
171 for i in 0..batch.num_rows() {
172 let vid = Vid::from(vid_col.value(i));
173 let version = ver_col.value(i);
174 let deleted = del_col.value(i);
175
176 if let Some(list_arr) = labels_col
178 && version >= *vertex_versions.entry(vid).or_insert(0)
179 {
180 let labels = crate::storage::arrow_convert::labels_from_list_array(list_arr, i);
181 if !labels.is_empty() {
182 vertex_labels.insert(vid, labels);
183 }
184 }
185
186 let current_entry = vertex_state
187 .entry(vid)
188 .or_insert((Properties::new(), false));
189 let current_version = vertex_versions.entry(vid).or_insert(0);
190
191 if deleted {
198 if version >= *current_version {
199 current_entry.1 = true;
200 current_entry.0.clear(); *current_version = version;
202 }
203 continue;
204 }
205
206 let mut row_props = Properties::new();
209 let mut null_props = Vec::new(); for (name, meta) in label_props {
211 if let Some(col) = batch.column_by_name(name) {
212 if col.is_null(i) {
213 null_props.push(name.clone());
215 } else {
216 let val = crate::runtime::property_manager::PropertyManager::value_from_column(col.as_ref(), &meta.r#type, i)?;
222 row_props.insert(name.clone(), val);
223 }
224 }
225 }
226
227 Self::merge_row_into_state(
228 row_props,
229 null_props,
230 version,
231 current_entry,
232 current_version,
233 &crdt_props,
234 )?;
235 }
236 }
237
238 let mut valid_vertices = Vec::new();
240 let mut valid_versions = Vec::new();
241 let mut valid_deleted = Vec::new(); for (vid, (props, deleted)) in vertex_state {
246 if !deleted {
247 let labels = vertex_labels.remove(&vid).unwrap_or_default();
248 valid_vertices.push((vid, labels, props));
249 valid_versions.push(vertex_versions[&vid]);
250 valid_deleted.push(false);
251 }
252 }
253
254 if !valid_vertices.is_empty() {
255 let batch = dataset.build_record_batch(
256 &valid_vertices,
257 &valid_deleted,
258 &valid_versions,
259 &schema,
260 )?;
261 let lancedb_store = self.storage.lancedb_store();
262 dataset
263 .replace_lancedb(lancedb_store, batch, &schema)
264 .await?;
265 }
266
267 let duration = start.elapsed();
268 let rows_reclaimed = rows_processed as u64 - valid_vertices.len() as u64;
269 metrics::counter!("uni_compaction_rows_reclaimed_total", "type" => "vertex")
270 .increment(rows_reclaimed);
271
272 tracing::Span::current().record("rows_processed", rows_processed);
273 tracing::Span::current().record("duration_ms", duration.as_millis());
274 info!(
275 rows = rows_processed,
276 duration_ms = duration.as_millis(),
277 "Vertex compaction completed"
278 );
279
280 metrics::histogram!("uni_compaction_duration_seconds", "type" => "vertex")
281 .record(duration.as_secs_f64());
282
283 Ok(())
284 }
285
286 fn merge_crdt_values(a: &Value, b: &Value) -> Result<Value> {
287 if a.is_null() {
288 return Ok(b.clone());
289 }
290 if b.is_null() {
291 return Ok(a.clone());
292 }
293 let mut crdt_a: Crdt = serde_json::from_value(a.clone().into())?;
294 let crdt_b: Crdt = serde_json::from_value(b.clone().into())?;
295 crdt_a
296 .try_merge(&crdt_b)
297 .map_err(|e| anyhow::anyhow!("{e}"))?;
298 Ok(Value::from(serde_json::to_value(crdt_a)?))
299 }
300
301 fn merge_row_into_state(
303 row_props: Properties,
304 null_props: Vec<String>,
305 version: u64,
306 current_entry: &mut (Properties, bool),
307 current_version: &mut u64,
308 crdt_props: &HashSet<String>,
309 ) -> Result<()> {
310 if version > *current_version {
311 *current_version = version;
313 current_entry.1 = false;
314
315 for (k, v) in row_props {
316 if crdt_props.contains(&k) {
317 let existing = current_entry.0.entry(k.clone()).or_insert(Value::Null);
318 *existing = Self::merge_crdt_values(existing, &v)?;
319 } else {
320 current_entry.0.insert(k, v);
321 }
322 }
323
324 for null_prop in &null_props {
326 if !crdt_props.contains(null_prop) {
327 current_entry.0.remove(null_prop);
328 }
329 }
330 } else if version == *current_version {
331 current_entry.1 = false;
333 for (k, v) in row_props {
334 if crdt_props.contains(&k) {
335 let existing = current_entry.0.entry(k.clone()).or_insert(Value::Null);
336 *existing = Self::merge_crdt_values(existing, &v)?;
337 } else {
338 current_entry.0.insert(k, v);
339 }
340 }
341 } else {
342 if !current_entry.1 {
344 for (k, v) in row_props {
345 if crdt_props.contains(&k) {
346 let existing = current_entry.0.entry(k.clone()).or_insert(Value::Null);
347 *existing = Self::merge_crdt_values(existing, &v)?;
348 }
349 }
350 }
351 }
352 Ok(())
353 }
354
355 #[instrument(skip(self), fields(delta_count, duration_ms), level = "info")]
356 pub async fn compact_adjacency(
357 &self,
358 edge_type: &str,
359 label: &str,
360 direction: &str,
361 ) -> Result<CompactionInfo> {
362 let start = std::time::Instant::now();
363 let schema = self.storage.schema_manager().schema();
364
365 let delta_ds = self.storage.delta_dataset(edge_type, direction)?;
367 let lancedb_store = self.storage.lancedb_store();
368 let deltas = delta_ds.scan_all_lancedb(lancedb_store, &schema).await?;
369
370 let delta_count = deltas.len();
371 tracing::Span::current().record("delta_count", delta_count);
372
373 if deltas.is_empty() {
374 return Ok(CompactionInfo {
376 edge_type: edge_type.to_string(),
377 direction: direction.to_string(),
378 });
379 }
380
381 let mut delta_map: HashMap<Vid, Vec<L1Entry>> = HashMap::new();
385 for entry in &deltas {
386 let key = if direction == "fwd" {
387 entry.src_vid
388 } else {
389 entry.dst_vid
390 };
391 delta_map.entry(key).or_default().push(entry.clone());
392 }
393
394 for ops in delta_map.values_mut() {
397 ops.sort_by_key(|e| e.version);
398 }
399
400 let adj_ds = self
402 .storage
403 .adjacency_dataset(edge_type, label, direction)?;
404
405 let mut src_vid_builder = UInt64Builder::new();
416 let mut neighbors_builder = ListBuilder::new(UInt64Builder::new());
417 let mut edge_ids_builder = ListBuilder::new(UInt64Builder::new());
418
419 let mut processed_vids = HashSet::new();
420
421 let lancedb_store = self.storage.lancedb_store();
423 if let Ok(table) = adj_ds.open_lancedb(lancedb_store).await {
424 let adj_row_count = table.count_rows(None).await?;
425 crate::storage::delta::check_oom_guard(
426 adj_row_count,
427 self.storage.config.max_compaction_rows,
428 &format!("{}_{}", edge_type, label),
429 direction,
430 )?;
431
432 info!(
433 edge_type = %edge_type,
434 label = %label,
435 direction = %direction,
436 adj_row_count,
437 delta_count,
438 estimated_bytes = adj_row_count * 100 + delta_count * ENTRY_SIZE_ESTIMATE,
439 "Starting adjacency compaction"
440 );
441
442 use lancedb::query::ExecutableQuery;
443 let stream = table.query().execute().await?;
444 let batches: Vec<RecordBatch> = stream.try_collect().await?;
445
446 for batch in batches {
447 let src_col = batch
448 .column_by_name("src_vid")
449 .ok_or(anyhow!("Missing src_vid"))?
450 .as_any()
451 .downcast_ref::<UInt64Array>()
452 .ok_or(anyhow!("Invalid src_vid"))?;
453 let neighbors_col = batch
454 .column_by_name("neighbors")
455 .ok_or(anyhow!("Missing neighbors"))?
456 .as_any()
457 .downcast_ref::<ListArray>()
458 .ok_or(anyhow!("Invalid neighbors"))?;
459 let edge_ids_col = batch
460 .column_by_name("edge_ids")
461 .ok_or(anyhow!("Missing edge_ids"))?
462 .as_any()
463 .downcast_ref::<ListArray>()
464 .ok_or(anyhow!("Invalid edge_ids"))?;
465
466 for i in 0..batch.num_rows() {
467 let vid = Vid::from(src_col.value(i));
468 processed_vids.insert(vid);
469
470 let n_list = neighbors_col.value(i);
472 let n_array = n_list.as_any().downcast_ref::<UInt64Array>().unwrap();
473 let e_list = edge_ids_col.value(i);
474 let e_array = e_list.as_any().downcast_ref::<UInt64Array>().unwrap();
475
476 let mut current_edges: HashMap<Eid, Vid> = HashMap::new();
477 for j in 0..n_array.len() {
478 current_edges
479 .insert(Eid::from(e_array.value(j)), Vid::from(n_array.value(j)));
480 }
481
482 if let Some(ops) = delta_map.get(&vid) {
483 apply_deltas_to_edges(&mut current_edges, ops, direction);
484 }
485
486 append_edges_to_builders(
487 vid,
488 ¤t_edges,
489 &mut src_vid_builder,
490 &mut neighbors_builder,
491 &mut edge_ids_builder,
492 );
493 }
494 }
495 }
496
497 for (vid, ops) in delta_map {
499 if processed_vids.contains(&vid) {
500 continue;
501 }
502
503 let mut current_edges: HashMap<Eid, Vid> = HashMap::new();
504 apply_deltas_to_edges(&mut current_edges, &ops, direction);
505
506 append_edges_to_builders(
507 vid,
508 ¤t_edges,
509 &mut src_vid_builder,
510 &mut neighbors_builder,
511 &mut edge_ids_builder,
512 );
513 }
514
515 if src_vid_builder.len() > 0 {
517 let src_arr = Arc::new(src_vid_builder.finish());
518 let neighbors_arr = Arc::new(neighbors_builder.finish());
519 let edge_ids_arr = Arc::new(edge_ids_builder.finish());
520
521 let schema = adj_ds.get_arrow_schema();
522 let batch = RecordBatch::try_new(schema, vec![src_arr, neighbors_arr, edge_ids_arr])?;
523
524 let lancedb_store = self.storage.lancedb_store();
526 adj_ds.replace_lancedb(lancedb_store, batch).await?;
527 }
528
529 if !deltas.is_empty() {
534 info!(
535 "Clearing Delta L1 for edge_type={} direction={} after compaction (incorporated {} ops)",
536 edge_type,
537 direction,
538 deltas.len()
539 );
540
541 let delta_ds = self.storage.delta_dataset(edge_type, direction)?;
573 let lancedb_store = self.storage.lancedb_store();
574 let delta_schema = delta_ds.get_arrow_schema(&schema)?;
575 let empty_batch = RecordBatch::new_empty(delta_schema);
576 delta_ds.replace_lancedb(lancedb_store, empty_batch).await?;
577 }
578
579 let duration = start.elapsed();
580 tracing::Span::current().record("duration_ms", duration.as_millis());
581 info!(
582 delta_count,
583 duration_ms = duration.as_millis(),
584 "Adjacency compaction completed"
585 );
586
587 metrics::histogram!("uni_compaction_duration_seconds", "type" => "adjacency")
588 .record(duration.as_secs_f64());
589
590 Ok(CompactionInfo {
591 edge_type: edge_type.to_string(),
592 direction: direction.to_string(),
593 })
594 }
595}
596
597fn apply_deltas_to_edges(current_edges: &mut HashMap<Eid, Vid>, ops: &[L1Entry], direction: &str) {
599 for op in ops {
600 match op.op {
601 Op::Insert => {
602 let neighbor = if direction == "fwd" {
603 op.dst_vid
604 } else {
605 op.src_vid
606 };
607 current_edges.insert(op.eid, neighbor);
608 }
609 Op::Delete => {
610 current_edges.remove(&op.eid);
611 }
612 }
613 }
614}
615
616fn append_edges_to_builders(
618 vid: Vid,
619 current_edges: &HashMap<Eid, Vid>,
620 src_vid_builder: &mut UInt64Builder,
621 neighbors_builder: &mut ListBuilder<UInt64Builder>,
622 edge_ids_builder: &mut ListBuilder<UInt64Builder>,
623) {
624 if current_edges.is_empty() {
625 return;
626 }
627 src_vid_builder.append_value(vid.as_u64());
628
629 let mut sorted_eids: Vec<_> = current_edges.keys().cloned().collect();
630 sorted_eids.sort();
631
632 for eid in sorted_eids {
633 let neighbor = current_edges[&eid];
634 neighbors_builder.values().append_value(neighbor.as_u64());
635 edge_ids_builder.values().append_value(eid.as_u64());
636 }
637 neighbors_builder.append(true);
638 edge_ids_builder.append(true);
639}
640
/// Summary of a completed adjacency compaction, identifying which
/// edge-type/direction pair was processed.
#[derive(Debug, Clone)]
pub struct CompactionInfo {
    /// Edge type whose adjacency lists were compacted.
    pub edge_type: String,
    /// Adjacency orientation that was rewritten: "fwd" or "bwd".
    pub direction: String,
}