use std::collections::HashMap;
use std::path::PathBuf;
/// Configuration values carried by a Delta table handle.
///
/// NOTE(review): the per-field descriptions are inferred from the field names
/// only; nothing in this file reads them. Confirm against the consuming code.
#[derive(Debug, Clone)]
pub struct DeltaConfig {
    /// Filesystem path associated with the table (presumably its root
    /// directory — TODO confirm).
    pub base_path: PathBuf,
    /// Presumably the number of commits between checkpoints — TODO confirm.
    pub checkpoint_interval: usize,
    /// Presumably an upper bound on files examined by a scan — TODO confirm.
    pub max_files_to_scan: usize,
}
impl Default for DeltaConfig {
fn default() -> Self {
Self {
base_path: PathBuf::from("."),
checkpoint_interval: 10,
max_files_to_scan: 10_000,
}
}
}
/// One action recorded in a table version.
///
/// The enum is `#[non_exhaustive]`, so code outside this crate must keep a
/// wildcard arm when matching on it.
///
/// NOTE(review): the variant names look like the Delta Lake transaction-log
/// action kinds; the field meanings below are inferred from names only —
/// confirm against the log reader/writer.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq)]
pub enum DeltaAction {
    /// A data file entry being added.
    Add {
        /// Path of the file, stored as a plain string.
        path: String,
        /// File size (presumably bytes — TODO confirm).
        size: u64,
        /// Modification time (unit/epoch not visible here — TODO confirm).
        modification_time: u64,
        /// Flag carried with the action; presumably whether the commit
        /// changes table data — TODO confirm.
        data_change: bool,
        /// Partition column name to value, both as strings.
        partition_values: HashMap<String, String>,
        /// Optional statistics payload (presumably JSON text, per the name).
        stats_json: Option<String>,
    },
    /// A data file entry being removed.
    Remove {
        /// Path of the file, stored as a plain string.
        path: String,
        /// Deletion time (unit/epoch not visible here — TODO confirm).
        deletion_timestamp: u64,
        /// Flag carried with the action; presumably whether the commit
        /// changes table data — TODO confirm.
        data_change: bool,
    },
    /// Table-level metadata.
    Metadata {
        /// Schema held as a string (presumably the JSON produced by
        /// `Schema::to_json` — TODO confirm).
        schema: String,
        /// Names of the partition columns.
        partition_columns: Vec<String>,
        /// Optional free-form description.
        description: Option<String>,
        /// String key/value configuration entries.
        configuration: HashMap<String, String>,
    },
    /// Reader/writer version requirements.
    Protocol {
        /// Minimum reader version.
        min_reader_version: u32,
        /// Minimum writer version.
        min_writer_version: u32,
    },
    /// Information about the commit itself.
    CommitInfo {
        /// Commit time; note this is signed, unlike the other timestamps in
        /// this file (unit not visible here — TODO confirm).
        timestamp: i64,
        /// Name of the operation.
        operation: String,
        /// String key/value parameters of the operation.
        operation_parameters: HashMap<String, String>,
    },
}
/// A single table version: its number, a timestamp and the actions it holds.
#[derive(Debug, Clone)]
pub struct DeltaVersion {
    /// Version number.
    pub version: u64,
    /// Timestamp of the version (unit/epoch not visible here — TODO confirm).
    pub timestamp: u64,
    /// Actions belonging to this version, in stored order.
    pub actions: Vec<DeltaAction>,
}
/// Description of one column: its name, a type label and nullability.
#[derive(Debug, Clone, PartialEq)]
pub struct ColumnSchema {
    /// Column name.
    pub name: String,
    /// Type label kept as a free-form string; `Schema::from_json` falls back
    /// to `"string"` when the JSON carries no string type.
    pub data_type: String,
    /// Whether the column may hold nulls.
    pub nullable: bool,
}
/// An ordered collection of column descriptions.
#[derive(Debug, Clone, PartialEq)]
pub struct Schema {
    /// Columns in declaration order.
    pub columns: Vec<ColumnSchema>,
}
impl Schema {
    /// Builds a schema from an ordered list of columns.
    pub fn new(columns: Vec<ColumnSchema>) -> Self {
        Self { columns }
    }

    /// Renders the schema as a JSON object with `"type": "struct"` and a
    /// `"fields"` array holding one `name` / `type` / `nullable` object per
    /// column.
    ///
    /// # Errors
    ///
    /// Returns [`DeltaError::Serialization`] if `serde_json` fails to render
    /// the value.
    pub fn to_json(&self) -> Result<String, DeltaError> {
        let fields: Vec<serde_json::Value> = self
            .columns
            .iter()
            .map(|c| {
                serde_json::json!({
                    "name": c.name,
                    "type": c.data_type,
                    "nullable": c.nullable,
                })
            })
            .collect();
        let schema_obj = serde_json::json!({
            "type": "struct",
            "fields": fields,
        });
        serde_json::to_string(&schema_obj)
            .map_err(|e| DeltaError::Serialization(format!("schema to JSON: {e}")))
    }

