batpak 0.9.0

Event sourcing with causal graphs and caller-defined gates. Sync API, no async runtime.
Documentation
use super::Cursor;
use crate::store::delivery::observation::CheckpointId;
use crate::store::platform;
use crate::store::platform::fs::{RealFs, StoreFs};
use serde::{Deserialize, Serialize};
use std::path::{Path, PathBuf};

/// Durable cursor checkpoint.
///
/// Written atomically to `{data_dir}/cursors/{id}.ckpt` by
/// [`Cursor::save_checkpoint`]: the bytes go to a temp file, that file is
/// fsynced, and it is then published with a parent-directory sync. The
/// delivery path saves it after every successful batch so a cursor with a
/// `checkpoint_id` resumes from the durable position after a process
/// restart; [`Cursor::load_checkpoint`] reads it back.
///
/// `process_boot_ns` reserves space for monotonic-clock cross-checks
/// without wiring any clock dependency today — set to `None` when that
/// wiring is not required.
///
/// NOTE(review): this struct is serialized with `crate::encoding`, so the
/// field order and names are presumably part of the on-disk format —
/// confirm before reordering, renaming, or inserting fields.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct CursorCheckpoint {
    /// Global sequence of the last delivered event.
    ///
    /// When `started` is true, a subsequent poll returns events strictly
    /// after this position.
    pub position: u64,
    /// Whether the cursor has delivered at least one event. A fresh
    /// cursor starts at position 0 with `started = false` so that
    /// global_sequence 0 (a legitimate value) is not skipped.
    pub started: bool,
    /// Process-boot monotonic clock value at the time of the last save.
    /// Reserved for monotonic-clock integration; `None` when not wired.
    pub process_boot_ns: Option<u64>,
    /// Stable identity of the region this checkpoint belongs to.
    ///
    /// Old checkpoints may deserialize with `None`; startup treats that as
    /// a mismatch and fails closed instead of silently resuming an
    /// unscoped checkpoint against an arbitrary region.
    ///
    /// `#[serde(default)]` fills a missing field with `None`. That only helps
    /// when the wire format lets fields be omitted — NOTE(review): confirm
    /// this holds for the format behind `crate::encoding`.
    #[serde(default)]
    pub region_identity: Option<String>,
}

impl CursorCheckpoint {
    /// Build a checkpoint for `position`/`started`, scoped to `region_identity`.
    ///
    /// The monotonic-clock slot is always left empty (`process_boot_ns: None`);
    /// the region identity is always recorded (`Some`).
    pub(super) fn from_checkpoint(position: u64, started: bool, region_identity: String) -> Self {
        let scoped_region = Some(region_identity);
        Self {
            region_identity: scoped_region,
            process_boot_ns: None,
            started,
            position,
        }
    }
}

/// Directory that holds every cursor checkpoint file: `{data_dir}/cursors`.
fn cursor_checkpoint_dir(data_dir: &Path) -> PathBuf {
    let mut checkpoints = data_dir.to_path_buf();
    checkpoints.push("cursors");
    checkpoints
}

/// Full path of the checkpoint file for `id`: `{data_dir}/cursors/{id}.ckpt`.
pub(super) fn cursor_checkpoint_path(data_dir: &Path, id: &CheckpointId) -> PathBuf {
    let file_name = format!("{}.ckpt", id.as_str());
    let checkpoints = cursor_checkpoint_dir(data_dir);
    checkpoints.join(file_name)
}

/// The `(data_dir, id)` pair that locates one cursor's durable checkpoint
/// file — the same two inputs `cursor_checkpoint_path` combines into
/// `{data_dir}/cursors/{id}.ckpt`.
///
/// NOTE(review): presumably held by a cursor that persists its position
/// after delivery; the holder is not visible in this module — confirm at
/// the call sites.
#[derive(Clone, Debug)]
pub(super) struct CursorDurableBinding {
    /// Store data directory under which the `cursors/` directory lives.
    pub(super) data_dir: PathBuf,
    /// Identifier that names the checkpoint file inside `cursors/`.
    pub(super) id: CheckpointId,
}

