s3reader/lib.rs
1#![doc = include_str!("../README.md")]
2
3use aws_config::BehaviorVersion;
4use bytes::Buf;
5use std::io::{Read, Seek, SeekFrom};
6use thiserror::Error;
7use tokio::runtime::Runtime;
8
9/// Re-exported types from `aws_sdk_s3` and `aws_types`
10pub mod external_types;
11mod metadata;
12
13#[derive(Error, Debug)]
14pub enum S3ReaderError {
15 #[error("missing protocol in URI")]
16 MissingS3Protocol,
17 #[error("missing bucket or object in URI")]
18 MissingObjectUri,
19 #[error("object could not be fetched: {0}")]
20 ObjectNotFetched(String),
21 #[error("could not read from body of object")]
22 InvalidContent,
23 #[error("invalid read range {0}-{1}")]
24 InvalidRange(u64, u64),
25}
26
27impl From<external_types::SdkError<external_types::GetObjectError>> for S3ReaderError {
28 fn from(err: external_types::SdkError<external_types::GetObjectError>) -> S3ReaderError {
29 S3ReaderError::ObjectNotFetched(err.to_string())
30 }
31}
32
33impl From<S3ReaderError> for std::io::Error {
34 fn from(error: S3ReaderError) -> std::io::Error {
35 std::io::Error::new(std::io::ErrorKind::InvalidData, error)
36 }
37}
38
/// The URI of an S3 object
///
/// The URI is split into the name of the bucket and the key of the object
/// within that bucket. Two URIs are equal if both parts are equal, so the
/// type can be compared and used as a key in hash-based collections.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct S3ObjectUri {
    /// Name of the S3 bucket
    bucket: String,
    /// Key of the object within the bucket
    key: String,
}
45
46impl S3ObjectUri {
47 /// Returns an `S3ObjectUri` for the provided S3 URI
48 ///
49 /// # Example
50 ///
51 /// ```
52 /// use s3reader::S3ObjectUri;
53 /// let uri = S3ObjectUri::new("s3://mybucket/path/to/file.xls").unwrap();
54 ///
55 /// assert_eq!(uri.bucket() , "mybucket");
56 /// assert_eq!(uri.key() , "path/to/file.xls");
57 /// ```
58 pub fn new(uri: &str) -> Result<S3ObjectUri, S3ReaderError> {
59 if let Some(uri) = uri.strip_prefix("s3://") {
60 if let Some(idx) = uri.find('/') {
61 Ok(S3ObjectUri {
62 bucket: uri[0..idx].to_string(),
63 key: uri[idx + 1..].to_string(),
64 })
65 } else {
66 Err(S3ReaderError::MissingObjectUri)
67 }
68 } else {
69 Err(S3ReaderError::MissingS3Protocol)
70 }
71 }
72
73 /// Returns the bucket name
74 /// # Example
75 ///
76 /// ```
77 /// use s3reader::S3ObjectUri;
78 /// let uri = S3ObjectUri::new("s3://mybucket/path/to/file.xls").unwrap();
79 ///
80 /// assert_eq!(uri.bucket() , "mybucket");
81 /// ```
82 pub fn bucket(&self) -> &str {
83 &self.bucket
84 }
85
86 /// Returns the object's key
87 /// # Example
88 ///
89 /// ```
90 /// use s3reader::S3ObjectUri;
91 /// let uri = S3ObjectUri::new("s3://mybucket/path/to/file.xls").unwrap();
92 ///
93 /// assert_eq!(uri.key() , "path/to/file.xls");
94 /// ```
95 pub fn key(&self) -> &str {
96 &self.key
97 }
98}
99
100/// A Reader for S3 objects that implements the `Read` and `Seek` traits
101///
102/// This reader allows byte-offset acces to any S3 objects
103///
104/// # Example
105/// ```no_run
106/// use std::io::{Read, Seek};
107/// use s3reader::S3Reader;
108/// use s3reader::S3ObjectUri;
109///
110/// let uri = S3ObjectUri::new("s3://my-bucket/path/to/huge/file").unwrap();
111/// let mut reader = S3Reader::open(uri).unwrap();
112///
113/// reader.seek(std::io::SeekFrom::Start(100)).unwrap();
114///
115/// let mut buf: Vec<u8> = [0; 1024].to_vec();
116/// reader.read(&mut buf).expect("Error reading from S3");
117/// ```
118pub struct S3Reader {
119 client: aws_sdk_s3::Client,
120 uri: S3ObjectUri,
121 pos: u64,
122 header: Option<external_types::HeadObjectOutput>,
123}
124
125impl S3Reader {
126 /// Creates a new `S3Reader` and checks for presence of the S3 object
127 ///
128 /// This is the easiest method to open an S3Reader. Upon creation, it will check if the
129 /// S3 object is actually present and available and will fetch the header. This prevents
130 /// possible runtime errors later on.
131 pub fn from_uri(uri: &str) -> Result<S3Reader, S3ReaderError> {
132 let uri = S3ObjectUri::new(uri)?;
133 S3Reader::open(uri)
134 }
135
136 /// Creates a new `S3Reader`.
137 ///
138 /// This method does not check for presence of an actual object in S3 or for connectivity.
139 /// Use [`S3Reader::open`] instead to ensure that the S3 object actually exists.
140 pub fn new(uri: S3ObjectUri) -> S3Reader {
141 let config = Runtime::new()
142 .unwrap()
143 .block_on(aws_config::load_defaults(BehaviorVersion::latest()));
144 S3Reader::from_config(&config, uri)
145 }
146
147 /// Creates a new `S3Reader` and checks for presence of the S3 object
148 ///
149 /// This method is the preferred way to create a Reader. It has a minor overhead
150 /// because it fetches the object's header from S3, but this ensures that the
151 /// object is actually available and thus prevents possible runtime errors.
152 pub fn open(uri: S3ObjectUri) -> Result<S3Reader, S3ReaderError> {
153 let mut reader = S3Reader::new(uri);
154 match Runtime::new().unwrap().block_on(reader.fetch_header()) {
155 Err(err) => Err(S3ReaderError::ObjectNotFetched(err.to_string())),
156 Ok(_) => Ok(reader),
157 }
158 }
159
160 /// Creates a new `S3Reader` with a custom AWS S3 `aws_sdk_s3::Config`
161 ///
162 /// This method is useful if you don't want to use the default configbuilder using the environment.
163 /// It does not check for correctness, connectivity to the S3 bucket or presence of the S3 object.
164 pub fn from_s3_config(config: aws_sdk_s3::Config, uri: S3ObjectUri) -> S3Reader {
165 let client = aws_sdk_s3::Client::from_conf(config);
166 S3Reader {
167 client,
168 uri,
169 pos: 0,
170 header: None,
171 }
172 }
173
174 /// Creates a new `S3Reader` with a custom AWS `SdkConfig`
175 ///
176 /// This method is useful if you don't want to use the default configbuilder using the environment.
177 /// It does not check for correctness, connectivity to the S3 bucket or presence of the S3 object.
178 pub fn from_config(config: &external_types::SdkConfig, uri: S3ObjectUri) -> S3Reader {
179 let client = aws_sdk_s3::Client::new(config);
180 S3Reader {
181 client,
182 uri,
183 pos: 0,
184 header: None,
185 }
186 }
187
188 /// Returns A Future for the bytes read from the S3 object for the specified byte-range
189 ///
190 /// This method does not update the internal cursor position. To maintain
191 /// an internal state, use [`S3Reader::seek`] and [`S3Reader::read`] instead.
192 ///
193 /// The byte ranges `from` and `to` are both inclusive, see <https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35>
194 ///
195 /// # Example
196 /// ```no_run
197 /// use tokio::runtime::Runtime;
198 ///
199 /// use s3reader::S3Reader;
200 /// use s3reader::S3ObjectUri;
201 ///
202 /// let uri = S3ObjectUri::new("s3://my-bucket/path/to/huge/file").unwrap();
203 /// let mut reader = S3Reader::open(uri).unwrap();
204 ///
205 /// // `read_range` is an async function, we must wrap it in a runtime in the doctest
206 /// let bytes = Runtime::new().unwrap().block_on(
207 /// reader.read_range(100, 249)
208 /// ).unwrap().into_bytes();
209 /// assert_eq!(bytes.len(), 150);
210 /// ```
211 pub async fn read_range(
212 &mut self,
213 from: u64,
214 to: u64,
215 ) -> Result<external_types::AggregatedBytes, S3ReaderError> {
216 if to < from || from > self.len() {
217 return Err(S3ReaderError::InvalidRange(from, to));
218 }
219 let object_output = self
220 .client
221 .get_object()
222 .bucket(self.uri.bucket())
223 .key(self.uri.key())
224 .range(format!("bytes={}-{}", from, to))
225 .send()
226 .await?;
227
228 match object_output.body.collect().await {
229 Ok(x) => Ok(x),
230 Err(_) => Err(S3ReaderError::InvalidContent),
231 }
232 }
233
234 /// Returns the bytes read from the S3 object for the specified byte-range
235 ///
236 /// This method does not update the internal cursor position. To maintain
237 /// an internal state, use [`S3Reader::seek`] and [`S3Reader::read`] instead.
238 ///
239 /// The byte ranges `from` and `to` are both inclusive, see <https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35>
240 ///
241 /// This method also exists as an `async` method: [`S3Reader::read_range`]
242 ///
243 /// # Example
244 /// ```no_run
245 /// use s3reader::S3Reader;
246 /// use s3reader::S3ObjectUri;
247 ///
248 /// let uri = S3ObjectUri::new("s3://my-bucket/path/to/huge/file").unwrap();
249 /// let mut reader = S3Reader::open(uri).unwrap();
250 ///
251 /// let bytes = reader.read_range_sync(100, 249).unwrap().into_bytes();
252 /// assert_eq!(bytes.len(), 150);
253 /// ```
254 pub fn read_range_sync(
255 &mut self,
256 from: u64,
257 to: u64,
258 ) -> Result<external_types::AggregatedBytes, S3ReaderError> {
259 Runtime::new().unwrap().block_on(self.read_range(from, to))
260 }
261
262 /// Fetches the object's header from S3
263 ///
264 /// # Example
265 /// ```no_run
266 /// use tokio::runtime::Runtime;
267 ///
268 /// use s3reader::S3Reader;
269 /// use s3reader::S3ObjectUri;
270 ///
271 /// let uri = S3ObjectUri::new("s3://my-bucket/path/to/huge/file").unwrap();
272 /// let mut reader = S3Reader::open(uri).unwrap();
273 ///
274 /// // `fetch_header` is an async function, we must wrap it in a runtime in the doctest
275 /// Runtime::new().unwrap().block_on(
276 /// reader.fetch_header()
277 /// ).unwrap();
278 /// assert_eq!(reader.len(), 150);
279 /// ```
280 pub async fn fetch_header(
281 &mut self,
282 ) -> Result<(), external_types::SdkError<external_types::HeadObjectError>> {
283 let header = self
284 .client
285 .head_object()
286 .bucket(self.uri.bucket())
287 .key(self.uri.key())
288 .send()
289 .await?;
290 self.header = Some(header);
291 Ok(())
292 }
293
294 /// Returns the `content_length` of the S3 object
295 ///
296 /// # Panics
297 /// This method can panic if the header cannot be fetched (e.g. due to network issues, wrong URI etc).
298 /// This can be prevented by using [`S3Reader::open`] which guarantees that the header is present.
299 #[allow(clippy::len_without_is_empty)]
300 pub fn len(&mut self) -> u64 {
301 if let Some(header) = &self.header {
302 u64::try_from(header.content_length().unwrap()).unwrap()
303 } else {
304 Runtime::new()
305 .unwrap()
306 .block_on(self.fetch_header())
307 .expect("unable to determine the object size");
308 self.len()
309 }
310 }
311
312 pub fn pos(&self) -> u64 {
313 self.pos
314 }
315}
316
317impl Read for S3Reader {
318 fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
319 if self.pos >= self.len() {
320 return Ok(0);
321 }
322 let end_pos = self.pos + buf.len() as u64;
323
324 // The `read_range` method uses inclusive byte ranges, we exclude the last byte
325 let s3_data = self.read_range_sync(self.pos, end_pos - 1)?;
326
327 // Ensure that the position cursor is only increased by the number of actually read bytes
328 self.pos += u64::try_from(s3_data.remaining()).unwrap();
329
330 // Use the Reader provided by `AggregatedBytes` instead of converting manually
331 let mut reader = s3_data.reader();
332 reader.read(buf)
333 }
334
335 /// Custom implementation to avoid too many `read` calls. The default trait
336 /// reads in 32 bytes blocks that grow over time. However, the IO for S3 has way
337 /// more latency so `S3Reader` tries to fetch all data in a single call
338 fn read_to_end(&mut self, buf: &mut Vec<u8>) -> Result<usize, std::io::Error> {
339 let reader_len = self.len();
340
341 // The `read_range` method uses inclusive byte ranges, we exclude the last byte
342 let s3_data = self.read_range_sync(self.pos, reader_len - 1)?;
343
344 // Ensure that the position cursor is only increased by the number of actually read bytes
345 let data_len = s3_data.remaining();
346 self.pos += u64::try_from(data_len).unwrap();
347
348 // We can't rely on the `AggregatedBytes` reader and must iterate the internal bytes buffer
349 // to push individual bytes into the buffer
350 buf.reserve(data_len);
351 for b in s3_data.into_bytes() {
352 buf.push(b);
353 }
354 Ok(data_len)
355 }
356
357 /// Custom implementation to avoid too many `read` calls. The default trait
358 /// reads in 32 bytes blocks that grow over time. However, the IO for S3 has way
359 /// more latency so `S3Reader` tries to fetch all data in a single call
360 fn read_to_string(&mut self, buf: &mut String) -> Result<usize, std::io::Error> {
361 // Allocate a new vector to utilize `read_to_end`. We don't have to specify the size here
362 // since `read_to_end` will extend the vector to the required capacity
363 let mut bytes = Vec::new();
364 match self.read_to_end(&mut bytes) {
365 Ok(n) => {
366 buf.reserve(n);
367 for byte in bytes {
368 buf.push(byte.into());
369 }
370 Ok(n)
371 }
372 Err(err) => Err(err),
373 }
374 }
375}
376
377impl Seek for S3Reader {
378 fn seek(&mut self, pos: SeekFrom) -> Result<u64, std::io::Error> {
379 match s3reader_seek(self.len(), self.pos, pos) {
380 Ok(x) => {
381 self.pos = x;
382 Ok(x)
383 }
384 Err(err) => Err(err),
385 }
386 }
387}
388
/// Calculates the new cursor for a `Seek` operation
///
/// `len` is the size of the object and `cursor` the current position, both in bytes.
/// Positions behind the end of the object are clamped to `len`, since we can't seek
/// beyond the end of the file. Positions before the start of the object are an error.
/// The calculation can't overflow, even for offsets close to the limits of `u64`.
///
/// This function is declared outside of `S3Reader` so that it can be
/// unit-tested.
fn s3reader_seek(len: u64, cursor: u64, pos: SeekFrom) -> Result<u64, std::io::Error> {
    let negative_position =
        || std::io::Error::new(std::io::ErrorKind::Other, "position cannot be negative");

    match pos {
        SeekFrom::Start(offset) => Ok(offset.min(len)),
        // Saturate instead of overflowing, the result is clamped to `len` anyway
        SeekFrom::Current(offset) if offset >= 0 => {
            Ok(cursor.saturating_add(offset.unsigned_abs()).min(len))
        }
        SeekFrom::Current(offset) => cursor
            .checked_sub(offset.unsigned_abs())
            .ok_or_else(negative_position),
        SeekFrom::End(offset) if offset >= 0 => Ok(len),
        SeekFrom::End(offset) => len
            .checked_sub(offset.unsigned_abs())
            .ok_or_else(negative_position),
    }
}
432
#[cfg(test)]
mod tests {
    use super::*;

