iter_log/
lib.rs

1#[cfg(feature = "rayon")]
2use rayon::iter::plumbing::{Consumer, Folder, UnindexedConsumer};
3#[cfg(feature = "rayon")]
4use rayon::prelude::*;
5use std::env::var;
6use std::io::{self, Write};
7use std::sync::OnceLock;
8#[cfg(feature = "rayon")]
9use std::sync::atomic::{AtomicUsize, Ordering};
10#[cfg(feature = "rayon")]
11use std::sync::{Arc, Mutex};
12#[cfg(feature = "time")]
13use std::time::Instant;
14
/// Reports whether progress logging is switched on for this process.
///
/// The answer is computed once, on first call, and cached: logging is on
/// exactly when reading the `ENABLE_ITER_LOG` environment variable succeeds.
/// Later changes to the environment are not observed.
fn enable_log() -> &'static bool {
    static LOGGING_ON: OnceLock<bool> = OnceLock::new();
    LOGGING_ON.get_or_init(|| matches!(var("ENABLE_ITER_LOG"), Ok(_)))
}
19
/// Horizontal rule (50 dashes) used for the frame drawn around the progress bar.
const BARS: &str = "--------------------------------------------------";

/// Size of one progress milestone, in percent of the total.
///
/// Half of this value is the number of `=` characters drawn per milestone.
/// It is a `const` (not a `static`) because it is a plain compile-time number
/// that is only ever used by value.
const STEP_PERCENT: usize = 2;

// Compile-time guard: the step must be at least 2 (so that `STEP_PERCENT / 2`
// is non-zero and the step is a valid divisor) and a full run of milestones
// must fill the bar exactly as wide as the frame.
const _: () = assert!(
    STEP_PERCENT >= 2
        && 100 % STEP_PERCENT == 0
        && (100 / STEP_PERCENT) * (STEP_PERCENT / 2) == BARS.len(),
    "STEP_PERCENT must be an even divisor of 100 that fills the bar exactly"
);
22
23/// Extension trait to add a `log_progress` method to regular iterators.
24pub trait LogProgressExt: Iterator + Sized {
25    /// Wraps the iterator with progress logging.
26    ///
27    /// This method will print progress updates every 2% of the way through the iteration.
28    ///
29    /// # Returns
30    ///
31    /// Returns a new `LogProgress` iterator which tracks and logs progress.
32    fn log_progress(self, name: &str) -> LogProgress<Self>;
33}
34
35impl<I> LogProgressExt for I
36where
37    I: Iterator, // Ensure `I` is an iterator
38{
39    fn log_progress(self, name: &str) -> LogProgress<Self> {
40        let total = self.size_hint().1.unwrap_or(0); // Get the total number of items (if possible)
41
42        LogProgress {
43            iter: self,
44            progress: 0,
45            total,
46            name: name.to_string(),
47            #[cfg(feature = "time")]
48            t0: Instant::now(),
49        }
50    }
51}
52
/// Iterator adaptor that forwards the items of `iter` while keeping the
/// bookkeeping needed to draw a textual progress bar.
pub struct LogProgress<I> {
    /// The wrapped iterator that produces the items.
    iter: I,
    /// Number of items counted so far.
    progress: usize,
    /// Number of items expected in total; the denominator of the percentage.
    total: usize,
    /// Title printed in the header of the progress frame.
    name: String,
    /// Moment the adaptor was created; used to report the elapsed time.
    #[cfg(feature = "time")]
    t0: Instant,
}
62
63impl<I: Iterator> Iterator for LogProgress<I> {
64    type Item = I::Item;
65
66    /// Returns the next item in the iteration, while logging progress at intervals.
67    ///
68    /// The method increments the progress count and checks if the progress reaches a new milestone
69    /// (i.e., a multiple of 2). If so, it logs the progress to the console.
70    ///
71    /// # Returns
72    ///
73    /// Returns the next item in the iterator, or `None` if the iterator is finished.
74    fn next(&mut self) -> Option<Self::Item> {
75        let item = self.iter.next()?;
76        let step = STEP_PERCENT / 2;
77
78        if *enable_log() {
79            if self.progress == 0 {
80                print!(
81                    "+{}+\n| {}{}|\n+{}+\n|",
82                    BARS,
83                    self.name,
84                    " ".repeat(49 - self.name.len()),
85                    BARS,
86                );
87            }
88            self.progress += 1;
89            let old_percent = (self.progress * 100) / self.total;
90            let new_percent = ((self.progress + 1) * 100) / self.total;
91
92            // Log the progress if we hit a new milestone
93            if new_percent / STEP_PERCENT > old_percent / STEP_PERCENT {
94                print!("{}", "=".repeat(step));
95                io::stdout().flush().unwrap();
96            }
97        }
98
99        Some(item)
100    }
101}
102
103impl<I> LogProgress<I> {
104    fn finish(&self) {
105        if *enable_log() {
106            println!("|\n+{}+", "-".repeat(50));
107            #[cfg(feature = "time")]
108            let elapsed = self.t0.elapsed();
109            #[cfg(feature = "time")]
110            let time_str = format!("{:?}", elapsed);
111            #[cfg(feature = "time")]
112            print!(
113                "| Took {} to complete{}|\n+{}+\n",
114                time_str,
115                " ".repeat(32 - time_str.chars().count()),
116                BARS
117            );
118        }
119    }
120}
121
122impl<I> Drop for LogProgress<I> {
123    fn drop(&mut self) {
124        self.finish();
125    }
126}
127
/// Serialises progress-bar output coming from the workers of a parallel
/// iteration.
///
/// Workers report the percentage milestones they cross in arbitrary order.
/// The logger remembers how far the bar has been drawn and only ever extends
/// it, so the output is monotonic regardless of the order of the reports.
#[cfg(feature = "rayon")]
struct OrderedLogger {
    /// Percentage (a multiple of `STEP_PERCENT`) up to which the bar has been
    /// drawn. The mutex also serialises the printing itself.
    drawn_percent: Mutex<usize>,
    /// Moment the logger was created; used to report the elapsed time.
    #[cfg(feature = "time")]
    t0: Instant,
}

