#![warn(missing_docs)]
#![warn(clippy::pedantic)]
#![allow(
clippy::missing_panics_doc, // LATER: add panics docs
clippy::missing_errors_doc, // LATER: add errors docs
clippy::similar_names,
clippy::cast_possible_truncation,
clippy::cast_possible_wrap,
clippy::cast_sign_loss,
clippy::cast_lossless
)]
#![doc = include_str!("../README.md")]
#[cfg(not(target_pointer_width = "64"))]
compile_error!("This code requires a 64-bit target architecture.");
use bytes::Bytes;
use object_store::delimited::newline_delimited_stream;
use core::fmt;
use futures_util::stream::BoxStream;
use futures_util::TryStreamExt;
use object_store::http::HttpBuilder;
#[doc(no_inline)]
pub use object_store::path::Path as StorePath;
use object_store::{GetOptions, GetRange, GetResult, ObjectStore};
use std::ops::{Deref, Range};
use std::path::Path;
use std::sync::Arc;
use thiserror::Error;
use url::Url;
/// A file in the cloud (or on a local file system), identified by a store
/// and a path within that store.
///
/// Cloning is cheap: the store handle is shared via [`Arc`].
// The hand-written `Clone` impl was redundant — both fields are `Clone`,
// so deriving produces the identical field-by-field clone.
#[derive(Debug, Clone)]
pub struct CloudFile {
    /// The cloud service (or local file system) that holds the file.
    pub cloud_service: Arc<DynObjectStore>,
    /// The path to the file within the store.
    pub store_path: StorePath,
}
/// An empty set of cloud options, for use with [`CloudFile::new_with_options`]
/// (and friends) when no options are needed.
pub const EMPTY_OPTIONS: [(&str, String); 0] = [];
impl CloudFile {
pub fn new(location: impl AsRef<str>) -> Result<CloudFile, CloudFileError> {
let location = location.as_ref();
let url = Url::parse(location)
.map_err(|e| CloudFileError::CannotParseUrl(location.to_string(), e.to_string()))?;
let (object_store, store_path): (DynObjectStore, StorePath) =
parse_url_opts_work_around(&url, EMPTY_OPTIONS)?;
let cloud_file = CloudFile {
cloud_service: Arc::new(object_store),
store_path,
};
Ok(cloud_file)
}
#[inline]
pub fn from_structs(store: impl ObjectStore, store_path: StorePath) -> Self {
CloudFile {
cloud_service: Arc::new(DynObjectStore(Box::new(store))),
store_path,
}
}
pub fn new_with_options<I, K, V>(
location: impl AsRef<str>,
options: I,
) -> Result<CloudFile, CloudFileError>
where
I: IntoIterator<Item = (K, V)>,
K: AsRef<str>,
V: Into<String>,
{
let location = location.as_ref();
let url = Url::parse(location)
.map_err(|e| CloudFileError::CannotParseUrl(location.to_string(), e.to_string()))?;
let (object_store, store_path): (DynObjectStore, StorePath) =
parse_url_opts_work_around(&url, options)?;
let cloud_file = CloudFile {
cloud_service: Arc::new(object_store),
store_path,
};
Ok(cloud_file)
}
pub async fn count_lines(&self) -> Result<usize, CloudFileError> {
let stream = self.stream_chunks().await?;
let newline_count = stream
.try_fold(0, |acc, bytes| async move {
let count = bytecount::count(&bytes, b'\n');
Ok(acc + count) })
.await
.map_err(CloudFileError::ObjectStoreError)?;
Ok(newline_count)
}
pub async fn read_file_size(&self) -> Result<usize, CloudFileError> {
let meta = self.cloud_service.head(&self.store_path).await?;
Ok(meta.size)
}
pub async fn read_range(&self, range: Range<usize>) -> Result<Bytes, CloudFileError> {
Ok(self
.cloud_service
.get_range(&self.store_path, range)
.await?)
}
pub async fn read_ranges(&self, ranges: &[Range<usize>]) -> Result<Vec<Bytes>, CloudFileError> {
Ok(self
.cloud_service
.get_ranges(&self.store_path, ranges)
.await?)
}
pub async fn get_opts(&self, get_options: GetOptions) -> Result<GetResult, CloudFileError> {
Ok(self
.cloud_service
.get_opts(&self.store_path, get_options)
.await?)
}
pub async fn read_range_and_file_size (
&self,
range: Range<usize>,
) -> Result<(Bytes, usize), CloudFileError> {
let get_options = GetOptions {
range: Some(GetRange::Bounded(range)),
..Default::default()
};
let get_result = self
.cloud_service
.get_opts(&self.store_path, get_options)
.await?;
let size: usize = get_result.meta.size;
let bytes = get_result
.bytes()
.await
.map_err(CloudFileError::ObjectStoreError)?;
Ok((bytes, size))
}
pub async fn get(&self) -> Result<GetResult, CloudFileError> {
Ok(self.cloud_service.get(&self.store_path).await?)
}
pub async fn read_all(&self) -> Result<Bytes, CloudFileError> {
let all = self
.cloud_service
.get(&self.store_path)
.await?
.bytes()
.await?;
Ok(all)
}
pub async fn stream_chunks(
&self,
) -> Result<BoxStream<'static, object_store::Result<Bytes>>, CloudFileError> {
let stream = self
.cloud_service
.get(&self.store_path)
.await?
.into_stream();
Ok(stream)
}
pub async fn stream_line_chunks(
&self,
) -> Result<BoxStream<'static, object_store::Result<Bytes>>, CloudFileError> {
let chunks = self.stream_chunks().await?;
let line_chunks = newline_delimited_stream(chunks);
Ok(Box::pin(line_chunks))
}
pub fn set_extension(&mut self, extension: &str) -> Result<(), CloudFileError> {
let mut path_str = self.store_path.to_string();
if let Some(dot_index) = path_str.rfind('.') {
path_str.truncate(dot_index);
}
if !extension.is_empty() {
path_str.push('.');
path_str.push_str(extension);
}
self.store_path = StorePath::parse(&path_str)?;
Ok(())
}
}
/// Work-around for `object_store` URL parsing: decide whether `url` should be
/// handled by the generic HTTP store (`true`) or a cloud-specific store
/// (`false`), and compute the in-store path.
fn parse_work_around(url: &Url) -> Result<(bool, StorePath), object_store::Error> {
    // Drop the leading '/' and the bucket name from the URL's path.
    let strip_bucket = || Some(url.path().strip_prefix('/')?.split_once('/')?.1);

    let (is_http, raw_path) = match (url.scheme(), url.host_str()) {
        ("http", Some(_)) => (true, url.path()),
        ("https", Some(host)) => {
            let is_azure_host = host.ends_with("dfs.core.windows.net")
                || host.ends_with("blob.core.windows.net")
                || host.ends_with("dfs.fabric.microsoft.com")
                || host.ends_with("blob.fabric.microsoft.com");
            if is_azure_host {
                (false, url.path())
            } else if host.ends_with("amazonaws.com") {
                // Virtual-hosted S3 URLs ("s3...") carry the bucket in the
                // host; path-style URLs carry it in the path.
                if host.starts_with("s3") {
                    (false, strip_bucket().unwrap_or_default())
                } else {
                    (false, url.path())
                }
            } else if host.ends_with("r2.cloudflarestorage.com") {
                (false, strip_bucket().unwrap_or_default())
            } else {
                // Any other https host is treated as plain HTTP.
                (true, url.path())
            }
        }
        _ => (false, url.path()),
    };
    Ok((is_http, StorePath::from_url_path(raw_path)?))
}
fn parse_url_opts_work_around<I, K, V>(
url: &Url,
options: I,
) -> Result<(DynObjectStore, StorePath), object_store::Error>
where
I: IntoIterator<Item = (K, V)>,
K: AsRef<str>,
V: Into<String>,
{
let (is_http, path) = parse_work_around(url)?;
if is_http {
let url = &url[..url::Position::BeforePath];
let path = StorePath::parse(path)?;
let builder = options.into_iter().fold(
<HttpBuilder>::new().with_url(url),
|builder, (key, value)| match key.as_ref().parse() {
Ok(k) => builder.with_config(k, value),
Err(_) => builder,
},
);
let store = DynObjectStore::new(builder.build()?);
Ok((store, path))
} else {
let (store, path) = object_store::parse_url_opts(url, options)?;
Ok((DynObjectStore(store), path))
}
}
impl fmt::Display for CloudFile {
    /// Shows the store path (in its `Debug` form), prefixed with `CloudFile: `.
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        formatter.write_fmt(format_args!("CloudFile: {:?}", self.store_path))
    }
}
/// A newtype around a boxed [`ObjectStore`] trait object, so stores of
/// different concrete types can be handled uniformly.
#[derive(Debug)]
pub struct DynObjectStore(pub Box<dyn ObjectStore>);
impl Deref for DynObjectStore {
    type Target = dyn ObjectStore;

