Skip to main content

diskann_disk/build/chunking/continuation/
utils.rs

1/*
2 * Copyright (c) Microsoft Corporation.
3 * Licensed under the MIT license.
4 */
5
6use std::{error::Error, thread::sleep};
7
8use tracing::info;
9
10use super::continuation_tracker::{ContinuationGrant, ContinuationTrackerTrait};
11use crate::build::chunking::checkpoint::Progress;
12
13/// This takes an operation with an iterator of oprands,
14/// and processes the oprands using the operation in a loop,
15/// until the continuation_checker asks it to stop.
16/// The continuation_checker is used to get continuation grants between processing each operation.
17/// The clean_up function is called after the loop is broken and before exit.
18/// The function returns a Progress enum, which indicates the number of operations executed.
19pub fn process_while_resource_is_available<Action, ParamIter, Param, E>(
20    mut action: Action,
21    params: ParamIter,
22    continuation_checker: Box<dyn ContinuationTrackerTrait>,
23) -> Result<Progress, E>
24where
25    ParamIter: Iterator<Item = Param>,
26    Action: FnMut(Param) -> Result<(), E>,
27    E: Error,
28{
29    for (idx, param) in params.enumerate() {
30        loop {
31            match continuation_checker.get_continuation_grant() {
32                ContinuationGrant::Continue => {
33                    info!("Continue processing.");
34                    action(param)?;
35                    break;
36                }
37                ContinuationGrant::Yield(duration) => {
38                    info!(
39                        "Continuation checker asks to yield for {} ms.",
40                        duration.as_millis()
41                    );
42                    sleep(duration);
43                }
44                ContinuationGrant::Stop => {
45                    info!("Continuation checker asks to stop. Breaking the loop.");
46                    return Ok(Progress::Processed(idx));
47                }
48            }
49        }
50    }
51
52    Ok(Progress::Completed)
53}
54
55/// Asynchronous version of [`process_while_resource_is_available`].
56///
57/// Takes an async operation with an iterator of operands and processes them in a loop
58/// until the continuation_checker signals to stop.
59pub async fn process_while_resource_is_available_async<Action, ParamIter, Param, Fut, E>(
60    mut action: Action,
61    params: ParamIter,
62    continuation_checker: Box<dyn ContinuationTrackerTrait>,
63) -> Result<Progress, E>
64where
65    ParamIter: Iterator<Item = Param>,
66    Action: FnMut(Param) -> Fut,
67    Fut: core::future::Future<Output = Result<(), E>>,
68    E: Error,
69{
70    for (idx, param) in params.enumerate() {
71        loop {
72            match continuation_checker.get_continuation_grant() {
73                ContinuationGrant::Continue => {
74                    info!("Continue processing.");
75                    action(param).await?;
76                    break;
77                }
78                ContinuationGrant::Yield(duration) => {
79                    info!(
80                        "Continuation checker asks to yield for {} ms.",
81                        duration.as_millis()
82                    );
83                    sleep(duration);
84                }
85                ContinuationGrant::Stop => {
86                    info!("Continuation checker asks to stop. Breaking the loop.");
87                    return Ok(Progress::Processed(idx));
88                }
89            }
90        }
91    }
92
93    Ok(Progress::Completed)
94}