Skip to main content

dbsp/circuit/
checkpointer.rs

1//! Logic to manage persistent checkpoints for a circuit.
2
3use crate::dynamic::{self, data::DataTyped};
4use crate::storage::file::SerializerInner;
5use crate::{Error, NumEntries, TypedBox};
6use feldera_types::checkpoint::CheckpointMetadata;
7use feldera_types::constants::{
8    ACTIVATION_MARKER_FILE, ADHOC_TEMP_DIR, CHECKPOINT_DEPENDENCIES, CHECKPOINT_FILE_NAME,
9    DBSP_FILE_EXTENSION, STATE_FILE, STATUS_FILE, STEPS_FILE,
10};
11use itertools::Itertools;
12use size_of::SizeOf;
13
14use std::io::ErrorKind;
15use std::sync::atomic::Ordering;
16use std::{
17    collections::{HashSet, VecDeque},
18    sync::Arc,
19};
20
21use feldera_storage::error::StorageError;
22use feldera_storage::fbuf::FBuf;
23use feldera_storage::{StorageBackend, StorageFileType, StoragePath};
24use uuid::Uuid;
25
26use super::RuntimeError;
27
28/// A "checkpointer" is responsible for the creation, and removal of
29/// checkpoints for a circuit.
30///
31/// It handles list of available checkpoints, and the files associated
32/// with each checkpoint.
33#[derive(derive_more::Debug, Clone)]
34pub struct Checkpointer {
35    #[debug(skip)]
36    backend: Arc<dyn StorageBackend>,
37    checkpoint_list: VecDeque<CheckpointMetadata>,
38}
39
40impl Checkpointer {
41    /// We keep at least this many checkpoints around.
42    pub(super) const MIN_CHECKPOINT_THRESHOLD: usize = 2;
43
44    /// Creates a new checkpointer for directory `storage_path`.  Deletes any
45    /// unreferenced files in the directory.
46    pub fn new(backend: Arc<dyn StorageBackend>) -> Result<Self, Error> {
47        let checkpoint_list = Self::read_checkpoints(&*backend)?;
48
49        let this = Checkpointer {
50            backend,
51            checkpoint_list,
52        };
53
54        this.init_storage()?;
55
56        Ok(this)
57    }
58
59    /// Verifies that existing checkpoints have the specified fingerprint.
60    pub fn verify_fingerprint(&self, fingerprint: u64) -> Result<(), Error> {
61        if self
62            .checkpoint_list
63            .iter()
64            .any(|cpm| cpm.fingerprint != fingerprint)
65        {
66            Err(Error::Runtime(RuntimeError::IncompatibleStorage))
67        } else {
68            Ok(())
69        }
70    }
71
72    fn init_storage(&self) -> Result<(), Error> {
73        let usage = self.gc_startup()?;
74
75        // We measured the amount of storage in use. Give it to the backend as
76        // the initial value.
77        self.backend.usage().store(usage as i64, Ordering::Relaxed);
78
79        Ok(())
80    }
81
82    pub(super) fn measure_checkpoint_storage_use(&self, uuid: uuid::Uuid) -> Result<u64, Error> {
83        let mut usage = 0;
84        StorageError::ignore_notfound(self.backend.list(
85            &Self::checkpoint_dir(uuid),
86            &mut |_path, file_type| {
87                if let StorageFileType::File { size } = file_type {
88                    usage += size;
89                }
90            },
91        ))?;
92        Ok(usage)
93    }
94
95    pub(super) fn gather_batches_for_checkpoint(
96        &self,
97        cpm: &CheckpointMetadata,
98    ) -> Result<HashSet<StoragePath>, StorageError> {
99        self.backend.gather_batches_for_checkpoint(cpm)
100    }
101
102    /// Remove unexpected/leftover files from a previous run in the storage
103    /// directory.  Returns the amount of storage still in use.
104    pub fn gc_startup(&self) -> Result<u64, Error> {
105        // Collect all directories and files still referenced by a checkpoint
106        let mut in_use_paths: HashSet<StoragePath> = HashSet::new();
107        in_use_paths.insert(CHECKPOINT_FILE_NAME.into());
108        in_use_paths.insert(STEPS_FILE.into());
109        in_use_paths.insert(STATE_FILE.into());
110        // Don't delete either `status.json` or `status.json.mut` either because
111        // these files get updated asynchronously and we must not interfere with
112        // it.
113        in_use_paths.insert(STATUS_FILE.into());
114        in_use_paths.insert(format!("{}.mut", STATUS_FILE).into());
115        in_use_paths.insert(ADHOC_TEMP_DIR.into());
116        in_use_paths.insert(ACTIVATION_MARKER_FILE.into());
117        for cpm in self.checkpoint_list.iter() {
118            in_use_paths.insert(cpm.uuid.to_string().into());
119            let batches = self
120                .gather_batches_for_checkpoint(cpm)
121                .expect("Batches for a checkpoint should be discoverable");
122            for batch in batches {
123                in_use_paths.insert(batch);
124            }
125        }
126        // Give the coordinator a namespace for persistent files.
127        in_use_paths.insert("coordinator".into());
128
129        /// True if `path` is a name that we might have created ourselves.
130        fn is_feldera_filename(path: &StoragePath) -> bool {
131            path.extension()
132                .is_some_and(|extension| DBSP_FILE_EXTENSION.contains(&extension))
133        }
134
135        // Collect everything found in the storage directory
136        let mut usage = 0;
137        self.backend.list(&StoragePath::default(), &mut |path, file_type| {
138            if !in_use_paths.contains(path) && (is_feldera_filename(path) || file_type == StorageFileType::Directory) {
139                match self.backend.delete_recursive(path) {
140                    Ok(_) => {
141                        tracing::debug!("Removed unused {file_type:?} '{path}'");
142                    }
143                    Err(e) => {
144                        tracing::warn!("Unable to remove old-checkpoint file {path}: {e} (the pipeline will try to delete the file again on a restart)");
145                    }
146                }
147            } else if let StorageFileType::File { size } = file_type {
148                usage += size;
149            }
150        })?;
151
152        Ok(usage)
153    }
154
155    pub(super) fn checkpoint_dir(uuid: Uuid) -> StoragePath {
156        uuid.to_string().into()
157    }
158
159    pub(super) fn commit(
160        &mut self,
161        uuid: Uuid,
162        fingerprint: u64,
163        identifier: Option<String>,
164        steps: Option<u64>,
165        processed_records: Option<u64>,
166    ) -> Result<CheckpointMetadata, Error> {
167        // Write marker file to ensure that this directory is detected as a
168        // checkpoint.
169        self.backend
170            .write(&Self::checkpoint_dir(uuid).child("CHECKPOINT"), FBuf::new())?;
171
172        let mut md = CheckpointMetadata {
173            uuid,
174            identifier,
175            fingerprint,
176            size: None,
177            processed_records,
178            steps,
179        };
180
181        let batches = self.gather_batches_for_checkpoint(&md)?;
182
183        self.backend
184            .write_json(
185                &Self::checkpoint_dir(uuid).child(CHECKPOINT_DEPENDENCIES),
186                &batches.into_iter().map(|p| p.to_string()).collect_vec(),
187            )
188            .and_then(|reader| reader.commit())?;
189
190        md.size = Some(self.measure_checkpoint_storage_use(uuid)?);
191
192        self.checkpoint_list.push_back(md.clone());
193        self.update_checkpoint_file()?;
194        Ok(md)
195    }
196
197    /// List all currently available checkpoints.
198    pub(super) fn list_checkpoints(&self) -> Result<Vec<CheckpointMetadata>, Error> {
199        Ok(self.checkpoint_list.clone().into())
200    }
201
202    /// Reads the list of checkpoints available through `backend`.
203    pub fn read_checkpoints(
204        backend: &dyn StorageBackend,
205    ) -> Result<VecDeque<CheckpointMetadata>, Error> {
206        match backend.read_json(&StoragePath::from(CHECKPOINT_FILE_NAME)) {
207            Ok(checkpoints) => Ok(checkpoints),
208            Err(error) if error.kind() == ErrorKind::NotFound => Ok(VecDeque::new()),
209            Err(error) => Err(error)?,
210        }
211    }
212
213    fn update_checkpoint_file(&self) -> Result<(), Error> {
214        Ok(self
215            .backend
216            .write_json(&CHECKPOINT_FILE_NAME.into(), &self.checkpoint_list)
217            .and_then(|reader| reader.commit())?)
218    }
219
220    /// Removes `file` and logs any error.
221    fn remove_batch_file(&self, file: &StoragePath) {
222        match self.backend.delete_if_exists(file) {
223            Ok(_) => {
224                tracing::debug!("Removed file {file}");
225            }
226            Err(e) => {
227                tracing::warn!(
228                    "Unable to remove old-checkpoint file {file}: {e} (the pipeline will try to delete the file again on a restart)"
229                );
230            }
231        }
232    }
233
234    /// Removes all meta-data files associated with the checkpoint given by
235    /// `cpm` by removing the folder associated with the checkpoint.
236    fn remove_checkpoint_dir(&self, cpm: uuid::Uuid) -> Result<(), Error> {
237        assert_ne!(cpm, Uuid::nil());
238        self.backend.delete_recursive(&cpm.to_string().into())?;
239        Ok(())
240    }
241
242    /// Remove the oldest checkpoints from the list.
243    /// - Preserves at least `MIN_CHECKPOINT_THRESHOLD` checkpoints.
244    /// - Does not remove any checkpoints whose UUID is in the `except` list.
245    ///
246    /// # Returns
247    /// - Uuid of the removed checkpoints, if there are more than `MIN_CHECKPOINT_THRESHOLD`.
248    /// - Empty set otherwise.
249    pub fn gc_checkpoint(
250        &mut self,
251        except: HashSet<uuid::Uuid>,
252    ) -> Result<HashSet<uuid::Uuid>, Error> {
253        if self.checkpoint_list.len() <= Self::MIN_CHECKPOINT_THRESHOLD {
254            return Ok(HashSet::new());
255        }
256
257        let mut batch_files_to_keep: HashSet<_> = except
258            .iter()
259            .filter_map(|uuid| self.backend.gather_batches_for_checkpoint_uuid(*uuid).ok())
260            .flatten()
261            .collect();
262
263        let to_remove: HashSet<_> = self
264            .checkpoint_list
265            .iter()
266            .take(
267                self.checkpoint_list
268                    .len()
269                    .saturating_sub(Self::MIN_CHECKPOINT_THRESHOLD),
270            )
271            .map(|cpm| cpm.uuid)
272            .filter(|cpm| !except.contains(cpm))
273            .collect();
274
275        self.checkpoint_list
276            .retain(|cpm| !to_remove.contains(&cpm.uuid));
277
278        // Update the checkpoint list file, we do this first intentionally, in case
279        // later operations fail we don't want the checkpoint list to
280        // contain a checkpoint that only has part of the files.
281        //
282        // If any of the later operations fail, restarting the circuit will try
283        // to remove the checkpoint files again (see also [`Self::gc_startup`]).
284        self.update_checkpoint_file()?;
285
286        // Find the first checkpoint in checkpoint list that is not in `except`.
287        self.checkpoint_list
288            .iter()
289            .filter(|c| !except.contains(&c.uuid))
290            .take(1)
291            .filter_map(|c| self.backend.gather_batches_for_checkpoint(c).ok())
292            .for_each(|batches| {
293                for batch in batches {
294                    batch_files_to_keep.insert(batch);
295                }
296            });
297
298        for cpm in &to_remove {
299            for batch_file in self
300                .backend
301                .gather_batches_for_checkpoint_uuid(*cpm)?
302                .difference(&batch_files_to_keep)
303            {
304                self.remove_batch_file(batch_file);
305            }
306
307            self.remove_checkpoint_dir(*cpm)?;
308        }
309
310        tracing::info!(
311            "cleaned up {} checkpoints; exception list: {except:?}, retaining checkpoints: {:?}",
312            to_remove.len(),
313            self.checkpoint_list
314                .iter()
315                .map(|cpm| cpm.uuid)
316                .collect::<Vec<_>>()
317        );
318
319        Ok(to_remove)
320    }
321}
322
323/// Trait for types that can be check-pointed and restored.
324///
325/// This is to be used for any additional state within circuit operators
326/// that's not stored within a batch (which are already stored in files).
327pub trait Checkpoint {
328    fn checkpoint(&self) -> Result<Vec<u8>, Error>;
329    fn restore(&mut self, data: &[u8]) -> Result<(), Error>;
330}
331
332impl Checkpoint for isize {
333    fn checkpoint(&self) -> Result<Vec<u8>, Error> {
334        todo!()
335    }
336
337    fn restore(&mut self, _data: &[u8]) -> Result<(), Error> {
338        todo!()
339    }
340}
341
342impl Checkpoint for usize {
343    fn checkpoint(&self) -> Result<Vec<u8>, Error> {
344        todo!()
345    }
346
347    fn restore(&mut self, _data: &[u8]) -> Result<(), Error> {
348        todo!()
349    }
350}
351
352impl Checkpoint for i32 {
353    fn checkpoint(&self) -> Result<Vec<u8>, Error> {
354        todo!()
355    }
356
357    fn restore(&mut self, _data: &[u8]) -> Result<(), Error> {
358        todo!()
359    }
360}
361
362impl<N> Checkpoint for Box<N>
363where
364    N: Checkpoint + ?Sized,
365{
366    fn checkpoint(&self) -> Result<Vec<u8>, Error> {
367        self.as_ref().checkpoint()
368    }
369
370    fn restore(&mut self, data: &[u8]) -> Result<(), Error> {
371        self.as_mut().restore(data)
372    }
373}
374
375impl<T> Checkpoint for Option<T>
376where
377    T: Checkpoint,
378{
379    fn checkpoint(&self) -> Result<Vec<u8>, Error> {
380        todo!()
381    }
382
383    fn restore(&mut self, _data: &[u8]) -> Result<(), Error> {
384        todo!()
385    }
386}
387
388impl<T, D: ?Sized> Checkpoint for TypedBox<T, D> {
389    fn checkpoint(&self) -> Result<Vec<u8>, Error> {
390        todo!()
391    }
392    fn restore(&mut self, _data: &[u8]) -> Result<(), Error> {
393        todo!()
394    }
395}
396
397impl Checkpoint for dyn dynamic::data::Data + 'static {
398    fn checkpoint(&self) -> Result<Vec<u8>, Error> {
399        Ok(SerializerInner::to_fbuf_with_thread_local(|s| self.serialize(s)).into_vec())
400    }
401
402    fn restore(&mut self, data: &[u8]) -> Result<(), Error> {
403        unsafe { self.deserialize_from_bytes(data, 0) };
404        Ok(())
405    }
406}
407
408impl Checkpoint for dyn DataTyped<Type = u64> + 'static {
409    fn checkpoint(&self) -> Result<Vec<u8>, Error> {
410        todo!()
411    }
412
413    fn restore(&mut self, _data: &[u8]) -> Result<(), Error> {
414        todo!()
415    }
416}
417
418#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, Hash, SizeOf)]
419pub struct EmptyCheckpoint<T: Default> {
420    pub val: T,
421}
422
423impl<T> NumEntries for EmptyCheckpoint<T>
424where
425    T: Default + NumEntries,
426{
427    const CONST_NUM_ENTRIES: Option<usize> = T::CONST_NUM_ENTRIES;
428
429    fn num_entries_shallow(&self) -> usize {
430        self.val.num_entries_shallow()
431    }
432
433    fn num_entries_deep(&self) -> usize {
434        self.val.num_entries_deep()
435    }
436}
437
438impl<T: Default> EmptyCheckpoint<T> {
439    pub fn new(val: T) -> Self {
440        Self { val }
441    }
442}
443
444impl<T: Default> Checkpoint for EmptyCheckpoint<T> {
445    fn checkpoint(&self) -> Result<Vec<u8>, Error> {
446        Ok(vec![])
447    }
448
449    fn restore(&mut self, _data: &[u8]) -> Result<(), Error> {
450        self.val = T::default();
451        Ok(())
452    }
453}
454
#[cfg(test)]
mod test {
    use std::collections::HashSet;
    use std::marker::PhantomData;
    use std::sync::Arc;