    /// Size of the fictional object that all seek tests operate on
    const LEN: u64 = 100;

    /// Seeks within an object of `LEN` bytes and expects the seek to succeed
    fn seek_ok(cursor: u64, pos: std::io::SeekFrom) -> u64 {
        s3reader_seek(LEN, cursor, pos).unwrap()
    }

    #[test]
    fn test_absolute_position() {
        // (target, expected position)
        for (target, expected) in [(30, 30), (0, 0), (100, 100), (120, 100)] {
            assert_eq!(seek_ok(1, std::io::SeekFrom::Start(target)), expected);
        }
    }

    #[test]
    fn test_relative_position() {
        // (cursor, offset, expected position)
        let cases = [
            (1, 30, 31),
            (1, 99, 100),
            (1, 0, 1),
            (1, -1, 0),
            (0, 0, 0),
            (0, 1, 1),
            (1, 100, 100),
        ];
        for (cursor, offset, expected) in cases {
            assert_eq!(seek_ok(cursor, std::io::SeekFrom::Current(offset)), expected);
        }

        // Seeking before the start of the object must fail
        assert!(s3reader_seek(LEN, 1, std::io::SeekFrom::Current(-2)).is_err());
    }

    #[test]
    fn test_seek_from_end() {
        // (offset, expected position)
        for (offset, expected) in [(1, 100), (0, 100), (-100, 0), (-50, 50)] {
            assert_eq!(seek_ok(1, std::io::SeekFrom::End(offset)), expected);
        }

        // Seeking before the start of the object must fail
        assert!(s3reader_seek(LEN, 1, std::io::SeekFrom::End(-101)).is_err());
    }