impl Cursor {
    /// Load a persisted cursor checkpoint, or `Ok(None)` if none exists.
    ///
    /// # Errors
    /// Returns an I/O error if the checkpoint file exists but cannot be
    /// read. A decoding error yields `io::ErrorKind::InvalidData` so
    /// durable-resume callers can fail closed instead of silently
    /// rewinding to position 0.
    pub fn load_checkpoint(
        data_dir: &Path,
        id: &CheckpointId,
    ) -> std::io::Result<Option<CursorCheckpoint>> {
        let path = cursor_checkpoint_path(data_dir, id);
        let bytes = match platform::fs::read(&path) {
            Ok(b) => b,
            Err(error) if error.kind() == std::io::ErrorKind::NotFound => return Ok(None),
            Err(error) => return Err(error),
        };
        match crate::encoding::from_bytes::<CursorCheckpoint>(&bytes) {
            Ok(ckpt) => Ok(Some(ckpt)),
            Err(error) => Err(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                format!("cursor checkpoint decode failed: {error}"),
            )),
        }
    }

    /// Persist a cursor checkpoint atomically with a parent-directory
    /// fsync. The cursor-directory is created lazily if it does not
    /// already exist.
    ///
    /// # Errors
    /// Returns any I/O error from temp-file creation, write, fsync, or
    /// rename. Encoding errors are surfaced as `io::Error` with kind
    /// `Other`.
    pub fn save_checkpoint(
        data_dir: &Path,
        id: &CheckpointId,
        ckpt: &CursorCheckpoint,
    ) -> std::io::Result<()> {
        // Public surface: the production default backend. The crash-sensitive
        // durable-cursor write path (`persist_current`) drives the
        // [`Cursor::save_checkpoint_with_fs`] variant with the store's configured
        // filesystem so a `SimFs` can tear the checkpoint persist; this thin
        // wrapper keeps the standalone public API behavior-identical via
        // [`RealFs`].
        Self::save_checkpoint_with_fs(data_dir, id, ckpt, &RealFs)
    }

    /// [`Cursor::save_checkpoint`], routed through the supplied [`StoreFs`]
    /// backend so the temp-file create and the atomic publish
    /// ([`StoreFs::persist_temp_with_parent_sync`]) are fault-injectable under
    /// `SimFs`. The durable-cursor write path calls this with the store's
    /// configured fs; `RealFs` makes it byte-for-byte the production behavior.
    ///
    /// # Errors
    /// Returns any I/O error from temp-file creation, write, fsync, or the
    /// atomic publish. Encoding errors are surfaced as `io::Error` with kind
    /// `Other`.
    pub(crate) fn save_checkpoint_with_fs(
        data_dir: &Path,
        id: &CheckpointId,
        ckpt: &CursorCheckpoint,
        fs: &dyn StoreFs,
    ) -> std::io::Result<()> {
        let dir = cursor_checkpoint_dir(data_dir);
        fs.create_dir_all(&dir)?;
        let bytes =
            crate::encoding::to_bytes(ckpt).map_err(|e| std::io::Error::other(e.to_string()))?;
        let final_path = cursor_checkpoint_path(data_dir, id);

        let mut tmp = fs.named_temp_in(&dir)?;
        {
            use std::io::Write;
            tmp.write_all(&bytes)?;
            tmp.flush()?;
        }
        // Fsync the temp contents before rename; `persist_temp_with_parent_sync`
        // does a defensive fsync too, but doing it here keeps the
        // durability boundary explicit.
        crate::store::platform::sync::sync_file_all_io(tmp.as_file())?;
        let admission = crate::store::platform::sync::admit_current_parent_dir_sync()
            .map_err(|error| std::io::Error::other(error.to_string()))?;
        fs.persist_temp_with_parent_sync(tmp, &final_path, admission)?;
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::{cursor_checkpoint_path, Cursor};
    use crate::store::delivery::observation::CheckpointId;

    #[test]
    fn load_checkpoint_propagates_non_not_found_read_errors_instead_of_forgetting() {
        // Kills checkpoint.rs:79 match guard `error.kind() == NotFound` -> `true`.
        // Only a genuinely ABSENT checkpoint may collapse to Ok(None); any OTHER
        // read failure must surface as an Err so a durable-resume caller fails
        // closed rather than silently rewinding to position 0. The `true` mutant
        // maps EVERY read error (including a corrupt/unreadable path) to Ok(None).
        let dir = tempfile::TempDir::new().expect("temp dir");
        let id = CheckpointId::new("batpak-load-checkpoint-non-notfound").expect("valid id");

        // Materialise a DIRECTORY where the checkpoint file is expected. Reading
        // it yields an io error whose kind is NOT NotFound (the file "is there",
        // just unreadable as a file), exercising the non-NotFound arm.
        let path = cursor_checkpoint_path(dir.path(), &id);
        crate::store::platform::fs::create_dir_all(&path)
            .expect("create a directory at the checkpoint path");

        let result = Cursor::load_checkpoint(dir.path(), &id);
        assert!(
            matches!(&result, Err(error) if error.kind() != std::io::ErrorKind::NotFound),
            "PROPERTY: an unreadable (non-absent) checkpoint must propagate a \
             non-NotFound Err, not collapse to Ok(None) as the `true` guard mutant \
             does; got {result:?}"
        );
    }

    #[test]
    fn load_checkpoint_returns_none_when_checkpoint_is_absent() {
        // Kills checkpoint.rs:79 match guard `error.kind() == NotFound` -> `false`.
        // The complement of the test above: a checkpoint that was never written
        // (here not even the `cursors/` directory exists) must collapse to
        // Ok(None), so callers can tell "never saved" apart from a read failure.
        // The `false` mutant turns plain absence into a hard error.
        let dir = tempfile::TempDir::new().expect("temp dir");
        let id = CheckpointId::new("batpak-load-checkpoint-absent").expect("valid id");

        let result = Cursor::load_checkpoint(dir.path(), &id);
        assert!(
            matches!(&result, Ok(None)),
            "PROPERTY: an absent checkpoint must load as Ok(None), not an Err as \
             the `false` guard mutant produces; got {result:?}"
        );
    }
}