1use std::{
2 io::Read,
3 sync::{
4 atomic::{AtomicBool, AtomicUsize, Ordering},
5 Arc,
6 },
7 time::{Duration, Instant},
8};
9
10use crate::{
11 config::{
12 compression::{
13 Bzip2Args, Compression, CompressionRange, CompressionVariant, GzipArgs, XzArgs,
14 },
15 evaluate_compression::EvaluateCompressionArgs,
16 },
17 send::util::file_with_bufreader,
18};
19use anyhow::{bail, Result};
20use console::Emoji;
21use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
22use rayon::iter::{IntoParallelIterator, ParallelIterator};
23use strum::IntoEnumIterator;
24
25pub mod compression_result;
26use compression_result::{Awaiting, CompressionResult, Finished};
27
28mod print_results;
29mod test_compress;
30
31pub fn evaluate_compression(args: EvaluateCompressionArgs) -> Result<()> {
32 let EvaluateCompressionArgs {
33 input_file,
34 omit,
35 mut omit_levels,
36 threads,
37 } = args;
38
39 omit_levels.sort_unstable();
40 let compression_set: Vec<Compression> = Compression::iter().collect();
41
42 if !omit.is_empty() {
43 let mut print_str = String::from("Omitting: ");
44 for compr in &omit {
45 print_str.push_str(&format!(" {compr}"));
46 }
47 log::info!("{print_str}");
48 }
49
50 let evaluate_compressions: Vec<Compression> = compression_set
51 .into_iter()
52 .filter(|c| !omit.contains(c.into()))
53 .collect();
54
55 if !evaluate_compressions.is_empty() {
56 let mut print_str = String::from("Evaluating:");
57 for compr in &evaluate_compressions {
58 print_str.push_str(&format!(" {compr}"));
59 }
60 log::info!("{print_str}");
61 }
62
63 if !omit_levels.is_empty() {
64 let mut print_str = String::from("Omitting compression levels (where applicable):");
65 for compr_lvls in &omit_levels {
66 print_str.push_str(&format!(" {compr_lvls}"));
67 }
68 log::info!("{print_str}");
69 }
70
71 let mut bufreader = file_with_bufreader(&input_file)?;
72
73 let start = Instant::now();
74 let mut test_contents = Vec::new();
75 bufreader.read_to_end(&mut test_contents)?;
76 let elapsed = start.elapsed();
77 let test_contents_len = test_contents.len();
78 if test_contents_len == 0 {
79 bail!("Invalid content size of 0, please provide a non-empty file")
80 }
81 log::info!("Buffered reading {test_contents_len} B contents in {elapsed:?}");
82
83 let mut compression_awaiting: Vec<CompressionResult<Awaiting>> = Vec::new();
84
85 if evaluate_compressions.contains(&Compression::Lz4) {
86 compression_awaiting.push(CompressionResult::new(Compression::Lz4));
87 }
88
89 if !omit.contains(&CompressionVariant::Bzip2) {
90 for compression_level in <Bzip2Args>::range_u8_with_omit(&omit_levels) {
91 compression_awaiting.push(CompressionResult::new(Compression::Bzip2(Bzip2Args {
92 compression_level,
93 })));
94 }
95 }
96 if !omit.contains(&CompressionVariant::Gzip) {
97 for compression_level in <GzipArgs>::range_u8_with_omit(&omit_levels) {
98 compression_awaiting.push(CompressionResult::new(Compression::Gzip(GzipArgs {
99 compression_level,
100 })));
101 }
102 }
103 if !omit.contains(&CompressionVariant::Xz) {
104 for compression_level in <XzArgs>::range_u8_with_omit(&omit_levels) {
105 compression_awaiting.push(CompressionResult::new(Compression::Xz(XzArgs {
106 compression_level,
107 })));
108 }
109 }
110
111 log::info!(
112 "Evaluating {} compression combinations",
113 compression_awaiting.len()
114 );
115 if threads == 1 {
116 log::info!("Running sequentially on the main thread");
117 } else {
118 log::info!("Running sequentially with up to {threads} threads");
119 }
120 rayon::ThreadPoolBuilder::new()
121 .num_threads(threads)
122 .build_global()?;
123
124 let res = multi_progress_bar(compression_awaiting, &test_contents, threads)?;
125
126 print_results::evaluate_and_printout_results(&res);
127
128 Ok(())
129}
130
131fn multi_progress_bar(
132 compression_awaiting: Vec<CompressionResult<Awaiting>>,
133 test_contents: &Vec<u8>,
134 thread_count: usize,
135) -> anyhow::Result<Vec<CompressionResult<Finished>>> {
136 let multi_bar: MultiProgress = MultiProgress::new();
137
138 let waiting_style =
139 ProgressStyle::default_bar().template("{prefix:.bold.dim} {msg:>30.dim}")?;
140
141 let progress_counter = Arc::new(AtomicUsize::new(0));
142 let total = compression_awaiting.len();
143
144 let p_bars: Vec<_> = (0..=thread_count)
146 .map(|i| {
147 if i == thread_count {
148 let pb = multi_bar.insert_from_back(thread_count, ProgressBar::new(total as u64));
149 pb.set_style(
150 style_global_tracker().expect("Failed to set global progress bar style"),
151 );
152 pb.set_prefix(prefix_global_tracker(thread_count));
153 pb.enable_steady_tick(Duration::from_millis(200));
154 (pb, AtomicBool::new(true))
155 } else {
156 let pb = multi_bar.add(ProgressBar::new(total as u64));
157 (pb, AtomicBool::new(false))
158 }
159 })
160 .collect();
161
162 let res = std::thread::scope(|s| {
163 let res = s
164 .spawn(|| {
165 let compression_results: Vec<CompressionResult<Finished>> = compression_awaiting
166 .into_par_iter()
167 .flat_map(|cr_await| {
168 let (pb, is_active) = p_bars
169 .iter()
170 .find(|(_, is_active)| {
171 is_active
172 .compare_exchange(
173 false,
174 true,
175 Ordering::SeqCst,
176 Ordering::SeqCst,
177 )
178 .is_ok()
179 })
180 .unwrap();
181 pb.set_style(working_style());
182 pb.enable_steady_tick(Duration::from_millis(100));
183 pb.set_message(cr_await.compression.describe_str());
184
185 let compr_res = cr_await.run(test_contents).ok();
186 if let Some(ref compr_res) = compr_res {
187 {
188 let format = compr_res.compression_format();
189 let mut table: String = compr_res.summarize_as_table();
190 let mut disp_str =
191 String::with_capacity(format.len() + 1 + table.len());
192 disp_str.push_str(format);
193 disp_str.push('\n');
194 disp_str.extend(table.drain(..));
195 pb.suspend(|| {
196 log::info!("{disp_str}");
197 })
198 }
199 }
200 let current_progress = progress_counter.fetch_add(1, Ordering::SeqCst);
201 let (global_pb, _) = p_bars.last().unwrap();
202 global_pb.set_position(current_progress as u64 + 1);
203
204 let items_remaining = total - (current_progress + 1);
205
206 if items_remaining < thread_count {
207 let mut inactive_count = 0;
209 for (p, is_active) in &p_bars {
210 if !is_active.load(Ordering::SeqCst) {
211 inactive_count += 1;
212 if inactive_count > 2 {
213 is_active.store(true, Ordering::SeqCst);
214 p.finish_and_clear();
215 }
216 }
217 }
218 if items_remaining == 0 {
219 pb.finish_and_clear();
220 } else {
221 pb.set_style(waiting_style.clone());
222 pb.disable_steady_tick();
223 pb.set_message(format!("{} waiting...", Emoji("💤 ", "Zzz")));
224 is_active.store(false, Ordering::SeqCst);
225 }
226 } else {
227 pb.reset_elapsed();
228 is_active.store(false, Ordering::SeqCst);
229 }
230
231 compr_res
232 })
233 .collect();
234 compression_results
235 })
236 .join()
237 .unwrap();
238
239 res
240 });
241
242 Ok(res)
243}
244
245fn style_global_tracker() -> anyhow::Result<ProgressStyle> {
246 static CLOCK_12: Emoji<'_, '_> = Emoji("🕛", "⠁");
247 static CLOCK_1: Emoji<'_, '_> = Emoji("🕐", "⠂");
248 static CLOCK_2: Emoji<'_, '_> = Emoji("🕑", "⠄");
249 static CLOCK_3: Emoji<'_, '_> = Emoji("🕒", "⡀");
250 static CLOCK_4: Emoji<'_, '_> = Emoji("🕓", "⢀");
251 static CLOCK_5: Emoji<'_, '_> = Emoji("🕔", "⠠");
252 static CLOCK_6: Emoji<'_, '_> = Emoji("🕕", "⠐");
253 static CLOCK_7: Emoji<'_, '_> = Emoji("🕖", "⠈");
254 static CLOCK_8: Emoji<'_, '_> = Emoji("🕗", " ");
255 static CLOCK_9: Emoji<'_, '_> = Emoji("🕘", "⠁");
256 static CLOCK_10: Emoji<'_, '_> = Emoji("🕙", "⠁");
257 static CLOCK_11: Emoji<'_, '_> = Emoji("🕚", "⠁");
258 let emoji_clock_frames = [
259 format!("{}", CLOCK_12),
260 format!("{}", CLOCK_1),
261 format!("{}", CLOCK_2),
262 format!("{}", CLOCK_3),
263 format!("{}", CLOCK_4),
264 format!("{}", CLOCK_5),
265 format!("{}", CLOCK_6),
266 format!("{}", CLOCK_7),
267 format!("{}", CLOCK_8),
268 format!("{}", CLOCK_9),
269 format!("{}", CLOCK_10),
270 format!("{}", CLOCK_11),
271 ];
272 let emoji_clock_frames_str: Vec<&str> = emoji_clock_frames.iter().map(|e| e.as_str()).collect();
273
274 Ok(ProgressStyle::default_bar()
275 .template(
276 "{spinner:.blue} {prefix:.bold.dim} [{elapsed_precise:.bold.dim}]{pos:>3}/{len:3.bold.green}[{wide_bar:.blue}] ({eta})",
277 )?
278 .progress_chars("##-")
279 .tick_strings(&emoji_clock_frames_str))
280}
281
282fn prefix_global_tracker(thread_count: usize) -> std::string::String {
283 format!(
284 "{thread_count} {} (max) {abacus}",
285 if thread_count > 1 {
286 "workers"
287 } else {
288 "worker"
289 },
290 abacus = Emoji("🧮", ""),
291 )
292}
293
294fn working_style() -> ProgressStyle {
295 let moon_frames = ["🌑 ", "🌒 ", "🌓 ", "🌔 ", "🌕 ", "🌖 ", "🌗 ", "🌘 "];
296 ProgressStyle::default_bar()
297 .template("{spinner:.blue}{elapsed:>4.dim} {msg:>18.bold}")
298 .expect("Failed setting progress bar template")
299 .tick_strings(&moon_frames)
300}