dbx_core/engine/
compaction.rs1use crate::engine::{Database, DeltaVariant};
7use crate::error::DbxResult;
8use crate::storage::parquet_io::ParquetWriter;
9use rayon::prelude::*;
10use std::path::Path;
11
/// Stateless namespace type for the delta-flush routines.
///
/// The type has no fields; its operations are associated functions that take
/// the database as an explicit argument rather than `self`.
#[derive(Debug, Clone, Copy, Default)]
pub struct Compactor;
13
14impl Compactor {
15 pub fn bypass_flush(db: &Database, table: &str) -> DbxResult<()> {
24 if let DeltaVariant::Columnar(delta) = &db.delta {
25 let versioned_batches = delta.drain_table(table);
27 if versioned_batches.is_empty() {
28 return Ok(());
29 }
30
31 use crate::storage::kv_adapter::merge_batches;
33 let batch_refs: Vec<_> = versioned_batches
34 .par_iter()
35 .map(|vb| std::sync::Arc::clone(&vb.data))
36 .collect();
37 let merged_batch = merge_batches(batch_refs)?;
38
39 let timestamp = std::time::SystemTime::now()
41 .duration_since(std::time::UNIX_EPOCH)
42 .unwrap_or_default()
43 .as_nanos();
44
45 let ros_dir = Path::new("data").join("ros").join(table);
46 if let Err(e) = std::fs::create_dir_all(&ros_dir) {
47 return Err(crate::error::DbxError::Storage(format!(
48 "Failed to create ROS directory: {}",
49 e
50 )));
51 }
52
53 let file_path = ros_dir.join(format!("{}.parquet", timestamp));
54
55 ParquetWriter::write(&file_path, &merged_batch)?;
56 }
57 Ok(())
58 }
59
60 pub fn bypass_flush_tables(db: &Database, tables: &[&str]) -> Vec<DbxResult<()>> {
64 tables
65 .par_iter()
66 .map(|&table| Self::bypass_flush(db, table))
67 .collect()
68 }
69}