rsv_lib/io/
split.rs

1use crate::args::Split;
2use crate::utils::cli_result::CliResult;
3use crate::utils::filename::{dir_file, new_file, str_to_filename};
4use crate::utils::util::datetime_str;
5use crate::utils::writer::Writer;
6use dashmap::DashMap;
7use rayon::prelude::*;
8use std::fs::create_dir;
9use std::io::{stdin, BufRead};
10use std::path::Path;
11
12impl Split {
13    pub fn io_run(&self) -> CliResult {
14        let is_sequential_split = self.size.is_some();
15
16        // new directory
17        let dir = format!("split-{}", datetime_str());
18        let dir = new_file(&dir);
19        create_dir(&dir)?;
20
21        // open file and header
22        let mut rdr = stdin().lock().lines();
23        let first_row = if self.no_header {
24            String::new()
25        } else {
26            let Some(r) = rdr.next() else { return Ok(()) };
27            r?
28        };
29
30        let header_inserted: DashMap<String, bool> = DashMap::new();
31        let mut n = 0;
32        let buffer = if is_sequential_split {
33            self.size.unwrap()
34        } else {
35            10000
36        };
37        let mut chunk = 1;
38        let mut lines = Vec::with_capacity(buffer);
39        for r in rdr {
40            let r = r?;
41            n += 1;
42            lines.push(r);
43            if n >= buffer {
44                task_handle(self, chunk, lines, &dir, &first_row, &header_inserted)?;
45                lines = Vec::with_capacity(buffer);
46                n = 0;
47                chunk += 1;
48            }
49        }
50
51        if !lines.is_empty() {
52            task_handle(self, chunk, lines, &dir, &first_row, &header_inserted)?;
53        }
54
55        println!("Saved to directory: {}", dir.display());
56
57        Ok(())
58    }
59}
60
61#[allow(clippy::too_many_arguments)]
62fn task_handle(
63    args: &Split,
64    chunk: usize,
65    lines: Vec<String>,
66    dir: &Path,
67    first_row: &str,
68    header_inserted: &DashMap<String, bool>,
69) -> CliResult {
70    match args.size.is_some() {
71        true => sequential_task_handle(chunk, lines, dir, first_row)?,
72        false => col_split_task_handle(args, lines, dir, first_row, header_inserted)?,
73    };
74
75    Ok(())
76}
77
78fn sequential_task_handle(
79    chunk: usize,
80    lines: Vec<String>,
81    dir: &Path,
82    first_row: &str,
83) -> CliResult {
84    let mut out = dir.to_owned();
85    out.push(format!("split{}.csv", chunk));
86
87    // write
88    let mut wtr = Writer::append_to(&out)?;
89    wtr.write_header(first_row)?;
90    wtr.write_strings(&lines)?;
91
92    Ok(())
93}
94
95#[allow(clippy::too_many_arguments)]
96fn col_split_task_handle(
97    args: &Split,
98    lines: Vec<String>,
99    dir: &Path,
100    first_row: &str,
101    header_inserted: &DashMap<String, bool>,
102) -> CliResult {
103    // parallel process
104    let batch_work = DashMap::new();
105
106    lines.par_iter().for_each(|r| {
107        let seg = args.split_row_to_vec(r);
108        if args.col >= r.len() {
109            println!("[info] ignore a bad line, content is: {r:?}!");
110            return;
111        }
112        batch_work
113            .entry(seg[args.col])
114            .or_insert_with(Vec::new)
115            .push(r)
116    });
117
118    // parallel save to disk
119    batch_work
120        .into_iter()
121        .collect::<Vec<(_, _)>>()
122        .par_iter()
123        .for_each(|(field, rows)| {
124            // file path
125            let filename = str_to_filename(field) + ".csv";
126            let out = dir_file(dir, &filename);
127
128            // write
129            let mut wtr = Writer::append_to(&out).unwrap();
130            if !args.no_header && !header_inserted.contains_key(&filename) {
131                header_inserted.insert(filename, true);
132                wtr.write_str(first_row).unwrap();
133            }
134            wtr.write_strings(rows).unwrap();
135        });
136
137    Ok(())
138}