iter_log/
lib.rs

1#[cfg(feature = "rayon")]
2use rayon::iter::plumbing::{Consumer, Folder, UnindexedConsumer};
3#[cfg(feature = "rayon")]
4use rayon::prelude::*;
5use std::env::var;
6use std::io::{self, Write};
7#[cfg(feature = "rayon")]
8use std::sync::Arc;
9use std::sync::Mutex;
10#[cfg(feature = "rayon")]
11use std::sync::atomic::{AtomicUsize, Ordering};
12#[cfg(feature = "time")]
13use std::time::Instant;
14
static ENABLE_LOG: Mutex<Option<bool>> = Mutex::new(None);

/// Reports whether progress logging is currently switched on.
///
/// The answer is cached in `ENABLE_LOG`. While nothing has been stored yet, it is
/// derived from the presence of the `ENABLE_ITER_LOG` environment variable and
/// remembered; afterwards the stored value is returned, which may have been
/// overwritten through `set_log_enabled()`.
///
/// # Panics
///
/// Panics if the `ENABLE_LOG` mutex is poisoned.
fn enable_log() -> bool {
    let mut cached = ENABLE_LOG.lock().unwrap();
    // Only consult the environment when no value has been stored so far.
    *cached.get_or_insert_with(|| var("ENABLE_ITER_LOG").is_ok())
}
32
33/// Sets the logging enabled state at runtime.
34///
35/// This allows you to enable or disable logging dynamically without relying on
36/// the `ENABLE_ITER_LOG` environment variable. Once set, this value will be used
37/// for all subsequent logging decisions.
38///
39/// # Arguments
40///
41/// * `enabled` - `true` to enable logging, `false` to disable it.
42///
43/// # Examples
44///
45/// ```no_run
46/// use iter_log::set_log_enabled;
47///
48/// // Enable logging at runtime
49/// set_log_enabled(true);
50///
51/// // Disable logging at runtime
52/// set_log_enabled(false);
53/// ```
54pub fn set_log_enabled(enabled: bool) {
55    let mut state = ENABLE_LOG.lock().unwrap();
56    *state = Some(enabled);
57}
58
/// Horizontal rule used for the top, middle and bottom borders of the progress box.
const BARS: &str = "--------------------------------------------------";
/// Width, in percent, of one progress milestone.
const STEP_PERCENT: usize = 2;
61
62/// Extension trait to add a `log_progress` method to regular iterators.
63pub trait LogProgressExt: Iterator + Sized {
64    /// Wraps the iterator with progress logging.
65    ///
66    /// This method will print progress updates every 2% of the way through the iteration.
67    ///
68    /// # Returns
69    ///
70    /// Returns a new `LogProgress` iterator which tracks and logs progress.
71    fn log_progress(self, name: &str) -> LogProgress<Self>;
72}
73
74impl<I> LogProgressExt for I
75where
76    I: Iterator, // Ensure `I` is an iterator
77{
78    fn log_progress(self, name: &str) -> LogProgress<Self> {
79        let total = self.size_hint().1.unwrap_or(0); // Get the total number of items (if possible)
80
81        LogProgress {
82            iter: self,
83            progress: 0,
84            total,
85            name: name.to_string(),
86            #[cfg(feature = "time")]
87            t0: Instant::now(),
88        }
89    }
90}
91
/// Iterator adaptor that counts yielded items so progress can be reported.
pub struct LogProgress<I> {
    /// The wrapped iterator.
    iter: I,
    /// Number of items counted so far.
    progress: usize,
    /// Expected number of items (0 when it is not known).
    total: usize,
    /// Title printed in the header of the progress output.
    name: String,
    /// Creation time, used to report the elapsed duration.
    #[cfg(feature = "time")]
    t0: Instant,
}
101
102impl<I: Iterator> Iterator for LogProgress<I> {
103    type Item = I::Item;
104
105    /// Returns the next item in the iteration, while logging progress at intervals.
106    ///
107    /// The method increments the progress count and checks if the progress reaches a new milestone
108    /// (i.e., a multiple of 2). If so, it logs the progress to the console.
109    ///
110    /// # Returns
111    ///
112    /// Returns the next item in the iterator, or `None` if the iterator is finished.
113    fn next(&mut self) -> Option<Self::Item> {
114        let item = self.iter.next()?;
115        let step = STEP_PERCENT / 2;
116
117        if enable_log() {
118            if self.progress == 0 {
119                print!(
120                    "+{}+\n| {}{}|\n+{}+\n|",
121                    BARS,
122                    self.name,
123                    " ".repeat(49 - self.name.len()),
124                    BARS,
125                );
126            }
127            self.progress += 1;
128            let old_percent = (self.progress * 100) / self.total;
129            let new_percent = ((self.progress + 1) * 100) / self.total;
130
131            // Log the progress if we hit a new milestone
132            if new_percent / STEP_PERCENT > old_percent / STEP_PERCENT {
133                print!("{}", "=".repeat(step));
134                io::stdout().flush().unwrap();
135            }
136        }
137
138        Some(item)
139    }
140}
141
142impl<I> LogProgress<I> {
143    fn finish(&self) {
144        if enable_log() {
145            println!("|\n+{}+", "-".repeat(50));
146            #[cfg(feature = "time")]
147            let elapsed = self.t0.elapsed();
148            #[cfg(feature = "time")]
149            let time_str = format!("{:?}", elapsed);
150            #[cfg(feature = "time")]
151            print!(
152                "| Took {} to complete{}|\n+{}+\n",
153                time_str,
154                " ".repeat(32 - time_str.chars().count()),
155                BARS
156            );
157        }
158    }
159}
160
161impl<I> Drop for LogProgress<I> {
162    fn drop(&mut self) {
163        self.finish();
164    }
165}
166
/// Shared progress-bar state for a parallel iteration.
///
/// Milestones may be reported from several worker threads and in any order.
/// The logger remembers the highest percentage drawn so far and, on every
/// report, draws whatever is missing up to the reported value. The bar
/// therefore only ever grows, and it also fills up when a single item crosses
/// several milestones at once (fewer than `100 / STEP_PERCENT` items).
#[cfg(feature = "rayon")]
struct OrderedLogger {
    /// Highest percentage already drawn on the bar.
    last_logged: AtomicUsize,
    /// Creation time of the logged iteration, used to report the elapsed duration.
    #[cfg(feature = "time")]
    t0: Instant,
}

