1use crate::dynamic::{self, data::DataTyped};
4use crate::storage::file::SerializerInner;
5use crate::{Error, NumEntries, TypedBox};
6use feldera_types::checkpoint::CheckpointMetadata;
7use feldera_types::constants::{
8 ACTIVATION_MARKER_FILE, ADHOC_TEMP_DIR, CHECKPOINT_DEPENDENCIES, CHECKPOINT_FILE_NAME,
9 DBSP_FILE_EXTENSION, STATE_FILE, STATUS_FILE, STEPS_FILE,
10};
11use itertools::Itertools;
12use size_of::SizeOf;
13
14use std::io::ErrorKind;
15use std::sync::atomic::Ordering;
16use std::{
17 collections::{HashSet, VecDeque},
18 sync::Arc,
19};
20
21use feldera_storage::error::StorageError;
22use feldera_storage::fbuf::FBuf;
23use feldera_storage::{StorageBackend, StorageFileType, StoragePath};
24use uuid::Uuid;
25
26use super::RuntimeError;
27
/// Keeps track of the checkpoints stored in a storage backend and performs the
/// bookkeeping around them: committing, listing and garbage collection.
#[derive(derive_more::Debug, Clone)]
pub struct Checkpointer {
    /// Storage backend holding the checkpoint files (omitted from `Debug` output).
    #[debug(skip)]
    backend: Arc<dyn StorageBackend>,
    /// Known checkpoints. `commit` appends at the back and `gc_checkpoint`
    /// considers entries at the front for removal first.
    checkpoint_list: VecDeque<CheckpointMetadata>,
}
39
40impl Checkpointer {
41 pub(super) const MIN_CHECKPOINT_THRESHOLD: usize = 2;
43
44 pub fn new(backend: Arc<dyn StorageBackend>) -> Result<Self, Error> {
47 let checkpoint_list = Self::read_checkpoints(&*backend)?;
48
49 let this = Checkpointer {
50 backend,
51 checkpoint_list,
52 };
53
54 this.init_storage()?;
55
56 Ok(this)
57 }
58
59 pub fn verify_fingerprint(&self, fingerprint: u64) -> Result<(), Error> {
61 if self
62 .checkpoint_list
63 .iter()
64 .any(|cpm| cpm.fingerprint != fingerprint)
65 {
66 Err(Error::Runtime(RuntimeError::IncompatibleStorage))
67 } else {
68 Ok(())
69 }
70 }
71
72 fn init_storage(&self) -> Result<(), Error> {
73 let usage = self.gc_startup()?;
74
75 self.backend.usage().store(usage as i64, Ordering::Relaxed);
78
79 Ok(())
80 }
81
82 pub(super) fn measure_checkpoint_storage_use(&self, uuid: uuid::Uuid) -> Result<u64, Error> {
83 let mut usage = 0;
84 StorageError::ignore_notfound(self.backend.list(
85 &Self::checkpoint_dir(uuid),
86 &mut |_path, file_type| {
87 if let StorageFileType::File { size } = file_type {
88 usage += size;
89 }
90 },
91 ))?;
92 Ok(usage)
93 }
94
95 pub(super) fn gather_batches_for_checkpoint(
96 &self,
97 cpm: &CheckpointMetadata,
98 ) -> Result<HashSet<StoragePath>, StorageError> {
99 self.backend.gather_batches_for_checkpoint(cpm)
100 }
101
102 pub fn gc_startup(&self) -> Result<u64, Error> {
105 let mut in_use_paths: HashSet<StoragePath> = HashSet::new();
107 in_use_paths.insert(CHECKPOINT_FILE_NAME.into());
108 in_use_paths.insert(STEPS_FILE.into());
109 in_use_paths.insert(STATE_FILE.into());
110 in_use_paths.insert(STATUS_FILE.into());
114 in_use_paths.insert(format!("{}.mut", STATUS_FILE).into());
115 in_use_paths.insert(ADHOC_TEMP_DIR.into());
116 in_use_paths.insert(ACTIVATION_MARKER_FILE.into());
117 for cpm in self.checkpoint_list.iter() {
118 in_use_paths.insert(cpm.uuid.to_string().into());
119 let batches = self
120 .gather_batches_for_checkpoint(cpm)
121 .expect("Batches for a checkpoint should be discoverable");
122 for batch in batches {
123 in_use_paths.insert(batch);
124 }
125 }
126 in_use_paths.insert("coordinator".into());
128
129 fn is_feldera_filename(path: &StoragePath) -> bool {
131 path.extension()
132 .is_some_and(|extension| DBSP_FILE_EXTENSION.contains(&extension))
133 }
134
135 let mut usage = 0;
137 self.backend.list(&StoragePath::default(), &mut |path, file_type| {
138 if !in_use_paths.contains(path) && (is_feldera_filename(path) || file_type == StorageFileType::Directory) {
139 match self.backend.delete_recursive(path) {
140 Ok(_) => {
141 tracing::debug!("Removed unused {file_type:?} '{path}'");
142 }
143 Err(e) => {
144 tracing::warn!("Unable to remove old-checkpoint file {path}: {e} (the pipeline will try to delete the file again on a restart)");
145 }
146 }
147 } else if let StorageFileType::File { size } = file_type {
148 usage += size;
149 }
150 })?;
151
152 Ok(usage)
153 }
154
155 pub(super) fn checkpoint_dir(uuid: Uuid) -> StoragePath {
156 uuid.to_string().into()
157 }
158
159 pub(super) fn commit(
160 &mut self,
161 uuid: Uuid,
162 fingerprint: u64,
163 identifier: Option<String>,
164 steps: Option<u64>,
165 processed_records: Option<u64>,
166 ) -> Result<CheckpointMetadata, Error> {
167 self.backend
170 .write(&Self::checkpoint_dir(uuid).child("CHECKPOINT"), FBuf::new())?;
171
172 let mut md = CheckpointMetadata {
173 uuid,
174 identifier,
175 fingerprint,
176 size: None,
177 processed_records,
178 steps,
179 };
180
181 let batches = self.gather_batches_for_checkpoint(&md)?;
182
183 self.backend
184 .write_json(
185 &Self::checkpoint_dir(uuid).child(CHECKPOINT_DEPENDENCIES),
186 &batches.into_iter().map(|p| p.to_string()).collect_vec(),
187 )
188 .and_then(|reader| reader.commit())?;
189
190 md.size = Some(self.measure_checkpoint_storage_use(uuid)?);
191
192 self.checkpoint_list.push_back(md.clone());
193 self.update_checkpoint_file()?;
194 Ok(md)
195 }
196
197 pub(super) fn list_checkpoints(&self) -> Result<Vec<CheckpointMetadata>, Error> {
199 Ok(self.checkpoint_list.clone().into())
200 }
201
202 pub fn read_checkpoints(
204 backend: &dyn StorageBackend,
205 ) -> Result<VecDeque<CheckpointMetadata>, Error> {
206 match backend.read_json(&StoragePath::from(CHECKPOINT_FILE_NAME)) {
207 Ok(checkpoints) => Ok(checkpoints),
208 Err(error) if error.kind() == ErrorKind::NotFound => Ok(VecDeque::new()),
209 Err(error) => Err(error)?,
210 }
211 }
212
213 fn update_checkpoint_file(&self) -> Result<(), Error> {
214 Ok(self
215 .backend
216 .write_json(&CHECKPOINT_FILE_NAME.into(), &self.checkpoint_list)
217 .and_then(|reader| reader.commit())?)
218 }
219
220 fn remove_batch_file(&self, file: &StoragePath) {
222 match self.backend.delete_if_exists(file) {
223 Ok(_) => {
224 tracing::debug!("Removed file {file}");
225 }
226 Err(e) => {
227 tracing::warn!(
228 "Unable to remove old-checkpoint file {file}: {e} (the pipeline will try to delete the file again on a restart)"
229 );
230 }
231 }
232 }
233
234 fn remove_checkpoint_dir(&self, cpm: uuid::Uuid) -> Result<(), Error> {
237 assert_ne!(cpm, Uuid::nil());
238 self.backend.delete_recursive(&cpm.to_string().into())?;
239 Ok(())
240 }
241
242 pub fn gc_checkpoint(
250 &mut self,
251 except: HashSet<uuid::Uuid>,
252 ) -> Result<HashSet<uuid::Uuid>, Error> {
253 if self.checkpoint_list.len() <= Self::MIN_CHECKPOINT_THRESHOLD {
254 return Ok(HashSet::new());
255 }
256
257 let mut batch_files_to_keep: HashSet<_> = except
258 .iter()
259 .filter_map(|uuid| self.backend.gather_batches_for_checkpoint_uuid(*uuid).ok())
260 .flatten()
261 .collect();
262
263 let to_remove: HashSet<_> = self
264 .checkpoint_list
265 .iter()
266 .take(
267 self.checkpoint_list
268 .len()
269 .saturating_sub(Self::MIN_CHECKPOINT_THRESHOLD),
270 )
271 .map(|cpm| cpm.uuid)
272 .filter(|cpm| !except.contains(cpm))
273 .collect();
274
275 self.checkpoint_list
276 .retain(|cpm| !to_remove.contains(&cpm.uuid));
277
278 self.update_checkpoint_file()?;
285
286 self.checkpoint_list
288 .iter()
289 .filter(|c| !except.contains(&c.uuid))
290 .take(1)
291 .filter_map(|c| self.backend.gather_batches_for_checkpoint(c).ok())
292 .for_each(|batches| {
293 for batch in batches {
294 batch_files_to_keep.insert(batch);
295 }
296 });
297
298 for cpm in &to_remove {
299 for batch_file in self
300 .backend
301 .gather_batches_for_checkpoint_uuid(*cpm)?
302 .difference(&batch_files_to_keep)
303 {
304 self.remove_batch_file(batch_file);
305 }
306
307 self.remove_checkpoint_dir(*cpm)?;
308 }
309
310 tracing::info!(
311 "cleaned up {} checkpoints; exception list: {except:?}, retaining checkpoints: {:?}",
312 to_remove.len(),
313 self.checkpoint_list
314 .iter()
315 .map(|cpm| cpm.uuid)
316 .collect::<Vec<_>>()
317 );
318
319 Ok(to_remove)
320 }
321}
322
/// State that can be turned into bytes and later rebuilt from such bytes.
pub trait Checkpoint {
    /// Produces the byte representation of `self`.
    fn checkpoint(&self) -> Result<Vec<u8>, Error>;
    /// Updates `self` in place from `data`.
    ///
    /// NOTE(review): `data` is presumably expected to be the output of an
    /// earlier [`Checkpoint::checkpoint`] call on a value of the same type;
    /// confirm against the callers.
    fn restore(&mut self, data: &[u8]) -> Result<(), Error>;
}
331
/// Placeholder implementation: both methods are unimplemented and panic via
/// `todo!()` when called.
impl Checkpoint for isize {
    fn checkpoint(&self) -> Result<Vec<u8>, Error> {
        todo!()
    }

