use crate::dynamic::{self, data::DataTyped};
use crate::storage::file::SerializerInner;
use crate::{Error, NumEntries, TypedBox};
use feldera_types::checkpoint::CheckpointMetadata;
use feldera_types::constants::{
ACTIVATION_MARKER_FILE, ADHOC_TEMP_DIR, CHECKPOINT_DEPENDENCIES, CHECKPOINT_FILE_NAME,
DBSP_FILE_EXTENSION, STATE_FILE, STATUS_FILE, STEPS_FILE,
};
use itertools::Itertools;
use size_of::SizeOf;
use std::io::ErrorKind;
use std::sync::atomic::Ordering;
use std::{
collections::{HashSet, VecDeque},
sync::Arc,
};
use feldera_storage::error::StorageError;
use feldera_storage::fbuf::FBuf;
use feldera_storage::{StorageBackend, StorageFileType, StoragePath};
use uuid::Uuid;
use super::RuntimeError;
/// Manages the set of committed checkpoints on a storage backend: committing
/// new ones, persisting the checkpoint list, and garbage-collecting files
/// that no retained checkpoint references anymore.
#[derive(derive_more::Debug, Clone)]
pub struct Checkpointer {
    /// Storage backend all checkpoint files live on.
    /// Skipped in `Debug` output (trait object has no useful representation).
    #[debug(skip)]
    backend: Arc<dyn StorageBackend>,
    /// Known checkpoints, ordered oldest-first (new ones are pushed to the back).
    checkpoint_list: VecDeque<CheckpointMetadata>,
}
impl Checkpointer {
    /// Number of most-recent checkpoints that are always retained;
    /// `gc_checkpoint` only removes checkpoints beyond this threshold.
    pub(super) const MIN_CHECKPOINT_THRESHOLD: usize = 2;

    /// Creates a checkpointer over `backend`: loads the persisted checkpoint
    /// list and then garbage-collects files left over from earlier runs.
    pub fn new(backend: Arc<dyn StorageBackend>) -> Result<Self, Error> {
        let checkpoint_list = Self::read_checkpoints(&*backend)?;
        let this = Checkpointer {
            backend,
            checkpoint_list,
        };
        this.init_storage()?;
        Ok(this)
    }

    /// Returns `IncompatibleStorage` if any recorded checkpoint was written
    /// with a fingerprint different from `fingerprint`, i.e. every existing
    /// checkpoint must match the current fingerprint to be usable.
    pub fn verify_fingerprint(&self, fingerprint: u64) -> Result<(), Error> {
        if self
            .checkpoint_list
            .iter()
            .any(|cpm| cpm.fingerprint != fingerprint)
        {
            Err(Error::Runtime(RuntimeError::IncompatibleStorage))
        } else {
            Ok(())
        }
    }

    /// Runs startup GC and publishes the resulting storage usage (bytes) to
    /// the backend's usage counter.
    fn init_storage(&self) -> Result<(), Error> {
        let usage = self.gc_startup()?;
        // `as i64` is lossy only above i64::MAX bytes of usage.
        self.backend.usage().store(usage as i64, Ordering::Relaxed);
        Ok(())
    }

    /// Sums the sizes of all files under checkpoint `uuid`'s directory.
    /// A missing directory is treated as zero bytes rather than an error.
    pub(super) fn measure_checkpoint_storage_use(&self, uuid: uuid::Uuid) -> Result<u64, Error> {
        let mut usage = 0;
        StorageError::ignore_notfound(self.backend.list(
            &Self::checkpoint_dir(uuid),
            &mut |_path, file_type| {
                if let StorageFileType::File { size } = file_type {
                    usage += size;
                }
            },
        ))?;
        Ok(usage)
    }

    /// Returns the set of batch-file paths that checkpoint `cpm` depends on
    /// (delegated to the backend).
    pub(super) fn gather_batches_for_checkpoint(
        &self,
        cpm: &CheckpointMetadata,
    ) -> Result<HashSet<StoragePath>, StorageError> {
        self.backend.gather_batches_for_checkpoint(cpm)
    }

