use crate::prelude::*;
/// Infers the Arrow schema of `df` by collecting a single-row sample.
///
/// Falls back to an empty schema when the DataFrame yields no batches.
///
/// # Errors
/// Returns `ElusionError::InvalidOperation` if the limit cannot be applied,
/// or `ElusionError::SchemaError` if the sample batch cannot be collected.
async fn glean_arrow_schema(df: &DataFrame) -> ElusionResult<DeltaSchemaRef> {
    // Limit to a single row: only the schema is needed, not the data.
    let limited_df = df.clone().limit(0, Some(1))
        .map_err(|e| ElusionError::InvalidOperation {
            operation: "Schema Inference".to_string(),
            reason: format!("Failed to limit DataFrame: {}", e),
            suggestion: "💡 Check if the DataFrame is valid".to_string()
        })?;
    let batches = limited_df.collect().await
        .map_err(|e| ElusionError::SchemaError {
            message: format!("Failed to collect sample batch: {}", e),
            schema: None,
            suggestion: "💡 Verify DataFrame contains valid data".to_string()
        })?;
    // `first()` is the clippy-clean equivalent of `get(0)`.
    match batches.first() {
        Some(first_batch) => Ok(first_batch.schema()),
        // No batches at all: report an empty schema instead of erroring,
        // using the stdlib constructor rather than a hand-built field list.
        None => Ok(Arc::new(Schema::empty())),
    }
}
fn arrow_to_delta_type(arrow_type: &ArrowDataType) -> DeltaType {
match arrow_type {
ArrowDataType::Boolean => DeltaType::BOOLEAN,
ArrowDataType::Int8 => DeltaType::BYTE,
ArrowDataType::Int16 => DeltaType::SHORT,
ArrowDataType::Int32 => DeltaType::INTEGER,
ArrowDataType::Int64 => DeltaType::LONG,
ArrowDataType::Float32 => DeltaType::FLOAT,
ArrowDataType::Float64 => DeltaType::DOUBLE,
ArrowDataType::Utf8 => DeltaType::STRING,
ArrowDataType::Date32 => DeltaType::DATE,
ArrowDataType::Date64 => DeltaType::DATE,
ArrowDataType::Timestamp(TimeUnit::Second, _) => DeltaType::TIMESTAMP,
ArrowDataType::Timestamp(TimeUnit::Millisecond, _) => DeltaType::TIMESTAMP,
ArrowDataType::Timestamp(TimeUnit::Microsecond, _) => DeltaType::TIMESTAMP,
ArrowDataType::Timestamp(TimeUnit::Nanosecond, _) => DeltaType::TIMESTAMP,
ArrowDataType::Binary => DeltaType::BINARY,
_ => DeltaType::STRING,
}
}
/// Normalizes and manages filesystem paths for a Delta table location.
///
/// Paths are stored with forward slashes and no trailing slash, so the
/// same manager handles both Windows- and Unix-style inputs.
#[derive(Clone)]
pub struct DeltaPathManager {
    // Normalized table root (forward slashes, no trailing slash).
    base_path: PathBuf,
}
impl DeltaPathManager {
    /// Creates a manager from any path-like input, normalizing backslashes
    /// to forward slashes and stripping any trailing slash.
    pub fn new<P: AsRef<LocalPath>>(path: P) -> Self {
        let normalized = path
            .as_ref()
            .to_string_lossy()
            .replace('\\', "/")
            .trim_end_matches('/')
            .to_string();
        Self {
            base_path: PathBuf::from(normalized),
        }
    }

    /// Returns the table root as a forward-slash string.
    pub fn base_path_str(&self) -> String {
        self.base_path.to_string_lossy().replace('\\', "/")
    }

    /// Returns the `_delta_log` location under the table root.
    pub fn delta_log_path(&self) -> DeltaPath {
        let base = self.base_path_str();
        DeltaPath::from(format!("{base}/_delta_log"))
    }