    use feldera_storage::StorageBackend;
    use feldera_types::config::{FileBackendConfig, StorageCacheConfig};

    use crate::storage::backend::posixio_impl::PosixBackend;

    use super::Checkpointer;

    // Typestates describing how many checkpoints a `TestState` holds.

    /// No checkpoints yet.
    struct Empty;
    /// Exactly `Checkpointer::MIN_CHECKPOINT_THRESHOLD` checkpoints.
    struct MinCheckpoints;
    /// More than `Checkpointer::MIN_CHECKPOINT_THRESHOLD` checkpoints.
    struct ExtraCheckpoints;

    /// A checkpointer over a temporary directory, tagged with typestate `S`.
    struct TestState<S> {
        checkpointer: Checkpointer,
        // Owned here so the directory lives as long as the checkpointer.
        tempdir: tempfile::TempDir,
        _phantom: PhantomData<S>,
    }

    impl<S> TestState<S> {
        /// Moves the checkpointer and its directory into a state tagged `T`.
        fn into_state<T>(self) -> TestState<T> {
            TestState {
                checkpointer: self.checkpointer,
                tempdir: self.tempdir,
                _phantom: PhantomData,
            }
        }

        /// UUIDs of the checkpoints beyond the minimum, oldest first.
        fn extras(&self) -> Vec<uuid::Uuid> {
            let list = &self.checkpointer.checkpoint_list;
            let surplus = list
                .len()
                .saturating_sub(Checkpointer::MIN_CHECKPOINT_THRESHOLD);
            list.iter().take(surplus).map(|cpm| cpm.uuid).collect()
        }

