use crate::coordinate::Coordinate;
use crate::event::{EventKind, StoredEvent};
use crate::store::cold_start::{latest_segment_watermark, ColdStartArtifactKind};
use crate::store::segment::scan as reader;
use crate::store::segment::{self, Active, FramePayload};
use crate::store::write::control::AppendSubmission;
use crate::store::{
AppendOptions, Closed, CompactionConfig, CompactionStrategy, Open, Store, StoreDiagnostics,
StoreError, StoreStats, WriterPressure,
};
use serde::Serialize;
/// Payload serialized into the `SYSTEM_CLOSE_COMPLETED` lifecycle event that
/// `append_close_completed_event` appends while the store is closing.
///
/// Both fields are copied from the store's visible HLC at close time.
/// NOTE(review): field order is left untouched on purpose — depending on the serializer
/// used by `build_event`, it may be part of the encoded form.
#[derive(Serialize)]
struct CloseLifecyclePayload {
    // `wall_ms` component of the visible HLC snapshot taken at close.
    wall_ms: u64,
    // `global_sequence` component of the visible HLC snapshot taken at close.
    global_sequence: u64,
}
/// Appends a `SYSTEM_CLOSE_COMPLETED` event on the `batpak:store` / `batpak:lifecycle`
/// coordinate and waits for the writer to acknowledge it.
///
/// The event payload and timestamp come from the visible HLC in the watermark snapshot.
/// A fresh v7 id is used as the idempotency key, and route + idempotency validation run
/// before the event is built.
///
/// # Errors
/// Propagates coordinate/validation/build/timestamp errors. Returns
/// `StoreError::WriterCrashed` if the command cannot be sent, and any error from the
/// writer's reply.
fn append_close_completed_event(store: &Store<Open>) -> Result<(), StoreError> {
    let close_hlc = store.watermark_handle.lock().snapshot().visible_hlc;
    let coord = Coordinate::new("batpak:store", "batpak:lifecycle")?;

    let idempotency_key = crate::id::generate_v7_id();
    let options = AppendOptions::default().with_idempotency(idempotency_key);
    let submission = AppendSubmission::with_options(options);
    submission.validate_route(store)?;
    submission.validate_idempotency(store)?;

    let payload = CloseLifecyclePayload {
        wall_ms: close_hlc.wall_ms,
        global_sequence: close_hlc.global_sequence,
    };
    let timestamp_us = super::timestamp_us_for_hlc(close_hlc)?;
    let event = submission.build_event(&payload, EventKind::SYSTEM_CLOSE_COMPLETED, timestamp_us)?;

    let (respond, reply) = flume::bounded(1);
    let command =
        submission.into_command(coord, EventKind::SYSTEM_CLOSE_COMPLETED, event, respond);
    // Keep the writer handle a temporary so it is released before waiting on the reply.
    store
        .writer_handle()?
        .tx
        .send(command)
        .map_err(|_| StoreError::WriterCrashed)?;
    crate::store::recv_writer_reply(&reply)?;
    Ok(())
}
/// Sends `WriterCommand::Sync` to the writer and blocks until it replies.
///
/// # Errors
/// `StoreError::WriterCrashed` if the command cannot be delivered, or whatever error the
/// writer reports back through `recv_writer_reply`.
pub(crate) fn sync(store: &Store<Open>) -> Result<(), StoreError> {
    tracing::debug!(target: "batpak::flow", flow = "sync");
    let (respond, reply) = flume::bounded(1);
    let command = crate::store::write::writer::WriterCommand::Sync { respond };
    // The handle returned by `writer_handle()` stays a temporary of this statement, so it is
    // not held while blocking on the reply below.
    store
        .writer_handle()?
        .tx
        .send(command)
        .map_err(|_| StoreError::WriterCrashed)?;
    crate::store::recv_writer_reply(&reply)
}
/// Copies the store's snapshot-relevant files from the data dir into `dest`.
///
/// Under the lifecycle gate, this:
/// 1. opens a visibility fence;
/// 2. flushes the writer via [`sync`];
/// 3. rejects a symlinked `dest` and creates it if needed;
/// 4. clears store artifacts left in `dest` by an earlier snapshot;
/// 5. copies every data-dir entry accepted by `snapshot_source_should_copy` (segment files,
///    the visibility-ranges file, the compaction marker).
///
/// The fence is cancelled once the copy completes.
///
/// # Errors
/// Any I/O error is returned, including an error reading an individual directory entry.
/// Those used to be skipped silently, which could yield a snapshot missing a segment with
/// no indication.
pub(crate) fn snapshot(store: &Store<Open>, dest: &std::path::Path) -> Result<(), StoreError> {
    tracing::debug!(
        target: "batpak::flow",
        flow = "snapshot",
        destination = %dest.display()
    );
    let _lifecycle = store.lifecycle_gate.lock();
    // NOTE(review): on any early `?` return below, `snapshot_fence` is dropped without
    // `cancel()`; confirm the fence's `Drop` releases it.
    let snapshot_fence = store.begin_visibility_fence()?;
    sync(store)?;
    crate::store::platform::fs::reject_symlink_leaf(dest, "snapshot destination")?;
    std::fs::create_dir_all(dest).map_err(StoreError::Io)?;
    clear_snapshot_store_artifacts(dest)?;
    for entry in std::fs::read_dir(&store.config.data_dir).map_err(StoreError::Io)? {
        // Fail loudly rather than produce a snapshot that silently lacks this entry.
        let entry = entry.map_err(StoreError::Io)?;
        let path = entry.path();
        if !snapshot_source_should_copy(&path) {
            continue;
        }
        let dest_path = dest.join(entry.file_name());
        crate::store::platform::fs::reject_symlink_leaf(&dest_path, "snapshot entry")?;
        std::fs::copy(&path, &dest_path).map_err(StoreError::Io)?;
    }
    snapshot_fence.cancel()?;
    Ok(())
}
/// Decides whether a data-dir entry belongs in a snapshot.
///
/// Accepts any file with the segment extension, plus the visibility-ranges file and the
/// compaction marker (matched by exact file name). Everything else is left behind.
fn snapshot_source_should_copy(path: &std::path::Path) -> bool {
    if let Some(ext) = path.extension() {
        if ext == segment::SEGMENT_EXTENSION {
            return true;
        }
    }
    match path.file_name() {
        Some(name) => {
            name == crate::store::hidden_ranges::VISIBILITY_RANGES_FILENAME
                || name == crate::store::cold_start::rebuild::COMPACTION_MARKER_FILENAME
        }
        None => false,
    }
}
/// Decides whether an entry in a snapshot destination is a store artifact that must be
/// removed before a new snapshot is copied in.
///
/// Matches by extension (segment files and `compact-src` temp files) or by exact file name
/// (visibility ranges, checkpoint, mmap index, compaction marker).
fn snapshot_destination_should_clear(path: &std::path::Path) -> bool {
    let extension_matches = match path.extension() {
        Some(ext) => ext == segment::SEGMENT_EXTENSION || ext == "compact-src",
        None => false,
    };
    if extension_matches {
        return true;
    }
    if let Some(name) = path.file_name() {
        name == crate::store::hidden_ranges::VISIBILITY_RANGES_FILENAME
            || name == crate::store::cold_start::checkpoint::CHECKPOINT_FILENAME
            || name == crate::store::cold_start::mmap::MMAP_INDEX_FILENAME
            || name == crate::store::cold_start::rebuild::COMPACTION_MARKER_FILENAME
    } else {
        false
    }
}
fn remove_file_if_present(path: &std::path::Path) -> Result<(), StoreError> {
match std::fs::remove_file(path) {
Ok(()) => Ok(()),
Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(()),
Err(error) => Err(StoreError::Io(error)),
}
}
fn remove_dir_all_if_present(path: &std::path::Path) -> Result<(), StoreError> {
match std::fs::remove_dir_all(path) {
Ok(()) => Ok(()),
Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(()),
Err(error) => Err(StoreError::Io(error)),
}
}
/// Removes store artifacts left in a snapshot destination by a previous snapshot.
///
/// Deletes every entry accepted by `snapshot_destination_should_clear`, plus a `cursors`
/// directory if present. Other entries in `dest` are left untouched.
///
/// # Errors
/// Returns `StoreError::Io` if `dest` cannot be listed, if any directory entry cannot be
/// read, or if a removal fails (other than `NotFound`). Unreadable entries used to be
/// skipped silently. That could leave a stale segment from an older snapshot next to the
/// freshly copied ones.
fn clear_snapshot_store_artifacts(dest: &std::path::Path) -> Result<(), StoreError> {
    for entry in std::fs::read_dir(dest).map_err(StoreError::Io)? {
        let path = entry.map_err(StoreError::Io)?.path();
        if snapshot_destination_should_clear(&path) {
            remove_file_if_present(&path)?;
        } else if path.is_dir()
            && path
                .file_name()
                .map(|name| name == "cursors")
                .unwrap_or(false)
        {
            remove_dir_all_if_present(&path)?;
        }
    }
    Ok(())
}
fn rollback_compaction_disk_state(
data_dir: &std::path::Path,
merged_path: &std::path::Path,
compact_source_path: Option<&std::path::Path>,
) -> Result<(), StoreError> {
if let Err(remove_err) = std::fs::remove_file(merged_path) {
if remove_err.kind() != std::io::ErrorKind::NotFound {
return Err(StoreError::Io(remove_err));
}
}
if let Some(temp_source_path) = compact_source_path {
std::fs::rename(temp_source_path, merged_path).map_err(StoreError::Io)?;
}
crate::store::cold_start::rebuild::clear_pending_compaction(data_dir)?;
Ok(())
}
/// Inputs for `failed_compaction_with_rollback`: the failure, the disk state to roll back,
/// and the data needed to build the `Failed` compaction report.
struct FailedCompactionCtx<'a> {
    // --- the failure itself ---
    /// Error that aborted the pre-swap phase.
    error: &'a StoreError,
    /// Short label for the failed phase; prefixed to the logged/reported reason.
    context: &'a str,

    // --- disk state to roll back ---
    /// Store data directory; used to clear the pending-compaction marker.
    data_dir: &'a std::path::Path,
    /// Path of the merged output segment.
    merged_path: &'a std::path::Path,
    /// Temporary location of the relocated merged-id source, if it was moved.
    compact_source_path: Option<&'a std::path::Path>,

    // --- report inputs ---
    /// Compaction configuration the run used.
    config: &'a CompactionConfig,
    /// Id of the segment treated as active (excluded from compaction).
    active_segment_id: u64,
    /// Sealed segments selected as compaction sources.
    sealed: &'a [(u64, std::path::PathBuf)],
    /// Id the merged output segment was to take.
    merged_segment_id: u64,
}
fn failed_compaction_with_rollback(
ctx: &FailedCompactionCtx<'_>,
) -> Result<
(
segment::CompactionResult,
crate::store::compaction_report::CompactionReportBody,
),
StoreError,
> {
rollback_compaction_disk_state(ctx.data_dir, ctx.merged_path, ctx.compact_source_path)?;
let reason = format!("{}; disk layout rolled back: {}", ctx.context, ctx.error);
tracing::error!(target: "batpak::flow", flow = "compact", error = %ctx.error, "{reason}");
let result = segment::CompactionResult {
outcome: segment::CompactionOutcome::Failed {
reason: reason.clone(),
},
segments_removed: 0,
bytes_reclaimed: 0,
};
let report = crate::store::compaction_report::report_for_run(
ctx.config,
ctx.active_segment_id,
ctx.sealed,
Some(ctx.merged_segment_id),
&result,
None,
)?;
Ok((result, report))
}
/// Scans each sealed segment in order and concatenates all of their entries into one `Vec`.
///
/// # Errors
/// The first error returned by `store.reader.scan_segment`.
fn scan_sealed_entries(
    store: &Store<Open>,
    sealed: &[(u64, std::path::PathBuf)],
) -> Result<Vec<reader::ScannedEntry>, StoreError> {
    sealed.iter().try_fold(Vec::new(), |mut collected, (_, path)| {
        collected.extend(store.reader.scan_segment(path)?);
        Ok(collected)
    })
}
/// Builds a `StoredEvent` view of a scanned entry (cloning its event) so compaction
/// predicates can inspect it.
///
/// # Errors
/// Any error from `Coordinate::new` on the entry's entity/scope.
fn scanned_entry_as_stored_event(
    entry: &reader::ScannedEntry,
) -> Result<StoredEvent<serde_json::Value>, StoreError> {
    let coordinate = Coordinate::new(&entry.entity, &entry.scope)?;
    let event = entry.event.clone();
    Ok(StoredEvent { coordinate, event })
}
/// Re-encodes a scanned entry as a frame and appends it to `merged_segment`.
///
/// # Errors
/// Any error from `segment::frame_encode` or `write_frame`.
fn write_scanned_entry(
    merged_segment: &mut segment::Segment<Active>,
    entry: reader::ScannedEntry,
) -> Result<(), StoreError> {
    let reader::ScannedEntry {
        event,
        entity,
        scope,
        ..
    } = entry;
    let encoded = segment::frame_encode(&FramePayload {
        event,
        entity,
        scope,
    })?;
    merged_segment.write_frame(&encoded)?;
    Ok(())
}
/// Writes the compacted output segment for `sealed` under id `merged_id`.
///
/// Steps, in order:
/// 1. Evicts every sealed segment id from `store.reader`.
/// 2. If a sealed entry has id `merged_id`, renames that source file to
///    `{merged_id:06}.{SEGMENT_EXTENSION}.compact-src` in the data dir. Any file already at
///    that path is removed first, with errors ignored. The entry's path in `sealed` is
///    rewritten to the temp path, and the temp path is stored in `compact_source_path`.
///    `compact_source_path` is only set once the rename has succeeded.
/// 3. Removes any file at `merged_path` (errors ignored) and creates a new active segment
///    with id `merged_id` (presumably located at `merged_path` — confirm `Segment::create`).
/// 4. Fills it according to `strategy`:
///    - `Merge`: appends the frames of each sealed segment, in `sealed` order.
///    - `Retention(predicate)`: writes only entries for which `predicate` returns `true`.
///    - `Tombstone(predicate)`: writes every entry, but sets the event kind of entries for
///      which `predicate` returns `false` to `EventKind::TOMBSTONE`.
/// 5. Syncs with `store.config.sync.mode`, then seals the segment.
///
/// On error the disk may be partially modified. `compact` passes `compact_source_path` to
/// its rollback to undo step 2.
fn materialize_compacted_segment(
    store: &Store<Open>,
    strategy: &CompactionStrategy,
    sealed: &mut [(u64, std::path::PathBuf)],
    merged_id: u64,
    merged_path: &std::path::Path,
    compact_source_path: &mut Option<std::path::PathBuf>,
) -> Result<(), StoreError> {
    // Evict every source segment from the reader before its file is moved or rewritten.
    for (seg_id, _) in sealed.iter() {
        store.reader.evict_segment(*seg_id);
    }
    // The output reuses `merged_id`, so the source file carrying that id is moved aside to
    // a `.compact-src` temp path first.
    if let Some((_, source_path)) = sealed.iter_mut().find(|(seg_id, _)| *seg_id == merged_id) {
        let temp_source_path = store.config.data_dir.join(format!(
            "{merged_id:06}.{}.compact-src",
            segment::SEGMENT_EXTENSION
        ));
        // Best-effort removal of anything already at the temp path; the result is ignored.
        // NOTE(review): if an earlier crashed compaction left the only copy of a source
        // segment here, this deletes it — confirm cold-start recovery always restores the
        // temp file before another compaction can run.
        let _ = std::fs::remove_file(&temp_source_path);
        std::fs::rename(&*source_path, &temp_source_path).map_err(StoreError::Io)?;
        // Only reached after a successful rename: point `sealed` at the new location and
        // report it to the caller for rollback.
        *source_path = temp_source_path.clone();
        *compact_source_path = Some(temp_source_path);
    }
    // Clear whatever is at the output path; the result is ignored.
    let _ = std::fs::remove_file(merged_path);
    let mut merged_segment = segment::Segment::<Active>::create(&store.config.data_dir, merged_id)?;
    match strategy {
        CompactionStrategy::Merge => {
            // Append frames segment by segment, in `sealed` order.
            for (_, path) in sealed.iter() {
                merged_segment.append_frames_from_segment(path)?;
            }
        }
        CompactionStrategy::Retention(predicate) => {
            // Keep only entries the predicate accepts.
            // NOTE(review): `scan_sealed_entries` collects every sealed segment's entries
            // into one Vec before anything is written, so peak memory grows with the total
            // size of the sealed set.
            for entry in scan_sealed_entries(store, sealed)? {
                if predicate(&scanned_entry_as_stored_event(&entry)?) {
                    write_scanned_entry(&mut merged_segment, entry)?;
                }
            }
        }
        CompactionStrategy::Tombstone(predicate) => {
            // Keep every entry. Rejected entries only have their event kind rewritten to
            // TOMBSTONE; the rest of the event, entity and scope are written unchanged.
            let tombstone_kind = EventKind::TOMBSTONE;
            for mut entry in scan_sealed_entries(store, sealed)? {
                if !predicate(&scanned_entry_as_stored_event(&entry)?) {
                    entry.event.header.event_kind = tombstone_kind;
                }
                write_scanned_entry(&mut merged_segment, entry)?;
            }
        }
    }
    merged_segment.sync_with_mode(&store.config.sync.mode)?;
    // NOTE(review): the value returned by `seal()` is bound and then dropped; if it is a
    // `Result`, a sealing error would be silently ignored here — confirm.
    let _sealed_segment = merged_segment.seal();
    Ok(())
}
/// Flushes the writer and builds a brand-new `StoreIndex` from the segment files currently
/// in the data dir.
///
/// Any persisted cancelled-visibility ranges are restored into the new index. The result
/// is not installed anywhere; the caller decides whether to swap it in.
///
/// # Errors
/// Errors from [`sync`], the segment rebuild, or loading the cancelled ranges.
fn rebuild_fresh_compaction_index(
    store: &Store<Open>,
) -> Result<crate::store::index::StoreIndex, StoreError> {
    sync(store)?;
    let data_dir = &store.config.data_dir;
    let index = crate::store::index::StoreIndex::with_config(&store.config.index);
    crate::store::cold_start::rebuild::rebuild_from_segments(&index, &store.reader, data_dir)?;
    let cancelled = crate::store::hidden_ranges::load_cancelled_ranges(data_dir)?;
    if let Some(ranges) = cancelled {
        index.restore_cancelled_visibility_ranges(ranges);
    }
    Ok(index)
}
/// Compacts the store's sealed segments into a single segment using `config.strategy`.
///
/// Runs under the lifecycle gate after flushing the writer via [`sync`]. Every segment file
/// except the highest-numbered one (treated as active) is a compaction source. When there
/// are no sealed segments, or fewer than `config.min_segments`, the run is reported as
/// `Skipped`.
///
/// Otherwise the merged segment takes the lowest sealed id, and the run:
/// 1. writes a pending-compaction marker listing the source ids;
/// 2. materializes the merged segment and rebuilds a fresh index from disk. On failure the
///    disk layout is rolled back and a `Failed` outcome is returned as `Ok`;
/// 3. swaps the fresh index into `store.index`;
/// 4. deletes the source segment files, then clears the marker;
/// 5. rewrites the cold-start artifact on a best-effort basis (failure is only logged).
///
/// # Errors
/// Returns `Err` for failures outside the rolled-back phase. That covers listing the data
/// dir (including unreadable entries), writing or clearing the marker, the index swap,
/// deleting sources, building the report, or a failed rollback.
pub(crate) fn compact(
    store: &Store<Open>,
    config: &CompactionConfig,
) -> Result<
    (
        segment::CompactionResult,
        crate::store::compaction_report::CompactionReportBody,
    ),
    StoreError,
