use std::collections::HashMap;
use std::fs::File;
use std::io::{self, BufWriter, Seek, Write};
use std::path::Path;
use std::vec;
use futures::sink::SinkExt;
use byteorder::{NativeEndian, WriteBytesExt};
use tokio::runtime::{Handle, Runtime};
use crate::bbiwrite::process_internal::BBIDataProcessorCreate;
use crate::utils::tell::Tell;
use crate::{
write_info, BBIDataProcessor, BBIDataProcessoredData, BBIDataProcessoringInputSectionChannel,
BBIDataSource, InternalProcessData, InternalTempZoomInfo, NoZoomsInternalProcessData,
NoZoomsInternalProcessedData, ProcessDataError, ZoomsInternalProcessData,
ZoomsInternalProcessedData,
};
use crate::bbi::{Summary, Value, ZoomRecord, BIGWIG_MAGIC};
use crate::bbiwrite::{
self, encode_zoom_section, write_blank_headers, write_zooms, BBIProcessError, BBIWriteOptions,
SectionData,
};
/// Per-zoom-level buffering state used while streaming values.
struct ZoomItem {
    /// Resolution of this zoom level: each record covers at most `size` bases.
    size: u32,
    /// The zoom record currently being accumulated, if one is in progress.
    live_info: Option<ZoomRecord>,
    /// Completed records waiting to be encoded into a zoom section.
    records: Vec<ZoomRecord>,
    /// Channel on which encode-task join handles are sent to the writer.
    channel: BBIDataProcessoringInputSectionChannel,
}
/// Writer for bigWig files over any seekable, writable sink.
///
/// Construct with [`BigWigWrite::new`] (or [`BigWigWrite::create_file`] for a
/// filesystem path), then call `write` or `write_multipass` to produce the file.
pub struct BigWigWrite<W: Write + Seek + Send + 'static> {
    /// Destination sink; consumed when writing begins.
    out: W,
    /// Chromosome name -> chromosome length in bases.
    chrom_sizes: HashMap<String, u32>,
    /// Write options (compression, section sizing, ...).
    pub options: BBIWriteOptions,
}
impl BigWigWrite<File> {
    /// Creates the file at `path` and wraps it in a `BigWigWrite` with
    /// default options.
    ///
    /// `chrom_sizes` maps each chromosome name to its length in bases.
    ///
    /// # Errors
    /// Returns any `io::Error` produced while creating the file.
    pub fn create_file(
        path: impl AsRef<Path>,
        chrom_sizes: HashMap<String, u32>,
    ) -> io::Result<Self> {
        File::create(path).map(|out| BigWigWrite::new(out, chrom_sizes))
    }
}
impl<W: Write + Seek + Send + 'static> BigWigWrite<W> {
    /// Creates a bigWig writer over `out` using `BBIWriteOptions::default()`.
    /// `chrom_sizes` maps chromosome names to their lengths in bases.
    pub fn new(out: W, chrom_sizes: HashMap<String, u32>) -> Self {
        BigWigWrite {
            out,
            chrom_sizes,
            options: BBIWriteOptions::default(),
        }
    }
    /// Writes the pre-data region of the file: blank headers, a zeroed
    /// 40-byte total-summary slot, and a zeroed u64 placeholder.
    ///
    /// Returns `(total_summary_offset, full_data_offset, pre_data)`: the file
    /// offsets of the summary slot, of the placeholder u64, and of the first
    /// byte after it (where data sections begin).
    fn write_pre(file: &mut BufWriter<W>) -> Result<(u64, u64, u64), ProcessDataError> {
        write_blank_headers(file)?;
        let total_summary_offset = file.tell()?;
        // 40 zero bytes reserved for the total summary, rewritten at the end.
        file.write_all(&[0; 40])?;
        let full_data_offset = file.tell()?;
        // Zeroed u64 placeholder directly before the data sections.
        // NOTE(review): presumably rewritten later (e.g. with the section
        // count) — confirm against `write_info`/`write_mid`.
        file.write_u64::<NativeEndian>(0)?;
        let pre_data = file.tell()?;
        Ok((total_summary_offset, full_data_offset, pre_data))
    }
    /// Writes a complete bigWig in a single pass over `vals`.
    ///
    /// Full-resolution sections and zoom sections are computed together by
    /// `bbiwrite::write_vals` (encoding work is spawned on `runtime`), then
    /// the indices, zoom levels, and final header are written.
    ///
    /// # Errors
    /// Propagates any I/O or processing error as `BBIProcessError`.
    pub fn write<V: BBIDataSource<Value = Value>>(
        self,
        vals: V,
        runtime: Runtime,
    ) -> Result<(), BBIProcessError<V::Error>> {
        let mut file = BufWriter::new(self.out);
        let (total_summary_offset, full_data_offset, pre_data) = BigWigWrite::write_pre(&mut file)?;
        // Stream every value through the full processor (data + zooms at once).
        let output = bbiwrite::write_vals::<_, _, BigWigFullProcess>(
            vals,
            file,
            &self.options,
            runtime,
            &self.chrom_sizes,
        )?;
        let (
            chrom_ids,
            summary,
            mut file,
            raw_sections_iter,
            zoom_infos,
            max_uncompressed_buf_size,
        ) = output;
        let chrom_ids = chrom_ids.get_map();
        // Write the chromosome index and the data index; their offsets go
        // into the final header below.
        let (data_size, chrom_index_start, index_start, total_sections) = bbiwrite::write_mid(
            &mut file,
            pre_data,
            raw_sections_iter,
            self.chrom_sizes,
            &chrom_ids,
            &self.options,
        )?;
        let zoom_entries = write_zooms(&mut file, zoom_infos, data_size, &self.options)?;
        let num_zooms = zoom_entries.len() as u16;
        // Rewrite headers/summary now that all offsets are known.
        // NOTE(review): the three literal zeros are header fields unused for
        // bigWig — confirm their meaning against `write_info`.
        write_info(
            &mut file,
            BIGWIG_MAGIC,
            num_zooms,
            chrom_index_start,
            full_data_offset,
            index_start,
            0,
            0,
            0,
            total_summary_offset,
            max_uncompressed_buf_size,
            zoom_entries,
            summary,
            total_sections,
        )?;
        Ok(())
    }
    /// Writes a bigWig in multiple passes, re-reading the source once per pass.
    ///
    /// Pass 1 (`write_vals_no_zoom`) writes the full-resolution data and
    /// tallies how many records each candidate zoom resolution would produce;
    /// pass 2 (`write_zoom_vals`) re-reads the values (via `make_vals()`) and
    /// writes the zoom levels. This avoids buffering zoom data during the
    /// first pass but requires the source to be re-creatable.
    ///
    /// # Errors
    /// Propagates any I/O or processing error as `BBIProcessError`.
    pub fn write_multipass<V: BBIDataSource<Value = Value>>(
        self,
        make_vals: impl Fn() -> Result<V, BBIProcessError<V::Error>>,
        runtime: Runtime,
    ) -> Result<(), BBIProcessError<V::Error>> {
        let mut file = BufWriter::new(self.out);
        let (total_summary_offset, full_data_offset, pre_data) = BigWigWrite::write_pre(&mut file)?;
        let vals = make_vals()?;
        // Pass 1: full-resolution sections plus per-resolution zoom counts.
        let output = bbiwrite::write_vals_no_zoom::<_, _, BigWigNoZoomsProcess>(
            vals,
            file,
            &self.options,
            &runtime,
            &self.chrom_sizes,
        );
        let (chrom_ids, summary, zoom_counts, mut file, raw_sections_iter, mut uncompress_buf_size) =
            output?;
        let chrom_ids = chrom_ids.get_map();
        let (data_size, chrom_index_start, index_start, total_sections) = bbiwrite::write_mid(
            &mut file,
            pre_data,
            raw_sections_iter,
            self.chrom_sizes,
            &chrom_ids,
            &self.options,
        )?;
        let vals = make_vals()?;
        // Pass 2: zoom sections. The average bases-per-item computed from the
        // pass-1 summary feeds the zoom-level selection.
        let output = bbiwrite::write_zoom_vals::<_, _, BigWigZoomsProcess<W>>(
            vals,
            self.options,
            &runtime,
            &chrom_ids,
            (summary.bases_covered as f64 / summary.total_items as f64) as u32,
            zoom_counts,
            file,
            data_size,
        );
        let (mut file, zoom_entries, zoom_uncompress_buf_size) = output?;
        // The header records the largest uncompressed section of either pass.
        uncompress_buf_size = uncompress_buf_size.max(zoom_uncompress_buf_size);
        let num_zooms = zoom_entries.len() as u16;
        write_info(
            &mut file,
            BIGWIG_MAGIC,
            num_zooms,
            chrom_index_start,
            full_data_offset,
            index_start,
            0,
            0,
            0,
            total_summary_offset,
            uncompress_buf_size,
            zoom_entries,
            summary,
            total_sections,
        )?;
        Ok(())
    }
}
/// Validates `current_val` against the chromosome bounds and its successor,
/// folds it into the running `summary`, and buffers it in `items`; when the
/// buffer reaches `options.items_per_slot` entries (or the input is
/// exhausted), the buffered values are spawned on `runtime` for encoding and
/// the join handle is sent down `ftx`.
///
/// Returns `BigWigInvalidInput` when the interval is malformed, extends past
/// the chromosome, or overlaps the next value.
async fn process_val(
    current_val: Value,
    next_val: Option<&Value>,
    chrom_length: u32,
    chrom: &String,
    summary: &mut Summary,
    items: &mut Vec<Value>,
    options: &BBIWriteOptions,
    runtime: &Handle,
    ftx: &mut BBIDataProcessoringInputSectionChannel,
    chrom_id: u32,
) -> Result<(), BigWigInvalidInput> {
    // Reject malformed intervals: start must not exceed end.
    if current_val.start > current_val.end {
        return Err(BigWigInvalidInput(format!(
            "Invalid bed graph: {} > {}",
            current_val.start, current_val.end
        )));
    }
    // Reject intervals running past the end of the chromosome.
    if current_val.end > chrom_length {
        return Err(BigWigInvalidInput(format!(
            "Invalid bed graph: `{}` is greater than the chromosome ({}) length ({})",
            current_val.end, chrom, chrom_length
        )));
    }
    // Values on a chromosome must be sorted and non-overlapping.
    if let Some(peeked) = next_val {
        if current_val.end > peeked.start {
            return Err(BigWigInvalidInput(format!(
                "Invalid bed graph: overlapping values on chromosome {} at {}-{} and {}-{}",
                chrom, current_val.start, current_val.end, peeked.start, peeked.end,
            )));
        }
    }
    // Fold the interval into the running summary statistics.
    let span = current_val.end - current_val.start;
    let value = f64::from(current_val.value);
    summary.total_items += 1;
    summary.bases_covered += u64::from(span);
    summary.min_val = summary.min_val.min(value);
    summary.max_val = summary.max_val.max(value);
    summary.sum += f64::from(span) * value;
    summary.sum_squares += f64::from(span) * value * value;
    items.push(current_val);
    // Flush a full section, or the final partial section at end of input.
    let slot_full = items.len() >= options.items_per_slot as usize;
    if slot_full || next_val.is_none() {
        let section_items =
            std::mem::replace(items, Vec::with_capacity(options.items_per_slot as usize));
        let handle: tokio::task::JoinHandle<io::Result<(SectionData, usize)>> =
            runtime.spawn(encode_section(options.compress, section_items, chrom_id));
        ftx.send(handle).await.expect("Couldn't send");
    }
    Ok(())
}
/// Folds `current_val` into every zoom level's running aggregation.
///
/// For each level, the value's span is chopped into chunks bounded by the
/// level's resolution (`ZoomItem::size`); each chunk updates the level's
/// in-progress `ZoomRecord`, which moves to `records` once its span is
/// filled. Full slots of records (and, at end of input, the final partial
/// slot) are spawned on `runtime` for encoding and the join handle is sent
/// down the level's channel.
///
/// Fix: corrected the typo in the panic message ("Couln't send" ->
/// "Couldn't send"), matching the message used by `process_val`.
async fn process_val_zoom(
    zoom_items: &mut Vec<ZoomItem>,
    options: &BBIWriteOptions,
    current_val: Value,
    next_val: Option<&Value>,
    runtime: &Handle,
    chrom_id: u32,
) {
    for zoom_item in zoom_items.iter_mut() {
        debug_assert_ne!(zoom_item.records.len(), options.items_per_slot as usize);
        let mut add_start = current_val.start;
        loop {
            // Flush buffered records when a slot is full, or when this is the
            // last value and everything for this level has been finalized.
            if (add_start >= current_val.end
                && zoom_item.live_info.is_none()
                && next_val.is_none()
                && !zoom_item.records.is_empty())
                || zoom_item.records.len() == options.items_per_slot as usize
            {
                let items = std::mem::take(&mut zoom_item.records);
                let handle = runtime.spawn(encode_zoom_section(options.compress, items));
                zoom_item.channel.send(handle).await.expect("Couldn't send");
            }
            if add_start >= current_val.end {
                // At end of input, finish any in-progress record so the flush
                // branch above can emit it on the next loop iteration.
                if next_val.is_none() {
                    if let Some(zoom2) = zoom_item.live_info.take() {
                        zoom_item.records.push(zoom2);
                        continue;
                    }
                }
                break;
            }
            let val = f64::from(current_val.value);
            // Start a new record at `add_start` if none is in progress; its
            // min/max are seeded from the current value.
            let zoom2 = zoom_item.live_info.get_or_insert(ZoomRecord {
                chrom: chrom_id,
                start: add_start,
                end: add_start,
                summary: Summary {
                    total_items: 0,
                    bases_covered: 0,
                    min_val: val,
                    max_val: val,
                    sum: 0.0,
                    sum_squares: 0.0,
                },
            });
            // A record covers at most `size` bases from its start.
            let next_end = zoom2.start + zoom_item.size;
            let add_end = std::cmp::min(next_end, current_val.end);
            if add_end >= add_start {
                let added_bases = add_end - add_start;
                zoom2.end = add_end;
                zoom2.summary.total_items += 1;
                zoom2.summary.bases_covered += u64::from(added_bases);
                zoom2.summary.min_val = zoom2.summary.min_val.min(val);
                zoom2.summary.max_val = zoom2.summary.max_val.max(val);
                zoom2.summary.sum += f64::from(added_bases) * val;
                zoom2.summary.sum_squares += f64::from(added_bases) * val * val;
            }
            // The record filled its full span: finalize it.
            if add_end == next_end {
                zoom_item.records.push(zoom_item.live_info.take().unwrap());
            }
            add_start = add_end;
        }
        debug_assert_ne!(zoom_item.records.len(), options.items_per_slot as usize);
    }
}
/// Error describing invalid input values (malformed, out-of-bounds, or
/// overlapping intervals); carries the human-readable message.
struct BigWigInvalidInput(String);

