1use std::collections::HashMap;
7use std::path::PathBuf;
8
/// Settings for working with a Delta table.
///
/// The [`Default`] implementation uses the current directory (`"."`), a
/// checkpoint interval of 10, and a scan limit of 10,000 files.
#[derive(Debug, Clone)]
pub struct DeltaConfig {
    /// Base path of the table (presumably the directory holding the
    /// transaction log and data files — confirm against the reader/writer code).
    pub base_path: PathBuf,
    /// Checkpoint interval (presumably the number of commits between
    /// checkpoints — confirm against the writer code).
    pub checkpoint_interval: usize,
    /// Upper bound on the number of files to scan; how the limit is enforced
    /// is not visible in this part of the file.
    pub max_files_to_scan: usize,
}
19
20impl Default for DeltaConfig {
21 fn default() -> Self {
22 Self {
23 base_path: PathBuf::from("."),
24 checkpoint_interval: 10,
25 max_files_to_scan: 10_000,
26 }
27 }
28}
29
/// A single action recorded in a table version.
///
/// The variant names mirror the action kinds of the Delta Lake transaction
/// log (assumed from naming — confirm against the log reader/writer). The
/// enum is `#[non_exhaustive]`, so code outside this crate must include a
/// wildcard arm when matching on it.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DeltaAction {
    /// Describes a file being added.
    Add {
        /// Path of the file (presumably relative to the table root — confirm).
        path: String,
        /// File size (presumably in bytes — confirm).
        size: u64,
        /// Modification time of the file; the unit is not established here
        /// (presumably milliseconds since the Unix epoch — confirm).
        modification_time: u64,
        /// Data-change flag carried by the action.
        data_change: bool,
        /// Partition column name to partition value for this file.
        partition_values: HashMap<String, String>,
        /// Optional statistics payload, kept as an unparsed JSON string.
        stats_json: Option<String>,
    },
    /// Describes a file being removed.
    Remove {
        /// Path of the removed file.
        path: String,
        /// Time of the deletion; the unit is not established here
        /// (presumably milliseconds since the Unix epoch — confirm).
        deletion_timestamp: u64,
        /// Data-change flag carried by the action.
        data_change: bool,
    },
    /// Table-level metadata.
    Metadata {
        /// Schema stored as a string (presumably the JSON produced by
        /// `Schema::to_json` — confirm against the writer).
        schema: String,
        /// Names of the partition columns.
        partition_columns: Vec<String>,
        /// Optional free-form description.
        description: Option<String>,
        /// String key/value configuration entries.
        configuration: HashMap<String, String>,
    },
    /// Reader/writer protocol version requirements.
    Protocol {
        /// Minimum reader version.
        min_reader_version: u32,
        /// Minimum writer version.
        min_writer_version: u32,
    },
    /// Information about the commit itself.
    CommitInfo {
        /// Commit timestamp (signed; unit not established here — confirm).
        timestamp: i64,
        /// Name of the operation that produced the commit.
        operation: String,
        /// String key/value parameters of the operation.
        operation_parameters: HashMap<String, String>,
    },
}
90
91#[derive(Debug, Clone)]
93pub struct DeltaVersion {
94 pub version: u64,
96 pub timestamp: u64,
98 pub actions: Vec<DeltaAction>,
100}
101
/// Description of a single column: its name, type name, and nullability.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ColumnSchema {
    /// Column name.
    pub name: String,
    /// Type name kept as a plain string (for example `"string"`, the value
    /// `Schema::from_json` falls back to when a field has no string type).
    pub data_type: String,
    /// Whether the column may hold null values.
    pub nullable: bool,
}
112
113#[derive(Debug, Clone, PartialEq)]
115pub struct Schema {
116 pub columns: Vec<ColumnSchema>,
118}
119
120impl Schema {
121 pub fn new(columns: Vec<ColumnSchema>) -> Self {
123 Self { columns }
124 }
125
126 pub fn to_json(&self) -> Result<String, DeltaError> {
128 let fields: Vec<serde_json::Value> = self
129 .columns
130 .iter()
131 .map(|c| {
132 serde_json::json!({
133 "name": c.name,
134 "type": c.data_type,
135 "nullable": c.nullable,
136 })
137 })
138 .collect();
139 let schema_obj = serde_json::json!({
140 "type": "struct",
141 "fields": fields,
142 });
143 serde_json::to_string(&schema_obj)
144 .map_err(|e| DeltaError::Serialization(format!("schema to JSON: {e}")))
145 }
146
147 pub fn from_json(json_str: &str) -> Result<Self, DeltaError> {
149 let v: serde_json::Value = serde_json::from_str(json_str)
150 .map_err(|e| DeltaError::Parse(format!("schema JSON parse: {e}")))?;
151 let fields = v
152 .get("fields")
153 .and_then(|f| f.as_array())
154 .ok_or_else(|| DeltaError::Parse("missing 'fields' array in schema".to_string()))?;
155 let mut columns = Vec::with_capacity(fields.len());
156 for field in fields {
157 let name = field
158 .get("name")
159 .and_then(|n| n.as_str())
160 .unwrap_or_default()
161 .to_string();
162 let data_type = field
163 .get("type")
164 .and_then(|t| t.as_str())
165 .unwrap_or("string")
166 .to_string();
167 let nullable = field
168 .get("nullable")
169 .and_then(|n| n.as_bool())
170 .unwrap_or(true);
171 columns.push(ColumnSchema {
172 name,
173 data_type,
174 nullable,
175 });
176 }
177 Ok(Self { columns })
178 }
179
180 pub fn column_names(&self) -> Vec<&str> {
182 self.columns.iter().map(|c| c.name.as_str()).collect()
183 }
184}
185
186#[derive(Debug, Clone)]
188pub struct DeltaTable {
189 pub config: DeltaConfig,
191 pub version: u64,
193 pub active_files: HashMap<String, FileInfo>,
195 pub schema: Option<Schema>,
197 pub partition_columns: Vec<String>,
199 pub protocol: Option<(u32, u32)>,
201}
202
/// Description of a single data file tracked by a table.
#[derive(Debug, Clone)]
pub struct FileInfo {
    /// Path of the file (presumably relative to the table root — confirm).
    pub path: String,
    /// File size (presumably in bytes — confirm).
    pub size: u64,
    /// Modification time of the file; the unit is not established here
    /// (presumably milliseconds since the Unix epoch — confirm).
    pub modification_time: u64,
    /// Partition column name to partition value for this file.
    pub partition_values: HashMap<String, String>,
}
215
216#[derive(Debug, Clone)]
218pub struct DeltaTransaction {
219 pub actions: Vec<DeltaAction>,
221 pub target_version: u64,
223}
224
225impl DeltaTransaction {
226 pub fn new(target_version: u64) -> Self {
228 Self {
229 actions: Vec::new(),
230 target_version,
231 }
232 }
233
234 pub fn add_action(&mut self, action: DeltaAction) {
236 self.actions.push(action);
237 }
238}
239
/// Errors produced by the Delta table code.
///
/// The enum is `#[non_exhaustive]`, so code outside this crate must include a
/// wildcard arm when matching on it.
#[derive(Debug)]
#[non_exhaustive]
pub enum DeltaError {
    /// An underlying I/O failure.
    Io(std::io::Error),
    /// Input could not be parsed; the payload is a human-readable message.
    Parse(String),
    /// A value could not be serialized; the payload is a human-readable message.
    Serialization(String),
    /// The version found differs from the version that was expected.
    VersionConflict {
        /// Version that was expected.
        expected: u64,
        /// Version that was actually found.
        actual: u64,
    },
    /// No table was found; the payload is shown as the table path in the
    /// `Display` output.
    TableNotFound(String),
    /// A schema-related failure; the payload is a human-readable message.
    SchemaError(String),
    /// Any other failure; the payload is a human-readable message.
    Other(String),
}
264
265impl std::fmt::Display for DeltaError {
266 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
267 match self {
268 DeltaError::Io(e) => write!(f, "Delta I/O error: {e}"),
269 DeltaError::Parse(msg) => write!(f, "Delta parse error: {msg}"),
270 DeltaError::Serialization(msg) => write!(f, "Delta serialization error: {msg}"),
271 DeltaError::VersionConflict { expected, actual } => {
272 write!(
273 f,
274 "Delta version conflict: expected {expected}, found {actual}"
275 )
276 }
277 DeltaError::TableNotFound(path) => write!(f, "Delta table not found: {path}"),
278 DeltaError::SchemaError(msg) => write!(f, "Delta schema error: {msg}"),
279 DeltaError::Other(msg) => write!(f, "Delta error: {msg}"),
280 }
281 }
282}
283
284impl std::error::Error for DeltaError {
285 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
286 match self {
287 DeltaError::Io(e) => Some(e),
288 _ => None,
289 }
290 }
291}
292
293impl From<std::io::Error> for DeltaError {
294 fn from(e: std::io::Error) -> Self {
295 DeltaError::Io(e)
296 }
297}
298
299impl From<DeltaError> for crate::error::IoError {
300 fn from(e: DeltaError) -> Self {
301 crate::error::IoError::Other(format!("{e}"))
302 }
303}