        fn oldest_extra(&self) -> uuid::Uuid {
            self.extras().first().copied().unwrap()
        }

        fn newest_extra(&self) -> uuid::Uuid {
            self.extras().last().copied().unwrap()
        }
    }

    impl TestState<Empty> {
        /// Builds a checkpointer over a fresh temporary directory.
        fn new() -> Self {
            let tempdir = tempfile::tempdir().unwrap();
            let backend: Arc<dyn StorageBackend> = Arc::new(PosixBackend::new(
                tempdir.path(),
                StorageCacheConfig::default(),
                &FileBackendConfig::default(),
            ));
            let checkpointer = Checkpointer::new(backend).unwrap();

            Self {
                checkpointer,
                tempdir,
                _phantom: PhantomData,
            }
        }

        fn precondition(&self) {
            assert!(self.checkpointer.checkpoint_list.is_empty());
        }

        /// Commits exactly the minimum number of checkpoints.
        fn checkpoint(mut self) -> TestState<MinCheckpoints> {
            self.precondition();

            for step in 0..Checkpointer::MIN_CHECKPOINT_THRESHOLD as u64 {
                self.checkpointer
                    .commit(uuid::Uuid::now_v7(), 0, None, Some(step), Some(0))
                    .unwrap();
            }

            self.into_state()
        }
    }

