pub mod cloud;
pub mod local;
pub use cloud::{
AzureConnector, CloudConfig, CloudConnector, CloudConnectorFactory, CloudCredentials,
CloudObject, CloudProvider, FileFormat, GCSConnector, ObjectMetadata, S3Connector,
};
pub use local::LocalConnector;
use crate::core::error::{Error, Result};
use crate::dataframe::DataFrame;
/// A storage connector, holding exactly one of the concrete connector types
/// declared in the `cloud` and `local` submodules.
///
/// Values are normally built through the constructors on this type
/// (`s3`, `gcs`, `azure`, `local`) or via `from_connection_string`.
pub enum DataConnector {
/// Wraps a `cloud::S3Connector`; selected by the `s3://` prefix.
S3(cloud::S3Connector),
/// Wraps a `cloud::GCSConnector`; selected by the `gs://` prefix.
GCS(cloud::GCSConnector),
/// Wraps a `cloud::AzureConnector`; selected by the `azure://` prefix.
Azure(cloud::AzureConnector),
/// Wraps a `local::LocalConnector`; selected by the `local://` prefix.
Local(local::LocalConnector),
}
impl DataConnector {
    /// Creates a connector wrapping a freshly constructed `cloud::S3Connector`.
    pub fn s3() -> Self {
        Self::S3(cloud::S3Connector::new())
    }

    /// Creates a connector wrapping a freshly constructed `cloud::GCSConnector`.
    pub fn gcs() -> Self {
        Self::GCS(cloud::GCSConnector::new())
    }

    /// Creates a connector wrapping a freshly constructed `cloud::AzureConnector`.
    pub fn azure() -> Self {
        Self::Azure(cloud::AzureConnector::new())
    }

    /// Creates a connector wrapping a `local::LocalConnector` built from `base_path`.
    pub fn local(base_path: impl Into<std::path::PathBuf>) -> Self {
        Self::Local(local::LocalConnector::new(base_path))
    }

    /// Chooses a connector variant from the scheme prefix of `connection_string`.
    ///
    /// Recognised prefixes are `s3://`, `gs://`, `azure://` and `local://`.
    /// For the first three, only the prefix is inspected: whatever follows it
    /// is not used by this function. For `local://`, the text after the prefix
    /// is handed to [`DataConnector::local`] as the base path.
    ///
    /// # Errors
    ///
    /// Returns `Error::InvalidOperation` when the string starts with none of
    /// the recognised prefixes.
    pub fn from_connection_string(connection_string: &str) -> Result<Self> {
        if connection_string.starts_with("s3://") {
            Ok(Self::s3())
        } else if connection_string.starts_with("gs://") {
            Ok(Self::gcs())
        } else if connection_string.starts_with("azure://") {
            Ok(Self::azure())
        } else if let Some(base) = connection_string.strip_prefix("local://") {
            // `strip_prefix` removes the scheme exactly once. The previous
            // `trim_start_matches` removed every repeated leading occurrence,
            // which would silently eat part of a path that itself begins with
            // the text `local://`.
            Ok(Self::local(base))
        } else {
            Err(Error::InvalidOperation(format!(
                "Unsupported connection string: {}",
                connection_string
            )))
        }
    }
}
/// A data source that performs `DataFrame` reads and writes through the
/// `DataConnector` it owns.
pub struct DataSource {
// The connector every read/write call is dispatched to.
connector: DataConnector,
}
impl DataSource {
pub fn new(connector: DataConnector) -> Self {
Self { connector }
}
pub async fn read_cloud(&self, bucket: &str, key: &str) -> Result<DataFrame> {
use cloud::CloudConnector;
let format = FileFormat::from_extension(key).unwrap_or(FileFormat::CSV {
delimiter: ',',
has_header: true,
});
match &self.connector {
DataConnector::S3(cloud) => cloud.read_dataframe(bucket, key, format).await,
DataConnector::GCS(cloud) => cloud.read_dataframe(bucket, key, format).await,
DataConnector::Azure(cloud) => cloud.read_dataframe(bucket, key, format).await,
DataConnector::Local(cloud) => cloud.read_dataframe(bucket, key, format).await,
}
}
pub async fn write_cloud(&self, df: &DataFrame, bucket: &str, key: &str) -> Result<()> {
use cloud::CloudConnector;
let format = FileFormat::from_extension(key).unwrap_or(FileFormat::CSV {
delimiter: ',',
has_header: true,
});
match &self.connector {
DataConnector::S3(cloud) => cloud.write_dataframe(df, bucket, key, format).await,
DataConnector::GCS(cloud) => cloud.write_dataframe(df, bucket, key, format).await,
DataConnector::Azure(cloud) => cloud.write_dataframe(df, bucket, key, format).await,
DataConnector::Local(cloud) => cloud.write_dataframe(df, bucket, key, format).await,
}
}
}
impl DataFrame {
pub async fn read_from(connection_string: &str, query_or_path: &str) -> Result<Self> {
let connector = DataConnector::from_connection_string(connection_string)?;
let source = DataSource::new(connector);
let parts: Vec<&str> = query_or_path.split('/').collect();
if parts.len() >= 2 {
let bucket = parts[0];
let key = parts[1..].join("/");
source.read_cloud(bucket, &key).await
} else {
Err(Error::InvalidOperation(
"Invalid cloud storage path".to_string(),
))
}
}
pub async fn write_to(&self, connection_string: &str, table_or_path: &str) -> Result<()> {
let connector = DataConnector::from_connection_string(connection_string)?;
let source = DataSource::new(connector);
let parts: Vec<&str> = table_or_path.split('/').collect();
if parts.len() >= 2 {
let bucket = parts[0];
let key = parts[1..].join("/");
source.write_cloud(self, bucket, &key).await
} else {
Err(Error::InvalidOperation(
"Invalid cloud storage path".to_string(),
))
}
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Every recognised scheme prefix must map to its matching variant, and
    /// an unrecognised prefix must be rejected with `Error::InvalidOperation`.
    #[test]
    fn test_data_connector_from_connection_string() {
        let s3_connector = DataConnector::from_connection_string("s3://bucket/path");
        assert!(s3_connector.is_ok());
        assert!(matches!(
            s3_connector.expect("operation should succeed"),
            DataConnector::S3(_)
        ));

        let gcs_connector = DataConnector::from_connection_string("gs://bucket/path");
        assert!(gcs_connector.is_ok());
        assert!(matches!(
            gcs_connector.expect("operation should succeed"),
            DataConnector::GCS(_)
        ));

        // The azure:// branch was previously not exercised.
        let azure_connector = DataConnector::from_connection_string("azure://container/path");
        assert!(azure_connector.is_ok());
        assert!(matches!(
            azure_connector.expect("operation should succeed"),
            DataConnector::Azure(_)
        ));

        let local_connector = DataConnector::from_connection_string("local:///tmp/test");
        assert!(local_connector.is_ok());
        assert!(matches!(
            local_connector.expect("operation should succeed"),
            DataConnector::Local(_)
        ));

        // Unknown schemes, and strings with no scheme at all, are errors.
        assert!(matches!(
            DataConnector::from_connection_string("ftp://host/path"),
            Err(Error::InvalidOperation(_))
        ));
        assert!(matches!(
            DataConnector::from_connection_string(""),
            Err(Error::InvalidOperation(_))
        ));
    }

    /// Constructing a `DataSource` from a connector must not panic.
    #[test]
    fn test_data_source_creation() {
        let connector = DataConnector::s3();
        let _source = DataSource::new(connector);
    }
}