#![allow(dead_code)]
use std::marker::PhantomData;
use std::path::Path;
use std::sync::{Arc, mpsc};
use std::thread::{self, JoinHandle};
use std::time::{SystemTime, UNIX_EPOCH};
use parking_lot::Mutex;
use crate::storage::manifest::{CompactionMetadata, SegmentTier};
use crate::storage::segment::{CompressionCodec, SegmentMetadata, SegmentReader, SegmentWriter};
use crate::storage::{Manifest, Result, Segment, StorageError};
/// Description of a single compaction run: which segments to merge and how
/// the merged output should be written and recorded.
#[derive(Clone, Debug)]
pub struct CompactionJob {
/// Identifier of the job; also used as the prefix of the output file name
/// (`<job_id>-compact.segment`).
pub job_id: String,
/// Metadata of the segments to merge. Processing reads each entry's `path`,
/// `id` and `size_bytes`.
pub input_segments: Vec<SegmentMetadata>,
/// Tier passed to the manifest when the compaction is recorded.
pub target_tier: SegmentTier,
/// Codec handed to the `SegmentWriter` that produces the output segment.
pub compression: CompressionCodec,
}
/// Messages sent from a `Compactor` handle to its worker thread.
enum Command {
/// Process the given compaction job.
Run(CompactionJob),
/// Leave the worker loop so the thread can finish.
Shutdown,
}
/// Handle to a background thread that executes compaction jobs one at a time.
///
/// Jobs are queued through an mpsc channel. Dropping the handle sends a
/// shutdown command and joins the worker thread.
pub struct Compactor<M: Manifest + Send + 'static> {
/// Sending half of the command channel; taken (left as `None`) on drop.
tx: Option<mpsc::Sender<Command>>,
/// Join handle of the worker thread; taken (left as `None`) on drop.
handle: Option<JoinHandle<()>>,
/// Ties the handle to the manifest type `M` without storing a value of it.
_marker: PhantomData<M>,
}
impl<M: Manifest + Send + 'static> Compactor<M> {
    /// Starts the background compaction worker.
    ///
    /// Ensures `output_dir` exists, then spawns a thread that receives
    /// commands from a channel and runs each submitted job through
    /// `process_job`. A failing job is reported on stderr and does not stop
    /// the worker; the loop ends on `Command::Shutdown` or once every sender
    /// has been dropped.
    ///
    /// # Errors
    ///
    /// Returns an error if the output directory cannot be created (including
    /// when the path exists but is not a directory) or if the operating
    /// system refuses to start the worker thread.
    pub fn spawn(manifest: Arc<Mutex<M>>, output_dir: impl AsRef<Path>) -> Result<Self> {
        let output_dir = output_dir.as_ref().to_path_buf();
        // `create_dir_all` succeeds when the directory is already present, so
        // a separate `exists()` probe is unnecessary and would only open a
        // check-then-create race. Calling it unconditionally also rejects a
        // path that exists but is not a directory.
        std::fs::create_dir_all(&output_dir)?;

        let (tx, rx) = mpsc::channel::<Command>();
        // `Builder::spawn` reports thread-creation failure as an `io::Error`
        // instead of panicking like `thread::spawn` does.
        let handle = thread::Builder::new()
            .name("compactor".to_owned())
            .spawn(move || {
                // `recv` fails only when all senders are gone, which is
                // treated the same as an explicit shutdown.
                while let Ok(command) = rx.recv() {
                    match command {
                        Command::Run(job) => {
                            let outcome = process_job(&output_dir, Arc::clone(&manifest), job);
                            if let Err(err) = outcome {
                                eprintln!("compaction job failed: {err}");
                            }
                        }
                        Command::Shutdown => break,
                    }
                }
            })?;

        Ok(Self {
            tx: Some(tx),
            handle: Some(handle),
            _marker: PhantomData,
        })
    }

    /// Queues `job` for execution on the worker thread.
    ///
    /// This only enqueues the job; the outcome of the compaction itself is
    /// not reported back to the caller.
    ///
    /// # Errors
    ///
    /// Returns `StorageError::InvalidFormat` if the sender has already been
    /// taken or if the worker's receiving end of the channel is closed.
    pub fn submit(&self, job: CompactionJob) -> Result<()> {
        match &self.tx {
            Some(tx) => tx
                .send(Command::Run(job))
                .map_err(|err| StorageError::InvalidFormat(err.to_string())),
            None => Err(StorageError::InvalidFormat("compactor stopped".into())),
        }
    }
}
impl<M: Manifest + Send + 'static> Drop for Compactor<M> {
    /// Asks the worker thread to stop and waits for it to finish.
    ///
    /// Both steps are best effort: a failed send (worker already gone) and a
    /// failed join (worker panicked) are deliberately ignored.
    fn drop(&mut self) {
        let shutdown_sender = self.tx.take();
        let worker = self.handle.take();

        if let Some(sender) = shutdown_sender {
            // The sender is dropped at the end of this block, before joining.
            drop(sender.send(Command::Shutdown));
        }
        if let Some(worker) = worker {
            drop(worker.join());
        }
    }
}
fn process_job<M: Manifest + Send + 'static>(
output_dir: &Path,
manifest: Arc<Mutex<M>>,
job: CompactionJob,
) -> Result<()> {
let started_at = now_millis();
let mut bytes_rewritten = 0u64;
let output_path = output_dir.join(format!("{}-compact.segment", job.job_id));
let mut writer = SegmentWriter::new(output_path, job.compression.clone());
for segment in &job.input_segments {
let reader = SegmentReader::open(&segment.path)?;
writer.append_block(reader.as_bytes());
bytes_rewritten += segment.size_bytes;
}
let file_segment = writer.finish()?;
let completed_at = now_millis();
let output_metadata = file_segment.metadata().clone();
let retired: Vec<String> = job
.input_segments
.iter()
.map(|seg| seg.id.clone())
.collect();
let compaction_metadata = CompactionMetadata {
job_id: job.job_id,
inputs: retired.clone(),
output_id: output_metadata.id.clone(),
output_digest: output_metadata.sha256.clone(),
target_tier: job.target_tier,
started_at_ms: started_at,
completed_at_ms: completed_at,
bytes_rewritten,
retired: retired.clone(),
};
let mut guard = manifest.lock();
guard.record_compaction(
job.target_tier,
output_metadata,
retired.clone(),
compaction_metadata,
)?;
guard.persist()?;
Ok(())
}
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// If the system clock reports a time before the epoch, `0` is returned.
fn now_millis() -> u128 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis(),
        // Same value as `Duration::default().as_millis()`.
        Err(_) => 0,
    }
}