use std::io::Write;
use std::path::PathBuf;
use anyhow::{
Context,
Result,
};
use itertools::Itertools;
use polars::prelude::*;
use crate::data_structs::batch::{
create_caregorical_dtype,
BsxBatch,
BsxColumns,
};
/// Writer that streams `BsxBatch` data into a sink through a batched
/// Polars IPC writer.
///
/// Instances are built with one of the `try_*` constructors, which decide
/// the schema handed to the underlying writer.
pub struct BsxFileWriter<W: Write> {
    /// Batched IPC writer that owns the output sink.
    writer: polars::io::ipc::BatchedWriter<W>,
    /// Schema the batched writer was created with.
    schema: Schema,
}
impl<W> BsxFileWriter<W>
where
    W: Write,
{
    /// Creates a writer over `sink` whose chromosome column uses a
    /// categorical dtype built from `chr_names`.
    ///
    /// The base schema comes from `BsxColumns::schema()`; the dtype of the
    /// chromosome column is replaced by the value returned from
    /// `create_caregorical_dtype`, and the batched IPC writer is created
    /// with the resulting schema. Batches are written with
    /// `maintain_order: true` and the requested `compression`.
    ///
    /// # Errors
    ///
    /// Returns an error if the base schema has no chromosome column or if
    /// the batched writer cannot be created.
    pub fn try_new<R: AsRef<str>>(
        sink: W,
        chr_names: &[R],
        compression: Option<IpcCompression>,
    ) -> Result<BsxFileWriter<W>> {
        let opts = IpcWriterOptions {
            compression,
            maintain_order: true,
        };

        let chr_dtype =
            create_caregorical_dtype(chr_names.iter().map(Some).collect_vec());

        let mut schema = BsxColumns::schema();
        // `set_dtype` yields `None` when the column is absent (per the polars
        // `Schema::set_dtype` docs); surface that here instead of letting
        // `get_chr_dtype` panic later on a schema without the column.
        schema
            .set_dtype(BsxColumns::Chr.as_str(), chr_dtype)
            .with_context(|| "BSX schema has no chromosome column to retype")?;

        let batched_writer = opts
            .to_writer(sink)
            .batched(&schema)
            .with_context(|| "Failed to create batched writer")?;

        Ok(Self {
            writer: batched_writer,
            schema,
        })
    }

    /// Creates a writer whose chromosome names are the sequence names listed
    /// in the FASTA index file at `fai_path`.
    ///
    /// # Errors
    ///
    /// Returns an error if the index cannot be read or if [`Self::try_new`]
    /// fails.
    pub fn try_from_sink_and_fai(
        sink: W,
        fai_path: PathBuf,
        compression: Option<IpcCompression>,
    ) -> Result<Self> {
        let index = bio::io::fasta::Index::from_file(&fai_path).with_context(|| {
            format!("Failed to read FASTA index from {:?}", fai_path)
        })?;

        let chr_names = index
            .sequences()
            .into_iter()
            .map(|seq| seq.name)
            .collect_vec();

        Self::try_new(sink, &chr_names, compression).with_context(|| {
            format!("Failed to create writer from FASTA index at {:?}", fai_path)
        })
    }

    /// Creates a writer from a FASTA file by indexing it and then loading
    /// the index expected at `<fasta_path>.fai`.
    ///
    /// # Errors
    ///
    /// Returns an error if indexing fails or if
    /// [`Self::try_from_sink_and_fai`] fails on the derived index path.
    pub fn try_from_sink_and_fasta(
        sink: W,
        fasta_path: PathBuf,
        compression: Option<IpcCompression>,
    ) -> Result<Self> {
        // NOTE(review): the value returned by `index` is discarded and the
        // `.fai` file is read back from disk below. Confirm that this call
        // actually writes `<fasta_path>.fai`; otherwise this only works when
        // the index file already exists.
        noodles_fasta::fs::index(&fasta_path)
            .with_context(|| format!("Failed to index FASTA file {:?}", fasta_path))?;

        // Append the suffix on the OS string so non-UTF-8 paths do not panic.
        let mut index_path = fasta_path.as_os_str().to_os_string();
        index_path.push(".fai");

        Self::try_from_sink_and_fai(sink, PathBuf::from(index_path), compression)
            .with_context(|| {
                format!("Failed to create writer from FASTA file {:?}", fasta_path)
            })
    }

    /// Writes the data of `batch` through the underlying batched writer.
    ///
    /// # Errors
    ///
    /// Propagates any error reported by the batched writer.
    pub fn write_batch(
        &mut self,
        batch: BsxBatch,
    ) -> PolarsResult<()> {
        self.writer.write_batch(batch.data())
    }

    /// Finishes the underlying batched writer.
    ///
    /// The `Drop` implementation also calls this and discards the result, so
    /// call it explicitly when a failure to finish must be observed.
    ///
    /// NOTE(review): after an explicit `close`, dropping the writer calls
    /// `finish` a second time; presumably the writer rejects that with an
    /// error that `Drop` ignores — confirm.
    pub fn close(&mut self) -> PolarsResult<()> {
        self.writer.finish()
    }

    /// Returns the dtype stored in the schema for the chromosome column.
    ///
    /// # Panics
    ///
    /// Panics if the schema has no chromosome column; `try_new` checks that
    /// the column exists before building the writer.
    pub fn get_chr_dtype(&self) -> &DataType {
        // Use the same key `try_new` used when installing the dtype instead
        // of a separately hard-coded column name.
        self.schema
            .get(BsxColumns::Chr.as_str())
            .expect("chromosome column is verified to exist in `try_new`")
    }
}
impl<W> Drop for BsxFileWriter<W>
where
    W: Write,
{
    /// Finishes the writer on a best-effort basis when it goes out of scope.
    fn drop(&mut self) {
        // `drop` cannot report a failure, so the outcome of `close` is
        // deliberately discarded.
        self.close().ok();
    }
}