use std::str::FromStr;
#[cfg(feature = "aws")]
use object_store::aws::AmazonS3Builder;
#[cfg(feature = "aws")]
pub use object_store::aws::AmazonS3ConfigKey;
#[cfg(feature = "azure")]
pub use object_store::azure::AzureConfigKey;
#[cfg(feature = "azure")]
use object_store::azure::MicrosoftAzureBuilder;
#[cfg(feature = "gcp")]
use object_store::gcp::GoogleCloudStorageBuilder;
#[cfg(feature = "gcp")]
pub use object_store::gcp::GoogleConfigKey;
#[cfg(feature = "async")]
use object_store::ObjectStore;
use polars_error::{polars_bail, polars_err};
#[cfg(feature = "serde-lazy")]
use serde::{Deserialize, Serialize};
#[cfg(feature = "async")]
use url::Url;
use crate::error::{PolarsError, PolarsResult};
/// List of `(key, value)` configuration pairs: a typed key `T` paired with its
/// value stored as an owned `String`.
// NOTE(review): `dead_code` is presumably allowed because the alias goes unused
// when none of the cloud provider features are enabled — confirm.
#[allow(dead_code)]
type Configs<T> = Vec<(T, String)>;
/// Options for connecting to cloud object stores.
///
/// Holds one optional configuration list per provider. Each field only exists
/// when the matching cargo feature (`aws`, `azure`, `gcp`) is enabled, and the
/// derived `Default` leaves every provider unset (`None`).
#[derive(Clone, Debug, Default)]
#[cfg_attr(feature = "serde-lazy", derive(Serialize, Deserialize))]
pub struct CloudOptions {
/// Amazon S3 configuration pairs; set by `with_aws`, read by `build_aws`.
#[cfg(feature = "aws")]
aws: Option<Configs<AmazonS3ConfigKey>>,
/// Azure configuration pairs; set by `with_azure`, read by `build_azure`.
#[cfg(feature = "azure")]
azure: Option<Configs<AzureConfigKey>>,
/// Google Cloud Storage configuration pairs; set by `with_gcp`, read by `build_gcp`.
#[cfg(feature = "gcp")]
gcp: Option<Configs<GoogleConfigKey>>,
}
/// Converts string-keyed configuration pairs into pairs keyed by the typed key `T`.
///
/// Every key is parsed with `T::from_str` and every value is converted into an
/// owned `String`. Input order is preserved.
///
/// # Errors
///
/// Returns a `ComputeError` naming the first key that `T` fails to parse;
/// pairs after that key are not examined.
#[allow(dead_code)]
fn parsed_untyped_config<T, I: IntoIterator<Item = (impl AsRef<str>, impl Into<String>)>>(
    config: I,
) -> PolarsResult<Configs<T>>
where
    T: FromStr + Eq + std::hash::Hash,
{
    let mut typed: Configs<T> = Vec::new();
    for (raw_key, raw_value) in config {
        let name = raw_key.as_ref();
        let parsed_key = match T::from_str(name) {
            Ok(parsed_key) => parsed_key,
            // The parser's own error is discarded; only the offending key is reported.
            Err(_) => {
                return Err(polars_err!(ComputeError: "unknown configuration key: {}", name))
            }
        };
        typed.push((parsed_key, raw_value.into()));
    }
    Ok(typed)
}
/// Storage backend that a URL refers to, as determined by its scheme.
///
/// Obtained by parsing a URL string through the `FromStr` implementation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CloudType {
    /// Amazon S3 (`s3` scheme).
    Aws,
    /// Microsoft Azure storage (`az`, `adl` and `abfs` schemes).
    Azure,
    /// Local file system (`file` scheme).
    File,
    /// Google Cloud Storage (`gs` and `gcp` schemes).
    Gcp,
}
impl FromStr for CloudType {
    type Err = PolarsError;

    /// Determines the storage backend from the scheme of `url`.
    ///
    /// Recognized schemes: `s3` (AWS), `az` / `adl` / `abfs` (Azure),
    /// `gs` / `gcp` (GCP) and `file`.
    ///
    /// # Errors
    ///
    /// Returns a `ComputeError` when `url` cannot be parsed as a URL, or when
    /// its scheme is not one of the recognized ones. In the latter case the
    /// rejected scheme is included in the message.
    #[cfg(feature = "async")]
    fn from_str(url: &str) -> Result<Self, Self::Err> {
        let parsed = Url::parse(url).map_err(polars_error::to_compute_err)?;
        Ok(match parsed.scheme() {
            "s3" => Self::Aws,
            "az" | "adl" | "abfs" => Self::Azure,
            "gs" | "gcp" => Self::Gcp,
            "file" => Self::File,
            // Report which scheme was rejected, mirroring how unknown
            // configuration keys are reported elsewhere in this file.
            scheme => polars_bail!(ComputeError: "unknown url scheme: {}", scheme),
        })
    }

