iridium-db 0.4.0

A high-performance vector-graph hybrid storage and indexing engine
use crate::core::reactor::Reactor;
use crate::features::storage::wal;

use super::*;

/// Discovers the level-0 SSTable runs already present in `sstable_dir` and loads them.
///
/// The directory listing is obtained from `reactor`, sorted, and narrowed down to
/// entries whose UTF-8 file name starts with `ir.l0.` and ends with `.sst`. Each
/// matching file is read with `sstable::read_sstable_with_reactor`.
///
/// # Returns
///
/// A pair of (run paths in sorted listing order, map from run path to its loaded table).
///
/// # Errors
///
/// Propagates the first failure from listing the directory or from reading a
/// matching table; nothing is returned for the runs loaded before the failure.
pub(super) fn load_existing_l0_runs(
    sstable_dir: &std::path::Path,
    reactor: &dyn Reactor,
) -> Result<(
    Vec<std::path::PathBuf>,
    std::collections::HashMap<std::path::PathBuf, sstable::Sstable>,
)> {
    let mut listing = reactor.read_dir(sstable_dir)?;
    // NOTE(review): this is a plain `sort()` over the listed paths, so the run
    // order follows path ordering rather than numeric run id — confirm that run
    // ids are zero-padded if numeric order matters to callers.
    listing.sort();

    let mut run_paths = Vec::new();
    let mut tables = std::collections::HashMap::new();

    let l0_candidates = listing.into_iter().filter(|candidate| {
        // Entries without a UTF-8 file name are skipped, like any non-matching name.
        matches!(
            candidate.file_name().and_then(|n| n.to_str()),
            Some(name) if name.starts_with("ir.l0.") && name.ends_with(".sst")
        )
    });
    for run_path in l0_candidates {
        let table = sstable::read_sstable_with_reactor(&run_path, reactor)?;
        run_paths.push(run_path.clone());
        tables.insert(run_path, table);
    }

    Ok((run_paths, tables))
}

/// Extracts the numeric run id from a path whose file name looks like `ir.l0.<id>.sst`.
///
/// Returns `None` when the path has no file name, the name is not valid UTF-8,
/// the `ir.l0.` prefix or `.sst` suffix is missing, or the part in between does
/// not parse as a `u64`.
pub(super) fn parse_run_id(path: &std::path::Path) -> Option<u64> {
    path.file_name()
        .and_then(|raw| raw.to_str())
        .and_then(|name| name.strip_prefix("ir.l0."))
        .and_then(|rest| rest.strip_suffix(".sst"))
        .and_then(|id| id.parse::<u64>().ok())
}

/// Returns the path of the `ir.compaction.inprogress` marker file inside `sstable_dir`.
pub(super) fn compaction_marker_path(sstable_dir: &std::path::Path) -> std::path::PathBuf {
    let mut marker = sstable_dir.to_path_buf();
    marker.push("ir.compaction.inprogress");
    marker
}

/// Cleans up the leftovers of a compaction that did not finish.
///
/// Nothing happens unless the marker file (see `compaction_marker_path`) can be
/// queried through `reactor`. When it can, every entry of `sstable_dir` whose
/// UTF-8 file name ends with `.tmp` is deleted, followed by the marker itself.
/// All deletions are best-effort: their failures are ignored.
///
/// # Errors
///
/// Only a failure to list `sstable_dir` is reported.
pub(super) fn recover_compaction_artifacts(
    sstable_dir: &std::path::Path,
    reactor: &dyn Reactor,
) -> Result<()> {
    let marker = compaction_marker_path(sstable_dir);
    // Any error from the length query is treated as "no marker present".
    if reactor.metadata_len(&marker).is_err() {
        return Ok(());
    }

    let leftovers = reactor.read_dir(sstable_dir)?.into_iter().filter(|entry| {
        matches!(
            entry.file_name().and_then(|v| v.to_str()),
            Some(name) if name.ends_with(".tmp")
        )
    });
    // NOTE(review): deletions go straight to `std::fs` while listing goes
    // through the reactor — confirm this split is intentional.
    for tmp_file in leftovers {
        let _ = std::fs::remove_file(tmp_file);
    }

    // The marker is dropped last, and regardless of whether the removals above succeeded.
    let _ = std::fs::remove_file(marker);
    Ok(())
}

/// Guard for a lock file placed in a data directory.
///
/// Created by `DataDirLock::acquire`; the lock file at `path` is deleted when
/// the guard is dropped.
pub(crate) struct DataDirLock {
    /// Location of the lock file, kept so it can be deleted on drop.
    path: std::path::PathBuf,
    /// Open handle to the lock file; held for the guard's lifetime and not read afterwards.
    _file: std::fs::File,
}

