milli_core/
progress.rs

1use enum_iterator::Sequence;
2use std::any::TypeId;
3use std::borrow::Cow;
4use std::marker::PhantomData;
5use std::sync::atomic::{AtomicU32, Ordering};
6use std::sync::{Arc, RwLock};
7use std::time::{Duration, Instant};
8
9use indexmap::IndexMap;
10use itertools::Itertools;
11use serde::Serialize;
12use utoipa::ToSchema;
13
/// A unit of progress that can be nested inside a [`Progress`] hierarchy.
///
/// `current` and `total` tell how far along the step is; `name` labels the step both in the
/// progress view and in the accumulated durations.
pub trait Step: 'static + Send + Sync {
    /// Number of units already processed by this step.
    fn current(&self) -> u32;

    /// Total number of units this step has to process.
    fn total(&self) -> u32;

    /// Human-readable name of the step.
    fn name(&self) -> Cow<'static, str>;
}
19
20#[derive(Clone, Default)]
21pub struct Progress {
22    steps: Arc<RwLock<InnerProgress>>,
23}
24
25#[derive(Default)]
26struct InnerProgress {
27    /// The hierarchy of steps.
28    steps: Vec<(TypeId, Box<dyn Step>, Instant)>,
29    /// The durations associated to each steps.
30    durations: Vec<(String, Duration)>,
31}
32
33impl Progress {
34    pub fn update_progress<P: Step>(&self, sub_progress: P) {
35        let mut inner = self.steps.write().unwrap();
36        let InnerProgress { steps, durations } = &mut *inner;
37
38        let now = Instant::now();
39        let step_type = TypeId::of::<P>();
40        if let Some(idx) = steps.iter().position(|(id, _, _)| *id == step_type) {
41            push_steps_durations(steps, durations, now, idx);
42            steps.truncate(idx);
43        }
44
45        steps.push((step_type, Box::new(sub_progress), now));
46    }
47
48    // TODO: This code should be in meilisearch_types but cannot because milli can't depend on meilisearch_types
49    pub fn as_progress_view(&self) -> ProgressView {
50        let inner = self.steps.read().unwrap();
51        let InnerProgress { steps, .. } = &*inner;
52
53        let mut percentage = 0.0;
54        let mut prev_factors = 1.0;
55
56        let mut step_view = Vec::with_capacity(steps.len());
57        for (_, step, _) in steps.iter() {
58            prev_factors *= step.total() as f32;
59            percentage += step.current() as f32 / prev_factors;
60
61            step_view.push(ProgressStepView {
62                current_step: step.name(),
63                finished: step.current(),
64                total: step.total(),
65            });
66        }
67
68        ProgressView { steps: step_view, percentage: percentage * 100.0 }
69    }
70
71    pub fn accumulated_durations(&self) -> IndexMap<String, String> {
72        let mut inner = self.steps.write().unwrap();
73        let InnerProgress { steps, durations, .. } = &mut *inner;
74
75        let now = Instant::now();
76        push_steps_durations(steps, durations, now, 0);
77
78        durations.drain(..).map(|(name, duration)| (name, format!("{duration:.2?}"))).collect()
79    }
80
81    // TODO: ideally we should expose the progress in a way that let arroy use it directly
82    pub(crate) fn update_progress_from_arroy(&self, progress: arroy::WriterProgress) {
83        self.update_progress(progress.main);
84        if let Some(sub) = progress.sub {
85            self.update_progress(sub);
86        }
87    }
88}
89
90/// Generate the names associated with the durations and push them.
91fn push_steps_durations(
92    steps: &[(TypeId, Box<dyn Step>, Instant)],
93    durations: &mut Vec<(String, Duration)>,
94    now: Instant,
95    idx: usize,
96) {
97    for (i, (_, _, started_at)) in steps.iter().skip(idx).enumerate().rev() {
98        let full_name = steps.iter().take(idx + i + 1).map(|(_, s, _)| s.name()).join(" > ");
99        durations.push((full_name, now.duration_since(*started_at)));
100    }
101}
102
/// Gives a name to an [`AtomicSubStep`].
///
/// The name must be a constant that never changes. The type system can't enforce that,
/// because doing so would make the trait non-object-safe. Requiring `Default` and returning a
/// `&'static str` makes the trait harder to misuse.
pub trait NamedStep: 'static + Send + Sync + Default {
    /// The constant name of the step.
    fn name(&self) -> &'static str;
}
109
110/// Structure to quickly define steps that need very quick, lockless updating of their current step.
111/// You can use this struct if:
112/// - The name of the step doesn't change
113/// - The total number of steps doesn't change
114pub struct AtomicSubStep<Name: NamedStep> {
115    unit_name: Name,
116    current: Arc<AtomicU32>,
117    total: u32,
118}
119
120impl<Name: NamedStep> AtomicSubStep<Name> {
121    pub fn new(total: u32) -> (Arc<AtomicU32>, Self) {
122        let current = Arc::new(AtomicU32::new(0));
123        (current.clone(), Self { current, total, unit_name: Name::default() })
124    }
125}
126
127impl<Name: NamedStep> Step for AtomicSubStep<Name> {
128    fn name(&self) -> Cow<'static, str> {
129        self.unit_name.name().into()
130    }
131
132    fn current(&self) -> u32 {
133        self.current.load(Ordering::Relaxed)
134    }
135
136    fn total(&self) -> u32 {
137        self.total
138    }
139}
140
141#[doc(hidden)]
142pub use convert_case as _private_convert_case;
143#[doc(hidden)]
144pub use enum_iterator as _private_enum_iterator;
145
/// Declares a fieldless enum whose variants are the successive states of a
/// [`Step`](crate::progress::Step).
///
/// - `current` is the variant's discriminant (0 for the first variant, in declaration order).
/// - `total` is the number of variants.
/// - `name` is the variant name converted from camel case to lowercase words (e.g.
///   `WordDocids` becomes `"word docids"`).
///
/// A trailing comma after the last variant is optional. Every path in the expansion goes
/// through `$crate`, so the macro works in crates that do not depend on `convert_case` or
/// `enum_iterator` themselves.
#[macro_export]
macro_rules! make_enum_progress {
    ($visibility:vis enum $name:ident { $($variant:ident),+ $(,)? }) => {
        #[repr(u8)]
        #[derive(Debug, Clone, Copy, PartialEq, Eq, $crate::progress::_private_enum_iterator::Sequence)]
        #[allow(clippy::enum_variant_names)]
        $visibility enum $name {
            $($variant),+
        }

        impl $crate::progress::Step for $name {
            fn name(&self) -> ::std::borrow::Cow<'static, str> {
                use $crate::progress::_private_convert_case::Casing;

                match self {
                    $(
                        $name::$variant => stringify!($variant)
                            .from_case($crate::progress::_private_convert_case::Case::Camel)
                            .to_case($crate::progress::_private_convert_case::Case::Lower)
                            .into()
                    ),+
                }
            }

            fn current(&self) -> u32 {
                *self as u32
            }

            fn total(&self) -> u32 {
                use $crate::progress::_private_enum_iterator::Sequence;
                Self::CARDINALITY as u32
            }
        }
    };
}
178
/// Declares a marker struct implementing [`NamedStep`](crate::progress::NamedStep) and a type
/// alias for the matching [`AtomicSubStep`](crate::progress::AtomicSubStep).
///
/// `make_atomic_progress!(Document alias AtomicDocumentStep => "document");` defines a
/// `Document` struct whose step name is `"document"`, and
/// `type AtomicDocumentStep = AtomicSubStep<Document>`.
///
/// The trait and the struct are referenced through `$crate`, so the exported macro also works
/// where they are not imported.
#[macro_export]
macro_rules! make_atomic_progress {
    ($struct_name:ident alias $atomic_struct_name:ident => $step_name:literal) => {
        #[derive(Default, Debug, Clone, Copy)]
        pub struct $struct_name {}
        impl $crate::progress::NamedStep for $struct_name {
            fn name(&self) -> &'static str {
                $step_name
            }
        }
        pub type $atomic_struct_name = $crate::progress::AtomicSubStep<$struct_name>;
    };
}
192
193make_atomic_progress!(Document alias AtomicDocumentStep => "document");
194make_atomic_progress!(Payload alias AtomicPayloadStep => "payload");
195
196make_enum_progress! {
197    pub enum MergingWordCache {
198        WordDocids,
199        WordFieldIdDocids,
200        ExactWordDocids,
201        WordPositionDocids,
202        FieldIdWordCountDocids,
203    }
204}
205
206#[derive(Debug, Serialize, Clone, ToSchema)]
207#[serde(rename_all = "camelCase")]
208#[schema(rename_all = "camelCase")]
209pub struct ProgressView {
210    pub steps: Vec<ProgressStepView>,
211    pub percentage: f32,
212}
213
214#[derive(Debug, Serialize, Clone, ToSchema)]
215#[serde(rename_all = "camelCase")]
216#[schema(rename_all = "camelCase")]
217pub struct ProgressStepView {
218    pub current_step: Cow<'static, str>,
219    pub finished: u32,
220    pub total: u32,
221}
222
/// A step whose displayed name is chosen at runtime while its identity stays fixed.
///
/// Progress steps are told apart by their type, so the marker type `U` is what makes two
/// `VariableNameStep`s count as the same step. To avoid `TypeId` collisions, declare a
/// dedicated marker type for each use:
/// ```text
/// enum UpgradeVersion {}
///
/// progress.update_progress(VariableNameStep::<UpgradeVersion>::new(
///     "v1 to v2",
///     0,
///     10,
/// ));
/// ```
pub struct VariableNameStep<U: Send + Sync + 'static> {
    name: String,
    current: u32,
    total: u32,
    phantom: PhantomData<U>,
}

