static USAGE: &str = r#"
Generate JSON Schema from CSV data.
This command generates reference JSON Schema (Draft 7) from CSV data,
including validation rules based on data type and input data domain/range.
Running `validate` command on original input CSV with generated schema
should not flag any invalid records.
Generated schema file has `.schema.json` postfix appended. For example,
for input `mydata.csv`, schema file would be `mydata.csv.schema.json`.
If piped from stdin, then schema file would be `stdin.csv.schema.json` and
a `stdin.csv` file will created with stdin's contents as well.
Note that `stdin.csv` will be overwritten if it already exists.
For examples, see https://github.com/jqnatividad/qsv/blob/master/tests/test_schema.rs.
Usage:
qsv schema [options] [<input>]
qsv schema --help
Schema options:
--enum-threshold NUM Cardinality threshold for adding enum constraints.
Enum constraints are compiled for String & Integer types.
[default: 50]
--strict-dates Enforce Internet Datetime format (RFC-3339) for
detected date/datetime columns. Otherwise, even if
columns are inferred as date/datetime, they are set
to type "string" in the schema instead of
"date" or "date-time".
--pattern-columns <args> Select columns to add pattern constraints
--dates-whitelist <list> The case-insensitive patterns to look for when
shortlisting fields for date inference.
i.e. if the field's name has any of these patterns,
it is shortlisted for date inferencing.
Set to "all" to inspect ALL fields for
date/datetime types.
[default: date,time,due,open,close,created]
--prefer-dmy Prefer to parse dates in dmy format.
Otherwise, use mdy format.
--stdout Send generated JSON schema file to stdout instead.
-j, --jobs <arg> The number of jobs to run in parallel.
When not set, the number of jobs is set to the
number of CPUs detected.
Common options:
-h, --help Display this message
-n, --no-headers When set, the first row will not be interpreted
as headers. Namely, it will be sorted with the rest
of the rows. Otherwise, the first row will always
appear as the header row in the output.
-d, --delimiter <arg> The field delimiter for reading CSV data.
Must be a single character. [default: ,]
"#;
use std::{collections::HashSet, fs::File, io::Write, path::Path};
use ahash::AHashMap;
use csv::ByteRecord;
use grex::RegExpBuilder;
use itertools::Itertools;
use log::{debug, error, info, warn};
use serde::Deserialize;
use serde_json::{json, value::Number, Map, Value};
use stats::Frequencies;
use crate::{
cmd::stats::Stats,
config::{Config, Delimiter},
select::SelectColumns,
util, CliResult,
};
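/// Command-line arguments for `qsv schema`, deserialized from USAGE by docopt.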
#[derive(Deserialize, Clone)]
pub struct Args {
pub flag_enum_threshold: usize,
pub flag_strict_dates: bool,
pub flag_pattern_columns: SelectColumns,
pub flag_dates_whitelist: String,
pub flag_prefer_dmy: bool,
pub flag_stdout: bool,
pub flag_jobs: Option<usize>,
pub flag_no_headers: bool,
pub flag_delimiter: Option<Delimiter>,
pub arg_input: Option<String>,
}
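/// Filename used to buffer piped stdin to disk, since the input must be
/// re-readable for the multiple passes performed below.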
const STDIN_CSV: &str = "stdin.csv";
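/// Entry point: infers a JSON Schema (Draft 7) from the input CSV and writes
/// it to `<input>.schema.json` (or to stdout when --stdout is set).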
pub fn run(argv: &[&str]) -> CliResult<()> {
let preargs: Args = util::get_args(USAGE, argv)?;
let mut args = preargs.clone();
let (input_path, input_filename) = if preargs.arg_input.is_none() {
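        // Persist stdin to a file first, since the data is scanned multiple
        // times (stats, frequency, and pattern passes).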
let mut stdin_file = File::create(STDIN_CSV)?;
let stdin = std::io::stdin();
let mut stdin_handle = stdin.lock();
std::io::copy(&mut stdin_handle, &mut stdin_file)?;
args.arg_input = Some(STDIN_CSV.to_string());
(STDIN_CSV.to_string(), STDIN_CSV.to_string())
} else {
let filename = Path::new(args.arg_input.as_ref().unwrap())
.file_name()
.unwrap()
.to_string_lossy()
.to_string();
(args.arg_input.clone().unwrap(), filename)
};
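    // --prefer-dmy can also be enabled via the QSV_PREFER_DMY environment variable.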
args.flag_prefer_dmy = args.flag_prefer_dmy || std::env::var("QSV_PREFER_DMY").is_ok();
let mut properties_map: Map<String, Value> =
match infer_schema_from_stats(&args, &input_filename) {
Ok(map) => map,
Err(e) => {
return fail_clierror!("Failed to infer schema via stats and frequency: {e}");
}
};
let pattern_map = generate_string_patterns(&args, &properties_map)?;
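    // Attach a "pattern" constraint to every eligible string field for which
    // a regular expression was generated.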
for (field_name, field_def) in properties_map.iter_mut() {
if pattern_map.contains_key(field_name) && should_emit_pattern_constraint(field_def) {
let field_def_map = field_def.as_object_mut().unwrap();
let pattern = Value::String(pattern_map[field_name].clone());
field_def_map.insert("pattern".to_string(), pattern);
}
}
let required_fields = get_required_fields(&properties_map);
let schema = json!({
"$schema": "https://json-schema.org/draft-07/schema",
"title": format!("JSON Schema for {input_filename}"),
"description": "Inferred JSON Schema from QSV schema command",
"type": "object",
"properties": Value::Object(properties_map),
"required": Value::Array(required_fields)
});
let schema_pretty = match serde_json::to_string_pretty(&schema) {
Ok(s) => s,
Err(e) => return fail_clierror!("Cannot prettify schema json: {e}"),
};
if args.flag_stdout {
let stdout = std::io::stdout();
let mut handle = stdout.lock();
handle.write_all(schema_pretty.as_bytes())?;
handle.flush()?;
info!("Schema written to stdout");
} else {
let schema_output_filename = input_path + ".schema.json";
let mut schema_output_file = File::create(&schema_output_filename)?;
schema_output_file.write_all(schema_pretty.as_bytes())?;
schema_output_file.flush()?;
woutinfo!("Schema written to {schema_output_filename}");
}
Ok(())
}
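/// Builds the schema `properties` map by running the stats command on the
/// input, then deriving per-column types, ranges, and enum constraints.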
pub fn infer_schema_from_stats(args: &Args, input_filename: &str) -> CliResult<Map<String, Value>> {
let (csv_fields, csv_stats, stats_col_index_map) = get_stats_records(args)?;
let mut low_cardinality_column_indices: Vec<usize> =
Vec::with_capacity(args.flag_enum_threshold);
let column_select_arg: String = build_low_cardinality_column_selector_arg(
&mut low_cardinality_column_indices,
args.flag_enum_threshold,
&csv_fields,
&csv_stats,
&stats_col_index_map,
);
let unique_values_map = get_unique_values(args, &column_select_arg)?;
let mut properties_map: Map<String, Value> = Map::with_capacity(csv_fields.len());
let mut field_map: Map<String, Value> = Map::with_capacity(10);
let mut type_list: Vec<Value> = Vec::with_capacity(4);
let mut enum_list: Vec<Value> = Vec::with_capacity(args.flag_enum_threshold);
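    // Generate the schema properties for each CSV column.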
for i in 0..csv_fields.len() {
let header_byte_slice = csv_fields.get(i).unwrap();
let header_string = convert_to_string(header_byte_slice)?;
let stats_record = csv_stats.get(i).unwrap().clone().to_record(4);
if log::log_enabled!(log::Level::Debug) {
debug!("stats[{header_string}]: {stats_record:?}");
}
let col_type = stats_record.get(stats_col_index_map["type"]).unwrap();
let col_null_count = if let Some(s) = stats_record.get(stats_col_index_map["nullcount"]) {
s.parse::<usize>().unwrap_or(0_usize)
} else {
0_usize
};
field_map.clear();
let desc = format!("{header_string} column from {input_filename}");
field_map.insert("description".to_string(), Value::String(desc));
type_list.clear();
enum_list.clear();
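        // Emit type-specific constraints. The min/max and length values come from
        // qsv's own stats output, so the parse unwraps below are expected to succeed.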
match col_type {
"String" => {
type_list.push(Value::String("string".to_string()));
if let Some(min_length_str) = stats_record.get(stats_col_index_map["min_length"]) {
let min_length = min_length_str.parse::<u32>().unwrap();
field_map.insert(
"minLength".to_string(),
Value::Number(Number::from(min_length)),
);
};
if let Some(max_length_str) = stats_record.get(stats_col_index_map["max_length"]) {
let max_length = max_length_str.parse::<u32>().unwrap();
field_map.insert(
"maxLength".to_string(),
Value::Number(Number::from(max_length)),
);
};
if let Some(values) = unique_values_map.get(&header_string) {
for value in values {
enum_list.push(Value::String(value.to_string()));
}
}
}
"Integer" => {
type_list.push(Value::String("integer".to_string()));
if let Some(min_str) = stats_record.get(stats_col_index_map["min"]) {
let min = min_str.parse::<i64>().unwrap();
field_map.insert("minimum".to_string(), Value::Number(Number::from(min)));
};
if let Some(max_str) = stats_record.get(stats_col_index_map["max"]) {
let max = max_str.parse::<i64>().unwrap();
field_map.insert("maximum".to_string(), Value::Number(Number::from(max)));
};
if let Some(values) = unique_values_map.get(&header_string) {
for value in values {
let int_value = value.parse::<i64>().unwrap();
enum_list.push(Value::Number(Number::from(int_value)));
}
}
}
"Float" => {
type_list.push(Value::String("number".to_string()));
if let Some(min_str) = stats_record.get(stats_col_index_map["min"]) {
let min = min_str.parse::<f64>().unwrap();
field_map.insert(
"minimum".to_string(),
Value::Number(Number::from_f64(min).unwrap()),
);
};
if let Some(max_str) = stats_record.get(stats_col_index_map["max"]) {
let max = max_str.parse::<f64>().unwrap();
field_map.insert(
"maximum".to_string(),
Value::Number(Number::from_f64(max).unwrap()),
);
};
}
"NULL" => {
type_list.push(Value::String("null".to_string()));
}
"Date" => {
type_list.push(Value::String("string".to_string()));
if args.flag_strict_dates {
field_map.insert("format".to_string(), Value::String("date".to_string()));
}
}
"DateTime" => {
type_list.push(Value::String("string".to_string()));
if args.flag_strict_dates {
field_map.insert("format".to_string(), Value::String("date-time".to_string()));
}
}
_ => {
warn!("Stats gave unexpected field type '{col_type}', default to JSON String.");
type_list.push(Value::String("string".to_string()));
}
}
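        // If the column contains null values, widen the type union and the
        // enum domain so that nulls still validate.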
if col_null_count > 0 && !type_list.contains(&Value::String("null".to_string())) {
type_list.push(Value::String("null".to_string()));
}
if col_null_count > 0 && !enum_list.is_empty() {
enum_list.push(Value::Null);
}
if !type_list.is_empty() {
field_map.insert("type".to_string(), Value::Array(type_list.clone()));
}
if !enum_list.is_empty() {
field_map.insert("enum".to_string(), Value::Array(enum_list.clone()));
}
properties_map.insert(header_string, Value::Object(field_map.clone()));
}
Ok(properties_map)
}
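/// Runs the stats command over the input and returns the CSV headers, the
/// per-column stats, and a map from stats column name to record index.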
fn get_stats_records(args: &Args) -> CliResult<(ByteRecord, Vec<Stats>, AHashMap<String, usize>)> {
let stats_args = crate::cmd::stats::Args {
arg_input: args.arg_input.clone(),
flag_select: crate::select::SelectColumns::parse("").unwrap(),
flag_everything: false,
flag_typesonly: false,
flag_mode: false,
flag_cardinality: true,
flag_median: false,
flag_quartiles: false,
flag_mad: false,
flag_nulls: false,
flag_round: 4,
flag_infer_dates: true,
flag_dates_whitelist: args.flag_dates_whitelist.to_string(),
flag_prefer_dmy: args.flag_prefer_dmy,
flag_jobs: Some(util::njobs(args.flag_jobs)),
flag_output: None,
flag_no_headers: args.flag_no_headers,
flag_delimiter: args.flag_delimiter,
};
let (csv_fields, csv_stats) = match stats_args.rconfig().indexed() {
Ok(o) => {
if let Some(idx) = o {
info!("has index, triggering parallel stats");
stats_args.parallel_stats(&stats_args.flag_dates_whitelist, &idx)
} else {
info!("no index, triggering sequential stats");
stats_args.sequential_stats(&stats_args.flag_dates_whitelist)
}
}
Err(e) => {
warn!("error determining if indexed, triggering sequential stats: {e}");
stats_args.sequential_stats(&stats_args.flag_dates_whitelist)
}
}?;
let stats_columns = stats_args.stat_headers();
debug!("stats columns: {stats_columns:?}");
let mut stats_col_index_map = AHashMap::new();
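    // Map each stats column name to its index in a stats record.
    // The leading "field" column is skipped, hence the `i - 1` shift.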
for (i, col) in stats_columns.iter().enumerate() {
if col != "field" {
stats_col_index_map.insert(col.to_owned(), i - 1);
}
}
Ok((csv_fields, csv_stats, stats_col_index_map))
}
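/// Collects the (1-based) indices of columns whose cardinality is at or below
/// the enum threshold and returns them as a selector string (e.g. "1,3,7").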
fn build_low_cardinality_column_selector_arg(
low_cardinality_column_indices: &mut Vec<usize>,
enum_cardinality_threshold: usize,
csv_fields: &ByteRecord,
csv_stats: &[Stats],
stats_col_index_map: &AHashMap<String, usize>,
) -> String {
low_cardinality_column_indices.clear();
for i in 0..csv_fields.len() {
let stats_record = csv_stats.get(i).unwrap().clone().to_record(4);
let col_cardinality = match stats_record.get(stats_col_index_map["cardinality"]) {
Some(s) => s.parse::<usize>().unwrap_or(0_usize),
None => 0_usize,
};
if col_cardinality <= enum_cardinality_threshold {
low_cardinality_column_indices.push(i + 1);
};
}
    low_cardinality_column_indices
        .iter()
        .map(ToString::to_string)
        .join(",")
}
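/// Runs the frequency command over the low-cardinality columns to collect
/// the unique values used for enum constraints.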
fn get_unique_values(
args: &Args,
column_select_arg: &str,
) -> CliResult<AHashMap<String, Vec<String>>> {
let freq_args = crate::cmd::frequency::Args {
arg_input: args.arg_input.clone(),
flag_select: crate::select::SelectColumns::parse(column_select_arg).unwrap(),
flag_limit: args.flag_enum_threshold,
flag_asc: false,
flag_no_nulls: true,
flag_jobs: Some(util::njobs(args.flag_jobs)),
flag_output: None,
flag_no_headers: args.flag_no_headers,
flag_delimiter: args.flag_delimiter,
};
let (headers, ftables) = match freq_args.rconfig().indexed()? {
Some(ref mut idx) => freq_args.parallel_ftables(idx),
_ => freq_args.sequential_ftables(),
}?;
let unique_values_map = construct_map_of_unique_values(&headers, &ftables)?;
Ok(unique_values_map)
}
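/// Converts the frequency tables into a map of header name to its sorted
/// list of unique values.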
fn construct_map_of_unique_values(
freq_csv_fields: &ByteRecord,
frequency_tables: &[Frequencies<Vec<u8>>],
) -> CliResult<AHashMap<String, Vec<String>>> {
let mut unique_values_map: AHashMap<String, Vec<String>> = AHashMap::new();
for (i, header_byte_slice) in freq_csv_fields.iter().enumerate() {
let mut unique_values = Vec::new();
for (val_byte_vec, _count) in frequency_tables[i].most_frequent() {
let val_string = convert_to_string(val_byte_vec.as_slice())?;
unique_values.push(val_string);
}
let header_string = convert_to_string(header_byte_slice)?;
unique_values.sort_unstable();
if log::log_enabled!(log::Level::Debug) {
debug!(
"enum[{header_string}]: len={}, val={:?}",
unique_values.len(),
unique_values
);
}
unique_values_map.insert(header_string, unique_values);
}
Ok(unique_values_map)
}
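/// Converts a byte slice to a UTF-8 String, failing with a lossy rendering
/// of the slice if it is not valid UTF-8.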
#[inline]
fn convert_to_string(byte_slice: &[u8]) -> CliResult<String> {
match std::str::from_utf8(byte_slice) {
Ok(s) => Ok(s.to_string()),
Err(e) => {
let lossy_string = String::from_utf8_lossy(byte_slice);
fail_clierror!(
"Can't convert byte slice to utf8 string. slice={byte_slice:?}, error={e}: \
{lossy_string}"
)
}
}
}
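/// Every column appears in every CSV record, so all fields are marked required.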
fn get_required_fields(properties_map: &Map<String, Value>) -> Vec<Value> {
    properties_map
        .keys()
        .map(|key| Value::String(key.clone()))
        .collect()
}
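/// Scans the selected pattern columns and uses grex to generate a regular
/// expression matching every observed value in each eligible column.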
fn generate_string_patterns(
args: &Args,
properties_map: &Map<String, Value>,
) -> CliResult<AHashMap<String, String>> {
let rconfig = Config::new(&args.arg_input)
.delimiter(args.flag_delimiter)
.no_headers(args.flag_no_headers)
.select(args.flag_pattern_columns.clone());
let mut rdr = rconfig.reader()?;
let headers = rdr.byte_headers()?.clone();
let sel = rconfig.selection(&headers)?;
let mut pattern_map: AHashMap<String, String> = AHashMap::new();
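    // When --pattern-columns is not specified, the selection defaults to all
    // columns, so a full-width (or empty) selection is treated as
    // "no pattern columns requested".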
if sel.len() == 0 || sel.len() == headers.len() {
debug!("no pattern columns selected");
return Ok(pattern_map);
}
let mut unique_values_map: AHashMap<String, HashSet<String>> = AHashMap::new();
    let mut record = csv::ByteRecord::new();
while rdr.read_byte_record(&mut record)? {
for (i, value_byte_slice) in sel.select(&record).enumerate() {
let header_byte_slice: &[u8] = headers.get(sel[i]).unwrap();
let header_string: String = convert_to_string(header_byte_slice)?;
if !should_emit_pattern_constraint(&properties_map[&header_string]) {
continue;
}
let value_string: String = convert_to_string(value_byte_slice)?;
            let set = unique_values_map.entry(header_string).or_default();
set.insert(value_string);
}
}
debug!("unique values for eligible pattern columns: {unique_values_map:?}");
pattern_map.reserve(unique_values_map.len());
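    // Build, per column, a regular expression that matches every observed value.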
for (header, value_set) in unique_values_map.iter() {
let values: Vec<&String> = Vec::from_iter(value_set);
let regexp: String = RegExpBuilder::from(&values)
.with_conversion_of_digits()
.with_conversion_of_words()
.with_conversion_of_repetitions()
.with_minimum_repetitions(2)
.build();
pattern_map.insert(header.clone(), regexp);
}
debug!("pattern map: {pattern_map:?}");
Ok(pattern_map)
}
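/// A pattern constraint is only emitted for string-typed fields that do not
/// already have an enum constraint (the enum already restricts the domain).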
fn should_emit_pattern_constraint(field_def: &Value) -> bool {
    let type_list = field_def["type"].as_array().unwrap();
let has_enum = field_def.get("enum").is_some();
type_list.contains(&Value::String("string".to_string())) && !has_enum
}