use std::io;
use std::path::PathBuf;
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum CheckpointPolicy {
Flush,
Fsync,
FsyncEveryN(u32),
}
pub trait CheckpointSink {
fn record(&mut self, checkpoint: &Checkpoint) -> io::Result<()>;
fn policy(&self) -> CheckpointPolicy {
CheckpointPolicy::Flush
}
}
#[derive(Debug, Default, Clone, Copy)]
pub struct NoopCheckpointSink;
impl CheckpointSink for NoopCheckpointSink {
fn record(&mut self, _checkpoint: &Checkpoint) -> io::Result<()> {
Ok(())
}
}
impl<F> CheckpointSink for F
where
F: FnMut(&Checkpoint) -> io::Result<()>,
{
fn record(&mut self, checkpoint: &Checkpoint) -> io::Result<()> {
self(checkpoint)
}
}
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum Checkpoint {
Sequential(SequentialCheckpoint),
Indexed(IndexedCheckpoint),
}
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct SequentialCheckpoint {
pub schema_version: u32,
pub next_chunk_index: u64,
pub bytes_read: u64,
pub patch_name: Option<String>,
pub patch_size: Option<u64>,
pub in_flight: Option<InFlightAddFile>,
}
impl SequentialCheckpoint {
pub const CURRENT_SCHEMA_VERSION: u32 = 1;
#[must_use]
pub fn new(
next_chunk_index: u64,
bytes_read: u64,
patch_name: Option<String>,
patch_size: Option<u64>,
in_flight: Option<InFlightAddFile>,
) -> Self {
Self {
schema_version: Self::CURRENT_SCHEMA_VERSION,
next_chunk_index,
bytes_read,
patch_name,
patch_size,
in_flight,
}
}
#[must_use]
pub fn with_in_flight(mut self, in_flight: Option<InFlightAddFile>) -> Self {
self.in_flight = in_flight;
self
}
}
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct InFlightAddFile {
pub target_path: PathBuf,
pub file_offset: u64,
pub block_idx: u32,
pub bytes_into_target: u64,
}
impl InFlightAddFile {
#[must_use]
pub fn new(
target_path: PathBuf,
file_offset: u64,
block_idx: u32,
bytes_into_target: u64,
) -> Self {
Self {
target_path,
file_offset,
block_idx,
bytes_into_target,
}
}
}
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct IndexedCheckpoint {
pub schema_version: u32,
pub plan_crc32: u32,
pub fs_ops_done: bool,
pub next_target_idx: u64,
pub next_region_idx: u64,
pub bytes_written: u64,
}
impl IndexedCheckpoint {
pub const CURRENT_SCHEMA_VERSION: u32 = 2;
#[must_use]
pub fn new(
plan_crc32: u32,
fs_ops_done: bool,
next_target_idx: u64,
next_region_idx: u64,
bytes_written: u64,
) -> Self {
Self {
schema_version: Self::CURRENT_SCHEMA_VERSION,
plan_crc32,
fs_ops_done,
next_target_idx,
next_region_idx,
bytes_written,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::Arc;
use std::sync::Mutex;
#[test]
fn noop_sink_records_succeed_for_every_variant() {
let mut sink = NoopCheckpointSink;
let seq = Checkpoint::Sequential(SequentialCheckpoint {
schema_version: SequentialCheckpoint::CURRENT_SCHEMA_VERSION,
next_chunk_index: 3,
bytes_read: 1024,
patch_name: None,
patch_size: None,
in_flight: None,
});
let indexed = Checkpoint::Indexed(IndexedCheckpoint {
schema_version: IndexedCheckpoint::CURRENT_SCHEMA_VERSION,
plan_crc32: 0xDEAD_BEEF,
fs_ops_done: true,
next_target_idx: 7,
next_region_idx: 128,
bytes_written: 65536,
});
sink.record(&seq).expect("Noop must succeed");
sink.record(&indexed).expect("Noop must succeed");
}
#[test]
fn noop_sink_default_policy_is_flush() {
let sink = NoopCheckpointSink;
assert_eq!(sink.policy(), CheckpointPolicy::Flush);
}
#[test]
fn closure_sink_captures_records_in_order() {
let captured: Arc<Mutex<Vec<Checkpoint>>> = Arc::new(Mutex::new(Vec::new()));
let captured_clone = captured.clone();
let mut sink = move |c: &Checkpoint| -> io::Result<()> {
captured_clone.lock().unwrap().push(c.clone());
Ok(())
};
let a = Checkpoint::Sequential(SequentialCheckpoint {
schema_version: SequentialCheckpoint::CURRENT_SCHEMA_VERSION,
next_chunk_index: 1,
bytes_read: 32,
patch_name: None,
patch_size: None,
in_flight: None,
});
let b = Checkpoint::Sequential(SequentialCheckpoint {
schema_version: SequentialCheckpoint::CURRENT_SCHEMA_VERSION,
next_chunk_index: 2,
bytes_read: 64,
patch_name: None,
patch_size: None,
in_flight: None,
});
sink.record(&a).unwrap();
sink.record(&b).unwrap();
let got = captured.lock().unwrap();
assert_eq!(got.len(), 2);
assert_eq!(got[0], a);
assert_eq!(got[1], b);
}
#[test]
fn closure_sink_propagates_io_error_unchanged() {
let mut sink = |_: &Checkpoint| -> io::Result<()> { Err(io::Error::other("synthetic")) };
let c = Checkpoint::Indexed(IndexedCheckpoint {
schema_version: IndexedCheckpoint::CURRENT_SCHEMA_VERSION,
plan_crc32: 0,
fs_ops_done: false,
next_target_idx: 0,
next_region_idx: 0,
bytes_written: 0,
});
let err = sink.record(&c).unwrap_err();
assert_eq!(err.kind(), io::ErrorKind::Other);
assert_eq!(err.to_string(), "synthetic");
}
#[test]
fn schema_version_constants_have_expected_values() {
assert_eq!(SequentialCheckpoint::CURRENT_SCHEMA_VERSION, 1);
assert_eq!(IndexedCheckpoint::CURRENT_SCHEMA_VERSION, 2);
}
#[cfg(feature = "serde")]
#[test]
fn bincode_round_trips_sequential_checkpoint_without_in_flight() {
let cp = Checkpoint::Sequential(SequentialCheckpoint {
schema_version: SequentialCheckpoint::CURRENT_SCHEMA_VERSION,
next_chunk_index: 42,
bytes_read: 0x1_0000_0000,
patch_name: Some("H2017.07.11.0000.0000a.patch".into()),
patch_size: Some(0x2_0000_0000),
in_flight: None,
});
let cfg = bincode::config::standard();
let bytes = bincode::serde::encode_to_vec(&cp, cfg).unwrap();
let (decoded, _): (Checkpoint, _) = bincode::serde::decode_from_slice(&bytes, cfg).unwrap();
assert_eq!(cp, decoded);
}
#[cfg(feature = "serde")]
#[test]
fn bincode_round_trips_sequential_checkpoint_with_in_flight() {
let cp = Checkpoint::Sequential(SequentialCheckpoint {
schema_version: SequentialCheckpoint::CURRENT_SCHEMA_VERSION,
next_chunk_index: 9,
bytes_read: 1_048_576,
patch_name: Some("patch.patch".into()),
patch_size: Some(1_048_576 * 4),
in_flight: Some(InFlightAddFile {
target_path: PathBuf::from("/install/sqpack/ffxiv/000000.win32.dat0"),
file_offset: 0,
block_idx: 17,
bytes_into_target: 17 * 16_384,
}),
});
let cfg = bincode::config::standard();
let bytes = bincode::serde::encode_to_vec(&cp, cfg).unwrap();
let (decoded, _): (Checkpoint, _) = bincode::serde::decode_from_slice(&bytes, cfg).unwrap();
assert_eq!(cp, decoded);
}
#[cfg(feature = "serde")]
#[test]
fn bincode_round_trips_indexed_checkpoint() {
let cp = Checkpoint::Indexed(IndexedCheckpoint {
schema_version: IndexedCheckpoint::CURRENT_SCHEMA_VERSION,
plan_crc32: 0xA5A5_5A5A,
fs_ops_done: true,
next_target_idx: 12,
next_region_idx: 64,
bytes_written: 12345,
});
let cfg = bincode::config::standard();
let bytes = bincode::serde::encode_to_vec(&cp, cfg).unwrap();
let (decoded, _): (Checkpoint, _) = bincode::serde::decode_from_slice(&bytes, cfg).unwrap();
assert_eq!(cp, decoded);
}
#[cfg(feature = "serde")]
#[test]
fn bincode_round_trips_checkpoint_policy_variants() {
for policy in [
CheckpointPolicy::Flush,
CheckpointPolicy::Fsync,
CheckpointPolicy::FsyncEveryN(64),
CheckpointPolicy::FsyncEveryN(1),
] {
let cfg = bincode::config::standard();
let bytes = bincode::serde::encode_to_vec(&policy, cfg).unwrap();
let (decoded, _): (CheckpointPolicy, _) =
bincode::serde::decode_from_slice(&bytes, cfg).unwrap();
assert_eq!(policy, decoded);
}
}
}