    fn restore(&mut self, _data: &[u8]) -> Result<(), Error> {
        todo!()
    }
}
341
/// Placeholder implementation: both methods are unimplemented and panic via
/// `todo!()` when called.
impl Checkpoint for usize {
    fn checkpoint(&self) -> Result<Vec<u8>, Error> {
        todo!()
    }

    fn restore(&mut self, _data: &[u8]) -> Result<(), Error> {
        todo!()
    }
}
351
/// Placeholder implementation: both methods are unimplemented and panic via
/// `todo!()` when called.
impl Checkpoint for i32 {
    fn checkpoint(&self) -> Result<Vec<u8>, Error> {
        todo!()
    }

    fn restore(&mut self, _data: &[u8]) -> Result<(), Error> {
        todo!()
    }
}
361
362impl<N> Checkpoint for Box<N>
363where
364 N: Checkpoint + ?Sized,
365{
366 fn checkpoint(&self) -> Result<Vec<u8>, Error> {
367 self.as_ref().checkpoint()
368 }
369
370 fn restore(&mut self, data: &[u8]) -> Result<(), Error> {
371 self.as_mut().restore(data)
372 }
373}
374
/// Placeholder implementation: both methods are unimplemented and panic via
/// `todo!()` when called.
impl<T> Checkpoint for Option<T>
where
    T: Checkpoint,
{
    fn checkpoint(&self) -> Result<Vec<u8>, Error> {
        todo!()
    }

    fn restore(&mut self, _data: &[u8]) -> Result<(), Error> {
        todo!()
    }
}
387
/// Placeholder implementation: both methods are unimplemented and panic via
/// `todo!()` when called.
impl<T, D: ?Sized> Checkpoint for TypedBox<T, D> {
    fn checkpoint(&self) -> Result<Vec<u8>, Error> {
        todo!()
    }
    fn restore(&mut self, _data: &[u8]) -> Result<(), Error> {
        todo!()
    }
}
396
/// Checkpointing for dynamically typed data, implemented on top of the
/// value's own `serialize` / `deserialize_from_bytes` methods.
impl Checkpoint for dyn dynamic::data::Data + 'static {
    /// Serializes `self` through `SerializerInner::to_fbuf_with_thread_local`
    /// and returns the resulting buffer as a `Vec<u8>`.
    fn checkpoint(&self) -> Result<Vec<u8>, Error> {
        Ok(SerializerInner::to_fbuf_with_thread_local(|s| self.serialize(s)).into_vec())
    }

    /// Overwrites `self` with the value decoded from `data`, starting at
    /// offset 0. Always returns `Ok(())`.
    fn restore(&mut self, data: &[u8]) -> Result<(), Error> {
        // SAFETY(review): soundness rests on the contract of
        // `deserialize_from_bytes`, which is not visible in this file —
        // presumably `data` must hold bytes produced by `checkpoint` for the
        // same concrete type. TODO confirm.
        unsafe { self.deserialize_from_bytes(data, 0) };
        Ok(())
    }
}
407
/// Placeholder implementation: both methods are unimplemented and panic via
/// `todo!()` when called.
impl Checkpoint for dyn DataTyped<Type = u64> + 'static {
    fn checkpoint(&self) -> Result<Vec<u8>, Error> {
        todo!()
    }

    fn restore(&mut self, _data: &[u8]) -> Result<(), Error> {
        todo!()
    }
}
417
/// Wrapper around a value whose contents are not written to checkpoints: its
/// `Checkpoint` implementation emits no bytes and resets the value to
/// `T::default()` on restore.
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, Hash, SizeOf)]
pub struct EmptyCheckpoint<T: Default> {
    /// The wrapped value.
    pub val: T,
}
422
423impl<T> NumEntries for EmptyCheckpoint<T>
424where
425 T: Default + NumEntries,
426{
427 const CONST_NUM_ENTRIES: Option<usize> = T::CONST_NUM_ENTRIES;
428
429 fn num_entries_shallow(&self) -> usize {
430 self.val.num_entries_shallow()
431 }
432
433 fn num_entries_deep(&self) -> usize {
434 self.val.num_entries_deep()
435 }
436}
437
438impl<T: Default> EmptyCheckpoint<T> {
439 pub fn new(val: T) -> Self {
440 Self { val }
441 }
442}
443
444impl<T: Default> Checkpoint for EmptyCheckpoint<T> {
445 fn checkpoint(&self) -> Result<Vec<u8>, Error> {
446 Ok(vec![])
447 }
448
449 fn restore(&mut self, _data: &[u8]) -> Result<(), Error> {
450 self.val = T::default();
451 Ok(())
452 }
453}
454
/// Tests for `Checkpointer`, written as a small type-state machine: the type
/// parameter of `TestState` records how many checkpoints exist, and each
/// transition checks its precondition before acting.
#[cfg(test)]
mod test {
    use std::sync::Arc;