    /// Startup garbage collection: builds the set of paths still referenced
    /// by pipeline metadata and by every known checkpoint, deletes all other
    /// Feldera-owned files and directories, and returns the total size in
    /// bytes of the files that were kept.
    pub fn gc_startup(&self) -> Result<u64, Error> {
        // Top-level bookkeeping files/directories that must survive GC.
        let mut in_use_paths: HashSet<StoragePath> = HashSet::new();
        in_use_paths.insert(CHECKPOINT_FILE_NAME.into());
        in_use_paths.insert(STEPS_FILE.into());
        in_use_paths.insert(STATE_FILE.into());
        in_use_paths.insert(STATUS_FILE.into());
        // Companion file used while rewriting the status file.
        in_use_paths.insert(format!("{}.mut", STATUS_FILE).into());
        in_use_paths.insert(ADHOC_TEMP_DIR.into());
        in_use_paths.insert(ACTIVATION_MARKER_FILE.into());
        // Each known checkpoint keeps its own directory plus every batch
        // file it references (batch files may live outside that directory).
        for cpm in self.checkpoint_list.iter() {
            in_use_paths.insert(cpm.uuid.to_string().into());
            let batches = self
                .gather_batches_for_checkpoint(cpm)
                .expect("Batches for a checkpoint should be discoverable");
            for batch in batches {
                in_use_paths.insert(batch);
            }
        }
        // NOTE(review): presumably reserved for a coordinator process's
        // data -- confirm what writes under this path.
        in_use_paths.insert("coordinator".into());
        // Only files with a recognized Feldera extension are candidates for
        // deletion; unknown files are left untouched.
        fn is_feldera_filename(path: &StoragePath) -> bool {
            path.extension()
                .is_some_and(|extension| DBSP_FILE_EXTENSION.contains(&extension))
        }
        let mut usage = 0;
        self.backend.list(&StoragePath::default(), &mut |path, file_type| {
            if !in_use_paths.contains(path) && (is_feldera_filename(path) || file_type == StorageFileType::Directory) {
                // Unreferenced Feldera file or directory: delete, best-effort.
                match self.backend.delete_recursive(path) {
                    Ok(_) => {
                        tracing::debug!("Removed unused {file_type:?} '{path}'");
                    }
                    Err(e) => {
                        // Deletion failure is not fatal: the next startup GC
                        // pass will retry it.
                        tracing::warn!("Unable to remove old-checkpoint file {path}: {e} (the pipeline will try to delete the file again on a restart)");
                    }
                }
            } else if let StorageFileType::File { size } = file_type {
                // Any file that was not deleted counts toward total usage.
                usage += size;
            }
        })?;
        Ok(usage)
    }

    /// Storage path of the directory holding checkpoint `uuid`'s files.
    pub(super) fn checkpoint_dir(uuid: Uuid) -> StoragePath {
        uuid.to_string().into()
    }

    /// Finalizes the checkpoint stored under `uuid`: writes the completion
    /// marker, records the batch files the checkpoint depends on, measures
    /// its on-disk size, and appends it to the persisted checkpoint list.
    ///
    /// Returns the metadata of the newly committed checkpoint.
    pub(super) fn commit(
        &mut self,
        uuid: Uuid,
        fingerprint: u64,
        identifier: Option<String>,
        steps: Option<u64>,
        processed_records: Option<u64>,
    ) -> Result<CheckpointMetadata, Error> {
        // Empty marker file signalling the checkpoint directory is complete.
        // NOTE(review): the marker name is a hard-coded literal here; confirm
        // whether it should share a constant with whatever reads it.
        self.backend
            .write(&Self::checkpoint_dir(uuid).child("CHECKPOINT"), FBuf::new())?;
        let mut md = CheckpointMetadata {
            uuid,
            identifier,
            fingerprint,
            size: None,
            processed_records,
            steps,
        };
        // Persist the dependency list so later GC passes can tell which
        // batch files this checkpoint still needs.
        let batches = self.gather_batches_for_checkpoint(&md)?;
        self.backend
            .write_json(
                &Self::checkpoint_dir(uuid).child(CHECKPOINT_DEPENDENCIES),
                &batches.into_iter().map(|p| p.to_string()).collect_vec(),
            )
            .and_then(|reader| reader.commit())?;
        // Size is measured after all checkpoint files are in place.
        md.size = Some(self.measure_checkpoint_storage_use(uuid)?);
        self.checkpoint_list.push_back(md.clone());
        self.update_checkpoint_file()?;
        Ok(md)
    }

    /// Returns a snapshot of the known checkpoints, oldest first.
    pub(super) fn list_checkpoints(&self) -> Result<Vec<CheckpointMetadata>, Error> {
        Ok(self.checkpoint_list.clone().into())
    }

