rsv_lib/csv/
split.rs

1use crate::args::Split;
2use crate::utils::cli_result::CliResult;
3use crate::utils::filename::{dir_file, str_to_filename};
4use crate::utils::progress::Progress;
5use crate::utils::reader::{ChunkReader, Task};
6use crate::utils::util::{datetime_str, werr_exit};
7use crate::utils::writer::Writer;
8use crossbeam_channel::bounded;
9use dashmap::DashMap;
10use rayon::prelude::*;
11use std::fs::create_dir;
12use std::path::Path;
13use std::thread;
14
15impl Split {
16    pub fn csv_run(&self) -> CliResult {
17        let path = &self.path();
18        let is_sequential_split = self.size.is_some();
19
20        // new directory
21        let dir = path.with_file_name(format!(
22            "{}-split-{}",
23            path.file_stem().unwrap().to_string_lossy(),
24            datetime_str()
25        ));
26        create_dir(&dir)?;
27
28        // open file and header
29        let mut rdr = ChunkReader::new(path)?;
30        let first_row = if self.no_header {
31            String::new()
32        } else {
33            let Some(r) = rdr.next() else {
34                return Ok(());
35            };
36            let r = r?;
37            if self.col >= self.row_field_count(&r) {
38                werr_exit!("column index out of range!");
39            }
40            r
41        };
42
43        // work pip
44        let (tx, rx) = bounded(1);
45
46        // read
47        let line_buffer_n = match is_sequential_split {
48            true => self.size.unwrap(),
49            false => 50_000,
50        };
51        thread::spawn(move || rdr.send_to_channel_by_chunks(tx, line_buffer_n));
52
53        // process batch work
54        let mut prog = Progress::new();
55        match is_sequential_split {
56            true => {
57                let stem = path.file_stem().unwrap().to_string_lossy();
58                let extension = path
59                    .extension()
60                    .and_then(|i| i.to_str())
61                    .unwrap_or_default();
62
63                for task in rx {
64                    let mut out = dir.to_owned();
65                    out.push(format!("{}-split{}.{}", stem, task.chunk, extension));
66                    sequential_task_handle(task, &mut prog, &out, &first_row)?;
67                }
68            }
69            false => {
70                let header_inserted: DashMap<String, bool> = DashMap::new();
71                for task in rx {
72                    task_handle(&self, task, &mut prog, &dir, &first_row, &header_inserted)?;
73                }
74            }
75        }
76
77        println!("\nSaved to directory: {}", dir.display());
78
79        Ok(())
80    }
81}
82
83fn sequential_task_handle(
84    task: Task,
85    prog: &mut Progress,
86    out: &Path,
87    first_row: &str,
88) -> CliResult {
89    // progress
90    prog.add_chunks(1);
91    prog.add_bytes(task.bytes);
92
93    // write
94    let mut wtr = Writer::append_to(out)?;
95    wtr.write_header(first_row)?;
96    wtr.write_strings(&task.lines)?;
97
98    prog.print();
99
100    Ok(())
101}
102
103#[allow(clippy::too_many_arguments)]
104fn task_handle(
105    args: &Split,
106    task: Task,
107    prog: &mut Progress,
108    dir: &Path,
109    first_row: &str,
110    header_inserted: &DashMap<String, bool>,
111) -> CliResult {
112    // progress
113    prog.add_chunks(1);
114    prog.add_bytes(task.bytes);
115
116    // parallel process
117    let batch_work = DashMap::new();
118    task.lines.par_iter().for_each(|r| {
119        let seg = args.split_row_to_vec(r);
120        if args.col >= r.len() {
121            println!("[info] ignore a bad line, content is: {r:?}!");
122            return;
123        }
124        batch_work
125            .entry(seg[args.col])
126            .or_insert_with(Vec::new)
127            .push(r);
128    });
129
130    // parallel save to disk
131    batch_work
132        .into_iter()
133        .collect::<Vec<(_, _)>>()
134        .par_iter()
135        .for_each(|(field, rows)| {
136            // file path
137            let filename = str_to_filename(field) + ".csv";
138            let out = dir_file(dir, &filename);
139
140            // write
141            let mut wtr = Writer::append_to(&out).unwrap();
142            if !args.no_header && !header_inserted.contains_key(&filename) {
143                header_inserted.insert(filename, true);
144                wtr.write_str(first_row).unwrap();
145            }
146            wtr.write_strings(rows).unwrap();
147        });
148
149    prog.print();
150
151    Ok(())
152}