d4/index/data_index/
data.rs1use std::{io::Result, iter::Once, marker::PhantomData};
2
3use crate::{
4 task::{Task, TaskContext, TaskOutputVec, TaskPartition},
5 D4TrackReader,
6};
7
8use super::DataIndexType;
9
10pub trait DataSummary: Sized + Send + Sync + Clone {
11 const INDEX_NAME: &'static str;
12 const INDEX_TYPE_CODE: DataIndexType;
13 fn identity() -> Self;
14 fn add_data(&self, pos: u32, val: i32) -> Self;
15 fn combine(&self, other: &Self) -> Self;
16 fn to_native_byte_order(&self) -> Self;
17 fn to_format_byte_order(&self) -> Self;
18 fn add_data_range(&self, begin: u32, end: u32, val: i32) -> Self {
19 let mut ret = Self::identity();
20 for pos in begin..end {
21 ret = ret.add_data(pos, val);
22 }
23 ret
24 }
25 fn from_data_iter<I: Iterator<Item = (u32, i32)>>(iter: I) -> Self {
26 iter.fold(Self::identity(), |sum, (pos, val)| sum.add_data(pos, val))
27 }
28 fn combine_iter<'a, I: Iterator<Item = &'a Self>>(iter: I) -> Self
29 where
30 Self: 'a,
31 {
32 iter.fold(Self::identity(), |sum, value| sum.combine(value))
33 }
34 fn run_summary_task(reader: &mut D4TrackReader, bin_size: u32) -> Result<TaskOutputVec<Self>> {
35 let chrom_list = reader.header().chrom_list().to_owned();
36 let task_array: Vec<_> = chrom_list
37 .iter()
38 .map(|seq| {
39 let chrom = seq.name.as_str();
40 (0..(seq.size as u32 + bin_size - 1) / bin_size).map(move |idx| {
41 (
42 chrom,
43 idx * bin_size,
44 ((idx + 1) * bin_size).min(seq.size as u32),
45 )
46 })
47 })
48 .flatten()
49 .map(|(chrom, begin, end)| DataSummaryTask::<Self> {
50 chrom,
51 begin,
52 end,
53 _phantom_data: Default::default(),
54 })
55 .collect();
56
57 Ok(TaskContext::new(reader, task_array)?.run())
58 }
59}
60
/// Running total of the values fed into the summary, stored as an `f64`.
#[derive(Clone, Copy, Debug)]
pub struct Sum(f64);

impl Sum {
    /// Returns the accumulated total divided by `base_count`.
    ///
    /// No special handling is done for `base_count == 0`; the result then
    /// follows ordinary floating-point division rules (NaN or infinity).
    pub fn mean(&self, base_count: u32) -> f64 {
        let total = self.0;
        let count = f64::from(base_count);
        total / count
    }

    /// Returns the accumulated total.
    pub fn sum(&self) -> f64 {
        let Sum(total) = *self;
        total
    }
}
72
73impl DataSummary for Sum {
74 fn identity() -> Self {
75 Sum(0.0)
76 }
77
78 fn add_data(&self, _: u32, val: i32) -> Self {
79 Sum(self.0 + val as f64)
80 }
81
82 fn add_data_range(&self, begin: u32, end: u32, val: i32) -> Self {
83 Sum(self.0 + (end - begin) as f64 * val as f64)
84 }
85
86 fn combine(&self, other: &Self) -> Self {
87 Sum(self.0 + other.0)
88 }
89
90 fn to_native_byte_order(&self) -> Self {
91 *self
92 }
93
94 fn to_format_byte_order(&self) -> Self {
95 *self
96 }
97
98 const INDEX_NAME: &'static str = "sum_index";
99
100 const INDEX_TYPE_CODE: DataIndexType = DataIndexType::Sum;
101}
102
103pub struct DataSummaryTask<'a, T: DataSummary> {
104 chrom: &'a str,
105 begin: u32,
106 end: u32,
107 _phantom_data: PhantomData<T>,
108}
109
110pub struct DataSummaryTaskPart<'a, T: DataSummary> {
111 sum: T,
112 _phantom_data: PhantomData<&'a ()>,
113}
114
115impl<'a, T: DataSummary> TaskPartition<Once<i32>> for DataSummaryTaskPart<'a, T> {
116 type ParentType = DataSummaryTask<'a, T>;
117
118 type ResultType = T;
119
120 fn new(_left: u32, _right: u32, _parent: &Self::ParentType) -> Self {
121 Self {
122 sum: T::identity(),
123 _phantom_data: PhantomData::default(),
124 }
125 }
126
127 fn feed(&mut self, pos: u32, value: &mut Once<i32>) -> bool {
128 self.sum = self.sum.add_data(pos, value.next().unwrap());
129 true
130 }
131
132 fn feed_range(&mut self, left: u32, right: u32, value: &mut Once<i32>) -> bool {
133 self.sum = self.sum.add_data_range(left, right, value.next().unwrap());
134 true
135 }
136
137 fn result(&mut self) -> Self::ResultType {
138 self.sum.clone()
139 }
140}
141
142impl<'a, T: DataSummary> Task<Once<i32>> for DataSummaryTask<'a, T> {
143 type Partition = DataSummaryTaskPart<'a, T>;
144
145 type Output = T;
146
147 fn region(&self) -> (&str, u32, u32) {
148 (self.chrom, self.begin, self.end)
149 }
150
151 fn combine(&self, parts: &[T]) -> Self::Output {
152 T::combine_iter(parts.iter())
153 }
154}