/// Help/usage text for the `sample` command.
///
/// `run` hands this string, together with the raw argv, to `util::get_args`
/// to populate `Args`. The text is therefore runtime data, not documentation:
/// edit it with care, since the argument parser reads it.
static USAGE: &str = r#"
Randomly samples CSV data uniformly using memory proportional to the size of
the sample.
When an index is present and a seed is not specified, this command will use
random indexing if the sample size is less than 10% of the total number of records.
This allows for efficient sampling such that the entire CSV file is not parsed.
When sample-size is between 0 and 1 exclusive, it is treated as a percentage
of the CSV to sample (e.g. 0.20 is 20 percent).
This command is intended to provide a means to sample from a CSV data set that
is too big to fit into memory (for example, for use with commands like 'qsv
frequency' or 'qsv stats'). It will however visit every CSV record exactly
once, which is necessary to provide a uniform random sample. If you wish to
limit the number of records visited, use the 'qsv slice' command to pipe into
'qsv sample'.
For examples, see https://github.com/jqnatividad/qsv/blob/master/tests/test_sample.rs.
Usage:
qsv sample [options] <sample-size> [<input>]
qsv sample --help
sample options:
--seed <number> Random number generator seed.
Common options:
-h, --help Display this message
-o, --output <file> Write output to <file> instead of stdout.
-n, --no-headers When set, the first row will be considered as part of
the population to sample from. (When not set, the
first row is the header row and will always appear
in the output.)
-d, --delimiter <arg> The field delimiter for reading CSV data.
Must be a single character. (default: ,)
"#;
use std::io;
use log::debug;
use rand::{self, rngs::StdRng, seq::SliceRandom, Rng, SeedableRng};
use serde::Deserialize;
use crate::{
config::{Config, Delimiter},
index::Indexed,
util, CliResult,
};
/// Parsed command-line arguments for the `sample` command, deserialized by
/// `util::get_args` from `USAGE` and argv.
#[derive(Deserialize)]
struct Args {
// Optional `<input>` argument; passed to `Config::new` to build the reader config.
arg_input: Option<String>,
// `<sample-size>`: values below 1.0 are treated as a fraction of the total
// record count, anything else as an absolute number of records.
arg_sample_size: f64,
// `--output`: passed to `Config::new` to build the writer config.
flag_output: Option<String>,
// `--no-headers`: when set, the first row is part of the sampled population.
flag_no_headers: bool,
// `--delimiter`: field delimiter used when reading the input.
flag_delimiter: Option<Delimiter>,
// `--seed`: RNG seed. Supplying one forces reservoir sampling, even when an
// index is available.
flag_seed: Option<usize>,
}
/// Entry point for the `sample` command.
///
/// Parses `argv` against `USAGE`, resolves the requested sample size (values
/// below 1.0 are scaled by the record count), picks a sampling strategy and
/// writes the headers followed by the sampled records to the output.
///
/// Strategy selection:
/// * indexed input, no seed, and `do_random_access` agrees: random access via
///   the index;
/// * otherwise: reservoir sampling over a sequential reader.
///
/// # Errors
///
/// Fails when the sample size is negative, when a fractional sample size is
/// requested but the row count cannot be determined, or when any of the
/// underlying read/write operations fail.
pub fn run(argv: &[&str]) -> CliResult<()> {
    let args: Args = util::get_args(USAGE, argv)?;

    let requested = args.arg_sample_size;
    if requested.is_sign_negative() {
        return fail!("Sample size cannot be negative.");
    }

    let rconfig = Config::new(&args.arg_input)
        .delimiter(args.flag_delimiter)
        .checkutf8(false)
        .no_headers(args.flag_no_headers);

    // The writer is opened before the input is inspected, as in the original flow.
    let mut wtr = Config::new(&args.flag_output).writer()?;

    let sampled = match rconfig.indexed()? {
        Some(mut idx) => {
            // A fractional request is a share of the indexed record count.
            #[allow(clippy::cast_precision_loss)]
            let scaled = if requested < 1.0 {
                requested * idx.count() as f64
            } else {
                requested
            };
            let wanted = scaled as u64;

            // A seed always forces the reservoir path; the random-access path
            // uses an unseeded thread-local RNG.
            if args.flag_seed.is_none() && do_random_access(wanted, idx.count()) {
                rconfig.write_headers(&mut *idx, &mut wtr)?;
                sample_random_access(&mut idx, wanted)?
            } else {
                let mut rdr = rconfig.reader()?;
                rconfig.write_headers(&mut rdr, &mut wtr)?;
                sample_reservoir(&mut rdr, wanted, args.flag_seed)?
            }
        }
        None => {
            debug!("no index");

            // Without an index, a fractional request needs an explicit row count.
            #[allow(clippy::cast_precision_loss)]
            let scaled = if requested < 1.0 {
                let Ok(row_count) = util::count_rows(&rconfig) else {
                    return fail!("Cannot get rowcount. Percentage sampling requires a rowcount.");
                };
                requested * row_count as f64
            } else {
                requested
            };

            let mut rdr = rconfig.reader()?;
            rconfig.write_headers(&mut rdr, &mut wtr)?;
            sample_reservoir(&mut rdr, scaled as u64, args.flag_seed)?
        }
    };

    for record in &sampled {
        wtr.write_byte_record(record)?;
    }
    wtr.flush()?;
    Ok(())
}
/// Samples records uniformly at random, without replacement, from an indexed
/// CSV by seeking straight to randomly chosen record positions.
///
/// Only the chosen positions are generated, so memory use is proportional to
/// the sample rather than to the total number of records. If `sample_size`
/// exceeds the number of indexed records, every record is returned once, in
/// random order.
///
/// The RNG is the unseeded thread-local generator, so results are not
/// reproducible across runs.
///
/// # Errors
///
/// Returns an error if the record count does not fit in `usize`, if seeking
/// or reading a record fails, or if the index refers to a record that cannot
/// be read from the data (e.g. a stale index).
fn sample_random_access<R, I>(
    idx: &mut Indexed<R, I>,
    sample_size: u64,
) -> CliResult<Vec<csv::ByteRecord>>
where
    R: io::Read + io::Seek,
    I: io::Read + io::Seek,
{
    debug!("doing sample_random_access");

    let total = usize::try_from(idx.count()).map_err(|_| {
        io::Error::new(
            io::ErrorKind::InvalidInput,
            "record count does not fit in usize",
        )
    })?;
    // `index::sample` panics when asked for more indices than exist, so clamp.
    let amount = usize::try_from(sample_size).map_or(total, |n| n.min(total));

    // Draw `amount` distinct positions in random order without materialising
    // (and shuffling) the full `0..total` range.
    let mut rng = ::rand::thread_rng();
    let chosen = ::rand::seq::index::sample(&mut rng, total, amount);

    let mut sampled = Vec::with_capacity(amount);
    for i in chosen {
        idx.seek(i as u64)?;
        // The first `?` covers a missing record at the seek position; the
        // second covers a CSV read/parse error.
        let record = idx.byte_records().next().ok_or_else(|| {
            io::Error::new(
                io::ErrorKind::UnexpectedEof,
                format!("no record found at indexed position {i}"),
            )
        })??;
        sampled.push(record);
    }
    Ok(sampled)
}
/// Samples up to `sample_size` records uniformly at random from `rdr` in a
/// single sequential pass (reservoir sampling).
///
/// The first `sample_size` records fill the reservoir. Each later record at
/// zero-based position `i` draws a slot uniformly from `0..=i` and replaces
/// the reservoir entry there if the slot falls inside the reservoir. When the
/// input holds fewer than `sample_size` records, all of them are returned.
///
/// With `seed` set the RNG is seeded deterministically; otherwise it is
/// seeded from the thread-local generator.
///
/// # Errors
///
/// Returns an error if reading a record that is placed into the reservoir
/// fails. A read error on a record that is *not* selected is dropped along
/// with that record.
fn sample_reservoir<R: io::Read>(
    rdr: &mut csv::Reader<R>,
    sample_size: u64,
    seed: Option<usize>,
) -> CliResult<Vec<csv::ByteRecord>> {
    debug!("doing sample_reservoir");

    // Saturate rather than wrap on targets where usize is narrower than u64.
    let sample_size = usize::try_from(sample_size).unwrap_or(usize::MAX);

    let mut reservoir: Vec<csv::ByteRecord> = Vec::new();
    if sample_size == 0 {
        // Nothing can ever be selected, so skip scanning the input.
        return Ok(reservoir);
    }
    // Best-effort preallocation: a request far larger than the input (or than
    // available memory) must not abort before a single record is read, so a
    // failed reservation just falls back to growing on push.
    let _ = reservoir.try_reserve_exact(sample_size);

    let mut records = rdr.byte_records().enumerate();

    // Fill phase. Bounded by the requested size, not by `capacity()`, which is
    // only guaranteed to be *at least* what was asked for.
    for (_, row) in records.by_ref().take(sample_size) {
        reservoir.push(row?);
    }

    let mut rng: StdRng = match seed {
        None => StdRng::from_rng(rand::thread_rng()).unwrap(),
        Some(seed) => StdRng::seed_from_u64(seed as u64),
    };

    // Replacement phase. One draw per remaining record, so a given seed still
    // yields the same sequence of decisions.
    for (i, row) in records {
        let slot = rng.gen_range(0..=i);
        if let Some(entry) = reservoir.get_mut(slot) {
            *entry = row?;
        }
    }
    Ok(reservoir)
}
/// Decides whether index-based random access should be used.
///
/// Returns `true` when `sample_size` is at most one tenth of `total` (integer
/// division), and logs the inputs and the decision at debug level.
fn do_random_access(sample_size: u64, total: u64) -> bool {
    let threshold = total / 10;
    let raflag = threshold >= sample_size;
    debug!("sample_size: {sample_size}, total: {total}, raflag: {raflag}");
    raflag
}