#[cfg(feature = "rayon")]
impl OrderedLogger {
    /// Creates a logger with an empty bar that measures elapsed time from `t0`.
    ///
    /// # Returns
    ///
    /// The logger wrapped in an `Arc` so it can be shared between workers.
    #[cfg(feature = "time")]
    fn new(t0: Instant) -> Arc<Self> {
        Arc::new(Self {
            drawn_percent: Mutex::new(0),
            t0,
        })
    }

    /// Creates a logger with an empty bar.
    ///
    /// # Returns
    ///
    /// The logger wrapped in an `Arc` so it can be shared between workers.
    #[cfg(not(feature = "time"))]
    fn new() -> Arc<Self> {
        Arc::new(Self {
            drawn_percent: Mutex::new(0),
        })
    }

    /// Extends the bar up to `progress` percent if it is not that long yet.
    ///
    /// # Arguments
    ///
    /// * `progress` - A percentage that has been reached. It is clamped to 100
    ///   and rounded down to a multiple of `STEP_PERCENT`.
    ///
    /// # Notes
    ///
    /// A report may skip milestones (for example when one item is worth more
    /// than one step) or arrive after a higher one. The bar is drawn up to the
    /// highest percentage reported so far, so skipped milestones are filled in
    /// and late or duplicate reports are ignored.
    fn log_progress(&self, progress: usize) {
        if !*enable_log() {
            return;
        }

        let target = progress.min(100) / STEP_PERCENT * STEP_PERCENT;

        // A poisoned lock only means another worker panicked while printing;
        // the stored percentage is still usable, so keep logging instead of
        // spreading the panic to every other worker.
        let mut drawn = self
            .drawn_percent
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);

        if target <= *drawn {
            return;
        }

        let segments = (target - *drawn) / STEP_PERCENT;
        print!("{}", "=".repeat(segments * (STEP_PERCENT / 2)));
        // Best effort: a failed flush only delays the output.
        let _ = io::stdout().flush();
        *drawn = target;
    }

    /// Closes the progress frame and, with the `time` feature, reports the
    /// time elapsed since `t0`. Prints nothing when logging is disabled.
    fn finish(&self) {
        if !*enable_log() {
            return;
        }

        println!("|\n+{}+", BARS);

        #[cfg(feature = "time")]
        {
            let time_str = format!("{:?}", self.t0.elapsed());
            let padding = 32usize.saturating_sub(time_str.chars().count());
            print!(
                "| Took {} to complete{}|\n+{}+\n",
                time_str,
                " ".repeat(padding),
                BARS
            );
        }
    }
}

