cloud_file/lib.rs
1#![warn(missing_docs)]
2#![warn(clippy::pedantic)]
3#![allow(
4 clippy::missing_panics_doc, // LATER: add panics docs
5 clippy::missing_errors_doc, // LATER: add errors docs
6 clippy::similar_names,
7 clippy::cast_possible_truncation,
8 clippy::cast_possible_wrap,
9 clippy::cast_sign_loss,
10 clippy::cast_lossless
11)]
12#![doc = include_str!("../README.md")]
13//! ## Main Functions
14//!
15//! | Function | Description |
16//! | -------- | ----------- |
17//! | [`CloudFile::new`](struct.CloudFile.html#method.new) | Use a URL string to specify a cloud file for reading. |
18//! | [`CloudFile::new_with_options`](struct.CloudFile.html#method.new_with_options) | Use a URL string and string options to specify a cloud file for reading. |
19//!
20//! ## URLs
21//!
22//! | Cloud Service | Example |
23//! | ------------- | ------- |
24//! | HTTP | `https://www.gutenberg.org/cache/epub/100/pg100.txt` |
25//! | local file | `file:///M:/data%20files/small.bed` |
26//! | AWS S3 | `s3://bedreader/v1/toydata.5chrom.bed` |
27//!
28//! Note: For local files, use the [`abs_path_to_url_string`](fn.abs_path_to_url_string.html) function to properly encode into a URL.
29//!
30//! ## Options
31//!
32//! | Cloud Service | Details | Example |
33//! | -------- | ------- | ----------- |
34//! | HTTP | [`ClientConfigKey`](https://docs.rs/object_store/latest/object_store/enum.ClientConfigKey.html#variant.Timeout) | `[("timeout", "30s")]` |
35//! | local file | *none* | |
36//! | AWS S3 | [`AmazonS3ConfigKey`](https://docs.rs/object_store/latest/object_store/aws/enum.AmazonS3ConfigKey.html) | `[("aws_region", "us-west-2"), ("aws_access_key_id",` ...`), ("aws_secret_access_key",` ...`)]` |
37//! | Azure | [`AzureConfigKey`](https://docs.rs/object_store/latest/object_store/azure/enum.AzureConfigKey.html) | |
38//! | Google | [`GoogleConfigKey`](https://docs.rs/object_store/latest/object_store/gcp/enum.GoogleConfigKey.html) | |
39//!
40//!
41//! ## High-Level [`CloudFile`](struct.CloudFile.html) Methods
42//!
43//! | Method | Retrieves |
44//! |-------------------------------|-------------------------------------------------------------------|
45//! | [`stream_chunks`](struct.CloudFile.html#method.stream_chunks) | File contents as a stream of [`Bytes`](https://docs.rs/bytes/latest/bytes/struct.Bytes.html) |
46//! | [`stream_line_chunks`](struct.CloudFile.html#method.stream_line_chunks) | File contents as a stream of [`Bytes`](https://docs.rs/bytes/latest/bytes/struct.Bytes.html), each containing one or more whole lines |
47//! | [`read_all`](struct.CloudFile.html#method.read_all) | Whole file contents as an in-memory [`Bytes`](https://docs.rs/bytes/latest/bytes/struct.Bytes.html) |
48//! | [`read_range`](struct.CloudFile.html#method.read_range) | [`Bytes`](https://docs.rs/bytes/latest/bytes/struct.Bytes.html) from a specified range |
49//! | [`read_ranges`](struct.CloudFile.html#method.read_ranges) | `Vec` of [`Bytes`](https://docs.rs/bytes/latest/bytes/struct.Bytes.html) from specified ranges |
50//! | [`read_range_and_file_size`](struct.CloudFile.html#method.read_range_and_file_size) | [`Bytes`](https://docs.rs/bytes/latest/bytes/struct.Bytes.html) from a specified range & the file's size |
51//! | [`read_file_size`](struct.CloudFile.html#method.read_file_size) | Size of the file |
52//! | [`count_lines`](struct.CloudFile.html#method.count_lines) | Number of lines in the file |
53//!
54//! Additional methods:
55//!
56//! | Method | Description |
57//! |-------------------------------|-------------------------------------------------------------------|
58//! | [`clone`](struct.CloudFile.html#method.clone) | Clone the [`CloudFile`](struct.CloudFile.html) instance. Efficient by design. |
59//! | [`set_extension`](struct.CloudFile.html#method.set_extension) | Change the [`CloudFile`](struct.CloudFile.html)'s file extension (in place). |
60//!
61//! ## Low-Level [`CloudFile`](struct.CloudFile.html) Methods
62//!
63//! | Method | Description |
64//! | -------- | ----------- |
65//! | [`get`](struct.CloudFile.html#method.get) | Call the [`object_store`](https://docs.rs/object_store/latest/object_store/trait.ObjectStore.html#method.get) crate's `get` method. |
66//! | [`get_opts`](struct.CloudFile.html#method.get_opts) | Call the [`object_store`](https://docs.rs/object_store/latest/object_store/trait.ObjectStore.html#method.get_opts) crate's `get_opts` method. |
67//!
68//! ## Lowest-Level [`CloudFile`](struct.CloudFile.html) Methods
69//!
70//! You can call any method from the [`object_store`](https://docs.rs/object_store/latest/object_store/trait.ObjectStore.html) crate. For example, here we
71//! use [`head`](https://docs.rs/object_store/latest/object_store/trait.ObjectStore.html#tymethod.head) to get the metadata for a file and the last_modified time.
72//!
73//! ```
74//! use cloud_file::CloudFile;
75//!
76//! # Runtime::new().unwrap().block_on(async {
77//! let url = "https://raw.githubusercontent.com/fastlmm/bed-sample-files/main/plink_sim_10s_100v_10pmiss.bed";
78//! let cloud_file = CloudFile::new(url)?;
79//! let meta = cloud_file.cloud_service.head(&cloud_file.store_path).await?;
80//! let last_modified = meta.last_modified;
81//! println!("last_modified: {}", last_modified);
82//! assert_eq!(meta.size, 303);
83//! # Ok::<(), CloudFileError>(())}).unwrap();
84//! # use {tokio::runtime::Runtime, cloud_file::CloudFileError};
85//! ```
86
87#[cfg(not(target_pointer_width = "64"))]
88compile_error!("This code requires a 64-bit target architecture.");
89
90use bytes::Bytes;
91use object_store::delimited::newline_delimited_stream;
92use core::fmt;
93use futures_util::stream::BoxStream;
94use futures_util::TryStreamExt;
95use object_store::http::HttpBuilder;
96#[doc(no_inline)]
97pub use object_store::path::Path as StorePath;
98use object_store::{GetOptions, GetRange, GetResult, ObjectStore};
99use std::ops::{Deref, Range};
100use std::path::Path;
101use std::sync::Arc;
102use thiserror::Error;
103use url::Url;
104
105#[derive(Debug)]
106/// The main struct representing the location of a file in the cloud.
107///
108/// It is constructed with [`CloudFile::new`](struct.CloudFile.html#method.new). It is, by design, cheap to clone.
109///
110/// Internally, it stores two pieces of information: the file's cloud service and the path to the file on that service.
111///
112/// # Examples
113///
114/// ```
115/// use cloud_file::CloudFile;
116///
117/// # Runtime::new().unwrap().block_on(async {
118/// let url = "https://raw.githubusercontent.com/fastlmm/bed-sample-files/main/plink_sim_10s_100v_10pmiss.bed";
119/// let cloud_file = CloudFile::new(url)?;
120/// assert_eq!(cloud_file.read_file_size().await?, 303);
121/// # Ok::<(), CloudFileError>(())}).unwrap();
122/// # use {tokio::runtime::Runtime, cloud_file::CloudFileError};
123/// ```
124pub struct CloudFile {
125 /// A cloud service, for example, Http, AWS S3, Azure, the local file system, etc.
126 /// Under the covers, it is an `Arc`-wrapped [`DynObjectStore`](struct.DynObjectStore.html).
127 /// The `DynObjectStore`, in turn, holds an [`ObjectStore`](https://docs.rs/object_store/latest/object_store/trait.ObjectStore.html) from the
128 /// powerful [`object_store`](https://github.com/apache/arrow-rs/tree/master/object_store) crate.
129 pub cloud_service: Arc<DynObjectStore>,
130 /// A path to a file on the cloud service.
131 /// Under the covers, `StorePath` is an alias for a [`Path`](https://docs.rs/object_store/latest/object_store/path/struct.Path.html)
132 /// in the [`object_store`](https://github.com/apache/arrow-rs/tree/master/object_store) crate.
133 pub store_path: StorePath,
134}
135
136impl Clone for CloudFile {
137 fn clone(&self) -> Self {
138 CloudFile {
139 cloud_service: self.cloud_service.clone(),
140 store_path: self.store_path.clone(),
141 }
142 }
143}
144
/// An empty set of cloud options.
///
/// Pass it to [`CloudFile::new_with_options`] when no options are needed;
/// [`CloudFile::new`] uses it for exactly that purpose. Its element type, `(&str, String)`,
/// satisfies the `K: AsRef<str>` and `V: Into<String>` bounds that `new_with_options`
/// places on option pairs.
///
/// # Example
/// ```
/// use cloud_file::{EMPTY_OPTIONS, CloudFile};
///
/// # Runtime::new().unwrap().block_on(async {
/// let url = "https://raw.githubusercontent.com/fastlmm/bed-sample-files/main/plink_sim_10s_100v_10pmiss.bed";
/// let cloud_file = CloudFile::new_with_options(url, EMPTY_OPTIONS)?;
/// assert_eq!(cloud_file.read_file_size().await?, 303);
/// # Ok::<(), CloudFileError>(())}).unwrap();
/// # use {tokio::runtime::Runtime, cloud_file::CloudFileError};
/// ```
pub const EMPTY_OPTIONS: [(&str, String); 0] = [];
159
160impl CloudFile {
161 /// Create a new [`CloudFile`] from a URL string.
162 ///
163 /// # Example
164 /// ```
165 /// use cloud_file::CloudFile;
166 ///
167 /// # Runtime::new().unwrap().block_on(async {
168 /// let url = "https://raw.githubusercontent.com/fastlmm/bed-sample-files/main/plink_sim_10s_100v_10pmiss.bed";
169 /// let cloud_file = CloudFile::new(url)?;
170 /// assert_eq!(cloud_file.read_file_size().await?, 303);
171 /// # Ok::<(), CloudFileError>(())}).unwrap();
172 /// # use {tokio::runtime::Runtime, cloud_file::CloudFileError};
173 /// ```
174 pub fn new(location: impl AsRef<str>) -> Result<CloudFile, CloudFileError> {
175 let location = location.as_ref();
176 let url = Url::parse(location)
177 .map_err(|e| CloudFileError::CannotParseUrl(location.to_string(), e.to_string()))?;
178
179 let (object_store, store_path): (DynObjectStore, StorePath) =
180 parse_url_opts_work_around(&url, EMPTY_OPTIONS)?;
181 let cloud_file = CloudFile {
182 cloud_service: Arc::new(object_store),
183 store_path,
184 };
185 Ok(cloud_file)
186 }
187
188 /// Create a new [`CloudFile`] from an [`ObjectStore`](https://docs.rs/object_store/latest/object_store/trait.ObjectStore.html)
189 /// and a [`object_store::path::Path`](https://docs.rs/object_store/latest/object_store/path/struct.Path.html).
190 ///
191 /// # Example
192 ///
193 /// ```
194 /// use cloud_file::CloudFile;
195 /// use object_store::{http::HttpBuilder, path::Path as StorePath, ClientOptions};
196 /// use std::time::Duration;
197 ///
198 /// # Runtime::new().unwrap().block_on(async {
199 /// let client_options = ClientOptions::new().with_timeout(Duration::from_secs(30));
200 /// let http = HttpBuilder::new()
201 /// .with_url("https://raw.githubusercontent.com")
202 /// .with_client_options(client_options)
203 /// .build()?;
204 /// let store_path = StorePath::parse("fastlmm/bed-sample-files/main/plink_sim_10s_100v_10pmiss.bed")?;
205 ///
206 /// let cloud_file = CloudFile::from_structs(http, store_path);
207 /// assert_eq!(cloud_file.read_file_size().await?, 303);
208 /// # Ok::<(), CloudFileError>(())}).unwrap();
209 /// # use {tokio::runtime::Runtime, cloud_file::CloudFileError};
210 /// ```
211
212 #[inline]
213 pub fn from_structs(store: impl ObjectStore, store_path: StorePath) -> Self {
214 CloudFile {
215 cloud_service: Arc::new(DynObjectStore(Box::new(store))),
216 store_path,
217 }
218 }
219
220 /// Create a new [`CloudFile`] from a URL string and options.
221 ///
222 /// # Example
223 /// ```
224 /// use cloud_file::CloudFile;
225 ///
226 /// # Runtime::new().unwrap().block_on(async {
227 /// let url = "https://raw.githubusercontent.com/fastlmm/bed-sample-files/main/plink_sim_10s_100v_10pmiss.bed";
228 /// let cloud_file = CloudFile::new_with_options(url, [("timeout", "30s")])?;
229 /// assert_eq!(cloud_file.read_file_size().await?, 303);
230 /// # Ok::<(), CloudFileError>(())}).unwrap();
231 /// # use {tokio::runtime::Runtime, cloud_file::CloudFileError};
232 /// ```
233 pub fn new_with_options<I, K, V>(
234 location: impl AsRef<str>,
235 options: I,
236 ) -> Result<CloudFile, CloudFileError>
237 where
238 I: IntoIterator<Item = (K, V)>,
239 K: AsRef<str>,
240 V: Into<String>,
241 {
242 let location = location.as_ref();
243 let url = Url::parse(location)
244 .map_err(|e| CloudFileError::CannotParseUrl(location.to_string(), e.to_string()))?;
245
246 let (object_store, store_path): (DynObjectStore, StorePath) =
247 parse_url_opts_work_around(&url, options)?;
248 let cloud_file = CloudFile {
249 cloud_service: Arc::new(object_store),
250 store_path,
251 };
252 Ok(cloud_file)
253 }
254
255 /// Count the lines in a file stored in the cloud.
256 ///
257 /// # Example
258 /// ```
259 /// use cloud_file::CloudFile;
260 ///
261 /// # Runtime::new().unwrap().block_on(async {
262 /// let url = "https://raw.githubusercontent.com/fastlmm/bed-sample-files/main/plink_sim_10s_100v_10pmiss.fam";
263 /// let cloud_file = CloudFile::new(url)?;
264 /// assert_eq!(cloud_file.count_lines().await?, 10);
265 /// # Ok::<(), CloudFileError>(())}).unwrap();
266 /// # use {tokio::runtime::Runtime, cloud_file::CloudFileError};
267 /// ```
268 pub async fn count_lines(&self) -> Result<usize, CloudFileError> {
269 let stream = self.stream_chunks().await?;
270
271 let newline_count = stream
272 .try_fold(0, |acc, bytes| async move {
273 let count = bytecount::count(&bytes, b'\n');
274 Ok(acc + count) // Accumulate the count
275 })
276 .await
277 .map_err(CloudFileError::ObjectStoreError)?;
278 Ok(newline_count)
279 }
280
281 /// Return the size of a file stored in the cloud.
282 ///
283 /// # Example
284 /// ```
285 /// use cloud_file::CloudFile;
286 ///
287 /// # Runtime::new().unwrap().block_on(async {
288 /// let url = "https://raw.githubusercontent.com/fastlmm/bed-sample-files/main/plink_sim_10s_100v_10pmiss.bed";
289 /// let cloud_file = CloudFile::new(url)?;
290 /// assert_eq!(cloud_file.read_file_size().await?, 303);
291 /// # Ok::<(), CloudFileError>(())}).unwrap();
292 /// # use {tokio::runtime::Runtime, cloud_file::CloudFileError};
293 /// ```
294 pub async fn read_file_size(&self) -> Result<usize, CloudFileError> {
295 let meta = self.cloud_service.head(&self.store_path).await?;
296 Ok(meta.size)
297 }
298
299 /// Return the [`Bytes`](https://docs.rs/bytes/latest/bytes/struct.Bytes.html) from a specified range.
300 ///
301 /// # Example
302 /// ```
303 /// use cloud_file::CloudFile;
304 ///
305 /// # Runtime::new().unwrap().block_on(async {
306 /// let url = "https://raw.githubusercontent.com/fastlmm/bed-sample-files/main/plink_sim_10s_100v_10pmiss.bim";
307 /// let cloud_file = CloudFile::new(url)?;
308 /// let bytes = cloud_file.read_range((0..10)).await?;
309 /// assert_eq!(bytes.as_ref(), b"1\t1:1:A:C\t");
310 /// # Ok::<(), CloudFileError>(())}).unwrap();
311 /// # use {tokio::runtime::Runtime, cloud_file::CloudFileError};
312 /// ```
313 pub async fn read_range(&self, range: Range<usize>) -> Result<Bytes, CloudFileError> {
314 Ok(self
315 .cloud_service
316 .get_range(&self.store_path, range)
317 .await?)
318 }
319
320 /// Return the `Vec` of [`Bytes`](https://docs.rs/bytes/latest/bytes/struct.Bytes.html) from specified ranges.
321 ///
322 /// # Example
323 /// ```
324 /// use cloud_file::CloudFile;
325 ///
326 /// # Runtime::new().unwrap().block_on(async {
327 /// let url = "https://raw.githubusercontent.com/fastlmm/bed-sample-files/main/plink_sim_10s_100v_10pmiss.bim";
328 /// let cloud_file = CloudFile::new(url)?;
329 /// let bytes_vec = cloud_file.read_ranges(&[0..10, 1000..1010]).await?;
330 /// assert_eq!(bytes_vec.len(), 2);
331 /// assert_eq!(bytes_vec[0].as_ref(), b"1\t1:1:A:C\t");
332 /// assert_eq!(bytes_vec[1].as_ref(), b":A:C\t0.0\t4");
333 /// # Ok::<(), CloudFileError>(())}).unwrap();
334 /// # use {tokio::runtime::Runtime, cloud_file::CloudFileError};
335 /// ```
336 pub async fn read_ranges(&self, ranges: &[Range<usize>]) -> Result<Vec<Bytes>, CloudFileError> {
337 Ok(self
338 .cloud_service
339 .get_ranges(&self.store_path, ranges)
340 .await?)
341 }
342
343 /// Call the [`object_store`](https://docs.rs/object_store/latest/object_store/trait.ObjectStore.html#method.get_opts) crate's `get_opts` method.
344 ///
345 /// You can, for example, in one call retrieve a range of bytes from the file and the file's metadata. The
346 /// result is a [`GetResult`](https://docs.rs/object_store/latest/object_store/struct.GetResult.html).
347 ///
348 /// # Example
349 ///
350 /// In one call, read the first three bytes of a genomic data file and get
351 /// the size of the file. Check that the file starts with the expected file signature.
352 /// ```
353 /// use cloud_file::CloudFile;
354 /// use object_store::{GetRange, GetOptions};
355 ///
356 /// # Runtime::new().unwrap().block_on(async {
357 /// let url = "https://raw.githubusercontent.com/fastlmm/bed-sample-files/main/plink_sim_10s_100v_10pmiss.bed";
358 /// let cloud_file = CloudFile::new(url)?;
359 /// let get_options = GetOptions {
360 /// range: Some(GetRange::Bounded(0..3)),
361 /// ..Default::default()
362 /// };
363 /// let get_result = cloud_file.get_opts(get_options).await?;
364 /// let size: usize = get_result.meta.size;
365 /// let bytes = get_result
366 /// .bytes()
367 /// .await?;
368 /// assert_eq!(bytes.len(), 3);
369 /// assert_eq!(bytes[0], 0x6c);
370 /// assert_eq!(bytes[1], 0x1b);
371 /// assert_eq!(bytes[2], 0x01);
372 /// assert_eq!(size, 303);
373 /// # Ok::<(), CloudFileError>(())}).unwrap();
374 /// # use {tokio::runtime::Runtime, cloud_file::CloudFileError};
375 /// ```
376 pub async fn get_opts(&self, get_options: GetOptions) -> Result<GetResult, CloudFileError> {
377 Ok(self
378 .cloud_service
379 .get_opts(&self.store_path, get_options)
380 .await?)
381 }
382
383 /// Retrieve the [`Bytes`](https://docs.rs/bytes/latest/bytes/struct.Bytes.html) from a specified range & the file's size.
384 ///
385 /// # Example
386 ///
387 /// In one call, read the first three bytes of a genomic data file and get
388 /// the size of the file. Check that the file starts with the expected file signature.
389 /// ```
390 /// use cloud_file::CloudFile;
391 ///
392 /// # Runtime::new().unwrap().block_on(async {
393 /// let url = "https://raw.githubusercontent.com/fastlmm/bed-sample-files/main/plink_sim_10s_100v_10pmiss.bed";
394 /// let cloud_file = CloudFile::new(url)?;
395 /// let (bytes, size) = cloud_file.read_range_and_file_size(0..3).await?;
396 /// assert_eq!(bytes.len(), 3);
397 /// assert_eq!(bytes[0], 0x6c);
398 /// assert_eq!(bytes[1], 0x1b);
399 /// assert_eq!(bytes[2], 0x01);
400 /// assert_eq!(size, 303);
401 /// # Ok::<(), CloudFileError>(())}).unwrap();
402 /// # use {tokio::runtime::Runtime, cloud_file::CloudFileError};
403 /// ```
404 pub async fn read_range_and_file_size (
405 &self,
406 range: Range<usize>,
407 ) -> Result<(Bytes, usize), CloudFileError> {
408 let get_options = GetOptions {
409 range: Some(GetRange::Bounded(range)),
410 ..Default::default()
411 };
412 let get_result = self
413 .cloud_service
414 .get_opts(&self.store_path, get_options)
415 .await?;
416 let size: usize = get_result.meta.size;
417 let bytes = get_result
418 .bytes()
419 .await
420 .map_err(CloudFileError::ObjectStoreError)?;
421 Ok((bytes, size))
422 }
423
424 /// Call the [`object_store`](https://docs.rs/object_store/latest/object_store/trait.ObjectStore.html#method.get) crate's `get` method.
425 ///
426 /// The result is a [`GetResult`](https://docs.rs/object_store/latest/object_store/struct.GetResult.html) which can,
427 /// for example, be converted into a stream of bytes.
428 ///
429 /// # Example
430 ///
431 /// Do a 'get', turn result into a stream, then scan all the bytes of the
432 /// file for the newline character.
433 ///
434 /// ```rust
435 /// use cloud_file::CloudFile;
436 /// use futures_util::StreamExt; // Enables `.next()` on streams.
437 ///
438 /// # Runtime::new().unwrap().block_on(async {
439 /// let url = "https://raw.githubusercontent.com/fastlmm/bed-sample-files/main/toydata.5chrom.fam";
440 /// let cloud_file = CloudFile::new_with_options(url, [("timeout", "30s")])?;
441 /// let mut stream = cloud_file.get().await?.into_stream();
442 /// let mut newline_count: usize = 0;
443 /// while let Some(bytes) = stream.next().await {
444 /// let bytes = bytes?;
445 /// newline_count += bytecount::count(&bytes, b'\n');
446 /// }
447 /// assert_eq!(newline_count, 500);
448 /// # Ok::<(), CloudFileError>(())}).unwrap();
449 /// # use {tokio::runtime::Runtime, cloud_file::CloudFileError};
450 /// ```
451 pub async fn get(&self) -> Result<GetResult, CloudFileError> {
452 Ok(self.cloud_service.get(&self.store_path).await?)
453 }
454
455 /// Read the whole file into an in-memory [`Bytes`](https://docs.rs/bytes/latest/bytes/struct.Bytes.html).
456 ///
457 /// # Example
458 ///
    /// Read the whole file, then scan all of its bytes
    /// for the newline character.
461 ///
462 /// ```rust
463 /// use cloud_file::CloudFile;
464 ///
465 /// # Runtime::new().unwrap().block_on(async {
466 /// let url = "https://raw.githubusercontent.com/fastlmm/bed-sample-files/main/toydata.5chrom.fam";
467 /// let cloud_file = CloudFile::new_with_options(url, [("timeout", "30s")])?;
468 /// let all = cloud_file.read_all().await?;
469 /// let newline_count = bytecount::count(&all, b'\n');
470 /// assert_eq!(newline_count, 500);
471 /// # Ok::<(), CloudFileError>(())}).unwrap();
472 /// # use {tokio::runtime::Runtime, cloud_file::CloudFileError};
473 /// ```
474 pub async fn read_all(&self) -> Result<Bytes, CloudFileError> {
475 let all = self
476 .cloud_service
477 .get(&self.store_path)
478 .await?
479 .bytes()
480 .await?;
481 Ok(all)
482 }
483
484 /// Retrieve the file's contents as a stream of
485 /// [`Bytes`](https://docs.rs/bytes/latest/bytes/struct.Bytes.html).
486 ///
487 /// # Example
488 ///
489 /// Open the file as a stream of bytes, then scan all the bytes
490 /// for the newline character.
491 ///
492 /// ```rust
493 /// use cloud_file::CloudFile;
494 /// use futures::StreamExt; // Enables `.next()` on streams.
495 ///
496 /// # Runtime::new().unwrap().block_on(async {
497 /// let url = "https://raw.githubusercontent.com/fastlmm/bed-sample-files/main/toydata.5chrom.fam";
498 /// let cloud_file = CloudFile::new_with_options(url, [("timeout", "30s")])?;
499 /// let mut chunks = cloud_file.stream_chunks().await?;
500 /// let mut newline_count: usize = 0;
501 /// while let Some(chunk) = chunks.next().await {
502 /// let chunk = chunk?;
503 /// newline_count += bytecount::count(&chunk, b'\n');
504 /// }
505 /// assert_eq!(newline_count, 500);
506 /// # Ok::<(), CloudFileError>(())}).unwrap();
507 /// # use {tokio::runtime::Runtime, cloud_file::CloudFileError};
508 /// ```
509 pub async fn stream_chunks(
510 &self,
511 ) -> Result<BoxStream<'static, object_store::Result<Bytes>>, CloudFileError> {
512 let stream = self
513 .cloud_service
514 .get(&self.store_path)
515 .await?
516 .into_stream();
517 Ok(stream)
518 }
519
520 /// Retrieve the file's contents as a stream of [`Bytes`](https://docs.rs/bytes/latest/bytes/struct.Bytes.html),
521 /// each containing one or more whole lines.
522 ///
523 /// # Example
524 ///
525 /// Return the 12th line of a file.
526 ///
527 /// ```rust
528 /// use cloud_file::CloudFile;
529 /// use futures::StreamExt; // Enables `.next()` on streams.
530 /// use std::str::from_utf8;
531 ///
532 /// # Runtime::new().unwrap().block_on(async {
533 /// let url = "https://raw.githubusercontent.com/fastlmm/bed-sample-files/main/toydata.5chrom.fam";
534 /// let goal_index = 12;
535 ///
536 /// let cloud_file = CloudFile::new(url)?;
537 /// let mut line_chunks = cloud_file.stream_line_chunks().await?;
538 /// let mut index_iter = 0..;
539 /// let mut goal_line = None;
540 /// 'outer_loop: while let Some(line_chunk) = line_chunks.next().await {
541 /// let line_chunk = line_chunk?;
542 /// let lines = from_utf8(&line_chunk)?.lines();
543 /// for line in lines {
544 /// let index = index_iter.next().unwrap(); // Safe because the iterator is infinite
545 /// if index == goal_index {
546 /// goal_line = Some(line.to_string());
547 /// break 'outer_loop;
548 /// }
549 /// }
550 /// }
551 /// assert_eq!(goal_line, Some("per12 per12 0 0 2 -0.0382707".to_string()));
552 /// # Ok::<(), CloudFileError>(())}).unwrap();
553 /// # use {tokio::runtime::Runtime, cloud_file::CloudFileError};
554 /// ```
555 ///
556 pub async fn stream_line_chunks(
557 &self,
558 ) -> Result<BoxStream<'static, object_store::Result<Bytes>>, CloudFileError> {
559 let chunks = self.stream_chunks().await?;
560 let line_chunks = newline_delimited_stream(chunks);
561 Ok(Box::pin(line_chunks))
562 }
563
564
565 /// Change the [`CloudFile`]'s extension (in place).
566 ///
567 /// It removes the current extension, if any.
568 /// It appends the given extension, if any.
569 ///
570 /// The method is in-place rather than functional to make it consistent with
571 /// [`std::path::PathBuf::set_extension`](https://doc.rust-lang.org/stable/std/path/struct.PathBuf.html#method.set_extension).
572 ///
573 /// # Example
574 /// ```
575 /// use cloud_file::CloudFile;
576 ///
577 /// # Runtime::new().unwrap().block_on(async {
578 /// let url = "https://raw.githubusercontent.com/fastlmm/bed-sample-files/main/plink_sim_10s_100v_10pmiss.bed";
579 /// let mut cloud_file = CloudFile::new(url)?;
580 /// assert_eq!(cloud_file.read_file_size().await?, 303);
581 /// cloud_file.set_extension("fam")?;
582 /// assert_eq!(cloud_file.read_file_size().await?, 130);
583 /// # Ok::<(), CloudFileError>(())}).unwrap();
584 /// # use {tokio::runtime::Runtime, cloud_file::CloudFileError};
585 /// ```
586 pub fn set_extension(&mut self, extension: &str) -> Result<(), CloudFileError> {
587 let mut path_str = self.store_path.to_string();
588
589 // Find the last dot in the object path
590 if let Some(dot_index) = path_str.rfind('.') {
591 // Remove the current extension
592 path_str.truncate(dot_index);
593 }
594
595 if !extension.is_empty() {
596 // Append the new extension
597 path_str.push('.');
598 path_str.push_str(extension);
599 }
600
601 // Parse the string back to StorePath
602 self.store_path = StorePath::parse(&path_str)?;
603 Ok(())
604 }
605}
606
607#[allow(clippy::match_bool)]
608fn parse_work_around(url: &Url) -> Result<(bool, StorePath), object_store::Error> {
609 let strip_bucket = || Some(url.path().strip_prefix('/')?.split_once('/')?.1);
610
611 let (scheme, path) = match (url.scheme(), url.host_str()) {
612 ("http", Some(_)) => (true, url.path()),
613 ("https", Some(host)) => {
614 if host.ends_with("dfs.core.windows.net")
615 || host.ends_with("blob.core.windows.net")
616 || host.ends_with("dfs.fabric.microsoft.com")
617 || host.ends_with("blob.fabric.microsoft.com")
618 {
619 (false, url.path())
620 } else if host.ends_with("amazonaws.com") {
621 match host.starts_with("s3") {
622 true => (false, strip_bucket().unwrap_or_default()),
623 false => (false, url.path()),
624 }
625 } else if host.ends_with("r2.cloudflarestorage.com") {
626 (false, strip_bucket().unwrap_or_default())
627 } else {
628 (true, url.path())
629 }
630 }
631 _ => (false, url.path()),
632 };
633
634 Ok((scheme, StorePath::from_url_path(path)?))
635}
636
637// LATER when https://github.com/apache/arrow-rs/issues/5310 gets fixed, can remove work around
638fn parse_url_opts_work_around<I, K, V>(
639 url: &Url,
640 options: I,
641) -> Result<(DynObjectStore, StorePath), object_store::Error>
642where
643 I: IntoIterator<Item = (K, V)>,
644 K: AsRef<str>,
645 V: Into<String>,
646{
647 let (is_http, path) = parse_work_around(url)?;
648 if is_http {
649 let url = &url[..url::Position::BeforePath];
650 let path = StorePath::parse(path)?;
651 let builder = options.into_iter().fold(
652 <HttpBuilder>::new().with_url(url),
653 |builder, (key, value)| match key.as_ref().parse() {
654 Ok(k) => builder.with_config(k, value),
655 Err(_) => builder,
656 },
657 );
658 let store = DynObjectStore::new(builder.build()?);
659 Ok((store, path))
660 } else {
661 let (store, path) = object_store::parse_url_opts(url, options)?;
662 Ok((DynObjectStore(store), path))
663 }
664}
665
666impl fmt::Display for CloudFile {
667 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
668 write!(f, "CloudFile: {:?}", self.store_path)
669 }
670}
671
672/// Wraps `Box<dyn ObjectStore>` for easier usage. An [`ObjectStore`](https://docs.rs/object_store/latest/object_store/trait.ObjectStore.html), from the
673/// powerful [`object_store`](https://github.com/apache/arrow-rs/tree/master/object_store) crate, represents a cloud service.
674#[derive(Debug)]
675pub struct DynObjectStore(pub Box<dyn ObjectStore>);
676
677// Implement Deref to allow access to the inner `ObjectStore` methods
678impl Deref for DynObjectStore {
679 type Target = dyn ObjectStore;
680
681 fn deref(&self) -> &Self::Target {
682 &*self.0
683 }
684}
685
686impl DynObjectStore {
687 #[inline]
688 fn new(store: impl ObjectStore ) -> Self {
689 DynObjectStore(Box::new(store) as Box<dyn ObjectStore>)
690 }
691}
692
693/// The error type for [`CloudFile`](struct.CloudFile.html) methods.
694#[derive(Error, Debug)]
695pub enum CloudFileError {
696 /// An error from [`object_store`](https://github.com/apache/arrow-rs/tree/master/object_store) crate
697 #[error("Object store error: {0}")]
698 ObjectStoreError(#[from] object_store::Error),
699
700 /// An path-related error from [`object_store`](https://github.com/apache/arrow-rs/tree/master/object_store) crate
701 #[error("Object store path error: {0}")]
702 ObjectStorePathError(#[from] object_store::path::Error),
703
704 /// An error related to converting bytes into UTF-8
705 #[error("UTF-8 error: {0}")]
706 Utf8Error(#[from] std::str::Utf8Error),
707
708 /// An error related to parsing a URL string
709 #[error("Cannot parse URL: {0} {1}")]
710 CannotParseUrl(String, String),
711
712 /// An error related to creating a URL from a file path
713 #[error("Cannot create URL from this absolute file path: '{0}'")]
714 CannotCreateUrlFromFilePath(String),
715}
716
717#[tokio::test]
718async fn cloud_file_2() -> Result<(), CloudFileError> {
719 let cloud_file = CloudFile::new(
720 "https://raw.githubusercontent.com/fastlmm/bed-sample-files/main/plink_sim_10s_100v_10pmiss.bed",
721
722 )?;
723 assert_eq!(cloud_file.read_file_size().await?, 303);
724 Ok(())
725}
726
727#[tokio::test]
728async fn line_n() -> Result<(), CloudFileError> {
729 use std::str::from_utf8;
730 use futures_util::StreamExt; // Enables `.next()` on streams.
731
732 let url = "https://raw.githubusercontent.com/fastlmm/bed-sample-files/main/toydata.5chrom.fam";
733 let goal_index = 12;
734
735 let cloud_file = CloudFile::new(url)?;
736 let mut line_chunks = cloud_file.stream_line_chunks().await?;
737 let mut index_iter = 0..;
738 let mut goal_line = None;
739 'outer_loop: while let Some(line_chunk) = line_chunks.next().await {
740 let line_chunk = line_chunk?;
741 let lines = from_utf8(&line_chunk)?.lines();
742 for line in lines {
743 let index = index_iter.next().unwrap(); // safe because we know the iterator is infinite
744 if index == goal_index {
745 goal_line = Some(line.to_string());
746 break 'outer_loop;
747 }
748 }
749 }
750
751 assert_eq!(goal_line, Some("per12 per12 0 0 2 -0.0382707".to_string()));
752 Ok(())
753}
754
755
756#[tokio::test]
757async fn cloud_file_extension() -> Result<(), CloudFileError> {
758 let url = "https://raw.githubusercontent.com/fastlmm/bed-sample-files/main/plink_sim_10s_100v_10pmiss.bed";
759 let mut cloud_file = CloudFile::new(url)?;
760 assert_eq!(cloud_file.read_file_size().await?, 303);
761 cloud_file.set_extension("fam")?;
762 assert_eq!(cloud_file.read_file_size().await?, 130);
763 Ok(())
764}
765
766// The AWS tests are skipped if credentials are not available.
767#[tokio::test]
768async fn s3_play_cloud() -> Result<(), CloudFileError> {
769 use rusoto_credential::{CredentialsError, ProfileProvider, ProvideAwsCredentials};
770 let credentials = if let Ok(provider) = ProfileProvider::new() {
771 provider.credentials().await
772 } else {
773 Err(CredentialsError::new("No credentials found"))
774 };
775
776 let Ok(credentials) = credentials else {
777 eprintln!("Skipping test because no AWS credentials found");
778 return Ok(());
779 };
780
781 let url = "s3://bedreader/v1/toydata.5chrom.bed";
782 let options = [
783 ("aws_region", "us-west-2"),
784 ("aws_access_key_id", credentials.aws_access_key_id()),
785 ("aws_secret_access_key", credentials.aws_secret_access_key()),
786 ];
787
788 let cloud_file = CloudFile::new_with_options(url, options)?;
789 assert_eq!(cloud_file.read_file_size().await?, 1_250_003);
790 Ok(())
791}
792
793/// Given a local file's absolute path, return a URL string to that file.
794///
795/// # Example
796/// ```
797/// use cloud_file::abs_path_to_url_string;
798///
799/// // Define a sample file_name and expected_url based on the target OS
800/// #[cfg(target_os = "windows")]
801/// let (file_name, expected_url) = (r"M:\data files\small.bed", "file:///M:/data%20files/small.bed");
802///
803/// #[cfg(not(target_os = "windows"))]
804/// let (file_name, expected_url) = (r"/data files/small.bed", "file:///data%20files/small.bed");
805///
806/// let url = abs_path_to_url_string(file_name)?;
807/// assert_eq!(url, expected_url);
808 /// # use cloud_file::CloudFileError;
809 /// # Ok::<(), CloudFileError>(())
810 /// ```
811pub fn abs_path_to_url_string(path: impl AsRef<Path>) -> Result<String, CloudFileError> {
812 let path = path.as_ref();
813 let url = Url::from_file_path(path)
814 .map_err(|_e| {
815 CloudFileError::CannotCreateUrlFromFilePath(path.to_string_lossy().to_string())
816 })?
817 .to_string();
818 Ok(url)
819}
820
// Count newlines by streaming chunks, driving the async code from a synchronous test.
#[test]
fn readme_1() {
    use futures_util::StreamExt; // Enables `.next()` on streams.
    use tokio::runtime::Runtime;

    let runtime = Runtime::new().unwrap();
    let outcome = runtime.block_on(async {
        let url = "https://raw.githubusercontent.com/fastlmm/bed-sample-files/main/toydata.5chrom.fam";
        let cloud_file = CloudFile::new(url)?;
        let mut chunks = cloud_file.stream_chunks().await?;

        let mut newline_count: usize = 0;
        while let Some(chunk) = chunks.next().await {
            newline_count += bytecount::count(&chunk?, b'\n');
        }
        assert_eq!(newline_count, 500);
        Ok::<(), CloudFileError>(())
    });
    outcome.unwrap();
}
842
843
844#[tokio::test]
845async fn check_file_signature() -> Result<(), CloudFileError> {
846 let url = "https://raw.githubusercontent.com/fastlmm/bed-sample-files/main/plink_sim_10s_100v_10pmiss.bed";
847 let cloud_file = CloudFile::new(url)?;
848 let (bytes, size) = cloud_file.read_range_and_file_size(0..3).await?;
849
850 assert_eq!(bytes.len(), 3);
851 assert_eq!(bytes[0], 0x6c);
852 assert_eq!(bytes[1], 0x1b);
853 assert_eq!(bytes[2], 0x01);
854 assert_eq!(size, 303);
855 Ok(())
856}
857
858#[tokio::test]
859async fn from_structs_example() -> Result<(), CloudFileError> {
860 use object_store::{http::HttpBuilder, path::Path as StorePath, ClientOptions};
861 use std::time::Duration;
862
863 let client_options = ClientOptions::new().with_timeout(Duration::from_secs(30));
864 let http = HttpBuilder::new()
865 .with_url("https://raw.githubusercontent.com")
866 .with_client_options(client_options)
867 .build()?;
868 let store_path = StorePath::parse("fastlmm/bed-sample-files/main/plink_sim_10s_100v_10pmiss.bed")?;
869
870 let cloud_file = CloudFile::from_structs(http, store_path);
871 assert_eq!(cloud_file.read_file_size().await?, 303);
872 Ok(())
873}
874
875#[tokio::test]
876async fn local_file() -> Result<(), CloudFileError> {
877 use std::env;
878
879 let apache_url = abs_path_to_url_string(env::var("CARGO_MANIFEST_DIR").unwrap() + "/LICENSE-APACHE")?;
880 let cloud_file = CloudFile::new(&apache_url)?;
881 assert_eq!(cloud_file.count_lines().await?, 175);
882 Ok(())
883}