use super::Cursor;
use crate::store::delivery::observation::CheckpointId;
use crate::store::platform;
use serde::{Deserialize, Serialize};
use std::path::{Path, PathBuf};
/// Serializable snapshot of a cursor's state.
///
/// In this file it is encoded and written by `Cursor::save_checkpoint` and
/// read back and decoded by `Cursor::load_checkpoint`.
// NOTE(review): field order is deliberately left untouched; it may be part of
// the on-disk layout depending on what `crate::encoding` does — confirm.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct CursorCheckpoint {
/// Recorded cursor position.
// NOTE(review): the unit (byte offset, sequence number, ...) is not visible here — confirm.
pub position: u64,
/// Recorded "started" flag of the cursor.
// NOTE(review): exact meaning is defined by the cursor logic outside this section — confirm.
pub started: bool,
/// Optional value that `from_checkpoint` always leaves as `None`.
// NOTE(review): presumably a process boot timestamp in nanoseconds, judging by the name — confirm.
pub process_boot_ns: Option<u64>,
/// Optional region identity string; `from_checkpoint` always sets it.
///
/// Marked `#[serde(default)]`, so it falls back to `None` when the field is
/// absent from the serialized input (for formats that can express absence).
#[serde(default)]
pub region_identity: Option<String>,
}
impl CursorCheckpoint {
pub(super) fn from_checkpoint(position: u64, started: bool, region_identity: String) -> Self {
Self {
position,
started,
process_boot_ns: None,
region_identity: Some(region_identity),
}
}
}
/// Returns the directory that holds cursor checkpoint files: `<data_dir>/cursors`.
fn cursor_checkpoint_dir(data_dir: &Path) -> PathBuf {
    let mut dir = data_dir.to_path_buf();
    dir.push("cursors");
    dir
}
/// Returns the checkpoint file path for `id`: `<data_dir>/cursors/<id>.ckpt`.
pub(super) fn cursor_checkpoint_path(data_dir: &Path, id: &CheckpointId) -> PathBuf {
    let file_name = format!("{}.ckpt", id.as_str());
    let mut path = cursor_checkpoint_dir(data_dir);
    path.push(file_name);
    path
}
/// Pairs a data directory with a checkpoint id.
// NOTE(review): presumably records where a cursor persists its checkpoint
// (the same inputs `cursor_checkpoint_path` takes); its usage is not visible
// in this section — confirm.
#[derive(Clone, Debug)]
pub(super) struct CursorDurableBinding {
/// Data directory associated with the binding.
pub(super) data_dir: PathBuf,
/// Checkpoint id associated with the binding.
pub(super) id: CheckpointId,
}
impl Cursor {
    /// Reads and decodes the checkpoint stored for `id` under `data_dir`.
    ///
    /// Returns `Ok(None)` when the checkpoint file does not exist.
    ///
    /// # Errors
    ///
    /// Any read error other than `NotFound` is returned unchanged. A file that
    /// cannot be decoded yields an `InvalidData` error carrying the decoder's
    /// message.
    pub fn load_checkpoint(
        data_dir: &Path,
        id: &CheckpointId,
    ) -> std::io::Result<Option<CursorCheckpoint>> {
        let path = cursor_checkpoint_path(data_dir, id);
        let bytes = match platform::fs::read(&path) {
            Ok(bytes) => bytes,
            Err(error) => {
                // A missing file is not a failure: nothing has been saved yet.
                return if error.kind() == std::io::ErrorKind::NotFound {
                    Ok(None)
                } else {
                    Err(error)
                };
            }
        };

        crate::encoding::from_bytes::<CursorCheckpoint>(&bytes)
            .map(Some)
            .map_err(|error| {
                std::io::Error::new(
                    std::io::ErrorKind::InvalidData,
                    format!("cursor checkpoint decode failed: {error}"),
                )
            })
    }

    /// Encodes `ckpt` and stores it as the checkpoint file for `id` under `data_dir`.
    ///
    /// The steps run in this order: the checkpoint directory is created, the
    /// checkpoint is encoded, the bytes are written to a temporary file in that
    /// directory, flushed and synced, and finally the temporary file is
    /// persisted to the final path via `persist_temp_with_parent_sync`.
    ///
    /// # Errors
    ///
    /// Returns the first failure from any step. Encoding and parent-dir
    /// admission failures are converted to `std::io::Error` from their string
    /// form.
    pub fn save_checkpoint(
        data_dir: &Path,
        id: &CheckpointId,
        ckpt: &CursorCheckpoint,
    ) -> std::io::Result<()> {
        use std::io::Write;

        let dir = cursor_checkpoint_dir(data_dir);
        platform::fs::create_dir_all(&dir)?;

        let encoded = crate::encoding::to_bytes(ckpt)
            .map_err(|e| std::io::Error::other(e.to_string()))?;

        // Write into a temp file in the destination directory first, so the
        // final path is only touched by the persist step below.
        let mut tmp = platform::fs::named_temp_in(&dir)?;
        tmp.write_all(&encoded)?;
        tmp.flush()?;
        platform::sync::sync_file_all_io(tmp.as_file())?;

        let admission = platform::sync::admit_current_parent_dir_sync()
            .map_err(|error| std::io::Error::other(error.to_string()))?;
        let final_path = cursor_checkpoint_path(data_dir, id);
        platform::sync::persist_temp_with_parent_sync(tmp, &final_path, admission)?;
        Ok(())
    }
}