1#[cfg(feature = "rayon")]
2use rayon::iter::plumbing::{Consumer, Folder, UnindexedConsumer};
3#[cfg(feature = "rayon")]
4use rayon::prelude::*;
5use std::env::var;
6use std::io::{self, Write};
7use std::sync::OnceLock;
8#[cfg(feature = "rayon")]
9use std::sync::atomic::{AtomicUsize, Ordering};
10#[cfg(feature = "rayon")]
11use std::sync::{Arc, Mutex};
12#[cfg(feature = "time")]
13use std::time::Instant;
14
/// Reports whether progress logging is switched on for this process.
///
/// The answer is computed on the first call by checking whether the
/// `ENABLE_ITER_LOG` environment variable can be read (its value is ignored)
/// and is then cached, so later changes to the environment are not observed.
fn enable_log() -> &'static bool {
    static FLAG: OnceLock<bool> = OnceLock::new();

    /// One-shot probe of the environment used to seed the cached flag.
    fn read_env_flag() -> bool {
        var("ENABLE_ITER_LOG").is_ok()
    }

    FLAG.get_or_init(read_env_flag)
}
19
/// Horizontal rule (fifty dashes) used for the borders of the progress box.
const BARS: &str = "--------------------------------------------------";
/// Granularity of the bar, in percent: progress is drawn each time the
/// completed share crosses a multiple of this value.
static STEP_PERCENT: usize = 2;
22
23pub trait LogProgressExt: Iterator + Sized {
25 fn log_progress(self, name: &str) -> LogProgress<Self>;
33}
34
35impl<I> LogProgressExt for I
36where
37 I: Iterator, {
39 fn log_progress(self, name: &str) -> LogProgress<Self> {
40 let total = self.size_hint().1.unwrap_or(0); LogProgress {
43 iter: self,
44 progress: 0,
45 total,
46 name: name.to_string(),
47 #[cfg(feature = "time")]
48 t0: Instant::now(),
49 }
50 }
51}
52
/// Iterator adapter that draws a textual progress bar on stdout while the
/// wrapped iterator is consumed.
pub struct LogProgress<I> {
    /// The wrapped iterator whose items are passed through unchanged.
    iter: I,
    /// Number of items yielded so far while logging was enabled.
    progress: usize,
    /// Expected number of items (`0` when the upper bound is unknown).
    total: usize,
    /// Label printed in the header of the progress box.
    name: String,
    /// Moment the adapter was created; used to report the elapsed time.
    #[cfg(feature = "time")]
    t0: Instant,
}
62
63impl<I: Iterator> Iterator for LogProgress<I> {
64 type Item = I::Item;
65
66 fn next(&mut self) -> Option<Self::Item> {
75 let item = self.iter.next()?;
76 let step = STEP_PERCENT / 2;
77
78 if *enable_log() {
79 if self.progress == 0 {
80 print!(
81 "+{}+\n| {}{}|\n+{}+\n|",
82 BARS,
83 self.name,
84 " ".repeat(49 - self.name.len()),
85 BARS,
86 );
87 }
88 self.progress += 1;
89 let old_percent = (self.progress * 100) / self.total;
90 let new_percent = ((self.progress + 1) * 100) / self.total;
91
92 if new_percent / STEP_PERCENT > old_percent / STEP_PERCENT {
94 print!("{}", "=".repeat(step));
95 io::stdout().flush().unwrap();
96 }
97 }
98
99 Some(item)
100 }
101}
102
103impl<I> LogProgress<I> {
104 fn finish(&self) {
105 if *enable_log() {
106 println!("|\n+{}+", "-".repeat(50));
107 #[cfg(feature = "time")]
108 let elapsed = self.t0.elapsed();
109 #[cfg(feature = "time")]
110 let time_str = format!("{:?}", elapsed);
111 #[cfg(feature = "time")]
112 print!(
113 "| Took {} to complete{}|\n+{}+\n",
114 time_str,
115 " ".repeat(32 - time_str.chars().count()),
116 BARS
117 );
118 }
119 }
120}
121
122impl<I> Drop for LogProgress<I> {
123 fn drop(&mut self) {
124 self.finish();
125 }
126}
127
/// Shared state used to draw the progress bar for parallel iterators in
/// increasing percentage order.
#[cfg(feature = "rayon")]
struct OrderedLogger {
    /// Highest percentage that has already been drawn.
    last_logged: AtomicUsize,
    /// Percentages that were reported before it was their turn to be drawn.
    pending_logs: Mutex<Vec<usize>>,
    /// Start of the measurement, used to report the elapsed time.
    #[cfg(feature = "time")]
    t0: Instant,
}
136
#[cfg(feature = "rayon")]
impl OrderedLogger {
    /// Creates a shared logger with nothing drawn yet; `t0` is the instant
    /// from which the elapsed time is measured.
    #[cfg(feature = "time")]
    fn new(t0: Instant) -> Arc<Self> {
        Arc::new(Self {
            last_logged: AtomicUsize::new(0),
            pending_logs: Mutex::new(Vec::new()),
            t0,
        })
    }