    /// Loads the persisted checkpoint list from `backend`. A missing file is
    /// not an error: it means no checkpoint has ever been committed.
    pub fn read_checkpoints(
        backend: &dyn StorageBackend,
    ) -> Result<VecDeque<CheckpointMetadata>, Error> {
        match backend.read_json(&StoragePath::from(CHECKPOINT_FILE_NAME)) {
            Ok(checkpoints) => Ok(checkpoints),
            Err(error) if error.kind() == ErrorKind::NotFound => Ok(VecDeque::new()),
            Err(error) => Err(error)?,
        }
    }

    /// Rewrites the on-disk checkpoint list from the in-memory one.
    /// (`commit` on the write presumably makes the replacement atomic --
    /// confirm the backend's semantics.)
    fn update_checkpoint_file(&self) -> Result<(), Error> {
        Ok(self
            .backend
            .write_json(&CHECKPOINT_FILE_NAME.into(), &self.checkpoint_list)
            .and_then(|reader| reader.commit())?)
    }

    /// Best-effort deletion of a single batch file; failures are only logged
    /// because `gc_startup` retries deletion on the next restart.
    fn remove_batch_file(&self, file: &StoragePath) {
        match self.backend.delete_if_exists(file) {
            Ok(_) => {
                tracing::debug!("Removed file {file}");
            }
            Err(e) => {
                tracing::warn!(
                    "Unable to remove old-checkpoint file {file}: {e} (the pipeline will try to delete the file again on a restart)"
                );
            }
        }
    }

    /// Recursively deletes checkpoint `cpm`'s directory. The assertion
    /// rejects the nil UUID as a guard against deleting a directory derived
    /// from an uninitialized id.
    fn remove_checkpoint_dir(&self, cpm: uuid::Uuid) -> Result<(), Error> {
        assert_ne!(cpm, Uuid::nil());
        self.backend.delete_recursive(&cpm.to_string().into())?;
        Ok(())
    }

