use super::Cursor;
use crate::store::delivery::observation::CheckpointId;
use crate::store::platform;
use crate::store::platform::fs::{RealFs, StoreFs};
use serde::{Deserialize, Serialize};
use std::path::{Path, PathBuf};
/// Serializable snapshot of a cursor's state.
///
/// Values of this type are what `Cursor::save_checkpoint` encodes to disk and
/// what `Cursor::load_checkpoint` decodes back. The field order is part of the
/// declaration that the serde derives see, so do not reorder fields casually.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct CursorCheckpoint {
/// Cursor position captured by the checkpoint.
/// NOTE(review): the unit (offset, sequence number, ...) is not visible in
/// this file — confirm against the cursor implementation.
pub position: u64,
/// Whether the cursor was marked as started when the checkpoint was taken.
pub started: bool,
/// Optional nanosecond value tied to a process boot; `from_checkpoint`
/// always leaves it as `None`.
/// NOTE(review): presumably identifies the process instance that wrote the
/// checkpoint — confirm.
pub process_boot_ns: Option<u64>,
/// Optional region identity string. `#[serde(default)]` makes a missing
/// field deserialize as `None` instead of failing.
#[serde(default)]
pub region_identity: Option<String>,
}
impl CursorCheckpoint {
pub(super) fn from_checkpoint(position: u64, started: bool, region_identity: String) -> Self {
Self {
position,
started,
process_boot_ns: None,
region_identity: Some(region_identity),
}
}
}
/// Returns the directory that holds cursor checkpoint files: the `cursors`
/// entry directly under `data_dir`.
fn cursor_checkpoint_dir(data_dir: &Path) -> PathBuf {
    let mut dir = data_dir.to_path_buf();
    dir.push("cursors");
    dir
}
/// Returns the full path of the checkpoint file for `id`:
/// `<data_dir>/cursors/<id>.ckpt`.
pub(super) fn cursor_checkpoint_path(data_dir: &Path, id: &CheckpointId) -> PathBuf {
    let mut path = cursor_checkpoint_dir(data_dir);
    let file_name = format!("{}.ckpt", id.as_str());
    path.push(file_name);
    path
}
/// Pairs a data directory with a checkpoint id.
///
/// NOTE(review): presumably records where a cursor persists its checkpoint
/// (these are the same two inputs `cursor_checkpoint_path` takes) — the users
/// of this type are not visible in this file, so confirm against them.
#[derive(Clone, Debug)]
pub(super) struct CursorDurableBinding {
/// Root data directory.
pub(super) data_dir: PathBuf,
/// Checkpoint identifier.
pub(super) id: CheckpointId,
}
impl Cursor {
pub fn load_checkpoint(
data_dir: &Path,
id: &CheckpointId,
) -> std::io::Result<Option<CursorCheckpoint>> {
let path = cursor_checkpoint_path(data_dir, id);
let bytes = match platform::fs::read(&path) {
Ok(b) => b,
Err(error) if error.kind() == std::io::ErrorKind::NotFound => return Ok(None),
Err(error) => return Err(error),
};
match crate::encoding::from_bytes::<CursorCheckpoint>(&bytes) {
Ok(ckpt) => Ok(Some(ckpt)),
Err(error) => Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("cursor checkpoint decode failed: {error}"),
)),
}
}
pub fn save_checkpoint(
data_dir: &Path,
id: &CheckpointId,
ckpt: &CursorCheckpoint,
) -> std::io::Result<()> {
Self::save_checkpoint_with_fs(data_dir, id, ckpt, &RealFs)
}
pub(crate) fn save_checkpoint_with_fs(
data_dir: &Path,
id: &CheckpointId,
ckpt: &CursorCheckpoint,
fs: &dyn StoreFs,
) -> std::io::Result<()> {
let dir = cursor_checkpoint_dir(data_dir);
fs.create_dir_all(&dir)?;
let bytes =
crate::encoding::to_bytes(ckpt).map_err(|e| std::io::Error::other(e.to_string()))?;
let final_path = cursor_checkpoint_path(data_dir, id);
let mut tmp = fs.named_temp_in(&dir)?;
{
use std::io::Write;
tmp.write_all(&bytes)?;
tmp.flush()?;
}
crate::store::platform::sync::sync_file_all_io(tmp.as_file())?;
let admission = crate::store::platform::sync::admit_current_parent_dir_sync()
.map_err(|error| std::io::Error::other(error.to_string()))?;
fs.persist_temp_with_parent_sync(tmp, &final_path, admission)?;
Ok(())
}
}
#[cfg(test)]
mod tests {
    use super::{cursor_checkpoint_path, Cursor};
    use crate::store::delivery::observation::CheckpointId;

    /// A checkpoint path that exists but cannot be read as a file (here: it is
    /// a directory) must surface as an error other than `NotFound`, rather
    /// than being treated as "no checkpoint".
    #[test]
    fn load_checkpoint_propagates_non_not_found_read_errors_instead_of_forgetting() {
        let data_root = tempfile::TempDir::new().expect("temp dir");
        let checkpoint_id =
            CheckpointId::new("batpak-load-checkpoint-non-notfound").expect("valid id");

        // Occupy the checkpoint file path with a directory so the read fails
        // for a reason other than absence.
        let occupied = cursor_checkpoint_path(data_root.path(), &checkpoint_id);
        crate::store::platform::fs::create_dir_all(&occupied)
            .expect("create a directory at the checkpoint path");

        let result = Cursor::load_checkpoint(data_root.path(), &checkpoint_id);
        let propagated = match &result {
            Err(error) => error.kind() != std::io::ErrorKind::NotFound,
            Ok(_) => false,
        };
        assert!(
            propagated,
            "PROPERTY: an unreadable (non-absent) checkpoint must propagate a \
             non-NotFound Err, not collapse to Ok(None) as the `true` guard mutant \
             does; got {result:?}"
        );
    }
}