cloud_file/
lib.rs

1#![warn(missing_docs)]
2#![warn(clippy::pedantic)]
3#![allow(
4    clippy::missing_panics_doc, // LATER: add panics docs
5    clippy::missing_errors_doc, // LATER: add errors docs
6    clippy::similar_names,
7    clippy::cast_possible_truncation,
8    clippy::cast_possible_wrap,
9    clippy::cast_sign_loss,
10    clippy::cast_lossless
11)]
12#![doc = include_str!("../README.md")]
13//! ## Main Functions
14//!
15//! | Function | Description |
16//! | -------- | ----------- |
17//! | [`CloudFile::new`](struct.CloudFile.html#method.new) | Use a URL string to specify a cloud file for reading. |
18//! | [`CloudFile::new_with_options`](struct.CloudFile.html#method.new_with_options) | Use a URL string and string options to specify a cloud file for reading. |
19//!
20//! ## URLs
21//!
22//! | Cloud Service | Example |
23//! | ------------- | ------- |
24//! | HTTP          | `https://www.gutenberg.org/cache/epub/100/pg100.txt` |
25//! | local file    | `file:///M:/data%20files/small.bed` |
26//! | AWS S3        | `s3://bedreader/v1/toydata.5chrom.bed` |
27//!
28//! Note: For local files, use the [`abs_path_to_url_string`](fn.abs_path_to_url_string.html) function to properly encode into a URL.
29//! 
30//! ## Options
31//!
32//! | Cloud Service | Details | Example |
33//! | -------- | ------- | ----------- |
34//! | HTTP | [`ClientConfigKey`](https://docs.rs/object_store/latest/object_store/enum.ClientConfigKey.html#variant.Timeout) | `[("timeout", "30s")]` |
35//! | local file | *none* | |
36//! | AWS S3 | [`AmazonS3ConfigKey`](https://docs.rs/object_store/latest/object_store/aws/enum.AmazonS3ConfigKey.html) | `[("aws_region", "us-west-2"), ("aws_access_key_id",` ...`), ("aws_secret_access_key",` ...`)]` |
37//! | Azure | [`AzureConfigKey`](https://docs.rs/object_store/latest/object_store/azure/enum.AzureConfigKey.html) |  |
38//! | Google | [`GoogleConfigKey`](https://docs.rs/object_store/latest/object_store/gcp/enum.GoogleConfigKey.html) |  |
39//! 
40//!
41//! ## High-Level [`CloudFile`](struct.CloudFile.html) Methods
42//! 
43//! | Method                        | Retrieves                                                                                                  |
44//! |-------------------------------|-------------------------------------------------------------------|
45//! | [`stream_chunks`](struct.CloudFile.html#method.stream_chunks)       | File contents as a stream of [`Bytes`](https://docs.rs/bytes/latest/bytes/struct.Bytes.html) |
46//! | [`stream_line_chunks`](struct.CloudFile.html#method.stream_line_chunks) | File contents as a stream of [`Bytes`](https://docs.rs/bytes/latest/bytes/struct.Bytes.html), each containing one or more whole lines |
47//! | [`read_all`](struct.CloudFile.html#method.read_all)                | Whole file contents as an in-memory [`Bytes`](https://docs.rs/bytes/latest/bytes/struct.Bytes.html) |
48//! | [`read_range`](struct.CloudFile.html#method.read_range)            | [`Bytes`](https://docs.rs/bytes/latest/bytes/struct.Bytes.html) from a specified range |
49//! | [`read_ranges`](struct.CloudFile.html#method.read_ranges)          | `Vec` of [`Bytes`](https://docs.rs/bytes/latest/bytes/struct.Bytes.html) from specified ranges |
50//! | [`read_range_and_file_size`](struct.CloudFile.html#method.read_range_and_file_size) | [`Bytes`](https://docs.rs/bytes/latest/bytes/struct.Bytes.html) from a specified range & the file's size |
51//! | [`read_file_size`](struct.CloudFile.html#method.read_file_size)    | Size of the file                                     |
52//! | [`count_lines`](struct.CloudFile.html#method.count_lines)          | Number of lines in the file                          |
53//! 
54//! Additional methods:
55//! 
56//! | Method                        | Description                                                                                                  |
57//! |-------------------------------|-------------------------------------------------------------------|
58//! | [`clone`](struct.CloudFile.html#method.clone)                      | Clone the [`CloudFile`](struct.CloudFile.html) instance. Efficient by design. |
59//! | [`set_extension`](struct.CloudFile.html#method.set_extension)      | Change the [`CloudFile`](struct.CloudFile.html)'s file extension (in place).  |
60//!
61//! ## Low-Level [`CloudFile`](struct.CloudFile.html) Methods
62//! 
63//! | Method | Description |
64//! | -------- | ----------- |
65//! | [`get`](struct.CloudFile.html#method.get) | Call the [`object_store`](https://docs.rs/object_store/latest/object_store/trait.ObjectStore.html#method.get) crate's `get` method. |
66//! | [`get_opts`](struct.CloudFile.html#method.get_opts) | Call the [`object_store`](https://docs.rs/object_store/latest/object_store/trait.ObjectStore.html#method.get_opts) crate's `get_opts` method. |
67//! 
68//! ## Lowest-Level [`CloudFile`](struct.CloudFile.html) Methods
69//! 
70//! You can call any method from the [`object_store`](https://docs.rs/object_store/latest/object_store/trait.ObjectStore.html) crate. For example, here we
71//! use [`head`](https://docs.rs/object_store/latest/object_store/trait.ObjectStore.html#tymethod.head) to get the metadata for a file and the last_modified time.
72//! 
73//! ```
74//! use cloud_file::CloudFile;
75//!
76//! # Runtime::new().unwrap().block_on(async {
77//! let url = "https://raw.githubusercontent.com/fastlmm/bed-sample-files/main/plink_sim_10s_100v_10pmiss.bed";
78//! let cloud_file = CloudFile::new(url)?;
79//! let meta = cloud_file.cloud_service.head(&cloud_file.store_path).await?;
80//! let last_modified = meta.last_modified;
81//! println!("last_modified: {}", last_modified);
82//! assert_eq!(meta.size, 303);
83//! # Ok::<(), CloudFileError>(())}).unwrap();
84//! # use {tokio::runtime::Runtime, cloud_file::CloudFileError};
85//! ```
86
// Refuse to build on targets whose pointer width is not 64 bits.
// NOTE(review): presumably because file sizes and byte ranges are held in `usize`
// throughout this file — confirm before relaxing this guard.
#[cfg(not(target_pointer_width = "64"))]
compile_error!("This code requires a 64-bit target architecture.");
89
90use bytes::Bytes;
91use object_store::delimited::newline_delimited_stream;
92use core::fmt;
93use futures_util::stream::BoxStream;
94use futures_util::TryStreamExt;
95use object_store::http::HttpBuilder;
96#[doc(no_inline)]
97pub use object_store::path::Path as StorePath;
98use object_store::{GetOptions, GetRange, GetResult, ObjectStore};
99use std::ops::{Deref, Range};
100use std::path::Path;
101use std::sync::Arc;
102use thiserror::Error;
103use url::Url;
104
105#[derive(Debug)]
106/// The main struct representing the location of a file in the cloud.
107///
108/// It is constructed with [`CloudFile::new`](struct.CloudFile.html#method.new). It is, by design, cheap to clone.
109///
110/// Internally, it stores two pieces of information: the file's cloud service and the path to the file on that service.
111///
112/// # Examples
113///
114/// ```
115/// use cloud_file::CloudFile;
116///
117/// # Runtime::new().unwrap().block_on(async {
118/// let url = "https://raw.githubusercontent.com/fastlmm/bed-sample-files/main/plink_sim_10s_100v_10pmiss.bed";
119/// let cloud_file = CloudFile::new(url)?;
120/// assert_eq!(cloud_file.read_file_size().await?, 303);
121/// # Ok::<(), CloudFileError>(())}).unwrap();
122/// # use {tokio::runtime::Runtime, cloud_file::CloudFileError};
123/// ```
124pub struct CloudFile {
125    /// A cloud service, for example, Http, AWS S3, Azure, the local file system, etc.
126    /// Under the covers, it is an `Arc`-wrapped [`DynObjectStore`](struct.DynObjectStore.html).
127    /// The `DynObjectStore`, in turn, holds an [`ObjectStore`](https://docs.rs/object_store/latest/object_store/trait.ObjectStore.html) from the
128    /// powerful [`object_store`](https://github.com/apache/arrow-rs/tree/master/object_store) crate.
129    pub cloud_service: Arc<DynObjectStore>,
130    /// A path to a file on the cloud service.
131    /// Under the covers, `StorePath` is an alias for a [`Path`](https://docs.rs/object_store/latest/object_store/path/struct.Path.html)
132    /// in the [`object_store`](https://github.com/apache/arrow-rs/tree/master/object_store) crate.
133    pub store_path: StorePath,
134}
135
136impl Clone for CloudFile {
137    fn clone(&self) -> Self {
138        CloudFile {
139            cloud_service: self.cloud_service.clone(),
140            store_path: self.store_path.clone(),
141        }
142    }
143}
144
/// An empty set of cloud options.
///
/// A zero-length array of `(&str, String)` pairs. Pass it to
/// [`CloudFile::new_with_options`](struct.CloudFile.html#method.new_with_options)
/// when no options are needed.
///
/// # Example
/// ```
/// use cloud_file::{EMPTY_OPTIONS, CloudFile};
///
/// # Runtime::new().unwrap().block_on(async {
/// let url = "https://raw.githubusercontent.com/fastlmm/bed-sample-files/main/plink_sim_10s_100v_10pmiss.bed";
/// let cloud_file = CloudFile::new_with_options(url, EMPTY_OPTIONS)?;
/// assert_eq!(cloud_file.read_file_size().await?, 303);
/// # Ok::<(), CloudFileError>(())}).unwrap();
/// # use {tokio::runtime::Runtime, cloud_file::CloudFileError};
/// ```
pub const EMPTY_OPTIONS: [(&str, String); 0] = [];
159
160impl CloudFile {
161    /// Create a new [`CloudFile`] from a URL string.
162    ///
163    /// # Example
164    /// ```
165    /// use cloud_file::CloudFile;
166    ///
167    /// # Runtime::new().unwrap().block_on(async {
168    /// let url = "https://raw.githubusercontent.com/fastlmm/bed-sample-files/main/plink_sim_10s_100v_10pmiss.bed";
169    /// let cloud_file = CloudFile::new(url)?;
170    /// assert_eq!(cloud_file.read_file_size().await?, 303);
171    /// # Ok::<(), CloudFileError>(())}).unwrap();
172    /// # use {tokio::runtime::Runtime, cloud_file::CloudFileError};
173    /// ```
174    pub fn new(location: impl AsRef<str>) -> Result<CloudFile, CloudFileError> {
175        let location = location.as_ref();
176        let url = Url::parse(location)
177            .map_err(|e| CloudFileError::CannotParseUrl(location.to_string(), e.to_string()))?;
178
179        let (object_store, store_path): (DynObjectStore, StorePath) =
180            parse_url_opts_work_around(&url, EMPTY_OPTIONS)?;
181        let cloud_file = CloudFile {
182            cloud_service: Arc::new(object_store),
183            store_path,
184        };
185        Ok(cloud_file)
186    }
187
188    /// Create a new [`CloudFile`] from an [`ObjectStore`](https://docs.rs/object_store/latest/object_store/trait.ObjectStore.html)
189    /// and a [`object_store::path::Path`](https://docs.rs/object_store/latest/object_store/path/struct.Path.html).
190    /// 
191    /// # Example
192    /// 
193    /// ```
194    /// use cloud_file::CloudFile;
195    /// use object_store::{http::HttpBuilder, path::Path as StorePath, ClientOptions};
196    /// use std::time::Duration;
197    ///
198    /// # Runtime::new().unwrap().block_on(async {
199    /// let client_options = ClientOptions::new().with_timeout(Duration::from_secs(30));
200    /// let http = HttpBuilder::new()
201    ///     .with_url("https://raw.githubusercontent.com")
202    ///     .with_client_options(client_options)
203    ///     .build()?;
204    /// let store_path = StorePath::parse("fastlmm/bed-sample-files/main/plink_sim_10s_100v_10pmiss.bed")?;
205    /// 
206    /// let cloud_file = CloudFile::from_structs(http, store_path);
207    /// assert_eq!(cloud_file.read_file_size().await?, 303);
208    /// # Ok::<(), CloudFileError>(())}).unwrap();
209    /// # use {tokio::runtime::Runtime, cloud_file::CloudFileError};
210    /// ```
211
212    #[inline]
213    pub fn from_structs(store: impl ObjectStore, store_path: StorePath) -> Self {
214        CloudFile {
215            cloud_service: Arc::new(DynObjectStore(Box::new(store))),
216            store_path,
217        }
218    }
219
220    /// Create a new [`CloudFile`] from a URL string and options.
221    ///
222    /// # Example
223    /// ```
224    /// use cloud_file::CloudFile;
225    ///
226    /// # Runtime::new().unwrap().block_on(async {
227    /// let url = "https://raw.githubusercontent.com/fastlmm/bed-sample-files/main/plink_sim_10s_100v_10pmiss.bed";
228    /// let cloud_file = CloudFile::new_with_options(url, [("timeout", "30s")])?;
229    /// assert_eq!(cloud_file.read_file_size().await?, 303);
230    /// # Ok::<(), CloudFileError>(())}).unwrap();
231    /// # use {tokio::runtime::Runtime, cloud_file::CloudFileError};
232    /// ```
233    pub fn new_with_options<I, K, V>(
234        location: impl AsRef<str>,
235        options: I,
236    ) -> Result<CloudFile, CloudFileError>
237    where
238        I: IntoIterator<Item = (K, V)>,
239        K: AsRef<str>,
240        V: Into<String>,
241    {
242        let location = location.as_ref();
243        let url = Url::parse(location)
244            .map_err(|e| CloudFileError::CannotParseUrl(location.to_string(), e.to_string()))?;
245
246        let (object_store, store_path): (DynObjectStore, StorePath) =
247            parse_url_opts_work_around(&url, options)?;
248        let cloud_file = CloudFile {
249            cloud_service: Arc::new(object_store),
250            store_path,
251        };
252        Ok(cloud_file)
253    }
254
255    /// Count the lines in a file stored in the cloud.
256    ///
257    /// # Example
258    /// ```
259    /// use cloud_file::CloudFile;
260    ///
261    /// # Runtime::new().unwrap().block_on(async {
262    /// let url = "https://raw.githubusercontent.com/fastlmm/bed-sample-files/main/plink_sim_10s_100v_10pmiss.fam";
263    /// let cloud_file = CloudFile::new(url)?;
264    /// assert_eq!(cloud_file.count_lines().await?, 10);
265    /// # Ok::<(), CloudFileError>(())}).unwrap();
266    /// # use {tokio::runtime::Runtime, cloud_file::CloudFileError};
267    /// ```
268    pub async fn count_lines(&self) -> Result<usize, CloudFileError> {
269        let stream = self.stream_chunks().await?;
270
271        let newline_count = stream
272            .try_fold(0, |acc, bytes| async move {
273                let count = bytecount::count(&bytes, b'\n');
274                Ok(acc + count) // Accumulate the count
275            })
276            .await
277            .map_err(CloudFileError::ObjectStoreError)?;
278        Ok(newline_count)
279    }
280
281    /// Return the size of a file stored in the cloud.
282    ///
283    /// # Example
284    /// ```
285    /// use cloud_file::CloudFile;
286    ///
287    /// # Runtime::new().unwrap().block_on(async {
288    /// let url = "https://raw.githubusercontent.com/fastlmm/bed-sample-files/main/plink_sim_10s_100v_10pmiss.bed";
289    /// let cloud_file = CloudFile::new(url)?;
290    /// assert_eq!(cloud_file.read_file_size().await?, 303);
291    /// # Ok::<(), CloudFileError>(())}).unwrap();
292    /// # use {tokio::runtime::Runtime, cloud_file::CloudFileError};
293    /// ```
294    pub async fn read_file_size(&self) -> Result<usize, CloudFileError> {
295        let meta = self.cloud_service.head(&self.store_path).await?;
296        Ok(meta.size)
297    }
298
299    /// Return the [`Bytes`](https://docs.rs/bytes/latest/bytes/struct.Bytes.html) from a specified range.
300    ///
301    /// # Example
302    /// ```
303    /// use cloud_file::CloudFile;
304    ///
305    /// # Runtime::new().unwrap().block_on(async {
306    /// let url = "https://raw.githubusercontent.com/fastlmm/bed-sample-files/main/plink_sim_10s_100v_10pmiss.bim";
307    /// let cloud_file = CloudFile::new(url)?;
308    /// let bytes = cloud_file.read_range((0..10)).await?;
309    /// assert_eq!(bytes.as_ref(), b"1\t1:1:A:C\t");
310    /// # Ok::<(), CloudFileError>(())}).unwrap();
311    /// # use {tokio::runtime::Runtime, cloud_file::CloudFileError};
312    /// ```
313    pub async fn read_range(&self, range: Range<usize>) -> Result<Bytes, CloudFileError> {
314        Ok(self
315            .cloud_service
316            .get_range(&self.store_path, range)
317            .await?)
318    }    
319
320    /// Return the `Vec` of [`Bytes`](https://docs.rs/bytes/latest/bytes/struct.Bytes.html) from specified ranges.
321    ///
322    /// # Example
323    /// ```
324    /// use cloud_file::CloudFile;
325    ///
326    /// # Runtime::new().unwrap().block_on(async {
327    /// let url = "https://raw.githubusercontent.com/fastlmm/bed-sample-files/main/plink_sim_10s_100v_10pmiss.bim";
328    /// let cloud_file = CloudFile::new(url)?;
329    /// let bytes_vec = cloud_file.read_ranges(&[0..10, 1000..1010]).await?;
330    /// assert_eq!(bytes_vec.len(), 2);
331    /// assert_eq!(bytes_vec[0].as_ref(), b"1\t1:1:A:C\t");
332    /// assert_eq!(bytes_vec[1].as_ref(), b":A:C\t0.0\t4");
333    /// # Ok::<(), CloudFileError>(())}).unwrap();
334    /// # use {tokio::runtime::Runtime, cloud_file::CloudFileError};
335    /// ```
336    pub async fn read_ranges(&self, ranges: &[Range<usize>]) -> Result<Vec<Bytes>, CloudFileError> {
337        Ok(self
338            .cloud_service
339            .get_ranges(&self.store_path, ranges)
340            .await?)
341    }
342
343    /// Call the [`object_store`](https://docs.rs/object_store/latest/object_store/trait.ObjectStore.html#method.get_opts) crate's `get_opts` method.
344    /// 
345    /// You can, for example, in one call retrieve a range of bytes from the file and the file's metadata. The
346    /// result is a [`GetResult`](https://docs.rs/object_store/latest/object_store/struct.GetResult.html).
347    ///
348    /// # Example
349    ///
350    /// In one call, read the first three bytes of a genomic data file and get
351    /// the size of the file. Check that the file starts with the expected file signature.
352    /// ```
353    /// use cloud_file::CloudFile;
354    /// use object_store::{GetRange, GetOptions};
355    ///
356    /// # Runtime::new().unwrap().block_on(async {
357    /// let url = "https://raw.githubusercontent.com/fastlmm/bed-sample-files/main/plink_sim_10s_100v_10pmiss.bed";
358    /// let cloud_file = CloudFile::new(url)?;
359    /// let get_options = GetOptions {
360    ///     range: Some(GetRange::Bounded(0..3)),
361    ///     ..Default::default()
362    /// };
363    /// let get_result = cloud_file.get_opts(get_options).await?;
364    /// let size: usize = get_result.meta.size;
365    /// let bytes = get_result
366    ///     .bytes()
367    ///     .await?;
368    /// assert_eq!(bytes.len(), 3);
369    /// assert_eq!(bytes[0], 0x6c);
370    /// assert_eq!(bytes[1], 0x1b);
371    /// assert_eq!(bytes[2], 0x01);
372    /// assert_eq!(size, 303);
373    /// # Ok::<(), CloudFileError>(())}).unwrap();
374    /// # use {tokio::runtime::Runtime, cloud_file::CloudFileError};
375    /// ```
376    pub async fn get_opts(&self, get_options: GetOptions) -> Result<GetResult, CloudFileError> {
377        Ok(self
378            .cloud_service
379            .get_opts(&self.store_path, get_options)
380            .await?)
381    }
382
383    /// Retrieve the [`Bytes`](https://docs.rs/bytes/latest/bytes/struct.Bytes.html) from a specified range & the file's size.
384    ///
385    /// # Example
386    ///
387    /// In one call, read the first three bytes of a genomic data file and get
388    /// the size of the file. Check that the file starts with the expected file signature.
389    /// ```
390    /// use cloud_file::CloudFile;
391    ///
392    /// # Runtime::new().unwrap().block_on(async {
393    /// let url = "https://raw.githubusercontent.com/fastlmm/bed-sample-files/main/plink_sim_10s_100v_10pmiss.bed";
394    /// let cloud_file = CloudFile::new(url)?;
395    /// let (bytes, size) = cloud_file.read_range_and_file_size(0..3).await?;
396    /// assert_eq!(bytes.len(), 3);
397    /// assert_eq!(bytes[0], 0x6c);
398    /// assert_eq!(bytes[1], 0x1b);
399    /// assert_eq!(bytes[2], 0x01);
400    /// assert_eq!(size, 303);
401    /// # Ok::<(), CloudFileError>(())}).unwrap();
402    /// # use {tokio::runtime::Runtime, cloud_file::CloudFileError};
403    /// ```
404    pub async fn read_range_and_file_size (
405        &self,
406        range: Range<usize>,
407    ) -> Result<(Bytes, usize), CloudFileError> {
408        let get_options = GetOptions {
409            range: Some(GetRange::Bounded(range)),
410            ..Default::default()
411        };
412        let get_result = self
413            .cloud_service
414            .get_opts(&self.store_path, get_options)
415            .await?;
416        let size: usize = get_result.meta.size;
417        let bytes = get_result
418            .bytes()
419            .await
420            .map_err(CloudFileError::ObjectStoreError)?;
421        Ok((bytes, size))
422    }
423
424    /// Call the [`object_store`](https://docs.rs/object_store/latest/object_store/trait.ObjectStore.html#method.get) crate's `get` method.
425    ///
426    /// The result is a [`GetResult`](https://docs.rs/object_store/latest/object_store/struct.GetResult.html) which can,
427    /// for example, be converted into a stream of bytes.
428    ///
429    /// # Example
430    ///
431    /// Do a 'get', turn result into a stream, then scan all the bytes of the
432    /// file for the newline character.
433    ///
434    /// ```rust
435    /// use cloud_file::CloudFile;
436    /// use futures_util::StreamExt;  // Enables `.next()` on streams.
437    ///
438    /// # Runtime::new().unwrap().block_on(async {
439    /// let url = "https://raw.githubusercontent.com/fastlmm/bed-sample-files/main/toydata.5chrom.fam";
440    /// let cloud_file = CloudFile::new_with_options(url, [("timeout", "30s")])?;
441    /// let mut stream = cloud_file.get().await?.into_stream();
442    /// let mut newline_count: usize = 0;
443    /// while let Some(bytes) = stream.next().await {
444    ///     let bytes = bytes?;
445    ///     newline_count += bytecount::count(&bytes, b'\n');
446    ///     }
447    /// assert_eq!(newline_count, 500);
448    /// # Ok::<(), CloudFileError>(())}).unwrap();
449    /// # use {tokio::runtime::Runtime, cloud_file::CloudFileError};
450    /// ```
451    pub async fn get(&self) -> Result<GetResult, CloudFileError> {
452        Ok(self.cloud_service.get(&self.store_path).await?)
453    }
454
455    /// Read the whole file into an in-memory [`Bytes`](https://docs.rs/bytes/latest/bytes/struct.Bytes.html).
456    ///
457    /// # Example
458    ///
    /// Read the whole file, then scan all of its bytes
    /// for the newline character.
461    ///
462    /// ```rust
463    /// use cloud_file::CloudFile;
464    ///
465    /// # Runtime::new().unwrap().block_on(async {
466    /// let url = "https://raw.githubusercontent.com/fastlmm/bed-sample-files/main/toydata.5chrom.fam";
467    /// let cloud_file = CloudFile::new_with_options(url, [("timeout", "30s")])?;
468    /// let all = cloud_file.read_all().await?;
469    /// let newline_count = bytecount::count(&all, b'\n');
470    /// assert_eq!(newline_count, 500);
471    /// # Ok::<(), CloudFileError>(())}).unwrap();
472    /// # use {tokio::runtime::Runtime, cloud_file::CloudFileError};
473    /// ```
474    pub async fn read_all(&self) -> Result<Bytes, CloudFileError> {
475        let all = self
476            .cloud_service
477            .get(&self.store_path)
478            .await?
479            .bytes()
480            .await?;
481        Ok(all)
482    }
483
484    /// Retrieve the file's contents as a stream of
485    /// [`Bytes`](https://docs.rs/bytes/latest/bytes/struct.Bytes.html).
486    ///
487    /// # Example
488    ///
489    /// Open the file as a stream of bytes, then scan all the bytes
490    /// for the newline character.
491    ///
492    /// ```rust
493    /// use cloud_file::CloudFile;
494    /// use futures::StreamExt; // Enables `.next()` on streams.
495    ///
496    /// # Runtime::new().unwrap().block_on(async {
497    /// let url = "https://raw.githubusercontent.com/fastlmm/bed-sample-files/main/toydata.5chrom.fam";
498    /// let cloud_file = CloudFile::new_with_options(url, [("timeout", "30s")])?;
499    /// let mut chunks = cloud_file.stream_chunks().await?;
500    /// let mut newline_count: usize = 0;
501    /// while let Some(chunk) = chunks.next().await {
502    ///     let chunk = chunk?;
503    ///     newline_count += bytecount::count(&chunk, b'\n');
504    ///     }
505    /// assert_eq!(newline_count, 500);
506    /// # Ok::<(), CloudFileError>(())}).unwrap();
507    /// # use {tokio::runtime::Runtime, cloud_file::CloudFileError};
508    /// ```
509    pub async fn stream_chunks(
510        &self,
511    ) -> Result<BoxStream<'static, object_store::Result<Bytes>>, CloudFileError> {
512        let stream = self
513            .cloud_service
514            .get(&self.store_path)
515            .await?
516            .into_stream();
517        Ok(stream)
518    }
519
520    ///  Retrieve the file's contents as a stream of [`Bytes`](https://docs.rs/bytes/latest/bytes/struct.Bytes.html),
521    ///  each containing one or more whole lines.
522    ///
523    /// # Example
524    ///
525    /// Return the 12th line of a file.
526    ///
527    /// ```rust
528    /// use cloud_file::CloudFile;
529    /// use futures::StreamExt; // Enables `.next()` on streams.
530    /// use std::str::from_utf8;
531    ///
532    /// # Runtime::new().unwrap().block_on(async {
533    /// let url = "https://raw.githubusercontent.com/fastlmm/bed-sample-files/main/toydata.5chrom.fam";
534    /// let goal_index = 12;
535    ///
536    /// let cloud_file = CloudFile::new(url)?;
537    /// let mut line_chunks = cloud_file.stream_line_chunks().await?;
538    /// let mut index_iter = 0..;
539    /// let mut goal_line = None;
540    /// 'outer_loop: while let Some(line_chunk) = line_chunks.next().await {
541    ///     let line_chunk = line_chunk?;
542    ///     let lines = from_utf8(&line_chunk)?.lines();
543    ///     for line in lines {
544    ///         let index = index_iter.next().unwrap(); // Safe because the iterator is infinite
545    ///         if index == goal_index {
546    ///             goal_line = Some(line.to_string());
547    ///             break 'outer_loop;
548    ///         }
549    ///     }
550    /// }
551    /// assert_eq!(goal_line, Some("per12 per12 0 0 2 -0.0382707".to_string()));
552    /// # Ok::<(), CloudFileError>(())}).unwrap();
553    /// # use {tokio::runtime::Runtime, cloud_file::CloudFileError};
554    /// ```
555    ///
556    pub async fn stream_line_chunks(
557        &self,
558    ) -> Result<BoxStream<'static, object_store::Result<Bytes>>, CloudFileError> {
559        let chunks = self.stream_chunks().await?;
560        let line_chunks = newline_delimited_stream(chunks);
561        Ok(Box::pin(line_chunks))
562    }
563
564
565    /// Change the [`CloudFile`]'s extension (in place).
566    ///
567    /// It removes the current extension, if any.
568    /// It appends the given extension, if any.
569    ///
570    /// The method is in-place rather than functional to make it consistent with
571    /// [`std::path::PathBuf::set_extension`](https://doc.rust-lang.org/stable/std/path/struct.PathBuf.html#method.set_extension).
572    ///
573    /// # Example
574    /// ```
575    /// use cloud_file::CloudFile;
576    ///
577    /// # Runtime::new().unwrap().block_on(async {
578    /// let url = "https://raw.githubusercontent.com/fastlmm/bed-sample-files/main/plink_sim_10s_100v_10pmiss.bed";
579    /// let mut cloud_file = CloudFile::new(url)?;
580    /// assert_eq!(cloud_file.read_file_size().await?, 303);
581    /// cloud_file.set_extension("fam")?;
582    /// assert_eq!(cloud_file.read_file_size().await?, 130);
583    /// # Ok::<(), CloudFileError>(())}).unwrap();
584    /// # use {tokio::runtime::Runtime, cloud_file::CloudFileError};
585    /// ```
586    pub fn set_extension(&mut self, extension: &str) -> Result<(), CloudFileError> {
587        let mut path_str = self.store_path.to_string();
588
589        // Find the last dot in the object path
590        if let Some(dot_index) = path_str.rfind('.') {
591            // Remove the current extension
592            path_str.truncate(dot_index);
593        }
594
595        if !extension.is_empty() {
596            // Append the new extension
597            path_str.push('.');
598            path_str.push_str(extension);
599        }
600
601        // Parse the string back to StorePath
602        self.store_path = StorePath::parse(&path_str)?;
603        Ok(())
604    }
605}
606
607#[allow(clippy::match_bool)]
608fn parse_work_around(url: &Url) -> Result<(bool, StorePath), object_store::Error> {
609    let strip_bucket = || Some(url.path().strip_prefix('/')?.split_once('/')?.1);
610
611    let (scheme, path) = match (url.scheme(), url.host_str()) {
612        ("http", Some(_)) => (true, url.path()),
613        ("https", Some(host)) => {
614            if host.ends_with("dfs.core.windows.net")
615                || host.ends_with("blob.core.windows.net")
616                || host.ends_with("dfs.fabric.microsoft.com")
617                || host.ends_with("blob.fabric.microsoft.com")
618            {
619                (false, url.path())
620            } else if host.ends_with("amazonaws.com") {
621                match host.starts_with("s3") {
622                    true => (false, strip_bucket().unwrap_or_default()),
623                    false => (false, url.path()),
624                }
625            } else if host.ends_with("r2.cloudflarestorage.com") {
626                (false, strip_bucket().unwrap_or_default())
627            } else {
628                (true, url.path())
629            }
630        }
631        _ => (false, url.path()),
632    };
633
634    Ok((scheme, StorePath::from_url_path(path)?))
635}
636
637// LATER when https://github.com/apache/arrow-rs/issues/5310 gets fixed, can remove work around
638fn parse_url_opts_work_around<I, K, V>(
639    url: &Url,
640    options: I,
641) -> Result<(DynObjectStore, StorePath), object_store::Error>
642where
643    I: IntoIterator<Item = (K, V)>,
644    K: AsRef<str>,
645    V: Into<String>,
646{
647    let (is_http, path) = parse_work_around(url)?;
648    if is_http {
649        let url = &url[..url::Position::BeforePath];
650        let path = StorePath::parse(path)?;
651        let builder = options.into_iter().fold(
652            <HttpBuilder>::new().with_url(url),
653            |builder, (key, value)| match key.as_ref().parse() {
654                Ok(k) => builder.with_config(k, value),
655                Err(_) => builder,
656            },
657        );
658        let store = DynObjectStore::new(builder.build()?);
659        Ok((store, path))
660    } else {
661        let (store, path) = object_store::parse_url_opts(url, options)?;
662        Ok((DynObjectStore(store), path))
663    }
664}
665
666impl fmt::Display for CloudFile {
667    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
668        write!(f, "CloudFile: {:?}", self.store_path)
669    }
670}
671
672/// Wraps `Box<dyn ObjectStore>` for easier usage. An [`ObjectStore`](https://docs.rs/object_store/latest/object_store/trait.ObjectStore.html), from the
673/// powerful [`object_store`](https://github.com/apache/arrow-rs/tree/master/object_store) crate, represents a cloud service.
674#[derive(Debug)]
675pub struct DynObjectStore(pub Box<dyn ObjectStore>);
676
677// Implement Deref to allow access to the inner `ObjectStore` methods
678impl Deref for DynObjectStore {
679    type Target = dyn ObjectStore;
680
681    fn deref(&self) -> &Self::Target {
682        &*self.0
683    }
684}
685
686impl DynObjectStore {
687    #[inline]
688    fn new(store: impl ObjectStore ) -> Self {
689        DynObjectStore(Box::new(store) as Box<dyn ObjectStore>)
690    }
691}
692
/// The error type for [`CloudFile`](struct.CloudFile.html) methods.
#[derive(Error, Debug)]
pub enum CloudFileError {
    /// An error from the [`object_store`](https://github.com/apache/arrow-rs/tree/master/object_store) crate.
    #[error("Object store error: {0}")]
    ObjectStoreError(#[from] object_store::Error),

    /// A path-related error from the [`object_store`](https://github.com/apache/arrow-rs/tree/master/object_store) crate.
    #[error("Object store path error: {0}")]
    ObjectStorePathError(#[from] object_store::path::Error),

    /// An error related to converting bytes into UTF-8.
    #[error("UTF-8 error: {0}")]
    Utf8Error(#[from] std::str::Utf8Error),

    /// An error related to parsing a URL string.
    ///
    /// The first field is the string that could not be parsed; the second is the
    /// parser's error message.
    #[error("Cannot parse URL: {0} {1}")]
    CannotParseUrl(String, String),

    /// An error related to creating a URL from a file path.
    ///
    /// The field is the file path, as it appears in the error message.
    #[error("Cannot create URL from this absolute file path: '{0}'")]
    CannotCreateUrlFromFilePath(String),
}
716
717#[tokio::test]
718async fn cloud_file_2() -> Result<(), CloudFileError> {
719    let cloud_file = CloudFile::new(
720        "https://raw.githubusercontent.com/fastlmm/bed-sample-files/main/plink_sim_10s_100v_10pmiss.bed",
721        
722    )?;
723    assert_eq!(cloud_file.read_file_size().await?, 303);
724    Ok(())
725}
726
727#[tokio::test]
728async fn line_n() -> Result<(), CloudFileError> {
729    use std::str::from_utf8;
730    use futures_util::StreamExt;  // Enables `.next()` on streams.
731
732    let url = "https://raw.githubusercontent.com/fastlmm/bed-sample-files/main/toydata.5chrom.fam";
733    let goal_index = 12;
734
735    let cloud_file = CloudFile::new(url)?;
736    let mut line_chunks = cloud_file.stream_line_chunks().await?;
737    let mut index_iter = 0..;
738    let mut goal_line = None;
739    'outer_loop: while let Some(line_chunk) = line_chunks.next().await {
740        let line_chunk = line_chunk?;
741        let lines = from_utf8(&line_chunk)?.lines();
742        for line in lines {
743            let index = index_iter.next().unwrap(); // safe because we know the iterator is infinite
744            if index == goal_index {
745                goal_line = Some(line.to_string());
746                break 'outer_loop;
747            }
748        }
749    }
750
751    assert_eq!(goal_line, Some("per12 per12 0 0 2 -0.0382707".to_string()));
752    Ok(())
753}
754
755
756#[tokio::test]
757async fn cloud_file_extension() -> Result<(), CloudFileError> {
758    let url = "https://raw.githubusercontent.com/fastlmm/bed-sample-files/main/plink_sim_10s_100v_10pmiss.bed";
759    let mut cloud_file = CloudFile::new(url)?;
760    assert_eq!(cloud_file.read_file_size().await?, 303);
761    cloud_file.set_extension("fam")?;
762    assert_eq!(cloud_file.read_file_size().await?, 130);
763    Ok(())
764}
765
766// The AWS tests are skipped if credentials are not available.
767#[tokio::test]
768async fn s3_play_cloud() -> Result<(), CloudFileError> {
769    use rusoto_credential::{CredentialsError, ProfileProvider, ProvideAwsCredentials};
770    let credentials = if let Ok(provider) = ProfileProvider::new() {
771        provider.credentials().await
772    } else {
773        Err(CredentialsError::new("No credentials found"))
774    };
775
776    let Ok(credentials) = credentials else {
777        eprintln!("Skipping test because no AWS credentials found");
778        return Ok(());
779    };
780
781    let url = "s3://bedreader/v1/toydata.5chrom.bed";
782    let options = [
783        ("aws_region", "us-west-2"),
784        ("aws_access_key_id", credentials.aws_access_key_id()),
785        ("aws_secret_access_key", credentials.aws_secret_access_key()),
786    ];
787
788    let cloud_file = CloudFile::new_with_options(url, options)?;
789    assert_eq!(cloud_file.read_file_size().await?, 1_250_003);
790    Ok(())
791}
792
793/// Given a local file's absolute path, return a URL string to that file.
794/// 
795/// # Example
796/// ```
797/// use cloud_file::abs_path_to_url_string;
798/// 
799/// // Define a sample file_name and expected_url based on the target OS
800/// #[cfg(target_os = "windows")]
801/// let (file_name, expected_url) = (r"M:\data files\small.bed", "file:///M:/data%20files/small.bed");
802/// 
803/// #[cfg(not(target_os = "windows"))]
804/// let (file_name, expected_url) = (r"/data files/small.bed", "file:///data%20files/small.bed");
805/// 
806/// let url = abs_path_to_url_string(file_name)?;
807/// assert_eq!(url, expected_url);
808 /// # use cloud_file::CloudFileError;
809 /// # Ok::<(), CloudFileError>(())
810 /// ```
811pub fn abs_path_to_url_string(path: impl AsRef<Path>) -> Result<String, CloudFileError> {
812    let path = path.as_ref();
813    let url = Url::from_file_path(path)
814        .map_err(|_e| {
815            CloudFileError::CannotCreateUrlFromFilePath(path.to_string_lossy().to_string())
816        })?
817        .to_string();
818    Ok(url)
819}
820
/// Mirrors the README example: streams a sample `.fam` file chunk by chunk on a
/// manually created Tokio runtime and checks that it contains 500 newline bytes.
#[test]
fn readme_1() {
    use futures_util::StreamExt; // Brings `.next()` into scope for streams.
    use tokio::runtime::Runtime;

    let runtime = Runtime::new().unwrap();

    let count_newlines = async {
        let cloud_file = CloudFile::new(
            "https://raw.githubusercontent.com/fastlmm/bed-sample-files/main/toydata.5chrom.fam",
        )?;
        let mut chunks = cloud_file.stream_chunks().await?;

        let mut newline_count: usize = 0;
        while let Some(chunk) = chunks.next().await {
            newline_count += bytecount::count(&chunk?, b'\n');
        }
        assert_eq!(newline_count, 500);

        // The turbofish pins the async block's error type so `?` can convert into it.
        Ok::<(), CloudFileError>(())
    };

    runtime.block_on(count_newlines).unwrap();
}
842
843
844#[tokio::test]
845async fn check_file_signature() -> Result<(), CloudFileError> {
846    let url = "https://raw.githubusercontent.com/fastlmm/bed-sample-files/main/plink_sim_10s_100v_10pmiss.bed";
847    let cloud_file = CloudFile::new(url)?;
848    let (bytes, size) = cloud_file.read_range_and_file_size(0..3).await?;
849
850    assert_eq!(bytes.len(), 3);
851    assert_eq!(bytes[0], 0x6c);
852    assert_eq!(bytes[1], 0x1b);
853    assert_eq!(bytes[2], 0x01);
854    assert_eq!(size, 303);
855    Ok(())
856}
857
858#[tokio::test]
859async fn from_structs_example() -> Result<(), CloudFileError> {
860    use object_store::{http::HttpBuilder, path::Path as StorePath, ClientOptions};
861    use std::time::Duration;
862
863    let client_options = ClientOptions::new().with_timeout(Duration::from_secs(30));
864    let http = HttpBuilder::new()
865        .with_url("https://raw.githubusercontent.com")
866        .with_client_options(client_options)
867        .build()?;
868    let store_path = StorePath::parse("fastlmm/bed-sample-files/main/plink_sim_10s_100v_10pmiss.bed")?;
869
870    let cloud_file = CloudFile::from_structs(http, store_path);
871    assert_eq!(cloud_file.read_file_size().await?, 303);
872    Ok(())
873}
874
875#[tokio::test]
876async fn local_file() -> Result<(), CloudFileError> {
877    use std::env;
878
879    let apache_url = abs_path_to_url_string(env::var("CARGO_MANIFEST_DIR").unwrap() + "/LICENSE-APACHE")?;
880    let cloud_file = CloudFile::new(&apache_url)?;
881    assert_eq!(cloud_file.count_lines().await?, 175);
882    Ok(())
883}