    /// Creates a shared logger with nothing drawn yet.
    #[cfg(not(feature = "time"))]
    fn new() -> Arc<Self> {
        Arc::new(Self {
            last_logged: AtomicUsize::new(0),
            pending_logs: Mutex::new(Vec::new()),
        })
    }

    /// Records that `progress` percent has been reached and draws every step
    /// that is now due.
    ///
    /// A step is drawn only when it is exactly `STEP_PERCENT` above the last
    /// drawn value; anything else is parked until the steps before it have
    /// been reported. Does nothing when logging is disabled.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned or stdout cannot be written
    /// to or flushed.
    fn log_progress(&self, progress: usize) {
        if !*enable_log() {
            return;
        }

        // The lock is held for the whole call: it serialises drawing and is
        // the only place `last_logged` is modified, so relaxed atomics suffice.
        let mut pending = self.pending_logs.lock().unwrap();
        let already_drawn = self.last_logged.load(Ordering::Relaxed);
        let mut next_due = already_drawn + STEP_PERCENT;

        if progress != next_due {
            // Reported out of turn; keep it until its predecessors show up.
            pending.push(progress);
            return;
        }

        // Count this step plus every parked step that directly follows it.
        // Searching for the exact value avoids re-sorting on every push and
        // the O(n) front removal, and a stale entry cannot block the queue.
        let mut steps = 1;
        next_due += STEP_PERCENT;
        while let Some(pos) = pending.iter().position(|&p| p == next_due) {
            pending.swap_remove(pos);
            steps += 1;
            next_due += STEP_PERCENT;
        }

        self.last_logged
            .store(already_drawn + steps * STEP_PERCENT, Ordering::Relaxed);

        // One write and one flush for the whole batch of steps.
        print!("{}", "=".repeat(steps * (STEP_PERCENT / 2)));
        io::stdout().flush().unwrap();
    }