    #[test]
    fn test_uri_parser() {
        let parsed = S3ObjectUri::new("s3://mybucket/path/to/file.xls").unwrap();
        assert_eq!(parsed.bucket(), "mybucket");
        assert_eq!(parsed.key(), "path/to/file.xls");
    }

    #[test]
    fn test_uri_without_protocol() {
        assert!(S3ObjectUri::new("mybucket/path/to/file.xls").is_err());
    }

    #[test]
    fn test_uri_with_wrong_protocol() {
        let invalid_uris = [
            "s5://mybucket/path/to/file.xls",
            "s3//mybucket/path/to/file.xls",
            "s3:/mybucket/path/to/file.xls",
        ];
        for uri in invalid_uris {
            assert!(S3ObjectUri::new(uri).is_err());
        }
    }

    #[test]
    fn test_uri_with_missing_bucket() {
        for uri in ["s3://", "s3://foobar"] {
            assert!(S3ObjectUri::new(uri).is_err());
        }
    }

    #[test]
    fn test_valid_uris() {
        let valid_uris = [
            "s3://foobar/somethinglong",
            "s3://f/5",
            "s3://foobar/s/o/m/e/t/h/i/n/g/l/o/n/g",
        ];
        for uri in valid_uris {
            assert!(S3ObjectUri::new(uri).is_ok());
        }
    }
}