quick_file_transfer/
evaluate_compression.rs

1use std::{
2    io::Read,
3    sync::{
4        atomic::{AtomicBool, AtomicUsize, Ordering},
5        Arc,
6    },
7    time::{Duration, Instant},
8};
9
10use crate::{
11    config::{
12        compression::{
13            Bzip2Args, Compression, CompressionRange, CompressionVariant, GzipArgs, XzArgs,
14        },
15        evaluate_compression::EvaluateCompressionArgs,
16    },
17    send::util::file_with_bufreader,
18};
19use anyhow::{bail, Result};
20use console::Emoji;
21use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
22use rayon::iter::{IntoParallelIterator, ParallelIterator};
23use strum::IntoEnumIterator;
24
25pub mod compression_result;
26use compression_result::{Awaiting, CompressionResult, Finished};
27
28mod print_results;
29mod test_compress;
30
31pub fn evaluate_compression(args: EvaluateCompressionArgs) -> Result<()> {
32    let EvaluateCompressionArgs {
33        input_file,
34        omit,
35        mut omit_levels,
36        threads,
37    } = args;
38
39    omit_levels.sort_unstable();
40    let compression_set: Vec<Compression> = Compression::iter().collect();
41
42    if !omit.is_empty() {
43        let mut print_str = String::from("Omitting:  ");
44        for compr in &omit {
45            print_str.push_str(&format!(" {compr}"));
46        }
47        log::info!("{print_str}");
48    }
49
50    let evaluate_compressions: Vec<Compression> = compression_set
51        .into_iter()
52        .filter(|c| !omit.contains(c.into()))
53        .collect();
54
55    if !evaluate_compressions.is_empty() {
56        let mut print_str = String::from("Evaluating:");
57        for compr in &evaluate_compressions {
58            print_str.push_str(&format!(" {compr}"));
59        }
60        log::info!("{print_str}");
61    }
62
63    if !omit_levels.is_empty() {
64        let mut print_str = String::from("Omitting compression levels (where applicable):");
65        for compr_lvls in &omit_levels {
66            print_str.push_str(&format!(" {compr_lvls}"));
67        }
68        log::info!("{print_str}");
69    }
70
71    let mut bufreader = file_with_bufreader(&input_file)?;
72
73    let start = Instant::now();
74    let mut test_contents = Vec::new();
75    bufreader.read_to_end(&mut test_contents)?;
76    let elapsed = start.elapsed();
77    let test_contents_len = test_contents.len();
78    if test_contents_len == 0 {
79        bail!("Invalid content size of 0, please provide a non-empty file")
80    }
81    log::info!("Buffered reading {test_contents_len} B contents in {elapsed:?}");
82
83    let mut compression_awaiting: Vec<CompressionResult<Awaiting>> = Vec::new();
84
85    if evaluate_compressions.contains(&Compression::Lz4) {
86        compression_awaiting.push(CompressionResult::new(Compression::Lz4));
87    }
88
89    if !omit.contains(&CompressionVariant::Bzip2) {
90        for compression_level in <Bzip2Args>::range_u8_with_omit(&omit_levels) {
91            compression_awaiting.push(CompressionResult::new(Compression::Bzip2(Bzip2Args {
92                compression_level,
93            })));
94        }
95    }
96    if !omit.contains(&CompressionVariant::Gzip) {
97        for compression_level in <GzipArgs>::range_u8_with_omit(&omit_levels) {
98            compression_awaiting.push(CompressionResult::new(Compression::Gzip(GzipArgs {
99                compression_level,
100            })));
101        }
102    }
103    if !omit.contains(&CompressionVariant::Xz) {
104        for compression_level in <XzArgs>::range_u8_with_omit(&omit_levels) {
105            compression_awaiting.push(CompressionResult::new(Compression::Xz(XzArgs {
106                compression_level,
107            })));
108        }
109    }
110
111    log::info!(
112        "Evaluating {} compression combinations",
113        compression_awaiting.len()
114    );
115    if threads == 1 {
116        log::info!("Running sequentially on the main thread");
117    } else {
118        log::info!("Running sequentially with up to {threads} threads");
119    }
120    rayon::ThreadPoolBuilder::new()
121        .num_threads(threads)
122        .build_global()?;
123
124    let res = multi_progress_bar(compression_awaiting, &test_contents, threads)?;
125
126    print_results::evaluate_and_printout_results(&res);
127
128    Ok(())
129}
130
131fn multi_progress_bar(
132    compression_awaiting: Vec<CompressionResult<Awaiting>>,
133    test_contents: &Vec<u8>,
134    thread_count: usize,
135) -> anyhow::Result<Vec<CompressionResult<Finished>>> {
136    let multi_bar: MultiProgress = MultiProgress::new();
137
138    let waiting_style =
139        ProgressStyle::default_bar().template("{prefix:.bold.dim} {msg:>30.dim}")?;
140
141    let progress_counter = Arc::new(AtomicUsize::new(0));
142    let total = compression_awaiting.len();
143
144    // Create and manage progress bars for each task
145    let p_bars: Vec<_> = (0..=thread_count)
146        .map(|i| {
147            if i == thread_count {
148                let pb = multi_bar.insert_from_back(thread_count, ProgressBar::new(total as u64));
149                pb.set_style(
150                    style_global_tracker().expect("Failed to set global progress bar style"),
151                );
152                pb.set_prefix(prefix_global_tracker(thread_count));
153                pb.enable_steady_tick(Duration::from_millis(200));
154                (pb, AtomicBool::new(true))
155            } else {
156                let pb = multi_bar.add(ProgressBar::new(total as u64));
157                (pb, AtomicBool::new(false))
158            }
159        })
160        .collect();
161
162    let res = std::thread::scope(|s| {
163        let res = s
164            .spawn(|| {
165                let compression_results: Vec<CompressionResult<Finished>> = compression_awaiting
166                    .into_par_iter()
167                    .flat_map(|cr_await| {
168                        let (pb, is_active) = p_bars
169                            .iter()
170                            .find(|(_, is_active)| {
171                                is_active
172                                    .compare_exchange(
173                                        false,
174                                        true,
175                                        Ordering::SeqCst,
176                                        Ordering::SeqCst,
177                                    )
178                                    .is_ok()
179                            })
180                            .unwrap();
181                        pb.set_style(working_style());
182                        pb.enable_steady_tick(Duration::from_millis(100));
183                        pb.set_message(cr_await.compression.describe_str());
184
185                        let compr_res = cr_await.run(test_contents).ok();
186                        if let Some(ref compr_res) = compr_res {
187                            {
188                                let format = compr_res.compression_format();
189                                let mut table: String = compr_res.summarize_as_table();
190                                let mut disp_str =
191                                    String::with_capacity(format.len() + 1 + table.len());
192                                disp_str.push_str(format);
193                                disp_str.push('\n');
194                                disp_str.extend(table.drain(..));
195                                pb.suspend(|| {
196                                    log::info!("{disp_str}");
197                                })
198                            }
199                        }
200                        let current_progress = progress_counter.fetch_add(1, Ordering::SeqCst);
201                        let (global_pb, _) = p_bars.last().unwrap();
202                        global_pb.set_position(current_progress as u64 + 1);
203
204                        let items_remaining = total - (current_progress + 1);
205
206                        if items_remaining < thread_count {
207                            // Look for progress bars to clean up
208                            let mut inactive_count = 0;
209                            for (p, is_active) in &p_bars {
210                                if !is_active.load(Ordering::SeqCst) {
211                                    inactive_count += 1;
212                                    if inactive_count > 2 {
213                                        is_active.store(true, Ordering::SeqCst);
214                                        p.finish_and_clear();
215                                    }
216                                }
217                            }
218                            if items_remaining == 0 {
219                                pb.finish_and_clear();
220                            } else {
221                                pb.set_style(waiting_style.clone());
222                                pb.disable_steady_tick();
223                                pb.set_message(format!("{} waiting...", Emoji("💤 ", "Zzz")));
224                                is_active.store(false, Ordering::SeqCst);
225                            }
226                        } else {
227                            pb.reset_elapsed();
228                            is_active.store(false, Ordering::SeqCst);
229                        }
230
231                        compr_res
232                    })
233                    .collect();
234                compression_results
235            })
236            .join()
237            .unwrap();
238
239        res
240    });
241
242    Ok(res)
243}
244
245fn style_global_tracker() -> anyhow::Result<ProgressStyle> {
246    static CLOCK_12: Emoji<'_, '_> = Emoji("🕛", "⠁");
247    static CLOCK_1: Emoji<'_, '_> = Emoji("🕐", "⠂");
248    static CLOCK_2: Emoji<'_, '_> = Emoji("🕑", "⠄");
249    static CLOCK_3: Emoji<'_, '_> = Emoji("🕒", "⡀");
250    static CLOCK_4: Emoji<'_, '_> = Emoji("🕓", "⢀");
251    static CLOCK_5: Emoji<'_, '_> = Emoji("🕔", "⠠");
252    static CLOCK_6: Emoji<'_, '_> = Emoji("🕕", "⠐");
253    static CLOCK_7: Emoji<'_, '_> = Emoji("🕖", "⠈");
254    static CLOCK_8: Emoji<'_, '_> = Emoji("🕗", " ");
255    static CLOCK_9: Emoji<'_, '_> = Emoji("🕘", "⠁");
256    static CLOCK_10: Emoji<'_, '_> = Emoji("🕙", "⠁");
257    static CLOCK_11: Emoji<'_, '_> = Emoji("🕚", "⠁");
258    let emoji_clock_frames = [
259        format!("{}", CLOCK_12),
260        format!("{}", CLOCK_1),
261        format!("{}", CLOCK_2),
262        format!("{}", CLOCK_3),
263        format!("{}", CLOCK_4),
264        format!("{}", CLOCK_5),
265        format!("{}", CLOCK_6),
266        format!("{}", CLOCK_7),
267        format!("{}", CLOCK_8),
268        format!("{}", CLOCK_9),
269        format!("{}", CLOCK_10),
270        format!("{}", CLOCK_11),
271    ];
272    let emoji_clock_frames_str: Vec<&str> = emoji_clock_frames.iter().map(|e| e.as_str()).collect();
273
274    Ok(ProgressStyle::default_bar()
275        .template(
276            "{spinner:.blue} {prefix:.bold.dim} [{elapsed_precise:.bold.dim}]{pos:>3}/{len:3.bold.green}[{wide_bar:.blue}] ({eta})",
277        )?
278        .progress_chars("##-")
279        .tick_strings(&emoji_clock_frames_str))
280}
281
282fn prefix_global_tracker(thread_count: usize) -> std::string::String {
283    format!(
284        "{thread_count} {} (max) {abacus}",
285        if thread_count > 1 {
286            "workers"
287        } else {
288            "worker"
289        },
290        abacus = Emoji("🧮", ""),
291    )
292}
293
294fn working_style() -> ProgressStyle {
295    let moon_frames = ["🌑 ", "🌒 ", "🌓 ", "🌔 ", "🌕 ", "🌖 ", "🌗 ", "🌘 "];
296    ProgressStyle::default_bar()
297        .template("{spinner:.blue}{elapsed:>4.dim} {msg:>18.bold}")
298        .expect("Failed setting progress bar template")
299        .tick_strings(&moon_frames)
300}