impl From<BigWigInvalidInput> for ProcessDataError {
    /// Wraps the message in the pipeline's error type.
    fn from(value: BigWigInvalidInput) -> Self {
        let BigWigInvalidInput(message) = value;
        ProcessDataError::InvalidInput(message)
    }
}
/// Processor for the single-pass `BigWigWrite::write`: handles both the
/// full-resolution data sections and every zoom level at once.
pub(crate) struct BigWigFullProcess {
    /// Running summary statistics for the chromosome being processed.
    summary: Summary,
    /// Buffered full-resolution values awaiting encoding into a section.
    items: Vec<Value>,
    /// Per-zoom-level aggregation state.
    zoom_items: Vec<ZoomItem>,
    /// Channel for handing encoded-section join handles to the writer.
    ftx: BBIDataProcessoringInputSectionChannel,
    /// Numeric id of the chromosome being processed.
    chrom_id: u32,
    /// Write options (compression, items per slot, ...).
    options: BBIWriteOptions,
    /// Tokio handle used to spawn encoding tasks.
    runtime: Handle,
    /// Chromosome name (used in error messages).
    chrom: String,
    /// Chromosome length in bases; values must not extend past it.
    length: u32,
}
impl BBIDataProcessorCreate for BigWigFullProcess {
    type I = InternalProcessData;
    type Out = BBIDataProcessoredData;
    /// Builds the processor from the internal state handed over by the write
    /// pipeline: one channel per zoom level plus the data-section channel.
    fn create(internal_data: InternalProcessData) -> Self {
        let InternalProcessData(zooms_channels, ftx, chrom_id, options, runtime, chrom, length) =
            internal_data;
        let slot = options.items_per_slot as usize;
        // One ZoomItem per configured zoom level, each with its own channel.
        let zoom_items: Vec<ZoomItem> = zooms_channels
            .into_iter()
            .map(|(size, channel)| ZoomItem {
                size,
                live_info: None,
                records: Vec::with_capacity(slot),
                channel,
            })
            .collect();
        BigWigFullProcess {
            // MAX/MIN sentinels so the first real value sets min/max.
            summary: Summary {
                total_items: 0,
                bases_covered: 0,
                min_val: f64::MAX,
                max_val: f64::MIN,
                sum: 0.0,
                sum_squares: 0.0,
            },
            items: Vec::with_capacity(slot),
            zoom_items,
            ftx,
            chrom_id,
            options,
            runtime,
            chrom,
            length,
        }
    }
    /// Finalizes the processor, returning the summary for this chromosome.
    fn destroy(self) -> BBIDataProcessoredData {
        let Self {
            mut summary,
            items,
            zoom_items,
            ..
        } = self;
        // Everything must have been flushed before teardown.
        debug_assert!(items.is_empty());
        for zoom in &zoom_items {
            debug_assert!(zoom.live_info.is_none());
            debug_assert!(zoom.records.is_empty());
        }
        // With no values the MAX/MIN sentinels are meaningless; report zero.
        if summary.total_items == 0 {
            summary.min_val = 0.0;
            summary.max_val = 0.0;
        }
        BBIDataProcessoredData(summary)
    }
}
impl BBIDataProcessor for BigWigFullProcess {
    type Value = Value;
    /// Feeds one value through both the full-resolution path and every zoom
    /// level's aggregation.
    async fn do_process(
        &mut self,
        current_val: Value,
        next_val: Option<&Value>,
    ) -> Result<(), ProcessDataError> {
        // Full-resolution data: validate, summarize, and buffer the value.
        process_val(
            current_val,
            next_val,
            self.length,
            &self.chrom,
            &mut self.summary,
            &mut self.items,
            &self.options,
            &self.runtime,
            &mut self.ftx,
            self.chrom_id,
        )
        .await?;
        // Fold the same value into every zoom level's running records.
        process_val_zoom(
            &mut self.zoom_items,
            &self.options,
            current_val,
            next_val,
            &self.runtime,
            self.chrom_id,
        )
        .await;
        Ok(())
    }
}
/// Running tally of how many zoom records a given resolution would produce.
#[derive(Debug, Copy, Clone)]
struct ZoomCounts {
    /// Candidate zoom resolution in bases.
    resolution: u64,
    /// End coordinate of the record currently being counted at this resolution.
    current_end: u64,
    /// Number of records counted so far at this resolution.
    counts: u64,
}
/// Processor for the first pass of `BigWigWrite::write_multipass`: writes
/// full-resolution sections and counts would-be zoom records per candidate
/// resolution, without producing any zoom data.
struct BigWigNoZoomsProcess {
    /// Channel for handing encoded-section join handles to the writer.
    ftx: BBIDataProcessoringInputSectionChannel,
    /// Numeric id of the chromosome being processed.
    chrom_id: u32,
    /// Write options (compression, items per slot, ...).
    options: BBIWriteOptions,
    /// Tokio handle used to spawn encoding tasks.
    runtime: Handle,
    /// Chromosome name (used in error messages).
    chrom: String,
    /// Chromosome length in bases; values must not extend past it.
    length: u32,
    /// Running summary statistics over all processed values.
    summary: Summary,
    /// Buffered values awaiting encoding into a section.
    items: Vec<Value>,
    /// Per-resolution tallies of zoom records this data would produce.
    zoom_counts: Vec<ZoomCounts>,
}
impl BBIDataProcessorCreate for BigWigNoZoomsProcess {
    type I = NoZoomsInternalProcessData;
    type Out = NoZoomsInternalProcessedData;
    /// Builds the first-pass processor and seeds the zoom-resolution tallies.
    fn create(internal_data: Self::I) -> Self {
        let NoZoomsInternalProcessData(ftx, chrom_id, options, runtime, chrom, length) =
            internal_data;
        let slot = options.items_per_slot as usize;
        // Candidate resolutions grow geometrically (10, 40, 160, ...), capped
        // so neither the arithmetic overflows nor the chromosome is exceeded.
        let zoom_counts: Vec<ZoomCounts> = std::iter::successors(Some(10), |r| Some(r * 4))
            .take_while(|r| *r <= u64::MAX / 4 && *r <= length as u64 * 4)
            .map(|resolution| ZoomCounts {
                resolution,
                current_end: 0,
                counts: 0,
            })
            .collect();
        BigWigNoZoomsProcess {
            ftx,
            chrom_id,
            options,
            runtime,
            chrom,
            length,
            // MAX/MIN sentinels so the first real value sets min/max.
            summary: Summary {
                total_items: 0,
                bases_covered: 0,
                min_val: f64::MAX,
                max_val: f64::MIN,
                sum: 0.0,
                sum_squares: 0.0,
            },
            items: Vec::with_capacity(slot),
            zoom_counts,
        }
    }
    /// Finalizes the pass, yielding the summary and `(resolution, count)` pairs.
    fn destroy(self) -> Self::Out {
        let BigWigNoZoomsProcess {
            items,
            mut summary,
            zoom_counts,
            ..
        } = self;
        // All buffered values must have been flushed already.
        debug_assert!(items.is_empty());
        // Empty input: zero out the MAX/MIN sentinels.
        if summary.total_items == 0 {
            summary.min_val = 0.0;
            summary.max_val = 0.0;
        }
        let counts = zoom_counts
            .into_iter()
            .map(|z| (z.resolution, z.counts))
            .collect();
        NoZoomsInternalProcessedData(summary, counts)
    }
}
impl BBIDataProcessor for BigWigNoZoomsProcess {
    type Value = Value;
    /// Processes one value: full-resolution handling plus zoom-count updates.
    async fn do_process(
        &mut self,
        current_val: Self::Value,
        next_val: Option<&Self::Value>,
    ) -> Result<(), ProcessDataError> {
        // Validate, summarize, and buffer the value for the data sections.
        process_val(
            current_val,
            next_val,
            self.length,
            &self.chrom,
            &mut self.summary,
            &mut self.items,
            &self.options,
            &self.runtime,
            &mut self.ftx,
            self.chrom_id,
        )
        .await?;
        // For each candidate resolution, count the records this value spans.
        for zoom in self.zoom_counts.iter_mut() {
            if u64::from(current_val.start) >= zoom.current_end {
                zoom.counts += 1;
                zoom.current_end = u64::from(current_val.start) + zoom.resolution;
            }
            while u64::from(current_val.end) > zoom.current_end {
                zoom.counts += 1;
                zoom.current_end += zoom.resolution;
            }
        }
        Ok(())
    }
}
/// Processor for the second pass of `BigWigWrite::write_multipass`: produces
/// only zoom-level data (the full-resolution pass has already run).
struct BigWigZoomsProcess<W: Write + Seek + Send + 'static> {
    /// Per-zoom-level temporary output state; presumably temp-file handles
    /// returned to the pipeline on `destroy` — TODO confirm against
    /// `write_zoom_vals`.
    temp_zoom_items: Vec<InternalTempZoomInfo<W>>,
    /// Numeric id of the chromosome being processed.
    chrom_id: u32,
    /// Write options (compression, items per slot, ...).
    options: BBIWriteOptions,
    /// Tokio handle used to spawn encoding tasks.
    runtime: Handle,
    /// Per-zoom-level aggregation state.
    zoom_items: Vec<ZoomItem>,
}
impl<W: Write + Seek + Send + 'static> BBIDataProcessorCreate for BigWigZoomsProcess<W> {
    type I = ZoomsInternalProcessData<W>;
    type Out = ZoomsInternalProcessedData<W>;
    /// Builds the zoom-only processor from the pipeline's internal state:
    /// one `ZoomItem` per zoom level, each with its own channel.
    fn create(internal_data: Self::I) -> Self {
        let ZoomsInternalProcessData(temp_zoom_items, zooms_channels, chrom_id, options, runtime) =
            internal_data;
        let zoom_items: Vec<ZoomItem> = zooms_channels
            .into_iter()
            .map(|(size, channel)| ZoomItem {
                size,
                live_info: None,
                records: Vec::with_capacity(options.items_per_slot as usize),
                channel,
            })
            .collect();
        BigWigZoomsProcess {
            temp_zoom_items,
            chrom_id,
            options,
            runtime,
            zoom_items,
        }
    }
    /// Tears the processor down, returning the per-level temp zoom state.
    ///
    /// Bug fix: the original destructured `self` (consuming it) and then read
    /// `self.temp_zoom_items` — a use-after-move that fails to compile
    /// (E0382). Both fields are now bound in the same destructuring pattern.
    fn destroy(self) -> Self::Out {
        let BigWigZoomsProcess {
            temp_zoom_items,
            zoom_items,
            ..
        } = self;
        // All per-level state must have been flushed before teardown.
        for zoom_item in zoom_items.iter() {
            debug_assert!(zoom_item.live_info.is_none());
            debug_assert!(zoom_item.records.is_empty());
        }
        ZoomsInternalProcessedData(temp_zoom_items)
    }
}
impl<W: Write + Seek + Send + 'static> BBIDataProcessor for BigWigZoomsProcess<W> {
    type Value = Value;
    /// Processes one value by folding it into every zoom level's aggregation.
    async fn do_process(
        &mut self,
        current_val: Self::Value,
        next_val: Option<&Self::Value>,
    ) -> Result<(), ProcessDataError> {
        process_val_zoom(
            &mut self.zoom_items,
            &self.options,
            current_val,
            next_val,
            &self.runtime,
            self.chrom_id,
        )
        .await;
        Ok(())
    }
}
/// Serializes one full-resolution bigWig data section from
/// `items_in_section`, optionally zlib-compressing the payload.
///
/// Returns the `SectionData` (chrom/start/end plus the chosen byte payload)
/// together with the uncompressed byte length (0 when compression is off),
/// which callers use to track the maximum decompression buffer size.
///
/// # Panics
/// Panics if `items_in_section` is empty (element 0 is indexed); callers only
/// flush non-empty buffers.
async fn encode_section(
    compress: bool,
    items_in_section: Vec<Value>,
    chrom_id: u32,
) -> io::Result<(SectionData, usize)> {
    use libdeflater::{CompressionLvl, Compressor};
    // Header is 24 bytes; each item writes 12 bytes (start, end, value) —
    // this over-reserves slightly rather than risking a reallocation.
    let mut bytes = Vec::with_capacity(24 + (items_in_section.len() * 24));
    let start = items_in_section[0].start;
    let end = items_in_section[items_in_section.len() - 1].end;
    // Section header: chrom id, start, end, item step, item span, section
    // type, reserved byte, item count. NOTE(review): type byte 1 presumably
    // marks a bedGraph-style section — confirm against the bigWig spec.
    bytes.write_u32::<NativeEndian>(chrom_id)?;
    bytes.write_u32::<NativeEndian>(start)?;
    bytes.write_u32::<NativeEndian>(end)?;
    bytes.write_u32::<NativeEndian>(0)?;
    bytes.write_u32::<NativeEndian>(0)?;
    bytes.write_u8(1)?;
    bytes.write_u8(0)?;
    // `as u16` truncates silently; assumes at most u16::MAX items per
    // section (i.e. items_per_slot <= 65535) — TODO confirm.
    bytes.write_u16::<NativeEndian>(items_in_section.len() as u16)?;
    // Each item: start (u32), end (u32), value (f32), native-endian.
    for item in items_in_section.iter() {
        bytes.write_u32::<NativeEndian>(item.start)?;
        bytes.write_u32::<NativeEndian>(item.end)?;
        bytes.write_f32::<NativeEndian>(item.value)?;
    }
    let (out_bytes, uncompress_buf_size) = if compress {
        let mut compressor = Compressor::new(CompressionLvl::default());
        // Worst-case compressed size for this input length.
        let max_sz = compressor.zlib_compress_bound(bytes.len());
        let mut compressed_data = vec![0; max_sz];
        let actual_sz = compressor
            .zlib_compress(&bytes, &mut compressed_data)
            .unwrap();
        // Trim the buffer down to the bytes actually produced.
        compressed_data.resize(actual_sz, 0);
        (compressed_data, bytes.len())
    } else {
        // Uncompressed: report 0 so no decompression buffer is needed.
        (bytes, 0)
    };
    Ok((
        SectionData {
            chrom: chrom_id,
            start,
            end,
            data: out_bytes,
        },
        uncompress_buf_size,
    ))
}