ext-sort 0.1.5

rust external sort algorithm implementation
Documentation
use std::fs;
use std::io::{self, prelude::*};
use std::path;
use std::process;

use bytesize::ByteSize;
use clap::ArgEnum;
use env_logger;
use log;

use ext_sort::buffer::mem::MemoryLimitedBufferBuilder;
use ext_sort::{ExternalSorter, ExternalSorterBuilder};

fn main() {
    let arg_parser = build_arg_parser();

    let log_level: LogLevel = arg_parser.value_of_t_or_exit("log_level");
    init_logger(log_level);

    let order: Order = arg_parser.value_of_t_or_exit("sort");
    let tmp_dir: Option<&str> = arg_parser.value_of("tmp_dir");
    let chunk_size = arg_parser.value_of("chunk_size").expect("value is required");
    let threads: Option<usize> = arg_parser
        .is_present("threads")
        .then(|| arg_parser.value_of_t_or_exit("threads"));

    let input = arg_parser.value_of("input").expect("value is required");
    let input_stream = match fs::File::open(input) {
        Ok(file) => io::BufReader::new(file),
        Err(err) => {
            log::error!("input file opening error: {}", err);
            process::exit(1);
        }
    };

    let output = arg_parser.value_of("output").expect("value is required");
    let mut output_stream = match fs::File::create(output) {
        Ok(file) => io::BufWriter::new(file),
        Err(err) => {
            log::error!("output file creation error: {}", err);
            process::exit(1);
        }
    };

    let mut sorter_builder = ExternalSorterBuilder::new();
    if let Some(threads) = threads {
        sorter_builder = sorter_builder.with_threads_number(threads);
    }

    if let Some(tmp_dir) = tmp_dir {
        sorter_builder = sorter_builder.with_tmp_dir(path::Path::new(tmp_dir));
    }

    sorter_builder = sorter_builder.with_buffer(MemoryLimitedBufferBuilder::new(
        chunk_size.parse::<ByteSize>().expect("value is pre-validated").as_u64(),
    ));

    let sorter: ExternalSorter<String, io::Error, _> = match sorter_builder.build() {
        Ok(sorter) => sorter,
        Err(err) => {
            log::error!("sorter initialization error: {}", err);
            process::exit(1);
        }
    };

    let compare = |a: &String, b: &String| {
        if order == Order::Asc {
            a.cmp(&b)
        } else {
            a.cmp(&b).reverse()
        }
    };

    let sorted_stream = match sorter.sort_by(input_stream.lines(), compare) {
        Ok(sorted_stream) => sorted_stream,
        Err(err) => {
            log::error!("data sorting error: {}", err);
            process::exit(1);
        }
    };

    for line in sorted_stream {
        let line = match line {
            Ok(line) => line,
            Err(err) => {
                log::error!("sorting stream error: {}", err);
                process::exit(1);
            }
        };
        if let Err(err) = output_stream.write_all(format!("{}\n", line).as_bytes()) {
            log::error!("data saving error: {}", err);
            process::exit(1);
        };
    }

    if let Err(err) = output_stream.flush() {
        log::error!("data flushing error: {}", err);
        process::exit(1);
    }
}

#[derive(Copy, Clone, clap::ArgEnum)]
enum LogLevel {
    Off,
    Error,
    Warn,
    Info,
    Debug,
    Trace,
}

impl LogLevel {
    pub fn possible_values() -> impl Iterator<Item = clap::PossibleValue<'static>> {
        Self::value_variants().iter().filter_map(|v| v.to_possible_value())
    }
}

impl std::str::FromStr for LogLevel {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        <LogLevel as clap::ArgEnum>::from_str(s, false)
    }
}

#[derive(Copy, Clone, PartialEq, clap::ArgEnum)]
enum Order {
    Asc,
    Desc,
}

impl Order {
    pub fn possible_values() -> impl Iterator<Item = clap::PossibleValue<'static>> {
        Order::value_variants().iter().filter_map(|v| v.to_possible_value())
    }
}

impl std::str::FromStr for Order {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        <Order as clap::ArgEnum>::from_str(s, false)
    }
}

fn build_arg_parser() -> clap::ArgMatches {
    clap::App::new("ext-sort")
        .author("Dmitry P. <dapper1291@gmail.com>")
        .about("external sorter")
        .arg(
            clap::Arg::new("input")
                .short('i')
                .long("input")
                .help("file to be sorted")
                .required(true)
                .takes_value(true),
        )
        .arg(
            clap::Arg::new("output")
                .short('o')
                .long("output")
                .help("result file")
                .required(true)
                .takes_value(true),
        )
        .arg(
            clap::Arg::new("sort")
                .short('s')
                .long("sort")
                .help("sorting order")
                .takes_value(true)
                .default_value("asc")
                .possible_values(Order::possible_values()),
        )
        .arg(
            clap::Arg::new("log_level")
                .short('l')
                .long("loglevel")
                .help("logging level")
                .takes_value(true)
                .default_value("info")
                .possible_values(LogLevel::possible_values()),
        )
        .arg(
            clap::Arg::new("threads")
                .short('t')
                .long("threads")
                .help("number of threads to use for parallel sorting")
                .takes_value(true),
        )
        .arg(
            clap::Arg::new("tmp_dir")
                .short('d')
                .long("tmp-dir")
                .help("directory to be used to store temporary data")
                .takes_value(true),
        )
        .arg(
            clap::Arg::new("chunk_size")
                .short('c')
                .long("chunk-size")
                .help("chunk size")
                .required(true)
                .takes_value(true)
                .validator(|v| match v.parse::<ByteSize>() {
                    Ok(_) => Ok(()),
                    Err(err) => Err(format!("Chunk size format incorrect: {}", err)),
                }),
        )
        .get_matches()
}

fn init_logger(log_level: LogLevel) {
    env_logger::Builder::new()
        .filter_level(match log_level {
            LogLevel::Off => log::LevelFilter::Off,
            LogLevel::Error => log::LevelFilter::Error,
            LogLevel::Warn => log::LevelFilter::Warn,
            LogLevel::Info => log::LevelFilter::Info,
            LogLevel::Debug => log::LevelFilter::Debug,
            LogLevel::Trace => log::LevelFilter::Trace,
        })
        .format_timestamp_millis()
        .init();
}