// bigtools/utils/file/remote_file.rs

1use std::fs::File;
2use std::io::{self, Cursor, Read, Seek, SeekFrom, Write};
3
4use tempfile;
5
6use crate::utils::file::reopen::Reopen;
7
/// Number of remote-file bytes in one block: the unit fetched over HTTP and
/// stored in the on-disk cache (10 KiB).
const READ_SIZE: usize = 1024 * 10;
9
/// A read-only, seekable view of a file served over HTTP.
///
/// Data is fetched with HTTP `Range` requests in blocks of `READ_SIZE` bytes
/// and cached in an anonymous temporary file, so repeated reads of a block can
/// be served from disk instead of the network.
///
/// Cache layout: block `i` starts at byte offset `i * (READ_SIZE + 1)` of the
/// cache file and begins with a one-byte status:
/// - `0` (or the cache file ends before this offset): not cached yet;
/// - `1`: cached and full; followed by exactly `READ_SIZE` bytes of data;
/// - `2`: cached but short, which should only happen for the final block of
///   the remote file; followed by the data length as a big-endian `u64` and
///   then that many bytes of data.
#[derive(Debug)]
pub struct RemoteFile {
    /// URL requested for every block fetch.
    url: String,
    /// Logical read position within the remote file.
    current_position: u64,
    /// Most recently loaded data, as `(remote offset of its first byte, data)`.
    current: Option<(u64, Cursor<Vec<u8>>)>,
    /// Temporary file holding cached blocks; created on the first block load.
    cache: Option<File>,
}
23
24impl RemoteFile {
25    pub fn new(url: &str) -> RemoteFile {
26        RemoteFile {
27            url: url.to_string(),
28            current_position: 0,
29            current: None,
30            cache: None,
31        }
32    }
33}
34
35impl RemoteFile {
36    fn read_current_block(&mut self, read_size: u64) -> io::Result<u64> {
37        let block = self.current_position / READ_SIZE as u64;
38        let block_start = block * READ_SIZE as u64;
39        let cache_block_start = block * (READ_SIZE as u64 + 1);
40        let cache = match self.cache.as_mut() {
41            None => {
42                self.cache = Some(tempfile::tempfile()?);
43                self.cache.as_mut().unwrap()
44            }
45            Some(cache) => cache,
46        };
47        use byteorder::ReadBytesExt;
48        use byteorder::WriteBytesExt;
49        cache.seek(SeekFrom::Start(cache_block_start))?;
50        let status = cache.read_u8().unwrap_or(0);
51        if status == 1 {
52            let mut bytes = vec![0u8; READ_SIZE];
53            cache.read_exact(&mut bytes)?;
54            self.current = Some((block_start, Cursor::new(bytes.to_vec())));
55            return Ok(READ_SIZE as u64);
56        } else if status == 2 {
57            let bytes_available = cache.read_u64::<byteorder::BigEndian>()?;
58            let mut bytes = vec![0u8; bytes_available as usize];
59            cache.read_exact(&mut bytes)?;
60            self.current = Some((block_start, Cursor::new(bytes.to_vec())));
61            return Ok(bytes_available);
62        }
63
64        let read_len = {
65            let cur_pos = self.current_position;
66            let block = cur_pos / (READ_SIZE as u64);
67            let block_start = block * (READ_SIZE as u64);
68            let blocks_to_read = (cur_pos - block_start + read_size - 1) / (READ_SIZE as u64) + 1;
69            blocks_to_read * (READ_SIZE as u64)
70        };
71
72        let resp = attohttpc::get(&self.url)
73            .header(
74                "range",
75                format!(
76                    "bytes={}-{}",
77                    block_start,
78                    block_start + read_len as u64 - 1
79                ),
80            )
81            .send()?;
82        let bytes = if resp.is_success() {
83            resp.bytes()?
84        } else {
85            return Err(io::Error::new(
86                io::ErrorKind::Other,
87                "Unable to connect to server to receive file.".to_string(),
88            ));
89        };
90        cache.seek(SeekFrom::Start(cache_block_start))?;
91        let blocks_to_write = if bytes.len() == read_len as usize {
92            bytes.len() / READ_SIZE
93        } else {
94            (bytes.len() + READ_SIZE - 1) / READ_SIZE
95        };
96        for start in 0..blocks_to_write {
97            let begin = start * READ_SIZE;
98            let end = ((start + 1) * READ_SIZE).min(bytes.len());
99            let block_data = &bytes[begin..end];
100            if block_data.len() == READ_SIZE {
101                cache.write_u8(1)?;
102            } else {
103                cache.write_u8(2)?;
104                cache.write_u64::<byteorder::BigEndian>(block_data.len() as u64)?;
105            }
106            cache.write_all(block_data)?;
107        }
108        let len = bytes.len() as u64;
109        self.current = Some((block_start, Cursor::new(bytes)));
110        Ok(len)
111    }
112}
113
114impl Read for RemoteFile {
115    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
116        let mut remaining_buf = buf;
117        let mut total_read = 0;
118        loop {
119            // At this point there's a few cases to consider:
120            // 1) We have not read the current block + maybe extra into memory
121            // 2) We have read the complete current block (+ maybe extra) into
122            //    memory.
123            // 3) Whatever is left in the current memory is leftover from a
124            //    a previous read, but it's enough.
125            // 4) Whatever is left in the current memory is leftover from a
126            //    a previous read, and it's not enough.
127            let reset_cursor = |this: &mut Self| -> io::Result<u64> {
128                let cursor_start = (this.current_position / READ_SIZE as u64) * READ_SIZE as u64;
129                let in_block = this.current_position - cursor_start;
130                // If we not at the start of the block, then the length that we need
131                // is longer than the length of the buf itself, since we have to
132                // acount for how far into the block we are.
133                let bytes_available = this.read_current_block(remaining_buf.len() as u64)?;
134                // If we are not at the beginning of a block, then skip to where
135                // we need to be.
136                if in_block > 0 {
137                    this.current
138                        .as_mut()
139                        .unwrap()
140                        .1
141                        .seek(SeekFrom::Start(in_block))?;
142                }
143                Ok(bytes_available - in_block.min(bytes_available))
144            };
145            let bytes_available = match self.current.as_ref() {
146                None => reset_cursor(self)?,
147                Some((_, cursor)) => {
148                    let bytes_in_cursor = cursor.get_ref().len() as u64;
149                    let cursor_position = cursor.position();
150                    let bytes_available = bytes_in_cursor - cursor_position;
151                    // We don't have enough bytes in the cursor. Let's reload this
152                    // block just to ensure that we have the data loaded.
153                    if bytes_available < remaining_buf.len() as u64 {
154                        reset_cursor(self)?
155                    } else {
156                        bytes_available
157                    }
158                }
159            };
160            let read = self.current.as_mut().unwrap().1.read(remaining_buf)?;
161            self.current_position += read as u64;
162            total_read += read;
163            if read == 0 || read == remaining_buf.len() || read == bytes_available as usize {
164                break;
165            }
166            let cursor_start = (self.current_position / READ_SIZE as u64) * READ_SIZE as u64;
167            let in_block = self.current_position - cursor_start;
168            let remaining_in_block = READ_SIZE - in_block as usize;
169            // If we didn't read everything, we *must* have at least read until
170            // the end of the block
171            assert!(read >= remaining_in_block);
172            remaining_buf = &mut remaining_buf[read..];
173        }
174        Ok(total_read)
175    }
176}
177
178impl Seek for RemoteFile {
179    fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
180        self.current_position = match pos {
181            SeekFrom::Start(s) => s,
182            SeekFrom::End(_) => unimplemented!(),
183            SeekFrom::Current(s) => {
184                if s >= 0 {
185                    self.current_position + (s as u64)
186                } else {
187                    if self.current_position < s.checked_neg().unwrap() as u64 {
188                        panic!("Seeked to <0");
189                    }
190                    self.current_position - s.checked_neg().unwrap() as u64
191                }
192            }
193        };
194        if let Some((cursor_start, cursor)) = self.current.as_mut() {
195            let cursor_end = *cursor_start + READ_SIZE as u64;
196            if *cursor_start <= self.current_position && self.current_position < cursor_end {
197                let new_position = self.current_position - *cursor_start;
198                cursor.set_position(new_position);
199                return Ok(self.current_position);
200            }
201            self.current = None;
202        }
203        Ok(self.current_position)
204    }
205}
206
207impl Clone for RemoteFile {
208    fn clone(&self) -> Self {
209        RemoteFile {
210            url: self.url.clone(),
211            current_position: 0,
212            current: None,
213            cache: None,
214        }
215    }
216}
217
218impl Reopen for RemoteFile {
219    fn reopen(&self) -> io::Result<RemoteFile> {
220        Ok(RemoteFile {
221            url: self.url.clone(),
222            current_position: 0,
223            current: None,
224            cache: None,
225        })
226    }
227}
228
#[cfg(test)]
mod tests {
    use super::*;
    use crate::bbi::{BigBedRead, BigWigRead};

