d4 0.3.9

The D4 file format implementation
Documentation
use std::{io::Result, iter::Once, marker::PhantomData};

use crate::{
    task::{Task, TaskContext, TaskOutputVec, TaskPartition},
    D4TrackReader,
};

use super::DataIndexType;

pub trait DataSummary: Sized + Send + Sync + Clone {
    const INDEX_NAME: &'static str;
    const INDEX_TYPE_CODE: DataIndexType;
    fn identity() -> Self;
    fn add_data(&self, pos: u32, val: i32) -> Self;
    fn combine(&self, other: &Self) -> Self;
    fn to_native_byte_order(&self) -> Self;
    fn to_format_byte_order(&self) -> Self;
    fn add_data_range(&self, begin: u32, end: u32, val: i32) -> Self {
        let mut ret = Self::identity();
        for pos in begin..end {
            ret = ret.add_data(pos, val);
        }
        ret
    }
    fn from_data_iter<I: Iterator<Item = (u32, i32)>>(iter: I) -> Self {
        iter.fold(Self::identity(), |sum, (pos, val)| sum.add_data(pos, val))
    }
    fn combine_iter<'a, I: Iterator<Item = &'a Self>>(iter: I) -> Self
    where
        Self: 'a,
    {
        iter.fold(Self::identity(), |sum, value| sum.combine(value))
    }
    fn run_summary_task(reader: &mut D4TrackReader, bin_size: u32) -> Result<TaskOutputVec<Self>> {
        let chrom_list = reader.header().chrom_list().to_owned();
        let task_array: Vec<_> = chrom_list
            .iter()
            .map(|seq| {
                let chrom = seq.name.as_str();
                (0..(seq.size as u32 + bin_size - 1) / bin_size).map(move |idx| {
                    (
                        chrom,
                        idx * bin_size,
                        ((idx + 1) * bin_size).min(seq.size as u32),
                    )
                })
            })
            .flatten()
            .map(|(chrom, begin, end)| DataSummaryTask::<Self> {
                chrom,
                begin,
                end,
                _phantom_data: Default::default(),
            })
            .collect();

        Ok(TaskContext::new(reader, task_array)?.run())
    }
}

#[derive(Clone, Copy, Debug)]
pub struct Sum(f64);

impl Sum {
    pub fn mean(&self, base_count: u32) -> f64 {
        self.0 / base_count as f64
    }
    pub fn sum(&self) -> f64 {
        self.0
    }
}

impl DataSummary for Sum {
    fn identity() -> Self {
        Sum(0.0)
    }

    fn add_data(&self, _: u32, val: i32) -> Self {
        Sum(self.0 + val as f64)
    }

    fn add_data_range(&self, begin: u32, end: u32, val: i32) -> Self {
        Sum(self.0 + (end - begin) as f64 * val as f64)
    }

    fn combine(&self, other: &Self) -> Self {
        Sum(self.0 + other.0)
    }

    fn to_native_byte_order(&self) -> Self {
        *self
    }

    fn to_format_byte_order(&self) -> Self {
        *self
    }

    const INDEX_NAME: &'static str = "sum_index";

    const INDEX_TYPE_CODE: DataIndexType = DataIndexType::Sum;
}

pub struct DataSummaryTask<'a, T: DataSummary> {
    chrom: &'a str,
    begin: u32,
    end: u32,
    _phantom_data: PhantomData<T>,
}

pub struct DataSummaryTaskPart<'a, T: DataSummary> {
    sum: T,
    _phantom_data: PhantomData<&'a ()>,
}

impl<'a, T: DataSummary> TaskPartition<Once<i32>> for DataSummaryTaskPart<'a, T> {
    type ParentType = DataSummaryTask<'a, T>;

    type ResultType = T;

    fn new(_left: u32, _right: u32, _parent: &Self::ParentType) -> Self {
        Self {
            sum: T::identity(),
            _phantom_data: PhantomData::default(),
        }
    }

    fn feed(&mut self, pos: u32, value: &mut Once<i32>) -> bool {
        self.sum = self.sum.add_data(pos, value.next().unwrap());
        true
    }

    fn feed_range(&mut self, left: u32, right: u32, value: &mut Once<i32>) -> bool {
        self.sum = self.sum.add_data_range(left, right, value.next().unwrap());
        true
    }

    fn result(&mut self) -> Self::ResultType {
        self.sum.clone()
    }
}

impl<'a, T: DataSummary> Task<Once<i32>> for DataSummaryTask<'a, T> {
    type Partition = DataSummaryTaskPart<'a, T>;

    type Output = T;

    fn region(&self) -> (&str, u32, u32) {
        (self.chrom, self.begin, self.end)
    }

    fn combine(&self, parts: &[T]) -> Self::Output {
        T::combine_iter(parts.iter())
    }
}