#[cfg(feature = "rayon")]
impl OrderedLogger {
    /// Creates a logger with an empty bar.
    ///
    /// # Arguments
    ///
    /// * `t0` - Start time used when reporting the elapsed duration.
    ///
    /// # Returns
    ///
    /// The logger wrapped in an `Arc` so it can be shared between consumers.
    #[cfg(feature = "time")]
    fn new(t0: Instant) -> Arc<Self> {
        Arc::new(Self {
            last_logged: AtomicUsize::new(0),
            t0,
        })
    }

    /// Creates a logger with an empty bar.
    ///
    /// # Returns
    ///
    /// The logger wrapped in an `Arc` so it can be shared between consumers.
    #[cfg(not(feature = "time"))]
    fn new() -> Arc<Self> {
        Arc::new(Self {
            last_logged: AtomicUsize::new(0),
        })
    }

    /// Extends the bar up to `progress` percent.
    ///
    /// # Arguments
    ///
    /// * `progress` - The reached percentage, rounded down to a multiple of
    ///   `STEP_PERCENT`.
    ///
    /// # Notes
    ///
    /// `fetch_max` atomically records the new high-water mark and returns the
    /// previous one. Each caller draws exactly the segment between the two,
    /// and a late report of a lower value draws nothing. All bar characters
    /// are identical, so the interleaving of segments printed by different
    /// threads does not affect the result.
    ///
    /// # Panics
    ///
    /// Panics if writing to or flushing stdout fails.
    fn log_progress(&self, progress: usize) {
        if !enable_log() {
            return;
        }

        let previous = self.last_logged.fetch_max(progress, Ordering::Relaxed);
        if progress > previous {
            let milestones = (progress - previous) / STEP_PERCENT;
            // Number of `=` characters drawn per milestone.
            let chars_per_step = STEP_PERCENT / 2;
            print!("{}", "=".repeat(milestones * chars_per_step));
            io::stdout().flush().unwrap();
        }
    }

    /// Prints the closing border of the progress box when logging is enabled.
    ///
    /// With the `time` feature the elapsed time since `t0` is printed in an
    /// additional framed line.
    fn finish(&self) {
        if !enable_log() {
            return;
        }

        println!("|\n+{}+", BARS);
        #[cfg(feature = "time")]
        {
            let time_str = format!("{:?}", self.t0.elapsed());
            // Saturate so an unexpectedly long duration string cannot underflow.
            let padding = 32usize.saturating_sub(time_str.chars().count());
            print!(
                "| Took {} to complete{}|\n+{}+\n",
                time_str,
                " ".repeat(padding),
                BARS
            );
        }
    }
}

