use anyhow::{anyhow, Context, Result};
use arrow::array::{ArrayRef, Float64Builder, RecordBatch, StringBuilder};
use arrow::datatypes::{DataType, Field, Schema};
use flate2::write::GzEncoder;
use flate2::Compression;
use parquet::arrow::ArrowWriter;
use parquet::basic::ZstdLevel;
use parquet::file::properties::WriterProperties;
use parquet::schema::types::ColumnPath;
use std::collections::HashMap;
use std::fs::File;
use std::io::{self, BufWriter, Write};
use std::path::PathBuf;
use std::sync::Arc;
/// Number of rows buffered before a Parquet record batch is flushed, used
/// when the caller passes `None` for the Parquet batch size.
const DEFAULT_PARQUET_OUTPUT_BATCH_SIZE: usize = 10_000;
/// On-disk encoding selected for the output.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OutputFormat {
    /// Plain tab-separated text.
    Tsv,
    /// Gzip-compressed tab-separated text.
    TsvGz,
    /// Apache Parquet.
    Parquet,
}

impl OutputFormat {
    /// Picks the output format from the final extension of `path`.
    ///
    /// `parquet` selects [`OutputFormat::Parquet`] and `gz` selects
    /// [`OutputFormat::TsvGz`]; the comparison is case-sensitive. Stdout
    /// (`None` or `-`), a missing extension, a non-UTF-8 extension and any
    /// other extension all yield [`OutputFormat::Tsv`].
    pub fn detect(path: Option<&PathBuf>) -> Self {
        if Self::is_stdout(path) {
            return Self::Tsv;
        }
        let extension = path
            .and_then(|p| p.extension())
            .and_then(|ext| ext.to_str());
        match extension {
            Some("parquet") => Self::Parquet,
            Some("gz") => Self::TsvGz,
            Some(_) | None => Self::Tsv,
        }
    }