    /// Closes the progress box and, with the `time` feature, prints how long
    /// the run took. Does nothing when logging is disabled.
    fn finish(&self) {
        if !*enable_log() {
            return;
        }

        println!("|\n+{}+", BARS);

        #[cfg(feature = "time")]
        let time_str = format!("{:?}", self.t0.elapsed());
        // Saturate so an unexpectedly long duration string cannot underflow
        // the padding computation and panic inside `Drop`.
        #[cfg(feature = "time")]
        let padding = 32usize.saturating_sub(time_str.chars().count());
        #[cfg(feature = "time")]
        print!(
            "| Took {} to complete{}|\n+{}+\n",
            time_str,
            " ".repeat(padding),
            BARS
        );
    }
}
229
/// Prints the closing part of the progress box once the logger is dropped.
#[cfg(feature = "rayon")]
impl Drop for OrderedLogger {
    fn drop(&mut self) {
        // Delegate to `finish`, which checks whether logging is enabled.
        OrderedLogger::finish(self);
    }
}
236
/// Parallel-iterator adapter that reports progress through a shared
/// [`OrderedLogger`] while the wrapped iterator is driven.
#[cfg(feature = "rayon")]
pub struct LogProgressPar<I> {
    /// The wrapped parallel iterator.
    iter: I,
    /// Counter of consumed items, shared by all consumers and folders.
    progress: Arc<AtomicUsize>,
    /// Total number of items the wrapped iterator will produce.
    total: usize,
    /// Shared logger that draws the bar.
    logger: Arc<OrderedLogger>,
}
245
#[cfg(feature = "rayon")]
impl<I> ParallelIterator for LogProgressPar<I>
where
    I: ParallelIterator,
{
    type Item = I::Item;

    /// Drives the wrapped iterator with `consumer` wrapped in a
    /// [`LogProgressConsumer`], so every consumed item is counted.
    fn drive_unindexed<C>(self, consumer: C) -> C::Result
    where
        C: UnindexedConsumer<Self::Item>,
    {
        let LogProgressPar {
            iter,
            progress,
            total,
            logger,
        } = self;

        iter.drive_unindexed(LogProgressConsumer {
            base: consumer,
            progress,
            total,
            logger,
        })
    }
}
278
/// Consumer wrapper that carries the shared progress state alongside the
/// consumer it decorates.
#[cfg(feature = "rayon")]
struct LogProgressConsumer<C> {
    /// The decorated consumer that does the real work.
    base: C,
    /// Counter of consumed items, shared across all splits.
    progress: Arc<AtomicUsize>,
    /// Total number of items expected.
    total: usize,
    /// Shared logger that draws the bar.
    logger: Arc<OrderedLogger>,
}
287
#[cfg(feature = "rayon")]
impl<C, T> Consumer<T> for LogProgressConsumer<C>
where
    C: Consumer<T>,
{
    type Folder = LogProgressFolder<C::Folder>;
    type Reducer = C::Reducer;
    type Result = C::Result;

    /// Splits the decorated consumer at `index` and wraps both halves so
    /// they keep sharing the same counter and logger.
    fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) {
        let Self {
            base,
            progress,
            total,
            logger,
        } = self;
        let (left, right, reducer) = base.split_at(index);

        (
            LogProgressConsumer {
                base: left,
                progress: Arc::clone(&progress),
                total,
                logger: Arc::clone(&logger),
            },
            // `self` is consumed, so the right half can take over the
            // original handles instead of cloning them a second time.
            LogProgressConsumer {
                base: right,
                progress,
                total,
                logger,
            },
            reducer,
        )
    }

    /// Converts into a folder that counts every item it consumes.
    fn into_folder(self) -> Self::Folder {
        // All shared handles are moved: `self` is consumed, no clone needed.
        LogProgressFolder {
            base: self.base.into_folder(),
            progress: self.progress,
            total: self.total,
            logger: self.logger,
        }
    }

    /// Reports whether the decorated consumer wants no more items.
    fn full(&self) -> bool {
        self.base.full()
    }
}
340
#[cfg(feature = "rayon")]
impl<C, T> UnindexedConsumer<T> for LogProgressConsumer<C>
where
    C: UnindexedConsumer<T>,
{
    /// Splits off a new consumer from the decorated one; the new wrapper
    /// shares this consumer's counter and logger.
    fn split_off_left(&self) -> Self {
        let base = self.base.split_off_left();
        let progress = Arc::clone(&self.progress);
        let logger = Arc::clone(&self.logger);

        Self {
            base,
            progress,
            total: self.total,
            logger,
        }
    }

    /// Returns the reducer of the decorated consumer unchanged.
    fn to_reducer(&self) -> Self::Reducer {
        self.base.to_reducer()
    }
}
359
/// Folder wrapper that counts each consumed item before handing it to the
/// folder it decorates.
#[cfg(feature = "rayon")]
struct LogProgressFolder<F> {
    /// The decorated folder that does the real work.
    base: F,
    /// Counter of consumed items, shared across all folders.
    progress: Arc<AtomicUsize>,
    /// Total number of items expected.
    total: usize,
    /// Shared logger that draws the bar.
    logger: Arc<OrderedLogger>,
}
368
#[cfg(feature = "rayon")]
impl<F, T> Folder<T> for LogProgressFolder<F>
where
    F: Folder<T>,
{
    type Result = F::Result;

    /// Counts `item`, reports every `STEP_PERCENT` boundary the shared
    /// counter has just crossed, then passes `item` to the decorated folder.
    fn consume(self, item: T) -> Self {
        // Skip the shared atomic entirely when logging is off, and never
        // divide by a zero total.
        if *enable_log() && self.total > 0 {
            let done_before = self.progress.fetch_add(1, Ordering::Relaxed);
            let steps_before = (done_before * 100 / self.total).min(100) / STEP_PERCENT;
            let steps_now = ((done_before + 1) * 100 / self.total).min(100) / STEP_PERCENT;

            // With fewer items than steps a single item crosses several
            // boundaries. The logger only draws a step that is exactly
            // `STEP_PERCENT` above the last drawn one, so each crossed step
            // must be reported individually or the bar would stall.
            for step in (steps_before + 1)..=steps_now {
                self.logger.log_progress(step * STEP_PERCENT);
            }
        }

        LogProgressFolder {
            base: self.base.consume(item),
            progress: self.progress,
            total: self.total,
            logger: self.logger,
        }
    }

    /// Completes the decorated folder and returns its result.
    fn complete(self) -> Self::Result {
        self.base.complete()
    }

    /// Reports whether the decorated folder wants no more items.
    fn full(&self) -> bool {
        self.base.full()
    }
}
413
/// Extension trait that adds progress logging to parallel iterators.
#[cfg(feature = "rayon")]
pub trait LogProgressParExt: ParallelIterator + Sized {
    /// Wraps `self` in a [`LogProgressPar`] adapter labelled with `name`.
    fn log_progress(self, name: &str) -> LogProgressPar<Self>;
}
426
#[cfg(feature = "rayon")]
impl<I> LogProgressParExt for I
where
    I: ParallelIterator + IndexedParallelIterator,
{
    /// Builds a [`LogProgressPar`] around `self`.
    ///
    /// The total is taken from the iterator's `len()`. When logging is
    /// enabled the header box labelled `name` is printed immediately.
    fn log_progress(self, name: &str) -> LogProgressPar<Self> {
        let total = self.len();

        #[cfg(feature = "time")]
        let logger = OrderedLogger::new(Instant::now());
        #[cfg(not(feature = "time"))]
        let logger = OrderedLogger::new();

        if *enable_log() {
            // Pad by character count and saturate, so an over-long label
            // cannot underflow the subtraction and panic.
            let padding = 49usize.saturating_sub(name.chars().count());
            print!(
                "+{}+\n| {}{}|\n+{}+\n|",
                BARS,
                name,
                " ".repeat(padding),
                BARS,
            );
        }

        LogProgressPar {
            iter: self,
            progress: Arc::new(AtomicUsize::new(0)),
            total,
            logger,
        }
    }
}
459
/// Doubles `x` one million times using saturating multiplication.
///
/// The result is `0` for an input of `0` and `u32::MAX` for every other
/// input, because repeated doubling of a non-zero `u32` saturates.
pub fn long_computation(x: u32) -> u32 {
    (0..1_000_000).fold(x, |acc, _| acc.saturating_mul(2))
}
469
#[cfg(test)]
mod tests {
    use super::*;

    /// Doubling a non-zero value a million times must saturate at `u32::MAX`.
    #[test]
    fn test_long_computation() {
        let saturated = long_computation(2);
        assert_eq!(saturated, u32::MAX);
    }
}