/// Closes the progress box once the last reference to the logger is dropped.
#[cfg(feature = "rayon")]
impl Drop for OrderedLogger {
    fn drop(&mut self) {
        self.finish();
    }
}
275
/// Parallel-iterator adaptor that counts consumed items so progress can be reported.
#[cfg(feature = "rayon")]
pub struct LogProgressPar<I> {
    /// The wrapped parallel iterator.
    iter: I,
    /// Counter of consumed items, shared by all consumers and folders.
    progress: Arc<AtomicUsize>,
    /// Total number of items of the wrapped iterator.
    total: usize,
    /// Logger that draws the progress bar.
    logger: Arc<OrderedLogger>,
}
284
#[cfg(feature = "rayon")]
impl<I> ParallelIterator for LogProgressPar<I>
where
    I: ParallelIterator,
{
    type Item = I::Item;

    /// Drives the wrapped iterator through a progress-tracking consumer.
    ///
    /// The caller's consumer is wrapped in a `LogProgressConsumer` that carries
    /// the shared counter, the total and the logger, and the wrapped iterator
    /// is then driven with it.
    ///
    /// # Arguments
    ///
    /// * `consumer` - The consumer that receives the items.
    ///
    /// # Returns
    ///
    /// The result produced by `consumer`.
    fn drive_unindexed<C>(self, consumer: C) -> C::Result
    where
        C: UnindexedConsumer<Self::Item>,
    {
        let LogProgressPar {
            iter,
            progress,
            total,
            logger,
        } = self;

        iter.drive_unindexed(LogProgressConsumer {
            base: consumer,
            progress,
            total,
            logger,
        })
    }
}
317
/// Consumer wrapper that carries the progress-tracking state alongside a base consumer.
#[cfg(feature = "rayon")]
struct LogProgressConsumer<C> {
    /// The consumer that actually processes the items.
    base: C,
    /// Counter of consumed items, shared by all consumers and folders.
    progress: Arc<AtomicUsize>,
    /// Total number of items expected.
    total: usize,
    /// Logger that draws the progress bar.
    logger: Arc<OrderedLogger>,
}
326
#[cfg(feature = "rayon")]
impl<C, T> Consumer<T> for LogProgressConsumer<C>
where
    C: Consumer<T>,
{
    type Folder = LogProgressFolder<C::Folder>;
    type Reducer = C::Reducer;
    type Result = C::Result;

    /// Splits the base consumer at `index` and wraps both halves.
    ///
    /// Both resulting consumers share the same counter and logger.
    ///
    /// # Arguments
    ///
    /// * `index` - The position at which the base consumer is split.
    ///
    /// # Returns
    ///
    /// The left consumer, the right consumer and the base consumer's reducer.
    fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) {
        let LogProgressConsumer {
            base,
            progress,
            total,
            logger,
        } = self;
        let (left_base, right_base, reducer) = base.split_at(index);

        let left = LogProgressConsumer {
            base: left_base,
            progress: Arc::clone(&progress),
            total,
            logger: Arc::clone(&logger),
        };
        // The right half takes over this consumer's own handles.
        let right = LogProgressConsumer {
            base: right_base,
            progress,
            total,
            logger,
        };

        (left, right, reducer)
    }

    /// Converts this consumer into a folder that counts every consumed item.
    fn into_folder(self) -> Self::Folder {
        let LogProgressConsumer {
            base,
            progress,
            total,
            logger,
        } = self;

        LogProgressFolder {
            base: base.into_folder(),
            progress,
            total,
            logger,
        }
    }

    /// Delegates to the base consumer.
    fn full(&self) -> bool {
        self.base.full()
    }
}
379
#[cfg(feature = "rayon")]
impl<C, T> UnindexedConsumer<T> for LogProgressConsumer<C>
where
    C: UnindexedConsumer<T>,
{
    /// Splits off a left consumer that shares this consumer's counter and logger.
    fn split_off_left(&self) -> Self {
        let base = self.base.split_off_left();

        Self {
            base,
            progress: Arc::clone(&self.progress),
            total: self.total,
            logger: Arc::clone(&self.logger),
        }
    }

    /// Delegates to the base consumer.
    fn to_reducer(&self) -> Self::Reducer {
        self.base.to_reducer()
    }
}
398
/// Folder wrapper that counts every item before handing it to a base folder.
#[cfg(feature = "rayon")]
struct LogProgressFolder<F> {
    /// The folder that actually processes the items.
    base: F,
    /// Counter of consumed items, shared by all consumers and folders.
    progress: Arc<AtomicUsize>,
    /// Total number of items expected.
    total: usize,
    /// Logger that draws the progress bar.
    logger: Arc<OrderedLogger>,
}
407
#[cfg(feature = "rayon")]
impl<F, T> Folder<T> for LogProgressFolder<F>
where
    F: Folder<T>,
{
    type Result = F::Result;

    /// Counts `item`, reports a newly reached milestone and forwards the item.
    ///
    /// The shared counter is incremented first. If the increment moves the
    /// completed percentage into a new `STEP_PERCENT` bucket, that percentage
    /// (rounded down to a multiple of `STEP_PERCENT`) is reported to the logger.
    ///
    /// # Arguments
    ///
    /// * `item` - The item to hand to the base folder.
    ///
    /// # Returns
    ///
    /// A folder wrapping the base folder's updated state.
    fn consume(self, item: T) -> Self {
        let LogProgressFolder {
            base,
            progress,
            total,
            logger,
        } = self;

        let done_before = progress.fetch_add(1, Ordering::Relaxed);
        let bucket_before = (done_before * 100) / total / STEP_PERCENT;
        let percent_after = ((done_before + 1) * 100) / total;

        if percent_after / STEP_PERCENT > bucket_before {
            logger.log_progress(percent_after - percent_after % STEP_PERCENT);
        }

        LogProgressFolder {
            base: base.consume(item),
            progress,
            total,
            logger,
        }
    }

    /// Delegates to the base folder.
    fn complete(self) -> Self::Result {
        self.base.complete()
    }

    /// Delegates to the base folder.
    fn full(&self) -> bool {
        self.base.full()
    }
}
452
/// Extension trait that equips parallel iterators with a `log_progress` adaptor.
#[cfg(feature = "rayon")]
pub trait LogProgressParExt: ParallelIterator + Sized {
    /// Wraps the parallel iterator in a [`LogProgressPar`] adaptor.
    ///
    /// While logging is enabled, the adaptor reports progress each time another
    /// 2% of the items has been consumed.
    ///
    /// # Arguments
    ///
    /// * `name` - Title shown in the header of the progress output.
    ///
    /// # Returns
    ///
    /// A `LogProgressPar` that yields the same items as `self`.
    fn log_progress(self, name: &str) -> LogProgressPar<Self>;
}
465
#[cfg(feature = "rayon")]
impl<I> LogProgressParExt for I
where
    I: ParallelIterator + IndexedParallelIterator,
{
    /// Creates the progress-tracking adaptor and, if logging is enabled, prints the header box.
    ///
    /// The total is taken from the indexed iterator's `len()`. The header is
    /// padded by character count and the padding saturates at zero, so names
    /// longer than the box no longer cause a subtraction underflow panic.
    fn log_progress(self, name: &str) -> LogProgressPar<Self> {
        let total = self.len();
        #[cfg(feature = "time")]
        let logger = OrderedLogger::new(Instant::now());
        #[cfg(not(feature = "time"))]
        let logger = OrderedLogger::new();

        if enable_log() {
            let padding = 49usize.saturating_sub(name.chars().count());
            print!(
                "+{}+\n| {}{}|\n+{}+\n|",
                BARS,
                name,
                " ".repeat(padding),
                BARS,
            );
        }

        LogProgressPar {
            iter: self,
            progress: Arc::new(AtomicUsize::new(0)),
            total,
            logger,
        }
    }
}
498
/// Simulates an expensive per-item computation.
///
/// Doubles `x` one million times using saturating multiplication and returns
/// the final value.
pub fn long_computation(x: u32) -> u32 {
    // The repeated doubling exists purely to burn time.
    (0..1_000_000).fold(x, |acc, _| acc.saturating_mul(2))
}
508
#[cfg(test)]
mod tests {
    use super::*;

