Skip to main content

fastx/
remote.rs

1//! Remote file reader with HTTP range request support and block-level caching.
2//!
3//! This module provides `RemoteReader` which implements `Read` and `Seek`
4//! for HTTP/HTTPS URLs using range requests, with intelligent caching to
5//! minimize network requests.
6
7#![cfg(feature = "url")]
8
9use std::collections::HashMap;
10use std::io::{self, Read, Seek, SeekFrom};
11use std::sync::Mutex;
12use ureq::Agent;
13
14/// Default block size for caching (64KB).
15const DEFAULT_BLOCK_SIZE: u64 = 64 * 1024;
16
/// One contiguous chunk of the remote file held in the in-memory cache.
///
/// Blocks are keyed in the cache by their starting offset; the copy of that
/// offset stored here is kept for debugging output only.
#[derive(Debug, Clone)]
struct CachedBlock
{
    /// Byte offset in the remote file at which `data` begins.
    // Not read by the reader itself (the cache key carries the same value),
    // hence the lint suppression; it still shows up in `Debug` output.
    #[allow(dead_code)]
    offset: u64,
    /// Raw bytes of the block as returned by the server.
    data: Vec<u8>,
}
27
28/// A remote file reader with HTTP range request support and caching.
29///
30/// This reader fetches data from HTTP/HTTPS URLs on demand, caching blocks
31/// to minimize network traffic. It implements `Read` and `Seek` for random
32/// access to remote files.
33///
34/// # Caching
35///
36/// The reader caches 64KB blocks. When data is requested, it fetches the
37/// entire block containing that position, serving subsequent reads from
38/// the same range from the cache.
39///
40/// # Example
41///
42/// ```no_run
43/// use fastx::remote::RemoteReader;
44///
45/// let reader = RemoteReader::new("https://example.com/data.fasta.gz").unwrap();
46/// ```
47pub struct RemoteReader
48{
49    /// The base URL
50    url: String,
51    /// The HTTP agent for making requests
52    agent: Agent,
53    /// Cache of fetched blocks (offset -> data)
54    cache: Mutex<HashMap<u64, CachedBlock>>,
55    /// Current position in the file
56    pos: u64,
57    /// Total file size (cached after first request)
58    file_size: Option<u64>,
59    /// Block size for caching
60    block_size: u64,
61}
62
63impl RemoteReader
64{
65    /// Create a new remote reader for the given URL.
66    ///
67    /// # Arguments
68    ///
69    /// * `url` - The HTTP/HTTPS URL to read from
70    ///
71    /// # Returns
72    ///
73    /// * `Ok(reader)` - The remote reader ready for use
74    /// * `Err(io::Error)` - If the URL is invalid or the request fails
75    ///
76    /// # Example
77    ///
78    /// ```no_run
79    /// use fastx::remote::RemoteReader;
80    ///
81    /// let reader = RemoteReader::new("https://example.com/data.fasta.gz").unwrap();
82    /// ```
83    pub fn new(url: impl Into<String>) -> io::Result<Self>
84    {
85        let url = url.into();
86        let agent = Agent::new_with_defaults();
87
88        // Probe for file size using a HEAD request
89        let file_size = Self::get_file_size_for_url(&agent, &url)?;
90
91        Ok(Self {
92            url,
93            agent,
94            cache: Mutex::new(HashMap::new()),
95            pos: 0,
96            file_size: Some(file_size),
97            block_size: DEFAULT_BLOCK_SIZE,
98        })
99    }
100
101    /// Get the total file size for a URL (static helper).
102    fn get_file_size_for_url(agent: &Agent, url: &str) -> io::Result<u64>
103    {
104        let response = agent.head(url).call().map_err(|e| {
105            io::Error::new(
106                io::ErrorKind::ConnectionRefused,
107                format!("HTTP HEAD request failed: {}", e),
108            )
109        })?;
110
111        let content_length = response
112            .headers()
113            .get("Content-Length")
114            .and_then(|v| v.to_str().ok())
115            .and_then(|s| s.parse::<u64>().ok())
116            .ok_or_else(|| {
117                io::Error::new(
118                    io::ErrorKind::InvalidData,
119                    "Missing or invalid Content-Length header",
120                )
121            })?;
122
123        Ok(content_length)
124    }
125
126    /// Set the block size for caching.
127    ///
128    /// Larger blocks reduce the number of HTTP requests but use more memory.
129    ///
130    /// # Arguments
131    ///
132    /// * `size` - Block size in bytes
133    pub fn with_block_size(mut self, size: u64) -> Self
134    {
135        self.block_size = size;
136        self
137    }
138
139    /// Get the total file size.
140    ///
141    /// Makes a HEAD request to determine Content-Length if not already cached.
142    fn get_file_size(&self) -> io::Result<u64>
143    {
144        if let Some(size) = self.file_size
145        {
146            return Ok(size);
147        }
148
149        let response = self.agent.head(&self.url).call().map_err(|e| {
150            io::Error::new(
151                io::ErrorKind::ConnectionRefused,
152                format!("HTTP HEAD request failed: {}", e),
153            )
154        })?;
155
156        let content_length = response
157            .headers()
158            .get("Content-Length")
159            .and_then(|v| v.to_str().ok())
160            .and_then(|s| s.parse::<u64>().ok())
161            .ok_or_else(|| {
162                io::Error::new(
163                    io::ErrorKind::InvalidData,
164                    "Missing or invalid Content-Length header",
165                )
166            })?;
167
168        Ok(content_length)
169    }
170
171    /// Get the starting offset of the block containing a given position.
172    fn block_start(&self, pos: u64) -> u64
173    {
174        (pos / self.block_size) * self.block_size
175    }
176
177    /// Fetch a block from the remote server.
178    ///
179    /// # Arguments
180    ///
181    /// * `offset` - Starting offset of the block
182    fn fetch_block(&self, offset: u64) -> io::Result<CachedBlock>
183    {
184        let file_size = self.get_file_size()?;
185        let end = std::cmp::min(offset + self.block_size - 1, file_size.saturating_sub(1));
186
187        let range = if offset >= file_size
188        {
189            return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "Seek beyond end of file"));
190        }
191        else if end < offset
192        {
193            // Empty file or offset at end
194            format!("bytes={0}-", offset)
195        }
196        else
197        {
198            format!("bytes={}-{}", offset, end)
199        };
200
201        let response = self
202            .agent
203            .get(&self.url)
204            .header("Range", &range)
205            .call()
206            .map_err(|e| {
207                io::Error::new(
208                    io::ErrorKind::ConnectionRefused,
209                    format!("HTTP GET request failed: {}", e),
210                )
211            })?;
212
213        // Check for partial content or OK status
214        let status = response.status();
215        if status != 206 && status != 200
216        {
217            return Err(io::Error::new(
218                io::ErrorKind::InvalidData,
219                format!("Unexpected HTTP status: {}", status),
220            ));
221        }
222
223        let data = response.into_body().read_to_vec().map_err(|e| {
224            io::Error::new(
225                io::ErrorKind::ConnectionRefused,
226                format!("Failed to read response body: {}", e),
227            )
228        })?;
229
230        Ok(CachedBlock { offset, data })
231    }
232
233    /// Get data at a specific offset, using cache if available.
234    ///
235    /// # Arguments
236    ///
237    /// * `offset` - Position in the file to read from
238    ///
239    /// # Returns
240    ///
241    /// A slice containing the cached block data
242    fn get_data_at(&self, offset: u64) -> io::Result<Vec<u8>>
243    {
244        let block_start = self.block_start(offset);
245
246        // Check if we need to fetch the block
247        if !self
248            .cache
249            .lock()
250            .map_err(|_| io::Error::new(io::ErrorKind::Other, "Cache lock poisoned"))?
251            .contains_key(&block_start)
252        {
253            // Fetch the block
254            let block = self.fetch_block(block_start)?;
255            let mut cache = self
256                .cache
257                .lock()
258                .map_err(|_| io::Error::new(io::ErrorKind::Other, "Cache lock poisoned"))?;
259            cache.insert(block_start, block);
260        }
261
262        // Get the data from cache
263        let cache = self
264            .cache
265            .lock()
266            .map_err(|_| io::Error::new(io::ErrorKind::Other, "Cache lock poisoned"))?;
267        let block = cache.get(&block_start).unwrap();
268        let offset_in_block = (offset - block_start) as usize;
269        Ok(block.data[offset_in_block..].to_vec())
270    }
271}
272
273impl Read for RemoteReader
274{
275    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize>
276    {
277        let file_size = self.get_file_size()?;
278        if self.pos >= file_size
279        {
280            return Ok(0);
281        }
282
283        let remaining = file_size - self.pos;
284        let to_read = std::cmp::min(buf.len() as u64, remaining) as usize;
285
286        // Fetch data for current position
287        let data = self.get_data_at(self.pos)?;
288
289        let actual_read = std::cmp::min(to_read, data.len());
290        buf[..actual_read].copy_from_slice(&data[..actual_read]);
291        self.pos += actual_read as u64;
292
293        Ok(actual_read)
294    }
295}
296
297impl Seek for RemoteReader
298{
299    fn seek(&mut self, pos: SeekFrom) -> io::Result<u64>
300    {
301        let file_size = self.get_file_size().ok();
302
303        self.pos = match pos
304        {
305            SeekFrom::Start(n) => n,
306            SeekFrom::End(offset) =>
307            {
308                let size = file_size
309                    .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "Unknown file size"))?;
310                let offset_i64 = offset as i64;
311                if offset_i64 < 0
312                {
313                    size.checked_sub(offset_i64.unsigned_abs()).ok_or_else(|| {
314                        io::Error::new(io::ErrorKind::InvalidInput, "Seek before file start")
315                    })?
316                }
317                else
318                {
319                    size.checked_add(offset as u64).ok_or_else(|| {
320                        io::Error::new(io::ErrorKind::InvalidInput, "Seek overflow")
321                    })?
322                }
323            }
324            SeekFrom::Current(offset) =>
325            {
326                let offset_i64 = offset as i64;
327                if offset_i64 < 0
328                {
329                    self.pos
330                        .checked_sub(offset_i64.unsigned_abs())
331                        .ok_or_else(|| {
332                            io::Error::new(io::ErrorKind::InvalidInput, "Seek before file start")
333                        })?
334                }
335                else
336                {
337                    self.pos.checked_add(offset as u64).ok_or_else(|| {
338                        io::Error::new(io::ErrorKind::InvalidInput, "Seek overflow")
339                    })?
340                }
341            }
342        };
343
344        Ok(self.pos)
345    }
346}
347
#[cfg(test)]
mod tests
{
    use super::*;