    use feldera_storage::StorageBackend;
    use feldera_types::config::{FileBackendConfig, StorageCacheConfig};
    use std::collections::HashSet;

    use crate::storage::backend::posixio_impl::PosixBackend;

    use super::Checkpointer;

    // Type-state markers.
    /// No checkpoints exist.
    struct Empty;
    /// Exactly `MIN_CHECKPOINT_THRESHOLD` checkpoints exist.
    struct MinCheckpoints;
    /// More than `MIN_CHECKPOINT_THRESHOLD` checkpoints exist.
    struct ExtraCheckpoints;

    /// A checkpointer backed by a temporary directory, tagged with the state
    /// `S` it is expected to be in.
    struct TestState<S> {
        checkpointer: Checkpointer,
        // Held so that the temporary directory outlives the checkpointer.
        tempdir: tempfile::TempDir,
        _phantom: std::marker::PhantomData<S>,
    }

    impl<S> TestState<S> {
        /// UUIDs of the checkpoints beyond the newest
        /// `MIN_CHECKPOINT_THRESHOLD`, oldest first.
        fn extras(&self) -> Vec<uuid::Uuid> {
            self.checkpointer
                .checkpoint_list
                .iter()
                .map(|cpm| cpm.uuid)
                .take(
                    self.checkpointer
                        .checkpoint_list
                        .len()
                        .saturating_sub(Checkpointer::MIN_CHECKPOINT_THRESHOLD),
                )
                .collect()
        }

