use crate::dataframe::DataFrame;
use polars::prelude::{LazyFrame, PolarsError, ScanArgsParquet, UnionArgs};
use std::path::Path;
use url::Url;
/// Combines several lazy frames into one via `polars::prelude::concat`,
/// using the default [`UnionArgs`].
///
/// # Errors
///
/// Returns `PolarsError::ComputeError("read_delta: no files")` when `lfs` is
/// empty; otherwise returns whatever `concat` itself reports.
#[cfg(feature = "delta")]
fn concat_lazy_frames(lfs: Vec<LazyFrame>) -> Result<LazyFrame, PolarsError> {
    if !lfs.is_empty() {
        return polars::prelude::concat(lfs, UnionArgs::default());
    }
    // Nothing to concatenate: report it instead of handing an empty list to polars.
    Err(PolarsError::ComputeError("read_delta: no files".into()))
}
/// Reads the Delta table at `path` without requesting a specific version.
///
/// This is a convenience wrapper around [`read_delta_with_version`] that
/// passes `None` for the version; `case_sensitive` is forwarded unchanged.
///
/// # Errors
///
/// Propagates any error returned by [`read_delta_with_version`].
#[cfg(feature = "delta")]
pub fn read_delta(path: impl AsRef<Path>, case_sensitive: bool) -> Result<DataFrame, PolarsError> {
    // `None` means "do not pin a table version".
    let unpinned: Option<i64> = None;
    read_delta_with_version(path, unpinned, case_sensitive)
}
/// Reads a local Delta table into a [`DataFrame`], optionally at a given version.
///
/// The table is opened with `deltalake` on a Tokio runtime created for this
/// call, its data-file URIs are mapped to local paths with
/// `uri_to_parquet_path`, every file is scanned lazily as parquet, and the
/// scans are concatenated and collected.
///
/// # Arguments
///
/// * `path` - location of the table; turned into a `file://` URI by
///   `path_to_table_uri`.
/// * `version` - when `Some(v)`, the builder is asked for version `v` before
///   loading; when `None`, the table is loaded without pinning a version.
/// * `case_sensitive` - forwarded to `DataFrame::from_polars_with_options`.
///
/// # Returns
///
/// The collected table contents. A table that lists no data files yields a
/// default (empty) polars frame wrapped with the same options.
///
/// # Errors
///
/// Returns `PolarsError::ComputeError` when the runtime cannot be created,
/// the table URI does not parse, the table cannot be built or loaded, the
/// file URIs cannot be listed, or a file URI is not a supported local path.
/// Errors from scanning, concatenating, or collecting are propagated as-is.
#[cfg(feature = "delta")]
pub fn read_delta_with_version(
    path: impl AsRef<Path>,
    version: Option<i64>,
    case_sensitive: bool,
) -> Result<DataFrame, PolarsError> {
    use deltalake::DeltaTableBuilder;
    use tokio::runtime::Runtime;

    // Shared mapping for builder/load failures so both report the same prefix.
    fn load_failure(err: deltalake::DeltaTableError) -> PolarsError {
        PolarsError::ComputeError(format!("read_delta: {}", err).into())
    }

    let table_uri = path_to_table_uri(path.as_ref())?;
    let runtime = Runtime::new().map_err(|err| {
        PolarsError::ComputeError(format!("read_delta: failed to create runtime: {}", err).into())
    })?;
    let table_url = Url::parse(&table_uri).map_err(|err| {
        PolarsError::ComputeError(format!("read_delta: invalid table URI: {}", err).into())
    })?;

    // deltalake's API is async; drive it to completion on the local runtime.
    let table = runtime.block_on(async {
        let builder = DeltaTableBuilder::from_url(table_url.clone()).map_err(load_failure)?;
        let loaded = match version {
            Some(v) => builder.with_version(v).load().await,
            None => builder.load().await,
        };
        loaded.map_err(load_failure)
    })?;

    let file_uris: Vec<String> = table
        .get_file_uris()
        .map_err(|err: deltalake::DeltaTableError| {
            PolarsError::ComputeError(format!("read_delta: get_file_uris: {}", err).into())
        })?
        .collect();

    // A table without data files becomes an empty frame rather than an error.
    if file_uris.is_empty() {
        let empty = polars::prelude::DataFrame::default();
        return Ok(DataFrame::from_polars_with_options(empty, case_sensitive));
    }

    // Build one lazy parquet scan per data file, stopping at the first failure.
    let scans = file_uris
        .iter()
        .map(|uri| -> Result<LazyFrame, PolarsError> {
            let parquet_path = uri_to_parquet_path(uri)?;
            LazyFrame::scan_parquet(parquet_path, ScanArgsParquet::default())
        })
        .collect::<Result<Vec<LazyFrame>, PolarsError>>()?;

    let pl_df = concat_lazy_frames(scans)?.collect()?;
    Ok(DataFrame::from_polars_with_options(pl_df, case_sensitive))
}
/// Converts a filesystem path into a `file://` URI string.
///
/// Resolution rules, in order:
///
/// 1. A path whose lossy string form already starts with `file://` is
///    returned unchanged.
/// 2. An existing path is canonicalized.
/// 3. A missing path whose parent exists is rebuilt from the canonicalized
///    parent plus the final path component.
/// 4. Otherwise the path is joined onto the current directory (or onto `.`
///    if the current directory cannot be determined), without canonicalizing.
///
/// On Windows, backslashes are replaced with `/` and the prefix is
/// `file:///`; elsewhere the prefix is `file://`. The path is rendered with
/// `to_string_lossy`, so no percent-encoding is applied here.
///
/// NOTE(review): on Windows `canonicalize` usually returns a `\\?\`-prefixed
/// verbatim path; confirm the resulting URI is accepted by the URL parser.
///
/// # Errors
///
/// Returns `PolarsError::ComputeError` when canonicalizing the path or its
/// parent fails.
#[cfg(feature = "delta")]
fn path_to_table_uri(path: &Path) -> Result<String, PolarsError> {
    let raw = path.to_string_lossy();
    if raw.starts_with("file://") {
        return Ok(raw.into_owned());
    }

    let absolute = if path.exists() {
        path.canonicalize().map_err(|err| {
            PolarsError::ComputeError(format!("path_to_table_uri: canonicalize: {}", err).into())
        })?
    } else {
        // The target does not exist yet: anchor it on the nearest thing that does.
        match path.parent().unwrap_or(Path::new(".")) {
            parent if parent.exists() => {
                let leaf = path.file_name().unwrap_or(std::ffi::OsStr::new("."));
                parent
                    .canonicalize()
                    .map_err(|err| {
                        PolarsError::ComputeError(
                            format!("path_to_table_uri: canonicalize parent: {}", err).into(),
                        )
                    })?
                    .join(leaf)
            }
            _ => std::env::current_dir()
                .unwrap_or_else(|_| std::path::PathBuf::from("."))
                .join(path),
        }
    };

    let rendered = absolute.to_string_lossy();
    let uri = if cfg!(target_os = "windows") {
        format!("file:///{}", rendered.replace('\\', "/"))
    } else {
        format!("file://{}", rendered)
    };
    Ok(uri)
}
/// Maps a data-file URI to a local filesystem path.
///
/// * `file://…` URIs: the prefix and any further leading `/` characters are
///   removed. On Windows the remaining `/` separators become `\`; on other
///   platforms a single leading `/` is put back.
/// * Strings starting with `/` are used as paths unchanged. On Windows, so
///   are strings whose second character is `:` (drive-letter form).
///
/// No percent-decoding is performed.
///
/// # Errors
///
/// Returns `PolarsError::ComputeError` for any other input, since only local
/// files are supported.
#[cfg(feature = "delta")]
fn uri_to_parquet_path(uri: &str) -> Result<std::path::PathBuf, PolarsError> {
    use std::path::PathBuf;

    let looks_like_plain_path = || {
        let windows_drive =
            cfg!(target_os = "windows") && uri.len() >= 2 && uri.chars().nth(1) == Some(':');
        uri.starts_with('/') || windows_drive
    };

    match uri.strip_prefix("file://") {
        Some(rest) => {
            let trimmed = rest.trim_start_matches('/');
            let local = if cfg!(target_os = "windows") {
                trimmed.replace('/', "\\")
            } else {
                format!("/{}", trimmed)
            };
            Ok(PathBuf::from(local))
        }
        None if looks_like_plain_path() => Ok(PathBuf::from(uri)),
        None => Err(PolarsError::ComputeError(
            format!("read_delta: unsupported URI (local file only): {}", uri).into(),
        )),
    }
}
/// Writes `df` to `dir/part-00000.parquet` through a buffered writer.
///
/// The buffer is flushed explicitly so that a failure while writing the last
/// bytes is reported instead of being discarded when the writer is dropped.
#[cfg(feature = "delta")]
fn write_delta_parquet_part(
    df: &mut polars::prelude::DataFrame,
    dir: &Path,
) -> Result<(), PolarsError> {
    use std::io::Write;

    let parquet_path = dir.join("part-00000.parquet");
    let file = std::fs::File::create(&parquet_path).map_err(|e| {
        PolarsError::ComputeError(format!("write_delta: create parquet file: {}", e).into())
    })?;
    let mut writer = std::io::BufWriter::new(file);
    polars::prelude::ParquetWriter::new(&mut writer)
        .finish(df)
        .map_err(|e| {
            PolarsError::ComputeError(format!("write_delta: parquet write: {}", e).into())
        })?;
    writer.flush().map_err(|e| {
        PolarsError::ComputeError(format!("write_delta: flush parquet file: {}", e).into())
    })
}