impl DataDirLock {
    /// Claims `data_root` by exclusively creating the file `ir.lock` inside it.
    ///
    /// The directory (including missing parents) is created first. The lock file
    /// is opened with `create_new`, so an already existing file makes the call
    /// fail. On success a `pid=<process id>` line is written to the file on a
    /// best-effort basis.
    ///
    /// # Errors
    ///
    /// * `StorageError::InvalidInput` when the lock file already exists.
    /// * `StorageError::Io` for any other failure to open the lock file.
    /// * The error from `create_dir_all` is propagated when the directory cannot be created.
    pub(super) fn acquire(data_root: &std::path::Path) -> Result<Self> {
        use std::io::Write;

        std::fs::create_dir_all(data_root)?;
        let lock_path = data_root.join("ir.lock");

        // An existing lock file gets a dedicated message; everything else stays an I/O error.
        let to_storage_error = |err: std::io::Error| match err.kind() {
            std::io::ErrorKind::AlreadyExists => StorageError::InvalidInput(format!(
                "data directory is already locked: {}",
                data_root.display()
            )),
            _ => StorageError::Io(err),
        };

        let mut handle = std::fs::OpenOptions::new()
            .write(true)
            .create_new(true)
            .open(&lock_path)
            .map_err(to_storage_error)?;

        // Recording the pid is informational only; a failed write is ignored.
        let _ = writeln!(handle, "pid={}", std::process::id());

        Ok(Self {
            path: lock_path,
            _file: handle,
        })
    }
}

impl Drop for DataDirLock {
    /// Deletes the lock file; a failed deletion is ignored.
    fn drop(&mut self) {
        let _ = std::fs::remove_file(self.path.as_path());
    }
}

/// Returns the directory that directly contains all three storage locations, if any.
///
/// The parent of each argument is taken; when all three parents are equal, that
/// parent is returned. The result is `None` when any path has no parent or the
/// parents differ.
pub(super) fn infer_data_root(
    wal_dir: &std::path::Path,
    sstable_dir: &std::path::Path,
    manifest_path: &std::path::Path,
) -> Option<std::path::PathBuf> {
    match (
        wal_dir.parent(),
        sstable_dir.parent(),
        manifest_path.parent(),
    ) {
        (Some(root), Some(sst_root), Some(manifest_root))
            if root == sst_root && root == manifest_root =>
        {
            Some(root.to_path_buf())
        }
        _ => None,
    }
}

/// Chooses the WAL sync policy from the process environment.
///
/// * `IR_WAL_SYNC_POLICY` — `always`, `manual` or `interval`, compared
///   ASCII-case-insensitively with surrounding whitespace ignored. When the
///   variable is unset or not valid Unicode, `interval` is assumed. Any other
///   value selects the interval policy with the default period of 1000 ms.
/// * `IR_WAL_SYNC_INTERVAL_MS` — consulted only for `interval`: the period in
///   milliseconds, surrounding whitespace ignored. A missing or unparsable value
///   falls back to 1000; `0` selects `WalSyncPolicy::Always`.
pub(super) fn wal_sync_policy_from_env() -> wal::WalSyncPolicy {
    const DEFAULT_INTERVAL_MS: u64 = 1000;

    // Trim before matching: values coming from shell or `.env` files often carry
    // stray whitespace, which would otherwise silently select the fallback
    // policy instead of the one the operator asked for.
    let mode = std::env::var("IR_WAL_SYNC_POLICY")
        .map(|raw| raw.trim().to_ascii_lowercase())
        .unwrap_or_else(|_| "interval".to_string());

    match mode.as_str() {
        "always" => wal::WalSyncPolicy::Always,
        "manual" => wal::WalSyncPolicy::Manual,
        "interval" => {
            // `str::parse::<u64>` rejects surrounding whitespace, hence the trim.
            let millis = std::env::var("IR_WAL_SYNC_INTERVAL_MS")
                .ok()
                .and_then(|v| v.trim().parse::<u64>().ok())
                .unwrap_or(DEFAULT_INTERVAL_MS);
            if millis == 0 {
                // A zero-length interval is treated as syncing on every write.
                wal::WalSyncPolicy::Always
            } else {
                wal::WalSyncPolicy::Interval(std::time::Duration::from_millis(millis))
            }
        }
        _ => wal::WalSyncPolicy::Interval(std::time::Duration::from_millis(
            DEFAULT_INTERVAL_MS,
        )),
    }
}