grass_runtime/algorithm/intersect/
inner.rs

1use super::heap::RegionHeap;
2use crate::algorithm::Sorted;
3use crate::property::{Region, RegionCore};
4use crate::ChrRef;
5
6pub(super) struct Context<I: Iterator + Sorted>
7where
8    I::Item: Region + Clone,
9{
10    iter: I,
11    peek_buffer: Option<I::Item>,
12    frontier: Vec<I::Item>,
13    active_regions: RegionHeap<I::Item>,
14}
15
16impl<I: Iterator + Sorted> Context<I>
17where
18    I::Item: Region + Clone,
19{
20    pub(super) fn from_iter(mut iter: I) -> Self {
21        let peek_buffer = iter.next();
22        Self {
23            iter,
24            peek_buffer,
25            frontier: Vec::new(),
26            active_regions: Default::default(),
27        }
28    }
29
30    fn skip_util_chrom(&mut self, target: &ChrRef) {
31        while let Some(head) = self.peek_buffer.as_ref() {
32            if &head.chrom() < target {
33                self.peek_buffer = self.iter.next();
34            } else {
35                break;
36            }
37        }
38    }
39
40    fn peek(&self) -> Option<&I::Item> {
41        self.peek_buffer.as_ref()
42    }
43
44    fn remove_inactive_regions(&mut self, chrom: &ChrRef, active_limit: u32) {
45        while let Some(top) = self.active_regions.peek() {
46            if &top.chrom() < chrom || top.end() <= active_limit {
47                self.active_regions.pop();
48            } else {
49                break;
50            }
51        }
52    }
53
54    fn push_frontier(&mut self) -> Option<u32> {
55        let new_frontier = self.peek_buffer.as_ref()?.start();
56        let chrom = self.peek_buffer.as_ref()?.chrom();
57
58        while let Some(region) = self.peek_buffer.as_ref() {
59            if region.start() == new_frontier && chrom == region.chrom() {
60                self.frontier.push(self.peek_buffer.take().unwrap());
61                self.peek_buffer = self.iter.next();
62            } else {
63                break;
64            }
65        }
66        self.remove_inactive_regions(&chrom, new_frontier);
67        Some(new_frontier)
68    }
69
70    fn flush_frontier(&mut self) {
71        for item in self.frontier.drain(0..self.frontier.len()) {
72            self.active_regions.push(item);
73        }
74    }
75
76    fn ingest_active_regions(&mut self, chrom: &ChrRef, active_limit: u32) {
77        while let Some(region) = self.peek_buffer.as_ref() {
78            if region.start() <= active_limit && &region.chrom() == chrom {
79                self.active_regions.push(self.peek_buffer.take().unwrap());
80                self.peek_buffer = self.iter.next();
81            } else {
82                break;
83            }
84        }
85        self.remove_inactive_regions(chrom, active_limit);
86    }
87}
88
89#[derive(Debug)]
90pub enum State {
91    FrontierA(usize, usize, Option<usize>),
92    FrontierB(usize, usize, Option<usize>),
93}
94
95impl State {
96    fn next<
97        A: Region + Clone,
98        B: Region + Clone,
99        IA: Iterator<Item = A> + Sorted,
100        IB: Iterator<Item = B> + Sorted,
101    >(
102        &mut self,
103        ctx: (&mut Context<IA>, &mut Context<IB>),
104    ) -> Option<(A, B)> {
105        match self {
106            Self::FrontierA(f_idx, h_idx, b_idx) if b_idx.is_none() => {
107                let ret = if *f_idx >= ctx.0.frontier.len() || ctx.1.active_regions.len() == 0 {
108                    return None;
109                } else {
110                    let a = ctx.0.frontier[*f_idx].clone();
111                    let b = ctx.1.active_regions.as_slice()[*h_idx].clone();
112                    (a, b)
113                };
114
115                if *f_idx == 0 && ret.1.start() == ret.0.start() && ctx.0.active_regions.len() > 0 {
116                    *b_idx = Some(0);
117                } else {
118                    *h_idx += 1;
119
120                    if *h_idx >= ctx.1.active_regions.len() {
121                        *f_idx += 1;
122                        *h_idx = 0;
123                    }
124                }
125                Some(ret)
126            }
127            Self::FrontierA(f_idx, h_idx, b_idx_ref) => {
128                let b_idx = b_idx_ref.unwrap();
129                let a = ctx.0.active_regions.as_slice()[b_idx].clone();
130                let b = ctx.1.active_regions.as_slice()[*h_idx].clone();
131                if b_idx == ctx.0.active_regions.len() - 1 {
132                    *h_idx += 1;
133                    if *h_idx >= ctx.1.active_regions.len() {
134                        *f_idx += 1;
135                        *h_idx = 0;
136                    }
137                    *b_idx_ref = None;
138                } else {
139                    *b_idx_ref = Some(b_idx + 1);
140                }
141                Some((a, b))
142            }
143            Self::FrontierB(f_idx, h_idx, b_idx) => {
144                let mut tmp_state = Self::FrontierA(*f_idx, *h_idx, *b_idx);
145                let ret = tmp_state.next((ctx.1, ctx.0)).map(|(b, a)| (a, b));
146                match tmp_state {
147                    Self::FrontierA(f, h, b) => {
148                        *f_idx = f;
149                        *h_idx = h;
150                        *b_idx = b;
151                    }
152                    _ => unreachable!(),
153                }
154                ret
155            }
156        }
157    }
158}
159
160pub struct SortedIntersectIter<IA: Iterator + Sorted, IB: Iterator + Sorted>
161where
162    IA::Item: Region + Clone,
163    IB::Item: Region + Clone,
164{
165    pub(super) context_a: Context<IA>,
166    pub(super) context_b: Context<IB>,
167    pub(super) state: State,
168}
169
170impl<IA, IB> Sorted for SortedIntersectIter<IA, IB>
171where
172    IA: Iterator + Sorted,
173    IB: Iterator + Sorted,
174    IA::Item: Region + Clone,
175    IB::Item: Region + Clone,
176{
177}
178
179impl<IA, IB> Iterator for SortedIntersectIter<IA, IB>
180where
181    IA: Iterator + Sorted,
182    IB: Iterator + Sorted,
183    IA::Item: Region + Clone,
184    IB::Item: Region + Clone,
185{
186    type Item = (IA::Item, IB::Item);
187    fn next(&mut self) -> Option<Self::Item> {
188        loop {
189            if let Some(next) = self.state.next((&mut self.context_a, &mut self.context_b)) {
190                return Some(next);
191            }
192
193            self.context_a.flush_frontier();
194            self.context_b.flush_frontier();
195
196            let (frontier_a, frontier_b) = loop {
197                let peek_a = self.context_a.peek();
198                let peek_b = self.context_b.peek();
199
200                if peek_a.is_none() && peek_b.is_none() {
201                    return None;
202                }
203
204                let chrom_cmp = if let (Some(peek_a), Some(peek_b)) = (peek_a, peek_b) {
205                    peek_a.chrom().cmp(&peek_b.chrom())
206                } else {
207                    std::cmp::Ordering::Equal
208                };
209
210                match chrom_cmp {
211                    std::cmp::Ordering::Less => {
212                        self.context_a
213                            .skip_util_chrom(&peek_b.as_ref().unwrap().chrom());
214                        self.context_a.frontier.clear();
215                        self.context_a.active_regions.data.clear();
216                    }
217                    std::cmp::Ordering::Greater => {
218                        self.context_b
219                            .skip_util_chrom(&peek_a.as_ref().unwrap().chrom());
220                        self.context_b.frontier.clear();
221                        self.context_b.active_regions.data.clear();
222                    }
223                    std::cmp::Ordering::Equal => {
224                        break (peek_a.map(|x| x.start()), peek_b.map(|x| x.start()));
225                    }
226                }
227            };
228
229            self.state =
230                if frontier_a.unwrap_or(std::u32::MAX) <= frontier_b.unwrap_or(std::u32::MAX) {
231                    let frontier = self.context_a.push_frontier()?;
232                    self.context_b
233                        .ingest_active_regions(&self.context_a.frontier[0].chrom(), frontier);
234                    State::FrontierA(0, 0, None)
235                } else {
236                    let frontier = self.context_b.push_frontier()?;
237                    self.context_a
238                        .ingest_active_regions(&self.context_b.frontier[0].chrom(), frontier);
239                    State::FrontierB(0, 0, None)
240                };
241        }
242    }
243}