Skip to main content

scirs2_io/delta/
types.rs

1//! Delta Lake type definitions.
2//!
3//! Core types for the Delta Lake transaction log format including actions,
4//! versions, tables, transactions, and errors.
5
6use std::collections::HashMap;
7use std::path::PathBuf;
8
/// Settings that control where a Delta Lake table lives and how it is maintained.
#[derive(Clone, Debug)]
pub struct DeltaConfig {
    /// Root directory of the Delta table on disk.
    pub base_path: PathBuf,
    /// Number of commits between checkpoint files.
    pub checkpoint_interval: usize,
    /// Soft upper bound on how many data files a scan will visit.
    pub max_files_to_scan: usize,
}

impl Default for DeltaConfig {
    /// Returns a config rooted at `.`, with a checkpoint interval of 10 commits
    /// and a scan cap of 10 000 files.
    fn default() -> Self {
        let base_path = PathBuf::from(".");
        let checkpoint_interval = 10;
        let max_files_to_scan = 10_000;
        Self {
            base_path,
            checkpoint_interval,
            max_files_to_scan,
        }
    }
}
29
/// One entry of a Delta log commit file.
///
/// Actions are the smallest unit of change recorded in the transaction log. A commit
/// consists of one or more of them, describing data files being added or removed,
/// metadata changes, protocol requirements, or commit bookkeeping.
#[non_exhaustive]
#[derive(Clone, Debug, PartialEq)]
pub enum DeltaAction {
    /// A data file became part of the table.
    Add {
        /// Data file path, relative to the table root.
        path: String,
        /// File size in bytes.
        size: u64,
        /// Last-modified time, in Unix epoch milliseconds.
        modification_time: u64,
        /// `true` when this add changes table data; `false` for a pure
        /// reorganisation such as compaction/optimisation.
        data_change: bool,
        /// Partition column values for this file; an empty map means none were recorded.
        partition_values: HashMap<String, String>,
        /// Per-file statistics serialised as JSON, if present.
        stats_json: Option<String>,
    },
    /// A data file was logically removed (tombstoned) from the table.
    Remove {
        /// Data file path, relative to the table root.
        path: String,
        /// Time the deletion was recorded, in Unix epoch milliseconds.
        deletion_timestamp: u64,
        /// `true` when this remove changes table data; `false` for a reorganisation.
        data_change: bool,
    },
    /// Table-level metadata such as the schema and partitioning.
    Metadata {
        /// Schema definition, serialised as a JSON string.
        schema: String,
        /// Names of the columns the table is partitioned by.
        partition_columns: Vec<String>,
        /// Free-form table description, if any.
        description: Option<String>,
        /// Extra key/value configuration properties.
        configuration: HashMap<String, String>,
    },
    /// Minimum protocol versions readers and writers must support.
    Protocol {
        /// Lowest reader version allowed to read this table.
        min_reader_version: u32,
        /// Lowest writer version allowed to write this table.
        min_writer_version: u32,
    },
    /// Bookkeeping about the commit itself.
    CommitInfo {
        /// Commit time in Unix epoch milliseconds.
        /// NOTE(review): signed (`i64`), unlike the `u64` timestamps in the other variants.
        timestamp: i64,
        /// Human-readable operation name (e.g. "WRITE", "DELETE", "MERGE").
        operation: String,
        /// Key/value parameters describing the operation.
        operation_parameters: HashMap<String, String>,
    },
}
90
91/// A versioned set of actions forming a single commit.
92#[derive(Debug, Clone)]
93pub struct DeltaVersion {
94    /// Log version number (0-based, monotonically increasing).
95    pub version: u64,
96    /// Unix epoch milliseconds when this version was committed.
97    pub timestamp: u64,
98    /// All actions in this version.
99    pub actions: Vec<DeltaAction>,
100}
101
/// Definition of a single column in a Delta table schema.
#[derive(Clone, Debug, PartialEq)]
pub struct ColumnSchema {
    /// Name of the column.
    pub name: String,
    /// Column type name as text (e.g. "double", "string", "long").
    pub data_type: String,
    /// `true` if the column may hold nulls.
    pub nullable: bool,
}
112
113/// Schema definition for a Delta table.
114#[derive(Debug, Clone, PartialEq)]
115pub struct Schema {
116    /// Ordered list of columns.
117    pub columns: Vec<ColumnSchema>,
118}
119
120impl Schema {
121    /// Create a new schema from a list of columns.
122    pub fn new(columns: Vec<ColumnSchema>) -> Self {
123        Self { columns }
124    }
125
126    /// Serialize the schema to a JSON string.
127    pub fn to_json(&self) -> Result<String, DeltaError> {
128        let fields: Vec<serde_json::Value> = self
129            .columns
130            .iter()
131            .map(|c| {
132                serde_json::json!({
133                    "name": c.name,
134                    "type": c.data_type,
135                    "nullable": c.nullable,
136                })
137            })
138            .collect();
139        let schema_obj = serde_json::json!({
140            "type": "struct",
141            "fields": fields,
142        });
143        serde_json::to_string(&schema_obj)
144            .map_err(|e| DeltaError::Serialization(format!("schema to JSON: {e}")))
145    }
146
147    /// Deserialize a schema from a JSON string.
148    pub fn from_json(json_str: &str) -> Result<Self, DeltaError> {
149        let v: serde_json::Value = serde_json::from_str(json_str)
150            .map_err(|e| DeltaError::Parse(format!("schema JSON parse: {e}")))?;
151        let fields = v
152            .get("fields")
153            .and_then(|f| f.as_array())
154            .ok_or_else(|| DeltaError::Parse("missing 'fields' array in schema".to_string()))?;
155        let mut columns = Vec::with_capacity(fields.len());
156        for field in fields {
157            let name = field
158                .get("name")
159                .and_then(|n| n.as_str())
160                .unwrap_or_default()
161                .to_string();
162            let data_type = field
163                .get("type")
164                .and_then(|t| t.as_str())
165                .unwrap_or("string")
166                .to_string();
167            let nullable = field
168                .get("nullable")
169                .and_then(|n| n.as_bool())
170                .unwrap_or(true);
171            columns.push(ColumnSchema {
172                name,
173                data_type,
174                nullable,
175            });
176        }
177        Ok(Self { columns })
178    }
179
180    /// Get column names.
181    pub fn column_names(&self) -> Vec<&str> {
182        self.columns.iter().map(|c| c.name.as_str()).collect()
183    }
184}
185
186/// Represents the reconstructed logical state of a Delta table.
187#[derive(Debug, Clone)]
188pub struct DeltaTable {
189    /// Configuration for this table.
190    pub config: DeltaConfig,
191    /// Current table version.
192    pub version: u64,
193    /// Active data files (path -> Add action details).
194    pub active_files: HashMap<String, FileInfo>,
195    /// Current schema, if known.
196    pub schema: Option<Schema>,
197    /// Partition columns.
198    pub partition_columns: Vec<String>,
199    /// Protocol version.
200    pub protocol: Option<(u32, u32)>,
201}
202
/// Details of one data file that is currently active in a table.
#[derive(Clone, Debug)]
pub struct FileInfo {
    /// Data file path, relative to the table root.
    pub path: String,
    /// File size in bytes.
    pub size: u64,
    /// Last-modified time, in Unix epoch milliseconds.
    pub modification_time: u64,
    /// Partition column values for this file.
    pub partition_values: HashMap<String, String>,
}
215
216/// An in-progress transaction that can be committed atomically.
217#[derive(Debug, Clone)]
218pub struct DeltaTransaction {
219    /// Actions accumulated in this transaction.
220    pub actions: Vec<DeltaAction>,
221    /// The version this transaction will be committed as.
222    pub target_version: u64,
223}
224
225impl DeltaTransaction {
226    /// Create a new transaction targeting the given version.
227    pub fn new(target_version: u64) -> Self {
228        Self {
229            actions: Vec::new(),
230            target_version,
231        }
232    }
233
234    /// Add an action to this transaction.
235    pub fn add_action(&mut self, action: DeltaAction) {
236        self.actions.push(action);
237    }
238}
239
/// Errors raised by Delta Lake operations.
#[non_exhaustive]
#[derive(Debug)]
pub enum DeltaError {
    /// An underlying I/O operation failed.
    Io(std::io::Error),
    /// JSON could not be parsed or had an unexpected shape.
    Parse(String),
    /// A value could not be serialized.
    Serialization(String),
    /// Optimistic-concurrency conflict between the expected and the on-disk version.
    VersionConflict {
        /// Version the transaction expected to write.
        expected: u64,
        /// Latest version actually found on disk.
        actual: u64,
    },
    /// The table is missing or not initialized (payload presumably the table path).
    TableNotFound(String),
    /// A schema could not be evolved or validated.
    SchemaError(String),
    /// Any other failure.
    Other(String),
}

