use std::path::Path;
use super::{
Db,
open::{base_file, create_empty_log, delta_file, file_len, write_superblock},
};
use crate::{
DbError, StorageError,
backing::{self, Base},
crc,
freeze::{self, FreezeStamps},
lock::WriterLock,
storage, wal,
};
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[non_exhaustive]
pub enum CheckpointPolicy {
Manual,
SizeRatio {
factor: u32,
},
}
impl CheckpointPolicy {
pub const DEFAULT_FACTOR: u32 = 4;
pub const MIN_BASE_BYTES: u64 = 4 * 1024;
#[must_use]
const fn should_checkpoint(self, log_bytes: u64, base_bytes: u64) -> bool {
match self {
Self::Manual => false,
Self::SizeRatio { factor } => {
let floor = if base_bytes < Self::MIN_BASE_BYTES {
Self::MIN_BASE_BYTES
} else {
base_bytes
};
log_bytes > floor.saturating_mul(factor as u64)
}
}
}
}
impl Default for CheckpointPolicy {
fn default() -> Self {
Self::SizeRatio {
factor: Self::DEFAULT_FACTOR,
}
}
}
impl Db {
#[must_use]
pub const fn checkpoint_policy(&self) -> CheckpointPolicy {
self.checkpoint_policy
}
pub const fn set_checkpoint_policy(&mut self, policy: CheckpointPolicy) {
self.checkpoint_policy = policy;
}
pub fn validate(&self) -> Result<(), DbError> {
wal::read_superblock(&self.root)?;
let backing =
backing::open_backing(&self.root.join(base_file(self.base_generation)), false)?;
let snapshot = oxgraph_snapshot::Snapshot::open_checked(&backing, crc::checksum_append)
.map_err(|error| StorageError::invalid_store(error.to_string()))?;
snapshot
.verify_all(crc::checksum_append)
.map_err(|error| StorageError::invalid_store(error.to_string()))?;
Base::attach(backing).map(|_base| ()).map_err(DbError::from)
}
pub fn validate_path(path: impl AsRef<Path>) -> Result<(), DbError> {
Self::open(path).map(|_database| ())
}
pub fn compact(&mut self) -> Result<(), DbError> {
self.checkpoint()
}
pub(crate) fn checkpoint(&mut self) -> Result<(), DbError> {
self.checkpoint_inner(
#[cfg(test)]
CheckpointStop::Complete,
)
}
pub(super) fn checkpoint_inner(
&mut self,
#[cfg(test)] stop: CheckpointStop,
) -> Result<(), DbError> {
let _lock = WriterLock::acquire(&self.root)?;
let next_generation = self
.base_generation
.checked_add(1)
.ok_or_else(|| DbError::invalid_store("checkpoint generation overflow"))?;
let view = self.current.view();
let commit_seq = self.current.lsn().get();
let base_bytes = freeze::freeze_view(
&view,
FreezeStamps {
commit_seq,
transaction_id: self.last_transaction_id.get(),
generation: next_generation,
},
)?;
storage::atomic_write(
&self.root,
&self
.root
.join(format!("{}.tmp", base_file(next_generation))),
&self.root.join(base_file(next_generation)),
&base_bytes,
)?;
create_empty_log(&self.root, next_generation)?;
#[cfg(test)]
if matches!(stop, CheckpointStop::BeforeSuperblock) {
return Ok(());
}
write_superblock(
&self.root,
next_generation,
commit_seq,
commit_seq,
self.last_transaction_id.get(),
)?;
#[cfg(test)]
if matches!(stop, CheckpointStop::BeforeRotate) {
return Ok(());
}
let reopened = Self::open(&self.root)?;
let old_generation = self.base_generation;
let policy = self.checkpoint_policy;
self.current = reopened.current;
self.base_generation = reopened.base_generation;
self.last_transaction_id = reopened.last_transaction_id;
self.checkpoint_policy = policy;
let _ = std::fs::remove_file(self.root.join(base_file(old_generation)));
let _ = std::fs::remove_file(self.root.join(delta_file(old_generation)));
let _ = storage::sync_directory(&self.root);
Ok(())
}
pub(super) fn maybe_auto_checkpoint(&mut self) -> Result<(), DbError> {
let log_bytes = file_len(&self.root.join(delta_file(self.base_generation)));
let base_bytes = file_len(&self.root.join(base_file(self.base_generation)));
if self
.checkpoint_policy
.should_checkpoint(log_bytes, base_bytes)
{
self.checkpoint()?;
}
Ok(())
}
}
#[cfg(test)]
#[cfg_attr(
miri,
expect(
dead_code,
reason = "the crash-injection variants are constructed only by the #[cfg(not(miri))] crash-matrix test"
)
)]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(super) enum CheckpointStop {
Complete,
BeforeSuperblock,
BeforeRotate,
}