    /// Returns the table root path (alias for [`Self::base_path_str`]).
    pub fn table_path(&self) -> String {
        self.base_path_str()
    }

    /// Extracts the drive prefix (e.g. `"C:/"`) from a Windows-style path,
    /// or `"/"` when no drive letter is present.
    pub fn drive_prefix(&self) -> String {
        let base_path = self.base_path_str();
        if let Some(colon_pos) = base_path.find(':') {
            // Take "C:" plus the following separator. Checked `get` avoids
            // the panic the raw slice had when the colon was the last byte
            // or was followed by a multi-byte character; ':' itself is
            // ASCII, so `..colon_pos + 1` is always a valid boundary.
            base_path
                .get(..colon_pos + 2)
                .unwrap_or(&base_path[..colon_pos + 1])
                .to_string()
        } else {
            "/".to_string()
        }
    }

    /// Rewrites `uri` so it sits under this manager's drive prefix, with
    /// forward slashes throughout.
    pub fn normalize_uri(&self, uri: &str) -> String {
        let drive_prefix = self.drive_prefix();
        // Drop everything up to the first separator (scheme/drive portion),
        // then the leading separators themselves, and re-root the remainder.
        let path = uri.trim_start_matches(|c| c != '/' && c != '\\')
            .trim_start_matches(['/', '\\']);
        format!("{}{}", drive_prefix, path).replace('\\', "/")
    }

    /// Returns `true` when the base path contains a `_delta_log` directory
    /// holding at least one JSON commit file.
    pub fn is_delta_table(&self) -> bool {
        let delta_log = self.base_path.join("_delta_log");
        if !delta_log.is_dir() {
            return false;
        }
        // Unreadable directories or entries are treated as "not a table".
        fs::read_dir(&delta_log)
            .map(|entries| {
                entries
                    .flatten()
                    .any(|entry| entry.path().extension().map_or(false, |ext| ext == "json"))
            })
            .unwrap_or(false)
    }

