use crate::utils::chunk_reader::{ChunkReader, Task};
use crate::utils::cli_result::CliResult;
use crate::utils::file::estimate_line_count_by_mb;
use crate::utils::filename::{dir_file, str_to_filename};
use crate::utils::progress::Progress;
use crate::utils::util::{datetime_str, werr};
use crate::utils::writer::Writer;
use crossbeam_channel::bounded;
use dashmap::DashMap;
use rayon::prelude::*;
use std::error::Error;
use std::fs::create_dir;
use std::path::Path;
use std::{process, thread};
/// Split `path` into multiple files, either sequentially into fixed-size
/// chunks (`size` = `Some(n)`) or grouped by the value of column `col`.
///
/// Output lands in a freshly created sibling directory named
/// `<stem>-split-<timestamp>`. Exits the process if `col` is out of range
/// for the header row.
pub fn run(path: &Path, no_header: bool, sep: &str, col: usize, size: &Option<usize>) -> CliResult {
    let is_sequential_split = size.is_some();

    // Destination directory sits next to the input file.
    let dir = path.with_file_name(format!(
        "{}-split-{}",
        path.file_stem().unwrap().to_string_lossy(),
        datetime_str()
    ));
    create_dir(&dir)?;

    let mut rdr = ChunkReader::new(path)?;

    // Pull the header row up front (unless the file has none) and validate
    // that the requested column actually exists in it.
    let first_row = if no_header {
        String::new()
    } else {
        match rdr.next() {
            None => return Ok(()), // empty file: nothing to split
            Some(line) => {
                let line = line?;
                if col >= line.split(sep).count() {
                    werr!("column index out of range!");
                    process::exit(1)
                }
                line
            }
        }
    };

    // A reader thread streams chunks of lines through a bounded channel.
    let (tx, rx) = bounded(1);
    let line_buffer_n = if is_sequential_split {
        size.unwrap()
    } else {
        estimate_line_count_by_mb(path, Some(50))
    };
    thread::spawn(move || rdr.send_to_channel_by_chunks(tx, line_buffer_n));

    let mut prog = Progress::new();
    if is_sequential_split {
        // One numbered output file per chunk, same stem/extension as input.
        let stem = path.file_stem().unwrap().to_string_lossy();
        let extension = path.extension().and_then(|i| i.to_str()).unwrap_or("");
        for task in rx {
            let mut out = dir.to_owned();
            out.push(format!("{}-split{}.{}", stem, task.chunk, extension));
            sequential_task_handle(task, &mut prog, &out, &first_row)?;
        }
    } else {
        // One output file per distinct value of column `col`; remember which
        // files have already received a header line.
        let header_inserted: DashMap<String, bool> = DashMap::new();
        for task in rx {
            task_handle(
                task,
                &mut prog,
                sep,
                no_header,
                col,
                &dir,
                &first_row,
                &header_inserted,
            )?
        }
    }

    println!("\nSaved to directory: {}", dir.display());
    Ok(())
}
/// Append one chunk of lines to the sequential output file `out`.
///
/// `first_row` is the header line ("" when the input has no header).
/// NOTE(review): relies on `Writer::write_header` being idempotent /
/// no-op-safe for an already-written or empty header — confirm in Writer.
fn sequential_task_handle(
    task: Task,
    prog: &mut Progress,
    out: &Path,
    first_row: &str,
) -> Result<(), Box<dyn Error>> {
    // Account for the chunk up front so the progress line stays current.
    prog.add_chunks(1);
    prog.add_bytes(task.bytes);

    let mut writer = Writer::append_to(out)?;
    writer.write_header(first_row)?;
    writer.write_lines(&task.lines)?;

    prog.print();
    Ok(())
}
/// Group one chunk of lines by the value in column `col` and append each
/// group to its per-value file under `dir`.
///
/// Lines with fewer than `col + 1` fields are reported and skipped.
#[allow(clippy::too_many_arguments)]
fn task_handle(
    task: Task,
    prog: &mut Progress,
    sep: &str,
    no_header: bool,
    col: usize,
    dir: &Path,
    first_row: &str,
    header_inserted: &DashMap<String, bool>,
) -> Result<(), Box<dyn Error>> {
    prog.add_chunks(1);
    prog.add_bytes(task.bytes);

    // Bucket rows by the target column's value, in parallel.
    let batch_work = DashMap::new();
    task.lines.par_iter().for_each(|r| {
        let seg = r.split(sep).collect::<Vec<_>>();
        // Fix: guard against the field count (`seg.len()`), not the line's
        // byte length (`r.len()`). The old check let short lines through
        // (e.g. col = 4 on "a,b,c": 4 < 5 bytes) and panicked on `seg[col]`.
        if col >= seg.len() {
            println!("[info] ignore a bad line, content is: {r:?}!");
        } else {
            batch_work.entry(seg[col]).or_insert_with(Vec::new).push(r);
        }
    });

    // Flush each bucket to its own file, in parallel.
    batch_work
        .into_iter()
        .collect::<Vec<(_, _)>>()
        .par_iter()
        .for_each(|(field, rows)| {
            save_to_disk(dir, field, rows, no_header, header_inserted, first_row).unwrap();
        });

    prog.print();
    Ok(())
}
/// Append `rows` to `<dir>/<sanitized field>.csv`, writing the header line
/// the first time this run touches that file.
fn save_to_disk(
    dir: &Path,
    field: &str,
    rows: &[&String],
    no_header: bool,
    header_inserted: &DashMap<String, bool>,
    first_row: &str,
) -> Result<(), Box<dyn Error>> {
    // Column values may contain characters that are illegal in filenames.
    let filename = str_to_filename(field) + ".csv";
    let out = dir_file(dir, &filename);

    let mut wtr = Writer::append_to(&out)?;
    // Single atomic check-and-set: `DashMap::insert` returns the previous
    // value, so the header is written exactly once per file. The old
    // `contains_key` + `insert` pair was two lookups and a check-then-act
    // window if two distinct fields sanitize to the same filename and are
    // handled by parallel workers in the same batch.
    if !no_header && header_inserted.insert(filename, true).is_none() {
        wtr.write_line(first_row)?
    }
    wtr.write_lines(rows)?;
    Ok(())
}