    /// Garbage-collects old checkpoints, always retaining the newest
    /// `MIN_CHECKPOINT_THRESHOLD` checkpoints plus everything listed in
    /// `except`. Returns the UUIDs of the checkpoints that were removed.
    pub fn gc_checkpoint(
        &mut self,
        except: HashSet<uuid::Uuid>,
    ) -> Result<HashSet<uuid::Uuid>, Error> {
        // Nothing to do until the retention threshold is exceeded.
        if self.checkpoint_list.len() <= Self::MIN_CHECKPOINT_THRESHOLD {
            return Ok(HashSet::new());
        }
        // Batch files referenced by explicitly protected checkpoints.
        let mut batch_files_to_keep: HashSet<_> = except
            .iter()
            .filter_map(|uuid| self.backend.gather_batches_for_checkpoint_uuid(*uuid).ok())
            .flatten()
            .collect();
        // Removal candidates: the oldest entries beyond the threshold (the
        // list is ordered oldest-first), minus anything in `except`.
        let to_remove: HashSet<_> = self
            .checkpoint_list
            .iter()
            .take(
                self.checkpoint_list
                    .len()
                    .saturating_sub(Self::MIN_CHECKPOINT_THRESHOLD),
            )
            .map(|cpm| cpm.uuid)
            .filter(|cpm| !except.contains(cpm))
            .collect();
        // Persist the trimmed list *before* deleting anything, so a crash
        // mid-deletion leaves only unreferenced files behind (which
        // `gc_startup` cleans up on the next start).
        self.checkpoint_list
            .retain(|cpm| !to_remove.contains(&cpm.uuid));
        self.update_checkpoint_file()?;
        // Also protect the batches of the oldest retained, non-excepted
        // checkpoint. NOTE(review): only the first such checkpoint is
        // considered (`take(1)`); this assumes newer retained checkpoints
        // cannot share batches with the removed ones -- confirm.
        self.checkpoint_list
            .iter()
            .filter(|c| !except.contains(&c.uuid))
            .take(1)
            .filter_map(|c| self.backend.gather_batches_for_checkpoint(c).ok())
            .for_each(|batches| {
                for batch in batches {
                    batch_files_to_keep.insert(batch);
                }
            });
        // Delete each removed checkpoint's unshared batch files (best
        // effort), then its directory.
        for cpm in &to_remove {
            for batch_file in self
                .backend
                .gather_batches_for_checkpoint_uuid(*cpm)?
                .difference(&batch_files_to_keep)
            {
                self.remove_batch_file(batch_file);
            }
            self.remove_checkpoint_dir(*cpm)?;
        }
        tracing::info!(
            "cleaned up {} checkpoints; exception list: {except:?}, retaining checkpoints: {:?}",
            to_remove.len(),
            self.checkpoint_list
                .iter()
                .map(|cpm| cpm.uuid)
                .collect::<Vec<_>>()
        );
        Ok(to_remove)
    }
}
/// Types whose state can be serialized into a byte buffer and later restored
/// from one.
pub trait Checkpoint {
    /// Serializes this value's state into an owned byte buffer.
    fn checkpoint(&self) -> Result<Vec<u8>, Error>;
    /// Overwrites this value's state from bytes previously produced by
    /// [`Checkpoint::checkpoint`].
    fn restore(&mut self, data: &[u8]) -> Result<(), Error>;
}
/// Placeholder impl: checkpointing `isize` is not implemented yet; both
/// methods panic via `todo!()` if reached.
impl Checkpoint for isize {
    fn checkpoint(&self) -> Result<Vec<u8>, Error> {
        todo!()
    }
    fn restore(&mut self, _data: &[u8]) -> Result<(), Error> {
        todo!()
    }
}
/// Placeholder impl: checkpointing `usize` is not implemented yet; both
/// methods panic via `todo!()` if reached.
impl Checkpoint for usize {
    fn checkpoint(&self) -> Result<Vec<u8>, Error> {
        todo!()
    }
    fn restore(&mut self, _data: &[u8]) -> Result<(), Error> {
        todo!()
    }
}
/// Placeholder impl: checkpointing `i32` is not implemented yet; both
/// methods panic via `todo!()` if reached.
impl Checkpoint for i32 {
    fn checkpoint(&self) -> Result<Vec<u8>, Error> {
        todo!()
    }
    fn restore(&mut self, _data: &[u8]) -> Result<(), Error> {
        todo!()
    }
}
/// Boxed values checkpoint and restore by delegating to their contents;
/// `?Sized` also covers boxed trait objects such as `Box<dyn Data>`.
impl<N> Checkpoint for Box<N>
where
    N: Checkpoint + ?Sized,
{
    /// Serializes the boxed value's state.
    fn checkpoint(&self) -> Result<Vec<u8>, Error> {
        (**self).checkpoint()
    }

    /// Restores the boxed value's state in place.
    fn restore(&mut self, data: &[u8]) -> Result<(), Error> {
        (**self).restore(data)
    }
}
/// Placeholder impl: checkpointing `Option<T>` is not implemented yet; both
/// methods panic via `todo!()` if reached.
impl<T> Checkpoint for Option<T>
where
    T: Checkpoint,
{
    fn checkpoint(&self) -> Result<Vec<u8>, Error> {
        todo!()
    }
    fn restore(&mut self, _data: &[u8]) -> Result<(), Error> {
        todo!()
    }
}
/// Placeholder impl: checkpointing `TypedBox` is not implemented yet; both
/// methods panic via `todo!()` if reached.
impl<T, D: ?Sized> Checkpoint for TypedBox<T, D> {
    fn checkpoint(&self) -> Result<Vec<u8>, Error> {
        todo!()
    }
    fn restore(&mut self, _data: &[u8]) -> Result<(), Error> {
        todo!()
    }
}
/// Checkpointing for dynamically-typed data values: serializes through the
/// thread-local serializer and restores by deserializing in place.
impl Checkpoint for dyn dynamic::data::Data + 'static {
    fn checkpoint(&self) -> Result<Vec<u8>, Error> {
        Ok(SerializerInner::to_fbuf_with_thread_local(|s| self.serialize(s)).into_vec())
    }
    fn restore(&mut self, data: &[u8]) -> Result<(), Error> {
        // SAFETY(review): `deserialize_from_bytes` is unsafe; presumably it
        // requires `data` to be a valid serialization of this value's exact
        // runtime type, as produced by `checkpoint` above -- confirm its
        // documented contract. Offset 0 reads from the start of the buffer.
        unsafe { self.deserialize_from_bytes(data, 0) };
        Ok(())
    }
}
/// Placeholder impl: checkpointing `u64`-typed dynamic data is not
/// implemented yet; both methods panic via `todo!()` if reached.
impl Checkpoint for dyn DataTyped<Type = u64> + 'static {
    fn checkpoint(&self) -> Result<Vec<u8>, Error> {
        todo!()
    }
    fn restore(&mut self, _data: &[u8]) -> Result<(), Error> {
        todo!()
    }
}
/// Wrapper for values that are deliberately *not* persisted: `checkpoint`
/// produces nothing and `restore` resets the value to its default.
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, Hash, SizeOf)]
pub struct EmptyCheckpoint<T: Default> {
    /// The wrapped value.
    pub val: T,
}
/// Entry counting passes straight through to the wrapped value.
impl<T> NumEntries for EmptyCheckpoint<T>
where
    T: Default + NumEntries,
{
    const CONST_NUM_ENTRIES: Option<usize> = T::CONST_NUM_ENTRIES;

    /// Shallow entry count of the wrapped value.
    fn num_entries_shallow(&self) -> usize {
        T::num_entries_shallow(&self.val)
    }

    /// Deep entry count of the wrapped value.
    fn num_entries_deep(&self) -> usize {
        T::num_entries_deep(&self.val)
    }
}
impl<T: Default> EmptyCheckpoint<T> {
pub fn new(val: T) -> Self {
Self { val }
}
}
impl<T: Default> Checkpoint for EmptyCheckpoint<T> {
    /// Serializes to an empty buffer: this wrapper persists no state by design.
    fn checkpoint(&self) -> Result<Vec<u8>, Error> {
        Ok(Vec::new())
    }

