bigtools/utils/file/
remote_file.rs1use std::fs::File;
2use std::io::{self, Cursor, Read, Seek, SeekFrom, Write};
3
4use tempfile;
5
6use crate::utils::file::reopen::Reopen;
7
/// Granularity, in bytes, at which data is requested from the server and stored in the
/// on-disk block cache.
const READ_SIZE: usize = 10 * 1024;

/// A seekable reader over a file served over HTTP(S).
///
/// Data is fetched in `READ_SIZE`-aligned blocks using `Range` requests, kept in memory as the
/// most recently loaded range, and also written to a temporary file. Later reads of the same
/// blocks can then be served without going back to the network.
#[derive(Debug)]
pub struct RemoteFile {
    /// URL requested for every fetch.
    url: String,
    /// Logical read position within the remote file, in bytes.
    current_position: u64,
    /// Most recently loaded data.
    ///
    /// The `u64` is the offset in the remote file of the cursor's first byte, which is always
    /// block-aligned. The cursor holds the loaded bytes, which may span several blocks.
    current: Option<(u64, Cursor<Vec<u8>>)>,
    /// Temporary file caching fetched blocks; `None` until the first block is loaded.
    cache: Option<File>,
}
23
24impl RemoteFile {
25 pub fn new(url: &str) -> RemoteFile {
26 RemoteFile {
27 url: url.to_string(),
28 current_position: 0,
29 current: None,
30 cache: None,
31 }
32 }
33}
34
35impl RemoteFile {
36 fn read_current_block(&mut self, read_size: u64) -> io::Result<u64> {
37 let block = self.current_position / READ_SIZE as u64;
38 let block_start = block * READ_SIZE as u64;
39 let cache_block_start = block * (READ_SIZE as u64 + 1);
40 let cache = match self.cache.as_mut() {
41 None => {
42 self.cache = Some(tempfile::tempfile()?);
43 self.cache.as_mut().unwrap()
44 }
45 Some(cache) => cache,
46 };
47 use byteorder::ReadBytesExt;
48 use byteorder::WriteBytesExt;
49 cache.seek(SeekFrom::Start(cache_block_start))?;
50 let status = cache.read_u8().unwrap_or(0);
51 if status == 1 {
52 let mut bytes = vec![0u8; READ_SIZE];
53 cache.read_exact(&mut bytes)?;
54 self.current = Some((block_start, Cursor::new(bytes.to_vec())));
55 return Ok(READ_SIZE as u64);
56 } else if status == 2 {
57 let bytes_available = cache.read_u64::<byteorder::BigEndian>()?;
58 let mut bytes = vec![0u8; bytes_available as usize];
59 cache.read_exact(&mut bytes)?;
60 self.current = Some((block_start, Cursor::new(bytes.to_vec())));
61 return Ok(bytes_available);
62 }
63
64 let read_len = {
65 let cur_pos = self.current_position;
66 let block = cur_pos / (READ_SIZE as u64);
67 let block_start = block * (READ_SIZE as u64);
68 let blocks_to_read = (cur_pos - block_start + read_size - 1) / (READ_SIZE as u64) + 1;
69 blocks_to_read * (READ_SIZE as u64)
70 };
71
72 let resp = attohttpc::get(&self.url)
73 .header(
74 "range",
75 format!(
76 "bytes={}-{}",
77 block_start,
78 block_start + read_len as u64 - 1
79 ),
80 )
81 .send()?;
82 let bytes = if resp.is_success() {
83 resp.bytes()?
84 } else {
85 return Err(io::Error::new(
86 io::ErrorKind::Other,
87 "Unable to connect to server to receive file.".to_string(),
88 ));
89 };
90 cache.seek(SeekFrom::Start(cache_block_start))?;
91 let blocks_to_write = if bytes.len() == read_len as usize {
92 bytes.len() / READ_SIZE
93 } else {
94 (bytes.len() + READ_SIZE - 1) / READ_SIZE
95 };
96 for start in 0..blocks_to_write {
97 let begin = start * READ_SIZE;
98 let end = ((start + 1) * READ_SIZE).min(bytes.len());
99 let block_data = &bytes[begin..end];
100 if block_data.len() == READ_SIZE {
101 cache.write_u8(1)?;
102 } else {
103 cache.write_u8(2)?;
104 cache.write_u64::<byteorder::BigEndian>(block_data.len() as u64)?;
105 }
106 cache.write_all(block_data)?;
107 }
108 let len = bytes.len() as u64;
109 self.current = Some((block_start, Cursor::new(bytes)));
110 Ok(len)
111 }
112}
113
114impl Read for RemoteFile {
115 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
116 let mut remaining_buf = buf;
117 let mut total_read = 0;
118 loop {
119 let reset_cursor = |this: &mut Self| -> io::Result<u64> {
128 let cursor_start = (this.current_position / READ_SIZE as u64) * READ_SIZE as u64;
129 let in_block = this.current_position - cursor_start;
130 let bytes_available = this.read_current_block(remaining_buf.len() as u64)?;
134 if in_block > 0 {
137 this.current
138 .as_mut()
139 .unwrap()
140 .1
141 .seek(SeekFrom::Start(in_block))?;
142 }
143 Ok(bytes_available - in_block.min(bytes_available))
144 };
145 let bytes_available = match self.current.as_ref() {
146 None => reset_cursor(self)?,
147 Some((_, cursor)) => {
148 let bytes_in_cursor = cursor.get_ref().len() as u64;
149 let cursor_position = cursor.position();
150 let bytes_available = bytes_in_cursor - cursor_position;
151 if bytes_available < remaining_buf.len() as u64 {
154 reset_cursor(self)?
155 } else {
156 bytes_available
157 }
158 }
159 };
160 let read = self.current.as_mut().unwrap().1.read(remaining_buf)?;
161 self.current_position += read as u64;
162 total_read += read;
163 if read == 0 || read == remaining_buf.len() || read == bytes_available as usize {
164 break;
165 }
166 let cursor_start = (self.current_position / READ_SIZE as u64) * READ_SIZE as u64;
167 let in_block = self.current_position - cursor_start;
168 let remaining_in_block = READ_SIZE - in_block as usize;
169 assert!(read >= remaining_in_block);
172 remaining_buf = &mut remaining_buf[read..];
173 }
174 Ok(total_read)
175 }
176}
177
178impl Seek for RemoteFile {
179 fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
180 self.current_position = match pos {
181 SeekFrom::Start(s) => s,
182 SeekFrom::End(_) => unimplemented!(),
183 SeekFrom::Current(s) => {
184 if s >= 0 {
185 self.current_position + (s as u64)
186 } else {
187 if self.current_position < s.checked_neg().unwrap() as u64 {
188 panic!("Seeked to <0");
189 }
190 self.current_position - s.checked_neg().unwrap() as u64
191 }
192 }
193 };
194 if let Some((cursor_start, cursor)) = self.current.as_mut() {
195 let cursor_end = *cursor_start + READ_SIZE as u64;
196 if *cursor_start <= self.current_position && self.current_position < cursor_end {
197 let new_position = self.current_position - *cursor_start;
198 cursor.set_position(new_position);
199 return Ok(self.current_position);
200 }
201 self.current = None;
202 }
203 Ok(self.current_position)
204 }
205}
206
207impl Clone for RemoteFile {
208 fn clone(&self) -> Self {
209 RemoteFile {
210 url: self.url.clone(),
211 current_position: 0,
212 current: None,
213 cache: None,
214 }
215 }
216}
217
218impl Reopen for RemoteFile {
219 fn reopen(&self) -> io::Result<RemoteFile> {
220 Ok(RemoteFile {
221 url: self.url.clone(),
222 current_position: 0,
223 current: None,
224 cache: None,
225 })
226 }
227}
228
#[cfg(test)]
mod tests {
    use super::*;
    use crate::bbi::{BigBedRead, BigWigRead};