> {
    tracing::debug!(target: "batpak::flow", flow = "compact");
    let _lifecycle = store.lifecycle_gate.lock();
    sync(store)?;
    let all_segments = collect_segment_files(&store.config.data_dir)?;
    // The highest-numbered segment is treated as active and never compacted.
    let active_segment_id = all_segments.last().map(|(id, _)| *id).unwrap_or(0);
    let mut sealed: Vec<(u64, std::path::PathBuf)> = all_segments
        .into_iter()
        .filter(|(id, _)| *id < active_segment_id)
        .collect();
    // With nothing sealed there is nothing to merge. The explicit emptiness check keeps a
    // `min_segments` of 0 from reaching the `sealed[0]` index below and panicking.
    if sealed.is_empty() || sealed.len() < config.min_segments {
        let result = segment::CompactionResult {
            outcome: segment::CompactionOutcome::Skipped,
            segments_removed: 0,
            bytes_reclaimed: 0,
        };
        let report =
            crate::store::compaction_report::report_skipped(config, active_segment_id, &sealed)?;
        return Ok((result, report));
    }
    // `sealed` is sorted ascending, so this is the lowest sealed id.
    let merged_id = sealed[0].0;
    let merged_path = store
        .config
        .data_dir
        .join(segment::segment_filename(merged_id));
    let source_segment_ids: Vec<u64> = sealed.iter().map(|(seg_id, _)| *seg_id).collect();
    let mut compact_source_path = None;
    // Record the pending compaction before any segment file is touched.
    crate::store::cold_start::rebuild::write_pending_compaction(
        &store.config.data_dir,
        merged_id,
        &source_segment_ids,
    )?;
    // Pre-swap phase: any failure here is rolled back on disk and reported as `Failed`.
    let fresh_index = match materialize_compacted_segment(
        store,
        &config.strategy,
        &mut sealed,
        merged_id,
        &merged_path,
        &mut compact_source_path,
    )
    .and_then(|_| rebuild_fresh_compaction_index(store))
    {
        Ok(fresh_index) => fresh_index,
        Err(error) => {
            return failed_compaction_with_rollback(&FailedCompactionCtx {
                config,
                active_segment_id,
                sealed: &sealed,
                merged_segment_id: merged_id,
                data_dir: &store.config.data_dir,
                merged_path: &merged_path,
                compact_source_path: compact_source_path.as_deref(),
                error: &error,
                context: "compaction pre-swap phase failed",
            });
        }
    };
    store.index.replace_contents_from_fresh(fresh_index)?;
    // Post-swap phase. `materialize_compacted_segment` rewrote the merged-id entry of
    // `sealed` to its temp path, so that file is deleted here as well. The pending marker is
    // only cleared once every removal succeeded.
    let (segments_removed, bytes_reclaimed) = remove_compacted_sources(&sealed)?;
    if let Some(temp_source_path) = compact_source_path {
        // Best-effort sweep; normally already removed above.
        let _ = std::fs::remove_file(temp_source_path);
    }
    crate::store::cold_start::rebuild::clear_pending_compaction(&store.config.data_dir)?;
    if let Err(e) = write_cold_start_artifacts_on_close(store) {
        tracing::warn!(
            target: "batpak::flow",
            flow = "compact",
            "post-compaction cold-start artifact write failed: {e}"
        );
    }
    let result = segment::CompactionResult {
        outcome: segment::CompactionOutcome::Performed,
        segments_removed,
        bytes_reclaimed,
    };
    let report = crate::store::compaction_report::report_for_run(
        config,
        active_segment_id,
        &sealed,
        Some(merged_id),
        &result,
        Some(&merged_path),
    )?;
    Ok((result, report))
}

