d4/index/data_index/
data.rs

1use std::{io::Result, iter::Once, marker::PhantomData};
2
3use crate::{
4    task::{Task, TaskContext, TaskOutputVec, TaskPartition},
5    D4TrackReader,
6};
7
8use super::DataIndexType;
9
10pub trait DataSummary: Sized + Send + Sync + Clone {
11    const INDEX_NAME: &'static str;
12    const INDEX_TYPE_CODE: DataIndexType;
13    fn identity() -> Self;
14    fn add_data(&self, pos: u32, val: i32) -> Self;
15    fn combine(&self, other: &Self) -> Self;
16    fn to_native_byte_order(&self) -> Self;
17    fn to_format_byte_order(&self) -> Self;
18    fn add_data_range(&self, begin: u32, end: u32, val: i32) -> Self {
19        let mut ret = Self::identity();
20        for pos in begin..end {
21            ret = ret.add_data(pos, val);
22        }
23        ret
24    }
25    fn from_data_iter<I: Iterator<Item = (u32, i32)>>(iter: I) -> Self {
26        iter.fold(Self::identity(), |sum, (pos, val)| sum.add_data(pos, val))
27    }
28    fn combine_iter<'a, I: Iterator<Item = &'a Self>>(iter: I) -> Self
29    where
30        Self: 'a,
31    {
32        iter.fold(Self::identity(), |sum, value| sum.combine(value))
33    }
34    fn run_summary_task(reader: &mut D4TrackReader, bin_size: u32) -> Result<TaskOutputVec<Self>> {
35        let chrom_list = reader.header().chrom_list().to_owned();
36        let task_array: Vec<_> = chrom_list
37            .iter()
38            .map(|seq| {
39                let chrom = seq.name.as_str();
40                (0..(seq.size as u32 + bin_size - 1) / bin_size).map(move |idx| {
41                    (
42                        chrom,
43                        idx * bin_size,
44                        ((idx + 1) * bin_size).min(seq.size as u32),
45                    )
46                })
47            })
48            .flatten()
49            .map(|(chrom, begin, end)| DataSummaryTask::<Self> {
50                chrom,
51                begin,
52                end,
53                _phantom_data: Default::default(),
54            })
55            .collect();
56
57        Ok(TaskContext::new(reader, task_array)?.run())
58    }
59}
60
/// Running total of track values, held as an `f64`.
#[derive(Clone, Copy, Debug)]
pub struct Sum(f64);

impl Sum {
    /// Returns the accumulated total divided by `base_count`.
    ///
    /// The division is done in floating point. A `base_count` of 0 therefore
    /// yields an infinite or NaN value instead of panicking.
    pub fn mean(&self, base_count: u32) -> f64 {
        let bases = f64::from(base_count);
        self.sum() / bases
    }

    /// Returns the accumulated total.
    pub fn sum(&self) -> f64 {
        let Sum(total) = *self;
        total
    }
}
72
73impl DataSummary for Sum {
74    fn identity() -> Self {
75        Sum(0.0)
76    }
77
78    fn add_data(&self, _: u32, val: i32) -> Self {
79        Sum(self.0 + val as f64)
80    }
81
82    fn add_data_range(&self, begin: u32, end: u32, val: i32) -> Self {
83        Sum(self.0 + (end - begin) as f64 * val as f64)
84    }
85
86    fn combine(&self, other: &Self) -> Self {
87        Sum(self.0 + other.0)
88    }
89
90    fn to_native_byte_order(&self) -> Self {
91        *self
92    }
93
94    fn to_format_byte_order(&self) -> Self {
95        *self
96    }
97
98    const INDEX_NAME: &'static str = "sum_index";
99
100    const INDEX_TYPE_CODE: DataIndexType = DataIndexType::Sum;
101}
102
103pub struct DataSummaryTask<'a, T: DataSummary> {
104    chrom: &'a str,
105    begin: u32,
106    end: u32,
107    _phantom_data: PhantomData<T>,
108}
109
110pub struct DataSummaryTaskPart<'a, T: DataSummary> {
111    sum: T,
112    _phantom_data: PhantomData<&'a ()>,
113}
114
115impl<'a, T: DataSummary> TaskPartition<Once<i32>> for DataSummaryTaskPart<'a, T> {
116    type ParentType = DataSummaryTask<'a, T>;
117
118    type ResultType = T;
119
120    fn new(_left: u32, _right: u32, _parent: &Self::ParentType) -> Self {
121        Self {
122            sum: T::identity(),
123            _phantom_data: PhantomData::default(),
124        }
125    }
126
127    fn feed(&mut self, pos: u32, value: &mut Once<i32>) -> bool {
128        self.sum = self.sum.add_data(pos, value.next().unwrap());
129        true
130    }
131
132    fn feed_range(&mut self, left: u32, right: u32, value: &mut Once<i32>) -> bool {
133        self.sum = self.sum.add_data_range(left, right, value.next().unwrap());
134        true
135    }
136
137    fn result(&mut self) -> Self::ResultType {
138        self.sum.clone()
139    }
140}
141
142impl<'a, T: DataSummary> Task<Once<i32>> for DataSummaryTask<'a, T> {
143    type Partition = DataSummaryTaskPart<'a, T>;
144
145    type Output = T;
146
147    fn region(&self) -> (&str, u32, u32) {
148        (self.chrom, self.begin, self.end)
149    }
150
151    fn combine(&self, parts: &[T]) -> Self::Output {
152        T::combine_iter(parts.iter())
153    }
154}