use crate::store::file_classification::StoreFileKind;
use crate::store::platform;
use crate::store::platform::fs::StoreFs;
use crate::store::segment;
use crate::store::StoreError;
use std::path::{Path, PathBuf};
/// File name, inside the store's data directory, of the pending-compaction marker.
pub(crate) const COMPACTION_MARKER_FILENAME: &str = "compaction.pending.json";
/// A compaction recorded on disk so that segment listing can reconcile it on restart.
///
/// Stored as JSON in the file named by `COMPACTION_MARKER_FILENAME`.
#[derive(Clone, Debug)]
#[derive(serde::Serialize, serde::Deserialize)]
pub(crate) struct PendingCompaction {
    /// Segment id under which the compaction output is stored.
    pub(crate) merged_id: u64,
    /// Ids of the segments being merged; this list may contain `merged_id` itself.
    pub(crate) source_segment_ids: Vec<u64>,
}
/// Location of the pending-compaction marker file within `data_dir`.
fn pending_compaction_path(data_dir: &Path) -> PathBuf {
    let mut marker_path = data_dir.to_path_buf();
    marker_path.push(COMPACTION_MARKER_FILENAME);
    marker_path
}
/// Path of the temporary compaction file associated with `merged_id` in `data_dir`.
///
/// The file name is `<merged_id, zero-padded to 6 digits>.<SEGMENT_EXTENSION>.compact-src`.
/// Segment listing treats this file as the data for `merged_id` when the merged segment
/// itself is absent (presumably it holds that id's pre-compaction source — confirm
/// against the compaction writer).
pub(super) fn compaction_source_temp_path(data_dir: &Path, merged_id: u64) -> PathBuf {
    let temp_file_name = format!(
        "{merged_id:06}.{}.compact-src",
        segment::SEGMENT_EXTENSION
    );
    data_dir.join(temp_file_name)
}
pub(super) fn load_pending_compaction(
data_dir: &Path,
) -> Result<Option<PendingCompaction>, StoreError> {
let path = pending_compaction_path(data_dir);
if !path.exists() {
return Ok(None);
}
let bytes = platform::fs::read(&path).map_err(StoreError::Io)?;
let marker = serde_json::from_slice::<PendingCompaction>(&bytes)
.map_err(|_| StoreError::DataDirMalformed { path: path.clone() })?;
Ok(Some(marker))
}
pub(crate) fn write_pending_compaction(
data_dir: &Path,
merged_id: u64,
source_segment_ids: &[u64],
fs: &dyn StoreFs,
) -> Result<(), StoreError> {
let marker = PendingCompaction {
merged_id,
source_segment_ids: source_segment_ids.to_vec(),
};
let final_path = pending_compaction_path(data_dir);
crate::store::platform::fs::write_file_atomically_with_fs(
data_dir,
&final_path,
"compaction marker",
|file| {
serde_json::to_writer(file, &marker).map_err(|e| StoreError::Serialization(Box::new(e)))
},
fs,
)
}
pub(crate) fn clear_pending_compaction(
data_dir: &Path,
fs: &dyn StoreFs,
) -> Result<(), StoreError> {
let path = pending_compaction_path(data_dir);
match fs.remove_file(&path) {
Ok(()) => {
crate::store::platform::sync::sync_parent_dir(&path)?;
Ok(())
}
Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(()),
Err(err) => Err(StoreError::Io(err)),
}
}
pub(super) fn segment_paths(data_dir: &Path) -> Result<Vec<(u64, PathBuf)>, StoreError> {
let mut entries = Vec::new();
for entry in platform::fs::read_dir(data_dir).map_err(StoreError::Io)? {
let entry = entry.map_err(StoreError::Io)?;
let path = entry.path();
let kind = StoreFileKind::from_path(&path);
if let StoreFileKind::MalformedSegment(error) = &kind {
tracing::warn!(
path = %path.display(),
%error,
"skipping malformed segment filename"
);
continue;
}
let Some(segment_id) = kind.segment_id() else {
continue;
};
entries.push((segment_id.as_u64(), path));
}
if let Some(marker) = load_pending_compaction(data_dir)? {
let merged_present = entries
.iter()
.any(|(segment_id, _)| *segment_id == marker.merged_id);
let temp_source_path = compaction_source_temp_path(data_dir, marker.merged_id);
let temp_source_exists = temp_source_path.exists();
let stale_finalized_marker = merged_present
&& !temp_source_exists
&& marker
.source_segment_ids
.iter()
.filter(|&&segment_id| segment_id != marker.merged_id)
.all(|segment_id| !entries.iter().any(|(id, _)| id == segment_id));
if !stale_finalized_marker {
if merged_present {
entries.retain(|(segment_id, _)| {
*segment_id == marker.merged_id
|| !marker
.source_segment_ids
.iter()
.any(|source_id| source_id == segment_id)
});
} else {
if temp_source_exists {
entries.retain(|(segment_id, _)| *segment_id != marker.merged_id);
entries.push((marker.merged_id, temp_source_path));
}
for source_id in marker
.source_segment_ids
.iter()
.copied()
.filter(|source_id| *source_id != marker.merged_id)
{
if !entries
.iter()
.any(|(segment_id, _)| *segment_id == source_id)
{
return Err(StoreError::DataDirMalformed {
path: pending_compaction_path(data_dir),
});
}
}
}
}
}
entries.sort_by_key(|(segment_id, _)| *segment_id);
Ok(entries)
}
#[cfg(test)]
mod tests {
    use super::{
        clear_pending_compaction, load_pending_compaction, pending_compaction_path,
        segment_paths, write_pending_compaction,
    };
    use crate::store::platform;
    use crate::store::platform::fs::RealFs;
    use crate::store::segment;
    use crate::store::StoreError;
    #[test]
    fn clear_pending_compaction_propagates_non_not_found_errors() {
        let dir = tempfile::tempdir().expect("tempdir");
        let marker = pending_compaction_path(dir.path());
        std::fs::create_dir(&marker).expect("create directory at marker path");
        let result = clear_pending_compaction(dir.path(), &RealFs);
        assert!(
            matches!(result, Err(StoreError::Io(_))),
            "a non-NotFound removal failure must propagate as Err(Io), not be \
             swallowed by the NotFound guard; got {result:?}"
        );
    }
    #[test]
    fn clear_pending_compaction_is_ok_when_marker_is_absent() {
        let dir = tempfile::tempdir().expect("tempdir");
        clear_pending_compaction(dir.path(), &RealFs).expect("absent marker clears cleanly");
    }
    #[test]
    fn load_pending_compaction_round_trips_a_written_marker() {
        let dir = tempfile::tempdir().expect("tempdir");
        assert!(
            load_pending_compaction(dir.path())
                .expect("load with no marker")
                .is_none(),
            "PROPERTY: an absent marker file loads as None"
        );
        write_pending_compaction(dir.path(), 7, &[1, 2], &RealFs).expect("write pending marker");
        let marker = load_pending_compaction(dir.path())
            .expect("load written marker")
            .expect("marker must be present after writing it");
        assert_eq!(marker.merged_id, 7);
        assert_eq!(marker.source_segment_ids, vec![1, 2]);
    }
    #[test]
    fn load_pending_compaction_rejects_an_unparseable_marker() {
        let dir = tempfile::tempdir().expect("tempdir");
        let marker = pending_compaction_path(dir.path());
        std::fs::write(&marker, b"not json").expect("write garbage marker");
        let result = load_pending_compaction(dir.path());
        assert!(
            matches!(&result, Err(StoreError::DataDirMalformed { path }) if *path == marker),
            "PROPERTY: an unparseable marker fails with DataDirMalformed at the marker \
             path; got {result:?}"
        );
    }
    #[test]
    fn segment_paths_rejects_a_marker_whose_non_merged_source_is_missing() {
        let dir = tempfile::tempdir().expect("tempdir");
        drop(
            platform::fs::create_new_file(&dir.path().join(segment::segment_filename(1)))
                .expect("create segment 1"),
        );
        write_pending_compaction(dir.path(), 7, &[1, 3], &RealFs).expect("write pending marker");
        let result = segment_paths(dir.path());
        assert!(
            matches!(
                &result,
                Err(StoreError::DataDirMalformed { path }) if *path == pending_compaction_path(dir.path())
            ),
            "PROPERTY: a pending-compaction marker naming a missing non-merged source \
             must fail recovery with DataDirMalformed at the marker path; got {result:?}"
        );
    }
    #[test]
    fn segment_paths_recovers_exactly_the_source_set_when_merged_is_absent() {
        let dir = tempfile::tempdir().expect("tempdir");
        let path_one = dir.path().join(segment::segment_filename(1));
        let path_two = dir.path().join(segment::segment_filename(2));
        drop(platform::fs::create_new_file(&path_one).expect("create segment 1"));
        drop(platform::fs::create_new_file(&path_two).expect("create segment 2"));
        write_pending_compaction(dir.path(), 7, &[1, 2], &RealFs).expect("write pending marker");
        let entries = segment_paths(dir.path()).expect("recover with all sources present");
        assert_eq!(
            entries,
            vec![(1, path_one), (2, path_two)],
            "PROPERTY: with the merged segment absent and every source present, \
             recovery yields exactly the source path set in id order"
        );
    }
    #[test]
    fn segment_paths_prefers_the_merged_segment_over_its_sources() {
        let dir = tempfile::tempdir().expect("tempdir");
        let path_one = dir.path().join(segment::segment_filename(1));
        let path_two = dir.path().join(segment::segment_filename(2));
        let path_seven = dir.path().join(segment::segment_filename(7));
        drop(platform::fs::create_new_file(&path_one).expect("create segment 1"));
        drop(platform::fs::create_new_file(&path_two).expect("create segment 2"));
        drop(platform::fs::create_new_file(&path_seven).expect("create segment 7"));
        write_pending_compaction(dir.path(), 7, &[1, 2], &RealFs).expect("write pending marker");
        let entries = segment_paths(dir.path()).expect("recover with merged present");
        assert_eq!(
            entries,
            vec![(7, path_seven)],
            "PROPERTY: once the merged segment exists, its sources are excluded"
        );
    }
    #[test]
    fn segment_paths_ignores_a_stale_marker_after_finalization() {
        let dir = tempfile::tempdir().expect("tempdir");
        let path_seven = dir.path().join(segment::segment_filename(7));
        let path_nine = dir.path().join(segment::segment_filename(9));
        drop(platform::fs::create_new_file(&path_seven).expect("create segment 7"));
        drop(platform::fs::create_new_file(&path_nine).expect("create segment 9"));
        write_pending_compaction(dir.path(), 7, &[1, 2], &RealFs).expect("write pending marker");
        let entries = segment_paths(dir.path()).expect("stale marker must not fail recovery");
        assert_eq!(
            entries,
            vec![(7, path_seven), (9, path_nine)],
            "PROPERTY: a marker whose sources are all gone leaves the listing untouched"
        );
    }
}