use crate::core::reactor::Reactor;
use crate::features::storage::wal;
use super::*;
pub(super) fn load_existing_l0_runs(
sstable_dir: &std::path::Path,
reactor: &dyn Reactor,
) -> Result<(
Vec<std::path::PathBuf>,
std::collections::HashMap<std::path::PathBuf, sstable::Sstable>,
)> {
let mut l0_runs = Vec::new();
let mut cache = std::collections::HashMap::new();
let mut entries = reactor.read_dir(sstable_dir)?;
entries.sort();
for path in entries {
let name = match path.file_name().and_then(|n| n.to_str()) {
Some(name) => name,
None => continue,
};
if !name.starts_with("ir.l0.") || !name.ends_with(".sst") {
continue;
}
let table = sstable::read_sstable_with_reactor(&path, reactor)?;
l0_runs.push(path.clone());
cache.insert(path, table);
}
Ok((l0_runs, cache))
}
pub(super) fn parse_run_id(path: &std::path::Path) -> Option<u64> {
let name = path.file_name()?.to_str()?;
let trimmed = name.strip_prefix("ir.l0.")?.strip_suffix(".sst")?;
trimmed.parse::<u64>().ok()
}
pub(super) fn compaction_marker_path(sstable_dir: &std::path::Path) -> std::path::PathBuf {
sstable_dir.join("ir.compaction.inprogress")
}
pub(super) fn recover_compaction_artifacts(
sstable_dir: &std::path::Path,
reactor: &dyn Reactor,
) -> Result<()> {
let marker_path = compaction_marker_path(sstable_dir);
let marker_present = reactor.metadata_len(&marker_path).is_ok();
if !marker_present {
return Ok(());
}
let entries = reactor.read_dir(sstable_dir)?;
for path in entries {
let Some(name) = path.file_name().and_then(|v| v.to_str()) else {
continue;
};
if name.ends_with(".tmp") {
std::fs::remove_file(path).ok();
}
}
std::fs::remove_file(marker_path).ok();
Ok(())
}
pub(crate) struct DataDirLock {
path: std::path::PathBuf,
_file: std::fs::File,
}
impl DataDirLock {
pub(super) fn acquire(data_root: &std::path::Path) -> Result<Self> {
std::fs::create_dir_all(data_root)?;
let path = data_root.join("ir.lock");
let mut file = std::fs::OpenOptions::new()
.create_new(true)
.write(true)
.open(&path)
.map_err(|err| {
if err.kind() == std::io::ErrorKind::AlreadyExists {
StorageError::InvalidInput(format!(
"data directory is already locked: {}",
data_root.display()
))
} else {
StorageError::Io(err)
}
})?;
use std::io::Write;
let _ = writeln!(file, "pid={}", std::process::id());
Ok(Self { path, _file: file })
}
}
impl Drop for DataDirLock {
fn drop(&mut self) {
std::fs::remove_file(&self.path).ok();
}
}
pub(super) fn infer_data_root(
wal_dir: &std::path::Path,
sstable_dir: &std::path::Path,
manifest_path: &std::path::Path,
) -> Option<std::path::PathBuf> {
let wal_parent = wal_dir.parent()?;
let sst_parent = sstable_dir.parent()?;
let manifest_parent = manifest_path.parent()?;
if wal_parent == sst_parent && wal_parent == manifest_parent {
return Some(wal_parent.to_path_buf());
}
None
}
pub(super) fn wal_sync_policy_from_env() -> wal::WalSyncPolicy {
let mode = std::env::var("IR_WAL_SYNC_POLICY")
.ok()
.unwrap_or_else(|| "interval".to_string())
.to_ascii_lowercase();
match mode.as_str() {
"always" => wal::WalSyncPolicy::Always,
"manual" => wal::WalSyncPolicy::Manual,
"interval" => {
let millis = std::env::var("IR_WAL_SYNC_INTERVAL_MS")
.ok()
.and_then(|v| v.parse::<u64>().ok())
.unwrap_or(1000);
if millis == 0 {
wal::WalSyncPolicy::Always
} else {
wal::WalSyncPolicy::Interval(std::time::Duration::from_millis(millis))
}
}
_ => wal::WalSyncPolicy::Interval(std::time::Duration::from_millis(1000)),
}
}