    // These tests read real files over HTTP(S), so they are ignored by default.
    // Run them with `cargo test -- --ignored`.
    // Every iterator is collected into a `Result`, so a failed read fails the test instead
    // of being silently dropped.

    #[ignore = "requires network access"]
    #[test]
    fn test_remote() {
        let f = RemoteFile::new("https://encode-public.s3.amazonaws.com/2020/01/17/7d2573b1-86f4-4592-a68a-ac3d5d0372d6/ENCFF592UJG.bigBed");
        let mut remote = BigBedRead::open(f).unwrap();

        let remote_intervals: Vec<_> = remote
            .get_interval("chr10", 100000000, 100010000)
            .unwrap()
            .collect::<Result<_, _>>()
            .unwrap();
        assert_eq!(remote_intervals.len(), 5);
    }

    #[ignore = "requires network access"]
    #[test]
    fn test_remote2() {
        let f = RemoteFile::new("http://hgdownload.soe.ucsc.edu/goldenPath/hg19/encodeDCC/wgEncodeMapability/wgEncodeCrgMapabilityAlign100mer.bigWig");
        let mut remote = BigWigRead::open(f).unwrap();

        let interval = remote.get_zoom_interval("chr17", 0, 36996442, 2048);
        let _: Vec<_> = interval.unwrap().collect::<Result<_, _>>().unwrap();
    }

    #[ignore = "requires network access"]
    #[test]
    fn test_remote3() {
        let f = RemoteFile::new("http://hgdownload.soe.ucsc.edu/goldenPath/hg19/encodeDCC/wgEncodeMapability/wgEncodeCrgMapabilityAlign100mer.bigWig");
        let mut remote = BigWigRead::open(f).unwrap();

        let interval = remote.get_zoom_interval("chr2", 46087592, 174087320, 32768);
        let _: Vec<_> = interval.unwrap().collect::<Result<_, _>>().unwrap();
    }

    #[ignore = "requires network access"]
    #[test]
    fn test_remote4() {
        let f = RemoteFile::new("https://proteinpaint.stjude.org/ppdemo/hg19/bigwig/temp.bw");
        let remote = BigWigRead::open(f).unwrap();

        let _: Vec<_> = remote
            .get_interval_move("chr1", 169253475, 169257278)
            .unwrap()
            .collect::<Result<_, _>>()
            .unwrap();
    }
}