/// Usage/help text for `qsv extsort`.
///
/// `run` passes this string to `util::get_args`, which derives `Args` from it,
/// so the option lines below are part of the program's behavior, not just help
/// output. Keep them in sync with the `Args` field names.
static USAGE: &str = r#"
Sort an arbitrarily large CSV/text file using a multithreaded external sort algorithm.
This command does not work with <stdin>/<stdout>. Valid input, and output
files are expected.
Also, this command is not specific to CSV data, it sorts any text file on a
line-by-line basis. If sorting a non-CSV file, be sure to set --no-headers,
otherwise, the first line will not be included in the external sort.
Usage:
qsv extsort [options] <input> <output>
qsv extsort --help
External sort option:
-j, --jobs <arg> The number of jobs to run in parallel.
When not set, the number of jobs is set to the
number of CPUs detected.
Common options:
-h, --help Display this message
-n, --no-headers When set, the first row will not be interpreted
as headers and will be sorted with the rest
of the rows. Otherwise, the first row will always
appear as the header row in the output.
"#;
use std::{
fs,
io::{self, prelude::*},
path,
};
use ext_sort::{buffer::mem::MemoryLimitedBufferBuilder, ExternalSorter, ExternalSorterBuilder};
use serde::Deserialize;
use sysinfo::{System, SystemExt};
use crate::{util, CliResult};
/// Command-line arguments for `qsv extsort`, produced by `util::get_args` from
/// `USAGE`.
///
/// NOTE(review): field names look like they follow the usage parser's
/// `arg_<positional>` / `flag_<option>` mapping — confirm before renaming any field.
#[derive(Deserialize)]
struct Args {
    /// Path of the file to sort (`<input>`).
    arg_input: String,
    /// Path the sorted result is written to (`<output>`).
    arg_output: String,
    /// `--jobs`: requested sorter thread count; resolved via `util::njobs`.
    flag_jobs: Option<usize>,
    /// `--no-headers`: when set, the first line is sorted with the rest instead
    /// of being held back and written first.
    flag_no_headers: bool,
}
/// Fallback in-memory sort buffer size (100 MB), used by `run` when total
/// system memory cannot be detected on this platform.
const MEMORY_LIMITED_BUFFER: u64 = 100_000_000;

/// Buffer size handed to `ExternalSorterBuilder::with_rw_buf_size` in `run`.
const RW_BUFFER_CAPACITY: usize = 1_000_000;
/// Entry point for `qsv extsort`.
///
/// Sorts `<input>` line-by-line with a multithreaded external (temp-file backed)
/// merge sort and writes the result to `<output>`. Unless `--no-headers` is set,
/// the first line is read up front, excluded from the sort, and written as the
/// first line of the output.
///
/// # Errors
///
/// Returns a `CliError` if argument parsing fails, the input cannot be opened,
/// the output cannot be created, the external sorter cannot be built, the sort
/// or merge fails, or any read/write/flush fails.
pub fn run(argv: &[&str]) -> CliResult<()> {
    let args: Args = util::get_args(USAGE, argv)?;

    // In-memory buffer limit for the sorter: 10% of total system memory when it
    // can be detected, otherwise the fixed default.
    let mem_limited_buffer = if System::IS_SUPPORTED {
        // Only memory figures are needed; `System::new_all()` would also load
        // processes, disks, networks, etc.
        let mut sys = System::new();
        sys.refresh_memory();
        // NOTE: assumes sysinfo >= 0.26, where `total_memory()` is already in
        // bytes (older releases reported KiB, hence the former `* 1000`).
        // Never hand the sorter a zero-sized limit if detection reports 0.
        match sys.total_memory() / 10 {
            0 => MEMORY_LIMITED_BUFFER,
            ten_percent => ten_percent,
        }
    } else {
        MEMORY_LIMITED_BUFFER
    };
    log::info!("{mem_limited_buffer} bytes used for in memory mergesort buffer...");

    let mut input_reader = io::BufReader::new(match fs::File::open(&args.arg_input) {
        Ok(f) => f,
        Err(e) => return fail_clierror!("Cannot read input file: {e}"),
    });
    let mut output_writer = io::BufWriter::new(match fs::File::create(&args.arg_output) {
        Ok(f) => f,
        Err(e) => return fail_clierror!("Cannot create output file: {e}"),
    });

    let sorter: ExternalSorter<String, io::Error, MemoryLimitedBufferBuilder> =
        match ExternalSorterBuilder::new()
            .with_tmp_dir(path::Path::new("./"))
            .with_buffer(MemoryLimitedBufferBuilder::new(mem_limited_buffer))
            .with_rw_buf_size(RW_BUFFER_CAPACITY)
            .with_threads_number(util::njobs(args.flag_jobs))
            .build()
        {
            Ok(sorter) => sorter,
            Err(e) => return fail_clierror!("cannot create external sorter: {e}"),
        };

    // Hold the header line back so it stays first in the output.
    let mut header = String::new();
    if !args.flag_no_headers {
        input_reader.read_line(&mut header)?;
    }

    let sorted = match sorter.sort(input_reader.lines()) {
        Ok(sorted) => sorted,
        Err(e) => return fail_clierror!("cannot do external sort: {e}"),
    };

    if !header.is_empty() {
        // Strip only the line terminator, exactly as `BufRead::lines` does for
        // the data rows, so trailing spaces/tabs in the header are preserved.
        let header_line = header
            .strip_suffix('\n')
            .map_or(header.as_str(), |h| h.strip_suffix('\r').unwrap_or(h));
        output_writer.write_all(header_line.as_bytes())?;
        output_writer.write_all(b"\n")?;
    }

    // Propagate merge/read failures as CLI errors instead of panicking.
    for item in sorted {
        let line = match item {
            Ok(line) => line,
            Err(e) => return fail_clierror!("cannot read sorted data: {e}"),
        };
        output_writer.write_all(line.as_bytes())?;
        output_writer.write_all(b"\n")?;
    }

    // Flush explicitly: dropping a BufWriter silently discards flush errors.
    output_writer.flush()?;
    Ok(())
}
/// Sanity check for the memory-detection path used by `run`: on platforms
/// sysinfo supports, 10% of total memory (in bytes) must be non-zero.
#[test]
fn test_mem_check() {
    if !System::IS_SUPPORTED {
        // `run` falls back to MEMORY_LIMITED_BUFFER here; nothing to check.
        return;
    }
    // Only memory figures are needed, so skip the full `new_all()` refresh.
    let mut sys = System::new();
    sys.refresh_memory();
    // sysinfo >= 0.26 reports `total_memory()` in bytes, so no unit conversion.
    let mem10percent = sys.total_memory() / 10;
    assert!(mem10percent > 0);
}