// dbx_core/engine/compaction.rs
//! Compaction module - background data movement between storage tiers.
//!
//! Implements strategies for flushing and compacting data, including
//! the WOS bypass strategy for Columnar Delta.
use std::path::Path;

use rayon::prelude::*;

use crate::engine::{Database, DeltaVariant};
use crate::error::DbxResult;
use crate::storage::parquet_io::ParquetWriter;

/// Stateless driver for background compaction; all operations are
/// associated functions that act on a borrowed [`Database`].
pub struct Compactor;
14impl Compactor {
15    /// Flush ColumnarDelta directly to Parquet (Tier 5), bypassing WOS (Tier 3).
16    ///
17    /// P9: `batch_refs` 수집을 `par_iter()`로 병렬화합니다.
18    ///
19    /// This implementation:
20    /// 1. Drains batches from ColumnarDelta for the given table.
21    /// 2. Merges them into a single RecordBatch.
22    /// 3. Writes the batch to a new Parquet file in the ROS directory.
23    pub fn bypass_flush(db: &Database, table: &str) -> DbxResult<()> {
24        if let DeltaVariant::Columnar(delta) = &db.delta {
25            // 1. Drain batches
26            let versioned_batches = delta.drain_table(table);
27            if versioned_batches.is_empty() {
28                return Ok(());
29            }
30
31            // 2. P9: batch Arc::clone을 par_iter()로 병렬 수집
32            use crate::storage::kv_adapter::merge_batches;
33            let batch_refs: Vec<_> = versioned_batches
34                .par_iter()
35                .map(|vb| std::sync::Arc::clone(&vb.data))
36                .collect();
37            let merged_batch = merge_batches(batch_refs)?;
38
39            // 3. Generate path and write Parquet
40            let timestamp = std::time::SystemTime::now()
41                .duration_since(std::time::UNIX_EPOCH)
42                .unwrap_or_default()
43                .as_nanos();
44
45            let ros_dir = Path::new("data").join("ros").join(table);
46            if let Err(e) = std::fs::create_dir_all(&ros_dir) {
47                return Err(crate::error::DbxError::Storage(format!(
48                    "Failed to create ROS directory: {}",
49                    e
50                )));
51            }
52
53            let file_path = ros_dir.join(format!("{}.parquet", timestamp));
54
55            ParquetWriter::write(&file_path, &merged_batch)?;
56        }
57        Ok(())
58    }
59
60    /// 여러 테이블을 병렬로 bypass_flush합니다. (P9 확장)
61    ///
62    /// 각 테이블이 독립적이므로 Rayon work-stealing으로 동시에 처리합니다.
63    pub fn bypass_flush_tables(db: &Database, tables: &[&str]) -> Vec<DbxResult<()>> {
64        tables
65            .par_iter()
66            .map(|&table| Self::bypass_flush(db, table))
67            .collect()
68    }
69}