impl std::fmt::Display for DeltaError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Io(inner) => write!(f, "Delta I/O error: {}", inner),
            Self::Parse(detail) => write!(f, "Delta parse error: {}", detail),
            Self::Serialization(detail) => write!(f, "Delta serialization error: {}", detail),
            Self::VersionConflict { expected, actual } => write!(
                f,
                "Delta version conflict: expected {}, found {}",
                expected, actual
            ),
            Self::TableNotFound(path) => write!(f, "Delta table not found: {}", path),
            Self::SchemaError(detail) => write!(f, "Delta schema error: {}", detail),
            Self::Other(detail) => write!(f, "Delta error: {}", detail),
        }
    }
}

impl std::error::Error for DeltaError {
    /// Only the `Io` variant wraps another error; every other variant has no source.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        if let Self::Io(inner) = self {
            Some(inner)
        } else {
            None
        }
    }
}

impl From<std::io::Error> for DeltaError {
    /// Wrap an I/O error so `?` can propagate it as a `DeltaError`.
    fn from(err: std::io::Error) -> Self {
        Self::Io(err)
    }
}
298
299impl From<DeltaError> for crate::error::IoError {
300    fn from(e: DeltaError) -> Self {
301        crate::error::IoError::Other(format!("{e}"))
302    }
303}