    /// Parses a schema from JSON text.
    ///
    /// Only the top-level `"fields"` array is read; any other top-level key
    /// (including `"type"`) is ignored. For each entry of `"fields"`:
    ///
    /// * `"name"` must be present and be a JSON string;
    /// * `"type"` falls back to `"string"` when absent or not a JSON string;
    /// * `"nullable"` falls back to `true` when absent or not a boolean.
    ///
    /// NOTE(review): because of the `"type"` fallback, a type given as a
    /// nested JSON object is recorded as `"string"`. Confirm whether nested
    /// types are expected here.
    ///
    /// # Errors
    ///
    /// Returns [`DeltaError::Parse`] when the text is not valid JSON, when
    /// there is no `"fields"` array, or when a field entry has no string
    /// `"name"`.
    pub fn from_json(json_str: &str) -> Result<Self, DeltaError> {
        let v: serde_json::Value = serde_json::from_str(json_str)
            .map_err(|e| DeltaError::Parse(format!("schema JSON parse: {e}")))?;
        let fields = v
            .get("fields")
            .and_then(|f| f.as_array())
            .ok_or_else(|| DeltaError::Parse("missing 'fields' array in schema".to_string()))?;
        let columns = fields
            .iter()
            .enumerate()
            .map(|(idx, field)| -> Result<ColumnSchema, DeltaError> {
                // A column without a name cannot be referenced by anything
                // downstream, so reject it instead of inventing an empty name
                // (this also rejects non-object entries such as `42`).
                let name = field
                    .get("name")
                    .and_then(|n| n.as_str())
                    .ok_or_else(|| {
                        DeltaError::Parse(format!(
                            "schema field at index {idx} has no string 'name'"
                        ))
                    })?
                    .to_string();
                let data_type = field
                    .get("type")
                    .and_then(|t| t.as_str())
                    .unwrap_or("string")
                    .to_string();
                let nullable = field
                    .get("nullable")
                    .and_then(|n| n.as_bool())
                    .unwrap_or(true);
                Ok(ColumnSchema {
                    name,
                    data_type,
                    nullable,
                })
            })
            .collect::<Result<Vec<_>, DeltaError>>()?;
        Ok(Self { columns })
    }

    /// Returns the column names in declaration order, borrowed from `self`.
    pub fn column_names(&self) -> Vec<&str> {
        self.columns.iter().map(|c| c.name.as_str()).collect()
    }
}
/// In-memory view of a table: configuration, version number and the state
/// tracked for it.
///
/// NOTE(review): how these fields are populated is not visible in this part
/// of the file; the descriptions below come from the field names and types.
#[derive(Debug, Clone)]
pub struct DeltaTable {
    /// Configuration the table was created with.
    pub config: DeltaConfig,
    /// Version number of this view.
    pub version: u64,
    /// File entries keyed by a string (presumably the file path — TODO confirm).
    pub active_files: HashMap<String, FileInfo>,
    /// Table schema, if one is known.
    pub schema: Option<Schema>,
    /// Names of the partition columns.
    pub partition_columns: Vec<String>,
    /// Pair of version numbers, if known (presumably
    /// `(min_reader_version, min_writer_version)` — TODO confirm the order).
    pub protocol: Option<(u32, u32)>,
}
/// Details kept for one file entry.
#[derive(Debug, Clone)]
pub struct FileInfo {
    /// Path of the file, stored as a plain string.
    pub path: String,
    /// File size (presumably bytes — TODO confirm).
    pub size: u64,
    /// Modification time (unit/epoch not visible here — TODO confirm).
    pub modification_time: u64,
    /// Partition column name to value, both as strings.
    pub partition_values: HashMap<String, String>,
}
/// A batch of actions together with the version number it targets.
#[derive(Debug, Clone)]
pub struct DeltaTransaction {
    /// Actions collected so far, in insertion order.
    pub actions: Vec<DeltaAction>,
    /// Version number this transaction is aimed at.
    pub target_version: u64,
}
impl DeltaTransaction {
pub fn new(target_version: u64) -> Self {
Self {
actions: Vec::new(),
target_version,
}
}
pub fn add_action(&mut self, action: DeltaAction) {
self.actions.push(action);
}
}
/// Errors reported by the Delta table code.
///
/// The enum is `#[non_exhaustive]`, so code outside this crate must keep a
/// wildcard arm when matching on it.
#[derive(Debug)]
#[non_exhaustive]
pub enum DeltaError {
    /// An underlying I/O failure; also exposed through `Error::source`.
    Io(std::io::Error),
    /// Input could not be parsed; carries a description.
    Parse(String),
    /// A value could not be serialized; carries a description.
    Serialization(String),
    /// The version found differs from the one expected.
    VersionConflict {
        /// Version that was expected.
        expected: u64,
        /// Version that was actually found.
        actual: u64,
    },
    /// No table was found; carries the text shown after
    /// "Delta table not found: " (bound as `path` in the `Display` impl).
    TableNotFound(String),
    /// A schema-related problem; carries a description.
    SchemaError(String),
    /// Any other failure; carries a description.
    Other(String),
}
impl std::fmt::Display for DeltaError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
DeltaError::Io(e) => write!(f, "Delta I/O error: {e}"),
DeltaError::Parse(msg) => write!(f, "Delta parse error: {msg}"),
DeltaError::Serialization(msg) => write!(f, "Delta serialization error: {msg}"),
DeltaError::VersionConflict { expected, actual } => {
write!(
f,
"Delta version conflict: expected {expected}, found {actual}"
)
}
DeltaError::TableNotFound(path) => write!(f, "Delta table not found: {path}"),
DeltaError::SchemaError(msg) => write!(f, "Delta schema error: {msg}"),
DeltaError::Other(msg) => write!(f, "Delta error: {msg}"),
}
}
}
impl std::error::Error for DeltaError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
DeltaError::Io(e) => Some(e),
_ => None,
}
}
}
impl From<std::io::Error> for DeltaError {
fn from(e: std::io::Error) -> Self {
DeltaError::Io(e)
}
}
impl From<DeltaError> for crate::error::IoError {
    /// Converts a `DeltaError` into the crate-wide `IoError::Other`, keeping
    /// only its `Display` text.
    fn from(e: DeltaError) -> Self {
        let message = e.to_string();
        Self::Other(message)
    }
}