1use crate::args::Split;
2use crate::utils::cli_result::CliResult;
3use crate::utils::filename::{dir_file, str_to_filename};
4use crate::utils::progress::Progress;
5use crate::utils::reader::{ChunkReader, Task};
6use crate::utils::util::{datetime_str, werr_exit};
7use crate::utils::writer::Writer;
8use crossbeam_channel::bounded;
9use dashmap::DashMap;
10use rayon::prelude::*;
11use std::fs::create_dir;
12use std::path::Path;
13use std::thread;
14
15impl Split {
16 pub fn csv_run(&self) -> CliResult {
17 let path = &self.path();
18 let is_sequential_split = self.size.is_some();
19
20 let dir = path.with_file_name(format!(
22 "{}-split-{}",
23 path.file_stem().unwrap().to_string_lossy(),
24 datetime_str()
25 ));
26 create_dir(&dir)?;
27
28 let mut rdr = ChunkReader::new(path)?;
30 let first_row = if self.no_header {
31 String::new()
32 } else {
33 let Some(r) = rdr.next() else {
34 return Ok(());
35 };
36 let r = r?;
37 if self.col >= self.row_field_count(&r) {
38 werr_exit!("column index out of range!");
39 }
40 r
41 };
42
43 let (tx, rx) = bounded(1);
45
46 let line_buffer_n = match is_sequential_split {
48 true => self.size.unwrap(),
49 false => 50_000,
50 };
51 thread::spawn(move || rdr.send_to_channel_by_chunks(tx, line_buffer_n));
52
53 let mut prog = Progress::new();
55 match is_sequential_split {
56 true => {
57 let stem = path.file_stem().unwrap().to_string_lossy();
58 let extension = path
59 .extension()
60 .and_then(|i| i.to_str())
61 .unwrap_or_default();
62
63 for task in rx {
64 let mut out = dir.to_owned();
65 out.push(format!("{}-split{}.{}", stem, task.chunk, extension));
66 sequential_task_handle(task, &mut prog, &out, &first_row)?;
67 }
68 }
69 false => {
70 let header_inserted: DashMap<String, bool> = DashMap::new();
71 for task in rx {
72 task_handle(&self, task, &mut prog, &dir, &first_row, &header_inserted)?;
73 }
74 }
75 }
76
77 println!("\nSaved to directory: {}", dir.display());
78
79 Ok(())
80 }
81}
82
83fn sequential_task_handle(
84 task: Task,
85 prog: &mut Progress,
86 out: &Path,
87 first_row: &str,
88) -> CliResult {
89 prog.add_chunks(1);
91 prog.add_bytes(task.bytes);
92
93 let mut wtr = Writer::append_to(out)?;
95 wtr.write_header(first_row)?;
96 wtr.write_strings(&task.lines)?;
97
98 prog.print();
99
100 Ok(())
101}
102
103#[allow(clippy::too_many_arguments)]
104fn task_handle(
105 args: &Split,
106 task: Task,
107 prog: &mut Progress,
108 dir: &Path,
109 first_row: &str,
110 header_inserted: &DashMap<String, bool>,
111) -> CliResult {
112 prog.add_chunks(1);
114 prog.add_bytes(task.bytes);
115
116 let batch_work = DashMap::new();
118 task.lines.par_iter().for_each(|r| {
119 let seg = args.split_row_to_vec(r);
120 if args.col >= r.len() {
121 println!("[info] ignore a bad line, content is: {r:?}!");
122 return;
123 }
124 batch_work
125 .entry(seg[args.col])
126 .or_insert_with(Vec::new)
127 .push(r);
128 });
129
130 batch_work
132 .into_iter()
133 .collect::<Vec<(_, _)>>()
134 .par_iter()
135 .for_each(|(field, rows)| {
136 let filename = str_to_filename(field) + ".csv";
138 let out = dir_file(dir, &filename);
139
140 let mut wtr = Writer::append_to(&out).unwrap();
142 if !args.no_header && !header_inserted.contains_key(&filename) {
143 header_inserted.insert(filename, true);
144 wtr.write_str(first_row).unwrap();
145 }
146 wtr.write_strings(rows).unwrap();
147 });
148
149 prog.print();
150
151 Ok(())
152}