/// Removes `dir` with everything in it (a missing directory is fine) and
/// recreates it empty.
#[cfg(feature = "delta")]
fn write_delta_reset_dir(dir: &Path) -> Result<(), PolarsError> {
    match std::fs::remove_dir_all(dir) {
        Ok(()) => {}
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
        // Leftover files would be picked up by the later conversion step, so
        // a failed cleanup must not be ignored.
        Err(e) => {
            return Err(PolarsError::ComputeError(
                format!("write_delta: remove_dir_all: {}", e).into(),
            ));
        }
    }
    std::fs::create_dir_all(dir).map_err(|e| {
        PolarsError::ComputeError(format!("write_delta: create_dir_all: {}", e).into())
    })
}

/// Writes `df` to a local Delta table at `path`.
///
/// The table is always materialized as a single parquet file
/// (`part-00000.parquet`) which is then registered with
/// `ConvertToDeltaBuilder`.
///
/// * `overwrite == true`: the directory is removed and recreated, then `df`
///   alone is written.
/// * `overwrite == false` and the table loads: all existing data files are
///   read into memory, `df` is stacked below them, the directory is removed
///   and recreated, and the combined frame is written. The previous contents
///   of the directory (including its Delta log) are discarded in the process.
/// * `overwrite == false` and the table does not load (for any reason): the
///   directory is created if needed and `df` alone is written.
///
/// A frame with zero rows is a no-op: nothing is written or removed, even
/// when `overwrite` is set.
///
/// The operation is not atomic: a failure after the directory has been
/// reset leaves the table incomplete.
///
/// # Errors
///
/// Returns `PolarsError::ComputeError` when the directory cannot be reset or
/// created, the runtime cannot be created, the table URI is invalid, the
/// parquet file cannot be created, written, or flushed, or the Delta
/// conversion fails. Errors from scanning, concatenating, collecting, or
/// stacking frames are propagated unchanged.
#[cfg(feature = "delta")]
pub fn write_delta(
    df: &polars::prelude::DataFrame,
    path: impl AsRef<Path>,
    overwrite: bool,
) -> Result<(), PolarsError> {
    use deltalake::operations::convert_to_delta::ConvertToDeltaBuilder;
    use std::fs;
    use tokio::runtime::Runtime;

    let path = path.as_ref();
    if df.height() == 0 {
        return Ok(());
    }
    if overwrite {
        // Reset before computing the URI so the path exists and can be canonicalized.
        write_delta_reset_dir(path)?;
    }
    let table_uri = path_to_table_uri(path)?;
    let rt = Runtime::new().map_err(|e| {
        PolarsError::ComputeError(format!("write_delta: failed to create runtime: {}", e).into())
    })?;
    let url = Url::parse(table_uri.as_str()).map_err(|e| {
        PolarsError::ComputeError(format!("write_delta: invalid table URI: {}", e).into())
    })?;
    rt.block_on(async {
        // Decide what goes into the single parquet part.
        let mut to_write = if overwrite {
            df.clone()
        } else {
            let builder = deltalake::DeltaTableBuilder::from_url(url.clone()).map_err(
                |e: deltalake::DeltaTableError| {
                    PolarsError::ComputeError(format!("write_delta: {}", e).into())
                },
            )?;
            match builder.load().await {
                Ok(table) => {
                    let uris: Vec<String> = table
                        .get_file_uris()
                        .map_err(|e: deltalake::DeltaTableError| {
                            PolarsError::ComputeError(
                                format!("write_delta: get_file_uris: {}", e).into(),
                            )
                        })?
                        .collect();
                    let mut lfs: Vec<LazyFrame> = Vec::with_capacity(uris.len());
                    for uri in &uris {
                        let parquet_path = uri_to_parquet_path(uri)?;
                        lfs.push(LazyFrame::scan_parquet(
                            parquet_path,
                            ScanArgsParquet::default(),
                        )?);
                    }
                    // Collect before touching the directory: the old files are
                    // about to be deleted.
                    let mut combined = if lfs.is_empty() {
                        polars::prelude::DataFrame::default()
                    } else {
                        concat_lazy_frames(lfs)?.collect()?
                    };
                    combined.vstack_mut(df)?;
                    write_delta_reset_dir(path)?;
                    combined
                }
                // Any load failure is treated as "no table yet".
                Err(_) => {
                    fs::create_dir_all(path).map_err(|e| {
                        PolarsError::ComputeError(
                            format!("write_delta: create_dir_all: {}", e).into(),
                        )
                    })?;
                    df.clone()
                }
            }
        };

        write_delta_parquet_part(&mut to_write, path)?;
        ConvertToDeltaBuilder::new()
            .with_location(&table_uri)
            .await
            .map_err(|e: deltalake::DeltaTableError| {
                PolarsError::ComputeError(format!("write_delta: convert_to_delta: {}", e).into())
            })?;
        Ok(())
    })
}