    /// Build a reader by hand, bypassing `RemoteReader::new`, so that no HTTP
    /// request is made and the file size stays unknown.
    fn offline_reader(block_size: u64) -> RemoteReader
    {
        RemoteReader {
            url: "http://example.com/test".to_string(),
            agent: Agent::new_with_defaults(),
            cache: Mutex::new(HashMap::new()),
            pos: 0,
            file_size: None,
            block_size,
        }
    }

    /// Block alignment with the default 64KB block size.
    #[test]
    fn test_block_start()
    {
        let reader = offline_reader(DEFAULT_BLOCK_SIZE);

        // (position, expected start of the containing block)
        let cases: [(u64, u64); 5] = [
            (0, 0),
            (100, 0),
            (65536, 65536),
            (70000, 65536),
            (131072, 131072),
        ];
        for &(pos, expected) in cases.iter()
        {
            assert_eq!(reader.block_start(pos), expected);
        }
    }

    /// Block alignment with a non-default 1KB block size.
    #[test]
    fn test_block_start_custom_size()
    {
        let reader = offline_reader(1024);

        // (position, expected start of the containing block)
        let cases: [(u64, u64); 4] = [(0, 0), (100, 0), (1024, 1024), (2000, 1024)];
        for &(pos, expected) in cases.iter()
        {
            assert_eq!(reader.block_start(pos), expected);
        }
    }
}