    impl TestState<MinCheckpoints> {
        fn precondition(&self) {
            assert_eq!(
                self.checkpointer.checkpoint_list.len(),
                Checkpointer::MIN_CHECKPOINT_THRESHOLD
            );
            assert!(self.extras().is_empty());
        }

        /// Commits one checkpoint beyond the minimum.
        fn checkpoint(mut self) -> TestState<ExtraCheckpoints> {
            self.precondition();

            self.checkpointer
                .commit(uuid::Uuid::now_v7(), 0, None, Some(2), Some(0))
                .unwrap();

            self.into_state()
        }
    }

    impl TestState<ExtraCheckpoints> {
        fn precondition(&self) {
            assert!(
                self.checkpointer.checkpoint_list.len() > Checkpointer::MIN_CHECKPOINT_THRESHOLD
            );
            assert!(!self.extras().is_empty());
        }

        /// Commits yet another checkpoint.
        fn checkpoint(mut self) -> Self {
            self.precondition();

            self.checkpointer
                .commit(uuid::Uuid::now_v7(), 0, None, Some(3), Some(0))
                .unwrap();

            self
        }

        /// Runs GC without exceptions; it must remove at least one checkpoint.
        fn gc(mut self) -> TestState<MinCheckpoints> {
            self.precondition();

            let removed = self.checkpointer.gc_checkpoint(HashSet::new()).unwrap();
            assert!(!removed.is_empty());

            self.into_state()
        }

