diskann_disk/build/chunking/checkpoint/checkpoint_record_manager.rs
1/*
2 * Copyright (c) Microsoft Corporation.
3 * Licensed under the MIT license.
4 */
5
6use diskann::ANNResult;
7use tracing::info;
8
9use super::{Progress, WorkStage};
10
11/// This trait provides functionalities to get and set checkpoint records
12/// ..for tracking the progress and state in a chunkable index build process.
13/// it needs to be marked as send and sync because it will be used in a multi-threaded environment.
14/// However, during the index build process DiskANN will not have parallel requests trying to access the checkpoint record.
15pub trait CheckpointManager: Send + Sync + CheckpointManagerClone {
16 /// Gets the resumption point for a given work stage.
17 ///
18 /// Returns the offset where processing should resume for the specified stage.
19 /// Returns None if:
20 /// - No checkpoint exists for the stage
21 /// - The stage has already completed
22 fn get_resumption_point(&self, stage: WorkStage) -> ANNResult<Option<usize>>;
23
24 /// Updates the checkpoint record with progress information
25 ///
26 /// # Arguments
27 ///
28 /// * `progress` - The current progress (Completed or Processed amount)
29 /// * `next_stage` - If provided and progress is Completed, advances to this stage.
30 fn update(&mut self, progress: Progress, next_stage: WorkStage) -> ANNResult<()>;
31
32 /// Marks the checkpoint as invalid for current stage.
33 ///
34 /// When a checkpoint is marked as invalid:
35 /// - Future calls to get_resumption_point(curent_stage) will return offset 0
36 /// - This forces processing to restart from the beginning of the stage
37 /// - Protects against partial/incomplete work if a crash occurs
38 fn mark_as_invalid(&mut self) -> ANNResult<()>;
39}
40
41pub trait CheckpointManagerExt {
42 fn execute_stage<F, S, U>(
43 &mut self,
44 stage: WorkStage,
45 next_stage: WorkStage,
46 operation: F,
47 skip_handler: S,
48 ) -> ANNResult<U>
49 where
50 F: FnOnce() -> ANNResult<U>,
51 S: FnOnce() -> ANNResult<U>;
52}
53
54impl<T: ?Sized> CheckpointManagerExt for T
55where
56 T: CheckpointManager,
57{
58 fn execute_stage<F, S, U>(
59 &mut self,
60 stage: WorkStage,
61 next_stage: WorkStage,
62 operation: F,
63 skip_handler: S,
64 ) -> ANNResult<U>
65 where
66 F: FnOnce() -> ANNResult<U>,
67 S: FnOnce() -> ANNResult<U>,
68 {
69 match self.get_resumption_point(stage)? {
70 Some(_) => {
71 let result = operation()?;
72 self.update(Progress::Completed, next_stage)?;
73 Ok(result)
74 }
75 None => {
76 info!("[Stage:{:?}] Skip stage - invalid checkpoint", stage);
77 skip_handler()
78 }
79 }
80 }
81}
82
83/// This trait is used to clone the Box<dyn CheckpointRecordManager>
84pub trait CheckpointManagerClone {
85 fn clone_box(&self) -> Box<dyn CheckpointManager>;
86}
87
88impl<T> CheckpointManagerClone for T
89where
90 T: 'static + CheckpointManager + Clone,
91{
92 fn clone_box(&self) -> Box<dyn CheckpointManager> {
93 Box::new(self.clone())
94 }
95}