1#[cfg(feature = "rayon")]
2use rayon::iter::plumbing::{Consumer, Folder, UnindexedConsumer};
3#[cfg(feature = "rayon")]
4use rayon::prelude::*;
5use std::env::var;
6use std::io::{self, Write};
7#[cfg(feature = "rayon")]
8use std::sync::Arc;
9use std::sync::Mutex;
10#[cfg(feature = "rayon")]
11use std::sync::atomic::{AtomicUsize, Ordering};
12#[cfg(feature = "time")]
13use std::time::Instant;
14
/// Process-wide logging switch.
///
/// `None` means "not decided yet": the first call to `enable_log` resolves it
/// from the `ENABLE_ITER_LOG` environment variable, and `set_log_enabled`
/// overwrites it with an explicit `Some(..)`.
static ENABLE_LOG: Mutex<Option<bool>> = Mutex::new(None);
16
17fn enable_log() -> bool {
22 let mut state = ENABLE_LOG.lock().unwrap();
23 match *state {
24 Some(enabled) => enabled,
25 None => {
26 let enabled = var("ENABLE_ITER_LOG").is_ok();
27 *state = Some(enabled);
28 enabled
29 }
30 }
31}
32
33pub fn set_log_enabled(enabled: bool) {
55 let mut state = ENABLE_LOG.lock().unwrap();
56 *state = Some(enabled);
57}
58
/// Horizontal rule printed between `+` corners for the border lines of the
/// progress box.
const BARS: &str = "--------------------------------------------------";
/// Percentage granularity of the bar: a segment is drawn whenever progress
/// crosses a multiple of this value.
static STEP_PERCENT: usize = 2;
61
/// Extension trait that adds `log_progress` to sequential iterators.
pub trait LogProgressExt: Iterator + Sized {
    /// Wraps the iterator in a [`LogProgress`] adaptor labelled with `name`.
    fn log_progress(self, name: &str) -> LogProgress<Self>;
}
73
74impl<I> LogProgressExt for I
75where
76 I: Iterator, {
78 fn log_progress(self, name: &str) -> LogProgress<Self> {
79 let total = self.size_hint().1.unwrap_or(0); LogProgress {
82 iter: self,
83 progress: 0,
84 total,
85 name: name.to_string(),
86 #[cfg(feature = "time")]
87 t0: Instant::now(),
88 }
89 }
90}
91
/// Iterator adaptor that draws a textual progress bar on stdout while the
/// wrapped iterator is consumed.
pub struct LogProgress<I> {
    /// The wrapped iterator.
    iter: I,
    /// Number of items counted so far (only advanced while logging is enabled).
    progress: usize,
    /// Expected number of items: the upper `size_hint` bound, or `0` if unknown.
    total: usize,
    /// Label shown in the header of the progress box.
    name: String,
    /// Creation time, used to report the elapsed time when the adaptor finishes.
    #[cfg(feature = "time")]
    t0: Instant,
}
101
102impl<I: Iterator> Iterator for LogProgress<I> {
103 type Item = I::Item;
104
105 fn next(&mut self) -> Option<Self::Item> {
114 let item = self.iter.next()?;
115 let step = STEP_PERCENT / 2;
116
117 if enable_log() {
118 if self.progress == 0 {
119 print!(
120 "+{}+\n| {}{}|\n+{}+\n|",
121 BARS,
122 self.name,
123 " ".repeat(49 - self.name.len()),
124 BARS,
125 );
126 }
127 self.progress += 1;
128 let old_percent = (self.progress * 100) / self.total;
129 let new_percent = ((self.progress + 1) * 100) / self.total;
130
131 if new_percent / STEP_PERCENT > old_percent / STEP_PERCENT {
133 print!("{}", "=".repeat(step));
134 io::stdout().flush().unwrap();
135 }
136 }
137
138 Some(item)
139 }
140}
141
142impl<I> LogProgress<I> {
143 fn finish(&self) {
144 if enable_log() {
145 println!("|\n+{}+", "-".repeat(50));
146 #[cfg(feature = "time")]
147 let elapsed = self.t0.elapsed();
148 #[cfg(feature = "time")]
149 let time_str = format!("{:?}", elapsed);
150 #[cfg(feature = "time")]
151 print!(
152 "| Took {} to complete{}|\n+{}+\n",
153 time_str,
154 " ".repeat(32 - time_str.chars().count()),
155 BARS
156 );
157 }
158 }
159}
160
161impl<I> Drop for LogProgress<I> {
162 fn drop(&mut self) {
163 self.finish();
164 }
165}
166
/// Draws the shared progress bar for a parallel run.
///
/// Worker threads report completed percentages in arbitrary order. The logger
/// keeps the highest percentage drawn so far behind a mutex and extends the
/// bar up to whatever newer value arrives, so the bar grows monotonically
/// even when reports are reordered or skip several steps at once (which
/// happens whenever the input has fewer items than the bar has segments).
#[cfg(feature = "rayon")]
struct OrderedLogger {
    /// Highest percentage already drawn (always a multiple of `STEP_PERCENT`).
    /// The mutex also serialises writes to stdout.
    last_logged: Mutex<usize>,
    /// Start time, used to report the elapsed time on completion.
    #[cfg(feature = "time")]
    t0: Instant,
}