        /// Runs GC while protecting `except`, which must survive as an extra.
        fn gc_with_except(mut self, except: uuid::Uuid) -> Self {
            self.precondition();

            self.checkpointer
                .gc_checkpoint(HashSet::from([except]))
                .unwrap();
            assert!(self.extras().contains(&except));

            self
        }
    }

    /// Asserts that `expected` is the one and only extra checkpoint.
    fn assert_single_extra(state: &TestState<ExtraCheckpoints>, expected: uuid::Uuid) {
        let extras = state.extras();
        assert!(extras.contains(&expected) && extras.len() == 1);
    }

    #[test]
    fn test_checkpointer() {
        // Start empty, then commit the minimum number of checkpoints.
        let state = TestState::<Empty>::new().checkpoint();

        // Add one extra checkpoint and verify we can GC back to the minimum.
        let state = state.checkpoint().gc();

        // Add two extra checkpoints, then GC while protecting the newest
        // extra one; it should be the only extra checkpoint left.
        let state = state.checkpoint().checkpoint();
        let protected = state.newest_extra();
        let state = state.gc_with_except(protected);
        assert_single_extra(&state, protected);

        // Add one more checkpoint, then GC while protecting the oldest extra.
        let state = state.checkpoint();
        let protected = state.oldest_extra();
        let state = state.gc_with_except(protected);
        assert_single_extra(&state, protected);

        // Finally, GC back to the minimum and verify that state is valid.
        let state = state.gc();
        state.precondition();
    }
}