iceberg_rust/arrow/write.rs

//! Arrow writing module for converting Arrow record batches to Iceberg data files.
//!
//! This module provides functionality to:
//! - Write Arrow record batches to Parquet files
//! - Handle partitioned data writing
//! - Support equality delete files
//! - Manage file sizes and buffering
//!
//! The main entry points are:
//! - [`write_parquet_partitioned`]: Write regular data files
//! - [`write_equality_deletes_parquet_partitioned`]: Write equality delete files
//!
//! The module handles:
//! - Automatic file size management and splitting
//! - Parquet compression and encoding
//! - Partition path generation
//! - Object store integration
//! - Metadata collection for written files
//!
//! # Example
//!
//! ```no_run
//! # use arrow::record_batch::RecordBatch;
//! # use futures::Stream;
//! # use iceberg_rust::arrow::write::write_parquet_partitioned;
//! # use iceberg_rust::table::Table;
//! # async fn example(
//! #     table: &Table,
//! #     batches: impl Stream<Item = Result<RecordBatch, arrow::error::ArrowError>> + Send + 'static,
//! # ) {
//! let data_files = write_parquet_partitioned(
//!     table,
//!     batches,
//!     None // no specific branch
//! ).await.unwrap();
//! # }
//! ```

use futures::{
    channel::mpsc::{channel, Receiver, Sender},
    SinkExt, StreamExt, TryStreamExt,
};
use lru::LruCache;
use object_store::{buffered::BufWriter, ObjectStore};
use std::sync::Arc;
use std::{fmt::Write, thread::available_parallelism};
use tokio::task::JoinSet;
use tracing::instrument;

use arrow::{datatypes::Schema as ArrowSchema, error::ArrowError, record_batch::RecordBatch};
use futures::Stream;
use iceberg_rust_spec::{
    partition::BoundPartitionField,
    spec::{manifest::DataFile, schema::Schema, values::Value},
    table_metadata::{self, WRITE_DATA_PATH, WRITE_OBJECT_STORAGE_ENABLED},
    util::strip_prefix,
};
use parquet::{
    arrow::AsyncArrowWriter,
    basic::{Compression, ZstdLevel},
    file::properties::WriterProperties,
    format::FileMetaData,
};
use uuid::Uuid;

use crate::{
    error::Error, file_format::parquet::parquet_to_datafile, object_store::Bucket, table::Table,
};

use super::partition::partition_record_batch;

/// Target maximum size in bytes for a single compressed Parquet file.
const MAX_PARQUET_SIZE: usize = 512_000_000;
/// Assumed ratio between in-memory Arrow data and compressed Parquet output;
/// a writer is rolled over to a new file once the buffered in-memory size
/// exceeds `COMPRESSION_FACTOR * MAX_PARQUET_SIZE`.
const COMPRESSION_FACTOR: usize = 200;

#[instrument(skip(table, batches), fields(table_name = %table.identifier().name()))]
/// Writes Arrow record batches as partitioned Parquet files.
///
/// This function writes Arrow record batches to Parquet files, partitioning them according
/// to the table's partition spec.
///
/// # Arguments
/// * `table` - The Iceberg table to write data for
/// * `batches` - Stream of Arrow record batches to write
/// * `branch` - Optional branch name to write to
///
/// # Returns
/// * `Result<Vec<DataFile>, ArrowError>` - List of metadata for the written data files
///
/// # Errors
/// Returns an error if:
/// * The table metadata cannot be accessed
/// * The schema projection fails
/// * The object store operations fail
/// * The Parquet writing fails
/// * The partition path generation fails
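///
/// # Example
///
/// A minimal sketch of streaming record batches into a table (error handling elided;
/// the returned `DataFile` entries describe the files that were written):
///
/// ```no_run
/// # use arrow::{error::ArrowError, record_batch::RecordBatch};
/// # use futures::Stream;
/// # use iceberg_rust::{arrow::write::write_parquet_partitioned, table::Table};
/// # async fn example(
/// #     table: &Table,
/// #     batches: impl Stream<Item = Result<RecordBatch, ArrowError>> + Send + 'static,
/// # ) -> Result<(), ArrowError> {
/// let data_files = write_parquet_partitioned(table, batches, None).await?;
/// # Ok(())
/// # }
/// ```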
pub async fn write_parquet_partitioned(
    table: &Table,
    batches: impl Stream<Item = Result<RecordBatch, ArrowError>> + Send + 'static,
    branch: Option<&str>,
) -> Result<Vec<DataFile>, ArrowError> {
    store_parquet_partitioned(table, batches, branch, None).await
}

#[instrument(skip(table, batches), fields(table_name = %table.identifier().name(), equality_ids = ?equality_ids))]
/// Writes equality delete records as partitioned Parquet files.
///
/// This function writes Arrow record batches containing equality delete records to Parquet files,
/// partitioning them according to the table's partition spec.
///
/// # Arguments
/// * `table` - The Iceberg table to write delete records for
/// * `batches` - Stream of Arrow record batches containing the delete records
/// * `branch` - Optional branch name to write to
/// * `equality_ids` - Field IDs that define equality deletion
///
/// # Returns
/// * `Result<Vec<DataFile>, ArrowError>` - List of metadata for the written delete files
///
/// # Errors
/// Returns an error if:
/// * The table metadata cannot be accessed
/// * The schema projection fails
/// * The object store operations fail
/// * The Parquet writing fails
/// * The partition path generation fails
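///
/// # Example
///
/// A minimal sketch of writing equality deletes; the field IDs below are illustrative
/// and must refer to schema fields that are present in the delete batches:
///
/// ```no_run
/// # use arrow::{error::ArrowError, record_batch::RecordBatch};
/// # use futures::Stream;
/// # use iceberg_rust::{arrow::write::write_equality_deletes_parquet_partitioned, table::Table};
/// # async fn example(
/// #     table: &Table,
/// #     deletes: impl Stream<Item = Result<RecordBatch, ArrowError>> + Send + 'static,
/// # ) -> Result<(), ArrowError> {
/// // Delete rows that match the incoming batches on field IDs 1 and 2.
/// let delete_files =
///     write_equality_deletes_parquet_partitioned(table, deletes, None, &[1, 2]).await?;
/// # Ok(())
/// # }
/// ```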
pub async fn write_equality_deletes_parquet_partitioned(
    table: &Table,
    batches: impl Stream<Item = Result<RecordBatch, ArrowError>> + Send + 'static,
    branch: Option<&str>,
    equality_ids: &[i32],
) -> Result<Vec<DataFile>, ArrowError> {
    store_parquet_partitioned(table, batches, branch, Some(equality_ids)).await
}

#[instrument(skip(table, batches), fields(table_name = %table.identifier().name(), equality_ids = ?equality_ids))]
/// Stores Arrow record batches as partitioned Parquet files.
///
/// This is an internal function that handles the core storage logic for both regular data files
/// and equality delete files.
///
/// # Arguments
/// * `table` - The Iceberg table to store data for
/// * `batches` - Stream of Arrow record batches to write
/// * `branch` - Optional branch name to write to
/// * `equality_ids` - Optional list of field IDs for equality deletes
///
/// # Returns
/// * `Result<Vec<DataFile>, ArrowError>` - List of metadata for the written data files
///
/// # Errors
/// Returns an error if:
/// * The table metadata cannot be accessed
/// * The schema projection fails
/// * The object store operations fail
/// * The Parquet writing fails
/// * The partition path generation fails
async fn store_parquet_partitioned(
    table: &Table,
    batches: impl Stream<Item = Result<RecordBatch, ArrowError>> + Send + 'static,
    branch: Option<&str>,
    equality_ids: Option<&[i32]>,
) -> Result<Vec<DataFile>, ArrowError> {
    let metadata = table.metadata();
    let object_store = table.object_store();
    let schema = Arc::new(
        metadata
            .current_schema(branch)
            .map_err(Error::from)?
            .clone(),
    );
    // Project the schema onto the equality_ids for equality deletes
    let schema = if let Some(equality_ids) = equality_ids {
        Arc::new(schema.project(equality_ids))
    } else {
        schema
    };

    let partition_spec = Arc::new(
        metadata
            .default_partition_spec()
            .map_err(Error::from)?
            .clone(),
    );

    let partition_fields = &metadata
        .current_partition_fields(branch)
        .map_err(Error::from)?;

    let data_location = &metadata
        .properties
        .get(WRITE_DATA_PATH)
        .map(ToOwned::to_owned)
        .unwrap_or(metadata.location.clone() + "/data/");

    let arrow_schema: Arc<ArrowSchema> =
        Arc::new((schema.fields()).try_into().map_err(Error::from)?);

    if partition_fields.is_empty() {
        let partition_path = if metadata
            .properties
            .get(WRITE_OBJECT_STORAGE_ENABLED)
            .is_some_and(|x| x == "true")
        {
            Some("".to_owned())
        } else {
            None
        };
        let files = write_parquet_files(
            data_location,
            &schema,
            &arrow_schema,
            partition_fields,
            partition_path,
            batches,
            object_store.clone(),
            equality_ids,
        )
        .await?;
        Ok(files)
    } else {
        let mut senders: LruCache<Vec<Value>, Sender<Result<RecordBatch, ArrowError>>> =
            LruCache::unbounded();

        let mut set = JoinSet::new();

        let mut batches = Box::pin(batches);
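
        // Fan out batches to one writer task per partition value. Each distinct
        // partition gets its own channel and a spawned task that streams its
        // batches into Parquet files. The LRU cache bounds the number of open
        // writers to roughly the available parallelism by closing the least
        // recently used sender; its task then finishes and is joined below.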
        while let Some(batch) = batches.next().await {
            // Limit the number of concurrent senders
            if senders.len() > available_parallelism().unwrap().get() {
                if let Some((_, mut sender)) = senders.pop_lru() {
                    sender.close_channel();
                }
            }

            for result in partition_record_batch(&batch?, partition_fields)? {
                let (partition_values, batch) = result?;

                if let Some(sender) = senders.get_mut(&partition_values) {
                    sender.send(Ok(batch)).await.unwrap();
                } else {
                    let (mut sender, receiver) = channel(1);
                    sender.send(Ok(batch)).await.unwrap();
                    senders.push(partition_values.clone(), sender);
                    set.spawn({
                        let arrow_schema = arrow_schema.clone();
                        let object_store = object_store.clone();
                        let data_location = data_location.clone();
                        let schema = schema.clone();
                        let partition_spec = partition_spec.clone();
                        let equality_ids = equality_ids.map(Vec::from);
                        let partition_path = if metadata
                            .properties
                            .get(WRITE_OBJECT_STORAGE_ENABLED)
                            .is_some_and(|x| x == "true")
                        {
                            None
                        } else {
                            Some(generate_partition_path(
                                partition_fields,
                                &partition_values,
                            )?)
                        };
                        async move {
                            let partition_fields =
                                table_metadata::partition_fields(&partition_spec, &schema)
                                    .map_err(Error::from)?;
                            let files = write_parquet_files(
                                &data_location,
                                &schema,
                                &arrow_schema,
                                &partition_fields,
                                partition_path,
                                receiver,
                                object_store.clone(),
                                equality_ids.as_deref(),
                            )
                            .await?;
                            Ok::<_, Error>(files)
                        }
                    });
                };
            }
        }

        while let Some((_, mut sender)) = senders.pop_lru() {
            sender.close_channel();
        }

        let mut files = Vec::new();

        while let Some(handle) = set.join_next().await {
            files.extend(handle.map_err(Error::from)??);
        }

        Ok(files)
    }
}

type ArrowSender = Sender<(String, FileMetaData)>;
type ArrowReceiver = Receiver<(String, FileMetaData)>;

#[instrument(skip(batches, object_store), fields(data_location, equality_ids = ?equality_ids))]
/// Writes a stream of Arrow record batches to multiple Parquet files.
///
/// This internal function handles the low-level details of writing record batches to Parquet files,
/// managing file sizes, and collecting metadata.
///
/// # Arguments
/// * `data_location` - Base path where data files should be written
/// * `schema` - Iceberg schema for the data
/// * `arrow_schema` - Arrow schema for the record batches
/// * `partition_fields` - List of partition fields if data is partitioned
/// * `partition_path` - Optional partition path component
/// * `batches` - Stream of record batches to write
/// * `object_store` - Object store to write files to
/// * `equality_ids` - Optional list of field IDs for equality deletes
///
/// # Returns
/// * `Result<Vec<DataFile>, ArrowError>` - List of metadata for the written files
///
/// # Errors
/// Returns an error if:
/// * File creation fails
/// * Writing record batches fails
/// * Object store operations fail
/// * Metadata collection fails
#[allow(clippy::too_many_arguments)]
async fn write_parquet_files(
    data_location: &str,
    schema: &Schema,
    arrow_schema: &ArrowSchema,
    partition_fields: &[BoundPartitionField<'_>],
    partition_path: Option<String>,
    batches: impl Stream<Item = Result<RecordBatch, ArrowError>> + Send,
    object_store: Arc<dyn ObjectStore>,
    equality_ids: Option<&[i32]>,
) -> Result<Vec<DataFile>, ArrowError> {
    let bucket = Bucket::from_path(data_location)?;
    let (mut writer_sender, writer_receiver): (ArrowSender, ArrowReceiver) = channel(0);

    // Create initial writer
    let initial_writer = create_arrow_writer(
        data_location,
        partition_path.clone(),
        arrow_schema,
        object_store.clone(),
    )
    .await?;

    // Structure to hold writer state
    struct WriterState {
        writer: (String, AsyncArrowWriter<BufWriter>),
        bytes_written: usize,
    }
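
    // Fold over the incoming batches, rolling over to a new Parquet file once the
    // estimated in-memory size exceeds COMPRESSION_FACTOR * MAX_PARQUET_SIZE.
    // Finished files are sent through the channel for metadata collection below.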
    let final_state = batches
        .try_fold(
            WriterState {
                writer: initial_writer,
                bytes_written: 0,
            },
            |mut state, batch| {
                let object_store = object_store.clone();
                let data_location = data_location.to_owned();
                let partition_path = partition_path.clone();
                let arrow_schema = arrow_schema.clone();
                let mut writer_sender = writer_sender.clone();

                async move {
                    let batch_size = record_batch_size(&batch);
                    let new_size = state.bytes_written + batch_size;

                    if new_size > COMPRESSION_FACTOR * MAX_PARQUET_SIZE {
                        // Send current writer to channel
                        let finished_writer = state.writer;
                        let file = finished_writer.1.close().await?;
                        writer_sender
                            .try_send((finished_writer.0, file))
                            .map_err(|err| ArrowError::ComputeError(err.to_string()))?;

                        // Create new writer
                        let new_writer = create_arrow_writer(
                            &data_location,
                            partition_path,
                            &arrow_schema,
                            object_store,
                        )
                        .await?;

                        state.writer = new_writer;
                        state.bytes_written = batch_size;
                    } else {
                        state.bytes_written = new_size;
                    }

                    state.writer.1.write(&batch).await?;
                    Ok(state)
                }
            },
        )
        .await?;

    // Handle the last writer
    let file = final_state.writer.1.close().await?;
    writer_sender
        .try_send((final_state.writer.0, file))
        .map_err(|err| ArrowError::ComputeError(err.to_string()))?;
    writer_sender.close_channel();

    if final_state.bytes_written == 0 {
        return Ok(Vec::new());
    }

    writer_receiver
        .then(|writer| {
            let object_store = object_store.clone();
            let bucket = bucket.to_string();
            async move {
                let metadata = writer.1;
                let size = object_store
                    .head(&writer.0.as_str().into())
                    .await
                    .map_err(|err| ArrowError::from_external_error(err.into()))?
                    .size;
                Ok(parquet_to_datafile(
                    &(bucket + &writer.0),
                    size,
                    &metadata,
                    schema,
                    partition_fields,
                    equality_ids,
                )?)
            }
        })
        .try_collect::<Vec<_>>()
        .await
}

/// Generates a partition path string from partition fields and their values.
///
/// Creates a path string in the format "field1=value1/field2=value2/..." for each
/// partition field and its corresponding value.
///
/// # Arguments
/// * `partition_fields` - List of bound partition fields defining the partitioning
/// * `partition_values` - List of values for each partition field
///
/// # Returns
/// * `Result<String, ArrowError>` - The generated partition path string
///
/// # Errors
/// Returns an error if:
/// * The partition field name cannot be processed
/// * The partition value cannot be converted to a string
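///
/// # Example
///
/// A sketch mirroring the unit test at the bottom of this module (marked `ignore`
/// because it relies on crate-internal construction paths):
///
/// ```ignore
/// let field = StructField {
///     id: 0,
///     name: "date".to_owned(),
///     required: false,
///     field_type: Type::Primitive(PrimitiveType::Date),
///     doc: None,
/// };
/// let partfield = PartitionField::new(1, 1001, "month", Transform::Month);
/// let partition_fields = vec![BoundPartitionField::new(&partfield, &field)];
/// let partition_values = vec![Value::Int(10)];
///
/// assert_eq!(
///     generate_partition_path(&partition_fields, &partition_values).unwrap(),
///     "month=10/"
/// );
/// ```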
#[inline]
pub fn generate_partition_path(
    partition_fields: &[BoundPartitionField<'_>],
    partition_values: &[Value],
) -> Result<String, ArrowError> {
    partition_fields
        .iter()
        .zip(partition_values.iter())
        .map(|(field, value)| {
            let name = field.name().to_owned();
            Ok(name + "=" + &value.to_string() + "/")
        })
        .collect::<Result<String, ArrowError>>()
}

#[instrument(skip(schema, object_store), fields(data_location))]
/// Creates a new Arrow writer for writing record batches to a Parquet file.
///
/// This internal function creates a new buffered writer and configures it with
/// appropriate Parquet compression settings.
///
/// # Arguments
/// * `data_location` - Base path where data files should be written
/// * `partition_path` - Optional partition path component
/// * `schema` - Arrow schema for the record batches
/// * `object_store` - Object store to write files to
///
/// # Returns
/// * `Result<(String, AsyncArrowWriter<BufWriter>), ArrowError>` - The file path and configured writer
///
/// # Errors
/// Returns an error if:
/// * The Zstd compression level cannot be configured
/// * The Arrow writer cannot be created
async fn create_arrow_writer(
    data_location: &str,
    partition_path: Option<String>,
    schema: &arrow::datatypes::Schema,
    object_store: Arc<dyn ObjectStore>,
) -> Result<(String, AsyncArrowWriter<BufWriter>), ArrowError> {
    let parquet_path = generate_file_path(data_location, partition_path);

    let writer = BufWriter::new(object_store.clone(), parquet_path.clone().into());

    Ok((
        parquet_path,
        AsyncArrowWriter::try_new(
            writer,
            Arc::new(schema.clone()),
            Some(
                WriterProperties::builder()
                    .set_compression(Compression::ZSTD(ZstdLevel::try_new(1)?))
                    .build(),
            ),
        )?,
    ))
}

/// Generates a unique file path for a Parquet data file.
///
/// This function creates a unique file path by combining the data location, partition path,
/// and a UUID-based filename. If no partition path is provided, it generates a random
/// directory path using hex-encoded random bytes.
///
/// # Arguments
/// * `data_location` - Base directory where data files should be stored
/// * `partition_path` - Optional partition path component (e.g., "year=2024/month=01/")
///
/// # Returns
/// * `String` - Complete file path ending with ".parquet"
///
/// # File Path Structure
/// The generated path follows this pattern:
/// * With partition: `{data_location}/{partition_path}{uuid}.parquet`
/// * Without partition: `{data_location}/{random_hex}/{uuid}.parquet`
///
/// # Examples
/// ```
/// use iceberg_rust::arrow::write::generate_file_path;
///
/// // With partition path
/// let path1 = generate_file_path("/data", Some("year=2024/month=01/".to_string()));
/// // Result: "/data/year=2024/month=01/01234567-89ab-cdef-0123-456789abcdef.parquet"
///
/// // Without partition path (generates random directory)
/// let path2 = generate_file_path("/data", None);
/// // Result: "/data/a1b/01234567-89ab-cdef-0123-456789abcdef.parquet"
/// ```
///
/// # Implementation Details
/// * Uses cryptographically secure random bytes for UUID generation
/// * Creates a UUID v1 timestamp-based identifier for uniqueness
/// * Random directory names use 3 bytes of entropy (up to 6 hex characters, not zero-padded)
/// * Automatically strips path prefixes using `strip_prefix()`
pub fn generate_file_path(data_location: &str, partition_path: Option<String>) -> String {
    let mut rand = [0u8; 6];
    getrandom::fill(&mut rand)
        .map_err(|err| ArrowError::ExternalError(Box::new(err)))
        .unwrap();

    let path = partition_path.unwrap_or_else(|| {
        rand[0..3]
            .iter()
            .fold(String::with_capacity(8), |mut acc, x| {
                write!(&mut acc, "{x:x}").unwrap();
                acc
            })
            + "/"
    });

    strip_prefix(data_location) + &path + &Uuid::now_v1(&rand).to_string() + ".parquet"
}

/// Calculates the approximate size in bytes of an Arrow record batch.
///
/// This function estimates the memory footprint of a record batch by multiplying
/// the total size of all fields by the number of rows.
///
/// # Arguments
/// * `batch` - The record batch to calculate size for
///
/// # Returns
/// * `usize` - Estimated size of the record batch in bytes
#[inline]
fn record_batch_size(batch: &RecordBatch) -> usize {
    batch
        .schema()
        .fields()
        .iter()
        .fold(0, |acc, x| acc + x.size())
        * batch.num_rows()
}

#[cfg(test)]
mod tests {
    use iceberg_rust_spec::{
        partition::BoundPartitionField,
        types::{StructField, Type},
    };

    use crate::spec::{
        partition::{PartitionField, Transform},
        values::Value,
    };

    #[test]
    fn test_generate_partition_location_success() {
        let field = StructField {
            id: 0,
            name: "date".to_owned(),
            required: false,
            field_type: Type::Primitive(iceberg_rust_spec::types::PrimitiveType::Date),
            doc: None,
        };
        let partfield = PartitionField::new(1, 1001, "month", Transform::Month);
        let partition_fields = vec![BoundPartitionField::new(&partfield, &field)];
        let partition_values = vec![Value::Int(10)];

        let result = super::generate_partition_path(&partition_fields, &partition_values);

        assert!(result.is_ok());
        assert_eq!(result.unwrap(), "month=10/");
    }
}
617}