#[cfg(feature = "rayon")]
impl OrderedLogger {
    /// Creates a shared logger whose elapsed-time report is measured from `t0`.
    #[cfg(feature = "time")]
    fn new(t0: Instant) -> Arc<Self> {
        Arc::new(Self {
            last_logged: Mutex::new(0),
            t0,
        })
    }

    /// Creates a shared logger with an empty bar.
    #[cfg(not(feature = "time"))]
    fn new() -> Arc<Self> {
        Arc::new(Self {
            last_logged: Mutex::new(0),
        })
    }

    /// Extends the bar so that it represents `progress` percent.
    ///
    /// `progress` is expected to be a multiple of `STEP_PERCENT`; values at
    /// or below what has already been drawn are ignored, and values above 100
    /// are clamped. Does nothing when logging is disabled.
    fn log_progress(&self, progress: usize) {
        if !enable_log() {
            return;
        }

        let target = progress.min(100);
        // A panic in another worker must not stop progress reporting, so a
        // poisoned lock is recovered rather than propagated.
        let mut last_logged = self
            .last_logged
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());

        if target <= *last_logged {
            return;
        }

        let steps = (target - *last_logged) / STEP_PERCENT;
        if steps == 0 {
            return;
        }

        print!("{}", "=".repeat(steps * (STEP_PERCENT / 2)));
        io::stdout().flush().unwrap();
        *last_logged += steps * STEP_PERCENT;
    }

    /// Prints the closing border of the progress box and, with the `time`
    /// feature, a line reporting the time elapsed since `t0`.
    ///
    /// Does nothing when logging is disabled.
    fn finish(&self) {
        if enable_log() {
            println!("|\n+{}+", "-".repeat(50));
            #[cfg(feature = "time")]
            let elapsed = self.t0.elapsed();
            #[cfg(feature = "time")]
            let time_str = format!("{:?}", elapsed);
            #[cfg(feature = "time")]
            print!(
                "| Took {} to complete{}|\n+{}+\n",
                time_str,
                " ".repeat(32 - time_str.chars().count()),
                BARS
            );
        }
    }
}

#[cfg(feature = "rayon")]
impl Drop for OrderedLogger {
    /// Closes the progress box once the last handle to the logger is dropped.
    fn drop(&mut self) {
        self.finish();
    }
}
275
/// Parallel-iterator adaptor that reports progress through a shared
/// `OrderedLogger` while the wrapped iterator is driven.
#[cfg(feature = "rayon")]
pub struct LogProgressPar<I> {
    /// The wrapped parallel iterator.
    iter: I,
    /// Count of items consumed so far, shared by all consumers and folders.
    progress: Arc<AtomicUsize>,
    /// Total number of items, taken from the iterator's `len()` at construction.
    total: usize,
    /// Shared logger that draws the bar.
    logger: Arc<OrderedLogger>,
}
284
#[cfg(feature = "rayon")]
impl<I: ParallelIterator> ParallelIterator for LogProgressPar<I> {
    type Item = I::Item;

