iceberg_rust/arrow/write.rs

//! Arrow writing module for converting Arrow record batches to Iceberg data files.
//!
//! This module provides functionality to:
//! - Write Arrow record batches to Parquet files
//! - Handle partitioned data writing
//! - Support equality delete files
//! - Manage file sizes and buffering
//!
//! The main entry points are:
//! - [`write_parquet_partitioned`]: Write regular data files
//! - [`write_equality_deletes_parquet_partitioned`]: Write equality delete files
//!
//! The module handles:
//! - Automatic file size management and splitting
//! - Parquet compression and encoding
//! - Partition path generation
//! - Object store integration
//! - Metadata collection for written files
//!
//! # Example
//!
//! ```no_run
//! # use arrow::record_batch::RecordBatch;
//! # use futures::Stream;
//! # use iceberg_rust::table::Table;
//! # use iceberg_rust::arrow::write::write_parquet_partitioned;
//! # async fn example(table: &Table, batches: impl Stream<Item = Result<RecordBatch, arrow::error::ArrowError>> + Send + 'static) {
//! let data_files = write_parquet_partitioned(
//!     table,
//!     batches,
//!     None // no specific branch
//! ).await.unwrap();
//! # }
//! ```

use futures::{
    channel::mpsc::{channel, Receiver, Sender},
    StreamExt, TryStreamExt,
};
use object_store::{buffered::BufWriter, ObjectStore};
use std::fmt::Write;
use std::sync::Arc;
use tokio::task::JoinSet;

use arrow::{datatypes::Schema as ArrowSchema, error::ArrowError, record_batch::RecordBatch};
use futures::Stream;
use iceberg_rust_spec::{
    partition::BoundPartitionField,
    spec::{manifest::DataFile, schema::Schema, values::Value},
    table_metadata::{self, WRITE_DATA_PATH, WRITE_OBJECT_STORAGE_ENABLED},
    util::strip_prefix,
};
use parquet::{
    arrow::AsyncArrowWriter,
    basic::{Compression, ZstdLevel},
    file::properties::WriterProperties,
    format::FileMetaData,
};
use uuid::Uuid;

use crate::{
    error::Error, file_format::parquet::parquet_to_datafile, object_store::Bucket, table::Table,
};

use super::partition::PartitionStream;

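/// Approximate maximum size in bytes of a single Parquet file; once the
/// estimated bytes written exceed this threshold, a new file is started.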
const MAX_PARQUET_SIZE: usize = 512_000_000;

#[inline]
/// Writes Arrow record batches as partitioned Parquet files.
///
/// This function writes Arrow record batches to Parquet files, partitioning them according
/// to the table's partition spec.
///
/// # Arguments
/// * `table` - The Iceberg table to write data for
/// * `batches` - Stream of Arrow record batches to write
/// * `branch` - Optional branch name to write to
///
/// # Returns
/// * `Result<Vec<DataFile>, ArrowError>` - List of metadata for the written data files
///
/// # Errors
/// Returns an error if:
/// * The table metadata cannot be accessed
/// * The schema projection fails
/// * The object store operations fail
/// * The Parquet writing fails
/// * The partition path generation fails
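///
/// See the module-level documentation for a usage example.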
pub async fn write_parquet_partitioned(
    table: &Table,
    batches: impl Stream<Item = Result<RecordBatch, ArrowError>> + Send + 'static,
    branch: Option<&str>,
) -> Result<Vec<DataFile>, ArrowError> {
    store_parquet_partitioned(table, batches, branch, None).await
}

#[inline]
/// Writes equality delete records as partitioned Parquet files.
///
/// This function writes Arrow record batches containing equality delete records to Parquet files,
/// partitioning them according to the table's partition spec.
///
/// # Arguments
/// * `table` - The Iceberg table to write delete records for
/// * `batches` - Stream of Arrow record batches containing the delete records
/// * `branch` - Optional branch name to write to
/// * `equality_ids` - Field IDs that define equality deletion
///
/// # Returns
/// * `Result<Vec<DataFile>, ArrowError>` - List of metadata for the written delete files
///
/// # Errors
/// Returns an error if:
/// * The table metadata cannot be accessed
/// * The schema projection fails
/// * The object store operations fail
/// * The Parquet writing fails
/// * The partition path generation fails
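///
/// # Example
///
/// A minimal usage sketch; the equality field id `1` is only illustrative:
///
/// ```no_run
/// # use arrow::record_batch::RecordBatch;
/// # use futures::Stream;
/// # use iceberg_rust::table::Table;
/// # use iceberg_rust::arrow::write::write_equality_deletes_parquet_partitioned;
/// # async fn example(table: &Table, deletes: impl Stream<Item = Result<RecordBatch, arrow::error::ArrowError>> + Send + 'static) {
/// // Field id 1 is assumed to identify the rows to delete.
/// let delete_files = write_equality_deletes_parquet_partitioned(
///     table,
///     deletes,
///     None,  // default branch
///     &[1],  // equality field ids
/// ).await.unwrap();
/// # }
/// ```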
pub async fn write_equality_deletes_parquet_partitioned(
    table: &Table,
    batches: impl Stream<Item = Result<RecordBatch, ArrowError>> + Send + 'static,
    branch: Option<&str>,
    equality_ids: &[i32],
) -> Result<Vec<DataFile>, ArrowError> {
    store_parquet_partitioned(table, batches, branch, Some(equality_ids)).await
}

/// Stores Arrow record batches as partitioned Parquet files.
///
/// This is an internal function that handles the core storage logic for both regular data files
/// and equality delete files.
///
/// # Arguments
/// * `table` - The Iceberg table to store data for
/// * `batches` - Stream of Arrow record batches to write
/// * `branch` - Optional branch name to write to
/// * `equality_ids` - Optional list of field IDs for equality deletes
///
/// # Returns
/// * `Result<Vec<DataFile>, ArrowError>` - List of metadata for the written data files
///
/// # Errors
/// Returns an error if:
/// * The table metadata cannot be accessed
/// * The schema projection fails
/// * The object store operations fail
/// * The Parquet writing fails
/// * The partition path generation fails
async fn store_parquet_partitioned(
    table: &Table,
    batches: impl Stream<Item = Result<RecordBatch, ArrowError>> + Send + 'static,
    branch: Option<&str>,
    equality_ids: Option<&[i32]>,
) -> Result<Vec<DataFile>, ArrowError> {
    let metadata = table.metadata();
    let object_store = table.object_store();
    let schema = Arc::new(
        metadata
            .current_schema(branch)
            .map_err(Error::from)?
            .clone(),
    );
    // Project the schema onto the equality_ids for equality deletes
    let schema = if let Some(equality_ids) = equality_ids {
        Arc::new(schema.project(equality_ids))
    } else {
        schema
    };

    let partition_spec = Arc::new(
        metadata
            .default_partition_spec()
            .map_err(Error::from)?
            .clone(),
    );

    let partition_fields = &metadata
        .current_partition_fields(branch)
        .map_err(Error::from)?;

    let data_location = &metadata
        .properties
        .get(WRITE_DATA_PATH)
        .map(ToOwned::to_owned)
        .unwrap_or(metadata.location.clone() + "/data/");

    let arrow_schema: Arc<ArrowSchema> =
        Arc::new((schema.fields()).try_into().map_err(Error::from)?);

    if partition_fields.is_empty() {
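        // No partition fields: when the object-storage layout is enabled an
        // empty partition path is used, otherwise `create_arrow_writer` falls
        // back to a random prefix.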
        let partition_path = if metadata
            .properties
            .get(WRITE_OBJECT_STORAGE_ENABLED)
            .is_some_and(|x| x == "true")
        {
            Some("".to_owned())
        } else {
            None
        };
        let files = write_parquet_files(
            data_location,
            &schema,
            &arrow_schema,
            partition_fields,
            partition_path,
            batches,
            object_store.clone(),
            equality_ids,
        )
        .await?;
        Ok(files)
    } else {
        let mut streams = PartitionStream::new(Box::pin(batches), partition_fields);

        let mut set = JoinSet::new();

        while let Some(result) = streams.next().await {
            let (partition_values, batches) = result?;
            set.spawn({
                let arrow_schema = arrow_schema.clone();
                let object_store = object_store.clone();
                let data_location = data_location.clone();
                let schema = schema.clone();
                let partition_spec = partition_spec.clone();
                let equality_ids = equality_ids.map(Vec::from);
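                // With the object-storage layout enabled, no Hive-style partition
                // path is encoded and `create_arrow_writer` picks a random prefix;
                // otherwise the partition values become part of the file path.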
                let partition_path = if metadata
                    .properties
                    .get(WRITE_OBJECT_STORAGE_ENABLED)
                    .is_some_and(|x| x == "true")
                {
                    None
                } else {
                    Some(generate_partition_path(
                        partition_fields,
                        &partition_values,
                    )?)
                };
                async move {
                    let partition_fields =
                        table_metadata::partition_fields(&partition_spec, &schema)
                            .map_err(Error::from)?;
                    let files = write_parquet_files(
                        &data_location,
                        &schema,
                        &arrow_schema,
                        &partition_fields,
                        partition_path,
                        batches,
                        object_store.clone(),
                        equality_ids.as_deref(),
                    )
                    .await?;
                    Ok::<_, Error>(files)
                }
            });
        }

        let mut files = Vec::new();

        while let Some(handle) = set.join_next().await {
            files.extend(handle.map_err(Error::from)??);
        }

        Ok(files)
    }
}

type ArrowSender = Sender<(String, FileMetaData)>;
type ArrowReceiver = Receiver<(String, FileMetaData)>;

/// Writes a stream of Arrow record batches to multiple Parquet files.
///
/// This internal function handles the low-level details of writing record batches to Parquet files,
/// managing file sizes, and collecting metadata.
///
/// # Arguments
/// * `data_location` - Base path where data files should be written
/// * `schema` - Iceberg schema for the data
/// * `arrow_schema` - Arrow schema for the record batches
/// * `partition_fields` - List of partition fields if data is partitioned
/// * `partition_path` - Optional partition path component
/// * `batches` - Stream of record batches to write
/// * `object_store` - Object store to write files to
/// * `equality_ids` - Optional list of field IDs for equality deletes
///
/// # Returns
/// * `Result<Vec<DataFile>, ArrowError>` - List of metadata for the written files
///
/// # Errors
/// Returns an error if:
/// * File creation fails
/// * Writing record batches fails
/// * Object store operations fail
/// * Metadata collection fails
#[allow(clippy::too_many_arguments)]
async fn write_parquet_files(
    data_location: &str,
    schema: &Schema,
    arrow_schema: &ArrowSchema,
    partition_fields: &[BoundPartitionField<'_>],
    partition_path: Option<String>,
    batches: impl Stream<Item = Result<RecordBatch, ArrowError>> + Send,
    object_store: Arc<dyn ObjectStore>,
    equality_ids: Option<&[i32]>,
) -> Result<Vec<DataFile>, ArrowError> {
    let bucket = Bucket::from_path(data_location)?;
    let (mut writer_sender, writer_receiver): (ArrowSender, ArrowReceiver) = channel(1);

    // Create initial writer
    let initial_writer = create_arrow_writer(
        data_location,
        partition_path.clone(),
        arrow_schema,
        object_store.clone(),
    )
    .await?;

    // Structure to hold writer state
    struct WriterState {
        writer: (String, AsyncArrowWriter<BufWriter>),
        bytes_written: usize,
    }

    let final_state = batches
        .try_fold(
            WriterState {
                writer: initial_writer,
                bytes_written: 0,
            },
            |mut state, batch| {
                let object_store = object_store.clone();
                let data_location = data_location.to_owned();
                let partition_path = partition_path.clone();
                let arrow_schema = arrow_schema.clone();
                let mut writer_sender = writer_sender.clone();

                async move {
                    let batch_size = record_batch_size(&batch);
                    let new_size = state.bytes_written + batch_size;

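                    // Roll over once the estimated file size would exceed
                    // MAX_PARQUET_SIZE: close the current writer, hand its path
                    // and metadata to the collector channel, and start a new file.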
                    if new_size > MAX_PARQUET_SIZE {
                        // Send current writer to channel
                        let finished_writer = state.writer;
                        let file = finished_writer.1.close().await?;
                        writer_sender
                            .try_send((finished_writer.0, file))
                            .map_err(|err| ArrowError::ComputeError(err.to_string()))?;

                        // Create new writer
                        let new_writer = create_arrow_writer(
                            &data_location,
                            partition_path,
                            &arrow_schema,
                            object_store,
                        )
                        .await?;

                        state.writer = new_writer;
                        state.bytes_written = batch_size;
                    } else {
                        state.bytes_written = new_size;
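                        // Flush the buffered writer periodically to bound the
                        // amount of data held in memory.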
                        if new_size % 64_000_000 >= 32_000_000 {
                            state.writer.1.flush().await?;
                        }
                    }

                    state.writer.1.write(&batch).await?;
                    Ok(state)
                }
            },
        )
        .await?;

    // Handle the last writer
    let file = final_state.writer.1.close().await?;
    writer_sender
        .try_send((final_state.writer.0, file))
        .map_err(|err| ArrowError::ComputeError(err.to_string()))?;
    writer_sender.close_channel();

    if final_state.bytes_written == 0 {
        return Ok(Vec::new());
    }

    writer_receiver
        .then(|writer| {
            let object_store = object_store.clone();
            let bucket = bucket.to_string();
            async move {
                let metadata = writer.1;
                let size = object_store
                    .head(&writer.0.as_str().into())
                    .await
                    .map_err(|err| ArrowError::from_external_error(err.into()))?
                    .size;
                Ok(parquet_to_datafile(
                    &(bucket + &writer.0),
                    size,
                    &metadata,
                    schema,
                    partition_fields,
                    equality_ids,
                )?)
            }
        })
        .try_collect::<Vec<_>>()
        .await
}

/// Generates a partition path string from partition fields and their values.
///
/// Creates a path string in the format "field1=value1/field2=value2/..." for each
/// partition field and its corresponding value.
///
/// # Arguments
/// * `partition_fields` - List of bound partition fields defining the partitioning
/// * `partition_values` - List of values for each partition field
///
/// # Returns
/// * `Result<String, ArrowError>` - The generated partition path string
///
/// # Errors
/// Returns an error if:
/// * The partition field name cannot be processed
/// * The partition value cannot be converted to a string
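///
/// # Example
///
/// A partition field named `month` with value `10` produces the segment `"month=10/"`
/// (see the test at the bottom of this module).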
#[inline]
fn generate_partition_path(
    partition_fields: &[BoundPartitionField<'_>],
    partition_values: &[Value],
) -> Result<String, ArrowError> {
    partition_fields
        .iter()
        .zip(partition_values.iter())
        .map(|(field, value)| {
            let name = field.name().to_owned();
            Ok(name + "=" + &value.to_string() + "/")
        })
        .collect::<Result<String, ArrowError>>()
}

/// Creates a new Arrow writer for writing record batches to a Parquet file.
///
/// This internal function creates a new buffered writer and configures it with
/// appropriate Parquet compression settings.
///
/// # Arguments
/// * `data_location` - Base path where data files should be written
/// * `partition_path` - Optional partition path component
/// * `schema` - Arrow schema for the record batches
/// * `object_store` - Object store to write files to
///
/// # Returns
/// * `Result<(String, AsyncArrowWriter<BufWriter>), ArrowError>` - The file path and configured writer
///
/// # Errors
/// Returns an error if:
/// * Random number generation fails
/// * The writer properties cannot be configured
/// * The Arrow writer cannot be created
async fn create_arrow_writer(
    data_location: &str,
    partition_path: Option<String>,
    schema: &arrow::datatypes::Schema,
    object_store: Arc<dyn ObjectStore>,
) -> Result<(String, AsyncArrowWriter<BufWriter>), ArrowError> {
    let mut rand = [0u8; 6];
    getrandom::fill(&mut rand)
        .map_err(|err| ArrowError::ExternalError(Box::new(err)))?;

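    // If no partition path was supplied, fall back to a short random hex
    // prefix derived from the random bytes above.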
    let path = partition_path.unwrap_or_else(|| {
        rand[0..3]
            .iter()
            .fold(String::with_capacity(8), |mut acc, x| {
                write!(&mut acc, "{x:x}").unwrap();
                acc
            })
            + "/"
    });

    let parquet_path =
        strip_prefix(data_location) + &path + &Uuid::now_v1(&rand).to_string() + ".parquet";

    let writer = BufWriter::new(object_store.clone(), parquet_path.clone().into());

    Ok((
        parquet_path,
        AsyncArrowWriter::try_new(
            writer,
            Arc::new(schema.clone()),
            Some(
                WriterProperties::builder()
                    .set_compression(Compression::ZSTD(ZstdLevel::try_new(1)?))
                    .build(),
            ),
        )?,
    ))
}

/// Calculates the approximate size in bytes of an Arrow record batch.
///
/// This function estimates the memory footprint of a record batch by multiplying
/// the total size of all fields by the number of rows.
///
/// # Arguments
/// * `batch` - The record batch to calculate size for
///
/// # Returns
/// * `usize` - Estimated size of the record batch in bytes
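///
/// The result is a rough heuristic; it is only used to decide when to roll
/// over to a new Parquet file in [`write_parquet_files`].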
#[inline]
fn record_batch_size(batch: &RecordBatch) -> usize {
    batch
        .schema()
        .fields()
        .iter()
        .fold(0, |acc, x| acc + x.size())
        * batch.num_rows()
}

#[cfg(test)]
mod tests {
    use iceberg_rust_spec::{
        partition::BoundPartitionField,
        types::{StructField, Type},
    };

    use crate::spec::{
        partition::{PartitionField, Transform},
        values::Value,
    };

    #[test]
    fn test_generate_partition_location_success() {
        let field = StructField {
            id: 0,
            name: "date".to_owned(),
            required: false,
            field_type: Type::Primitive(iceberg_rust_spec::types::PrimitiveType::Date),
            doc: None,
        };
        let partfield = PartitionField::new(1, 1001, "month", Transform::Month);
        let partition_fields = vec![BoundPartitionField::new(&partfield, &field)];
        let partition_values = vec![Value::Int(10)];

        let result = super::generate_partition_path(&partition_fields, &partition_values);

        assert!(result.is_ok());
        assert_eq!(result.unwrap(), "month=10/");
    }
}