    /// Ignores the payload and resets the wrapped value to its default.
    fn restore(&mut self, _data: &[u8]) -> Result<(), Error> {
        self.val = Default::default();
        Ok(())
    }
}
#[cfg(test)]
mod test {
    use std::sync::Arc;
    use feldera_storage::StorageBackend;
    use feldera_types::config::{FileBackendConfig, StorageCacheConfig};
    use std::collections::HashSet;
    use crate::storage::backend::posixio_impl::PosixBackend;
    use super::Checkpointer;

    // Typestate markers encoding how many checkpoints exist so that the
    // compiler enforces which transitions (checkpoint/gc) are legal.
    /// No checkpoints taken yet.
    struct Empty;
    /// Exactly `MIN_CHECKPOINT_THRESHOLD` checkpoints exist.
    struct MinCheckpoints;
    /// More than `MIN_CHECKPOINT_THRESHOLD` checkpoints exist.
    struct ExtraCheckpoints;

    /// Test harness: a checkpointer over a temporary directory whose type
    /// parameter `S` tracks the expected checkpoint count at compile time.
    struct TestState<S> {
        checkpointer: Checkpointer,
        // Held so the temporary directory outlives the checkpointer.
        tempdir: tempfile::TempDir,
        _phantom: std::marker::PhantomData<S>,
    }
    impl<S> TestState<S> {
        /// UUIDs of checkpoints beyond the retention threshold (oldest
        /// first), i.e. the ones eligible for garbage collection.
        fn extras(&self) -> Vec<uuid::Uuid> {
            self.checkpointer
                .checkpoint_list
                .iter()
                .map(|cpm| cpm.uuid)
                .take(
                    self.checkpointer
                        .checkpoint_list
                        .len()
                        .saturating_sub(Checkpointer::MIN_CHECKPOINT_THRESHOLD),
                )
                .collect()
        }
        /// Oldest GC-eligible checkpoint; panics if there is none.
        fn oldest_extra(&self) -> uuid::Uuid {
            self.extras().first().cloned().unwrap()
        }
        /// Newest GC-eligible checkpoint; panics if there is none.
        fn newest_extra(&self) -> uuid::Uuid {
            self.extras().last().cloned().unwrap()
        }
    }
    impl TestState<Empty> {
        /// Fresh checkpointer over an empty temporary directory.
        fn new() -> Self {
            let tempdir = tempfile::tempdir().unwrap();
            let backend: Arc<dyn StorageBackend> = Arc::new(PosixBackend::new(
                tempdir.path(),
                StorageCacheConfig::default(),
                &FileBackendConfig::default(),
            ));
            Self {
                checkpointer: Checkpointer::new(backend).unwrap(),
                tempdir,
                _phantom: std::marker::PhantomData,
            }
        }
        /// State invariant: no checkpoints yet.
        fn precondition(&self) {
            assert_eq!(self.checkpointer.checkpoint_list.len(), 0);
        }
        /// Commits exactly `MIN_CHECKPOINT_THRESHOLD` checkpoints, moving to
        /// the `MinCheckpoints` state.
        fn checkpoint(mut self) -> TestState<MinCheckpoints> {
            self.precondition();
            for i in 0..Checkpointer::MIN_CHECKPOINT_THRESHOLD {
                self.checkpointer
                    .commit(uuid::Uuid::now_v7(), 0, None, Some(i as u64), Some(0))
                    .unwrap();
            }
            TestState::<MinCheckpoints> {
                checkpointer: self.checkpointer,
                tempdir: self.tempdir,
                _phantom: std::marker::PhantomData,
            }
        }
    }
    impl TestState<MinCheckpoints> {
        /// State invariant: exactly the threshold count, nothing GC-eligible.
        fn precondition(&self) {
            assert_eq!(
                self.checkpointer.checkpoint_list.len(),
                Checkpointer::MIN_CHECKPOINT_THRESHOLD
            );
            assert!(self.extras().is_empty());
        }
        /// Commits one more checkpoint, moving to `ExtraCheckpoints`.
        fn checkpoint(mut self) -> TestState<ExtraCheckpoints> {
            self.precondition();
            let uuid = uuid::Uuid::now_v7();
            self.checkpointer
                .commit(uuid, 0, None, Some(2), Some(0))
                .unwrap();
            TestState::<ExtraCheckpoints> {
                checkpointer: self.checkpointer,
                tempdir: self.tempdir,
                _phantom: std::marker::PhantomData,
            }
        }
    }
    impl TestState<ExtraCheckpoints> {
        /// State invariant: above the threshold, so GC has work to do.
        fn precondition(&self) {
            assert!(
                self.checkpointer.checkpoint_list.len() > Checkpointer::MIN_CHECKPOINT_THRESHOLD
            );
            assert!(!self.extras().is_empty());
        }
        /// Commits yet another checkpoint; stays in `ExtraCheckpoints`.
        fn checkpoint(mut self) -> TestState<ExtraCheckpoints> {
            self.precondition();
            self.checkpointer
                .commit(uuid::Uuid::now_v7(), 0, None, Some(3), Some(0))
                .unwrap();
            TestState::<ExtraCheckpoints> {
                checkpointer: self.checkpointer,
                tempdir: self.tempdir,
                _phantom: std::marker::PhantomData,
            }
        }
        /// GC with no exceptions: must remove something and land back at the
        /// threshold count (`MinCheckpoints`).
        fn gc(mut self) -> TestState<MinCheckpoints> {
            self.precondition();
            let removed = self.checkpointer.gc_checkpoint(HashSet::new()).unwrap();
            assert!(!removed.is_empty());
            TestState::<MinCheckpoints> {
                checkpointer: self.checkpointer,
                tempdir: self.tempdir,
                _phantom: std::marker::PhantomData,
            }
        }
        /// GC that protects `except`: the protected checkpoint must survive,
        /// so the state remains `ExtraCheckpoints`.
        fn gc_with_except(mut self, except: uuid::Uuid) -> TestState<ExtraCheckpoints> {
            self.precondition();
            self.checkpointer.gc_checkpoint([except].into()).unwrap();
            assert!(self.extras().contains(&except));
            TestState::<ExtraCheckpoints> {
                checkpointer: self.checkpointer,
                tempdir: self.tempdir,
                _phantom: std::marker::PhantomData,
            }
        }
    }
    /// Walks the full typestate machine: fill to threshold, add extras, GC
    /// with and without exception lists (protecting both the newest and the
    /// oldest extra), and verify the retained set after each transition.
    #[test]
    fn test_checkpointer() {
        let empty_checkpoints = TestState::<Empty>::new();
        let min_checkpoints = empty_checkpoints.checkpoint();
        let extra_checkpoints = min_checkpoints.checkpoint();
        let min_checkpoints = extra_checkpoints.gc();
        let one_extra = min_checkpoints.checkpoint();
        let two_extra = one_extra.checkpoint();
        let keep = two_extra.newest_extra();
        let one_extra = two_extra.gc_with_except(keep);
        assert!(one_extra.extras().contains(&keep) && one_extra.extras().len() == 1);
        let two_extra = one_extra.checkpoint();
        let keep = two_extra.oldest_extra();
        let one_extra = two_extra.gc_with_except(keep);
        assert!(one_extra.extras().contains(&keep) && one_extra.extras().len() == 1);
        let min_checkpoints = one_extra.gc();
        min_checkpoints.precondition();
    }
}