        /// Oldest extra checkpoint; panics if there is none.
        fn oldest_extra(&self) -> uuid::Uuid {
            self.extras().first().cloned().unwrap()
        }

        /// Newest extra checkpoint; panics if there is none.
        fn newest_extra(&self) -> uuid::Uuid {
            self.extras().last().cloned().unwrap()
        }
    }

    impl TestState<Empty> {
        /// Creates a checkpointer over a fresh temporary directory.
        fn new() -> Self {
            let tempdir = tempfile::tempdir().unwrap();

            let backend: Arc<dyn StorageBackend> = Arc::new(PosixBackend::new(
                tempdir.path(),
                StorageCacheConfig::default(),
                &FileBackendConfig::default(),
            ));

            Self {
                checkpointer: Checkpointer::new(backend).unwrap(),
                tempdir,
                _phantom: std::marker::PhantomData,
            }
        }

        fn precondition(&self) {
            assert_eq!(self.checkpointer.checkpoint_list.len(), 0);
        }

        /// Commits `MIN_CHECKPOINT_THRESHOLD` checkpoints.
        fn checkpoint(mut self) -> TestState<MinCheckpoints> {
            self.precondition();

            for i in 0..Checkpointer::MIN_CHECKPOINT_THRESHOLD {
                self.checkpointer
                    .commit(uuid::Uuid::now_v7(), 0, None, Some(i as u64), Some(0))
                    .unwrap();
            }

            TestState::<MinCheckpoints> {
                checkpointer: self.checkpointer,
                tempdir: self.tempdir,
                _phantom: std::marker::PhantomData,
            }
        }
    }

