1use enum_iterator::Sequence;
2use std::any::TypeId;
3use std::borrow::Cow;
4use std::marker::PhantomData;
5use std::sync::atomic::{AtomicU32, Ordering};
6use std::sync::{Arc, RwLock};
7use std::time::{Duration, Instant};
8
9use indexmap::IndexMap;
10use itertools::Itertools;
11use serde::Serialize;
12use utoipa::ToSchema;
13
14pub trait Step: 'static + Send + Sync {
15 fn name(&self) -> Cow<'static, str>;
16 fn current(&self) -> u32;
17 fn total(&self) -> u32;
18}
19
20#[derive(Clone, Default)]
21pub struct Progress {
22 steps: Arc<RwLock<InnerProgress>>,
23}
24
25#[derive(Default)]
26struct InnerProgress {
27 steps: Vec<(TypeId, Box<dyn Step>, Instant)>,
29 durations: Vec<(String, Duration)>,
31}
32
33impl Progress {
34 pub fn update_progress<P: Step>(&self, sub_progress: P) {
35 let mut inner = self.steps.write().unwrap();
36 let InnerProgress { steps, durations } = &mut *inner;
37
38 let now = Instant::now();
39 let step_type = TypeId::of::<P>();
40 if let Some(idx) = steps.iter().position(|(id, _, _)| *id == step_type) {
41 push_steps_durations(steps, durations, now, idx);
42 steps.truncate(idx);
43 }
44
45 steps.push((step_type, Box::new(sub_progress), now));
46 }
47
48 pub fn as_progress_view(&self) -> ProgressView {
50 let inner = self.steps.read().unwrap();
51 let InnerProgress { steps, .. } = &*inner;
52
53 let mut percentage = 0.0;
54 let mut prev_factors = 1.0;
55
56 let mut step_view = Vec::with_capacity(steps.len());
57 for (_, step, _) in steps.iter() {
58 prev_factors *= step.total() as f32;
59 percentage += step.current() as f32 / prev_factors;
60
61 step_view.push(ProgressStepView {
62 current_step: step.name(),
63 finished: step.current(),
64 total: step.total(),
65 });
66 }
67
68 ProgressView { steps: step_view, percentage: percentage * 100.0 }
69 }
70
71 pub fn accumulated_durations(&self) -> IndexMap<String, String> {
72 let mut inner = self.steps.write().unwrap();
73 let InnerProgress { steps, durations, .. } = &mut *inner;
74
75 let now = Instant::now();
76 push_steps_durations(steps, durations, now, 0);
77
78 durations.drain(..).map(|(name, duration)| (name, format!("{duration:.2?}"))).collect()
79 }
80
81 pub(crate) fn update_progress_from_arroy(&self, progress: arroy::WriterProgress) {
83 self.update_progress(progress.main);
84 if let Some(sub) = progress.sub {
85 self.update_progress(sub);
86 }
87 }
88}
89
90fn push_steps_durations(
92 steps: &[(TypeId, Box<dyn Step>, Instant)],
93 durations: &mut Vec<(String, Duration)>,
94 now: Instant,
95 idx: usize,
96) {
97 for (i, (_, _, started_at)) in steps.iter().skip(idx).enumerate().rev() {
98 let full_name = steps.iter().take(idx + i + 1).map(|(_, s, _)| s.name()).join(" > ");
99 durations.push((full_name, now.duration_since(*started_at)));
100 }
101}
102
103pub trait NamedStep: 'static + Send + Sync + Default {
107 fn name(&self) -> &'static str;
108}
109
110pub struct AtomicSubStep<Name: NamedStep> {
115 unit_name: Name,
116 current: Arc<AtomicU32>,
117 total: u32,
118}
119
120impl<Name: NamedStep> AtomicSubStep<Name> {
121 pub fn new(total: u32) -> (Arc<AtomicU32>, Self) {
122 let current = Arc::new(AtomicU32::new(0));
123 (current.clone(), Self { current, total, unit_name: Name::default() })
124 }
125}
126
127impl<Name: NamedStep> Step for AtomicSubStep<Name> {
128 fn name(&self) -> Cow<'static, str> {
129 self.unit_name.name().into()
130 }
131
132 fn current(&self) -> u32 {
133 self.current.load(Ordering::Relaxed)
134 }
135
136 fn total(&self) -> u32 {
137 self.total
138 }
139}
140
141#[doc(hidden)]
142pub use convert_case as _private_convert_case;
143#[doc(hidden)]
144pub use enum_iterator as _private_enum_iterator;
145
146#[macro_export]
147macro_rules! make_enum_progress {
148 ($visibility:vis enum $name:ident { $($variant:ident,)+ }) => {
149 #[repr(u8)]
150 #[derive(Debug, Clone, Copy, PartialEq, Eq, $crate::progress::_private_enum_iterator::Sequence)]
151 #[allow(clippy::enum_variant_names)]
152 $visibility enum $name {
153 $($variant),+
154 }
155
156 impl $crate::progress::Step for $name {
157 fn name(&self) -> std::borrow::Cow<'static, str> {
158 use $crate::progress::_private_convert_case::Casing;
159
160 match self {
161 $(
162 $name::$variant => stringify!($variant).from_case(convert_case::Case::Camel).to_case(convert_case::Case::Lower).into()
163 ),+
164 }
165 }
166
167 fn current(&self) -> u32 {
168 *self as u32
169 }
170
171 fn total(&self) -> u32 {
172 use $crate::progress::_private_enum_iterator::Sequence;
173 Self::CARDINALITY as u32
174 }
175 }
176 };
177}
178
179#[macro_export]
180macro_rules! make_atomic_progress {
181 ($struct_name:ident alias $atomic_struct_name:ident => $step_name:literal) => {
182 #[derive(Default, Debug, Clone, Copy)]
183 pub struct $struct_name {}
184 impl NamedStep for $struct_name {
185 fn name(&self) -> &'static str {
186 $step_name
187 }
188 }
189 pub type $atomic_struct_name = AtomicSubStep<$struct_name>;
190 };
191}
192
193make_atomic_progress!(Document alias AtomicDocumentStep => "document");
194make_atomic_progress!(Payload alias AtomicPayloadStep => "payload");
195
196make_enum_progress! {
197 pub enum MergingWordCache {
198 WordDocids,
199 WordFieldIdDocids,
200 ExactWordDocids,
201 WordPositionDocids,
202 FieldIdWordCountDocids,
203 }
204}
205
206#[derive(Debug, Serialize, Clone, ToSchema)]
207#[serde(rename_all = "camelCase")]
208#[schema(rename_all = "camelCase")]
209pub struct ProgressView {
210 pub steps: Vec<ProgressStepView>,
211 pub percentage: f32,
212}
213
214#[derive(Debug, Serialize, Clone, ToSchema)]
215#[serde(rename_all = "camelCase")]
216#[schema(rename_all = "camelCase")]
217pub struct ProgressStepView {
218 pub current_step: Cow<'static, str>,
219 pub finished: u32,
220 pub total: u32,
221}
222
223pub struct VariableNameStep<U: Send + Sync + 'static> {
235 name: String,
236 current: u32,
237 total: u32,
238 phantom: PhantomData<U>,
239}
240
241impl<U: Send + Sync + 'static> VariableNameStep<U> {
242 pub fn new(name: impl Into<String>, current: u32, total: u32) -> Self {
243 Self { name: name.into(), current, total, phantom: PhantomData }
244 }
245}
246
247impl<U: Send + Sync + 'static> Step for VariableNameStep<U> {
248 fn name(&self) -> Cow<'static, str> {
249 self.name.clone().into()
250 }
251
252 fn current(&self) -> u32 {
253 self.current
254 }
255
256 fn total(&self) -> u32 {
257 self.total
258 }
259}
260
261impl Step for arroy::MainStep {
262 fn name(&self) -> Cow<'static, str> {
263 match self {
264 arroy::MainStep::PreProcessingTheItems => "pre processing the items",
265 arroy::MainStep::WritingTheDescendantsAndMetadata => {
266 "writing the descendants and metadata"
267 }
268 arroy::MainStep::RetrieveTheUpdatedItems => "retrieve the updated items",
269 arroy::MainStep::RetrievingTheTreeAndItemNodes => "retrieving the tree and item nodes",
270 arroy::MainStep::UpdatingTheTrees => "updating the trees",
271 arroy::MainStep::CreateNewTrees => "create new trees",
272 arroy::MainStep::WritingNodesToDatabase => "writing nodes to database",
273 arroy::MainStep::DeleteExtraneousTrees => "delete extraneous trees",
274 arroy::MainStep::WriteTheMetadata => "write the metadata",
275 }
276 .into()
277 }
278
279 fn current(&self) -> u32 {
280 *self as u32
281 }
282
283 fn total(&self) -> u32 {
284 Self::CARDINALITY as u32
285 }
286}
287
288impl Step for arroy::SubStep {
289 fn name(&self) -> Cow<'static, str> {
290 self.unit.into()
291 }
292
293 fn current(&self) -> u32 {
294 self.current.load(Ordering::Relaxed)
295 }
296
297 fn total(&self) -> u32 {
298 self.max
299 }
300}