1use crate::args::Split;
2use crate::utils::cli_result::CliResult;
3use crate::utils::filename::{dir_file, new_file, str_to_filename};
4use crate::utils::util::datetime_str;
5use crate::utils::writer::Writer;
6use dashmap::DashMap;
7use rayon::prelude::*;
8use std::fs::create_dir;
9use std::io::{stdin, BufRead};
10use std::path::Path;
11
12impl Split {
13 pub fn io_run(&self) -> CliResult {
14 let is_sequential_split = self.size.is_some();
15
16 let dir = format!("split-{}", datetime_str());
18 let dir = new_file(&dir);
19 create_dir(&dir)?;
20
21 let mut rdr = stdin().lock().lines();
23 let first_row = if self.no_header {
24 String::new()
25 } else {
26 let Some(r) = rdr.next() else { return Ok(()) };
27 r?
28 };
29
30 let header_inserted: DashMap<String, bool> = DashMap::new();
31 let mut n = 0;
32 let buffer = if is_sequential_split {
33 self.size.unwrap()
34 } else {
35 10000
36 };
37 let mut chunk = 1;
38 let mut lines = Vec::with_capacity(buffer);
39 for r in rdr {
40 let r = r?;
41 n += 1;
42 lines.push(r);
43 if n >= buffer {
44 task_handle(self, chunk, lines, &dir, &first_row, &header_inserted)?;
45 lines = Vec::with_capacity(buffer);
46 n = 0;
47 chunk += 1;
48 }
49 }
50
51 if !lines.is_empty() {
52 task_handle(self, chunk, lines, &dir, &first_row, &header_inserted)?;
53 }
54
55 println!("Saved to directory: {}", dir.display());
56
57 Ok(())
58 }
59}
60
61#[allow(clippy::too_many_arguments)]
62fn task_handle(
63 args: &Split,
64 chunk: usize,
65 lines: Vec<String>,
66 dir: &Path,
67 first_row: &str,
68 header_inserted: &DashMap<String, bool>,
69) -> CliResult {
70 match args.size.is_some() {
71 true => sequential_task_handle(chunk, lines, dir, first_row)?,
72 false => col_split_task_handle(args, lines, dir, first_row, header_inserted)?,
73 };
74
75 Ok(())
76}
77
78fn sequential_task_handle(
79 chunk: usize,
80 lines: Vec<String>,
81 dir: &Path,
82 first_row: &str,
83) -> CliResult {
84 let mut out = dir.to_owned();
85 out.push(format!("split{}.csv", chunk));
86
87 let mut wtr = Writer::append_to(&out)?;
89 wtr.write_header(first_row)?;
90 wtr.write_strings(&lines)?;
91
92 Ok(())
93}
94
95#[allow(clippy::too_many_arguments)]
96fn col_split_task_handle(
97 args: &Split,
98 lines: Vec<String>,
99 dir: &Path,
100 first_row: &str,
101 header_inserted: &DashMap<String, bool>,
102) -> CliResult {
103 let batch_work = DashMap::new();
105
106 lines.par_iter().for_each(|r| {
107 let seg = args.split_row_to_vec(r);
108 if args.col >= r.len() {
109 println!("[info] ignore a bad line, content is: {r:?}!");
110 return;
111 }
112 batch_work
113 .entry(seg[args.col])
114 .or_insert_with(Vec::new)
115 .push(r)
116 });
117
118 batch_work
120 .into_iter()
121 .collect::<Vec<(_, _)>>()
122 .par_iter()
123 .for_each(|(field, rows)| {
124 let filename = str_to_filename(field) + ".csv";
126 let out = dir_file(dir, &filename);
127
128 let mut wtr = Writer::append_to(&out).unwrap();
130 if !args.no_header && !header_inserted.contains_key(&filename) {
131 header_inserted.insert(filename, true);
132 wtr.write_str(first_row).unwrap();
133 }
134 wtr.write_strings(rows).unwrap();
135 });
136
137 Ok(())
138}