    /// Serialises every test that reads or writes the global logging flag.
    ///
    /// The test harness runs tests on several threads, and the flag is
    /// process-wide. Without this lock a `set_log_enabled` / `enable_log` pair
    /// in one test can be interleaved with a `set_log_enabled` from another.
    static FLAG_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());

    /// Acquires `FLAG_LOCK`, recovering from poisoning so that one failed test
    /// does not make every later test fail as well.
    fn lock_flag() -> std::sync::MutexGuard<'static, ()> {
        FLAG_LOCK
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
    }

    /// Doubles every element while iterating through the sequential adaptor.
    fn doubled(data: &[i32], name: &str) -> Vec<i32> {
        data.iter().log_progress(name).map(|&x| x * 2).collect()
    }

    /// Doubles every element while iterating through the parallel adaptor.
    #[cfg(feature = "rayon")]
    fn doubled_par(data: &[i32], name: &str) -> Vec<i32> {
        use rayon::prelude::*;

        data.par_iter().log_progress(name).map(|&x| x * 2).collect()
    }

    #[test]
    fn test_long_computation() {
        assert_eq!(long_computation(2), 4294967295); // Saturates at u32::MAX
    }

    #[test]
    fn test_set_log_enabled_true() {
        let _guard = lock_flag();

        // Start from a known state
        set_log_enabled(false);
        assert!(!enable_log());

        // Enable logging
        set_log_enabled(true);
        assert!(enable_log());
    }

    #[test]
    fn test_set_log_enabled_false() {
        let _guard = lock_flag();

        // Enable logging first
        set_log_enabled(true);
        assert!(enable_log());

        // Disable logging
        set_log_enabled(false);
        assert!(!enable_log());
    }

    #[test]
    fn test_set_log_enabled_multiple_calls() {
        let _guard = lock_flag();

        // Toggle back and forth twice
        for &enabled in &[true, false, true, false] {
            set_log_enabled(enabled);
            assert_eq!(enable_log(), enabled);
        }
    }

    #[test]
    fn test_log_progress_with_logging_disabled() {
        let _guard = lock_flag();
        set_log_enabled(false);

        let result = doubled(&[1, 2, 3, 4, 5], "test");

        assert_eq!(result, vec![2, 4, 6, 8, 10]);
    }

    #[test]
    fn test_log_progress_with_logging_enabled() {
        let _guard = lock_flag();
        set_log_enabled(true);

        let result = doubled(&[1, 2, 3, 4, 5], "test");

        assert_eq!(result, vec![2, 4, 6, 8, 10]);
    }

    #[cfg(feature = "rayon")]
    #[test]
    fn test_log_progress_par_with_logging_disabled() {
        let _guard = lock_flag();
        set_log_enabled(false);

        let result = doubled_par(&[1, 2, 3, 4, 5], "test_par");

        // All results should be even
        assert!(result.iter().all(|&x| x % 2 == 0));
    }

    #[cfg(feature = "rayon")]
    #[test]
    fn test_log_progress_par_with_logging_enabled() {
        let _guard = lock_flag();
        set_log_enabled(true);

        let result = doubled_par(&[1, 2, 3, 4, 5], "test_par");

        // All results should be even
        assert!(result.iter().all(|&x| x % 2 == 0));
    }

    #[test]
    fn test_toggle_logging_within_single_run() {
        let _guard = lock_flag();
        let data = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];

        // First iteration with logging disabled
        set_log_enabled(false);
        assert!(!enable_log());
        let result1 = doubled(&data, "iteration_1_disabled");
        assert_eq!(result1.len(), 10);

        // Second iteration with logging enabled
        set_log_enabled(true);
        assert!(enable_log());
        let result2 = doubled(&data, "iteration_2_enabled");
        assert_eq!(result2.len(), 10);

        // Third iteration with logging disabled again
        set_log_enabled(false);
        assert!(!enable_log());
        let result3 = doubled(&data, "iteration_3_disabled");
        assert_eq!(result3.len(), 10);

        // Fourth iteration with logging enabled again
        set_log_enabled(true);
        assert!(enable_log());
        let result4 = doubled(&data, "iteration_4_enabled");
        assert_eq!(result4.len(), 10);

        // Verify all results are identical
        assert_eq!(result1, result2);
        assert_eq!(result2, result3);
        assert_eq!(result3, result4);
    }

    #[cfg(feature = "rayon")]
    #[test]
    fn test_toggle_logging_parallel_within_single_run() {
        let _guard = lock_flag();
        let data = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];

        // First parallel iteration with logging disabled
        set_log_enabled(false);
        assert!(!enable_log());
        let result1 = doubled_par(&data, "par_iteration_1_disabled");
        assert!(result1.iter().all(|&x| x % 2 == 0));

        // Second parallel iteration with logging enabled
        set_log_enabled(true);
        assert!(enable_log());
        let result2 = doubled_par(&data, "par_iteration_2_enabled");
        assert!(result2.iter().all(|&x| x % 2 == 0));

        // Third parallel iteration with logging disabled again
        set_log_enabled(false);
        assert!(!enable_log());
        let result3 = doubled_par(&data, "par_iteration_3_disabled");
        assert!(result3.iter().all(|&x| x % 2 == 0));

        // Fourth parallel iteration with logging enabled again
        set_log_enabled(true);
        assert!(enable_log());
        let result4 = doubled_par(&data, "par_iteration_4_enabled");
        assert!(result4.iter().all(|&x| x % 2 == 0));

        // Verify state toggles were properly applied
        assert!(enable_log());
    }
}