    /// Drives the wrapped iterator with `consumer` wrapped in a
    /// `LogProgressConsumer`, so every item consumed is counted and reported.
    fn drive_unindexed<C>(self, consumer: C) -> C::Result
    where
        C: UnindexedConsumer<Self::Item>,
    {
        let LogProgressPar {
            iter,
            progress,
            total,
            logger,
        } = self;

        iter.drive_unindexed(LogProgressConsumer {
            base: consumer,
            progress,
            total,
            logger,
        })
    }
}
317
/// Consumer wrapper that carries the shared progress state alongside the
/// consumer it delegates to.
#[cfg(feature = "rayon")]
struct LogProgressConsumer<C> {
    /// The wrapped consumer that does the real work.
    base: C,
    /// Shared count of items consumed so far.
    progress: Arc<AtomicUsize>,
    /// Total number of items expected.
    total: usize,
    /// Shared logger that draws the bar.
    logger: Arc<OrderedLogger>,
}
326
#[cfg(feature = "rayon")]
impl<C, T> Consumer<T> for LogProgressConsumer<C>
where
    C: Consumer<T>,
{
    type Folder = LogProgressFolder<C::Folder>;
    type Reducer = C::Reducer;
    type Result = C::Result;

    /// Splits the wrapped consumer at `index`; both halves keep handles to the
    /// same progress counter and logger.
    fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) {
        let Self {
            base,
            progress,
            total,
            logger,
        } = self;
        let (left_base, right_base, reducer) = base.split_at(index);

        let left = Self {
            base: left_base,
            progress: Arc::clone(&progress),
            total,
            logger: Arc::clone(&logger),
        };
        let right = Self {
            base: right_base,
            progress,
            total,
            logger,
        };

        (left, right, reducer)
    }

    /// Converts into a folder that counts each item before delegating to the
    /// wrapped consumer's folder.
    fn into_folder(self) -> Self::Folder {
        let Self {
            base,
            progress,
            total,
            logger,
        } = self;

        LogProgressFolder {
            base: base.into_folder(),
            progress,
            total,
            logger,
        }
    }

    /// Delegates the "no more items wanted" query to the wrapped consumer.
    fn full(&self) -> bool {
        self.base.full()
    }
}
379
#[cfg(feature = "rayon")]
impl<C, T> UnindexedConsumer<T> for LogProgressConsumer<C>
where
    C: UnindexedConsumer<T>,
{
    /// Splits off a left-hand consumer that shares this consumer's progress
    /// counter and logger.
    fn split_off_left(&self) -> Self {
        let base = self.base.split_off_left();
        let progress = Arc::clone(&self.progress);
        let logger = Arc::clone(&self.logger);

        Self {
            base,
            progress,
            total: self.total,
            logger,
        }
    }

    /// Delegates reducer creation to the wrapped consumer.
    fn to_reducer(&self) -> Self::Reducer {
        self.base.to_reducer()
    }
}
398
/// Folder wrapper that counts every consumed item and reports step changes to
/// the shared logger before delegating to the wrapped folder.
#[cfg(feature = "rayon")]
struct LogProgressFolder<F> {
    /// The wrapped folder that does the real work.
    base: F,
    /// Shared count of items consumed so far.
    progress: Arc<AtomicUsize>,
    /// Total number of items expected.
    total: usize,
    /// Shared logger that draws the bar.
    logger: Arc<OrderedLogger>,
}
407
#[cfg(feature = "rayon")]
impl<F, T> Folder<T> for LogProgressFolder<F>
where
    F: Folder<T>,
{
    type Result = F::Result;

    /// Counts `item`, reports to the logger when the completed percentage
    /// crosses a `STEP_PERCENT` boundary, then hands the item to the wrapped
    /// folder.
    fn consume(mut self, item: T) -> Self {
        let done_before = self.progress.fetch_add(1, Ordering::Relaxed);
        let percent_before = done_before * 100 / self.total;
        let percent_after = (done_before + 1) * 100 / self.total;

        let crossed_step = percent_after / STEP_PERCENT > percent_before / STEP_PERCENT;
        if crossed_step {
            // Round down to the step boundary that was just reached.
            self.logger
                .log_progress(percent_after / STEP_PERCENT * STEP_PERCENT);
        }

        self.base = self.base.consume(item);
        self
    }

    /// Finishes the wrapped folder and returns its result.
    fn complete(self) -> Self::Result {
        self.base.complete()
    }

    /// Delegates the "no more items wanted" query to the wrapped folder.
    fn full(&self) -> bool {
        self.base.full()
    }
}
452
/// Extension trait that adds `log_progress` to parallel iterators.
#[cfg(feature = "rayon")]
pub trait LogProgressParExt: Sized + ParallelIterator {
    /// Wraps the parallel iterator in a [`LogProgressPar`] adaptor labelled
    /// with `name`.
    fn log_progress(self, name: &str) -> LogProgressPar<Self>;
}
465
#[cfg(feature = "rayon")]
impl<I> LogProgressParExt for I
where
    I: ParallelIterator + IndexedParallelIterator,
{
    /// Builds a [`LogProgressPar`] around `self`.
    ///
    /// The total item count is taken from `len()`. When logging is enabled
    /// the header box is printed immediately, before any item is processed.
    /// With the `time` feature the elapsed-time clock starts here as well.
    fn log_progress(self, name: &str) -> LogProgressPar<Self> {
        let total = self.len();

        #[cfg(feature = "time")]
        let logger = OrderedLogger::new(Instant::now());
        #[cfg(not(feature = "time"))]
        let logger = OrderedLogger::new();

        if enable_log() {
            // `{:<49}` pads by character count and never underflows, so
            // labels longer than the box no longer cause a panic.
            print!("+{}+\n| {:<49}|\n+{}+\n|", BARS, name, BARS);
        }

        LogProgressPar {
            iter: self,
            progress: Arc::new(AtomicUsize::new(0)),
            total,
            logger,
        }
    }
}
498
/// Doubles `x` one million times with saturating arithmetic and returns the
/// result.
///
/// Any non-zero input therefore ends up at `u32::MAX`, while `0` stays `0`.
pub fn long_computation(x: u32) -> u32 {
    (0..1_000_000).fold(x, |acc, _| acc.saturating_mul(2))
}
508
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Mutex, MutexGuard};

    /// Serialises every test that touches the process-wide logging switch.
    ///
    /// The test harness runs tests on several threads; without this guard one
    /// test can flip the switch between another test's `set_log_enabled` call
    /// and its assertion, making the suite flaky.
    static LOG_STATE_GUARD: Mutex<()> = Mutex::new(());

    /// Takes the guard, recovering it if an earlier test panicked while
    /// holding it so that one failure does not cascade into the others.
    fn lock_log_state() -> MutexGuard<'static, ()> {
        LOG_STATE_GUARD
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
    }

    #[test]
    fn test_long_computation() {
        assert_eq!(long_computation(2), 4294967295);
    }

    #[test]
    fn test_set_log_enabled_true() {
        let _guard = lock_log_state();

        set_log_enabled(false);
        assert!(!enable_log());

        set_log_enabled(true);
        assert!(enable_log());
    }

    #[test]
    fn test_set_log_enabled_false() {
        let _guard = lock_log_state();

        set_log_enabled(true);
        assert!(enable_log());

        set_log_enabled(false);
        assert!(!enable_log());
    }

    #[test]
    fn test_set_log_enabled_multiple_calls() {
        let _guard = lock_log_state();

        set_log_enabled(true);
        assert!(enable_log());

        set_log_enabled(false);
        assert!(!enable_log());

        set_log_enabled(true);
        assert!(enable_log());

        set_log_enabled(false);
        assert!(!enable_log());
    }

    #[test]
    fn test_log_progress_with_logging_disabled() {
        let _guard = lock_log_state();
        set_log_enabled(false);

        let data = vec![1, 2, 3, 4, 5];
        let result: Vec<_> = data.iter().log_progress("test").map(|&x| x * 2).collect();

        assert_eq!(result, vec![2, 4, 6, 8, 10]);
    }

    #[test]
    fn test_log_progress_with_logging_enabled() {
        let _guard = lock_log_state();
        set_log_enabled(true);

        let data = vec![1, 2, 3, 4, 5];
        let result: Vec<_> = data.iter().log_progress("test").map(|&x| x * 2).collect();

        assert_eq!(result, vec![2, 4, 6, 8, 10]);
    }

    #[cfg(feature = "rayon")]
    #[test]
    fn test_log_progress_par_with_logging_disabled() {
        use rayon::prelude::*;

        let _guard = lock_log_state();
        set_log_enabled(false);

        let data = vec![1, 2, 3, 4, 5];
        let result: Vec<_> = data
            .par_iter()
            .log_progress("test_par")
            .map(|&x| x * 2)
            .collect();

        result.iter().for_each(|&x| {
            assert!(x % 2 == 0);
        });
    }

    #[cfg(feature = "rayon")]
    #[test]
    fn test_log_progress_par_with_logging_enabled() {
        use rayon::prelude::*;

        let _guard = lock_log_state();
        set_log_enabled(true);

        let data = vec![1, 2, 3, 4, 5];
        let result: Vec<_> = data
            .par_iter()
            .log_progress("test_par")
            .map(|&x| x * 2)
            .collect();

        result.iter().for_each(|&x| {
            assert!(x % 2 == 0);
        });
    }

    #[test]
    fn test_toggle_logging_within_single_run() {
        let _guard = lock_log_state();
        let data = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];

        set_log_enabled(false);
        assert!(!enable_log());
        let result1: Vec<_> = data
            .iter()
            .log_progress("iteration_1_disabled")
            .map(|&x| x * 2)
            .collect();
        assert_eq!(result1.len(), 10);

        set_log_enabled(true);
        assert!(enable_log());
        let result2: Vec<_> = data
            .iter()
            .log_progress("iteration_2_enabled")
            .map(|&x| x * 2)
            .collect();
        assert_eq!(result2.len(), 10);

        set_log_enabled(false);
        assert!(!enable_log());
        let result3: Vec<_> = data
            .iter()
            .log_progress("iteration_3_disabled")
            .map(|&x| x * 2)
            .collect();
        assert_eq!(result3.len(), 10);

        set_log_enabled(true);
        assert!(enable_log());
        let result4: Vec<_> = data
            .iter()
            .log_progress("iteration_4_enabled")
            .map(|&x| x * 2)
            .collect();
        assert_eq!(result4.len(), 10);

        // Toggling the switch must never change what the iterator yields.
        assert_eq!(result1, result2);
        assert_eq!(result2, result3);
        assert_eq!(result3, result4);
    }

    #[cfg(feature = "rayon")]
    #[test]
    fn test_toggle_logging_parallel_within_single_run() {
        use rayon::prelude::*;

        let _guard = lock_log_state();
        let data = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];

        set_log_enabled(false);
        assert!(!enable_log());
        let result1: Vec<_> = data
            .par_iter()
            .log_progress("par_iteration_1_disabled")
            .map(|&x| x * 2)
            .collect();
        result1.iter().for_each(|&x| {
            assert!(x % 2 == 0);
        });

        set_log_enabled(true);
        assert!(enable_log());
        let result2: Vec<_> = data
            .par_iter()
            .log_progress("par_iteration_2_enabled")
            .map(|&x| x * 2)
            .collect();
        result2.iter().for_each(|&x| {
            assert!(x % 2 == 0);
        });

        set_log_enabled(false);
        assert!(!enable_log());
        let result3: Vec<_> = data
            .par_iter()
            .log_progress("par_iteration_3_disabled")
            .map(|&x| x * 2)
            .collect();
        result3.iter().for_each(|&x| {
            assert!(x % 2 == 0);
        });

        set_log_enabled(true);
        assert!(enable_log());
        let result4: Vec<_> = data
            .par_iter()
            .log_progress("par_iteration_4_enabled")
            .map(|&x| x * 2)
            .collect();
        result4.iter().for_each(|&x| {
            assert!(x % 2 == 0);
        });

        assert!(enable_log());
    }
}