    /// Returns `true` when the output goes to stdout: either no path was
    /// given or the path is exactly `-`.
    pub fn is_stdout(path: Option<&PathBuf>) -> bool {
        path.map_or(true, |p| p.as_os_str() == "-")
    }
}
/// Sink for tabular output, with one variant per supported destination and
/// layout.
///
/// The TSV variants write bytes straight through to their sink. The Parquet
/// variants buffer rows in column vectors and flush a record batch once
/// `batch_size` rows have accumulated.
// NOTE(review): the Parquet variants are much larger than the TSV ones; the
// lint is silenced instead of boxing them, presumably because only a handful
// of writers ever exist at once. Confirm.
#[allow(clippy::large_enum_variant)]
pub enum OutputWriter {
/// Plain TSV, buffered in front of a boxed sink (a created file or stdout).
Tsv(BufWriter<Box<dyn Write + Send>>),
/// Gzip-compressed TSV. Becomes `None` once `finish` has taken the writer
/// out to close the gzip stream.
TsvGz(Option<BufWriter<GzEncoder<File>>>),
/// Long-format Parquet: one row per `(read_id, bucket_name, score)` record,
/// optionally with a `fast_path` column.
Parquet {
/// Underlying Arrow-to-Parquet writer.
writer: ArrowWriter<File>,
/// Buffered `read_id` column values.
read_ids: Vec<String>,
/// Buffered `bucket_name` column values.
bucket_names: Vec<String>,
/// Buffered `score` column values.
scores: Vec<f64>,
/// Buffered `fast_path` column values; `Some` only when the writer was
/// created with the fast-path column enabled.
fast_paths: Option<Vec<String>>,
/// Row count at which the buffers are flushed as one record batch.
batch_size: usize,
},
/// Wide-format Parquet: one row per read and one `Float64` column per bucket.
ParquetWide {
/// Underlying Arrow-to-Parquet writer.
writer: ArrowWriter<File>,
/// Score column names, in column order.
bucket_col_names: Vec<String>,
/// Buffered `read_id` column values.
read_ids: Vec<String>,
/// Buffered scores stored row by row: each read contributes
/// `bucket_col_names.len()` consecutive values.
scores: Vec<f64>,
/// Row count at which the buffers are flushed as one record batch.
batch_size: usize,
},
}
impl OutputWriter {
/// Creates a long-format writer without the optional `fast_path` column.
///
/// Equivalent to calling `new_long` with `include_fast_path` set to `false`;
/// all arguments and errors are passed through unchanged.
pub fn new(
    format: OutputFormat,
    path: Option<&PathBuf>,
    parquet_batch_size: Option<usize>,
) -> Result<Self> {
    let include_fast_path = false;
    Self::new_long(format, path, parquet_batch_size, include_fast_path)
}
/// Creates a long-format writer: one output row per
/// `(read_id, bucket_name, score)` record.
///
/// * `format` - encoding to produce.
/// * `path` - output file; `None` or `-` means stdout, which only plain TSV
///   accepts.
/// * `parquet_batch_size` - rows buffered per Parquet record batch; defaults
///   to `DEFAULT_PARQUET_OUTPUT_BATCH_SIZE`. Not used by the TSV formats.
/// * `include_fast_path` - adds a fourth, non-nullable `fast_path` string
///   column to the Parquet schema. Not used by the TSV formats.
///
/// # Errors
///
/// Fails when gzip or Parquet output is requested on stdout, when the output
/// file cannot be created, or when the Parquet writer cannot be constructed.
pub fn new_long(
    format: OutputFormat,
    path: Option<&PathBuf>,
    parquet_batch_size: Option<usize>,
    include_fast_path: bool,
) -> Result<Self> {
    // `None` and "-" both mean stdout; keep a path only when it names a file.
    let file_path = if OutputFormat::is_stdout(path) {
        None
    } else {
        path
    };
    let create_file = |target: &PathBuf| {
        File::create(target)
            .with_context(|| format!("Failed to create output file: {:?}", target))
    };

    match format {
        OutputFormat::Tsv => {
            let sink: Box<dyn Write + Send> = match file_path {
                Some(target) => Box::new(create_file(target)?),
                None => Box::new(io::stdout()),
            };
            Ok(OutputWriter::Tsv(BufWriter::new(sink)))
        }
        OutputFormat::TsvGz => {
            let Some(target) = file_path else {
                return Err(anyhow!("Gzip output requires a file path, not stdout"));
            };
            let gz = GzEncoder::new(create_file(target)?, Compression::default());
            Ok(OutputWriter::TsvGz(Some(BufWriter::new(gz))))
        }
        OutputFormat::Parquet => {
            let Some(target) = file_path else {
                return Err(anyhow!("Parquet output requires a file path, not stdout"));
            };
            let file = create_file(target)?;

            let utf8 = |name: &str| Field::new(name, DataType::Utf8, false);
            let mut fields = vec![
                utf8("read_id"),
                utf8("bucket_name"),
                Field::new("score", DataType::Float64, false),
            ];
            if include_fast_path {
                fields.push(utf8("fast_path"));
            }
            let schema = Arc::new(Schema::new(fields));

            // Dictionary encoding is explicitly enabled for `bucket_name`
            // and, when present, `fast_path`.
            let dictionary_columns =
                std::iter::once("bucket_name").chain(include_fast_path.then_some("fast_path"));
            let props = dictionary_columns
                .fold(
                    WriterProperties::builder()
                        .set_compression(parquet::basic::Compression::ZSTD(ZstdLevel::default())),
                    |builder, column| {
                        builder.set_column_dictionary_enabled(
                            ColumnPath::new(vec![column.to_string()]),
                            true,
                        )
                    },
                )
                .build();

            let writer = ArrowWriter::try_new(file, schema, Some(props))
                .context("Failed to create Parquet writer")?;

            let batch_size = parquet_batch_size.unwrap_or(DEFAULT_PARQUET_OUTPUT_BATCH_SIZE);
            Ok(OutputWriter::Parquet {
                writer,
                read_ids: Vec::with_capacity(batch_size),
                bucket_names: Vec::with_capacity(batch_size),
                scores: Vec::with_capacity(batch_size),
                fast_paths: include_fast_path.then(|| Vec::with_capacity(batch_size)),
                batch_size,
            })
        }
    }
}
pub fn new_wide(
format: OutputFormat,
path: Option<&PathBuf>,
bucket_names: &HashMap<u32, String>,
parquet_batch_size: Option<usize>,
) -> Result<Self> {
match format {
OutputFormat::Tsv | OutputFormat::TsvGz => {
Self::new(format, path, parquet_batch_size)
}
OutputFormat::Parquet => {
let is_stdout = OutputFormat::is_stdout(path);
if is_stdout {
return Err(anyhow!("Parquet output requires a file path, not stdout"));
}
let path = path.unwrap(); let file = File::create(path)
.with_context(|| format!("Failed to create output file: {:?}", path))?;
let mut bucket_ids: Vec<u32> = bucket_names.keys().copied().collect();
bucket_ids.sort_unstable();
let mut fields = vec![Field::new("read_id", DataType::Utf8, false)];
for bucket_id in &bucket_ids {
let name = bucket_names
.get(bucket_id)
.map(|s| s.as_str())
.unwrap_or("unknown");
fields.push(Field::new(name, DataType::Float64, false));
}
let schema = Arc::new(Schema::new(fields));
let props = WriterProperties::builder()
.set_compression(parquet::basic::Compression::ZSTD(ZstdLevel::default()))
.build();
let writer = ArrowWriter::try_new(file, schema, Some(props))
.context("Failed to create Parquet writer")?;
let batch_size = parquet_batch_size.unwrap_or(DEFAULT_PARQUET_OUTPUT_BATCH_SIZE);
let num_buckets = bucket_ids.len();
let bucket_col_names: Vec<String> = bucket_ids
.iter()
.map(|id| {
bucket_names
.get(id)
.cloned()
.unwrap_or_else(|| format!("bucket_{}", id))
})
.collect();
Ok(OutputWriter::ParquetWide {
writer,
bucket_col_names,
read_ids: Vec::with_capacity(batch_size),
scores: Vec::with_capacity(batch_size * num_buckets),
batch_size,
})
}
}
}
/// Writes the raw header bytes to a TSV or gzip TSV sink.
///
/// Parquet writers ignore the header and return `Ok(())`, since their
/// column names come from the schema.
///
/// # Errors
///
/// Returns the underlying I/O error, or "Writer already finished" for a
/// gzip writer whose stream was already closed by `finish`.
pub fn write_header(&mut self, header: &[u8]) -> Result<()> {
    match self {
        OutputWriter::Parquet { .. } | OutputWriter::ParquetWide { .. } => Ok(()),
        OutputWriter::TsvGz(None) => Err(anyhow!("Writer already finished")),
        OutputWriter::TsvGz(Some(gz)) => Ok(gz.write_all(header)?),
        OutputWriter::Tsv(plain) => Ok(plain.write_all(header)?),
    }
}
/// Writes a single long-format record.
///
/// TSV sinks receive `read_id`, `bucket_name` and the score formatted with
/// four decimals, tab-separated and newline-terminated. Long-format Parquet
/// buffers the record and flushes a batch once `batch_size` rows are held.
///
/// # Errors
///
/// * I/O or Parquet errors from the underlying writer.
/// * "Writer already finished" for a gzip writer closed by `finish`.
/// * Wide-format Parquet writers do not support single records.
/// * Long-format Parquet writers created with the `fast_path` column reject
///   the call: this method has no value for that column, and buffering only
///   three of the four columns would leave the buffers with different
///   lengths and make the next flush fail.
#[allow(dead_code)]
pub fn write_record(&mut self, read_id: &str, bucket_name: &str, score: f64) -> Result<()> {
    match self {
        OutputWriter::Tsv(w) => {
            writeln!(w, "{}\t{}\t{:.4}", read_id, bucket_name, score)?;
            Ok(())
        }
        OutputWriter::TsvGz(Some(w)) => {
            writeln!(w, "{}\t{}\t{:.4}", read_id, bucket_name, score)?;
            Ok(())
        }
        OutputWriter::TsvGz(None) => Err(anyhow!("Writer already finished")),
        OutputWriter::Parquet {
            writer,
            read_ids,
            bucket_names,
            scores,
            fast_paths,
            batch_size,
        } => {
            // Reject before touching any buffer so the columns stay aligned.
            if fast_paths.is_some() {
                return Err(anyhow!(
                    "write_record() cannot supply the fast_path column; use write_chunk()"
                ));
            }
            read_ids.push(read_id.to_string());
            bucket_names.push(bucket_name.to_string());
            scores.push(score);
            if read_ids.len() >= *batch_size {
                Self::flush_parquet_batch(writer, read_ids, bucket_names, scores, fast_paths)?;
            }
            Ok(())
        }
        OutputWriter::ParquetWide { .. } => Err(anyhow!(
            "write_record() is not supported for wide-format Parquet; use write_chunk()"
        )),
    }
}
/// Writes a chunk of pre-formatted, newline-separated TSV rows.
///
/// TSV sinks receive the bytes unchanged. Parquet writers decode the chunk
/// as UTF-8 (invalid sequences are replaced, not rejected), skip empty
/// lines, split each line on tabs and buffer the parsed values, flushing a
/// record batch whenever `batch_size` rows are held:
///
/// * long format expects `read_id, bucket_name, score`, plus `fast_path`
///   when the writer was created with that column;
/// * wide format expects `read_id` followed by one score per bucket column.
///
/// Every score on a line is parsed before anything from that line is
/// committed to the buffers, so a malformed line never leaves the column
/// buffers with different lengths. Rows from earlier lines of the same chunk
/// remain buffered (or already flushed).
///
/// # Errors
///
/// * I/O or Parquet errors from the underlying writer.
/// * "Writer already finished" for a gzip writer closed by `finish`.
/// * A line with the wrong number of columns.
/// * A score that does not parse as `f64`.
pub fn write_chunk(&mut self, data: Vec<u8>) -> Result<()> {
    match self {
        OutputWriter::Tsv(w) => {
            w.write_all(&data)?;
            Ok(())
        }
        OutputWriter::TsvGz(Some(w)) => {
            w.write_all(&data)?;
            Ok(())
        }
        OutputWriter::TsvGz(None) => Err(anyhow!("Writer already finished")),
        OutputWriter::Parquet {
            writer,
            read_ids,
            bucket_names,
            scores,
            fast_paths,
            batch_size,
        } => {
            let expected_cols = if fast_paths.is_some() { 4 } else { 3 };
            let text = String::from_utf8_lossy(&data);
            for line in text.lines().filter(|line| !line.is_empty()) {
                let parts: Vec<&str> = line.split('\t').collect();
                if parts.len() != expected_cols {
                    return Err(anyhow!(
                        "Long format line has {} columns, expected {}. \
                         Line starts with: '{}'",
                        parts.len(),
                        expected_cols,
                        parts.first().unwrap_or(&"<empty>")
                    ));
                }
                // Parse first: a bad score must not leave `read_ids` and
                // `bucket_names` one entry longer than `scores`.
                let score: f64 = parts[2].parse().with_context(|| {
                    format!("Invalid score value '{}' for read '{}'", parts[2], parts[0])
                })?;
                read_ids.push(parts[0].to_string());
                bucket_names.push(parts[1].to_string());
                scores.push(score);
                if let Some(fps) = fast_paths.as_mut() {
                    fps.push(parts[3].to_string());
                }
                if read_ids.len() >= *batch_size {
                    Self::flush_parquet_batch(
                        writer,
                        read_ids,
                        bucket_names,
                        scores,
                        fast_paths,
                    )?;
                }
            }
            Ok(())
        }
        OutputWriter::ParquetWide {
            writer,
            bucket_col_names,
            read_ids,
            scores,
            batch_size,
        } => {
            let text = String::from_utf8_lossy(&data);
            let num_buckets = bucket_col_names.len();
            let expected_cols = 1 + num_buckets;
            for line in text.lines().filter(|line| !line.is_empty()) {
                let parts: Vec<&str> = line.split('\t').collect();
                if parts.len() != expected_cols {
                    return Err(anyhow!(
                        "Wide format line has {} columns, expected {} (read_id + {} buckets). \
                         Line starts with: '{}'",
                        parts.len(),
                        expected_cols,
                        num_buckets,
                        parts.first().unwrap_or(&"<empty>")
                    ));
                }
                // Remember where this row starts so a parse failure part-way
                // through can be rolled back, keeping `scores` a whole
                // multiple of the bucket count.
                let row_start = scores.len();
                for (i, part) in parts.iter().skip(1).enumerate() {
                    let parsed = part.parse::<f64>().with_context(|| {
                        format!(
                            "Invalid score value '{}' at column {} for read '{}'",
                            part,
                            i + 1,
                            parts[0]
                        )
                    });
                    match parsed {
                        Ok(score) => scores.push(score),
                        Err(err) => {
                            scores.truncate(row_start);
                            return Err(err);
                        }
                    }
                }
                // Commit the read id only once its whole row has parsed.
                read_ids.push(parts[0].to_string());
                if read_ids.len() >= *batch_size {
                    Self::flush_parquet_wide_batch(writer, bucket_col_names, read_ids, scores)?;
                }
            }
            Ok(())
        }
    }
}
/// Writes the buffered long-format rows as one record batch and empties the
/// buffers.
///
/// Does nothing when `read_ids` is empty. The batch has the columns
/// `read_id`, `bucket_name`, `score` and, when `fast_paths` is `Some`,
/// `fast_path`.
///
/// # Errors
///
/// Fails when the record batch cannot be built or when the Parquet writer
/// rejects it; the buffers are left untouched in that case.
fn flush_parquet_batch(
    writer: &mut ArrowWriter<File>,
    read_ids: &mut Vec<String>,
    bucket_names: &mut Vec<String>,
    scores: &mut Vec<f64>,
    fast_paths: &mut Option<Vec<String>>,
) -> Result<()> {
    if read_ids.is_empty() {
        return Ok(());
    }

    // Copies buffered strings into an Arrow string array; `bytes_per_value`
    // is only a sizing hint for the builder's data buffer.
    let utf8_column = |values: &[String], bytes_per_value: usize| {
        let mut builder =
            StringBuilder::with_capacity(values.len(), values.len() * bytes_per_value);
        values.iter().for_each(|value| builder.append_value(value));
        Arc::new(builder.finish()) as ArrayRef
    };

    let mut score_builder = Float64Builder::with_capacity(scores.len());
    score_builder.append_slice(scores.as_slice());
    let score_column = Arc::new(score_builder.finish()) as ArrayRef;

    let mut columns: Vec<(&str, ArrayRef)> = vec![
        ("read_id", utf8_column(read_ids.as_slice(), 32)),
        ("bucket_name", utf8_column(bucket_names.as_slice(), 64)),
        ("score", score_column),
    ];
    if let Some(fps) = fast_paths.as_ref() {
        columns.push(("fast_path", utf8_column(fps.as_slice(), 16)));
    }

    let batch = RecordBatch::try_from_iter(columns).context("Failed to create RecordBatch")?;
    writer
        .write(&batch)
        .context("Failed to write Parquet batch")?;

    // Only reset the buffers once the batch has been handed to the writer.
    read_ids.clear();
    bucket_names.clear();
    scores.clear();
    if let Some(fps) = fast_paths.as_mut() {
        fps.clear();
    }
    Ok(())
}
fn flush_parquet_wide_batch(
writer: &mut ArrowWriter<File>,
bucket_col_names: &[String],
read_ids: &mut Vec<String>,
scores: &mut Vec<f64>,
) -> Result<()> {
if read_ids.is_empty() {
return Ok(());
}
let num_rows = read_ids.len();
let num_buckets = bucket_col_names.len();
let mut read_id_builder = StringBuilder::with_capacity(num_rows, num_rows * 32);
for read_id in read_ids.iter() {
read_id_builder.append_value(read_id);
}
let mut columns: Vec<(&str, ArrayRef)> =
vec![("read_id", Arc::new(read_id_builder.finish()) as ArrayRef)];
for (bucket_col, col_name) in bucket_col_names.iter().enumerate() {
let mut score_builder = Float64Builder::with_capacity(num_rows);
for row in 0..num_rows {
let idx = row * num_buckets + bucket_col;
score_builder.append_value(scores[idx]);
}
columns.push((
col_name.as_str(),
Arc::new(score_builder.finish()) as ArrayRef,
));
}
let batch =
RecordBatch::try_from_iter(columns).context("Failed to create wide RecordBatch")?;
writer
.write(&batch)
.context("Failed to write Parquet wide batch")?;
read_ids.clear();
scores.clear();
Ok(())
}
/// Flushes everything still buffered and finalizes the output.
///
/// * TSV: flushes the buffered writer.
/// * Gzip TSV: takes the writer out of the variant, flushes it and finishes
///   the gzip stream. A second call reports "Writer already finished".
/// * Parquet (long and wide): writes any remaining buffered rows as a final
///   batch, then finishes the Parquet writer.
///
/// # Errors
///
/// Returns the I/O, gzip or Parquet error that prevented finalization.
pub fn finish(&mut self) -> Result<()> {
    match self {
        OutputWriter::Tsv(plain) => Ok(plain.flush()?),
        OutputWriter::TsvGz(slot) => {
            let Some(buffered) = slot.take() else {
                return Err(anyhow!("Writer already finished"));
            };
            let gz = match buffered.into_inner() {
                Ok(gz) => gz,
                Err(e) => return Err(anyhow!("Failed to flush buffer: {}", e)),
            };
            gz.finish().context("Failed to finish gzip stream")?;
            Ok(())
        }
        OutputWriter::Parquet {
            writer,
            read_ids,
            bucket_names,
            scores,
            fast_paths,
            ..
        } => {
            Self::flush_parquet_batch(writer, read_ids, bucket_names, scores, fast_paths)?;
            writer
                .finish()
                .context("Failed to finish Parquet file")
                .map(drop)
        }
        OutputWriter::ParquetWide {
            writer,
            bucket_col_names,
            read_ids,
            scores,
            ..
        } => {
            Self::flush_parquet_wide_batch(writer, bucket_col_names, read_ids, scores)?;
            writer
                .finish()
                .context("Failed to finish Parquet file")
                .map(drop)
        }
    }
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{RecordBatchReader as _, StringArray};
    use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    use std::io::Read as _;
    use tempfile::NamedTempFile;

    /// Runs format detection on a bare file name.
    fn detected(name: &str) -> OutputFormat {
        OutputFormat::detect(Some(&PathBuf::from(name)))
    }

    /// Creates a temp file with `suffix`; the guard must outlive the path.
    fn temp_output(suffix: &str) -> (NamedTempFile, PathBuf) {
        let tmp = NamedTempFile::with_suffix(suffix).unwrap();
        let path = tmp.path().to_path_buf();
        (tmp, path)
    }

    /// Builds a bucket id -> name map from literal pairs.
    fn bucket_map(entries: &[(u32, &str)]) -> HashMap<u32, String> {
        entries
            .iter()
            .map(|&(id, name)| (id, name.to_string()))
            .collect()
    }

    /// Opens a wide-format Parquet writer with the default batch size.
    fn wide_parquet_writer(path: &PathBuf, buckets: &HashMap<u32, String>) -> OutputWriter {
        OutputWriter::new_wide(OutputFormat::Parquet, Some(path), buckets, None).unwrap()
    }

    /// Opens a record-batch reader over a finished Parquet file.
    fn open_parquet(path: &PathBuf) -> parquet::arrow::arrow_reader::ParquetRecordBatchReader {
        let file = File::open(path).unwrap();
        ParquetRecordBatchReaderBuilder::try_new(file)
            .unwrap()
            .build()
            .unwrap()
    }

    /// Reads a whole file into a string.
    fn slurp(path: &PathBuf) -> String {
        let mut content = String::new();
        std::fs::File::open(path)
            .unwrap()
            .read_to_string(&mut content)
            .unwrap();
        content
    }

    /// Asserts that the leading schema fields carry the given names, in order.
    fn assert_field_names(schema: &Schema, expected: &[&str]) {
        for (index, name) in expected.iter().enumerate() {
            assert_eq!(schema.field(index).name(), *name);
        }
    }

    /// Fetches a named column as a `Float64Array`.
    fn f64_column<'a>(batch: &'a RecordBatch, name: &str) -> &'a arrow::array::Float64Array {
        batch
            .column_by_name(name)
            .unwrap()
            .as_any()
            .downcast_ref::<arrow::array::Float64Array>()
            .unwrap()
    }

    #[test]
    fn test_output_format_detect_none() {
        assert_eq!(OutputFormat::detect(None), OutputFormat::Tsv);
    }

    #[test]
    fn test_output_format_detect_stdout() {
        assert_eq!(detected("-"), OutputFormat::Tsv);
    }

    #[test]
    fn test_output_format_detect_tsv() {
        assert_eq!(detected("output.tsv"), OutputFormat::Tsv);
    }

    #[test]
    fn test_output_format_detect_tsv_gz() {
        assert_eq!(detected("output.tsv.gz"), OutputFormat::TsvGz);
    }

    #[test]
    fn test_output_format_detect_gz() {
        assert_eq!(detected("output.gz"), OutputFormat::TsvGz);
    }

    #[test]
    fn test_output_format_detect_parquet() {
        assert_eq!(detected("output.parquet"), OutputFormat::Parquet);
    }

    #[test]
    fn test_output_format_detect_no_extension() {
        assert_eq!(detected("output"), OutputFormat::Tsv);
    }

    #[test]
    fn test_output_format_detect_unknown_extension() {
        assert_eq!(detected("output.csv"), OutputFormat::Tsv);
    }

    #[test]
    fn test_output_format_is_stdout() {
        assert!(OutputFormat::is_stdout(None));
        assert!(OutputFormat::is_stdout(Some(&PathBuf::from("-"))));
        assert!(!OutputFormat::is_stdout(Some(&PathBuf::from("file.tsv"))));
    }

    #[test]
    fn test_output_writer_tsv() {
        let tmp = NamedTempFile::new().unwrap();
        let path = tmp.path().to_path_buf();

        let mut writer = OutputWriter::new(OutputFormat::Tsv, Some(&path), None).unwrap();
        writer.write_header(b"col1\tcol2\tcol3\n").unwrap();
        writer.write_record("read1", "bucket1", 0.95).unwrap();
        writer.write_record("read2", "bucket2", 0.85).unwrap();
        writer.finish().unwrap();

        let content = slurp(&path);
        assert!(content.contains("col1\tcol2\tcol3"));
        assert!(content.contains("read1\tbucket1\t0.9500"));
        assert!(content.contains("read2\tbucket2\t0.8500"));
    }

    #[test]
    fn test_output_writer_gzip() {
        let (_tmp, path) = temp_output(".gz");

        let mut writer = OutputWriter::new(OutputFormat::TsvGz, Some(&path), None).unwrap();
        writer.write_header(b"col1\tcol2\tcol3\n").unwrap();
        writer.write_record("read1", "bucket1", 0.95).unwrap();
        writer.finish().unwrap();

        let mut decoder = flate2::read::GzDecoder::new(std::fs::File::open(&path).unwrap());
        let mut content = String::new();
        decoder.read_to_string(&mut content).unwrap();
        assert!(content.contains("col1\tcol2\tcol3"));
        assert!(content.contains("read1\tbucket1\t0.9500"));
    }

    #[test]
    fn test_output_writer_gzip_requires_path() {
        let result = OutputWriter::new(OutputFormat::TsvGz, None, None);
        assert!(result.is_err());
        let err_msg = result.err().unwrap().to_string();
        assert!(err_msg.contains("file path"), "Error was: {}", err_msg);
    }

    #[test]
    fn test_output_writer_parquet_requires_path() {
        let result = OutputWriter::new(OutputFormat::Parquet, None, None);
        assert!(result.is_err());
        let err_msg = result.err().unwrap().to_string();
        assert!(err_msg.contains("file path"), "Error was: {}", err_msg);
    }

    #[test]
    fn test_output_writer_new_wide_creates_parquet_with_dynamic_schema() {
        let (_tmp, path) = temp_output(".parquet");
        let bucket_names = bucket_map(&[(1, "Bucket_A"), (2, "Bucket_B"), (3, "Bucket_C")]);

        let mut writer = wide_parquet_writer(&path, &bucket_names);
        let wide_data = b"read_1\t0.8500\t0.7500\t0.6500\nread_2\t0.9000\t0.0000\t0.3000\n";
        writer.write_chunk(wide_data.to_vec()).unwrap();
        writer.finish().unwrap();

        let reader = open_parquet(&path);
        let schema = reader.schema();
        assert_eq!(schema.fields().len(), 4);
        assert_field_names(&schema, &["read_id", "Bucket_A", "Bucket_B", "Bucket_C"]);
    }

    #[test]
    fn test_output_writer_new_wide_parquet_schema_has_correct_types() {
        let (_tmp, path) = temp_output(".parquet");
        let bucket_names = bucket_map(&[(1, "Score1"), (2, "Score2")]);

        let mut writer = wide_parquet_writer(&path, &bucket_names);
        writer.finish().unwrap();

        let reader = open_parquet(&path);
        let schema = reader.schema();
        assert_eq!(*schema.field(0).data_type(), DataType::Utf8);
        assert_eq!(*schema.field(1).data_type(), DataType::Float64);
        assert_eq!(*schema.field(2).data_type(), DataType::Float64);
    }

    #[test]
    fn test_output_writer_new_wide_parquet_columns_ordered_by_bucket_id() {
        let (_tmp, path) = temp_output(".parquet");
        // Inserted out of order on purpose; columns must follow the ids.
        let bucket_names = bucket_map(&[(10, "Z_last"), (1, "A_first"), (5, "M_middle")]);

        let mut writer = wide_parquet_writer(&path, &bucket_names);
        writer.finish().unwrap();

        let reader = open_parquet(&path);
        let schema = reader.schema();
        assert_field_names(&schema, &["read_id", "A_first", "M_middle", "Z_last"]);
    }

    #[test]
    fn test_output_writer_new_wide_parquet_write_chunk_and_read_back() {
        let (_tmp, path) = temp_output(".parquet");
        let bucket_names = bucket_map(&[(1, "Bucket_A"), (2, "Bucket_B")]);

        let mut writer = wide_parquet_writer(&path, &bucket_names);
        let wide_data = b"read_1\t0.8500\t0.7500\nread_2\t0.9100\t0.0000\n";
        writer.write_chunk(wide_data.to_vec()).unwrap();
        writer.finish().unwrap();

        let mut reader = open_parquet(&path);
        let batch = reader.next().unwrap().unwrap();
        assert_eq!(batch.num_rows(), 2);

        let read_ids = batch
            .column_by_name("read_id")
            .unwrap()
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!(read_ids.value(0), "read_1");
        assert_eq!(read_ids.value(1), "read_2");

        let scores_a = f64_column(&batch, "Bucket_A");
        assert!((scores_a.value(0) - 0.85).abs() < 0.0001);
        assert!((scores_a.value(1) - 0.91).abs() < 0.0001);

        let scores_b = f64_column(&batch, "Bucket_B");
        assert!((scores_b.value(0) - 0.75).abs() < 0.0001);
        assert!((scores_b.value(1) - 0.0).abs() < 0.0001);
    }

    #[test]
    fn test_output_writer_new_wide_tsv_passthrough() {
        let (_tmp, path) = temp_output(".tsv");
        let bucket_names = bucket_map(&[(1, "Bucket_A"), (2, "Bucket_B")]);

        let mut writer =
            OutputWriter::new_wide(OutputFormat::Tsv, Some(&path), &bucket_names, None).unwrap();
        writer
            .write_header(b"read_id\tBucket_A\tBucket_B\n")
            .unwrap();
        writer
            .write_chunk(b"read_1\t0.8500\t0.7500\n".to_vec())
            .unwrap();
        writer.finish().unwrap();

        let content = slurp(&path);
        assert!(content.contains("read_id\tBucket_A\tBucket_B"));
        assert!(content.contains("read_1\t0.8500\t0.7500"));
    }

    #[test]
    fn test_output_writer_new_wide_parquet_requires_path() {
        let bucket_names = bucket_map(&[]);
        let result = OutputWriter::new_wide(OutputFormat::Parquet, None, &bucket_names, None);
        assert!(result.is_err());
    }
}