use crate::coordinate::Coordinate;
use crate::event::{EventKind, StoredEvent};
use crate::store::reader;
use crate::store::segment::{self, Active, FramePayload};
use crate::store::{
Closed, CompactionConfig, CompactionStrategy, Open, Store, StoreDiagnostics, StoreError,
StoreStats, WriterPressure,
};
pub(crate) fn sync(store: &Store<Open>) -> Result<(), StoreError> {
tracing::debug!(target: "batpak::flow", flow = "sync");
let (tx, rx) = flume::bounded(1);
store
.writer
.as_ref()
.expect("open store has writer")
.tx
.send(crate::store::writer::WriterCommand::Sync { respond: tx })
.map_err(|_| StoreError::WriterCrashed)?;
rx.recv().map_err(|_| StoreError::WriterCrashed)?
}
pub(crate) fn snapshot(store: &Store<Open>, dest: &std::path::Path) -> Result<(), StoreError> {
tracing::debug!(
target: "batpak::flow",
flow = "snapshot",
destination = %dest.display()
);
sync(store)?;
reject_symlink_leaf(dest)?;
std::fs::create_dir_all(dest).map_err(StoreError::Io)?;
let entries = std::fs::read_dir(&store.config.data_dir).map_err(StoreError::Io)?;
for entry in entries.flatten() {
let path = entry.path();
let is_segment = path
.extension()
.map(|ext| ext == segment::SEGMENT_EXTENSION)
.unwrap_or(false);
let is_visibility_metadata = path
.file_name()
.map(|name| name == crate::store::visibility_ranges::VISIBILITY_RANGES_FILENAME)
.unwrap_or(false);
if is_segment || is_visibility_metadata {
let dest_path = dest.join(entry.file_name());
reject_symlink_leaf(&dest_path)?;
std::fs::copy(&path, &dest_path).map_err(StoreError::Io)?;
}
}
Ok(())
}
pub(crate) fn compact(
store: &Store<Open>,
config: &CompactionConfig,
) -> Result<segment::CompactionResult, StoreError> {
tracing::debug!(target: "batpak::flow", flow = "compact");
sync(store)?;
let mut all_segments: Vec<(u64, std::path::PathBuf)> =
std::fs::read_dir(&store.config.data_dir)
.map_err(StoreError::Io)?
.filter_map(Result::ok)
.filter_map(|entry| {
let path = entry.path();
let ext_ok = path
.extension()
.map(|ext| ext == segment::SEGMENT_EXTENSION)
.unwrap_or(false);
if !ext_ok {
return None;
}
let seg_id = path
.file_stem()
.and_then(|stem| stem.to_str())
.and_then(|stem| stem.parse::<u64>().ok())?;
Some((seg_id, path))
})
.collect();
all_segments.sort_by_key(|(id, _)| *id);
let active_segment_id = all_segments.last().map(|(id, _)| *id).unwrap_or(0);
let mut sealed: Vec<(u64, std::path::PathBuf)> = all_segments
.into_iter()
.filter(|(id, _)| *id < active_segment_id)
.collect();
if sealed.len() < config.min_segments {
return Ok(segment::CompactionResult {
segments_removed: 0,
bytes_reclaimed: 0,
});
}
let merged_id = sealed[0].0;
let merged_path = store
.config
.data_dir
.join(segment::segment_filename(merged_id));
let mut compact_source_path = None;
for (seg_id, _) in &sealed {
store.reader.evict_segment(*seg_id);
}
if let Some((_, source_path)) = sealed.iter_mut().find(|(seg_id, _)| *seg_id == merged_id) {
let temp_source_path = store.config.data_dir.join(format!(
"{merged_id:06}.{}.compact-src",
segment::SEGMENT_EXTENSION
));
let _ = std::fs::remove_file(&temp_source_path);
std::fs::rename(&*source_path, &temp_source_path).map_err(StoreError::Io)?;
*source_path = temp_source_path.clone();
compact_source_path = Some(temp_source_path);
}
let _ = std::fs::remove_file(&merged_path);
let mut merged_segment = segment::Segment::<Active>::create(&store.config.data_dir, merged_id)?;
match &config.strategy {
CompactionStrategy::Merge => {
for (_, path) in &sealed {
merged_segment.append_frames_from_segment(path)?;
}
}
CompactionStrategy::Retention(predicate) => {
let mut all_events: Vec<reader::ScannedEntry> = Vec::new();
for (_, path) in &sealed {
all_events.extend(store.reader.scan_segment(path)?);
}
for entry in all_events {
let coord = Coordinate::new(&entry.entity, &entry.scope)?;
let stored = StoredEvent {
coordinate: coord,
event: entry.event.clone(),
};
if predicate(&stored) {
let frame_payload = FramePayload {
event: entry.event,
entity: entry.entity,
scope: entry.scope,
};
let frame = segment::frame_encode(&frame_payload)?;
merged_segment.write_frame(&frame)?;
}
}
}
CompactionStrategy::Tombstone(predicate) => {
let mut all_events: Vec<reader::ScannedEntry> = Vec::new();
for (_, path) in &sealed {
all_events.extend(store.reader.scan_segment(path)?);
}
let tombstone_kind = EventKind::TOMBSTONE;
for mut entry in all_events {
let coord = Coordinate::new(&entry.entity, &entry.scope)?;
let stored = StoredEvent {
coordinate: coord,
event: entry.event.clone(),
};
if !predicate(&stored) {
entry.event.header.event_kind = tombstone_kind;
}
let frame_payload = FramePayload {
event: entry.event,
entity: entry.entity,
scope: entry.scope,
};
let frame = segment::frame_encode(&frame_payload)?;
merged_segment.write_frame(&frame)?;
}
}
}
merged_segment.sync_with_mode(&store.config.sync.mode)?;
let _sealed_segment = merged_segment.seal();
let mut bytes_reclaimed = 0_u64;
let mut segments_removed = 0_usize;
for (_, path) in &sealed {
if let Ok(meta) = std::fs::metadata(path) {
bytes_reclaimed += meta.len();
}
std::fs::remove_file(path).map_err(StoreError::Io)?;
segments_removed += 1;
}
if let Some(temp_source_path) = compact_source_path {
let _ = std::fs::remove_file(temp_source_path);
}
sync(store)?;
store.index.clear();
crate::store::index_rebuild::rebuild_from_segments(
&store.index,
&store.reader,
&store.config.data_dir,
)?;
crate::store::index_rebuild::restore_cancelled_visibility_ranges(
&store.index,
&store.config.data_dir,
);
if let Err(e) = write_cold_start_artifacts_on_close(store) {
tracing::warn!("post-compaction cold-start artifact write failed: {e}");
}
Ok(segment::CompactionResult {
segments_removed,
bytes_reclaimed,
})
}
pub(crate) fn close(mut store: Store<Open>) -> Result<Closed, StoreError> {
tracing::debug!(target: "batpak::flow", flow = "close");
let (tx, rx) = flume::bounded(1);
store
.writer
.as_ref()
.expect("open store has writer")
.tx
.send(crate::store::writer::WriterCommand::Shutdown { respond: tx })
.map_err(|_| StoreError::WriterCrashed)?;
let result = rx.recv().map_err(|_| StoreError::WriterCrashed)?;
result?;
write_cold_start_artifacts_on_close(&store)?;
store.should_shutdown_on_drop = false;
Ok(Closed)
}
fn write_cold_start_artifacts_on_close(store: &Store<Open>) -> Result<(), StoreError> {
let (seg_id, offset) = find_latest_segment_watermark(&store.config.data_dir)?;
if store.config.index.enable_checkpoint {
crate::store::checkpoint::write_checkpoint(
&store.index,
&store.config.data_dir,
seg_id,
offset,
)?;
}
if store.config.index.enable_mmap_index {
crate::store::mmap_index::write_mmap_index(
&store.index,
&store.config.data_dir,
seg_id,
offset,
)?;
}
Ok(())
}
fn find_latest_segment_watermark(data_dir: &std::path::Path) -> Result<(u64, u64), StoreError> {
let mut max: Option<(u64, std::path::PathBuf)> = None;
for entry in std::fs::read_dir(data_dir).map_err(StoreError::Io)? {
let entry = entry.map_err(StoreError::Io)?;
let path = entry.path();
let ext_ok = path
.extension()
.map(|ext| ext == crate::store::segment::SEGMENT_EXTENSION)
.unwrap_or(false);
if !ext_ok {
continue;
}
if let Some(id) = path
.file_stem()
.and_then(|s| s.to_str())
.and_then(|s| s.parse::<u64>().ok())
{
if max.as_ref().map(|(m, _)| id > *m).unwrap_or(true) {
max = Some((id, path));
}
}
}
match max {
Some((id, path)) => {
let offset = std::fs::metadata(&path).map_err(StoreError::Io)?.len();
Ok((id, offset))
}
None => Ok((0, 0)),
}
}
pub(crate) fn stats<State>(store: &Store<State>) -> StoreStats {
StoreStats {
event_count: store.index.len(),
global_sequence: store.index.global_sequence(),
}
}
pub(crate) fn diagnostics<State>(store: &Store<State>) -> StoreDiagnostics {
StoreDiagnostics {
event_count: store.index.len(),
global_sequence: store.index.global_sequence(),
visible_sequence: store.index.visible_sequence(),
data_dir: store.config.data_dir.clone(),
segment_max_bytes: store.config.segment_max_bytes,
fd_budget: store.config.fd_budget,
restart_policy: store.config.writer.restart_policy.clone(),
writer_pressure: store
.writer
.as_ref()
.map(|writer| WriterPressure {
queue_len: writer.tx.len(),
capacity: store.config.writer.channel_capacity,
})
.unwrap_or(WriterPressure {
queue_len: 0,
capacity: 0,
}),
index_layout: store.index.layout_name(),
tile_count: store.index.tile_count(),
}
}
fn reject_symlink_leaf(path: &std::path::Path) -> Result<(), StoreError> {
match std::fs::symlink_metadata(path) {
Ok(meta) if meta.file_type().is_symlink() => Err(StoreError::Io(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
format!("refusing to operate on symlink path {}", path.display()),
))),
Ok(_) | Err(_) => Ok(()),
}
}