    impl TestState<MinCheckpoints> {
        fn precondition(&self) {
            assert_eq!(
                self.checkpointer.checkpoint_list.len(),
                Checkpointer::MIN_CHECKPOINT_THRESHOLD
            );

            assert!(self.extras().is_empty());
        }

        /// Commits one more checkpoint, producing the first extra.
        fn checkpoint(mut self) -> TestState<ExtraCheckpoints> {
            self.precondition();

            let uuid = uuid::Uuid::now_v7();
            self.checkpointer
                .commit(uuid, 0, None, Some(2), Some(0))
                .unwrap();

            TestState::<ExtraCheckpoints> {
                checkpointer: self.checkpointer,
                tempdir: self.tempdir,
                _phantom: std::marker::PhantomData,
            }
        }
    }

    impl TestState<ExtraCheckpoints> {
        fn precondition(&self) {
            assert!(
                self.checkpointer.checkpoint_list.len() > Checkpointer::MIN_CHECKPOINT_THRESHOLD
            );

            assert!(!self.extras().is_empty());
        }

        /// Commits one more checkpoint.
        fn checkpoint(mut self) -> TestState<ExtraCheckpoints> {
            self.precondition();

            self.checkpointer
                .commit(uuid::Uuid::now_v7(), 0, None, Some(3), Some(0))
                .unwrap();

            TestState::<ExtraCheckpoints> {
                checkpointer: self.checkpointer,
                tempdir: self.tempdir,
                _phantom: std::marker::PhantomData,
            }
        }

        /// Garbage-collects without exceptions; at least one checkpoint must
        /// be removed.
        fn gc(mut self) -> TestState<MinCheckpoints> {
            self.precondition();

            let removed = self.checkpointer.gc_checkpoint(HashSet::new()).unwrap();
            assert!(!removed.is_empty());

            TestState::<MinCheckpoints> {
                checkpointer: self.checkpointer,
                tempdir: self.tempdir,
                _phantom: std::marker::PhantomData,
            }
        }

        /// Garbage-collects while sparing `except`, which must still be among
        /// the extras afterwards.
        fn gc_with_except(mut self, except: uuid::Uuid) -> TestState<ExtraCheckpoints> {
            self.precondition();

            self.checkpointer.gc_checkpoint([except].into()).unwrap();

            assert!(self.extras().contains(&except));

            TestState::<ExtraCheckpoints> {
                checkpointer: self.checkpointer,
                tempdir: self.tempdir,
                _phantom: std::marker::PhantomData,
            }
        }
    }

    #[test]
    fn test_checkpointer() {
        // Start with no checkpoints.
        let empty_checkpoints = TestState::<Empty>::new();

        // Fill up to the minimum number of checkpoints.
        let min_checkpoints = empty_checkpoints.checkpoint();

        // One checkpoint beyond the minimum.
        let extra_checkpoints = min_checkpoints.checkpoint();

        // A plain GC brings us back to the minimum.
        let min_checkpoints = extra_checkpoints.gc();

        // Two extras this time.
        let one_extra = min_checkpoints.checkpoint();
        let two_extra = one_extra.checkpoint();

        // Spare the newest extra: only the older extra is removed.
        let keep = two_extra.newest_extra();

        let one_extra = two_extra.gc_with_except(keep);
        assert!(one_extra.extras().contains(&keep) && one_extra.extras().len() == 1);

        let two_extra = one_extra.checkpoint();

        // Spare the oldest extra: only the newer extra is removed.
        let keep = two_extra.oldest_extra();
        let one_extra = two_extra.gc_with_except(keep);
        assert!(one_extra.extras().contains(&keep) && one_extra.extras().len() == 1);

        // Without exceptions the spared checkpoint is collected as well.
        let min_checkpoints = one_extra.gc();
        min_checkpoints.precondition();
    }
}