angsd_saf/reader/
intersect.rs

1use std::{cmp::Ordering, io};
2
3use indexmap::IndexMap;
4
5use crate::{
6    record::{Id, Record},
7    version::Version,
8    ReadStatus,
9};
10
11use super::{Index, Reader};
12
13/// An intersection of SAF file readers.
14///
15/// The intersection takes an arbitrary number of readers and returns data where all readers
16/// contain data for the same contig and position. It is assumed that contigs are sorted in the
17/// same order in each file, and that positions are sorted numerically within each contig.
18pub struct Intersect<R, V> {
19    readers: Vec<Reader<R, V>>,
20    shared_contigs: SharedContigs,
21    ids: Vec<usize>, // Current reader contig IDs
22}
23
24impl<R, V> Intersect<R, V>
25where
26    R: io::BufRead + io::Seek,
27    V: Version,
28{
29    /// Returns a new collection of records suitable for use in reading.
30    pub fn create_record_bufs(&self) -> Vec<Record<Id, V::Item>> {
31        self.readers
32            .iter()
33            .map(|reader| reader.create_record_buf())
34            .collect()
35    }
36
37    /// Creates a new intersecting reader with an additional reader, consuming `self`.
38    ///
39    /// Since `self` is consumed, rather than mutated, this can be chained to build intersections
40    /// of multiple readers. See also the [`Reader::intersect`] method for a way to start create
41    /// the initial intersecting reader.
42    pub fn intersect(mut self, reader: Reader<R, V>) -> Self {
43        self.shared_contigs.add_index(reader.index());
44        self.readers.push(reader);
45        self.ids.push(0);
46        self
47    }
48
49    /// Returns the inner readers.
50    pub fn get_readers(&self) -> &[Reader<R, V>] {
51        &self.readers
52    }
53
54    /// Returns a mutable reference to the inner readers.
55    pub fn get_readers_mut(&mut self) -> &mut [Reader<R, V>] {
56        &mut self.readers
57    }
58
59    /// Returns the inner readers, consuming `self`.
60    pub fn into_readers(self) -> Vec<Reader<R, V>> {
61        self.readers
62    }
63
64    /// Creates a new intersecting reader from a collection of readers.
65    ///
66    /// # Panics
67    ///
68    /// Panics if `readers` is empty.
69    pub fn new(readers: Vec<Reader<R, V>>) -> Self {
70        match readers.as_slice() {
71            [] => panic!("cannot construct empty intersection"),
72            [fst, tl @ ..] => {
73                let mut contigs = SharedContigs::from(fst.index());
74                for reader in tl.iter() {
75                    contigs.add_index(reader.index());
76                }
77
78                let ids = vec![0; readers.len()];
79
80                Self {
81                    readers,
82                    shared_contigs: contigs,
83                    ids,
84                }
85            }
86        }
87    }
88
89    /// Reads a set of intersecting records, one from each contained reader.
90    ///
91    /// If successful, a record from each inner reader will be read into the corresponding buffer
92    /// such that all resulting records will be on the same contig and the same position.
93    ///
94    /// Note that the record buffer needs to be correctly set up. Use [`Self::create_record_bufs`]
95    /// for a correctly initialised record buffers to use for reading.
96    pub fn read_records(&mut self, bufs: &mut [Record<Id, V::Item>]) -> io::Result<ReadStatus> {
97        for ((reader, record), id) in self
98            .readers
99            .iter_mut()
100            .zip(bufs.iter_mut())
101            .zip(self.ids.iter_mut())
102        {
103            if reader.read_record(record)?.is_done() {
104                return Ok(ReadStatus::Done);
105            }
106
107            *id = *record.contig_id();
108        }
109
110        if self.read_until_shared_contig(bufs)?.is_done() {
111            return Ok(ReadStatus::Done);
112        }
113
114        match self.read_until_shared_position_on_contig(bufs)? {
115            Some(ReadStatus::Done) => Ok(ReadStatus::Done),
116            Some(ReadStatus::NotDone) => Ok(ReadStatus::NotDone),
117            None => self.read_records(bufs),
118        }
119    }
120
121    pub(super) fn from_reader(reader: Reader<R, V>) -> Self {
122        Self {
123            shared_contigs: SharedContigs::from(reader.index()),
124            readers: vec![reader],
125            ids: vec![0],
126        }
127    }
128
129    /// Read all readers until they are on a shared contig equal to or after the contigs defined
130    /// by the provided record buffers.
131    ///
132    /// If no more shared contigs exist, returns `Done`.
133    fn read_until_shared_contig(
134        &mut self,
135        bufs: &mut [Record<Id, V::Item>],
136    ) -> io::Result<ReadStatus> {
137        // For each record, get the first shared contig (by index into shared_contigs)
138        // equal to or after the contig ID of that record. Then get the greatest/most distant of
139        // those shared contigs. We can safely seek to this contig. If in the process, we find
140        // that a reader has no such shared contigs, we are done.
141        let mut next_idx = 0;
142        for (reader, buf) in self.readers.iter_mut().zip(bufs.iter_mut()) {
143            match self
144                .shared_contigs
145                .next_shared(reader.index(), *buf.contig_id())
146            {
147                Some(idx) => {
148                    if idx > next_idx {
149                        next_idx = idx;
150                    }
151                }
152                None => return Ok(ReadStatus::Done),
153            }
154        }
155
156        // Seek all readers to candidate shared contig, if they are not on it already.
157        let next_ids = &self.shared_contigs.0[next_idx];
158        for (((reader, buf), next_id), id) in self
159            .readers
160            .iter_mut()
161            .zip(bufs.iter_mut())
162            .zip(next_ids.iter())
163            .zip(self.ids.iter_mut())
164        {
165            if buf.contig_id() != next_id {
166                reader.seek(*next_id)?;
167                reader.read_record(buf)?;
168                *id = *next_id;
169            }
170        }
171
172        Ok(ReadStatus::NotDone)
173    }
174
175    /// Reads all readers until they are on a shared position on the current contig.
176    ///
177    /// The starting read positions will be defined by the provided buffers; the current contig is
178    /// defined by `self.ids`.
179    ///
180    /// If no more shared positions exist, returns `Some(Done)`. If more shared positions may exist,
181    /// but not on the current contig, returns `None`. If `Some(NotDone)` is returned, an
182    /// intersecting positions has been found.
183    fn read_until_shared_position_on_contig(
184        &mut self,
185        bufs: &mut [Record<Id, V::Item>],
186    ) -> io::Result<Option<ReadStatus>> {
187        let mut max_pos = bufs
188            .iter()
189            .map(Record::position)
190            .max()
191            .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "empty buffer slice"))?;
192
193        'outer: loop {
194            // We keep checking if all the records have reached the max position:
195            // if so, we have a shared record. If we find one greater than max, max is updated.
196            // As we go, we have to check that we have not reached a new contig ID.
197            'inner: for ((reader, record), id) in self
198                .readers
199                .iter_mut()
200                .zip(bufs.iter_mut())
201                .zip(self.ids.iter_mut())
202            {
203                let mut pos = record.position();
204
205                // A shared position must be at least as great as the max among all records
206                match pos.cmp(&max_pos) {
207                    Ordering::Less => {
208                        // If a position is less than the current max, we can forward the
209                        // corresponding reader all the way to its first position equal to or
210                        // greater than the current max
211                        while pos < max_pos {
212                            if reader.read_record(record)?.is_done() {
213                                return Ok(Some(ReadStatus::Done));
214                            }
215                            if record.contig_id() != id {
216                                *id = *record.contig_id();
217                                return Ok(None);
218                            }
219                            pos = record.position();
220                        }
221
222                        if pos == max_pos {
223                            continue 'inner;
224                        } else {
225                            // Forward overshot current max_pos, which means we have to start over
226                            // checking all records
227                            continue 'outer;
228                        }
229                    }
230                    Ordering::Equal => (),
231                    Ordering::Greater => {
232                        max_pos = pos;
233                        continue 'outer;
234                    }
235                }
236            }
237
238            // To have reached this point, all record positions are at the current max, which means
239            // an intersection
240            return Ok(Some(ReadStatus::NotDone));
241        }
242    }
243}
244
245/// Shared contigs for readers based on their indexes.
246///
247/// The representation used is an ordered map from contig names to a vector of contig IDs,
248/// corresponding to the contig IDs used in the represented reader. For instance, if "chr1"
249/// is the first shared contig among two represented reads, and the first entry in the map is
250/// ("chr1", vec![1, 2]), then "chr1" has ID 1 in the first reader, and ID 2 in the second reader.
251/// As elsewhere here, the ID is based on the position in the index.
252///
253/// Note that as for `Intersect` generally, we assume that contigs occur in the same order in each
254/// index. That is, the same contigs may not be represented in each index, and the same contig may
255/// have a different IDs, but where two or more contigs occur in multiple indices, their ordering
256/// must be constant.
257#[derive(Clone, Debug)]
258struct SharedContigs(IndexMap<String, Vec<usize>>);
259
260impl SharedContigs {
261    /// Adds a new index to the collection.
262    ///
263    /// The new index will be placed last.
264    pub fn add_index<V>(&mut self, index: &Index<V>)
265    where
266        V: Version,
267    {
268        // Create a temporary mapping from names to IDs in the new index
269        let map: IndexMap<&str, usize> = index
270            .records()
271            .iter()
272            .enumerate()
273            .map(|(i, record)| (record.name(), i))
274            .collect();
275
276        // For each name in the current selection, check if the name exists in the new index:
277        // if so, (1) add its ID in the new index to the collection of IDs for this shared contig;
278        // if not, (2) the contig is no longer shared, and should be removed
279        self.0.retain(|name, ids| {
280            if let Some(new_id) = map.get(name.as_str()) {
281                // (1)
282                ids.push(*new_id);
283                true
284            } else {
285                // (2)
286                false
287            }
288        })
289    }
290
291    /// Returns the index in `self` of the first contig in `index` with ID equal to or greater
292    /// than `id`.
293    ///
294    /// If no more shared contigs exist in `index` after `id`, return `None`.
295    pub fn next_shared<V>(&self, index: &Index<V>, id: usize) -> Option<usize>
296    where
297        V: Version,
298    {
299        let name = index.records()[id].name();
300
301        self.0.get_index_of(name).or_else(|| {
302            index.records()[(id + 1)..]
303                .iter()
304                .find_map(|record| self.0.get_index_of(record.name()))
305        })
306    }
307}
308
309impl<V> From<&Index<V>> for SharedContigs
310where
311    V: Version,
312{
313    fn from(index: &Index<V>) -> Self {
314        index
315            .records()
316            .iter()
317            .enumerate()
318            .map(|(i, record)| (record.name().to_owned(), vec![i]))
319            .collect()
320    }
321}
322
323impl FromIterator<(String, Vec<usize>)> for SharedContigs {
324    fn from_iter<I>(iter: I) -> Self
325    where
326        I: IntoIterator<Item = (String, Vec<usize>)>,
327    {
328        Self(iter.into_iter().map(|(s, a)| (s, a)).collect())
329    }
330}