/// Lists every segment file in `data_dir`, sorted by ascending segment id.
///
/// A segment file has the `segment::SEGMENT_EXTENSION` extension and a stem that parses as
/// a `u64`; other files are ignored. An error reading any directory entry is returned
/// rather than skipped. Skipping could hide a sealed segment from compaction while its
/// neighbours are merged around it.
fn collect_segment_files(
    data_dir: &std::path::Path,
) -> Result<Vec<(u64, std::path::PathBuf)>, StoreError> {
    let mut segments = Vec::new();
    for entry in std::fs::read_dir(data_dir).map_err(StoreError::Io)? {
        let path = entry.map_err(StoreError::Io)?.path();
        let is_segment = path
            .extension()
            .map(|ext| ext == segment::SEGMENT_EXTENSION)
            .unwrap_or(false);
        if !is_segment {
            continue;
        }
        let seg_id = path
            .file_stem()
            .and_then(|stem| stem.to_str())
            .and_then(|stem| stem.parse::<u64>().ok());
        if let Some(seg_id) = seg_id {
            segments.push((seg_id, path));
        }
    }
    segments.sort_by_key(|(id, _)| *id);
    Ok(segments)
}

/// Deletes the compacted source files and returns `(segments_removed, bytes_reclaimed)`.
///
/// `bytes_reclaimed` sums the sizes of the deleted files; a file whose metadata cannot be
/// read contributes 0. Stops at the first failed removal.
fn remove_compacted_sources(
    sealed: &[(u64, std::path::PathBuf)],
) -> Result<(usize, u64), StoreError> {
    let mut bytes_reclaimed = 0_u64;
    let mut segments_removed = 0_usize;
    for (_, path) in sealed {
        if let Ok(meta) = std::fs::metadata(path) {
            bytes_reclaimed += meta.len();
        }
        std::fs::remove_file(path).map_err(StoreError::Io)?;
        segments_removed += 1;
    }
    Ok((segments_removed, bytes_reclaimed))
}
/// Shuts the store down, consuming it, and returns the `Closed` marker.
///
/// Under the lifecycle gate, this:
/// 1. appends a `SYSTEM_CLOSE_COMPLETED` lifecycle event on a best-effort basis (failure is
///    logged, not returned);
/// 2. sends `WriterCommand::Shutdown` and waits for the writer's reply;
/// 3. joins the writer, if one is attached;
/// 4. writes the configured cold-start artifact;
/// 5. clears `should_shutdown_on_drop` so dropping the store does not shut down again.
///
/// # Errors
/// `StoreError::WriterCrashed` if the shutdown command cannot be sent. Otherwise, errors
/// from the writer's reply, the join, or the cold-start artifact write. In those cases
/// `should_shutdown_on_drop` is left unchanged.
pub(crate) fn close(mut store: Store<Open>) -> Result<Closed, StoreError> {
    tracing::debug!(target: "batpak::flow", flow = "close");
    let _lifecycle = store.lifecycle_gate.lock();

    if let Err(error) = append_close_completed_event(&store) {
        tracing::warn!(
            target: "batpak::flow",
            flow = "close",
            "failed to append SYSTEM_CLOSE_COMPLETED lifecycle event: {error}"
        );
    }

    let (respond, reply) = flume::bounded(1);
    let shutdown = crate::store::write::writer::WriterCommand::Shutdown { respond };
    store
        .writer_handle()?
        .tx
        .send(shutdown)
        .map_err(|_| StoreError::WriterCrashed)?;
    crate::store::recv_writer_reply(&reply)?;

    if let Some(writer) = store.writer.as_mut() {
        writer.join()?;
    }
    write_cold_start_artifacts_on_close(&store)?;
    store.should_shutdown_on_drop = false;
    Ok(Closed)
}
/// Writes the cold-start artifact selected by `store.runtime.cold_start.write_target()`.
///
/// - `MmapIndex`: writes the mmap index.
/// - `Checkpoint`: writes a checkpoint.
/// - `None`: writes nothing.
///
/// Both artifacts are built from `store.index` and stamped with the `(segment id, offset)`
/// returned by `latest_segment_watermark` for the data dir. They also carry the store's
/// cumulative reserved-kind fallbacks. The watermark is only computed when an artifact will
/// actually be written. With no target configured, this never calls
/// `latest_segment_watermark` and cannot fail on it.
///
/// # Errors
/// Errors from `latest_segment_watermark` or the selected artifact writer.
fn write_cold_start_artifacts_on_close(store: &Store<Open>) -> Result<(), StoreError> {
    let target = match store.runtime.cold_start.write_target() {
        Some(target) => target,
        None => return Ok(()),
    };
    let (seg_id, offset) = latest_segment_watermark(&store.config.data_dir)?;
    match target {
        ColdStartArtifactKind::MmapIndex => {
            crate::store::cold_start::mmap::write_mmap_index_with_reserved_kind_fallbacks(
                &store.index,
                &store.config.data_dir,
                seg_id,
                offset,
                &store.cumulative_reserved_kind_fallbacks,
            )?;
        }
        ColdStartArtifactKind::Checkpoint => {
            crate::store::cold_start::checkpoint::write_checkpoint_with_reserved_kind_fallbacks(
                &store.index,
                &store.config.data_dir,
                seg_id,
                offset,
                &store.cumulative_reserved_kind_fallbacks,
            )?;
        }
    }
    Ok(())
}
/// Reads the event count and global sequence from the store's index.
pub(crate) fn stats<State>(store: &Store<State>) -> StoreStats {
    let index = &store.index;
    let event_count = index.len();
    let global_sequence = index.global_sequence();
    StoreStats {
        event_count,
        global_sequence,
    }
}
/// Collects a diagnostic snapshot of the store.
///
/// The snapshot covers index counters, key config values, writer queue pressure, the
/// watermark frontier, index topology, the open report, and platform evidence for the data
/// dir. When no writer is attached, pressure is reported as `queue_len: 0, capacity: 0`.
pub(crate) fn diagnostics<State>(store: &Store<State>) -> StoreDiagnostics {
    let config = &store.config;
    let index = &store.index;
    // The frontier is captured before any index counter is read.
    let frontier = store.watermark_handle.lock().snapshot_view();
    StoreDiagnostics {
        event_count: index.len(),
        global_sequence: index.global_sequence(),
        visible_sequence: index.visible_sequence(),
        data_dir: config.data_dir.clone(),
        segment_max_bytes: config.segment_max_bytes,
        fd_budget: config.fd_budget,
        restart_policy: config.writer.restart_policy.clone(),
        writer_pressure: match store.writer.as_ref() {
            Some(writer) => WriterPressure {
                queue_len: writer.tx.len(),
                capacity: config.writer.channel_capacity,
            },
            None => WriterPressure {
                queue_len: 0,
                capacity: 0,
            },
        },
        frontier,
        index_topology: index.topology_name(),
        tile_count: index.tile_count(),
        open_report: store.open_report.clone(),
        platform_evidence: crate::store::platform::evidence::collect_for_store_path(
            &config.data_dir,
        ),
    }
}