grass_runtime/algorithm/intersect/
inner.rs1use super::heap::RegionHeap;
2use crate::algorithm::Sorted;
3use crate::property::{Region, RegionCore};
4use crate::ChrRef;
5
6pub(super) struct Context<I: Iterator + Sorted>
7where
8 I::Item: Region + Clone,
9{
10 iter: I,
11 peek_buffer: Option<I::Item>,
12 frontier: Vec<I::Item>,
13 active_regions: RegionHeap<I::Item>,
14}
15
16impl<I: Iterator + Sorted> Context<I>
17where
18 I::Item: Region + Clone,
19{
20 pub(super) fn from_iter(mut iter: I) -> Self {
21 let peek_buffer = iter.next();
22 Self {
23 iter,
24 peek_buffer,
25 frontier: Vec::new(),
26 active_regions: Default::default(),
27 }
28 }
29
30 fn skip_util_chrom(&mut self, target: &ChrRef) {
31 while let Some(head) = self.peek_buffer.as_ref() {
32 if &head.chrom() < target {
33 self.peek_buffer = self.iter.next();
34 } else {
35 break;
36 }
37 }
38 }
39
40 fn peek(&self) -> Option<&I::Item> {
41 self.peek_buffer.as_ref()
42 }
43
44 fn remove_inactive_regions(&mut self, chrom: &ChrRef, active_limit: u32) {
45 while let Some(top) = self.active_regions.peek() {
46 if &top.chrom() < chrom || top.end() <= active_limit {
47 self.active_regions.pop();
48 } else {
49 break;
50 }
51 }
52 }
53
54 fn push_frontier(&mut self) -> Option<u32> {
55 let new_frontier = self.peek_buffer.as_ref()?.start();
56 let chrom = self.peek_buffer.as_ref()?.chrom();
57
58 while let Some(region) = self.peek_buffer.as_ref() {
59 if region.start() == new_frontier && chrom == region.chrom() {
60 self.frontier.push(self.peek_buffer.take().unwrap());
61 self.peek_buffer = self.iter.next();
62 } else {
63 break;
64 }
65 }
66 self.remove_inactive_regions(&chrom, new_frontier);
67 Some(new_frontier)
68 }
69
70 fn flush_frontier(&mut self) {
71 for item in self.frontier.drain(0..self.frontier.len()) {
72 self.active_regions.push(item);
73 }
74 }
75
76 fn ingest_active_regions(&mut self, chrom: &ChrRef, active_limit: u32) {
77 while let Some(region) = self.peek_buffer.as_ref() {
78 if region.start() <= active_limit && ®ion.chrom() == chrom {
79 self.active_regions.push(self.peek_buffer.take().unwrap());
80 self.peek_buffer = self.iter.next();
81 } else {
82 break;
83 }
84 }
85 self.remove_inactive_regions(chrom, active_limit);
86 }
87}
88
/// Cursor used by `State::next` while it enumerates the pairs produced by one
/// frontier. The variant names the side whose frontier is being processed.
///
/// Tuple fields, in order:
/// 1. index into the leading side's `frontier`;
/// 2. index into the opposite side's `active_regions` slice;
/// 3. when `Some`, index into the leading side's own `active_regions` slice,
///    used while those regions are being paired with the opposite-side region
///    selected by field 2.
#[derive(Debug)]
pub enum State {
    /// The frontier of input A is being processed.
    FrontierA(usize, usize, Option<usize>),
    /// The frontier of input B is being processed.
    FrontierB(usize, usize, Option<usize>),
}
94
95impl State {
96 fn next<
97 A: Region + Clone,
98 B: Region + Clone,
99 IA: Iterator<Item = A> + Sorted,
100 IB: Iterator<Item = B> + Sorted,
101 >(
102 &mut self,
103 ctx: (&mut Context<IA>, &mut Context<IB>),
104 ) -> Option<(A, B)> {
105 match self {
106 Self::FrontierA(f_idx, h_idx, b_idx) if b_idx.is_none() => {
107 let ret = if *f_idx >= ctx.0.frontier.len() || ctx.1.active_regions.len() == 0 {
108 return None;
109 } else {
110 let a = ctx.0.frontier[*f_idx].clone();
111 let b = ctx.1.active_regions.as_slice()[*h_idx].clone();
112 (a, b)
113 };
114
115 if *f_idx == 0 && ret.1.start() == ret.0.start() && ctx.0.active_regions.len() > 0 {
116 *b_idx = Some(0);
117 } else {
118 *h_idx += 1;
119
120 if *h_idx >= ctx.1.active_regions.len() {
121 *f_idx += 1;
122 *h_idx = 0;
123 }
124 }
125 Some(ret)
126 }
127 Self::FrontierA(f_idx, h_idx, b_idx_ref) => {
128 let b_idx = b_idx_ref.unwrap();
129 let a = ctx.0.active_regions.as_slice()[b_idx].clone();
130 let b = ctx.1.active_regions.as_slice()[*h_idx].clone();
131 if b_idx == ctx.0.active_regions.len() - 1 {
132 *h_idx += 1;
133 if *h_idx >= ctx.1.active_regions.len() {
134 *f_idx += 1;
135 *h_idx = 0;
136 }
137 *b_idx_ref = None;
138 } else {
139 *b_idx_ref = Some(b_idx + 1);
140 }
141 Some((a, b))
142 }
143 Self::FrontierB(f_idx, h_idx, b_idx) => {
144 let mut tmp_state = Self::FrontierA(*f_idx, *h_idx, *b_idx);
145 let ret = tmp_state.next((ctx.1, ctx.0)).map(|(b, a)| (a, b));
146 match tmp_state {
147 Self::FrontierA(f, h, b) => {
148 *f_idx = f;
149 *h_idx = h;
150 *b_idx = b;
151 }
152 _ => unreachable!(),
153 }
154 ret
155 }
156 }
157 }
158}
159
160pub struct SortedIntersectIter<IA: Iterator + Sorted, IB: Iterator + Sorted>
161where
162 IA::Item: Region + Clone,
163 IB::Item: Region + Clone,
164{
165 pub(super) context_a: Context<IA>,
166 pub(super) context_b: Context<IB>,
167 pub(super) state: State,
168}
169
170impl<IA, IB> Sorted for SortedIntersectIter<IA, IB>
171where
172 IA: Iterator + Sorted,
173 IB: Iterator + Sorted,
174 IA::Item: Region + Clone,
175 IB::Item: Region + Clone,
176{
177}
178
179impl<IA, IB> Iterator for SortedIntersectIter<IA, IB>
180where
181 IA: Iterator + Sorted,
182 IB: Iterator + Sorted,
183 IA::Item: Region + Clone,
184 IB::Item: Region + Clone,
185{
186 type Item = (IA::Item, IB::Item);
187 fn next(&mut self) -> Option<Self::Item> {
188 loop {
189 if let Some(next) = self.state.next((&mut self.context_a, &mut self.context_b)) {
190 return Some(next);
191 }
192
193 self.context_a.flush_frontier();
194 self.context_b.flush_frontier();
195
196 let (frontier_a, frontier_b) = loop {
197 let peek_a = self.context_a.peek();
198 let peek_b = self.context_b.peek();
199
200 if peek_a.is_none() && peek_b.is_none() {
201 return None;
202 }
203
204 let chrom_cmp = if let (Some(peek_a), Some(peek_b)) = (peek_a, peek_b) {
205 peek_a.chrom().cmp(&peek_b.chrom())
206 } else {
207 std::cmp::Ordering::Equal
208 };
209
210 match chrom_cmp {
211 std::cmp::Ordering::Less => {
212 self.context_a
213 .skip_util_chrom(&peek_b.as_ref().unwrap().chrom());
214 self.context_a.frontier.clear();
215 self.context_a.active_regions.data.clear();
216 }
217 std::cmp::Ordering::Greater => {
218 self.context_b
219 .skip_util_chrom(&peek_a.as_ref().unwrap().chrom());
220 self.context_b.frontier.clear();
221 self.context_b.active_regions.data.clear();
222 }
223 std::cmp::Ordering::Equal => {
224 break (peek_a.map(|x| x.start()), peek_b.map(|x| x.start()));
225 }
226 }
227 };
228
229 self.state =
230 if frontier_a.unwrap_or(std::u32::MAX) <= frontier_b.unwrap_or(std::u32::MAX) {
231 let frontier = self.context_a.push_frontier()?;
232 self.context_b
233 .ingest_active_regions(&self.context_a.frontier[0].chrom(), frontier);
234 State::FrontierA(0, 0, None)
235 } else {
236 let frontier = self.context_b.push_frontier()?;
237 self.context_a
238 .ingest_active_regions(&self.context_b.frontier[0].chrom(), frontier);
239 State::FrontierB(0, 0, None)
240 };
241 }
242 }
243}