rsv_lib/csv/
sample.rs

1use crate::args::Sample;
2use crate::utils::cli_result::CliResult;
3use crate::utils::filename::new_path;
4use crate::utils::priority_queue::PriorityQueue;
5use crate::utils::table::Table;
6use crate::utils::writer::Writer;
7use rand::rng;
8use rand::rngs::StdRng;
9use rand::{Rng, SeedableRng};
10use std::borrow::Cow;
11use std::fs::File;
12use std::io::{BufRead, BufReader};
13use std::path::Path;
14use std::time::Instant;
15
16impl Sample {
17    pub fn csv_run(&self) -> CliResult {
18        let path = &self.path();
19        let time_limit = (self.time_limit - 0.7).clamp(0.0, f32::MAX);
20
21        // open files
22        let mut rdr = BufReader::new(File::open(path)?);
23
24        // header
25        let mut buf = vec![];
26        let header = match self.no_header {
27            true => None,
28            false => match rdr.read_until(b'\n', &mut buf) {
29                Ok(_) => Some(String::from_utf8_lossy(&buf).trim().to_string()),
30                Err(_) => return Ok(()),
31            },
32        };
33        buf.clear();
34
35        // seed
36        let mut rng = match self.seed {
37            Some(s) => StdRng::seed_from_u64(s as u64),
38            None => StdRng::from_rng(&mut rng()),
39        };
40
41        // read
42        let mut queue = PriorityQueue::with_capacity(self.n);
43        let mut line_n = 0;
44        let time = Instant::now();
45        while let Ok(bytes_read) = rdr.read_until(b'\n', &mut buf) {
46            if bytes_read == 0 {
47                break;
48            }
49
50            let priority = rng.random::<f64>();
51            if queue.can_insert(priority) {
52                let line = buf[..bytes_read].to_owned();
53                queue.push(line_n, priority, line);
54            }
55
56            buf.clear();
57            line_n += 1;
58
59            if time_limit > 0.0 && line_n % 10000 == 0 && time.elapsed().as_secs_f32() >= time_limit
60            {
61                break;
62            }
63        }
64
65        match (self.export, self.show_number) {
66            (true, _) => write_to_file(path, header, queue),
67            (false, true) => print_to_stdout(header, queue),
68            (false, false) => print_to_stdout_no_number(header, queue),
69        }
70
71        Ok(())
72    }
73}
74
75fn write_to_file(path: &Path, header: Option<String>, queue: PriorityQueue<Vec<u8>>) {
76    // new file
77    let out = new_path(path, "-sampled");
78    let mut wtr = Writer::new(&out).unwrap();
79    if let Some(r) = header {
80        wtr.write_str_unchecked(r);
81    }
82    for r in queue.into_sorted_items() {
83        wtr.write_bytes_unchecked(&r.item);
84    }
85
86    println!("Saved to file: {}", out.display());
87}
88
89fn print_to_stdout(header: Option<String>, queue: PriorityQueue<Vec<u8>>) {
90    let mut table = Table::new();
91
92    if let Some(h) = header {
93        table.add_record([Cow::Borrowed("#"), Cow::Borrowed(""), Cow::from(h)]);
94    }
95
96    queue.into_sorted_items().into_iter().for_each(|i| {
97        table.add_record([
98            Cow::from(i.line_n_as_string()),
99            Cow::Borrowed("->"),
100            Cow::from(String::from_utf8_lossy(&i.item).trim().to_string()),
101        ])
102    });
103
104    table.print_blank_unchecked();
105}
106
107fn print_to_stdout_no_number(header: Option<String>, queue: PriorityQueue<Vec<u8>>) {
108    let mut wtr = Writer::stdout().unwrap();
109
110    if let Some(h) = header {
111        wtr.write_str_unchecked(h);
112    }
113
114    queue.into_sorted_items().into_iter().for_each(|i| {
115        wtr.write_bytes_unchecked(&i.item);
116    });
117}