winsfs_core/
io.rs

1//! Utilities for working with SAF files stored on disk.
2//!
3//! To read and write standard SAF files, see the [`angsd_saf`] crate. This module contains
4//! utilities based on that for doing SFS estimation from files kept on disk.
5
6use std::{fs::File, io, path::Path};
7
8pub use angsd_saf::{
9    record::Id,
10    version::{Version, V3, V4},
11    ReadStatus,
12};
13
14use crate::saf::Site;
15
16mod adaptors;
17pub use adaptors::{Enumerate, Take};
18
19pub mod shuffle;
20
21/// A type that can read SAF sites from a source.
22pub trait ReadSite {
23    /// The type of site that can be read.
24    type Site;
25
26    /// Reads a single site into the provided buffer.
27    ///
28    /// In the multi-dimensional case, values should be read from the first population into the
29    /// start of the buffer, then the next population, and so on. That is, providing
30    /// [`Site::as_mut_slice`](crate::saf::Site::as_mut_slice) as a buffer will be correct, given
31    /// a site of correct shape for the underlying reader.
32    fn read_site(&mut self, buf: &mut Self::Site) -> io::Result<ReadStatus>;
33
34    /// Reads a single site into the provided buffer without normalising out of log-space.
35    ///
36    /// See also documentation for [`Self::read_site`].
37    fn read_site_unnormalised(&mut self, buf: &mut Self::Site) -> io::Result<ReadStatus>;
38
39    /// Returns a reader adaptor which counts the number of sites read.
40    fn enumerate(self) -> Enumerate<Self>
41    where
42        Self: Sized,
43    {
44        Enumerate::new(self)
45    }
46
47    /// Returns a reader adaptor which limits the number of sites read.
48    fn take(self, max_sites: usize) -> Take<Enumerate<Self>>
49    where
50        Self: Sized,
51    {
52        Take::new(Enumerate::new(self), max_sites)
53    }
54}
55
56/// A reader type that can return to the beginning of the data.
57pub trait Rewind: ReadSite {
58    /// Returns `true` if reader has reached the end of the data, `false` otherwise.
59    #[allow(clippy::wrong_self_convention)]
60    fn is_done(&mut self) -> io::Result<bool>;
61
62    /// Positions reader at the beginning of the data.
63    ///
64    /// The stream should be positioned so as to be ready to call [`ReadSite::read_site`].
65    /// In particular, the stream should be positioned past any magic number, headers, etc.
66    fn rewind(&mut self) -> io::Result<()>;
67}
68
69impl<'a, T> Rewind for &'a mut T
70where
71    T: Rewind,
72{
73    fn is_done(&mut self) -> io::Result<bool> {
74        <T as Rewind>::is_done(*self)
75    }
76
77    fn rewind(&mut self) -> io::Result<()> {
78        <T as Rewind>::rewind(*self)
79    }
80}
81
82impl<'a, T> ReadSite for &'a mut T
83where
84    T: ReadSite,
85{
86    type Site = T::Site;
87
88    fn read_site(&mut self, buf: &mut Self::Site) -> io::Result<ReadStatus> {
89        <T as ReadSite>::read_site(*self, buf)
90    }
91
92    fn read_site_unnormalised(&mut self, buf: &mut Self::Site) -> io::Result<ReadStatus> {
93        <T as ReadSite>::read_site_unnormalised(*self, buf)
94    }
95}
96
97/// An intersecting SAF reader.
98///
99/// This a wrapper around the [`Intersect`](angsd_saf::Intersect) with a static number of readers
100/// and holds its read buffers internally.
101pub struct Intersect<const D: usize, R, V>
102where
103    V: Version,
104{
105    // D readers in inner intersect is maintained as invariant
106    inner: angsd_saf::Intersect<R, V>,
107    bufs: [angsd_saf::Record<Id, V::Item>; D],
108}
109
110impl<const D: usize, R, V> Intersect<D, R, V>
111where
112    R: io::BufRead + io::Seek,
113    V: Version,
114{
115    /// Returns the inner reader.
116    pub fn get(&self) -> &angsd_saf::Intersect<R, V> {
117        &self.inner
118    }
119
120    /// Returns a mutable reference to the the inner reader.
121    pub fn get_mut(&mut self) -> &mut angsd_saf::Intersect<R, V> {
122        &mut self.inner
123    }
124
125    /// Returns the inner reader, consuming `self`.
126    pub fn into_inner(self) -> angsd_saf::Intersect<R, V> {
127        self.inner
128    }
129
130    /// Creates a new reader.
131    pub fn new(readers: [angsd_saf::Reader<R, V>; D]) -> Self {
132        let inner = angsd_saf::Intersect::new(readers.into());
133        let bufs = inner
134            .create_record_bufs()
135            .try_into()
136            .map_err(|_| ())
137            .unwrap();
138
139        Self { inner, bufs }
140    }
141}
142
143impl<const D: usize, V> Intersect<D, io::BufReader<File>, V>
144where
145    V: Version,
146{
147    /// Creates a new reader from a collection of member file paths.
148    ///
149    /// The stream will be positioned immediately after the magic number.
150    pub fn from_paths<P>(paths: &[P; D]) -> io::Result<Self>
151    where
152        P: AsRef<Path>,
153    {
154        paths
155            .iter()
156            .map(|p| angsd_saf::reader::Builder::<V>::default().build_from_member_path(p))
157            .collect::<io::Result<Vec<_>>>()
158            .map(|vec| Self::new(vec.try_into().map_err(|_| ()).unwrap()))
159    }
160}
161
162impl<const D: usize, R> ReadSite for Intersect<D, R, V3>
163where
164    R: io::BufRead + io::Seek,
165{
166    type Site = Site<D>;
167
168    fn read_site(&mut self, buf: &mut Self::Site) -> io::Result<ReadStatus> {
169        let status = self.read_site_unnormalised(buf)?;
170
171        buf.iter_mut().for_each(|x| *x = x.exp());
172
173        Ok(status)
174    }
175
176    fn read_site_unnormalised(&mut self, buf: &mut Self::Site) -> io::Result<ReadStatus> {
177        let status = self.inner.read_records(&mut self.bufs)?;
178
179        let src = self.bufs.iter().map(|record| record.item());
180        copy_from_slices(src, buf.as_mut_slice());
181
182        Ok(status)
183    }
184}
185
186impl<const D: usize, R> ReadSite for Intersect<D, R, V4>
187where
188    R: io::BufRead + io::Seek,
189{
190    // TODO: This should be fixed after implementing EM logic for banded sites;
191    // for now, we use it as a stop-gap to get V4 working
192    type Site = Site<D>;
193
194    fn read_site(&mut self, buf: &mut Self::Site) -> io::Result<ReadStatus> {
195        let status = self.read_site_unnormalised(buf)?;
196
197        buf.iter_mut().for_each(|x| *x = x.exp());
198
199        Ok(status)
200    }
201
202    fn read_site_unnormalised(&mut self, buf: &mut Self::Site) -> io::Result<ReadStatus> {
203        let status = self.inner.read_records(&mut self.bufs)?;
204
205        let alleles_iter = self
206            .inner
207            .get_readers()
208            .iter()
209            .map(|reader| reader.index().alleles());
210        let src = self
211            .bufs
212            .iter_mut()
213            .zip(alleles_iter)
214            .map(|(record, alleles)| record.item().clone().into_full(alleles, f32::NEG_INFINITY));
215        copy_from_slices(src, buf.as_mut_slice());
216
217        Ok(status)
218    }
219}
220
221/// Copy multiple slices into successive subslices of a new slice.
222///
223/// `dest` is assumed to have length equal to the sum of the lengths of slice sin `src`.
224fn copy_from_slices<I, T>(src: I, dest: &mut [T])
225where
226    I: IntoIterator,
227    I::Item: AsRef<[T]>,
228    T: Copy,
229{
230    let mut offset = 0;
231    for s in src {
232        let n = s.as_ref().len();
233        dest[offset..][..n].copy_from_slice(s.as_ref());
234        offset += n;
235    }
236}
237
238#[cfg(test)]
239mod tests {
240    use super::*;
241
242    #[test]
243    fn test_copy_from_slices() {
244        let src = vec![&[0, 1][..], &[2, 3, 4, 5]];
245        let mut dest = vec![0; 6];
246        copy_from_slices(src, dest.as_mut_slice());
247        assert_eq!(dest, (0..6).collect::<Vec<_>>());
248    }
249}