#![allow(clippy::unwrap_used, clippy::expect_used)]
use assert_cmd::Command;
use predicates::prelude::*;
/// Locate the `shuflr` binary Cargo built for this package and wrap it in a
/// runnable `Command` for the tests below.
fn shuflr() -> Command {
    let bin = Command::cargo_bin("shuflr");
    bin.expect("binary must build")
}
#[test]
fn bare_invocation_prints_help_and_exits_nonzero() {
    // No arguments at all: the process must fail and the stderr text must
    // include the top-level one-line description.
    let mut cmd = shuflr();
    cmd.assert()
        .failure()
        .stderr(predicate::str::contains("Stream large JSONL"));
}
#[test]
fn help_flag_lists_all_subcommands() {
    // `--help` succeeds and mentions every public subcommand by name.
    let assert = shuflr().arg("--help").assert().success();
    let help = String::from_utf8(assert.get_output().stdout.clone()).unwrap();
    let expected = ["stream", "analyze", "index", "verify", "completions"];
    for sub in expected {
        assert!(
            help.contains(sub),
            "--help output missing subcommand '{sub}':\n{help}"
        );
    }
}
#[test]
fn version_flag_emits_a_version() {
    // `--version` succeeds and the output names the binary.
    let mut cmd = shuflr();
    cmd.arg("--version");
    cmd.assert()
        .success()
        .stdout(predicate::str::contains("shuflr"));
}
#[test]
fn implicit_stream_dispatch_on_bare_path() {
    // A bare path argument dispatches to `stream`; a missing file exits
    // with code 66 (sysexits EX_NOINPUT) and a "no such file" message.
    let mut cmd = shuflr();
    cmd.arg("nonexistent-file.jsonl");
    cmd.assert()
        .code(66)
        .stderr(predicate::str::contains("no such file"));
}
#[test]
fn explicit_stream_with_stdin_rejects_chunk_shuffled() {
    // Streaming from stdin (presumably not seekable) is rejected with
    // exit 64 (sysexits EX_USAGE) and a hint to use --shuffle=buffer.
    let mut cmd = shuflr();
    cmd.arg("stream").arg("-").write_stdin("one\ntwo\n");
    cmd.assert()
        .code(64)
        .stderr(predicate::str::contains("--shuffle=buffer"));
}
#[test]
fn completions_subcommand_emits_a_script() {
    // Bash completions must contain the completion-function marker and at
    // least one subcommand name.
    let assert = shuflr().args(["completions", "bash"]).assert().success();
    let script = String::from_utf8(assert.get_output().stdout.clone()).unwrap();
    assert!(
        script.contains("_shuflr"),
        "bash completion missing function marker:\n{script}"
    );
    assert!(
        script.contains("stream"),
        "bash completion missing subcommand:\n{script}"
    );
}
#[test]
fn man_emits_roff_for_top_level() {
    // `man` with no subcommand renders the top-level roff page on stdout.
    let assert = shuflr().args(["man"]).assert().success();
    let out = String::from_utf8(assert.get_output().stdout.clone()).unwrap();
    // Bug fix: `&out[..200.min(out.len())]` panics if byte index 200 falls
    // inside a multi-byte UTF-8 sequence; truncate on char boundaries instead.
    let head: String = out.chars().take(200).collect();
    assert!(out.contains(".TH"), "missing roff .TH header:\n{head}");
    assert!(out.contains("shuflr"), "missing command name");
    assert!(
        out.contains(".SH NAME") || out.contains(r"\fBNAME\fR"),
        "missing NAME section"
    );
}
#[test]
fn man_per_subcommand() {
    // `man convert` renders a roff page that covers that subcommand's topic.
    let assert = shuflr().args(["man", "convert"]).assert().success();
    let page = String::from_utf8(assert.get_output().stdout.clone()).unwrap();
    assert!(page.contains(".TH"), "missing roff header");
    let mentions_core = page.contains("zstd-seekable") || page.contains("frame");
    assert!(
        mentions_core,
        "convert man page doesn't mention its core concepts:\n{page}"
    );
}
#[test]
fn man_unknown_subcommand_errors_cleanly() {
    // Requesting a man page for an unknown subcommand exits 64 (EX_USAGE)
    // with an explanatory message.
    let mut cmd = shuflr();
    cmd.args(["man", "nonexistent"]);
    cmd.assert()
        .code(64)
        .stderr(predicate::str::contains("no subcommand named"));
}
#[test]
fn completions_supports_zsh_and_fish() {
    // Each additional supported shell yields a non-empty completion script.
    for shell in ["zsh", "fish"] {
        let mut cmd = shuflr();
        cmd.arg("completions").arg(shell);
        cmd.assert()
            .success()
            .stdout(predicate::str::is_empty().not());
    }
}
#[test]
// NOTE(review): despite the name, this passes an unknown *flag*, not an
// unknown subcommand. A bare non-flag token would be treated as an input path
// (see implicit_stream_dispatch_on_bare_path), so this may be deliberate —
// confirm intent, and consider renaming to unknown_flag_fails_cleanly.
fn unknown_subcommand_fails_cleanly() {
shuflr()
.arg("--not-a-real-flag")
.assert()
.failure()
.stderr(predicate::str::contains("error:"));
}
#[test]
fn byte_suffix_parsing_in_convert_help() {
    // `convert --help` must document --frame-size and its 2MiB default.
    let assert = shuflr().args(["convert", "--help"]).assert().success();
    let help = String::from_utf8(assert.get_output().stdout.clone()).unwrap();
    assert!(
        help.contains("frame-size"),
        "convert --help missing --frame-size:\n{help}"
    );
    assert!(
        help.contains("2MiB"),
        "convert --help missing default 2MiB:\n{help}"
    );
}
#[test]
fn seed_env_var_is_documented() {
    // `stream --help` must mention the SHUFLR_SEED environment variable.
    let assert = shuflr().args(["stream", "--help"]).assert().success();
    let help = String::from_utf8(assert.get_output().stdout.clone()).unwrap();
    assert!(
        help.contains("SHUFLR_SEED"),
        "stream --help missing SHUFLR_SEED env doc:\n{help}"
    );
}
#[test]
fn rank_requires_world_size() {
    // --rank without --world-size is a hard error naming the missing flag.
    let mut cmd = shuflr();
    cmd.args(["stream", "--rank", "0", "data.jsonl"]);
    cmd.assert()
        .failure()
        .stderr(predicate::str::contains("world-size"));
}
#[test]
fn rank_world_size_produces_disjoint_partitions() {
    // Streaming the same input once per rank must partition the records:
    // no record appears under two ranks, and the union equals the input.
    use std::collections::HashSet;
    let input: String = (0..400).map(|i| format!("rec_{i:03}\n")).collect();
    let world_size = 4u32;
    let mut seen: HashSet<String> = HashSet::new();
    let mut emitted = 0usize;
    for rank in 0..world_size {
        let rank_arg = rank.to_string();
        let world_arg = world_size.to_string();
        let assert = shuflr()
            .args(["stream", "--shuffle", "none"])
            .args(["--rank", &rank_arg, "--world-size", &world_arg])
            .args(["--log-level", "warn"])
            .write_stdin(input.clone())
            .assert()
            .success();
        let out = String::from_utf8(assert.get_output().stdout.clone()).unwrap();
        for line in out.lines() {
            // insert() returning false means another rank already emitted it.
            assert!(
                seen.insert(line.to_string()),
                "record {line} appeared in two ranks"
            );
            emitted += 1;
        }
    }
    assert_eq!(emitted, 400, "all records accounted for across ranks");
    let in_set: HashSet<&str> = input.lines().collect();
    let out_set: HashSet<&str> = seen.iter().map(String::as_str).collect();
    assert_eq!(in_set, out_set);
}
/// Path to the shared fixture `<workspace>/tests/corpora/tiny.jsonl`.
/// Walks two ancestors up from this crate's manifest dir — assumes the crate
/// lives two levels below the workspace root (TODO confirm layout).
fn tiny_corpus() -> std::path::PathBuf {
    let manifest_dir = std::path::Path::new(env!("CARGO_MANIFEST_DIR"));
    let root = manifest_dir.ancestors().nth(2).unwrap();
    root.join("tests/corpora/tiny.jsonl")
}
#[test]
fn stream_none_roundtrips_tiny_fixture() {
    // With shuffling disabled, stdout must be byte-identical to the fixture.
    let fixture = tiny_corpus();
    let expected = std::fs::read(&fixture).unwrap();
    shuflr()
        .args(["stream", "--shuffle", "none"])
        .arg(&fixture)
        .assert()
        .success()
        .stdout(expected);
}
#[test]
fn implicit_stream_on_plain_file_with_default_mode_points_user_to_convert() {
    // Default mode on a plain file exits 64 (EX_USAGE) and the error text
    // suggests running `shuflr convert` first.
    let mut cmd = shuflr();
    cmd.arg(tiny_corpus());
    cmd.assert()
        .code(64)
        .stderr(predicate::str::contains("shuflr convert"));
}
#[test]
fn chunk_shuffled_on_seekable_zstd_works_end_to_end() {
    // convert -> `stream --shuffle chunk-shuffled`: output is deterministic
    // per seed, differs across seeds, preserves the record set, and reorders.
    use std::collections::BTreeSet;
    let tmp = tempfile::tempdir().unwrap();
    let in_path = tmp.path().join("in.jsonl");
    let seekable_path = tmp.path().join("in.jsonl.zst");
    let records: Vec<String> = (0..500).map(|i| format!("{{\"i\":{i}}}\n")).collect();
    std::fs::write(&in_path, records.concat()).unwrap();
    shuflr()
        .args(["convert", "--log-level", "warn", "-o"])
        .arg(&seekable_path)
        .arg(&in_path)
        .assert()
        .success();
    // One streaming run per seed; factored into a closure so the three
    // previously copy-pasted invocations cannot drift apart (and to match
    // the `run` pattern used by the sibling tests).
    let run = |seed: &str| {
        let assert = shuflr()
            .args([
                "stream",
                "--shuffle",
                "chunk-shuffled",
                "--seed",
                seed,
                "--log-level",
                "warn",
            ])
            .arg(&seekable_path)
            .assert()
            .success();
        String::from_utf8(assert.get_output().stdout.clone()).unwrap()
    };
    let out_a = run("7");
    let out_b = run("7");
    assert_eq!(out_a, out_b, "same seed must give byte-identical output");
    let out_c = run("8");
    assert_ne!(out_a, out_c);
    let in_set: BTreeSet<&str> = records.iter().map(|s| s.trim_end()).collect();
    let out_set: BTreeSet<&str> = out_a.lines().collect();
    assert_eq!(in_set, out_set, "multiset preserved under shuffle");
    // The shuffle must actually reorder relative to the source file.
    assert_ne!(out_a, records.concat());
}
#[test]
// End-to-end check of `stream --shuffle index-perm` on a seekable-zstd input:
// the permutation sidecar is built on first use, reused untouched on a cache
// hit, output is deterministic per seed, differs across seeds, and the record
// set is preserved. The mtime captures below are order-sensitive: mtime_a is
// taken between the first and second runs, mtime_b after the second.
fn index_perm_on_seekable_zstd_builds_sidecar_and_is_deterministic() {
use std::collections::BTreeSet;
let tmp = tempfile::tempdir().unwrap();
let in_path = tmp.path().join("in.jsonl");
let seekable_path = tmp.path().join("in.jsonl.zst");
// Expected sidecar path: input path + ".shuflr-idx-zst" suffix.
let sidecar_path = tmp.path().join("in.jsonl.zst.shuflr-idx-zst");
let records: Vec<String> = (0..400).map(|i| format!("{{\"i\":{i:03}}}\n")).collect();
std::fs::write(&in_path, records.concat()).unwrap();
shuflr()
.args(["convert", "--log-level", "warn", "-o"])
.arg(&seekable_path)
.arg(&in_path)
.assert()
.success();
// Precondition: `convert` alone must not create the sidecar.
assert!(
!sidecar_path.exists(),
"sidecar must not exist before first index-perm run"
);
// One index-perm streaming run with the given seed; returns stdout.
let run = |seed: &str| {
let out = shuflr()
.args([
"stream",
"--shuffle",
"index-perm",
"--seed",
seed,
"--log-level",
"warn",
])
.arg(&seekable_path)
.assert()
.success();
String::from_utf8(out.get_output().stdout.clone()).unwrap()
};
let out_a = run("7");
// First run must have built the sidecar as a side effect.
assert!(
sidecar_path.exists(),
"sidecar must be created on first run: {}",
sidecar_path.display()
);
let sidecar_mtime_a = std::fs::metadata(&sidecar_path)
.unwrap()
.modified()
.unwrap();
let out_b = run("7");
assert_eq!(
out_a, out_b,
"same seed must give byte-identical output across runs"
);
// Comparing mtimes before/after the second run proves the cached sidecar
// was reused, not rebuilt.
let sidecar_mtime_b = std::fs::metadata(&sidecar_path)
.unwrap()
.modified()
.unwrap();
assert_eq!(
sidecar_mtime_a, sidecar_mtime_b,
"fresh sidecar must not be overwritten on a cache-hit run"
);
let out_c = run("8");
assert_ne!(out_a, out_c);
let in_set: BTreeSet<&str> = records.iter().map(|s| s.trim_end()).collect();
let out_set: BTreeSet<&str> = out_a.lines().collect();
assert_eq!(in_set, out_set, "multiset preserved under index-perm");
assert_ne!(out_a, records.concat(), "index-perm must actually reorder");
}
#[test]
fn stream_none_honors_sample() {
    // --sample 2 caps the output at exactly two records.
    let fixture = tiny_corpus();
    let assert = shuflr()
        .args(["stream", "--shuffle", "none", "--sample", "2"])
        .arg(&fixture)
        .assert()
        .success();
    let out = String::from_utf8(assert.get_output().stdout.clone()).unwrap();
    assert_eq!(out.lines().count(), 2, "expected exactly 2 records:\n{out}");
}
#[test]
fn stream_none_decodes_gzip_transparently() {
    // A .gz input file is decompressed on the fly before streaming.
    use flate2::Compression;
    use flate2::write::GzEncoder;
    use std::io::Write as _;
    let tmp = tempfile::tempdir().unwrap();
    let path = tmp.path().join("data.jsonl.gz");
    let file = std::fs::File::create(&path).unwrap();
    let mut enc = GzEncoder::new(file, Compression::default());
    enc.write_all(b"{\"a\":1}\n{\"a\":2}\n{\"a\":3}\n").unwrap();
    enc.finish().unwrap();
    shuflr()
        .args(["stream", "--shuffle", "none", "--log-level", "warn"])
        .arg(&path)
        .assert()
        .success()
        .stdout("{\"a\":1}\n{\"a\":2}\n{\"a\":3}\n");
}
#[test]
fn stream_none_decodes_zstd_transparently() {
    // A plain (single-frame) .zst input file is decompressed on the fly.
    let tmp = tempfile::tempdir().unwrap();
    let path = tmp.path().join("data.jsonl.zst");
    let raw = b"{\"a\":1}\n{\"a\":2}\n{\"a\":3}\n";
    let compressed = zstd::stream::encode_all(&raw[..], 3).unwrap();
    std::fs::write(&path, compressed).unwrap();
    shuflr()
        .args(["stream", "--shuffle", "none", "--log-level", "warn"])
        .arg(&path)
        .assert()
        .success()
        .stdout("{\"a\":1}\n{\"a\":2}\n{\"a\":3}\n");
}
#[test]
fn stream_none_decodes_gzip_via_stdin() {
    // gzip detection must also work for piped stdin, not only named files.
    use flate2::Compression;
    use flate2::write::GzEncoder;
    use std::io::Write as _;
    let mut enc = GzEncoder::new(Vec::new(), Compression::default());
    enc.write_all(b"one\ntwo\n").unwrap();
    let gz_bytes = enc.finish().unwrap();
    let mut cmd = shuflr();
    cmd.args(["stream", "--shuffle", "none", "--log-level", "warn"]);
    cmd.write_stdin(gz_bytes)
        .assert()
        .success()
        .stdout("one\ntwo\n");
}
#[test]
fn stream_none_stdin_works() {
    // Plain-text stdin passes through unchanged when shuffling is off.
    let mut cmd = shuflr();
    cmd.args(["stream", "--shuffle", "none"]);
    cmd.write_stdin("one\ntwo\nthree\n")
        .assert()
        .success()
        .stdout("one\ntwo\nthree\n");
}
#[test]
fn stream_none_patches_missing_trailing_newline() {
    // Input without a final newline gets one appended on output.
    let mut cmd = shuflr();
    cmd.args(["stream", "--shuffle", "none"]);
    cmd.write_stdin("a\nb").assert().success().stdout("a\nb\n");
}
#[test]
fn stream_none_exit_65_on_fail_policy() {
    // With --on-error fail, a record longer than --max-line aborts the run
    // with exit 65 (EX_DATAERR) and an "oversized" message.
    let mut cmd = shuflr();
    cmd.args(["stream", "--shuffle", "none"])
        .args(["--max-line", "5"])
        .args(["--on-error", "fail"]);
    cmd.write_stdin("ok\nWAY_TOO_LONG\n")
        .assert()
        .code(65)
        .stderr(predicate::str::contains("oversized"));
}
#[test]
fn convert_plain_jsonl_roundtrips_via_zstdcat() {
    // Converted output must decode (with a plain zstd decoder) back to the
    // exact input bytes.
    let tmp = tempfile::tempdir().unwrap();
    let input_path = tmp.path().join("in.jsonl");
    let output_path = tmp.path().join("out.jsonl.zst");
    let content: String = (0..500)
        .map(|i| format!("{{\"i\":{i},\"t\":\"record number {i}\"}}\n"))
        .collect();
    std::fs::write(&input_path, &content).unwrap();
    shuflr()
        .args(["convert", "--log-level", "warn", "-o"])
        .arg(&output_path)
        .arg(&input_path)
        .assert()
        .success();
    let compressed = std::fs::read(&output_path).unwrap();
    let decoded = zstd::stream::decode_all(&compressed[..]).unwrap();
    assert_eq!(decoded, content.as_bytes());
}
#[test]
fn info_reports_seekable_table_of_converted_file() {
    // `info` on a converted file reports its format, frame count, and that
    // XXH64 checksums are enabled by default. 100k padded records —
    // presumably enough to produce multiple frames; confirm vs frame size.
    let tmp = tempfile::tempdir().unwrap();
    let input_path = tmp.path().join("in.jsonl");
    let output_path = tmp.path().join("out.jsonl.zst");
    let content: String = (0..100_000)
        .map(|i| format!("{{\"i\":{i},\"pad\":\"xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx\"}}\n"))
        .collect();
    std::fs::write(&input_path, &content).unwrap();
    shuflr()
        .args(["convert", "--log-level", "warn", "-o"])
        .arg(&output_path)
        .arg(&input_path)
        .assert()
        .success();
    let assert = shuflr().args(["info"]).arg(&output_path).assert().success();
    let report = String::from_utf8(assert.get_output().stdout.clone()).unwrap();
    assert!(report.contains("format:"), "info missing 'format:':\n{report}");
    assert!(
        report.contains("zstd-seekable"),
        "info missing codec name:\n{report}"
    );
    assert!(report.contains("frames:"), "info missing frames count:\n{report}");
    assert!(
        report.contains("XXH64"),
        "checksums should be on by default:\n{report}"
    );
}
#[test]
fn info_json_mode_parses_cleanly() {
    // `info --json` emits a single JSON object carrying format and frames.
    let tmp = tempfile::tempdir().unwrap();
    let input_path = tmp.path().join("in.jsonl");
    let output_path = tmp.path().join("out.jsonl.zst");
    std::fs::write(&input_path, "a\nb\nc\n").unwrap();
    shuflr()
        .args(["convert", "--log-level", "warn", "-o"])
        .arg(&output_path)
        .arg(&input_path)
        .assert()
        .success();
    let assert = shuflr()
        .args(["info", "--json"])
        .arg(&output_path)
        .assert()
        .success();
    let out = String::from_utf8(assert.get_output().stdout.clone()).unwrap();
    // Trim once and reuse instead of trimming per predicate.
    let trimmed = out.trim();
    assert!(trimmed.starts_with('{') && trimmed.ends_with('}'));
    assert!(out.contains("\"format\":\"zstd-seekable\""));
    assert!(out.contains("\"frames\":"));
}
#[test]
fn chunk_shuffled_parallel_emit_matches_sequential() {
    // With a fixed seed, --emit-threads / --emit-prefetch must not change
    // the output bytes.
    let tmp = tempfile::tempdir().unwrap();
    let plain = tmp.path().join("in.jsonl");
    let seekable = tmp.path().join("in.jsonl.zst");
    let records: Vec<String> = (0..800).map(|i| format!("{{\"i\":{i:03}}}\n")).collect();
    std::fs::write(&plain, records.concat()).unwrap();
    shuflr()
        .args(["convert", "--log-level", "warn", "-o"])
        .arg(&seekable)
        .arg(&plain)
        .assert()
        .success();
    // One run with the given emit settings; seed is fixed at 19 throughout.
    let stream_with = |threads: &str, prefetch: &str| {
        let assert = shuflr()
            .args(["stream", "--shuffle", "chunk-shuffled", "--seed", "19"])
            .args(["--emit-threads", threads, "--emit-prefetch", prefetch])
            .args(["--log-level", "warn"])
            .arg(&seekable)
            .assert()
            .success();
        String::from_utf8(assert.get_output().stdout.clone()).unwrap()
    };
    let sequential = stream_with("1", "8");
    let par2 = stream_with("2", "4");
    let par4 = stream_with("4", "16");
    assert_eq!(sequential, par2);
    assert_eq!(sequential, par4);
}
#[test]
fn index_perm_zstd_parallel_emit_matches_sequential() {
    // index-perm output must be byte-identical regardless of --emit-threads
    // and --emit-prefetch settings.
    let tmp = tempfile::tempdir().unwrap();
    let plain = tmp.path().join("in.jsonl");
    let seekable = tmp.path().join("in.jsonl.zst");
    let records: Vec<String> = (0..800).map(|i| format!("{{\"i\":{i:03}}}\n")).collect();
    std::fs::write(&plain, records.concat()).unwrap();
    shuflr()
        .args(["convert", "--log-level", "warn", "-o"])
        .arg(&seekable)
        .arg(&plain)
        .assert()
        .success();
    // One run with the given emit settings; seed fixed at 42.
    let run = |emit_threads: &str, prefetch: &str| {
        let out = shuflr()
            .args([
                "stream",
                "--shuffle",
                "index-perm",
                "--seed",
                "42",
                "--emit-threads",
                emit_threads,
                "--emit-prefetch",
                prefetch,
                "--log-level",
                "warn",
            ])
            .arg(&seekable)
            .assert()
            .success();
        String::from_utf8(out.get_output().stdout.clone()).unwrap()
    };
    let seq = run("1", "32");
    let par2 = run("2", "4");
    // Renamed from the misleading `par8`: this run uses 4 emit threads.
    let par4 = run("4", "64");
    assert_eq!(seq, par2, "2-thread emit must match sequential");
    assert_eq!(
        seq, par4,
        "4-thread + larger prefetch must match sequential"
    );
}
#[test]
fn index_perm_zstd_parallel_build_matches_sequential() {
    // Building the permutation index with multiple --build-threads must give
    // exactly the same stream as a single-threaded build.
    use std::collections::BTreeSet;
    let tmp = tempfile::tempdir().unwrap();
    let plain = tmp.path().join("in.jsonl");
    let seekable = tmp.path().join("in.jsonl.zst");
    let records: Vec<String> = (0..600).map(|i| format!("{{\"i\":{i:03}}}\n")).collect();
    std::fs::write(&plain, records.concat()).unwrap();
    shuflr()
        .args(["convert", "--log-level", "warn", "-o"])
        .arg(&seekable)
        .arg(&plain)
        .assert()
        .success();
    let run = |threads: &str| {
        // Drop any cached sidecar so every run rebuilds the index.
        std::fs::remove_file(tmp.path().join("in.jsonl.zst.shuflr-idx-zst")).ok();
        let assert = shuflr()
            .args(["stream", "--shuffle", "index-perm", "--seed", "42"])
            .args(["--build-threads", threads, "--log-level", "warn"])
            .arg(&seekable)
            .assert()
            .success();
        String::from_utf8(assert.get_output().stdout.clone()).unwrap()
    };
    let seq = run("1");
    let par = run("4");
    assert_eq!(
        seq, par,
        "parallel build must produce byte-identical output to sequential"
    );
    let expected: BTreeSet<&str> = records.iter().map(|s| s.trim_end()).collect();
    let got: BTreeSet<&str> = par.lines().collect();
    assert_eq!(got, expected);
}
#[test]
#[cfg(unix)] // cfg moved to the item: the original wrapped the body, so the
// test existed and passed vacuously on non-unix targets; now it simply isn't
// compiled there.
fn info_json_escapes_quotes_and_backslashes_in_path() {
    // The JSON report must escape `"` and `\` characters that appear in the
    // file path itself.
    let tmp = tempfile::tempdir().unwrap();
    let bad_name = r#"weird" \name.jsonl"#;
    let input = tmp.path().join(bad_name);
    let seekable = tmp.path().join(format!("{bad_name}.zst"));
    std::fs::write(&input, b"a\nb\n").unwrap();
    shuflr()
        .args(["convert", "--log-level", "warn", "-o"])
        .arg(&seekable)
        .arg(&input)
        .assert()
        .success();
    let assert = shuflr()
        .args(["info", "--json"])
        .arg(&seekable)
        .assert()
        .success();
    let out = String::from_utf8(assert.get_output().stdout.clone()).unwrap();
    let trimmed = out.trim();
    assert!(
        trimmed.contains(r#"\""#),
        "expected escaped quote in output:\n{out}"
    );
    assert!(
        trimmed.contains(r"\\"),
        "expected escaped backslash in output:\n{out}"
    );
}
#[test]
fn analyze_strict_does_not_change_report() {
    // --strict must not alter the JSON report itself (it may only affect
    // exit policy, which is deliberately not pinned here).
    let tmp = tempfile::tempdir().unwrap();
    let input = tmp.path().join("a.jsonl");
    let seekable = tmp.path().join("a.jsonl.zst");
    let body: String = (0..400).map(|i| format!("{{\"i\":{i:03}}}\n")).collect();
    std::fs::write(&input, body).unwrap();
    shuflr()
        .args(["convert", "--log-level", "warn", "-o"])
        .arg(&seekable)
        .arg(&input)
        .assert()
        .success();
    let run = |strict: bool| {
        let mut cmd = shuflr();
        cmd.args(["analyze", "--json", "--sample-chunks", "8"]);
        if strict {
            cmd.arg("--strict");
        }
        let assert = cmd.arg(&seekable).assert();
        let out = String::from_utf8(assert.get_output().stdout.clone()).unwrap();
        // Bug fix: the original asserted nothing about the run's outcome, so
        // two failed runs with empty stdout would compare equal and the test
        // would pass vacuously. Require a non-empty report from each run.
        assert!(!out.is_empty(), "analyze produced no JSON report");
        out
    };
    assert_eq!(
        run(false),
        run(true),
        "--strict must not change the JSON report"
    );
}
#[test]
fn analyze_json_mode_emits_parseable_report() {
    // The JSON report is a single object containing every documented key, a
    // documented verdict value, and no human-oriented text.
    let tmp = tempfile::tempdir().unwrap();
    let input = tmp.path().join("in.jsonl");
    let seekable = tmp.path().join("in.jsonl.zst");
    let body: String = (0..200).map(|i| format!("{{\"i\":{i:03}}}\n")).collect();
    std::fs::write(&input, body).unwrap();
    shuflr()
        .args(["convert", "--log-level", "warn", "-o"])
        .arg(&seekable)
        .arg(&input)
        .assert()
        .success();
    let assert = shuflr()
        .args(["analyze", "--json"])
        .arg(&seekable)
        .assert()
        .success();
    let out = String::from_utf8(assert.get_output().stdout.clone()).unwrap();
    let trimmed = out.trim();
    assert!(
        trimmed.starts_with('{') && trimmed.ends_with('}'),
        "not a JSON object:\n{out}"
    );
    let required_keys = [
        "\"file\":",
        "\"total_frames\":",
        "\"sampled_frames\":",
        "\"total_records_sampled\":",
        "\"mean_record_len_bytes\":",
        "\"byte_kl_max\":",
        "\"byte_kl_mean\":",
        "\"byte_js_max\":",
        "\"byte_js_mean\":",
        "\"frame_entropy_mean\":",
        "\"reclen_cv\":",
        "\"thresholds\":{",
        "\"byte_kl_unsafe\":",
        "\"byte_js_unsafe\":",
        "\"reclen_cv_unsafe\":",
        "\"verdict\":",
    ];
    for key in required_keys {
        assert!(out.contains(key), "missing JSON key {key}:\n{out}");
    }
    let verdict_documented =
        out.contains("\"verdict\":\"safe\"") || out.contains("\"verdict\":\"unsafe\"");
    assert!(verdict_documented, "verdict not a documented value:\n{out}");
    assert!(
        !out.contains("recommendation:"),
        "human text leaked into JSON output:\n{out}"
    );
}
#[test]
fn convert_respects_input_format_override() {
    // Forcing --input-format zstd on a plain-text file must fail, while
    // --input-format auto detects plain text and succeeds.
    let tmp = tempfile::tempdir().unwrap();
    let plain = tmp.path().join("pretend.jsonl");
    std::fs::write(&plain, b"{\"a\":1}\n{\"a\":2}\n").unwrap();
    let forced_out = tmp.path().join("out.jsonl.zst");
    shuflr()
        .args(["convert", "--log-level", "warn"])
        .args(["--input-format", "zstd", "-o"])
        .arg(&forced_out)
        .arg(&plain)
        .assert()
        .failure();
    let auto_out = tmp.path().join("out-auto.jsonl.zst");
    shuflr()
        .args(["convert", "--log-level", "warn"])
        .args(["--input-format", "auto", "-o"])
        .arg(&auto_out)
        .arg(&plain)
        .assert()
        .success();
}
#[test]
fn convert_rejects_multi_input_in_pr4() {
    // Multiple input files are not supported; the error references PR-4.
    let tmp = tempfile::tempdir().unwrap();
    let first = tmp.path().join("a.jsonl");
    let second = tmp.path().join("b.jsonl");
    let out = tmp.path().join("o.zst");
    std::fs::write(&first, "x\n").unwrap();
    std::fs::write(&second, "y\n").unwrap();
    shuflr()
        .args(["convert", "-o"])
        .arg(&out)
        .arg(&first)
        .arg(&second)
        .assert()
        .failure()
        .stderr(predicate::str::contains("PR-4"));
}
#[test]
fn stream_buffer_preserves_record_multiset_and_is_deterministic() {
    // Buffer shuffle: seeded runs are reproducible, different seeds change
    // the order, the set of records is untouched, and order does change.
    let input_content: String = (0..200).map(|i| format!("record_{i:03}\n")).collect();
    let run_once = |seed: &str| {
        let assert = shuflr()
            .args(["stream", "--shuffle", "buffer", "--buffer-size", "32"])
            .args(["--seed", seed, "--log-level", "warn"])
            .write_stdin(input_content.clone())
            .assert()
            .success();
        String::from_utf8(assert.get_output().stdout.clone()).unwrap()
    };
    let out_a = run_once("42");
    let out_b = run_once("42");
    assert_eq!(out_a, out_b, "same seed must produce byte-identical output");
    let out_c = run_once("43");
    assert_ne!(
        out_a, out_c,
        "different seeds must produce different orderings"
    );
    let mut input_lines: Vec<&str> = input_content.lines().collect();
    let mut out_lines: Vec<&str> = out_a.lines().collect();
    input_lines.sort_unstable();
    out_lines.sort_unstable();
    assert_eq!(input_lines, out_lines);
    assert_ne!(out_a, input_content);
}
#[test]
fn stream_buffer_on_gzip_input_works() {
    // Buffer shuffle composes with transparent gzip decoding: every record
    // survives the shuffle.
    use flate2::Compression;
    use flate2::write::GzEncoder;
    use std::io::Write as _;
    let tmp = tempfile::tempdir().unwrap();
    let path = tmp.path().join("data.jsonl.gz");
    let records: String = (0..50).map(|i| format!("rec_{i:02}\n")).collect();
    let file = std::fs::File::create(&path).unwrap();
    let mut enc = GzEncoder::new(file, Compression::default());
    enc.write_all(records.as_bytes()).unwrap();
    enc.finish().unwrap();
    let assert = shuflr()
        .args(["stream", "--shuffle", "buffer", "--buffer-size", "16"])
        .args(["--seed", "7", "--log-level", "warn"])
        .arg(&path)
        .assert()
        .success();
    let out = String::from_utf8(assert.get_output().stdout.clone()).unwrap();
    let mut in_sorted: Vec<&str> = records.lines().collect();
    let mut out_sorted: Vec<&str> = out.lines().collect();
    in_sorted.sort_unstable();
    out_sorted.sort_unstable();
    assert_eq!(in_sorted, out_sorted);
}
#[test]
fn convert_with_verify_passes_on_clean_output() {
    // `convert --verify` self-checks the freshly written file and succeeds.
    let tmp = tempfile::tempdir().unwrap();
    let input = tmp.path().join("in.jsonl");
    let output = tmp.path().join("out.jsonl.zst");
    let content: String = (0..300)
        .map(|i| format!("{{\"i\":{i},\"pad\":\"xxxxxxxxxxxx\"}}\n"))
        .collect();
    std::fs::write(&input, &content).unwrap();
    let mut cmd = shuflr();
    cmd.args(["convert", "--verify", "--log-level", "warn", "-o"]);
    cmd.arg(&output).arg(&input).assert().success();
}
#[test]
// NOTE(review): despite the name, this never runs `convert --verify` against
// the corrupted file — it corrupts the converted output and then checks that
// `info` rejects it as "not a zstd-seekable" file. Either rename the test or
// actually exercise `--verify` on the corrupt bytes; confirm intent.
fn convert_with_verify_fails_when_output_is_corrupted() {
let tmp = tempfile::tempdir().unwrap();
let input = tmp.path().join("in.jsonl");
let output = tmp.path().join("out.jsonl.zst");
std::fs::write(&input, "a\nb\nc\nd\ne\n").unwrap();
shuflr()
.args(["convert", "--log-level", "warn", "-o"])
.arg(&output)
.arg(&input)
.assert()
.success();
// Flip one byte near the end of the file (3rd-from-last).
let mut bytes = std::fs::read(&output).unwrap();
let len = bytes.len();
bytes[len - 3] ^= 0xff;
std::fs::write(&output, &bytes).unwrap();
shuflr()
.args(["info"])
.arg(&output)
.assert()
.failure()
.stderr(predicate::str::contains("not a zstd-seekable"));
}
#[test]
fn index_subcommand_builds_sidecar() {
    // `index` on a plain file writes a sidecar next to the input; for this
    // input it is exactly 96 bytes and starts with the SHUFLIDX magic.
    let tmp = tempfile::tempdir().unwrap();
    let input = tmp.path().join("data.jsonl");
    std::fs::write(&input, "a\nb\nc\nd\n").unwrap();
    shuflr().args(["index"]).arg(&input).assert().success();
    let sidecar = tmp.path().join("data.jsonl.shuflr-idx");
    assert!(sidecar.exists(), "sidecar must be written");
    let sidecar_bytes = std::fs::read(&sidecar).unwrap();
    assert_eq!(
        sidecar_bytes.len(),
        96,
        "unexpected sidecar layout: {} bytes",
        sidecar_bytes.len()
    );
    assert_eq!(&sidecar_bytes[..8], b"SHUFLIDX");
}
#[test]
fn index_subcommand_builds_seekable_zstd_sidecar() {
    // `index` on a seekable-zstd input writes a sidecar with the SHUFLRZI
    // magic; for this 50-record input it is exactly 656 bytes.
    let tmp = tempfile::tempdir().unwrap();
    let plain = tmp.path().join("data.jsonl");
    let seekable = tmp.path().join("data.jsonl.zst");
    let records: Vec<String> = (0..50).map(|i| format!("{{\"i\":{i:03}}}\n")).collect();
    std::fs::write(&plain, records.concat()).unwrap();
    shuflr()
        .args(["convert", "--log-level", "warn", "-o"])
        .arg(&seekable)
        .arg(&plain)
        .assert()
        .success();
    shuflr().args(["index"]).arg(&seekable).assert().success();
    let sidecar = tmp.path().join("data.jsonl.zst.shuflr-idx-zst");
    assert!(
        sidecar.exists(),
        "seekable-zstd sidecar must exist: {}",
        sidecar.display()
    );
    let sidecar_bytes = std::fs::read(&sidecar).unwrap();
    assert_eq!(
        sidecar_bytes.len(),
        656,
        "unexpected seekable-zstd sidecar size: {} bytes (expected 656)",
        sidecar_bytes.len(),
    );
    assert_eq!(&sidecar_bytes[..8], b"SHUFLRZI");
}
#[test]
fn stream_index_perm_builds_index_on_demand() {
    // First index-perm run on a plain file creates the sidecar on demand;
    // seeded runs are deterministic, seeds differ, record set is preserved.
    use std::collections::BTreeSet;
    let tmp = tempfile::tempdir().unwrap();
    let input = tmp.path().join("c.jsonl");
    let records: Vec<String> = (0..200).map(|i| format!("{{\"i\":{i}}}\n")).collect();
    std::fs::write(&input, records.concat()).unwrap();
    // One streaming run per seed; factored into a closure so the three
    // previously copy-pasted invocations cannot drift apart (matches the
    // `run` pattern used by the sibling index-perm tests).
    let run = |seed: &str| {
        let assert = shuflr()
            .args([
                "stream",
                "--shuffle",
                "index-perm",
                "--seed",
                seed,
                "--log-level",
                "warn",
            ])
            .arg(&input)
            .assert()
            .success();
        String::from_utf8(assert.get_output().stdout.clone()).unwrap()
    };
    let out_a = run("17");
    // The first run must have created the sidecar as a side effect.
    let sidecar = tmp.path().join("c.jsonl.shuflr-idx");
    assert!(sidecar.exists());
    let out_b = run("17");
    assert_eq!(out_a, out_b, "same seed must give byte-identical output");
    let out_c = run("18");
    assert_ne!(out_a, out_c);
    let in_set: BTreeSet<&str> = records.iter().map(|s| s.trim_end()).collect();
    let out_set: BTreeSet<&str> = out_a.lines().collect();
    assert_eq!(in_set, out_set, "multiset preserved");
    let original: String = records.concat();
    assert_ne!(out_a, original);
}
#[test]
fn stream_index_perm_rejects_compressed_input_with_hint() {
    // index-perm on a gzip input is a usage error (exit 64) whose message
    // points the user at `shuflr convert`.
    use flate2::Compression;
    use flate2::write::GzEncoder;
    use std::io::Write as _;
    let tmp = tempfile::tempdir().unwrap();
    let path = tmp.path().join("data.jsonl.gz");
    let file = std::fs::File::create(&path).unwrap();
    let mut enc = GzEncoder::new(file, Compression::default());
    enc.write_all(b"a\nb\nc\n").unwrap();
    enc.finish().unwrap();
    let mut cmd = shuflr();
    cmd.args(["stream", "--shuffle", "index-perm"]).arg(&path);
    cmd.assert()
        .code(64)
        .stderr(predicate::str::contains("shuflr convert"));
}
#[test]
fn stream_index_perm_rebuilds_when_fingerprint_mismatches() {
    // A stale sidecar (input rewritten after indexing) must be rebuilt
    // rather than trusted, so newly appended records show up.
    let tmp = tempfile::tempdir().unwrap();
    let input = tmp.path().join("c.jsonl");
    std::fs::write(&input, "one\ntwo\nthree\n").unwrap();
    shuflr().args(["index"]).arg(&input).assert().success();
    let sidecar = tmp.path().join("c.jsonl.shuflr-idx");
    assert!(sidecar.exists());
    // Sleep past coarse filesystem mtime granularity before rewriting —
    // presumably the fingerprint includes the mtime; confirm against impl.
    std::thread::sleep(std::time::Duration::from_millis(1100));
    std::fs::write(&input, "one\ntwo\nthree\nFOUR\nFIVE\n").unwrap();
    let assert = shuflr()
        .args(["stream", "--shuffle", "index-perm", "--seed", "1"])
        .arg(&input)
        .assert()
        .success();
    let out = String::from_utf8(assert.get_output().stdout.clone()).unwrap();
    assert_eq!(
        out.lines().count(),
        5,
        "should now see all 5 records:\n{out}"
    );
}
#[test]
fn verify_plain_jsonl_reports_ok() {
    // `verify` on a clean plain-text file reports OK plus the record count.
    let tmp = tempfile::tempdir().unwrap();
    let input = tmp.path().join("clean.jsonl");
    std::fs::write(&input, "a\nb\nc\nd\n").unwrap();
    let assert = shuflr().args(["verify"]).arg(&input).assert().success();
    let report = String::from_utf8(assert.get_output().stdout.clone()).unwrap();
    assert!(
        report.contains("verdict: OK"),
        "missing OK verdict:\n{report}"
    );
    assert!(
        report.contains("records: 4"),
        "missing record count:\n{report}"
    );
}
#[test]
fn verify_deep_plain_catches_invalid_json() {
    // Shallow verify tolerates a non-JSON line; --deep flags it with exit
    // 65 (EX_DATAERR), a deep-json line, and an ISSUES verdict.
    let tmp = tempfile::tempdir().unwrap();
    let input = tmp.path().join("bad.jsonl");
    // Raw-string content is reproduced byte-for-byte (no indentation).
    std::fs::write(
        &input,
        r#"{"ok":1}
not json here
{"ok":2}
"#,
    )
    .unwrap();
    shuflr().args(["verify"]).arg(&input).assert().success();
    let assert = shuflr()
        .args(["verify", "--deep"])
        .arg(&input)
        .assert()
        .code(65);
    let report = String::from_utf8(assert.get_output().stdout.clone()).unwrap();
    assert!(
        report.contains("deep-json:"),
        "missing deep-json line:\n{report}"
    );
    assert!(
        report.contains("verdict: ISSUES"),
        "expected ISSUES verdict:\n{report}"
    );
}
#[test]
fn verify_deep_plain_passes_on_valid_jsonl() {
    // --deep accepts any valid top-level JSON value per line (object,
    // number, bool, string), not only objects.
    let tmp = tempfile::tempdir().unwrap();
    let input = tmp.path().join("good.jsonl");
    // Raw-string content is reproduced byte-for-byte (no indentation).
    std::fs::write(
        &input,
        r#"{"i":0}
{"nested":{"a":[1,2,3],"b":null}}
42
true
"yep"
"#,
    )
    .unwrap();
    let assert = shuflr()
        .args(["verify", "--deep"])
        .arg(&input)
        .assert()
        .success();
    let report = String::from_utf8(assert.get_output().stdout.clone()).unwrap();
    assert!(report.contains("deep-json: 0 invalid"), "{report}");
    assert!(report.contains("verdict: OK"), "{report}");
}
#[test]
fn verify_deep_seekable_catches_invalid_json() {
    // On a seekable file, shallow verify passes (frames intact) but --deep
    // exits 65 with a FAILED verdict because one record isn't valid JSON.
    let tmp = tempfile::tempdir().unwrap();
    let plain = tmp.path().join("mix.jsonl");
    let seekable = tmp.path().join("mix.jsonl.zst");
    // Raw-string content is reproduced byte-for-byte (no indentation).
    std::fs::write(
        &plain,
        r#"{"ok":1}
<<<not json>>>
{"ok":2}
"#,
    )
    .unwrap();
    shuflr()
        .args(["convert", "--log-level", "warn", "-o"])
        .arg(&seekable)
        .arg(&plain)
        .assert()
        .success();
    let shallow = shuflr().args(["verify"]).arg(&seekable).assert().success();
    let shallow_report = String::from_utf8(shallow.get_output().stdout.clone()).unwrap();
    assert!(shallow_report.contains("verdict: OK"), "{shallow_report}");
    let deep = shuflr()
        .args(["verify", "--deep"])
        .arg(&seekable)
        .assert()
        .code(65);
    let deep_report = String::from_utf8(deep.get_output().stdout.clone()).unwrap();
    assert!(deep_report.contains("deep-json:"), "{deep_report}");
    assert!(
        deep_report.contains("verdict: FAILED"),
        "expected FAILED verdict:\n{deep_report}"
    );
}
#[test]
fn verify_seekable_ok_after_convert() {
    // A freshly converted file verifies clean and is reported as seekable.
    let tmp = tempfile::tempdir().unwrap();
    let input = tmp.path().join("in.jsonl");
    let output = tmp.path().join("out.jsonl.zst");
    let body: String = (0..300).map(|i| format!("{{\"i\":{i}}}\n")).collect();
    std::fs::write(&input, &body).unwrap();
    shuflr()
        .args(["convert", "--log-level", "warn", "-o"])
        .arg(&output)
        .arg(&input)
        .assert()
        .success();
    let assert = shuflr().args(["verify"]).arg(&output).assert().success();
    let report = String::from_utf8(assert.get_output().stdout.clone()).unwrap();
    assert!(report.contains("format: zstd-seekable"), "\n{report}");
    assert!(report.contains("verdict: OK"), "\n{report}");
}
#[test]
fn verify_seekable_falls_back_to_streaming_when_trailer_broken() {
    // Flipping a byte near EOF breaks the trailer; verify must degrade to
    // plain streaming zstd instead of hard-failing.
    let tmp = tempfile::tempdir().unwrap();
    let input = tmp.path().join("in.jsonl");
    let output = tmp.path().join("out.jsonl.zst");
    std::fs::write(&input, "a\nb\nc\nd\ne\n").unwrap();
    shuflr()
        .args(["convert", "--log-level", "warn", "-o"])
        .arg(&output)
        .arg(&input)
        .assert()
        .success();
    let mut bytes = std::fs::read(&output).unwrap();
    let flip_at = bytes.len() - 3;
    bytes[flip_at] ^= 0xff;
    std::fs::write(&output, &bytes).unwrap();
    let assert = shuflr().args(["verify"]).arg(&output).assert().success();
    let report = String::from_utf8(assert.get_output().stdout.clone()).unwrap();
    assert!(
        report.contains("zstd (streaming)"),
        "expected streaming fallback when trailer is broken:\n{report}"
    );
}
#[test]
fn verify_seekable_fails_when_frame_body_corrupted() {
    // Overwriting bytes inside the file body (offsets 16..80, presumably
    // within the first frame's data) must make verify exit 65 with FAILED.
    let tmp = tempfile::tempdir().unwrap();
    let input = tmp.path().join("in.jsonl");
    let output = tmp.path().join("out.jsonl.zst");
    let body: String = (0..3000)
        .map(|i| format!("padding_padding_padding_padding_padding_{i:05}\n"))
        .collect();
    std::fs::write(&input, &body).unwrap();
    shuflr()
        .args(["convert", "--log-level", "warn", "-o"])
        .arg(&output)
        .arg(&input)
        .assert()
        .success();
    let mut bytes = std::fs::read(&output).unwrap();
    for b in bytes.iter_mut().take(80).skip(16) {
        *b = 0x55;
    }
    std::fs::write(&output, &bytes).unwrap();
    let assert = shuflr().args(["verify"]).arg(&output).assert().code(65);
    let report = String::from_utf8(assert.get_output().stdout.clone()).unwrap();
    assert!(
        report.contains("verdict: FAILED"),
        "expected FAILED verdict:\n{report}"
    );
}
#[test]
fn verify_flags_trailing_partial_record() {
    // A file whose last record lacks a newline is flagged as a trailing
    // partial record: exit 65 with an ISSUES verdict.
    let tmp = tempfile::tempdir().unwrap();
    let input = tmp.path().join("partial.jsonl");
    std::fs::write(&input, "one\ntwo\nthree").unwrap();
    let assert = shuflr().args(["verify"]).arg(&input).assert().code(65);
    let report = String::from_utf8(assert.get_output().stdout.clone()).unwrap();
    assert!(report.contains("trailing-partial: true"), "\n{report}");
    assert!(report.contains("verdict: ISSUES"), "\n{report}");
}
#[test]
fn convert_preserves_crlf_and_nul() {
    // Conversion is byte-exact: CRLF, an embedded NUL, and multi-byte UTF-8
    // all survive the compress/decompress round trip.
    let tmp = tempfile::tempdir().unwrap();
    let input_path = tmp.path().join("in.jsonl");
    let output_path = tmp.path().join("out.jsonl.zst");
    let original: &[u8] = b"one\r\nt\0wo\n\xe2\x98\x83snowman\n";
    std::fs::write(&input_path, original).unwrap();
    shuflr()
        .args(["convert", "--log-level", "warn", "-o"])
        .arg(&output_path)
        .arg(&input_path)
        .assert()
        .success();
    let compressed = std::fs::read(&output_path).unwrap();
    let decoded = zstd::stream::decode_all(&compressed[..]).unwrap();
    assert_eq!(decoded, original);
}
#[test]
fn stream_none_exit_66_on_missing_input() {
    // A missing input path exits 66 (sysexits EX_NOINPUT) and says so.
    let mut cmd = shuflr();
    cmd.args(["stream", "--shuffle", "none", "/does/not/exist.jsonl"]);
    cmd.assert()
        .code(66)
        .stderr(predicate::str::contains("no such file"));
}