1use crate::args::Split;
2use crate::utils::cli_result::CliResult;
3use crate::utils::constants::COMMA;
4use crate::utils::excel::datatype_vec_to_string_vec;
5use crate::utils::filename::{dir_file, str_to_filename};
6use crate::utils::progress::Progress;
7use crate::utils::reader::{ExcelChunkTask, ExcelReader};
8use crate::utils::util::{datetime_str, werr_exit};
9use crate::utils::writer::Writer;
10use crossbeam_channel::bounded;
11use dashmap::DashMap;
12use rayon::prelude::*;
13use std::fs::create_dir;
14use std::path::Path;
15use std::thread;
16
17impl Split {
18 pub fn excel_run(&self) -> CliResult {
19 let path = &self.path();
20 let is_sequential_split = self.size.is_some();
21
22 let dir = path.with_file_name(format!(
24 "{}-split-{}",
25 path.file_stem().unwrap().to_string_lossy(),
26 datetime_str()
27 ));
28 create_dir(&dir)?;
29
30 let mut range = ExcelReader::new(path, self.sheet)?;
32 let first_row = if self.no_header {
33 String::new()
34 } else {
35 let Some(r) = range.next() else {
36 return Ok(());
37 };
38 if self.col >= r.len() {
39 werr_exit!("Error: column index out of range!");
40 };
41 datatype_vec_to_string_vec(r).join(",")
42 };
43
44 let (tx, rx) = bounded(1);
45 let buffer_size = if is_sequential_split { self.size } else { None };
47 thread::spawn(move || range.send_to_channel_by_chunk(tx, buffer_size));
48
49 let mut prog = Progress::new();
51 match is_sequential_split {
52 true => {
53 let stem = path.file_stem().unwrap().to_string_lossy();
54 for task in rx {
55 let mut out = dir.to_owned();
56 out.push(format!("{}-split{}.csv", stem, task.chunk));
57 sequential_task_handle(task, &mut prog, &out, &first_row)?;
58 }
59 }
60 false => {
61 let header_inserted: DashMap<String, bool> = DashMap::new();
62 for task in rx {
63 task_handle(&self, task, &mut prog, &dir, &first_row, &header_inserted)?;
64 }
65 }
66 }
67
68 println!("\nSaved to directory: {}", dir.display());
69
70 Ok(())
71 }
72}
73
74#[allow(clippy::too_many_arguments)]
75fn sequential_task_handle(
76 task: ExcelChunkTask,
77 prog: &mut Progress,
78 out: &Path,
79 first_row: &str,
80) -> CliResult {
81 prog.add_chunks(1);
83 prog.add_lines(task.n);
84
85 let mut wtr = Writer::append_to(out)?;
87 wtr.write_header(first_row)?;
88 wtr.write_excel_lines(&task.lines, COMMA)?;
89
90 prog.print();
91
92 Ok(())
93}
94
95#[allow(clippy::too_many_arguments)]
96fn task_handle(
97 options: &Split,
98 task: ExcelChunkTask,
99 prog: &mut Progress,
100 dir: &Path,
101 first_row: &str,
102 header_inserted: &DashMap<String, bool>,
103) -> CliResult {
104 prog.add_chunks(1);
106 prog.add_lines(task.n);
107
108 let batch_work = DashMap::new();
110 task.lines.par_iter().for_each(|r| {
111 if options.col >= r.len() {
112 println!("[info] ignore a bad line, content is: {r:?}!");
113 } else {
114 batch_work
115 .entry(r[options.col].to_string())
116 .or_insert_with(Vec::new)
117 .push(r);
118 }
119 });
120
121 batch_work
123 .into_iter()
124 .collect::<Vec<(_, _)>>()
125 .par_iter()
126 .for_each(|(field, rows)| {
127 let filename = str_to_filename(field) + ".csv";
129 let out = dir_file(dir, &filename);
130 let mut wtr = Writer::append_to(&out).unwrap();
132 if !options.no_header && !header_inserted.contains_key(&filename) {
133 header_inserted.insert(filename, true);
134 wtr.write_str(first_row).unwrap();
135 }
136 wtr.write_excel_lines_by_ref(rows, COMMA).unwrap();
137 });
138
139 prog.print();
140
141 Ok(())
142}