sombra 0.3.6

High-performance graph database with ACID transactions, single-file storage, and bindings for Rust, TypeScript, and Python
Documentation
use std::path::{Path, PathBuf};
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

use crate::db::metrics::PerformanceMetrics;
use crate::error::Result;
use crate::pager::Pager;
use crate::storage::heap::RecordStore;

/// Commands sent to the background compaction worker over its channel.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompactionMessage {
    /// Run a compaction pass as soon as the worker reads this message.
    Trigger,
    /// Make the worker leave its message loop.
    Shutdown,
}

/// Settings for the background compaction worker.
///
/// The default configuration is disabled, with a 300-second interval, a 50 percent threshold
/// and a batch size of 100.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CompactionConfig {
    /// Whether `CompactionState::spawn` starts a background worker thread at all.
    pub enabled: bool,
    /// Interval, in seconds, between periodic compaction passes.
    pub interval_secs: Option<u64>,
    /// Threshold, in percent, forwarded to `RecordStore::identify_compaction_candidates`
    /// when selecting pages to compact.
    pub threshold_percent: u8,
    /// Batch size forwarded to `RecordStore::identify_compaction_candidates`.
    /// It presumably caps the number of candidate pages per pass (TODO: confirm).
    pub batch_size: usize,
}

impl Default for CompactionConfig {
    fn default() -> Self {
        Self {
            enabled: false,
            interval_secs: Some(300),
            threshold_percent: 50,
            batch_size: 100,
        }
    }
}

/// Handle to the background compaction worker.
///
/// It is built by `CompactionState::spawn`. The worker is reached only through `sender`.
#[derive(Debug)]
pub struct CompactionState {
    /// Channel that carries `CompactionMessage`s to the worker thread.
    pub sender: Sender<CompactionMessage>,
    /// Join handle of the worker thread, or `None` when compaction is disabled.
    ///
    /// The methods in this module never join it. Dropping the state therefore detaches the
    /// thread rather than waiting for it.
    pub _compaction_thread: Option<thread::JoinHandle<()>>,
}

impl CompactionState {
    pub fn spawn(
        db_path: PathBuf,
        config: CompactionConfig,
        metrics: Arc<Mutex<PerformanceMetrics>>,
    ) -> Result<Arc<Mutex<Self>>> {
        if !config.enabled {
            let (sender, _receiver) = mpsc::channel();
            return Ok(Arc::new(Mutex::new(CompactionState {
                sender,
                _compaction_thread: None,
            })));
        }

        let (sender, receiver) = mpsc::channel();

        let compaction_thread = thread::spawn(move || {
            Self::compaction_loop(db_path, receiver, config, metrics);
        });

        Ok(Arc::new(Mutex::new(CompactionState {
            sender,
            _compaction_thread: Some(compaction_thread),
        })))
    }

    pub fn trigger_compaction(&self) -> Result<()> {
        self.sender
            .send(CompactionMessage::Trigger)
            .map_err(|_| crate::error::GraphError::Corruption("compaction channel closed".into()))
    }

    pub fn shutdown(&self) -> Result<()> {
        self.sender
            .send(CompactionMessage::Shutdown)
            .map_err(|_| crate::error::GraphError::Corruption("compaction channel closed".into()))
    }

    fn compaction_loop(
        db_path: PathBuf,
        receiver: Receiver<CompactionMessage>,
        config: CompactionConfig,
        metrics: Arc<Mutex<PerformanceMetrics>>,
    ) {
        let interval = Duration::from_secs(config.interval_secs.unwrap_or(300));

        loop {
            match receiver.recv_timeout(interval) {
                Ok(CompactionMessage::Trigger) => {
                    let _ = Self::perform_compaction(db_path.as_path(), &config, &metrics);
                }
                Ok(CompactionMessage::Shutdown) => {
                    break;
                }
                Err(mpsc::RecvTimeoutError::Timeout) => {
                    let _ = Self::perform_compaction(db_path.as_path(), &config, &metrics);
                }
                Err(mpsc::RecvTimeoutError::Disconnected) => {
                    break;
                }
            }
        }
    }

    fn perform_compaction(
        db_path: &Path,
        config: &CompactionConfig,
        metrics: &Arc<Mutex<PerformanceMetrics>>,
    ) -> Result<()> {
        let mut pager = Pager::open(db_path)?;
        let mut store = RecordStore::new(&mut pager);

        let candidates =
            store.identify_compaction_candidates(config.threshold_percent, config.batch_size)?;

        if candidates.is_empty() {
            return Ok(());
        }

        let mut total_bytes_reclaimed = 0;
        let mut pages_compacted = 0;

        for page_id in candidates {
            match store.compact_page(page_id) {
                Ok(bytes_reclaimed) => {
                    total_bytes_reclaimed += bytes_reclaimed;
                    pages_compacted += 1;
                }
                Err(_) => {
                    continue;
                }
            }
        }

        pager.flush()?;

        if let Ok(mut m) = metrics.lock() {
            m.compactions_performed += 1;
            m.pages_compacted += pages_compacted;
            m.bytes_reclaimed += total_bytes_reclaimed as u64;
        }

        Ok(())
    }
}