/// Closes the frame once the last shared handle to the logger is released.
#[cfg(feature = "rayon")]
impl Drop for OrderedLogger {
    fn drop(&mut self) {
        self.finish();
    }
}
236
/// Parallel-iterator adaptor that forwards the items of `iter` while counting
/// them for the progress bar.
#[cfg(feature = "rayon")]
pub struct LogProgressPar<I> {
    /// The wrapped parallel iterator.
    iter: I,
    /// Item counter shared by every consumer and folder split off this adaptor.
    progress: Arc<AtomicUsize>,
    /// Number of items expected in total; the denominator of the percentage.
    total: usize,
    /// Shared sink that turns reported percentages into bar output.
    logger: Arc<OrderedLogger>,
}
245
#[cfg(feature = "rayon")]
impl<I: ParallelIterator> ParallelIterator for LogProgressPar<I> {
    type Item = I::Item;

    /// Drives the wrapped iterator through a progress-counting consumer.
    ///
    /// `consumer` is wrapped in a `LogProgressConsumer` that carries the
    /// shared counter, the total and the logger, and the wrapped iterator is
    /// then driven with that wrapper.
    ///
    /// # Arguments
    ///
    /// * `consumer` - The downstream consumer that receives the items.
    ///
    /// # Returns
    ///
    /// Whatever the downstream consumer produces for the whole iteration.
    fn drive_unindexed<C>(self, consumer: C) -> C::Result
    where
        C: UnindexedConsumer<Self::Item>,
    {
        let LogProgressPar {
            iter,
            progress,
            total,
            logger,
        } = self;

        iter.drive_unindexed(LogProgressConsumer {
            base: consumer,
            progress,
            total,
            logger,
        })
    }
}
278
/// Consumer wrapper that carries the progress bookkeeping alongside the
/// downstream consumer it delegates to.
#[cfg(feature = "rayon")]
struct LogProgressConsumer<C> {
    /// The downstream consumer that does the real work.
    base: C,
    /// Item counter shared with every other piece of the same iteration.
    progress: Arc<AtomicUsize>,
    /// Number of items expected in total.
    total: usize,
    /// Shared sink for progress reports.
    logger: Arc<OrderedLogger>,
}
287
#[cfg(feature = "rayon")]
impl<C, T> Consumer<T> for LogProgressConsumer<C>
where
    C: Consumer<T>,
{
    type Folder = LogProgressFolder<C::Folder>;
    type Reducer = C::Reducer;
    type Result = C::Result;

    /// Splits the downstream consumer at `index` and wraps both halves.
    ///
    /// Both halves share the same counter and logger, so progress keeps being
    /// counted across the split.
    ///
    /// # Arguments
    ///
    /// * `index` - Position handed on to the downstream consumer's `split_at`.
    ///
    /// # Returns
    ///
    /// The left wrapper, the right wrapper and the downstream reducer.
    fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) {
        let Self {
            base,
            progress,
            total,
            logger,
        } = self;
        let (left_base, right_base, reducer) = base.split_at(index);

        let left = Self {
            base: left_base,
            progress: Arc::clone(&progress),
            total,
            logger: Arc::clone(&logger),
        };
        // The right half takes over the handles this consumer already owned.
        let right = Self {
            base: right_base,
            progress,
            total,
            logger,
        };

        (left, right, reducer)
    }

    /// Turns the consumer into a folder that counts every item it consumes.
    fn into_folder(self) -> Self::Folder {
        let Self {
            base,
            progress,
            total,
            logger,
        } = self;

        LogProgressFolder {
            base: base.into_folder(),
            progress,
            total,
            logger,
        }
    }

    /// Reports whether the downstream consumer wants no more items.
    fn full(&self) -> bool {
        self.base.full()
    }
}
340
#[cfg(feature = "rayon")]
impl<C, T> UnindexedConsumer<T> for LogProgressConsumer<C>
where
    C: UnindexedConsumer<T>,
{
    /// Splits off a new wrapper around the downstream consumer's left part,
    /// sharing this consumer's counter, total and logger.
    fn split_off_left(&self) -> Self {
        let left_base = self.base.split_off_left();

        Self {
            base: left_base,
            progress: Arc::clone(&self.progress),
            total: self.total,
            logger: Arc::clone(&self.logger),
        }
    }

    /// Hands out the downstream consumer's reducer unchanged.
    fn to_reducer(&self) -> Self::Reducer {
        self.base.to_reducer()
    }
}
359
/// Folder wrapper that counts each consumed item before handing it to the
/// downstream folder.
#[cfg(feature = "rayon")]
struct LogProgressFolder<F> {
    /// The downstream folder that does the real work.
    base: F,
    /// Item counter shared with every other piece of the same iteration.
    progress: Arc<AtomicUsize>,
    /// Number of items expected in total.
    total: usize,
    /// Shared sink for progress reports.
    logger: Arc<OrderedLogger>,
}
368
#[cfg(feature = "rayon")]
impl<F, T> Folder<T> for LogProgressFolder<F>
where
    F: Folder<T>,
{
    type Result = F::Result;

    /// Counts `item`, reports every milestone it crosses, and passes it on.
    ///
    /// The shared counter is advanced atomically, so each count value is seen
    /// by exactly one call. Every `STEP_PERCENT` milestone lying between the
    /// percentage before and after this item is reported to the logger, in
    /// ascending order. Reporting each one (rather than only the highest)
    /// keeps the logger supplied with a gap-free sequence even when a single
    /// item is worth several milestones, i.e. when `total` is small.
    ///
    /// # Arguments
    ///
    /// * `item` - The item to count and forward to the downstream folder.
    ///
    /// # Returns
    ///
    /// A folder wrapping the downstream folder's state after consuming `item`.
    fn consume(self, item: T) -> Self {
        let Self {
            base,
            progress,
            total,
            logger,
        } = self;

        let count_before = progress.fetch_add(1, Ordering::Relaxed);

        // A zero total offers no denominator; skip the percentage math rather
        // than divide by zero.
        if total > 0 {
            // Whole milestones reached after `count` items, clamped at 100 %.
            let milestones_at =
                |count: usize| (count.saturating_mul(100) / total).min(100) / STEP_PERCENT;

            let first_new = milestones_at(count_before) + 1;
            let last_new = milestones_at(count_before + 1);
            for milestone in first_new..=last_new {
                logger.log_progress(milestone * STEP_PERCENT);
            }
        }

        Self {
            base: base.consume(item),
            progress,
            total,
            logger,
        }
    }

    /// Completes the downstream folder and returns its result.
    fn complete(self) -> Self::Result {
        self.base.complete()
    }

    /// Reports whether the downstream folder wants no more items.
    fn full(&self) -> bool {
        self.base.full()
    }
}
413
/// Adds [`log_progress`](LogProgressParExt::log_progress) to parallel iterators.
#[cfg(feature = "rayon")]
pub trait LogProgressParExt: Sized + ParallelIterator {
    /// Wraps `self` in a [`LogProgressPar`] adaptor labelled with `name`.
    ///
    /// The adaptor yields exactly the items of the wrapped iterator. Progress
    /// output is only produced when the `ENABLE_ITER_LOG` environment variable
    /// is set; milestones are 2 % of the total apart.
    ///
    /// # Arguments
    ///
    /// * `name` - Title shown in the header of the progress frame.
    ///
    /// # Returns
    ///
    /// A [`LogProgressPar`] that owns the original iterator.
    fn log_progress(self, name: &str) -> LogProgressPar<Self>;
}
426
#[cfg(feature = "rayon")]
impl<I> LogProgressParExt for I
where
    I: ParallelIterator + IndexedParallelIterator,
{
    /// Builds the [`LogProgressPar`] adaptor around `self`.
    ///
    /// The total is the iterator's exact length. When logging is enabled the
    /// header of the progress frame is printed immediately, before any item
    /// is processed.
    fn log_progress(self, name: &str) -> LogProgressPar<Self> {
        let total = self.len();

        #[cfg(feature = "time")]
        let logger = OrderedLogger::new(Instant::now());
        #[cfg(not(feature = "time"))]
        let logger = OrderedLogger::new();

        if *enable_log() {
            // Pad by character count and saturate, so long or non-ASCII names
            // cannot underflow the subtraction.
            let padding = 49usize.saturating_sub(name.chars().count());
            print!(
                "+{}+\n| {}{}|\n+{}+\n|",
                BARS,
                name,
                " ".repeat(padding),
                BARS,
            );
            // The header ends without a newline; flush so it is visible
            // before the first milestone. Best effort only.
            let _ = io::stdout().flush();
        }

        LogProgressPar {
            iter: self,
            progress: Arc::new(AtomicUsize::new(0)),
            total,
            logger,
        }
    }
}
459
/// Stand-in for an expensive per-item computation.
///
/// Doubles `x` one million times using saturating multiplication, so the
/// result is `0` for an input of `0` and `u32::MAX` for every other input.
pub fn long_computation(x: u32) -> u32 {
    // The repeated doubling exists only to take up time per call.
    (0..1_000_000).fold(x, |doubled, _| doubled.saturating_mul(2))
}
469
#[cfg(test)]
mod tests {
    use super::long_computation;

    /// Doubling a non-zero value a million times must saturate at `u32::MAX`.
    #[test]
    fn test_long_computation() {
        let saturated = long_computation(2);
        assert_eq!(saturated, u32::MAX);
    }
}