use std::path::Path;
use std::sync::Arc;

use deltalake::arrow::array::{
    ArrayRef, BooleanArray, Date32Array, Float32Array, Float64Array, Int16Array, Int32Array,
    Int64Array, Int8Array, NullArray, StringArray, Time64NanosecondArray,
    TimestampMicrosecondArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
};
use deltalake::arrow::datatypes::{Field, Schema};
use deltalake::arrow::record_batch::RecordBatch;
use deltalake::logstore::read_commit_entry;
use deltalake::protocol::SaveMode;
use deltalake::table::builder::DeltaTableBuilder;
use polars::prelude::{DataFrame, DataType, TimeUnit};
use serde_json::Value;

use crate::checks::normalize;
use crate::errors::RunError;
use crate::io::format::{AcceptedSinkAdapter, AcceptedWriteMetrics, AcceptedWriteOutput};
use crate::io::storage::{object_store, Target};
use crate::{config, io, FloeResult};

use super::metrics;

/// Accepted-sink adapter that writes accepted rows as a Delta table.
struct DeltaAcceptedAdapter;

static DELTA_ACCEPTED_ADAPTER: DeltaAcceptedAdapter = DeltaAcceptedAdapter;

/// Write-time options resolved from the entity's sink configuration.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DeltaWriteRuntimeOptions {
    /// Columns to partition the table by, if configured.
    pub partition_by: Option<Vec<String>>,
    /// Target size for written data files (`max_size_per_file`), if configured.
    pub target_file_size_bytes: Option<usize>,
    /// Threshold used to classify written files as small in the write metrics.
    pub small_file_threshold_bytes: u64,
}

/// Outcome of a Delta write: the committed version plus file-level metrics.
#[derive(Debug)]
struct DeltaWriteResult {
    version: i64,
    files_written: u64,
    part_files: Vec<String>,
    metrics: AcceptedWriteMetrics,
}

/// Returns the shared Delta accepted-sink adapter.
pub(crate) fn delta_accepted_adapter() -> &'static dyn AcceptedSinkAdapter {
    &DELTA_ACCEPTED_ADAPTER
}

/// Resolves the Delta write options from the entity's sink configuration.
pub fn delta_write_runtime_options(
    entity: &config::EntityConfig,
) -> FloeResult<DeltaWriteRuntimeOptions> {
    let target_file_size_bytes_u64 = entity
        .sink
        .accepted
        .options
        .as_ref()
        .and_then(|options| options.max_size_per_file);
    // The writer's `with_target_file_size` takes a usize, so reject values that
    // overflow it (only possible on platforms where usize is narrower than u64).
    let target_file_size_bytes = match target_file_size_bytes_u64 {
        Some(value) => Some(usize::try_from(value).map_err(|_| {
            Box::new(RunError(format!(
                "delta sink max_size_per_file is too large for this platform: {value}"
            ))) as Box<dyn std::error::Error + Send + Sync>
        })?),
        None => None,
    };
    Ok(DeltaWriteRuntimeOptions {
        partition_by: entity.sink.accepted.partition_by.clone(),
        target_file_size_bytes,
        small_file_threshold_bytes: metrics::default_small_file_threshold_bytes(
            target_file_size_bytes_u64,
        ),
    })
}

/// Writes `df` to a Delta table at `target` and returns the new table version.
pub fn write_delta_table(
    df: &mut DataFrame,
    target: &Target,
    resolver: &config::StorageResolver,
    entity: &config::EntityConfig,
    mode: config::WriteMode,
) -> FloeResult<i64> {
    Ok(write_delta_table_with_metrics(df, target, resolver, entity, mode)?.version)
}

fn write_delta_table_with_metrics(
    df: &mut DataFrame,
    target: &Target,
    resolver: &config::StorageResolver,
    entity: &config::EntityConfig,
    mode: config::WriteMode,
) -> FloeResult<DeltaWriteResult> {
    if let Target::Local { base_path, .. } = target {
        std::fs::create_dir_all(Path::new(base_path))?;
    }
    let batch = dataframe_to_record_batch(df, entity)?;
    let runtime_options = delta_write_runtime_options(entity)?;
    let partition_by = runtime_options.partition_by.clone();
    let target_file_size_bytes = runtime_options.target_file_size_bytes;
    let small_file_threshold_bytes = runtime_options.small_file_threshold_bytes;
    let store = object_store::delta_store_config(target, resolver, entity)?;
    let table_url = store.table_url;
    let storage_options = store.storage_options;
    let builder = DeltaTableBuilder::from_url(table_url.clone())
        .map_err(|err| Box::new(RunError(format!("delta builder failed: {err}"))))?
        .with_storage_options(storage_options.clone());
    // The deltalake crate is async; drive it on a dedicated single-threaded runtime.
    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .map_err(|err| Box::new(RunError(format!("delta runtime init failed: {err}"))))?;
    let version = runtime
        .block_on(async move {
            // Load the existing table; if the location is not yet a Delta table,
            // fall back to an uninitialized table so the write creates it.
            let table = match builder.load().await {
                Ok(table) => table,
                Err(err) => match err {
                    deltalake::DeltaTableError::NotATable(_) => {
                        let builder = DeltaTableBuilder::from_url(table_url)?
                            .with_storage_options(storage_options);
                        builder.build()?
                    }
                    other => return Err(other),
                },
            };
            let mut write = table
                .write(vec![batch])
                .with_save_mode(save_mode_for_write_mode(mode));
            if let Some(partition_by) = partition_by.clone() {
                write = write.with_partition_columns(partition_by);
            }
            if let Some(target_file_size) = target_file_size_bytes {
                write = write.with_target_file_size(target_file_size);
            }
            let table = write.await?;
            let version = table.version().ok_or_else(|| {
                deltalake::DeltaTableError::Generic(
                    "delta table version missing after write".to_string(),
                )
            })?;
            Ok::<i64, deltalake::DeltaTableError>(version)
        })
        .map_err(|err| Box::new(RunError(format!("delta write failed: {err}"))))?;

    // Derive file-level metrics from the commit log entry the write just produced.
    let (files_written, part_files, metrics) = delta_commit_metrics_for_target(
        &runtime,
        target,
        resolver,
        entity,
        version,
        small_file_threshold_bytes,
    )?;

    Ok(DeltaWriteResult {
        version,
        files_written,
        part_files,
        metrics,
    })
}

impl AcceptedSinkAdapter for DeltaAcceptedAdapter {
    fn write_accepted(
        &self,
        target: &Target,
        df: &mut DataFrame,
        mode: config::WriteMode,
        _output_stem: &str,
        _temp_dir: Option<&Path>,
        _cloud: &mut io::storage::CloudClient,
        resolver: &config::StorageResolver,
        _catalogs: &config::CatalogResolver,
        entity: &config::EntityConfig,
    ) -> FloeResult<AcceptedWriteOutput> {
        let result = write_delta_table_with_metrics(df, target, resolver, entity, mode)?;
        Ok(AcceptedWriteOutput {
            files_written: result.files_written,
            // The whole DataFrame is written in a single commit, so one logical part.
            parts_written: 1,
            part_files: result.part_files,
            table_version: Some(result.version),
            snapshot_id: None,
            table_root_uri: None,
            iceberg_catalog_name: None,
            iceberg_database: None,
            iceberg_namespace: None,
            iceberg_table: None,
            metrics: result.metrics,
        })
    }
}

fn delta_commit_metrics_for_target(
    runtime: &tokio::runtime::Runtime,
    target: &Target,
    resolver: &config::StorageResolver,
    entity: &config::EntityConfig,
    version: i64,
    small_file_threshold_bytes: u64,
) -> FloeResult<(u64, Vec<String>, AcceptedWriteMetrics)> {
    match target {
        Target::Local { base_path, .. } => {
            let stats = delta_commit_add_stats(Path::new(base_path), version)?;
            Ok(delta_commit_stats_to_output(
                stats,
                small_file_threshold_bytes,
            ))
        }
        // Remote metrics are best-effort: a failed commit-log read degrades to
        // unknown metrics instead of failing a write that already succeeded.
        Target::S3 { .. } | Target::Gcs { .. } | Target::Adls { .. } => {
            match delta_commit_add_stats_via_object_store(
                runtime, target, resolver, entity, version,
            ) {
                Ok(stats) => Ok(delta_commit_stats_to_output(
                    stats,
                    small_file_threshold_bytes,
                )),
                Err(_) => Ok(delta_commit_metrics_fallback_unknown()),
            }
        }
    }
}

/// Per-commit statistics extracted from the `add` actions of a Delta commit log entry.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct DeltaCommitAddStats {
    files_written: u64,
    part_files: Vec<String>,
    file_sizes: Vec<u64>,
}

fn delta_commit_add_stats(table_root: &Path, version: i64) -> FloeResult<DeltaCommitAddStats> {
    // Delta commit logs live at _delta_log/<version zero-padded to 20 digits>.json.
    let log_path = table_root
        .join("_delta_log")
        .join(format!("{version:020}.json"));
    let bytes = std::fs::read(&log_path).map_err(|err| {
        Box::new(RunError(format!(
            "delta metrics failed to open commit log {}: {err}",
            log_path.display()
        )))
    })?;
    parse_delta_commit_add_stats_bytes_with_context(&bytes, &log_path.display().to_string())
}

fn delta_commit_add_stats_via_object_store(
    runtime: &tokio::runtime::Runtime,
    target: &Target,
    resolver: &config::StorageResolver,
    entity: &config::EntityConfig,
    version: i64,
) -> FloeResult<DeltaCommitAddStats> {
    let store = object_store::delta_store_config(target, resolver, entity)?;
    let builder = DeltaTableBuilder::from_url(store.table_url.clone())
        .map_err(|err| Box::new(RunError(format!("delta metrics builder failed: {err}"))))?
        .with_storage_options(store.storage_options);
    let log_store = builder.build_storage().map_err(|err| {
        Box::new(RunError(format!(
            "delta metrics log store init failed: {err}"
        )))
    })?;
    let bytes = runtime
        .block_on(async { read_commit_entry(log_store.object_store(None).as_ref(), version).await })
        .map_err(|err| Box::new(RunError(format!("delta metrics commit read failed: {err}"))))?
        .ok_or_else(|| {
            Box::new(RunError(format!(
                "delta metrics commit log missing for version {version}"
            ))) as Box<dyn std::error::Error + Send + Sync>
        })?;
    parse_delta_commit_add_stats_bytes_with_context(
        bytes.as_ref(),
        &format!("remote delta commit version {version}"),
    )
}

#[doc(hidden)]
pub fn parse_delta_commit_add_stats_bytes(bytes: &[u8]) -> FloeResult<DeltaCommitAddStats> {
    parse_delta_commit_add_stats_bytes_with_context(bytes, "delta commit log bytes")
}

fn parse_delta_commit_add_stats_bytes_with_context(
    bytes: &[u8],
    context: &str,
) -> FloeResult<DeltaCommitAddStats> {
    let content = std::str::from_utf8(bytes).map_err(|err| {
        Box::new(RunError(format!(
            "delta metrics failed to decode {context} as utf-8: {err}"
        )))
    })?;
    let mut stats = DeltaCommitAddStats::default();
    // A commit log entry is newline-delimited JSON, one action per line.
    // Only `add` actions describe newly written data files.
    for line in content.lines() {
        let record: Value = serde_json::from_str(line).map_err(|err| {
            Box::new(RunError(format!(
                "delta metrics failed to parse {context}: {err}"
            )))
        })?;
        let Some(add) = record.get("add") else {
            continue;
        };
        stats.files_written += 1;
        // Record at most 50 file names to keep the output bounded on large commits.
        if stats.part_files.len() < 50 {
            if let Some(path) = add.get("path").and_then(|value| value.as_str()) {
                let display_name = Path::new(path)
                    .file_name()
                    .and_then(|name| name.to_str())
                    .map(ToOwned::to_owned)
                    .unwrap_or_else(|| path.to_string());
                stats.part_files.push(display_name);
            }
        }
        if let Some(size) = add.get("size").and_then(|value| value.as_u64()) {
            stats.file_sizes.push(size);
        }
    }
    Ok(stats)
}

#[doc(hidden)]
pub fn delta_commit_metrics_from_log_bytes(
    bytes: &[u8],
    small_file_threshold_bytes: u64,
) -> FloeResult<(u64, Vec<String>, AcceptedWriteMetrics)> {
    let stats = parse_delta_commit_add_stats_bytes(bytes)?;
    Ok(delta_commit_stats_to_output(
        stats,
        small_file_threshold_bytes,
    ))
}

#[doc(hidden)]
pub fn delta_commit_metrics_from_log_bytes_best_effort(
    bytes: &[u8],
    small_file_threshold_bytes: u64,
) -> (u64, Vec<String>, AcceptedWriteMetrics) {
    match delta_commit_metrics_from_log_bytes(bytes, small_file_threshold_bytes) {
        Ok(output) => output,
        Err(_) => delta_commit_metrics_fallback_unknown(),
    }
}

fn delta_commit_stats_to_output(
    stats: DeltaCommitAddStats,
    small_file_threshold_bytes: u64,
) -> (u64, Vec<String>, AcceptedWriteMetrics) {
    // Only summarize sizes when every add action carried one; a partial sample
    // would skew totals and averages.
    let metrics = if stats.file_sizes.len() == stats.files_written as usize {
        metrics::summarize_written_file_sizes(&stats.file_sizes, small_file_threshold_bytes)
    } else {
        null_accepted_write_metrics()
    };
    (stats.files_written, stats.part_files, metrics)
}

fn delta_commit_metrics_fallback_unknown() -> (u64, Vec<String>, AcceptedWriteMetrics) {
    (0, Vec::new(), null_accepted_write_metrics())
}

fn null_accepted_write_metrics() -> AcceptedWriteMetrics {
    AcceptedWriteMetrics {
        total_bytes_written: None,
        avg_file_size_mb: None,
        small_files_count: None,
    }
}

fn dataframe_to_record_batch(
    df: &DataFrame,
    entity: &config::EntityConfig,
) -> FloeResult<RecordBatch> {
    // Without a configured schema, infer fields from the DataFrame itself and
    // mark a field nullable only when its column actually contains nulls.
    if entity.schema.columns.is_empty() {
        let mut fields = Vec::with_capacity(df.width());
        let mut arrays = Vec::with_capacity(df.width());
        for column in df.get_columns() {
            let series = column.as_materialized_series();
            let name = series.name().to_string();
            let array = series_to_arrow_array(series)?;
            let nullable = array.null_count() > 0;
            fields.push(Field::new(name, array.data_type().clone(), nullable));
            arrays.push(array);
        }
        let schema = Arc::new(Schema::new(fields));
        return RecordBatch::try_new(schema, arrays).map_err(|err| {
            Box::new(RunError(format!("delta record batch build failed: {err}")))
                as Box<dyn std::error::Error + Send + Sync>
        });
    }

    // With a configured schema, emit columns in the configured order and
    // enforce declared nullability.
    let schema_columns = normalize::resolve_output_columns(
        &entity.schema.columns,
        normalize::resolve_normalize_strategy(entity)?.as_deref(),
    );
    let mut fields = Vec::with_capacity(schema_columns.len());
    let mut arrays = Vec::with_capacity(schema_columns.len());
    for column in &schema_columns {
        let series = df
            .column(column.name.as_str())
            .map_err(|err| Box::new(RunError(format!("delta column lookup failed: {err}"))))?;
        let series = series.as_materialized_series();
        let array = series_to_arrow_array(series)?;
        let nullable = column.nullable.unwrap_or(true);
        if !nullable && array.null_count() > 0 {
            return Err(Box::new(RunError(format!(
                "delta write rejected nulls for non-nullable column {}",
                column.name
            ))));
        }
        fields.push(Field::new(
            column.name.clone(),
            array.data_type().clone(),
            nullable,
        ));
        arrays.push(array);
    }
    let schema = Arc::new(Schema::new(fields));
    RecordBatch::try_new(schema, arrays).map_err(|err| {
        Box::new(RunError(format!("delta record batch build failed: {err}")))
            as Box<dyn std::error::Error + Send + Sync>
    })
}

fn save_mode_for_write_mode(mode: config::WriteMode) -> SaveMode {
    match mode {
        config::WriteMode::Overwrite => SaveMode::Overwrite,
        config::WriteMode::Append => SaveMode::Append,
    }
}

/// Converts a polars Series into an Arrow array of the corresponding Arrow type.
fn series_to_arrow_array(series: &polars::prelude::Series) -> FloeResult<ArrayRef> {
    let array: ArrayRef = match series.dtype() {
        DataType::String => {
            let values = series.str()?;
            Arc::new(StringArray::from_iter(values))
        }
        DataType::Boolean => {
            let values = series.bool()?;
            Arc::new(BooleanArray::from_iter(values))
        }
        DataType::Int8 => {
            let values = series.i8()?;
            Arc::new(Int8Array::from_iter(values))
        }
        DataType::Int16 => {
            let values = series.i16()?;
            Arc::new(Int16Array::from_iter(values))
        }
        DataType::Int32 => {
            let values = series.i32()?;
            Arc::new(Int32Array::from_iter(values))
        }
        DataType::Int64 => {
            let values = series.i64()?;
            Arc::new(Int64Array::from_iter(values))
        }
        DataType::UInt8 => {
            let values = series.u8()?;
            Arc::new(UInt8Array::from_iter(values))
        }
        DataType::UInt16 => {
            let values = series.u16()?;
            Arc::new(UInt16Array::from_iter(values))
        }
        DataType::UInt32 => {
            let values = series.u32()?;
            Arc::new(UInt32Array::from_iter(values))
        }
        DataType::UInt64 => {
            let values = series.u64()?;
            Arc::new(UInt64Array::from_iter(values))
        }
        DataType::Float32 => {
            let values = series.f32()?;
            Arc::new(Float32Array::from_iter(values))
        }
        DataType::Float64 => {
            let values = series.f64()?;
            Arc::new(Float64Array::from_iter(values))
        }
        DataType::Date => {
            let values = series.date()?;
            Arc::new(Date32Array::from_iter(values.phys.iter()))
        }
        DataType::Datetime(unit, _) => {
            let values = series.datetime()?;
            // Normalize all datetimes to microseconds, the precision Delta uses
            // for timestamps. The nanosecond branch truncates toward zero.
            let micros = values.phys.iter().map(|opt| match unit {
                TimeUnit::Milliseconds => opt.map(|value| value.saturating_mul(1000)),
                TimeUnit::Microseconds => opt,
                TimeUnit::Nanoseconds => opt.map(|value| value / 1000),
            });
            Arc::new(TimestampMicrosecondArray::from_iter(micros))
        }
        DataType::Time => {
            let values = series.time()?;
            Arc::new(Time64NanosecondArray::from_iter(values.phys.iter()))
        }
        DataType::Null => Arc::new(NullArray::new(series.len())),
        dtype => {
            return Err(Box::new(RunError(format!(
                "delta sink does not support dtype {dtype:?} for {}",
                series.name()
            ))))
        }
    };
    Ok(array)
}
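
// A minimal test sketch for the commit-log parser. The JSON lines below are
// illustrative, hand-written actions rather than a complete Delta commit: the
// parser only reads the `add.path` and `add.size` fields, so the other fields
// a real commit entry would carry are omitted here.
#[cfg(test)]
mod commit_log_tests {
    use super::*;

    #[test]
    fn parses_add_actions_from_commit_log() {
        let log = concat!(
            r#"{"commitInfo":{"operation":"WRITE"}}"#,
            "\n",
            r#"{"add":{"path":"part-00000.snappy.parquet","size":2048,"dataChange":true}}"#,
            "\n",
            r#"{"add":{"path":"dt=2024-01-01/part-00001.snappy.parquet","size":512,"dataChange":true}}"#,
        );
        let stats = parse_delta_commit_add_stats_bytes(log.as_bytes()).expect("parse");
        assert_eq!(stats.files_written, 2);
        // Partition prefixes are stripped; only file names are reported.
        assert_eq!(
            stats.part_files,
            vec!["part-00000.snappy.parquet", "part-00001.snappy.parquet"]
        );
        assert_eq!(stats.file_sizes, vec![2048, 512]);
    }

    #[test]
    fn best_effort_metrics_degrade_to_unknown_on_garbage() {
        let (files, parts, metrics) =
            delta_commit_metrics_from_log_bytes_best_effort(b"not json", 1024);
        assert_eq!(files, 0);
        assert!(parts.is_empty());
        assert!(metrics.total_bytes_written.is_none());
    }
}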
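
// A second sketch, assuming the polars `Series::new` + `cast` API available to
// this crate's polars version, checking that millisecond datetimes are rescaled
// to the microsecond precision the Delta writer emits.
#[cfg(test)]
mod arrow_conversion_tests {
    use super::*;
    use polars::prelude::Series;

    #[test]
    fn datetime_millis_rescaled_to_micros() {
        // Physical value 1_500 reinterpreted as milliseconds since epoch.
        let series = Series::new("ts".into(), &[1_500i64])
            .cast(&DataType::Datetime(TimeUnit::Milliseconds, None))
            .expect("cast to datetime");
        let array = series_to_arrow_array(&series).expect("convert");
        let timestamps = array
            .as_any()
            .downcast_ref::<TimestampMicrosecondArray>()
            .expect("microsecond timestamps");
        // 1_500 ms becomes 1_500_000 us.
        assert_eq!(timestamps.value(0), 1_500_000);
    }
}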