use super::*;
use std::fs::OpenOptions;
use std::io::BufRead;
/// Reads the file at `path` and returns its data lines as raw bytes.
///
/// Empty lines and lines whose first byte is `@` are skipped. Every returned
/// line keeps its trailing `\n` if the file had one.
///
/// # Panics
///
/// Panics if the file cannot be opened or if reading from it fails.
fn read_data_lines(path: &Path) -> Vec<Vec<u8>> {
    let mut reader = utils::open_file(path).unwrap();
    let mut data_lines = Vec::new();
    let mut line = Vec::new();
    loop {
        line.clear();
        let bytes_read = reader
            .read_until(b'\n', &mut line)
            .unwrap_or_else(|e| panic!("read error: {}", e));
        if bytes_read == 0 {
            // End of input.
            break;
        }
        // Length of the line without its terminator, when one is present.
        let content_len = match line.last() {
            Some(&b'\n') => line.len() - 1,
            _ => line.len(),
        };
        let starts_with_at = line.first() == Some(&b'@');
        if content_len > 0 && !starts_with_at {
            data_lines.push(line.clone());
        }
    }
    data_lines
}
/// Builds `num_lines` newline-terminated GAF lines for the stable sort test.
///
/// The lines are identical except for the first field, which is the line's
/// index zero-padded to eight digits.
fn stable_sort_test_case(num_lines: usize) -> Vec<Vec<u8>> {
    (0..num_lines)
        .map(|index| {
            format!("{:08}\t100\t0\t100\t+\t>1>2>3\t300\t0\t300\t100\t100\t60\n", index).into_bytes()
        })
        .collect()
}
/// Opens the file at `path` and returns whatever
/// `formats::read_gaf_header_lines` reads from it.
///
/// # Panics
///
/// Panics if the file cannot be opened or the header lines cannot be read.
fn read_headers(path: &Path) -> Vec<String> {
    let mut input = utils::open_file(path).unwrap();
    let headers = formats::read_gaf_header_lines(&mut input);
    headers.unwrap()
}
/// Sorts the test data file `input` into a fresh temporary file and returns
/// the path of that file.
///
/// The caller is responsible for removing the returned file.
///
/// # Panics
///
/// Panics if `sort_gaf` fails or reports a record count other than
/// `RECORD_COUNT`.
fn run_sort(input: &'static str, params: &SortParameters) -> PathBuf {
    let source = utils::get_test_data(input);
    let destination = serialize::temp_file_name("gaf-sort-test");
    let sorted = match sort_gaf(&source, &destination, params) {
        Ok(count) => count,
        Err(e) => panic!("sort_gaf failed: {}", e),
    };
    assert_eq!(sorted, RECORD_COUNT, "sort_gaf sorted {} records, expected {}", sorted, RECORD_COUNT);
    destination
}
/// Returns the sort key that `GAFRecord::new` assigns to `line` for `key_type`.
fn key_of(line: &[u8], key_type: KeyType) -> u64 {
    let record = GAFRecord::new(line.to_vec(), key_type);
    record.key
}
/// Asserts that the keys of `lines` under `key_type` are in non-decreasing order.
///
/// # Panics
///
/// Panics at the first record whose key is smaller than its predecessor's.
fn assert_sorted(lines: &[Vec<u8>], key_type: KeyType) {
    // Keys are computed lazily, one record at a time, as the loop advances.
    let keys = lines.iter().map(|line| key_of(line, key_type));
    let mut previous = 0u64;
    for (index, key) in keys.enumerate() {
        assert!(
            key >= previous,
            "record {} is out of order: prev_key={:#x}, key={:#x}",
            index, previous, key
        );
        previous = key;
    }
}
/// Asserts that two line lists have the same length and the same content at
/// every position.
///
/// `first` and `second` name the two lists in the failure messages.
///
/// # Panics
///
/// Panics if the lengths differ, or at the first position where the lines differ.
fn assert_lines_equal(lines1: &[Vec<u8>], lines2: &[Vec<u8>], first: &str, second: &str) {
    assert_eq!(lines1.len(), lines2.len(), "line count differs between {} and {}", first, second);
    let pairs = lines1.iter().zip(lines2.iter());
    pairs.enumerate().for_each(|(index, (left, right))| {
        assert_eq!(
            left, right,
            "line {} differs between {} and {}: {:?} != {:?}",
            index, first, second, String::from_utf8_lossy(left), String::from_utf8_lossy(right)
        );
    });
}
/// Number of records `run_sort` requires `sort_gaf` to report for the test inputs.
const RECORD_COUNT: usize = 12439;
/// Checks the `KeyType::NodeInterval` key against a value built from the
/// expected minimum and maximum forward handles of the path.
#[test]
fn test_node_interval_key() {
    // (GAF line, node id of the expected minimum handle, node id of the expected maximum handle)
    let cases = [
        (&b"query\t100\t0\t100\t+\t>1>2>3\t300\t0\t300\t100\t100\t60"[..], 1, 3),
        (&b"query\t100\t0\t100\t+\t>5<10>15\t300\t0\t300\t100\t100\t60"[..], 5, 15),
    ];
    for &(line, min_id, max_id) in cases.iter() {
        let record = GAFRecord::new(line.to_vec(), KeyType::NodeInterval);
        let min_handle = support::encode_node(min_id, Orientation::Forward) as u64;
        let max_handle = support::encode_node(max_id, Orientation::Forward) as u64;
        // Minimum handle in the upper 32 bits, maximum handle in the lower bits.
        assert_eq!(record.key, (min_handle << 32) | max_handle);
    }
}
/// Checks that a record with a path gets a `KeyType::Hash` key other than
/// `GAFRecord::MISSING_KEY`.
#[test]
fn test_hash_key() {
    let line: &[u8] = b"query\t100\t0\t100\t+\t>1>2>3\t300\t0\t300\t100\t100\t60";
    let key = GAFRecord::new(line.to_vec(), KeyType::Hash).key;
    assert_ne!(key, GAFRecord::MISSING_KEY);
}
/// Serializes a record into a byte buffer, deserializes it again, and checks
/// that key and value survive the round trip.
#[test]
fn test_serialization() {
    let original = GAFRecord::new(
        b"query\t100\t0\t100\t+\t>1>2>3\t300\t0\t300\t100\t100\t60".to_vec(),
        KeyType::NodeInterval,
    );

    let mut bytes = Vec::new();
    original.serialize(&mut bytes).unwrap();

    let mut reader = std::io::Cursor::new(bytes);
    let restored = GAFRecord::deserialize(&mut reader).unwrap();

    assert_eq!(original.key, restored.key);
    assert_eq!(original.value, restored.value);
}
/// Sorts `shuffled.gaf` with `records_per_file` set above the record count and
/// checks that the output has every record in key order.
///
/// Per the test name, this setting is meant to keep all records in one batch.
#[test]
fn sort_single_batch() {
    let params = SortParameters {
        records_per_file: RECORD_COUNT + 1,
        ..SortParameters::default()
    };
    let output = run_sort("shuffled.gaf", &params);
    let lines = read_data_lines(&output);
    // Best-effort cleanup before asserting, so a failed assertion does not leak the temp file.
    let _ = fs::remove_file(&output);
    assert_eq!(
        lines.len(), RECORD_COUNT,
        "output has {} data lines, expected {}",
        lines.len(), RECORD_COUNT
    );
    assert_sorted(&lines, params.key_type);
}
/// Sorts `shuffled.gaf` with 5000 records per file and up to 32 files per
/// merge, then checks that the output has every record in key order.
///
/// Per the test name, this is meant to produce several batches that are
/// combined in a single merge.
#[test]
fn sort_multi_batch_single_merge() {
    let params = SortParameters {
        records_per_file: 5000,
        files_per_merge: 32,
        ..SortParameters::default()
    };
    let output = run_sort("shuffled.gaf", &params);
    let lines = read_data_lines(&output);
    // Best-effort cleanup before asserting, so a failed assertion does not leak the temp file.
    let _ = fs::remove_file(&output);
    assert_eq!(
        lines.len(), RECORD_COUNT,
        "output has {} data lines, expected {}",
        lines.len(), RECORD_COUNT
    );
    assert_sorted(&lines, params.key_type);
}
/// Sorts `shuffled.gaf` with 1000 records per file and 2 files per merge,
/// then checks that the output has every record in key order.
///
/// Per the test name, this is meant to force several rounds of merging.
#[test]
fn sort_multi_batch_multi_round() {
    let params = SortParameters {
        records_per_file: 1000,
        files_per_merge: 2,
        ..SortParameters::default()
    };
    let output = run_sort("shuffled.gaf", &params);
    let lines = read_data_lines(&output);
    // Best-effort cleanup before asserting, so a failed assertion does not leak the temp file.
    let _ = fs::remove_file(&output);
    assert_eq!(
        lines.len(), RECORD_COUNT,
        "output has {} data lines, expected {}",
        lines.len(), RECORD_COUNT
    );
    assert_sorted(&lines, params.key_type);
}
/// Sorts `shuffled.gaf` with `threads: 2` (plus small batches and two-way
/// merges), then checks that the output has every record in key order.
#[test]
fn sort_multithreaded() {
    let params = SortParameters {
        records_per_file: 1000,
        files_per_merge: 2,
        threads: 2,
        ..SortParameters::default()
    };
    let output = run_sort("shuffled.gaf", &params);
    let lines = read_data_lines(&output);
    // Best-effort cleanup before asserting, so a failed assertion does not leak the temp file.
    let _ = fs::remove_file(&output);
    assert_eq!(
        lines.len(), RECORD_COUNT,
        "output has {} data lines, expected {}",
        lines.len(), RECORD_COUNT
    );
    assert_sorted(&lines, params.key_type);
}
/// Sorts `shuffled.gaf` and `shuffled.gaf.gz` with the same parameters and
/// checks that both outputs contain the same set of data lines.
///
/// Lines are compared after a bytewise sort, so the check does not depend on
/// the relative order of records within each output.
#[test]
fn sort_gzipped_input() {
    let params = SortParameters {
        records_per_file: 5000,
        ..SortParameters::default()
    };

    // Read and remove each output right away, so a failure in the second sort
    // does not leak the first temp file.
    let out_plain = run_sort("shuffled.gaf", &params);
    let mut lines_plain = read_data_lines(&out_plain);
    let _ = fs::remove_file(&out_plain);

    let out_gz = run_sort("shuffled.gaf.gz", &params);
    let mut lines_gz = read_data_lines(&out_gz);
    let _ = fs::remove_file(&out_gz);

    lines_plain.sort_unstable();
    lines_gz.sort_unstable();
    assert_lines_equal(&lines_plain, &lines_gz, "plain", "gzipped");
}
/// Sorts `shuffled.gaf` by `KeyType::Hash` and checks that the output has
/// every record in hash-key order.
#[test]
fn sort_hash_key() {
    let params = SortParameters {
        key_type: KeyType::Hash,
        records_per_file: 5000,
        ..SortParameters::default()
    };
    let output = run_sort("shuffled.gaf", &params);
    let lines = read_data_lines(&output);
    // Best-effort cleanup before asserting, so a failed assertion does not leak the temp file.
    let _ = fs::remove_file(&output);
    assert_eq!(
        lines.len(), RECORD_COUNT,
        "output has {} data lines, expected {}",
        lines.len(), RECORD_COUNT
    );
    assert_sorted(&lines, KeyType::Hash);
}
/// Checks that the header lines read from the sorted output equal the header
/// lines read from the input.
#[test]
fn sort_preserves_headers() {
    let input_path = utils::get_test_data("shuffled.gaf");
    let input_headers = read_headers(&input_path);
    let params = SortParameters {
        records_per_file: 5000,
        ..SortParameters::default()
    };
    let output = run_sort("shuffled.gaf", &params);
    let output_headers = read_headers(&output);
    // Best-effort cleanup before asserting, so a failed assertion does not leak the temp file.
    let _ = fs::remove_file(&output);
    assert_eq!(input_headers, output_headers, "header lines differ between input and output");
}
/// Checks that sorting neither drops, duplicates, nor alters records: the
/// input and output data lines must be equal as multisets.
///
/// Both sides are sorted bytewise before comparison, so only the content
/// matters, not the order.
#[test]
fn sort_preserves_all_records() {
    let input_path = utils::get_test_data("shuffled.gaf");
    let mut input_lines = read_data_lines(&input_path);
    let params = SortParameters {
        records_per_file: 5000,
        ..SortParameters::default()
    };
    let output = run_sort("shuffled.gaf", &params);
    let mut output_lines = read_data_lines(&output);
    // Best-effort cleanup before asserting, so a failed assertion does not leak the temp file.
    let _ = fs::remove_file(&output);
    input_lines.sort_unstable();
    output_lines.sort_unstable();
    assert_lines_equal(&input_lines, &output_lines, "input", "output");
}
/// Sorts `shuffled.gaf` under three batch/merge configurations and checks that
/// all of them produce the same set of data lines.
#[test]
fn sort_consistent_across_configs() {
    let configs = [
        SortParameters { records_per_file: RECORD_COUNT + 1, ..SortParameters::default() },
        SortParameters { records_per_file: 5000, files_per_merge: 32, ..SortParameters::default() },
        SortParameters { records_per_file: 1000, files_per_merge: 2, ..SortParameters::default() },
    ];

    // Run every sort first, then read every output, then clean up.
    let mut outputs: Vec<PathBuf> = Vec::new();
    for config in configs.iter() {
        outputs.push(run_sort("shuffled.gaf", config));
    }

    let mut sorted_lines: Vec<Vec<Vec<u8>>> = Vec::new();
    for output in outputs.iter() {
        let mut lines = read_data_lines(output);
        // Bytewise order makes the comparison independent of record order.
        lines.sort_unstable();
        sorted_lines.push(lines);
    }

    for output in outputs.iter() {
        let _ = fs::remove_file(output);
    }

    let baseline = &sorted_lines[0];
    assert_lines_equal(baseline, &sorted_lines[1], "config 0", "config 1");
    assert_lines_equal(baseline, &sorted_lines[2], "config 0", "config 2");
}
/// Checks that sorting with `stable: true` keeps the input order of records.
///
/// The 800 generated records differ only in their first field and share the
/// path `>1>2>3`, so they are presumably all assigned the same key; the output
/// must then match the input line for line.
#[test]
fn sort_stable() {
    let params = SortParameters {
        records_per_file: 100,
        files_per_merge: 2,
        stable: true,
        ..SortParameters::default()
    };
    let test_case = stable_sort_test_case(800);

    // Write the generated records to a fresh temporary input file.
    let input = serialize::temp_file_name("gaf-sort-stable");
    {
        let mut file = OpenOptions::new().write(true).create_new(true).open(&input).unwrap();
        for line in &test_case {
            file.write_all(line).unwrap();
        }
    }

    let output = serialize::temp_file_name("gaf-sort-stable");
    let result = sort_gaf(&input, &output, &params);
    // The input is no longer needed; remove it before anything can panic.
    let _ = fs::remove_file(&input);
    let sorted = match result {
        Ok(count) => count,
        Err(e) => {
            // Do not leave a partial output behind on failure.
            let _ = fs::remove_file(&output);
            panic!("sort_gaf failed: {}", e);
        }
    };
    let lines = read_data_lines(&output);
    let _ = fs::remove_file(&output);

    assert_eq!(sorted, test_case.len(), "sort_gaf sorted {} records, expected {}", sorted, test_case.len());
    assert_sorted(&lines, KeyType::NodeInterval);
    // Checks the line count as well as the content, so a truncated or
    // over-long output is reported instead of slipping through.
    assert_lines_equal(&test_case, &lines, "input", "output");
}