angsd_saf/reader/intersect.rs
1use std::{cmp::Ordering, io};
2
3use indexmap::IndexMap;
4
5use crate::{
6 record::{Id, Record},
7 version::Version,
8 ReadStatus,
9};
10
11use super::{Index, Reader};
12
13/// An intersection of SAF file readers.
14///
15/// The intersection takes an arbitrary number of readers and returns data where all readers
16/// contain data for the same contig and position. It is assumed that contigs are sorted in the
17/// same order in each file, and that positions are sorted numerically within each contig.
18pub struct Intersect<R, V> {
19 readers: Vec<Reader<R, V>>,
20 shared_contigs: SharedContigs,
21 ids: Vec<usize>, // Current reader contig IDs
22}
23
24impl<R, V> Intersect<R, V>
25where
26 R: io::BufRead + io::Seek,
27 V: Version,
28{
29 /// Returns a new collection of records suitable for use in reading.
30 pub fn create_record_bufs(&self) -> Vec<Record<Id, V::Item>> {
31 self.readers
32 .iter()
33 .map(|reader| reader.create_record_buf())
34 .collect()
35 }
36
37 /// Creates a new intersecting reader with an additional reader, consuming `self`.
38 ///
39 /// Since `self` is consumed, rather than mutated, this can be chained to build intersections
40 /// of multiple readers. See also the [`Reader::intersect`] method for a way to start create
41 /// the initial intersecting reader.
42 pub fn intersect(mut self, reader: Reader<R, V>) -> Self {
43 self.shared_contigs.add_index(reader.index());
44 self.readers.push(reader);
45 self.ids.push(0);
46 self
47 }
48
49 /// Returns the inner readers.
50 pub fn get_readers(&self) -> &[Reader<R, V>] {
51 &self.readers
52 }
53
54 /// Returns a mutable reference to the inner readers.
55 pub fn get_readers_mut(&mut self) -> &mut [Reader<R, V>] {
56 &mut self.readers
57 }
58
59 /// Returns the inner readers, consuming `self`.
60 pub fn into_readers(self) -> Vec<Reader<R, V>> {
61 self.readers
62 }
63
64 /// Creates a new intersecting reader from a collection of readers.
65 ///
66 /// # Panics
67 ///
68 /// Panics if `readers` is empty.
69 pub fn new(readers: Vec<Reader<R, V>>) -> Self {
70 match readers.as_slice() {
71 [] => panic!("cannot construct empty intersection"),
72 [fst, tl @ ..] => {
73 let mut contigs = SharedContigs::from(fst.index());
74 for reader in tl.iter() {
75 contigs.add_index(reader.index());
76 }
77
78 let ids = vec![0; readers.len()];
79
80 Self {
81 readers,
82 shared_contigs: contigs,
83 ids,
84 }
85 }
86 }
87 }
88
89 /// Reads a set of intersecting records, one from each contained reader.
90 ///
91 /// If successful, a record from each inner reader will be read into the corresponding buffer
92 /// such that all resulting records will be on the same contig and the same position.
93 ///
94 /// Note that the record buffer needs to be correctly set up. Use [`Self::create_record_bufs`]
95 /// for a correctly initialised record buffers to use for reading.
96 pub fn read_records(&mut self, bufs: &mut [Record<Id, V::Item>]) -> io::Result<ReadStatus> {
97 for ((reader, record), id) in self
98 .readers
99 .iter_mut()
100 .zip(bufs.iter_mut())
101 .zip(self.ids.iter_mut())
102 {
103 if reader.read_record(record)?.is_done() {
104 return Ok(ReadStatus::Done);
105 }
106
107 *id = *record.contig_id();
108 }
109
110 if self.read_until_shared_contig(bufs)?.is_done() {
111 return Ok(ReadStatus::Done);
112 }
113
114 match self.read_until_shared_position_on_contig(bufs)? {
115 Some(ReadStatus::Done) => Ok(ReadStatus::Done),
116 Some(ReadStatus::NotDone) => Ok(ReadStatus::NotDone),
117 None => self.read_records(bufs),
118 }
119 }
120
121 pub(super) fn from_reader(reader: Reader<R, V>) -> Self {
122 Self {
123 shared_contigs: SharedContigs::from(reader.index()),
124 readers: vec![reader],
125 ids: vec![0],
126 }
127 }
128
129 /// Read all readers until they are on a shared contig equal to or after the contigs defined
130 /// by the provided record buffers.
131 ///
132 /// If no more shared contigs exist, returns `Done`.
133 fn read_until_shared_contig(
134 &mut self,
135 bufs: &mut [Record<Id, V::Item>],
136 ) -> io::Result<ReadStatus> {
137 // For each record, get the first shared contig (by index into shared_contigs)
138 // equal to or after the contig ID of that record. Then get the greatest/most distant of
139 // those shared contigs. We can safely seek to this contig. If in the process, we find
140 // that a reader has no such shared contigs, we are done.
141 let mut next_idx = 0;
142 for (reader, buf) in self.readers.iter_mut().zip(bufs.iter_mut()) {
143 match self
144 .shared_contigs
145 .next_shared(reader.index(), *buf.contig_id())
146 {
147 Some(idx) => {
148 if idx > next_idx {
149 next_idx = idx;
150 }
151 }
152 None => return Ok(ReadStatus::Done),
153 }
154 }
155
156 // Seek all readers to candidate shared contig, if they are not on it already.
157 let next_ids = &self.shared_contigs.0[next_idx];
158 for (((reader, buf), next_id), id) in self
159 .readers
160 .iter_mut()
161 .zip(bufs.iter_mut())
162 .zip(next_ids.iter())
163 .zip(self.ids.iter_mut())
164 {
165 if buf.contig_id() != next_id {
166 reader.seek(*next_id)?;
167 reader.read_record(buf)?;
168 *id = *next_id;
169 }
170 }
171
172 Ok(ReadStatus::NotDone)
173 }
174
175 /// Reads all readers until they are on a shared position on the current contig.
176 ///
177 /// The starting read positions will be defined by the provided buffers; the current contig is
178 /// defined by `self.ids`.
179 ///
180 /// If no more shared positions exist, returns `Some(Done)`. If more shared positions may exist,
181 /// but not on the current contig, returns `None`. If `Some(NotDone)` is returned, an
182 /// intersecting positions has been found.
183 fn read_until_shared_position_on_contig(
184 &mut self,
185 bufs: &mut [Record<Id, V::Item>],
186 ) -> io::Result<Option<ReadStatus>> {
187 let mut max_pos = bufs
188 .iter()
189 .map(Record::position)
190 .max()
191 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "empty buffer slice"))?;
192
193 'outer: loop {
194 // We keep checking if all the records have reached the max position:
195 // if so, we have a shared record. If we find one greater than max, max is updated.
196 // As we go, we have to check that we have not reached a new contig ID.
197 'inner: for ((reader, record), id) in self
198 .readers
199 .iter_mut()
200 .zip(bufs.iter_mut())
201 .zip(self.ids.iter_mut())
202 {
203 let mut pos = record.position();
204
205 // A shared position must be at least as great as the max among all records
206 match pos.cmp(&max_pos) {
207 Ordering::Less => {
208 // If a position is less than the current max, we can forward the
209 // corresponding reader all the way to its first position equal to or
210 // greater than the current max
211 while pos < max_pos {
212 if reader.read_record(record)?.is_done() {
213 return Ok(Some(ReadStatus::Done));
214 }
215 if record.contig_id() != id {
216 *id = *record.contig_id();
217 return Ok(None);
218 }
219 pos = record.position();
220 }
221
222 if pos == max_pos {
223 continue 'inner;
224 } else {
225 // Forward overshot current max_pos, which means we have to start over
226 // checking all records
227 continue 'outer;
228 }
229 }
230 Ordering::Equal => (),
231 Ordering::Greater => {
232 max_pos = pos;
233 continue 'outer;
234 }
235 }
236 }
237
238 // To have reached this point, all record positions are at the current max, which means
239 // an intersection
240 return Ok(Some(ReadStatus::NotDone));
241 }
242 }
243}
244
245/// Shared contigs for readers based on their indexes.
246///
247/// The representation used is an ordered map from contig names to a vector of contig IDs,
248/// corresponding to the contig IDs used in the represented reader. For instance, if "chr1"
249/// is the first shared contig among two represented reads, and the first entry in the map is
250/// ("chr1", vec![1, 2]), then "chr1" has ID 1 in the first reader, and ID 2 in the second reader.
251/// As elsewhere here, the ID is based on the position in the index.
252///
253/// Note that as for `Intersect` generally, we assume that contigs occur in the same order in each
254/// index. That is, the same contigs may not be represented in each index, and the same contig may
255/// have a different IDs, but where two or more contigs occur in multiple indices, their ordering
256/// must be constant.
257#[derive(Clone, Debug)]
258struct SharedContigs(IndexMap<String, Vec<usize>>);
259
260impl SharedContigs {
261 /// Adds a new index to the collection.
262 ///
263 /// The new index will be placed last.
264 pub fn add_index<V>(&mut self, index: &Index<V>)
265 where
266 V: Version,
267 {
268 // Create a temporary mapping from names to IDs in the new index
269 let map: IndexMap<&str, usize> = index
270 .records()
271 .iter()
272 .enumerate()
273 .map(|(i, record)| (record.name(), i))
274 .collect();
275
276 // For each name in the current selection, check if the name exists in the new index:
277 // if so, (1) add its ID in the new index to the collection of IDs for this shared contig;
278 // if not, (2) the contig is no longer shared, and should be removed
279 self.0.retain(|name, ids| {
280 if let Some(new_id) = map.get(name.as_str()) {
281 // (1)
282 ids.push(*new_id);
283 true
284 } else {
285 // (2)
286 false
287 }
288 })
289 }
290
291 /// Returns the index in `self` of the first contig in `index` with ID equal to or greater
292 /// than `id`.
293 ///
294 /// If no more shared contigs exist in `index` after `id`, return `None`.
295 pub fn next_shared<V>(&self, index: &Index<V>, id: usize) -> Option<usize>
296 where
297 V: Version,
298 {
299 let name = index.records()[id].name();
300
301 self.0.get_index_of(name).or_else(|| {
302 index.records()[(id + 1)..]
303 .iter()
304 .find_map(|record| self.0.get_index_of(record.name()))
305 })
306 }
307}
308
309impl<V> From<&Index<V>> for SharedContigs
310where
311 V: Version,
312{
313 fn from(index: &Index<V>) -> Self {
314 index
315 .records()
316 .iter()
317 .enumerate()
318 .map(|(i, record)| (record.name().to_owned(), vec![i]))
319 .collect()
320 }
321}
322
323impl FromIterator<(String, Vec<usize>)> for SharedContigs {
324 fn from_iter<I>(iter: I) -> Self
325 where
326 I: IntoIterator<Item = (String, Vec<usize>)>,
327 {
328 Self(iter.into_iter().map(|(s, a)| (s, a)).collect())
329 }
330}