Skip to main content

diskann_disk/build/chunking/checkpoint/
checkpoint_record_manager.rs

1/*
2 * Copyright (c) Microsoft Corporation.
3 * Licensed under the MIT license.
4 */
5
6use diskann::ANNResult;
7use tracing::info;
8
9use super::{Progress, WorkStage};
10
11/// This trait provides functionalities to get and set checkpoint records
12/// ..for tracking the progress and state in a chunkable index build process.
13/// it needs to be marked as send and sync because it will be used in a multi-threaded environment.
14/// However, during the index build process DiskANN will not have parallel requests trying to access the checkpoint record.
15pub trait CheckpointManager: Send + Sync + CheckpointManagerClone {
16    /// Gets the resumption point for a given work stage.
17    ///
18    /// Returns the offset where processing should resume for the specified stage.
19    /// Returns None if:
20    /// - No checkpoint exists for the stage
21    /// - The stage has already completed
22    fn get_resumption_point(&self, stage: WorkStage) -> ANNResult<Option<usize>>;
23
24    /// Updates the checkpoint record with progress information
25    ///
26    /// # Arguments
27    ///
28    /// * `progress` - The current progress (Completed or Processed amount)
29    /// * `next_stage` - If provided and progress is Completed, advances to this stage.
30    fn update(&mut self, progress: Progress, next_stage: WorkStage) -> ANNResult<()>;
31
32    /// Marks the checkpoint as invalid for current stage.
33    ///
34    /// When a checkpoint is marked as invalid:
35    /// - Future calls to get_resumption_point(curent_stage) will return offset 0
36    /// - This forces processing to restart from the beginning of the stage
37    /// - Protects against partial/incomplete work if a crash occurs
38    fn mark_as_invalid(&mut self) -> ANNResult<()>;
39}
40
41pub trait CheckpointManagerExt {
42    fn execute_stage<F, S, U>(
43        &mut self,
44        stage: WorkStage,
45        next_stage: WorkStage,
46        operation: F,
47        skip_handler: S,
48    ) -> ANNResult<U>
49    where
50        F: FnOnce() -> ANNResult<U>,
51        S: FnOnce() -> ANNResult<U>;
52}
53
54impl<T: ?Sized> CheckpointManagerExt for T
55where
56    T: CheckpointManager,
57{
58    fn execute_stage<F, S, U>(
59        &mut self,
60        stage: WorkStage,
61        next_stage: WorkStage,
62        operation: F,
63        skip_handler: S,
64    ) -> ANNResult<U>
65    where
66        F: FnOnce() -> ANNResult<U>,
67        S: FnOnce() -> ANNResult<U>,
68    {
69        match self.get_resumption_point(stage)? {
70            Some(_) => {
71                let result = operation()?;
72                self.update(Progress::Completed, next_stage)?;
73                Ok(result)
74            }
75            None => {
76                info!("[Stage:{:?}] Skip stage - invalid checkpoint", stage);
77                skip_handler()
78            }
79        }
80    }
81}
82
83/// This trait is used to clone the Box<dyn CheckpointRecordManager>
84pub trait CheckpointManagerClone {
85    fn clone_box(&self) -> Box<dyn CheckpointManager>;
86}
87
88impl<T> CheckpointManagerClone for T
89where
90    T: 'static + CheckpointManager + Clone,
91{
92    fn clone_box(&self) -> Box<dyn CheckpointManager> {
93        Box::new(self.clone())
94    }
95}