    /// Borrow the wrapped trait object so `ObjectStore` methods can be
    /// called on a `DynObjectStore` directly.
    fn deref(&self) -> &Self::Target {
        self.0.as_ref()
    }
}
impl DynObjectStore {
    /// Box a concrete [`ObjectStore`] into a `DynObjectStore`.
    #[inline]
    fn new(store: impl ObjectStore) -> Self {
        // The field type coerces `Box<impl ObjectStore>` to the trait object,
        // so no explicit cast is needed.
        Self(Box::new(store))
    }
}
/// The errors that can occur when working with a [`CloudFile`].
#[derive(Error, Debug)]
pub enum CloudFileError {
    /// An error from the underlying object store.
    #[error("Object store error: {0}")]
    ObjectStoreError(#[from] object_store::Error),
    /// An error parsing or constructing an object-store path.
    #[error("Object store path error: {0}")]
    ObjectStorePathError(#[from] object_store::path::Error),
    /// File contents were expected to be valid UTF-8 but were not.
    #[error("UTF-8 error: {0}")]
    Utf8Error(#[from] std::str::Utf8Error),
    /// The location string could not be parsed as a URL
    /// (holds the offending string and the parser's message).
    #[error("Cannot parse URL: {0} {1}")]
    CannotParseUrl(String, String),
    /// A local file path could not be converted into a `file://` URL.
    #[error("Cannot create URL from this absolute file path: '{0}'")]
    CannotCreateUrlFromFilePath(String),
}
#[tokio::test]
async fn cloud_file_2() -> Result<(), CloudFileError> {
    // A small sample .bed file hosted on GitHub; its size is known to be 303 bytes.
    let url = "https://raw.githubusercontent.com/fastlmm/bed-sample-files/main/plink_sim_10s_100v_10pmiss.bed";
    let cloud_file = CloudFile::new(url)?;
    let size = cloud_file.read_file_size().await?;
    assert_eq!(size, 303);
    Ok(())
}
#[tokio::test]
async fn line_n() -> Result<(), CloudFileError> {
    use futures_util::StreamExt;
    use std::str::from_utf8;

    // Find line #12 (0-based) of a remote .fam file by scanning
    // newline-aligned chunks until the line is reached.
    let url = "https://raw.githubusercontent.com/fastlmm/bed-sample-files/main/toydata.5chrom.fam";
    let goal_index = 12;
    let cloud_file = CloudFile::new(url)?;
    let mut line_chunks = cloud_file.stream_line_chunks().await?;
    let mut lines_seen = 0usize;
    let mut goal_line = None;
    'chunk_loop: while let Some(line_chunk) = line_chunks.next().await {
        let line_chunk = line_chunk?;
        for line in from_utf8(&line_chunk)?.lines() {
            if lines_seen == goal_index {
                goal_line = Some(line.to_string());
                break 'chunk_loop;
            }
            lines_seen += 1;
        }
    }
    assert_eq!(goal_line, Some("per12 per12 0 0 2 -0.0382707".to_string()));
    Ok(())
}
#[tokio::test]
async fn cloud_file_extension() -> Result<(), CloudFileError> {
    // Start with the .bed file (303 bytes), then switch the extension to
    // reach the matching .fam file (130 bytes).
    let url = "https://raw.githubusercontent.com/fastlmm/bed-sample-files/main/plink_sim_10s_100v_10pmiss.bed";
    let mut cloud_file = CloudFile::new(url)?;
    let bed_size = cloud_file.read_file_size().await?;
    assert_eq!(bed_size, 303);
    cloud_file.set_extension("fam")?;
    let fam_size = cloud_file.read_file_size().await?;
    assert_eq!(fam_size, 130);
    Ok(())
}
// Reads a file from S3, but returns early (passing) when no AWS credentials
// are available, so CI machines without secrets still succeed.
#[tokio::test]
async fn s3_play_cloud() -> Result<(), CloudFileError> {
    use rusoto_credential::{CredentialsError, ProfileProvider, ProvideAwsCredentials};
    // Look up credentials from the local AWS profile, if one exists.
    let credentials = if let Ok(provider) = ProfileProvider::new() {
        provider.credentials().await
    } else {
        Err(CredentialsError::new("No credentials found"))
    };
    let Ok(credentials) = credentials else {
        eprintln!("Skipping test because no AWS credentials found");
        return Ok(());
    };
    let url = "s3://bedreader/v1/toydata.5chrom.bed";
    // Pass region and credentials as cloud options.
    let options = [
        ("aws_region", "us-west-2"),
        ("aws_access_key_id", credentials.aws_access_key_id()),
        ("aws_secret_access_key", credentials.aws_secret_access_key()),
    ];
    let cloud_file = CloudFile::new_with_options(url, options)?;
    assert_eq!(cloud_file.read_file_size().await?, 1_250_003);
    Ok(())
}
/// Convert an absolute local file path into a `file://` URL string.
pub fn abs_path_to_url_string(path: impl AsRef<Path>) -> Result<String, CloudFileError> {
    let path = path.as_ref();
    match Url::from_file_path(path) {
        Ok(url) => Ok(url.to_string()),
        // `from_file_path` gives no error detail, so report the offending path.
        Err(()) => Err(CloudFileError::CannotCreateUrlFromFilePath(
            path.to_string_lossy().to_string(),
        )),
    }
}
// Mirrors the first example in the README — keep the two in sync.
#[test]
fn readme_1() {
    use futures_util::StreamExt; use tokio::runtime::Runtime;
    // A plain #[test] that drives async code on its own runtime,
    // as README readers outside a tokio context would.
    Runtime::new()
        .unwrap()
        .block_on(async {
            // Count the newlines in a small remote .fam file by streaming chunks.
            let url = "https://raw.githubusercontent.com/fastlmm/bed-sample-files/main/toydata.5chrom.fam";
            let cloud_file = CloudFile::new(url)?;
            let mut chunks = cloud_file.stream_chunks().await?;
            let mut newline_count: usize = 0;
            while let Some(chunk) = chunks.next().await {
                let chunk = chunk?;
                newline_count += bytecount::count(&chunk, b'\n');
            }
            assert_eq!(newline_count, 500);
            Ok::<(), CloudFileError>(())
        })
        .unwrap();
}
#[tokio::test]
async fn check_file_signature() -> Result<(), CloudFileError> {
    // PLINK .bed files start with the magic bytes 0x6c, 0x1b and the
    // mode byte 0x01; fetch just those three bytes plus the file size.
    let url = "https://raw.githubusercontent.com/fastlmm/bed-sample-files/main/plink_sim_10s_100v_10pmiss.bed";
    let cloud_file = CloudFile::new(url)?;
    let (bytes, size) = cloud_file.read_range_and_file_size(0..3).await?;
    assert_eq!(size, 303);
    assert_eq!(&bytes[..], &[0x6c, 0x1b, 0x01]);
    Ok(())
}
#[tokio::test]
async fn from_structs_example() -> Result<(), CloudFileError> {
    use object_store::{http::HttpBuilder, path::Path as StorePath, ClientOptions};
    use std::time::Duration;

    // Build an HTTP store with a 30-second timeout, then wrap it and a path
    // into a CloudFile via `from_structs`.
    let timeout_options = ClientOptions::new().with_timeout(Duration::from_secs(30));
    let http_store = HttpBuilder::new()
        .with_url("https://raw.githubusercontent.com")
        .with_client_options(timeout_options)
        .build()?;
    let store_path =
        StorePath::parse("fastlmm/bed-sample-files/main/plink_sim_10s_100v_10pmiss.bed")?;

    let cloud_file = CloudFile::from_structs(http_store, store_path);
    assert_eq!(cloud_file.read_file_size().await?, 303);
    Ok(())
}
#[tokio::test]
async fn local_file() -> Result<(), CloudFileError> {
    use std::env;

    // Point at this crate's Apache license file via a file:// URL and
    // count its lines.
    let license_path = env::var("CARGO_MANIFEST_DIR").unwrap() + "/LICENSE-APACHE";
    let apache_url = abs_path_to_url_string(license_path)?;
    let cloud_file = CloudFile::new(&apache_url)?;
    assert_eq!(cloud_file.count_lines().await?, 175);
    Ok(())
}