    /// Converts the table root into a `file://` URL.
    ///
    /// # Errors
    /// Returns `DeltaTableError::Generic` when the path cannot be
    /// represented as a file URL (e.g. it is relative).
    pub fn table_url(&self) -> Result<url::Url, DeltaTableError> {
        let path = self.base_path_str();
        let abs_path = std::path::Path::new(&path);
        let url = url::Url::from_file_path(abs_path)
            .map_err(|_| DeltaTableError::Generic(
                format!("Invalid table path '{}': could not convert to file URL", path)
            ))?;
        Ok(url)
    }
}
/// Writes `df` to a Delta table at `path`, creating or appending as needed.
///
/// * `partition_columns` — optional partitioning applied when a new table
///   is created.
/// * `overwrite` — when `true`, any existing table directory is removed and
///   the table is recreated from scratch.
/// * `write_mode` — forwarded to the underlying `RecordBatchWriter`.
///
/// # Errors
/// Returns `DeltaTableError::Generic` on validation, schema-conversion, or
/// I/O failures at any stage.
pub async fn write_to_delta_impl(
    df: &DataFrame,
    path: &str,
    partition_columns: Option<Vec<String>>,
    overwrite: bool,
    write_mode: WriteMode,
) -> Result<(), DeltaTableError> {
    validate_delta_path_simple(path)?;
    let path_manager = DeltaPathManager::new(path);
    let table_url = path_manager.table_url()?;

    // Materialize the data exactly once; the schema is taken from these
    // batches below instead of cloning and re-executing the plan a second
    // time just for schema inference.
    let batches = df
        .clone()
        .collect()
        .await
        .map_err(|e| DeltaTableError::Generic(format!("DataFusion collect error: {e}")))?;
    if batches.is_empty() {
        return Err(DeltaTableError::Generic("No data to write".to_string()));
    }
    // Safe indexing: non-emptiness was checked just above.
    let arrow_schema_ref = batches[0].schema();

    // Map each Arrow field onto its Delta counterpart for table creation.
    let delta_fields: Vec<StructField> = arrow_schema_ref
        .fields()
        .iter()
        .map(|field| {
            StructField::new(
                field.name().clone(),
                arrow_to_delta_type(field.data_type()),
                field.is_nullable(),
            )
        })
        .collect();

    let parts = partition_columns.clone().unwrap_or_default();
    // Pin the protocol versions the new table advertises.
    let mut config: HashMap<String, Option<String>> = HashMap::new();
    config.insert("delta.minWriterVersion".to_string(), Some("7".to_string()));
    config.insert("delta.minReaderVersion".to_string(), Some("3".to_string()));

    // `mut` here replaces the `let mut table = table;` rebinding the commit
    // step needed further down.
    let mut table: DeltaTable = if overwrite {
        // Clear any previous table; a missing directory is not an error.
        if let Err(e) = fs::remove_dir_all(&path_manager.base_path) {
            if e.kind() != std::io::ErrorKind::NotFound {
                return Err(DeltaTableError::Generic(format!(
                    "Failed to remove existing directory at '{}': {e}", path
                )));
            }
        }
        println!("🗑️ Cleared existing Delta table, creating fresh...");
        #[allow(deprecated)]
        DeltaOps::try_from_url(table_url.clone())
            .await
            .map_err(|e| DeltaTableError::Generic(format!("Failed to init DeltaOps: {e}")))?
            .create()
            .with_columns(delta_fields.clone())
            .with_partition_columns(parts.clone())
            .with_save_mode(SaveMode::Overwrite)
            .with_configuration(config.clone())
            .await
            .map_err(|e| DeltaTableError::Generic(format!("Failed to create Delta table: {e}")))?
    } else {
        // A loadable table with a version counts as existing.
        let table_exists = DeltaTableBuilder::from_url(table_url.clone())
            .map(|b| b.build().map(|t| t.version().is_some()).unwrap_or(false))
            .unwrap_or(false);
        if table_exists {
            println!("📂 Appending to existing Delta table...");
            DeltaTableBuilder::from_url(table_url.clone())
                .map_err(|e| DeltaTableError::Generic(format!("Failed to build Delta table: {e}")))?
                .load()
                .await
                .map_err(|e| DeltaTableError::Generic(format!("Failed to load Delta table: {e}")))?
        } else {
            println!("🆕 Creating new Delta table for append...");
            #[allow(deprecated)]
            DeltaOps::try_from_url(table_url.clone())
                .await
                .map_err(|e| DeltaTableError::Generic(format!("Failed to init DeltaOps: {e}")))?
                .create()
                .with_columns(delta_fields.clone())
                .with_partition_columns(parts.clone())
                .with_save_mode(SaveMode::Append)
                .with_configuration(config.clone())
                .await
                .map_err(|e| DeltaTableError::Generic(format!("Failed to create Delta table: {e}")))?
        }
    };
    println!("✅ Delta table ready at version: {:?}", table.version());

    // Bridge the schema into deltalake's Arrow types for the writer.
    let delta_schema_ref = convert_schema_to_delta(&arrow_schema_ref)
        .map_err(|e| DeltaTableError::Generic(format!("Schema conversion failed: {e}")))?;
    let mut writer_config = HashMap::new();
    writer_config.insert("delta.protocol.minWriterVersion".to_string(), "7".to_string());
    writer_config.insert("delta.protocol.minReaderVersion".to_string(), "3".to_string());
    let mut writer = RecordBatchWriter::try_new(
        table_url.as_str(),
        delta_schema_ref,
        partition_columns,
        Some(writer_config),
    )?;

    // Stream every batch through the writer, converting each to the
    // deltalake Arrow version on the way.
    let batch_count = batches.len();
    for (i, batch) in batches.into_iter().enumerate() {
        let delta_batch = convert_batch_to_delta(&batch)
            .map_err(|e| DeltaTableError::Generic(format!("Batch {i} conversion failed: {e}")))?;
        writer.write_with_mode(delta_batch, write_mode).await?;
        println!("   ✍️ Wrote batch {}/{}", i + 1, batch_count);
    }

    // Single commit at the end makes the write atomic at the table level.
    let version = writer
        .flush_and_commit(&mut table)
        .await
        .map_err(|e| DeltaTableError::Generic(format!("Failed to flush and commit: {e}")))?;
    println!("✅ Wrote data to Delta table at version: {version}");
    Ok(())
}
/// Bridges a DataFusion Arrow schema into the `deltalake` crate's Arrow
/// schema type by round-tripping it through the Arrow IPC stream format.
fn convert_schema_to_delta(
    schema: &DeltaSchemaRef,
) -> Result<deltalake::arrow::datatypes::SchemaRef, String> {
    use datafusion::arrow::ipc::writer::StreamWriter as DFStreamWriter;
    // Serialize only the schema (no batches) using DataFusion's Arrow...
    let mut ipc_bytes = Vec::new();
    let mut ipc_writer = DFStreamWriter::try_new(&mut ipc_bytes, schema.as_ref())
        .map_err(|e| format!("IPC writer error: {e}"))?;
    ipc_writer.finish().map_err(|e| format!("IPC finish error: {e}"))?;
    // Release the mutable borrow on `ipc_bytes` before reading it back.
    drop(ipc_writer);
    // ...then deserialize it with deltalake's Arrow version.
    let reader = deltalake::arrow::ipc::reader::StreamReader::try_new(
        std::io::Cursor::new(ipc_bytes),
        None,
    )
    .map_err(|e| format!("IPC reader error: {e}"))?;
    Ok(reader.schema())
}
/// Bridges a DataFusion record batch into the `deltalake` crate's batch
/// type via an in-memory Arrow IPC round trip.
fn convert_batch_to_delta(
    batch: &datafusion::arrow::array::RecordBatch,
) -> Result<deltalake::arrow::array::RecordBatch, String> {
    use datafusion::arrow::ipc::writer::StreamWriter as DFStreamWriter;
    use deltalake::arrow::ipc::reader::StreamReader as DeltaStreamReader;
    // Encode the batch with DataFusion's Arrow IPC writer.
    let mut ipc_bytes = Vec::new();
    {
        let mut ipc_writer = DFStreamWriter::try_new(&mut ipc_bytes, batch.schema().as_ref())
            .map_err(|e| format!("IPC writer error: {e}"))?;
        ipc_writer.write(batch).map_err(|e| format!("IPC write error: {e}"))?;
        ipc_writer.finish().map_err(|e| format!("IPC finish error: {e}"))?;
    }
    // Decode with deltalake's Arrow IPC reader; exactly one batch is expected.
    let mut ipc_reader = DeltaStreamReader::try_new(std::io::Cursor::new(ipc_bytes), None)
        .map_err(|e| format!("IPC reader error: {e}"))?;
    match ipc_reader.next() {
        Some(decoded) => decoded.map_err(|e| format!("IPC batch read error: {e}")),
        None => Err("No batch in IPC stream".to_string()),
    }
}
/// Rejects paths that look like data files — a Delta table is a directory,
/// never a single file.
fn validate_delta_path_simple(path: &str) -> Result<(), DeltaTableError> {
    const FILE_EXTENSIONS: [&str; 11] = [
        ".csv", ".json", ".parquet", ".txt", ".xlsx",
        ".xml", ".avro", ".orc", ".sql", ".yaml", ".yml",
    ];
    // Case-insensitive check against well-known data-file extensions.
    let lowered = path.to_lowercase();
    if let Some(ext) = FILE_EXTENSIONS.iter().find(|e| lowered.ends_with(**e)) {
        return Err(DeltaTableError::Generic(format!(
            "❌ Invalid Delta table path. Delta tables are directories, not files. \
            Remove the '{}' extension from '{}'",
            ext, path
        )));
    }
    Ok(())
}