s3reader/
lib.rs

1#![doc = include_str!("../README.md")]
2
3use aws_config::BehaviorVersion;
4use bytes::Buf;
5use std::io::{Read, Seek, SeekFrom};
6use thiserror::Error;
7use tokio::runtime::Runtime;
8
9/// Re-exported types from `aws_sdk_s3` and `aws_types`
10pub mod external_types;
11mod metadata;
12
13#[derive(Error, Debug)]
14pub enum S3ReaderError {
15    #[error("missing protocol in URI")]
16    MissingS3Protocol,
17    #[error("missing bucket or object in URI")]
18    MissingObjectUri,
19    #[error("object could not be fetched: {0}")]
20    ObjectNotFetched(String),
21    #[error("could not read from body of object")]
22    InvalidContent,
23    #[error("invalid read range {0}-{1}")]
24    InvalidRange(u64, u64),
25}
26
27impl From<external_types::SdkError<external_types::GetObjectError>> for S3ReaderError {
28    fn from(err: external_types::SdkError<external_types::GetObjectError>) -> S3ReaderError {
29        S3ReaderError::ObjectNotFetched(err.to_string())
30    }
31}
32
33impl From<S3ReaderError> for std::io::Error {
34    fn from(error: S3ReaderError) -> std::io::Error {
35        std::io::Error::new(std::io::ErrorKind::InvalidData, error)
36    }
37}
38
39/// The URI of an S3 object
40#[derive(Clone, Debug)]
41pub struct S3ObjectUri {
42    bucket: String,
43    key: String,
44}
45
46impl S3ObjectUri {
47    /// Returns an `S3ObjectUri` for the provided S3 URI
48    ///
49    /// # Example
50    ///
51    /// ```
52    /// use s3reader::S3ObjectUri;
53    /// let uri = S3ObjectUri::new("s3://mybucket/path/to/file.xls").unwrap();
54    ///
55    /// assert_eq!(uri.bucket() , "mybucket");
56    /// assert_eq!(uri.key() , "path/to/file.xls");
57    /// ```
58    pub fn new(uri: &str) -> Result<S3ObjectUri, S3ReaderError> {
59        if let Some(uri) = uri.strip_prefix("s3://") {
60            if let Some(idx) = uri.find('/') {
61                Ok(S3ObjectUri {
62                    bucket: uri[0..idx].to_string(),
63                    key: uri[idx + 1..].to_string(),
64                })
65            } else {
66                Err(S3ReaderError::MissingObjectUri)
67            }
68        } else {
69            Err(S3ReaderError::MissingS3Protocol)
70        }
71    }
72
73    /// Returns the bucket name
74    /// # Example
75    ///
76    /// ```
77    /// use s3reader::S3ObjectUri;
78    /// let uri = S3ObjectUri::new("s3://mybucket/path/to/file.xls").unwrap();
79    ///
80    /// assert_eq!(uri.bucket() , "mybucket");
81    /// ```
82    pub fn bucket(&self) -> &str {
83        &self.bucket
84    }
85
86    /// Returns the object's key
87    /// # Example
88    ///
89    /// ```
90    /// use s3reader::S3ObjectUri;
91    /// let uri = S3ObjectUri::new("s3://mybucket/path/to/file.xls").unwrap();
92    ///
93    /// assert_eq!(uri.key() , "path/to/file.xls");
94    /// ```
95    pub fn key(&self) -> &str {
96        &self.key
97    }
98}
99
100/// A Reader for S3 objects that implements the `Read` and `Seek` traits
101///
102/// This reader allows byte-offset acces to any S3 objects
103///
104/// # Example
105/// ```no_run
106/// use std::io::{Read, Seek};
107/// use s3reader::S3Reader;
108/// use s3reader::S3ObjectUri;
109///
110/// let uri = S3ObjectUri::new("s3://my-bucket/path/to/huge/file").unwrap();
111/// let mut reader = S3Reader::open(uri).unwrap();
112///
113/// reader.seek(std::io::SeekFrom::Start(100)).unwrap();
114///
115/// let mut buf: Vec<u8> = [0; 1024].to_vec();
116/// reader.read(&mut buf).expect("Error reading from S3");
117/// ```
118pub struct S3Reader {
119    client: aws_sdk_s3::Client,
120    uri: S3ObjectUri,
121    pos: u64,
122    header: Option<external_types::HeadObjectOutput>,
123}
124
125impl S3Reader {
126    /// Creates a new `S3Reader` and checks for presence of the S3 object
127    ///
128    /// This is the easiest method to open an S3Reader. Upon creation, it will check if the
129    /// S3 object is actually present and available and will fetch the header. This prevents
130    /// possible runtime errors later on.
131    pub fn from_uri(uri: &str) -> Result<S3Reader, S3ReaderError> {
132        let uri = S3ObjectUri::new(uri)?;
133        S3Reader::open(uri)
134    }
135
136    /// Creates a new `S3Reader`.
137    ///
138    /// This method does not check for presence of an actual object in S3 or for connectivity.
139    /// Use [`S3Reader::open`] instead to ensure that the S3 object actually exists.
140    pub fn new(uri: S3ObjectUri) -> S3Reader {
141        let config = Runtime::new()
142            .unwrap()
143            .block_on(aws_config::load_defaults(BehaviorVersion::latest()));
144        S3Reader::from_config(&config, uri)
145    }
146
147    /// Creates a new `S3Reader` and checks for presence of the S3 object
148    ///
149    /// This method is the preferred way to create a Reader. It has a minor overhead
150    /// because it fetches the object's header from S3, but this ensures that the
151    /// object is actually available and thus prevents possible runtime errors.
152    pub fn open(uri: S3ObjectUri) -> Result<S3Reader, S3ReaderError> {
153        let mut reader = S3Reader::new(uri);
154        match Runtime::new().unwrap().block_on(reader.fetch_header()) {
155            Err(err) => Err(S3ReaderError::ObjectNotFetched(err.to_string())),
156            Ok(_) => Ok(reader),
157        }
158    }
159
160    /// Creates a new `S3Reader` with a custom AWS S3 `aws_sdk_s3::Config`
161    ///
162    /// This method is useful if you don't want to use the default configbuilder using the environment.
163    /// It does not check for correctness, connectivity to the S3 bucket or presence of the S3 object.
164    pub fn from_s3_config(config: aws_sdk_s3::Config, uri: S3ObjectUri) -> S3Reader {
165        let client = aws_sdk_s3::Client::from_conf(config);
166        S3Reader {
167            client,
168            uri,
169            pos: 0,
170            header: None,
171        }
172    }
173
174    /// Creates a new `S3Reader` with a custom AWS `SdkConfig`
175    ///
176    /// This method is useful if you don't want to use the default configbuilder using the environment.
177    /// It does not check for correctness, connectivity to the S3 bucket or presence of the S3 object.
178    pub fn from_config(config: &external_types::SdkConfig, uri: S3ObjectUri) -> S3Reader {
179        let client = aws_sdk_s3::Client::new(config);
180        S3Reader {
181            client,
182            uri,
183            pos: 0,
184            header: None,
185        }
186    }
187
188    /// Returns A Future for the bytes read from the S3 object for the specified byte-range
189    ///
190    /// This method does not update the internal cursor position. To maintain
191    /// an internal state, use [`S3Reader::seek`] and [`S3Reader::read`] instead.
192    ///
193    /// The byte ranges `from` and `to` are both inclusive, see <https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35>
194    ///
195    /// # Example
196    /// ```no_run
197    /// use tokio::runtime::Runtime;
198    ///
199    /// use s3reader::S3Reader;
200    /// use s3reader::S3ObjectUri;
201    ///
202    /// let uri = S3ObjectUri::new("s3://my-bucket/path/to/huge/file").unwrap();
203    /// let mut reader = S3Reader::open(uri).unwrap();
204    ///
205    /// // `read_range` is an async function, we must wrap it in a runtime in the doctest
206    /// let bytes = Runtime::new().unwrap().block_on(
207    ///     reader.read_range(100, 249)
208    /// ).unwrap().into_bytes();
209    /// assert_eq!(bytes.len(), 150);
210    /// ```
211    pub async fn read_range(
212        &mut self,
213        from: u64,
214        to: u64,
215    ) -> Result<external_types::AggregatedBytes, S3ReaderError> {
216        if to < from || from > self.len() {
217            return Err(S3ReaderError::InvalidRange(from, to));
218        }
219        let object_output = self
220            .client
221            .get_object()
222            .bucket(self.uri.bucket())
223            .key(self.uri.key())
224            .range(format!("bytes={}-{}", from, to))
225            .send()
226            .await?;
227
228        match object_output.body.collect().await {
229            Ok(x) => Ok(x),
230            Err(_) => Err(S3ReaderError::InvalidContent),
231        }
232    }
233
234    /// Returns the bytes read from the S3 object for the specified byte-range
235    ///
236    /// This method does not update the internal cursor position. To maintain
237    /// an internal state, use [`S3Reader::seek`] and [`S3Reader::read`] instead.
238    ///
239    /// The byte ranges `from` and `to` are both inclusive, see <https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35>
240    ///
241    /// This method also exists as an `async` method: [`S3Reader::read_range`]
242    ///
243    /// # Example
244    /// ```no_run
245    /// use s3reader::S3Reader;
246    /// use s3reader::S3ObjectUri;
247    ///
248    /// let uri = S3ObjectUri::new("s3://my-bucket/path/to/huge/file").unwrap();
249    /// let mut reader = S3Reader::open(uri).unwrap();
250    ///
251    /// let bytes = reader.read_range_sync(100, 249).unwrap().into_bytes();
252    /// assert_eq!(bytes.len(), 150);
253    /// ```
254    pub fn read_range_sync(
255        &mut self,
256        from: u64,
257        to: u64,
258    ) -> Result<external_types::AggregatedBytes, S3ReaderError> {
259        Runtime::new().unwrap().block_on(self.read_range(from, to))
260    }
261
262    /// Fetches the object's header from S3
263    ///
264    /// # Example
265    /// ```no_run
266    /// use tokio::runtime::Runtime;
267    ///
268    /// use s3reader::S3Reader;
269    /// use s3reader::S3ObjectUri;
270    ///
271    /// let uri = S3ObjectUri::new("s3://my-bucket/path/to/huge/file").unwrap();
272    /// let mut reader = S3Reader::open(uri).unwrap();
273    ///
274    /// // `fetch_header` is an async function, we must wrap it in a runtime in the doctest
275    /// Runtime::new().unwrap().block_on(
276    ///     reader.fetch_header()
277    /// ).unwrap();
278    /// assert_eq!(reader.len(), 150);
279    /// ```
280    pub async fn fetch_header(
281        &mut self,
282    ) -> Result<(), external_types::SdkError<external_types::HeadObjectError>> {
283        let header = self
284            .client
285            .head_object()
286            .bucket(self.uri.bucket())
287            .key(self.uri.key())
288            .send()
289            .await?;
290        self.header = Some(header);
291        Ok(())
292    }
293
294    /// Returns the `content_length` of the S3 object
295    ///
296    /// # Panics
297    /// This method can panic if the header cannot be fetched (e.g. due to network issues, wrong URI etc).
298    /// This can be prevented by using [`S3Reader::open`] which guarantees that the header is present.
299    #[allow(clippy::len_without_is_empty)]
300    pub fn len(&mut self) -> u64 {
301        if let Some(header) = &self.header {
302            u64::try_from(header.content_length().unwrap()).unwrap()
303        } else {
304            Runtime::new()
305                .unwrap()
306                .block_on(self.fetch_header())
307                .expect("unable to determine the object size");
308            self.len()
309        }
310    }
311
312    pub fn pos(&self) -> u64 {
313        self.pos
314    }
315}
316
317impl Read for S3Reader {
318    fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
319        if self.pos >= self.len() {
320            return Ok(0);
321        }
322        let end_pos = self.pos + buf.len() as u64;
323
324        // The `read_range` method uses inclusive byte ranges, we exclude the last byte
325        let s3_data = self.read_range_sync(self.pos, end_pos - 1)?;
326
327        // Ensure that the position cursor is only increased by the number of actually read bytes
328        self.pos += u64::try_from(s3_data.remaining()).unwrap();
329
330        // Use the Reader provided by `AggregatedBytes` instead of converting manually
331        let mut reader = s3_data.reader();
332        reader.read(buf)
333    }
334
335    /// Custom implementation to avoid too many `read` calls. The default trait
336    /// reads in 32 bytes blocks that grow over time. However, the IO for S3 has way
337    /// more latency so `S3Reader` tries to fetch all data in a single call
338    fn read_to_end(&mut self, buf: &mut Vec<u8>) -> Result<usize, std::io::Error> {
339        let reader_len = self.len();
340
341        // The `read_range` method uses inclusive byte ranges, we exclude the last byte
342        let s3_data = self.read_range_sync(self.pos, reader_len - 1)?;
343
344        // Ensure that the position cursor is only increased by the number of actually read bytes
345        let data_len = s3_data.remaining();
346        self.pos += u64::try_from(data_len).unwrap();
347
348        // We can't rely on the `AggregatedBytes` reader and must iterate the internal bytes buffer
349        // to push individual bytes into the buffer
350        buf.reserve(data_len);
351        for b in s3_data.into_bytes() {
352            buf.push(b);
353        }
354        Ok(data_len)
355    }
356
357    /// Custom implementation to avoid too many `read` calls. The default trait
358    /// reads in 32 bytes blocks that grow over time. However, the IO for S3 has way
359    /// more latency so `S3Reader` tries to fetch all data in a single call
360    fn read_to_string(&mut self, buf: &mut String) -> Result<usize, std::io::Error> {
361        // Allocate a new vector to utilize `read_to_end`. We don't have to specify the size here
362        // since `read_to_end` will extend the vector to the required capacity
363        let mut bytes = Vec::new();
364        match self.read_to_end(&mut bytes) {
365            Ok(n) => {
366                buf.reserve(n);
367                for byte in bytes {
368                    buf.push(byte.into());
369                }
370                Ok(n)
371            }
372            Err(err) => Err(err),
373        }
374    }
375}
376
377impl Seek for S3Reader {
378    fn seek(&mut self, pos: SeekFrom) -> Result<u64, std::io::Error> {
379        match s3reader_seek(self.len(), self.pos, pos) {
380            Ok(x) => {
381                self.pos = x;
382                Ok(x)
383            }
384            Err(err) => Err(err),
385        }
386    }
387}
388
/// Calculates the new cursor for a `Seek` operation
///
/// This function is declared outside of `S3Reader` so that it can be
/// unit-tested without any network access.
///
/// * `len` - length of the object in bytes
/// * `cursor` - current cursor position
/// * `pos` - the requested seek
///
/// Positions are clamped to `len`: seeking past the end of the object lands exactly at
/// the end, so `SeekFrom::Start` and non-negative offsets never fail.
///
/// # Errors
/// Returns an `ErrorKind::InvalidInput` error if the resulting position would be negative.
fn s3reader_seek(len: u64, cursor: u64, pos: SeekFrom) -> Result<u64, std::io::Error> {
    // Every variant is "base position + signed delta"; `Start` is an absolute base.
    let (base, delta) = match pos {
        SeekFrom::Start(offset) => (offset, 0),
        SeekFrom::Current(delta) => (cursor, delta),
        SeekFrom::End(delta) => (len, delta),
    };

    if delta >= 0 {
        // We can't seek beyond the end of the file. `saturating_add` keeps the
        // intermediate sum from overflowing for very large bases/offsets.
        Ok(base.saturating_add(delta.unsigned_abs()).min(len))
    } else {
        // `unsigned_abs` is exact even for `i64::MIN`
        base.checked_sub(delta.unsigned_abs()).ok_or_else(|| {
            std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                "position cannot be negative",
            )
        })
    }
}
432
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::SeekFrom;

    #[test]
    fn test_absolute_position() {
        // (requested offset, expected cursor) for an object of length 100 with cursor 1
        let cases: [(u64, u64); 4] = [(30, 30), (0, 0), (100, 100), (120, 100)];
        for &(offset, expected) in &cases {
            let actual = s3reader_seek(100, 1, SeekFrom::Start(offset)).unwrap();
            assert_eq!(actual, expected, "SeekFrom::Start({})", offset);
        }
    }

    #[test]
    fn test_relative_position() {
        // (object length, cursor, delta, expected cursor)
        let cases: [(u64, u64, i64, u64); 7] = [
            (100, 1, 30, 31),
            (100, 1, 99, 100),
            (100, 1, 0, 1),
            (100, 1, -1, 0),
            (100, 0, 0, 0),
            (100, 0, 1, 1),
            (100, 1, 100, 100),
        ];
        for &(len, cursor, delta, expected) in &cases {
            let actual = s3reader_seek(len, cursor, SeekFrom::Current(delta)).unwrap();
            assert_eq!(actual, expected, "SeekFrom::Current({}) from {}", delta, cursor);
        }

        // Two bytes back from offset 1 would be before the start of the object
        assert!(s3reader_seek(100, 1, SeekFrom::Current(-2)).is_err());
    }

    #[test]
    fn test_seek_from_end() {
        // (delta, expected cursor) for an object of length 100 with cursor 1
        let cases: [(i64, u64); 4] = [(1, 100), (0, 100), (-100, 0), (-50, 50)];
        for &(delta, expected) in &cases {
            let actual = s3reader_seek(100, 1, SeekFrom::End(delta)).unwrap();
            assert_eq!(actual, expected, "SeekFrom::End({})", delta);
        }

        assert!(s3reader_seek(100, 1, SeekFrom::End(-101)).is_err());
    }

    #[test]
    fn test_uri_parser() {
        let uri = S3ObjectUri::new("s3://mybucket/path/to/file.xls").unwrap();
        assert_eq!(uri.bucket(), "mybucket");
        assert_eq!(uri.key(), "path/to/file.xls");
    }

    #[test]
    fn test_uri_without_protocol() {
        assert!(S3ObjectUri::new("mybucket/path/to/file.xls").is_err());
    }

    #[test]
    fn test_uri_with_wrong_protocol() {
        let invalid = [
            "s5://mybucket/path/to/file.xls",
            "s3//mybucket/path/to/file.xls",
            "s3:/mybucket/path/to/file.xls",
        ];
        for uri in invalid.iter() {
            assert!(S3ObjectUri::new(uri).is_err(), "{}", uri);
        }
    }

    #[test]
    fn test_uri_with_missing_bucket() {
        for uri in ["s3://", "s3://foobar"].iter() {
            assert!(S3ObjectUri::new(uri).is_err(), "{}", uri);
        }
    }

    #[test]
    fn test_valid_uris() {
        let valid = [
            "s3://foobar/somethinglong",
            "s3://f/5",
            "s3://foobar/s/o/m/e/t/h/i/n/g/l/o/n/g",
        ];
        for uri in valid.iter() {
            assert!(S3ObjectUri::new(uri).is_ok(), "{}", uri);
        }
    }
}