diskann_disk/build/chunking/continuation/
utils.rs1use std::{error::Error, thread::sleep};
7
8use tracing::info;
9
10use super::continuation_tracker::{ContinuationGrant, ContinuationTrackerTrait};
11use crate::build::chunking::checkpoint::Progress;
12
13pub fn process_while_resource_is_available<Action, ParamIter, Param, E>(
20 mut action: Action,
21 params: ParamIter,
22 continuation_checker: Box<dyn ContinuationTrackerTrait>,
23) -> Result<Progress, E>
24where
25 ParamIter: Iterator<Item = Param>,
26 Action: FnMut(Param) -> Result<(), E>,
27 E: Error,
28{
29 for (idx, param) in params.enumerate() {
30 loop {
31 match continuation_checker.get_continuation_grant() {
32 ContinuationGrant::Continue => {
33 info!("Continue processing.");
34 action(param)?;
35 break;
36 }
37 ContinuationGrant::Yield(duration) => {
38 info!(
39 "Continuation checker asks to yield for {} ms.",
40 duration.as_millis()
41 );
42 sleep(duration);
43 }
44 ContinuationGrant::Stop => {
45 info!("Continuation checker asks to stop. Breaking the loop.");
46 return Ok(Progress::Processed(idx));
47 }
48 }
49 }
50 }
51
52 Ok(Progress::Completed)
53}
54
55pub async fn process_while_resource_is_available_async<Action, ParamIter, Param, Fut, E>(
60 mut action: Action,
61 params: ParamIter,
62 continuation_checker: Box<dyn ContinuationTrackerTrait>,
63) -> Result<Progress, E>
64where
65 ParamIter: Iterator<Item = Param>,
66 Action: FnMut(Param) -> Fut,
67 Fut: core::future::Future<Output = Result<(), E>>,
68 E: Error,
69{
70 for (idx, param) in params.enumerate() {
71 loop {
72 match continuation_checker.get_continuation_grant() {
73 ContinuationGrant::Continue => {
74 info!("Continue processing.");
75 action(param).await?;
76 break;
77 }
78 ContinuationGrant::Yield(duration) => {
79 info!(
80 "Continuation checker asks to yield for {} ms.",
81 duration.as_millis()
82 );
83 sleep(duration);
84 }
85 ContinuationGrant::Stop => {
86 info!("Continuation checker asks to stop. Breaking the loop.");
87 return Ok(Progress::Processed(idx));
88 }
89 }
90 }
91 }
92
93 Ok(Progress::Completed)
94}