// dbx_core/engine/compaction.rs
//! Compaction module — background data movement between storage tiers.
//!
//! Implements strategies for flushing and compacting data, including
//! the WOS bypass strategy for Columnar Delta.

use crate::engine::{Database, DeltaVariant};
use crate::error::{DbxError, DbxResult};
use crate::storage::kv_adapter::merge_batches;
use crate::storage::parquet_io::ParquetWriter;
use std::path::Path;
/// Stateless orchestrator for compaction work; all entry points are
/// associated functions, so no instance state is carried.
pub struct Compactor;
13impl Compactor {
14 /// Flush ColumnarDelta directly to Parquet (Tier 5), bypassing WOS (Tier 3).
15 ///
16 /// This implementation:
17 /// 1. Drains batches from ColumnarDelta for the given table.
18 /// 2. Merges them into a single RecordBatch.
19 /// 3. Writes the batch to a new Parquet file in the ROS directory.
20 pub fn bypass_flush(db: &Database, table: &str) -> DbxResult<()> {
21 if let DeltaVariant::Columnar(delta) = &db.delta {
22 // 1. Drain batches
23 let versioned_batches = delta.drain_table(table);
24 if versioned_batches.is_empty() {
25 return Ok(());
26 }
27
28 // 2. Merge batches
29 use crate::storage::kv_adapter::merge_batches;
30 let batch_refs: Vec<_> = versioned_batches
31 .iter()
32 .map(|vb| std::sync::Arc::clone(&vb.data))
33 .collect();
34 let merged_batch = merge_batches(batch_refs)?;
35
36 // 3. Generate path and write Parquet
37 // We use a timestamp-based filename for uniqueness.
38 let timestamp = std::time::SystemTime::now()
39 .duration_since(std::time::UNIX_EPOCH)
40 .unwrap_or_default()
41 .as_nanos();
42
43 // For now, use a fixed data/ros directory relative to current working dir.
44 // In a production implementation, this path would be part of Database config.
45 let ros_dir = Path::new("data").join("ros").join(table);
46 if let Err(e) = std::fs::create_dir_all(&ros_dir) {
47 return Err(crate::error::DbxError::Storage(format!(
48 "Failed to create ROS directory: {}",
49 e
50 )));
51 }
52
53 let file_path = ros_dir.join(format!("{}.parquet", timestamp));
54
55 ParquetWriter::write(&file_path, &merged_batch)?;
56 }
57 Ok(())
58 }
59}