//! Delta Lake accepted-sink writer.
//!
//! floe_core/io/write/delta.rs

1use std::path::Path;
2use std::sync::Arc;
3
4use deltalake::arrow::array::{
5    ArrayRef, BooleanArray, Date32Array, Float32Array, Float64Array, Int16Array, Int32Array,
6    Int64Array, Int8Array, NullArray, StringArray, Time64NanosecondArray,
7    TimestampMicrosecondArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
8};
9use deltalake::arrow::datatypes::{Field, Schema};
10use deltalake::arrow::record_batch::RecordBatch;
11use deltalake::logstore::read_commit_entry;
12use deltalake::protocol::SaveMode;
13use deltalake::table::builder::DeltaTableBuilder;
14use polars::prelude::{DataFrame, DataType, TimeUnit};
15use serde_json::Value;
16
17use crate::checks::normalize;
18use crate::errors::RunError;
19use crate::io::format::{AcceptedSinkAdapter, AcceptedWriteMetrics, AcceptedWriteOutput};
20use crate::io::storage::{object_store, Target};
21use crate::{config, io, FloeResult};
22
23use super::metrics;
24
/// Zero-sized adapter implementing `AcceptedSinkAdapter` for Delta Lake targets.
struct DeltaAcceptedAdapter;

/// Single shared instance handed out by `delta_accepted_adapter`.
static DELTA_ACCEPTED_ADAPTER: DeltaAcceptedAdapter = DeltaAcceptedAdapter;
28
/// Write-time options resolved from the entity's accepted-sink configuration.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DeltaWriteRuntimeOptions {
    /// Columns to partition the Delta table by, if configured.
    pub partition_by: Option<Vec<String>>,
    /// Desired maximum size of each written data file, if configured.
    pub target_file_size_bytes: Option<usize>,
    /// Files at or below this size count as "small" in write metrics.
    pub small_file_threshold_bytes: u64,
}
35
/// Outcome of one Delta write: committed version plus commit-log metrics.
#[derive(Debug)]
struct DeltaWriteResult {
    // Delta table version created by the commit.
    version: i64,
    // Number of data files added by the commit (0 when metrics unavailable).
    files_written: u64,
    // Display names of written part files (list may be truncated).
    part_files: Vec<String>,
    // Size-based metrics; fields are None when file sizes were unavailable.
    metrics: AcceptedWriteMetrics,
}
43
/// Returns the process-wide Delta accepted-sink adapter.
pub(crate) fn delta_accepted_adapter() -> &'static dyn AcceptedSinkAdapter {
    &DELTA_ACCEPTED_ADAPTER
}
47
48pub fn delta_write_runtime_options(
49    entity: &config::EntityConfig,
50) -> FloeResult<DeltaWriteRuntimeOptions> {
51    let target_file_size_bytes_u64 = entity
52        .sink
53        .accepted
54        .options
55        .as_ref()
56        .and_then(|options| options.max_size_per_file);
57    let target_file_size_bytes = match target_file_size_bytes_u64 {
58        Some(value) => Some(usize::try_from(value).map_err(|_| {
59            Box::new(RunError(format!(
60                "delta sink max_size_per_file is too large for this platform: {value}"
61            ))) as Box<dyn std::error::Error + Send + Sync>
62        })?),
63        None => None,
64    };
65    Ok(DeltaWriteRuntimeOptions {
66        partition_by: entity.sink.accepted.partition_by.clone(),
67        target_file_size_bytes,
68        small_file_threshold_bytes: metrics::default_small_file_threshold_bytes(
69            target_file_size_bytes_u64,
70        ),
71    })
72}
73
/// Writes `df` to the Delta table at `target` and returns the committed version.
///
/// Thin wrapper around `write_delta_table_with_metrics` that discards the
/// file-level metrics.
pub fn write_delta_table(
    df: &mut DataFrame,
    target: &Target,
    resolver: &config::StorageResolver,
    entity: &config::EntityConfig,
    mode: config::WriteMode,
) -> FloeResult<i64> {
    Ok(write_delta_table_with_metrics(df, target, resolver, entity, mode)?.version)
}
83
/// Converts `df` to Arrow, commits it to the Delta table at `target`, and
/// gathers commit-log metrics for the written files.
///
/// # Errors
/// Fails on batch conversion, option resolution, table load/build, or the
/// write itself. For local targets a commit-log read failure also fails;
/// remote metric failures are swallowed inside `delta_commit_metrics_for_target`.
fn write_delta_table_with_metrics(
    df: &mut DataFrame,
    target: &Target,
    resolver: &config::StorageResolver,
    entity: &config::EntityConfig,
    mode: config::WriteMode,
) -> FloeResult<DeltaWriteResult> {
    // Local filesystem targets need the directory to exist before the write.
    if let Target::Local { base_path, .. } = target {
        std::fs::create_dir_all(Path::new(base_path))?;
    }
    let batch = dataframe_to_record_batch(df, entity)?;
    let runtime_options = delta_write_runtime_options(entity)?;
    let partition_by = runtime_options.partition_by.clone();
    let target_file_size_bytes = runtime_options.target_file_size_bytes;
    let small_file_threshold_bytes = runtime_options.small_file_threshold_bytes;
    let store = object_store::delta_store_config(target, resolver, entity)?;
    let table_url = store.table_url;
    let storage_options = store.storage_options;
    let builder = DeltaTableBuilder::from_url(table_url.clone())
        .map_err(|err| Box::new(RunError(format!("delta builder failed: {err}"))))?
        .with_storage_options(storage_options.clone());
    // deltalake's API is async; drive it with a current-thread runtime.
    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .map_err(|err| Box::new(RunError(format!("delta runtime init failed: {err}"))))?;
    let version = runtime
        .block_on(async move {
            let table = match builder.load().await {
                Ok(table) => table,
                Err(err) => match err {
                    // NotATable means the table does not exist yet: build an
                    // uninitialized handle so the write creates it.
                    deltalake::DeltaTableError::NotATable(_) => {
                        let builder = DeltaTableBuilder::from_url(table_url)?
                            .with_storage_options(storage_options);
                        builder.build()?
                    }
                    other => return Err(other),
                },
            };
            let mut write = table
                .write(vec![batch])
                .with_save_mode(save_mode_for_write_mode(mode));
            if let Some(partition_by) = partition_by.clone() {
                write = write.with_partition_columns(partition_by);
            }
            if let Some(target_file_size) = target_file_size_bytes {
                write = write.with_target_file_size(target_file_size);
            }
            let table = write.await?;
            let version = table.version().ok_or_else(|| {
                deltalake::DeltaTableError::Generic(
                    "delta table version missing after write".to_string(),
                )
            })?;
            Ok::<i64, deltalake::DeltaTableError>(version)
        })
        .map_err(|err| Box::new(RunError(format!("delta write failed: {err}"))))?;

    // Reuse the same runtime to read back the commit log for metrics.
    let (files_written, part_files, metrics) = delta_commit_metrics_for_target(
        &runtime,
        target,
        resolver,
        entity,
        version,
        small_file_threshold_bytes,
    )?;

    Ok(DeltaWriteResult {
        version,
        files_written,
        part_files,
        metrics,
    })
}
157
impl AcceptedSinkAdapter for DeltaAcceptedAdapter {
    /// Writes the accepted rows as one Delta commit and reports write output.
    ///
    /// Delta manages its own file layout, so the output-stem, temp-dir,
    /// cloud-client, and catalog parameters of the trait are unused here;
    /// `parts_written` is always 1 because a write produces a single commit.
    fn write_accepted(
        &self,
        target: &Target,
        df: &mut DataFrame,
        mode: config::WriteMode,
        _output_stem: &str,
        _temp_dir: Option<&Path>,
        _cloud: &mut io::storage::CloudClient,
        resolver: &config::StorageResolver,
        _catalogs: &config::CatalogResolver,
        entity: &config::EntityConfig,
    ) -> FloeResult<AcceptedWriteOutput> {
        let result = write_delta_table_with_metrics(df, target, resolver, entity, mode)?;
        Ok(AcceptedWriteOutput {
            files_written: result.files_written,
            parts_written: 1,
            part_files: result.part_files,
            table_version: Some(result.version),
            // Iceberg/snapshot fields do not apply to the Delta sink.
            snapshot_id: None,
            table_root_uri: None,
            iceberg_catalog_name: None,
            iceberg_database: None,
            iceberg_namespace: None,
            iceberg_table: None,
            metrics: result.metrics,
        })
    }
}
187
188fn delta_commit_metrics_for_target(
189    runtime: &tokio::runtime::Runtime,
190    target: &Target,
191    resolver: &config::StorageResolver,
192    entity: &config::EntityConfig,
193    version: i64,
194    small_file_threshold_bytes: u64,
195) -> FloeResult<(u64, Vec<String>, AcceptedWriteMetrics)> {
196    match target {
197        Target::Local { base_path, .. } => {
198            let stats = delta_commit_add_stats(Path::new(base_path), version)?;
199            Ok(delta_commit_stats_to_output(
200                stats,
201                small_file_threshold_bytes,
202            ))
203        }
204        // Best-effort metrics for remote targets: never fail a successful write because the
205        // commit log could not be read or parsed after commit.
206        Target::S3 { .. } | Target::Gcs { .. } | Target::Adls { .. } => {
207            match delta_commit_add_stats_via_object_store(
208                runtime, target, resolver, entity, version,
209            ) {
210                Ok(stats) => Ok(delta_commit_stats_to_output(
211                    stats,
212                    small_file_threshold_bytes,
213                )),
214                Err(_) => Ok(delta_commit_metrics_fallback_unknown()),
215            }
216        }
217    }
218}
219
/// Aggregate of the `add` actions found in one Delta commit log entry.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct DeltaCommitAddStats {
    // Number of `add` actions (data files written) in the commit.
    files_written: u64,
    // Display names of written files (capped, so may be shorter than files_written).
    part_files: Vec<String>,
    // Byte sizes of written files, for each `add` that carried a size.
    file_sizes: Vec<u64>,
}
226
227fn delta_commit_add_stats(table_root: &Path, version: i64) -> FloeResult<DeltaCommitAddStats> {
228    let log_path = table_root
229        .join("_delta_log")
230        .join(format!("{version:020}.json"));
231    let bytes = std::fs::read(&log_path).map_err(|err| {
232        Box::new(RunError(format!(
233            "delta metrics failed to open commit log {}: {err}",
234            log_path.display()
235        )))
236    })?;
237    parse_delta_commit_add_stats_bytes_with_context(&bytes, &log_path.display().to_string())
238}
239
/// Fetches the commit log for `version` from the remote object store and
/// aggregates its `add` actions.
///
/// # Errors
/// Fails when the store or builder cannot be configured, the commit entry
/// cannot be read, or the entry is missing or unparsable.
fn delta_commit_add_stats_via_object_store(
    runtime: &tokio::runtime::Runtime,
    target: &Target,
    resolver: &config::StorageResolver,
    entity: &config::EntityConfig,
    version: i64,
) -> FloeResult<DeltaCommitAddStats> {
    let store = object_store::delta_store_config(target, resolver, entity)?;
    let builder = DeltaTableBuilder::from_url(store.table_url.clone())
        .map_err(|err| Box::new(RunError(format!("delta metrics builder failed: {err}"))))?
        .with_storage_options(store.storage_options);
    let log_store = builder.build_storage().map_err(|err| {
        Box::new(RunError(format!(
            "delta metrics log store init failed: {err}"
        )))
    })?;
    // read_commit_entry resolves to None when the entry does not exist;
    // treat that as an error since the write just produced this version.
    let bytes = runtime
        .block_on(async { read_commit_entry(log_store.object_store(None).as_ref(), version).await })
        .map_err(|err| Box::new(RunError(format!("delta metrics commit read failed: {err}"))))?
        .ok_or_else(|| {
            Box::new(RunError(format!(
                "delta metrics commit log missing for version {version}"
            ))) as Box<dyn std::error::Error + Send + Sync>
        })?;
    parse_delta_commit_add_stats_bytes_with_context(
        bytes.as_ref(),
        &format!("remote delta commit version {version}"),
    )
}
269
/// Parses commit-log bytes with a generic context label (exposed for tests).
#[doc(hidden)]
pub fn parse_delta_commit_add_stats_bytes(bytes: &[u8]) -> FloeResult<DeltaCommitAddStats> {
    parse_delta_commit_add_stats_bytes_with_context(bytes, "delta commit log bytes")
}
274
275fn parse_delta_commit_add_stats_bytes_with_context(
276    bytes: &[u8],
277    context: &str,
278) -> FloeResult<DeltaCommitAddStats> {
279    let content = std::str::from_utf8(bytes).map_err(|err| {
280        Box::new(RunError(format!(
281            "delta metrics failed to decode {context} as utf-8: {err}"
282        )))
283    })?;
284    let mut stats = DeltaCommitAddStats::default();
285    for line in content.lines() {
286        let record: Value = serde_json::from_str(line).map_err(|err| {
287            Box::new(RunError(format!(
288                "delta metrics failed to parse {context}: {err}"
289            )))
290        })?;
291        let Some(add) = record.get("add") else {
292            continue;
293        };
294        stats.files_written += 1;
295        if stats.part_files.len() < 50 {
296            if let Some(path) = add.get("path").and_then(|value| value.as_str()) {
297                let display_name = Path::new(path)
298                    .file_name()
299                    .and_then(|name| name.to_str())
300                    .map(ToOwned::to_owned)
301                    .unwrap_or_else(|| path.to_string());
302                stats.part_files.push(display_name);
303            }
304        }
305        if let Some(size) = add.get("size").and_then(|value| value.as_u64()) {
306            stats.file_sizes.push(size);
307        }
308    }
309    Ok(stats)
310}
311
312#[doc(hidden)]
313pub fn delta_commit_metrics_from_log_bytes(
314    bytes: &[u8],
315    small_file_threshold_bytes: u64,
316) -> FloeResult<(u64, Vec<String>, AcceptedWriteMetrics)> {
317    let stats = parse_delta_commit_add_stats_bytes(bytes)?;
318    Ok(delta_commit_stats_to_output(
319        stats,
320        small_file_threshold_bytes,
321    ))
322}
323
324#[doc(hidden)]
325pub fn delta_commit_metrics_from_log_bytes_best_effort(
326    bytes: &[u8],
327    small_file_threshold_bytes: u64,
328) -> (u64, Vec<String>, AcceptedWriteMetrics) {
329    match delta_commit_metrics_from_log_bytes(bytes, small_file_threshold_bytes) {
330        Ok(output) => output,
331        Err(_) => delta_commit_metrics_fallback_unknown(),
332    }
333}
334
335fn delta_commit_stats_to_output(
336    stats: DeltaCommitAddStats,
337    small_file_threshold_bytes: u64,
338) -> (u64, Vec<String>, AcceptedWriteMetrics) {
339    let metrics = if stats.file_sizes.len() == stats.files_written as usize {
340        metrics::summarize_written_file_sizes(&stats.file_sizes, small_file_threshold_bytes)
341    } else {
342        null_accepted_write_metrics()
343    };
344    (stats.files_written, stats.part_files, metrics)
345}
346
/// Metrics placeholder used when the commit log cannot be read or parsed.
fn delta_commit_metrics_fallback_unknown() -> (u64, Vec<String>, AcceptedWriteMetrics) {
    (0, Vec::new(), null_accepted_write_metrics())
}
350
/// `AcceptedWriteMetrics` with every field unset ("unknown").
fn null_accepted_write_metrics() -> AcceptedWriteMetrics {
    AcceptedWriteMetrics {
        total_bytes_written: None,
        avg_file_size_mb: None,
        small_files_count: None,
    }
}
358
359fn dataframe_to_record_batch(
360    df: &DataFrame,
361    entity: &config::EntityConfig,
362) -> FloeResult<RecordBatch> {
363    if entity.schema.columns.is_empty() {
364        let mut fields = Vec::with_capacity(df.width());
365        let mut arrays = Vec::with_capacity(df.width());
366        for column in df.get_columns() {
367            let series = column.as_materialized_series();
368            let name = series.name().to_string();
369            let array = series_to_arrow_array(series)?;
370            let nullable = array.null_count() > 0;
371            fields.push(Field::new(name, array.data_type().clone(), nullable));
372            arrays.push(array);
373        }
374        let schema = Arc::new(Schema::new(fields));
375        return RecordBatch::try_new(schema, arrays).map_err(|err| {
376            Box::new(RunError(format!("delta record batch build failed: {err}")))
377                as Box<dyn std::error::Error + Send + Sync>
378        });
379    }
380
381    let schema_columns = normalize::resolve_output_columns(
382        &entity.schema.columns,
383        normalize::resolve_normalize_strategy(entity)?.as_deref(),
384    );
385    let mut fields = Vec::with_capacity(schema_columns.len());
386    let mut arrays = Vec::with_capacity(schema_columns.len());
387    for column in &schema_columns {
388        let series = df
389            .column(column.name.as_str())
390            .map_err(|err| Box::new(RunError(format!("delta column lookup failed: {err}"))))?;
391        let series = series.as_materialized_series();
392        let array = series_to_arrow_array(series)?;
393        let nullable = column.nullable.unwrap_or(true);
394        if !nullable && array.null_count() > 0 {
395            return Err(Box::new(RunError(format!(
396                "delta write rejected nulls for non-nullable column {}",
397                column.name
398            ))));
399        }
400        fields.push(Field::new(
401            column.name.clone(),
402            array.data_type().clone(),
403            nullable,
404        ));
405        arrays.push(array);
406    }
407    let schema = Arc::new(Schema::new(fields));
408    RecordBatch::try_new(schema, arrays).map_err(|err| {
409        Box::new(RunError(format!("delta record batch build failed: {err}")))
410            as Box<dyn std::error::Error + Send + Sync>
411    })
412}
413
/// Maps the crate's write mode onto deltalake's `SaveMode`.
fn save_mode_for_write_mode(mode: config::WriteMode) -> SaveMode {
    match mode {
        config::WriteMode::Overwrite => SaveMode::Overwrite,
        config::WriteMode::Append => SaveMode::Append,
    }
}
420
/// Converts a Polars `Series` into the equivalent Arrow array for Delta.
///
/// # Errors
/// Returns an error for dtypes the sink does not support, and propagates
/// Polars downcast errors from the per-dtype accessors.
fn series_to_arrow_array(series: &polars::prelude::Series) -> FloeResult<ArrayRef> {
    let array: ArrayRef = match series.dtype() {
        DataType::String => {
            let values = series.str()?;
            Arc::new(StringArray::from_iter(values))
        }
        DataType::Boolean => {
            let values = series.bool()?;
            Arc::new(BooleanArray::from_iter(values))
        }
        DataType::Int8 => {
            let values = series.i8()?;
            Arc::new(Int8Array::from_iter(values))
        }
        DataType::Int16 => {
            let values = series.i16()?;
            Arc::new(Int16Array::from_iter(values))
        }
        DataType::Int32 => {
            let values = series.i32()?;
            Arc::new(Int32Array::from_iter(values))
        }
        DataType::Int64 => {
            let values = series.i64()?;
            Arc::new(Int64Array::from_iter(values))
        }
        DataType::UInt8 => {
            let values = series.u8()?;
            Arc::new(UInt8Array::from_iter(values))
        }
        DataType::UInt16 => {
            let values = series.u16()?;
            Arc::new(UInt16Array::from_iter(values))
        }
        DataType::UInt32 => {
            let values = series.u32()?;
            Arc::new(UInt32Array::from_iter(values))
        }
        DataType::UInt64 => {
            let values = series.u64()?;
            Arc::new(UInt64Array::from_iter(values))
        }
        DataType::Float32 => {
            let values = series.f32()?;
            Arc::new(Float32Array::from_iter(values))
        }
        DataType::Float64 => {
            let values = series.f64()?;
            Arc::new(Float64Array::from_iter(values))
        }
        DataType::Date => {
            // Physical representation is days since epoch, matching Date32.
            let values = series.date()?;
            Arc::new(Date32Array::from_iter(values.phys.iter()))
        }
        DataType::Datetime(unit, _) => {
            let values = series.datetime()?;
            // Normalize every unit to microseconds for the Arrow timestamp array.
            // NOTE(review): the nanosecond branch divides with truncation toward
            // zero, so negative (pre-epoch) nanosecond values round toward the
            // epoch rather than flooring — confirm whether div_euclid is wanted.
            let micros = values.phys.iter().map(|opt| match unit {
                TimeUnit::Milliseconds => opt.map(|value| value.saturating_mul(1000)),
                TimeUnit::Microseconds => opt,
                TimeUnit::Nanoseconds => opt.map(|value| value / 1000),
            });
            Arc::new(TimestampMicrosecondArray::from_iter(micros))
        }
        DataType::Time => {
            let values = series.time()?;
            Arc::new(Time64NanosecondArray::from_iter(values.phys.iter()))
        }
        DataType::Null => Arc::new(NullArray::new(series.len())),
        dtype => {
            return Err(Box::new(RunError(format!(
                "delta sink does not support dtype {dtype:?} for {}",
                series.name()
            ))))
        }
    };
    Ok(array)
}