impl<U: Send + Sync + 'static> VariableNameStep<U> {
    /// Builds the step from its display `name` and its `current`/`total` counters.
    pub fn new(name: impl Into<String>, current: u32, total: u32) -> Self {
        let name: String = name.into();
        Self { phantom: PhantomData, total, current, name }
    }
}
246
247impl<U: Send + Sync + 'static> Step for VariableNameStep<U> {
248    fn name(&self) -> Cow<'static, str> {
249        self.name.clone().into()
250    }
251
252    fn current(&self) -> u32 {
253        self.current
254    }
255
256    fn total(&self) -> u32 {
257        self.total
258    }
259}
260
261impl Step for arroy::MainStep {
262    fn name(&self) -> Cow<'static, str> {
263        match self {
264            arroy::MainStep::PreProcessingTheItems => "pre processing the items",
265            arroy::MainStep::WritingTheDescendantsAndMetadata => {
266                "writing the descendants and metadata"
267            }
268            arroy::MainStep::RetrieveTheUpdatedItems => "retrieve the updated items",
269            arroy::MainStep::RetrievingTheTreeAndItemNodes => "retrieving the tree and item nodes",
270            arroy::MainStep::UpdatingTheTrees => "updating the trees",
271            arroy::MainStep::CreateNewTrees => "create new trees",
272            arroy::MainStep::WritingNodesToDatabase => "writing nodes to database",
273            arroy::MainStep::DeleteExtraneousTrees => "delete extraneous trees",
274            arroy::MainStep::WriteTheMetadata => "write the metadata",
275        }
276        .into()
277    }
278
279    fn current(&self) -> u32 {
280        *self as u32
281    }
282
283    fn total(&self) -> u32 {
284        Self::CARDINALITY as u32
285    }
286}
287
288impl Step for arroy::SubStep {
289    fn name(&self) -> Cow<'static, str> {
290        self.unit.into()
291    }
292
293    fn current(&self) -> u32 {
294        self.current.load(Ordering::Relaxed)
295    }
296
297    fn total(&self) -> u32 {
298        self.max
299    }
300}