mindb 0.1.2

Lightweight embedded key–value store with write-ahead log and zstd compression.
Documentation
#![allow(dead_code)]

use std::marker::PhantomData;
use std::path::Path;
use std::sync::{Arc, mpsc};
use std::thread::{self, JoinHandle};
use std::time::{SystemTime, UNIX_EPOCH};

use parking_lot::Mutex;

use crate::storage::manifest::{CompactionMetadata, SegmentTier};
use crate::storage::segment::{CompressionCodec, SegmentMetadata, SegmentReader, SegmentWriter};
use crate::storage::{Manifest, Result, Segment, StorageError};

/// A request asking the background worker to merge several segments into one.
#[derive(Debug, Clone)]
pub struct CompactionJob {
    /// Identifier of the job; it prefixes the output file name
    /// (`<job_id>-compact.segment`) and is stored in the compaction metadata.
    pub job_id: String,
    /// Segments whose bytes are appended, in this order, to the output
    /// segment. Their ids are reported to the manifest as retired.
    pub input_segments: Vec<SegmentMetadata>,
    /// Tier handed to the manifest when the compaction is recorded.
    pub target_tier: SegmentTier,
    /// Codec handed to the segment writer that produces the output.
    pub compression: CompressionCodec,
}

/// Messages sent over the channel from a `Compactor` handle to its worker
/// thread.
enum Command {
    /// Process the given compaction job.
    Run(CompactionJob),
    /// Leave the worker loop so the thread can finish and be joined.
    Shutdown,
}

/// Handle to a background thread that rewrites immutable segments into a
/// single output segment.
///
/// Dropping the handle asks the worker to shut down and waits for it to exit.
pub struct Compactor<M>
where
    M: Manifest + Send + 'static,
{
    /// Sending half of the command channel. Wrapped in `Option` so that
    /// `Drop` can take ownership of it.
    tx: Option<mpsc::Sender<Command>>,
    /// Join handle of the worker thread. Wrapped in `Option` so that `Drop`
    /// can take ownership of it and join.
    handle: Option<JoinHandle<()>>,
    /// Ties the handle to the manifest type `M` without storing a value.
    _marker: PhantomData<M>,
}

impl<M: Manifest + Send + 'static> Compactor<M> {
    /// Starts the background compaction worker.
    ///
    /// `output_dir` is created (including missing parents) if it does not
    /// exist yet. The worker thread then waits for commands: every submitted
    /// job is handed to `process_job`, and a failing job is reported on
    /// stderr without stopping the worker. The loop ends when a shutdown
    /// command arrives or when every sender has been dropped.
    ///
    /// # Errors
    ///
    /// Returns an error when `output_dir` cannot be created (for example
    /// because the path exists but is not a directory) or when the operating
    /// system refuses to start the worker thread.
    pub fn spawn(manifest: Arc<Mutex<M>>, output_dir: impl AsRef<Path>) -> Result<Self> {
        let output_dir = output_dir.as_ref().to_path_buf();
        // `create_dir_all` succeeds when the directory already exists, so a
        // separate `exists()` check would only add a check-then-act race and
        // let a non-directory path slip through until the first job runs.
        std::fs::create_dir_all(&output_dir)?;

        let (tx, rx) = mpsc::channel::<Command>();

        // `thread::Builder` reports a failed thread creation as an error
        // instead of panicking like `thread::spawn` does.
        let handle = thread::Builder::new()
            .name("mindb-compactor".into())
            .spawn(move || {
                while let Ok(command) = rx.recv() {
                    match command {
                        Command::Run(job) => {
                            if let Err(err) = process_job(&output_dir, Arc::clone(&manifest), job)
                            {
                                eprintln!("compaction job failed: {err}");
                            }
                        }
                        Command::Shutdown => break,
                    }
                }
            })?;

        Ok(Self {
            tx: Some(tx),
            handle: Some(handle),
            _marker: PhantomData,
        })
    }

    /// Queues `job` for the background worker and returns immediately.
    ///
    /// Jobs are delivered through the command channel and handled one at a
    /// time by the worker thread.
    ///
    /// # Errors
    ///
    /// Returns `StorageError::InvalidFormat` when the sender is no longer
    /// available or when the worker has hung up its end of the channel.
    pub fn submit(&self, job: CompactionJob) -> Result<()> {
        let tx = self
            .tx
            .as_ref()
            .ok_or_else(|| StorageError::InvalidFormat("compactor stopped".into()))?;
        tx.send(Command::Run(job))
            .map_err(|err| StorageError::InvalidFormat(err.to_string()))
    }
}

impl<M: Manifest + Send + 'static> Drop for Compactor<M> {
    /// Asks the worker to shut down, then blocks until its thread has exited.
    ///
    /// Both steps are best effort: a failed send (worker already gone) and a
    /// failed join (worker panicked) are ignored.
    fn drop(&mut self) {
        let sender = self.tx.take();
        let worker = self.handle.take();

        if let Some(sender) = sender {
            // The sender is dropped at the end of this block, before joining.
            let _ = sender.send(Command::Shutdown);
        }

        if let Some(worker) = worker {
            let _ = worker.join();
        }
    }
}

fn process_job<M: Manifest + Send + 'static>(
    output_dir: &Path,
    manifest: Arc<Mutex<M>>,
    job: CompactionJob,
) -> Result<()> {
    let started_at = now_millis();
    let mut bytes_rewritten = 0u64;

    let output_path = output_dir.join(format!("{}-compact.segment", job.job_id));
    let mut writer = SegmentWriter::new(output_path, job.compression.clone());

    for segment in &job.input_segments {
        let reader = SegmentReader::open(&segment.path)?;
        writer.append_block(reader.as_bytes());
        bytes_rewritten += segment.size_bytes;
    }

    let file_segment = writer.finish()?;
    let completed_at = now_millis();
    let output_metadata = file_segment.metadata().clone();
    let retired: Vec<String> = job
        .input_segments
        .iter()
        .map(|seg| seg.id.clone())
        .collect();

    let compaction_metadata = CompactionMetadata {
        job_id: job.job_id,
        inputs: retired.clone(),
        output_id: output_metadata.id.clone(),
        output_digest: output_metadata.sha256.clone(),
        target_tier: job.target_tier,
        started_at_ms: started_at,
        completed_at_ms: completed_at,
        bytes_rewritten,
        retired: retired.clone(),
    };

    let mut guard = manifest.lock();
    guard.record_compaction(
        job.target_tier,
        output_metadata,
        retired.clone(),
        compaction_metadata,
    )?;
    guard.persist()?;
    Ok(())
}

/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` when the system clock reports a time before the epoch.
fn now_millis() -> u128 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis(),
        // Clock is set before 1970: same result as a zero duration.
        Err(_) => 0,
    }
}