    /// Fallback used when the `async` feature is disabled: always fails,
    /// because no URL handling is compiled in.
    #[cfg(not(feature = "async"))]
    fn from_str(_s: &str) -> Result<Self, Self::Err> {
        polars_bail!(ComputeError: "at least one of the cloud features must be enabled");
    }
}
impl CloudOptions {
/// Stores the given Amazon S3 configuration, replacing any previously set one.
///
/// Keys are kept as given; each value is converted into an owned `String`.
/// Consumes `self` and returns it so calls can be chained.
#[cfg(feature = "aws")]
pub fn with_aws<I: IntoIterator<Item = (AmazonS3ConfigKey, impl Into<String>)>>(
    mut self,
    configs: I,
) -> Self {
    let typed: Configs<AmazonS3ConfigKey> = configs
        .into_iter()
        .map(|(key, value)| (key, value.into()))
        .collect();
    self.aws = Some(typed);
    self
}
/// Builds an Amazon S3 object store for `bucket_name` from the stored AWS configuration.
///
/// # Errors
///
/// Returns a `ComputeError` when no AWS configuration has been set, or when
/// the builder rejects the options or fails to build the store.
#[cfg(feature = "aws")]
pub fn build_aws(&self, bucket_name: &str) -> PolarsResult<impl ObjectStore> {
    let options = match self.aws.as_ref() {
        Some(options) => options,
        None => polars_bail!(ComputeError: "`aws` configuration missing"),
    };
    // The builder consumes its options, so hand it a copy and keep `self` intact.
    let builder = AmazonS3Builder::new()
        .try_with_options(options.clone().into_iter())
        .map_err(polars_error::to_compute_err)?;
    builder
        .with_bucket_name(bucket_name)
        .build()
        .map_err(polars_error::to_compute_err)
}
/// Stores the given Azure configuration, replacing any previously set one.
///
/// Keys are kept as given; each value is converted into an owned `String`.
/// Consumes `self` and returns it so calls can be chained.
#[cfg(feature = "azure")]
pub fn with_azure<I: IntoIterator<Item = (AzureConfigKey, impl Into<String>)>>(
    mut self,
    configs: I,
) -> Self {
    let typed: Configs<AzureConfigKey> = configs
        .into_iter()
        .map(|(key, value)| (key, value.into()))
        .collect();
    self.azure = Some(typed);
    self
}
/// Builds an Azure object store for `container_name` from the stored Azure configuration.
///
/// # Errors
///
/// Returns a `ComputeError` when no Azure configuration has been set, or when
/// the builder rejects the options or fails to build the store.
#[cfg(feature = "azure")]
pub fn build_azure(&self, container_name: &str) -> PolarsResult<impl ObjectStore> {
    let options = match self.azure.as_ref() {
        Some(options) => options,
        None => polars_bail!(ComputeError: "`azure` configuration missing"),
    };
    // The builder consumes its options, so hand it a copy and keep `self` intact.
    let builder = MicrosoftAzureBuilder::new()
        .try_with_options(options.clone().into_iter())
        .map_err(polars_error::to_compute_err)?;
    builder
        .with_container_name(container_name)
        .build()
        .map_err(polars_error::to_compute_err)
}
/// Stores the given Google Cloud Storage configuration, replacing any previously set one.
///
/// Keys are kept as given; each value is converted into an owned `String`.
/// Consumes `self` and returns it so calls can be chained.
#[cfg(feature = "gcp")]
pub fn with_gcp<I: IntoIterator<Item = (GoogleConfigKey, impl Into<String>)>>(
    mut self,
    configs: I,
) -> Self {
    let typed: Configs<GoogleConfigKey> = configs
        .into_iter()
        .map(|(key, value)| (key, value.into()))
        .collect();
    self.gcp = Some(typed);
    self
}
/// Builds a Google Cloud Storage object store for `bucket_name` from the stored GCP configuration.
///
/// # Errors
///
/// Returns a `ComputeError` when no GCP configuration has been set, or when
/// the builder rejects the options or fails to build the store.
#[cfg(feature = "gcp")]
pub fn build_gcp(&self, bucket_name: &str) -> PolarsResult<impl ObjectStore> {
    let options = match self.gcp.as_ref() {
        Some(options) => options,
        None => polars_bail!(ComputeError: "`gcp` configuration missing"),
    };
    // The builder consumes its options, so hand it a copy and keep `self` intact.
    let builder = GoogleCloudStorageBuilder::new()
        .try_with_options(options.clone().into_iter())
        .map_err(polars_error::to_compute_err)?;
    builder
        .with_bucket_name(bucket_name)
        .build()
        .map_err(polars_error::to_compute_err)
}
#[allow(unused_variables)]
pub fn from_untyped_config<I: IntoIterator<Item = (impl AsRef<str>, impl Into<String>)>>(
url: &str,
config: I,
) -> PolarsResult<Self> {
match CloudType::from_str(url)? {
CloudType::Aws => {
#[cfg(feature = "aws")]
{
parsed_untyped_config::<AmazonS3ConfigKey, _>(config)
.map(|aws| Self::default().with_aws(aws))
}
#[cfg(not(feature = "aws"))]
{
polars_bail!(ComputeError: "'aws' feature is not enabled");
}
}
CloudType::Azure => {
#[cfg(feature = "azure")]
{
parsed_untyped_config::<AzureConfigKey, _>(config)
.map(|azure| Self::default().with_azure(azure))
}
#[cfg(not(feature = "azure"))]
{
polars_bail!(ComputeError: "'azure' feature is not enabled");
}
}
CloudType::File => Ok(Self::default()),
CloudType::Gcp => {
#[cfg(feature = "gcp")]
{
parsed_untyped_config::<GoogleConfigKey, _>(config)
.map(|gcp| Self::default().with_gcp(gcp))
}
#[cfg(not(feature = "gcp"))]
{
polars_bail!(ComputeError: "'gcp' feature is not enabled");
}
}
}
}
}