    // These tests read real files over the network, so they are ignored by
    // default; run them with `cargo test -- --ignored`. Intervals are collected
    // into a `Result` so that any read error fails the test instead of being
    // silently discarded.

    #[ignore]
    #[test]
    fn test_remote() {
        let f = RemoteFile::new("https://encode-public.s3.amazonaws.com/2020/01/17/7d2573b1-86f4-4592-a68a-ac3d5d0372d6/ENCFF592UJG.bigBed");
        let mut remote = BigBedRead::open(f).unwrap();

        let remote_intervals: Vec<_> = remote
            .get_interval("chr10", 100000000, 100010000)
            .unwrap()
            .collect::<Result<_, _>>()
            .unwrap();
        assert_eq!(remote_intervals.len(), 5);
    }

    #[ignore]
    #[test]
    fn test_remote2() {
        let f = RemoteFile::new("http://hgdownload.soe.ucsc.edu/goldenPath/hg19/encodeDCC/wgEncodeMapability/wgEncodeCrgMapabilityAlign100mer.bigWig");
        let mut remote = BigWigRead::open(f).unwrap();

        let interval = remote.get_zoom_interval("chr17", 0, 36996442, 2048);
        let _: Vec<_> = interval.unwrap().collect::<Result<_, _>>().unwrap();
    }

    #[ignore]
    #[test]
    fn test_remote3() {
        let f = RemoteFile::new("http://hgdownload.soe.ucsc.edu/goldenPath/hg19/encodeDCC/wgEncodeMapability/wgEncodeCrgMapabilityAlign100mer.bigWig");
        let mut remote = BigWigRead::open(f).unwrap();

        let interval = remote.get_zoom_interval("chr2", 46087592, 174087320, 32768);
        let _: Vec<_> = interval.unwrap().collect::<Result<_, _>>().unwrap();
    }

    #[ignore]
    #[test]
    fn test_remote4() {
        let f = RemoteFile::new("https://proteinpaint.stjude.org/ppdemo/hg19/bigwig/temp.bw");
        let remote = BigWigRead::open(f).unwrap();

        let _: Vec<_> = remote
            .get_interval_move("chr1", 169253475, 169257278)
            .unwrap()
            .collect::<Result<_, _>>()
            .unwrap();
    }
}