iceberg_rust/arrow/write.rs
//! Arrow writing module for converting Arrow record batches to Iceberg data files.
//!
//! This module provides functionality to:
//! - Write Arrow record batches to Parquet files
//! - Handle partitioned data writing
//! - Support equality delete files
//! - Manage file sizes and buffering
//!
//! The main entry points are:
//! - [`write_parquet_partitioned`]: Write regular data files
//! - [`write_equality_deletes_parquet_partitioned`]: Write equality delete files
//!
//! The module handles:
//! - Automatic file size management and splitting
//! - Parquet compression and encoding
//! - Partition path generation
//! - Object store integration
//! - Metadata collection for written files
//!
//! # Example
//!
//! ```no_run
//! # use arrow::record_batch::RecordBatch;
//! # use futures::Stream;
//! # use iceberg_rust::arrow::write::write_parquet_partitioned;
//! # use iceberg_rust::table::Table;
//! # async fn example(table: &Table, batches: impl Stream<Item = Result<RecordBatch, arrow::error::ArrowError>> + Send + 'static) {
//! let data_files = write_parquet_partitioned(
//!     table,
//!     batches,
//!     None, // no specific branch
//! ).await.unwrap();
//! # }
//! ```

use futures::{
    channel::mpsc::{channel, Receiver, Sender},
    SinkExt, StreamExt, TryStreamExt,
};
use lru::LruCache;
use object_store::{buffered::BufWriter, ObjectStore};
use std::sync::Arc;
use std::{fmt::Write, thread::available_parallelism};
use tokio::task::JoinSet;
use tracing::instrument;

use arrow::{datatypes::Schema as ArrowSchema, error::ArrowError, record_batch::RecordBatch};
use futures::Stream;
use iceberg_rust_spec::{
    partition::BoundPartitionField,
    spec::{manifest::DataFile, schema::Schema, values::Value},
    table_metadata::{self, WRITE_DATA_PATH, WRITE_OBJECT_STORAGE_ENABLED},
    util::strip_prefix,
};
use parquet::{
    arrow::AsyncArrowWriter,
    basic::{Compression, ZstdLevel},
    file::properties::WriterProperties,
    format::FileMetaData,
};
use uuid::Uuid;

use crate::{
    error::Error, file_format::parquet::parquet_to_datafile, object_store::Bucket, table::Table,
};

use super::partition::partition_record_batch;

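// Rolling threshold inputs: `MAX_PARQUET_SIZE` is the target size of a written file, and
// `COMPRESSION_FACTOR` estimates how much larger the in-memory Arrow data is than the
// ZSTD-compressed Parquet output. Files are rolled once the accumulated in-memory batch
// size exceeds `COMPRESSION_FACTOR * MAX_PARQUET_SIZE` (see `write_parquet_files`).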
const MAX_PARQUET_SIZE: usize = 512_000_000;
const COMPRESSION_FACTOR: usize = 200;

#[instrument(skip(table, batches), fields(table_name = %table.identifier().name()))]
/// Writes Arrow record batches as partitioned Parquet files.
///
/// This function writes Arrow record batches to Parquet files, partitioning them according
/// to the table's partition spec.
///
/// # Arguments
/// * `table` - The Iceberg table to write data for
/// * `batches` - Stream of Arrow record batches to write
/// * `branch` - Optional branch name to write to
///
/// # Returns
/// * `Result<Vec<DataFile>, ArrowError>` - List of metadata for the written data files
///
/// # Errors
/// Returns an error if:
/// * The table metadata cannot be accessed
/// * The schema projection fails
/// * The object store operations fail
/// * The Parquet writing fails
/// * The partition path generation fails
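///
/// # Example
///
/// A minimal sketch, assuming the batches are already in memory (the table handle and
/// batch contents are illustrative):
///
/// ```no_run
/// # use arrow::{error::ArrowError, record_batch::RecordBatch};
/// # use iceberg_rust::arrow::write::write_parquet_partitioned;
/// # use iceberg_rust::table::Table;
/// # async fn example(table: &Table, batches: Vec<RecordBatch>) {
/// // Wrap the in-memory batches in a stream of `Result`s, as expected by the writer.
/// let stream = futures::stream::iter(batches.into_iter().map(Ok::<_, ArrowError>));
///
/// let data_files = write_parquet_partitioned(table, stream, None).await.unwrap();
/// println!("wrote {} data files", data_files.len());
/// # }
/// ```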
pub async fn write_parquet_partitioned(
    table: &Table,
    batches: impl Stream<Item = Result<RecordBatch, ArrowError>> + Send + 'static,
    branch: Option<&str>,
) -> Result<Vec<DataFile>, ArrowError> {
    store_parquet_partitioned(table, batches, branch, None).await
}

#[instrument(skip(table, batches), fields(table_name = %table.identifier().name(), equality_ids = ?equality_ids))]
/// Writes equality delete records as partitioned Parquet files.
///
/// This function writes Arrow record batches containing equality delete records to Parquet files,
/// partitioning them according to the table's partition spec.
///
/// # Arguments
/// * `table` - The Iceberg table to write delete records for
/// * `batches` - Stream of Arrow record batches containing the delete records
/// * `branch` - Optional branch name to write to
/// * `equality_ids` - Field IDs that define equality deletion
///
/// # Returns
/// * `Result<Vec<DataFile>, ArrowError>` - List of metadata for the written delete files
///
/// # Errors
/// Returns an error if:
/// * The table metadata cannot be accessed
/// * The schema projection fails
/// * The object store operations fail
/// * The Parquet writing fails
/// * The partition path generation fails
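///
/// # Example
///
/// A minimal sketch; the field IDs and the delete-record stream are illustrative:
///
/// ```no_run
/// # use arrow::{error::ArrowError, record_batch::RecordBatch};
/// # use futures::Stream;
/// # use iceberg_rust::arrow::write::write_equality_deletes_parquet_partitioned;
/// # use iceberg_rust::table::Table;
/// # async fn example(
/// #     table: &Table,
/// #     deletes: impl Stream<Item = Result<RecordBatch, ArrowError>> + Send + 'static,
/// # ) {
/// // Field IDs 1 and 2 identify the columns whose values define row equality.
/// let delete_files =
///     write_equality_deletes_parquet_partitioned(table, deletes, None, &[1, 2])
///         .await
///         .unwrap();
/// # }
/// ```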
pub async fn write_equality_deletes_parquet_partitioned(
    table: &Table,
    batches: impl Stream<Item = Result<RecordBatch, ArrowError>> + Send + 'static,
    branch: Option<&str>,
    equality_ids: &[i32],
) -> Result<Vec<DataFile>, ArrowError> {
    store_parquet_partitioned(table, batches, branch, Some(equality_ids)).await
}

#[instrument(skip(table, batches), fields(table_name = %table.identifier().name(), equality_ids = ?equality_ids))]
/// Stores Arrow record batches as partitioned Parquet files.
///
/// This is an internal function that handles the core storage logic for both regular data files
/// and equality delete files.
///
/// # Arguments
/// * `table` - The Iceberg table to store data for
/// * `batches` - Stream of Arrow record batches to write
/// * `branch` - Optional branch name to write to
/// * `equality_ids` - Optional list of field IDs for equality deletes
///
/// # Returns
/// * `Result<Vec<DataFile>, ArrowError>` - List of metadata for the written data files
///
/// # Errors
/// Returns an error if:
/// * The table metadata cannot be accessed
/// * The schema projection fails
/// * The object store operations fail
/// * The Parquet writing fails
/// * The partition path generation fails
async fn store_parquet_partitioned(
    table: &Table,
    batches: impl Stream<Item = Result<RecordBatch, ArrowError>> + Send + 'static,
    branch: Option<&str>,
    equality_ids: Option<&[i32]>,
) -> Result<Vec<DataFile>, ArrowError> {
    let metadata = table.metadata();
    let object_store = table.object_store();
    let schema = Arc::new(
        metadata
            .current_schema(branch)
            .map_err(Error::from)?
            .clone(),
    );
    // For equality deletes, project the schema onto the equality_id columns
    let schema = if let Some(equality_ids) = equality_ids {
        Arc::new(schema.project(equality_ids))
    } else {
        schema
    };

    let partition_spec = Arc::new(
        metadata
            .default_partition_spec()
            .map_err(Error::from)?
            .clone(),
    );

    let partition_fields = &metadata
        .current_partition_fields(branch)
        .map_err(Error::from)?;

    let data_location = &metadata
        .properties
        .get(WRITE_DATA_PATH)
        .map(ToOwned::to_owned)
        .unwrap_or(metadata.location.clone() + "/data/");

    let arrow_schema: Arc<ArrowSchema> =
        Arc::new((schema.fields()).try_into().map_err(Error::from)?);

    if partition_fields.is_empty() {
        let partition_path = if metadata
            .properties
            .get(WRITE_OBJECT_STORAGE_ENABLED)
            .is_some_and(|x| x == "true")
        {
            Some("".to_owned())
        } else {
            None
        };
        let files = write_parquet_files(
            data_location,
            &schema,
            &arrow_schema,
            partition_fields,
            partition_path,
            batches,
            object_store.clone(),
            equality_ids,
        )
        .await?;
        Ok(files)
    } else {
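        // Partitioned write: split every incoming batch by partition value and fan the
        // pieces out to per-partition writer tasks. Each partition gets an mpsc channel
        // feeding a task in the `JoinSet`, and the LRU cache closes the least recently
        // used channel once more partitions are open than there are available cores.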
        let mut senders: LruCache<Vec<Value>, Sender<Result<RecordBatch, ArrowError>>> =
            LruCache::unbounded();

        let mut set = JoinSet::new();

        let mut batches = Box::pin(batches);

        while let Some(batch) = batches.next().await {
            // Limit the number of concurrent senders
            if senders.len() > available_parallelism().unwrap().get() {
                if let Some((_, mut sender)) = senders.pop_lru() {
                    sender.close_channel();
                }
            }

            for result in partition_record_batch(&batch?, partition_fields)? {
                let (partition_values, batch) = result?;

                if let Some(sender) = senders.get_mut(&partition_values) {
                    sender.send(Ok(batch)).await.unwrap();
                } else {
                    let (mut sender, receiver) = channel(1);
                    sender.send(Ok(batch)).await.unwrap();
                    senders.push(partition_values.clone(), sender);
                    set.spawn({
                        let arrow_schema = arrow_schema.clone();
                        let object_store = object_store.clone();
                        let data_location = data_location.clone();
                        let schema = schema.clone();
                        let partition_spec = partition_spec.clone();
                        let equality_ids = equality_ids.map(Vec::from);
                        let partition_path = if metadata
                            .properties
                            .get(WRITE_OBJECT_STORAGE_ENABLED)
                            .is_some_and(|x| x == "true")
                        {
                            None
                        } else {
                            Some(generate_partition_path(
                                partition_fields,
                                &partition_values,
                            )?)
                        };
                        async move {
                            let partition_fields =
                                table_metadata::partition_fields(&partition_spec, &schema)
                                    .map_err(Error::from)?;
                            let files = write_parquet_files(
                                &data_location,
                                &schema,
                                &arrow_schema,
                                &partition_fields,
                                partition_path,
                                receiver,
                                object_store.clone(),
                                equality_ids.as_deref(),
                            )
                            .await?;
                            Ok::<_, Error>(files)
                        }
                    });
                };
            }
        }

        while let Some((_, mut sender)) = senders.pop_lru() {
            sender.close_channel();
        }

        let mut files = Vec::new();

        while let Some(handle) = set.join_next().await {
            files.extend(handle.map_err(Error::from)??);
        }

        Ok(files)
    }
}

type ArrowSender = Sender<(String, FileMetaData)>;
type ArrowReceiver = Receiver<(String, FileMetaData)>;

#[instrument(skip(batches, object_store), fields(data_location, equality_ids = ?equality_ids))]
/// Writes a stream of Arrow record batches to multiple Parquet files.
///
/// This internal function handles the low-level details of writing record batches to Parquet files,
/// managing file sizes, and collecting metadata.
///
/// # Arguments
/// * `data_location` - Base path where data files should be written
/// * `schema` - Iceberg schema for the data
/// * `arrow_schema` - Arrow schema for the record batches
/// * `partition_fields` - List of partition fields if data is partitioned
/// * `partition_path` - Optional partition path component
/// * `batches` - Stream of record batches to write
/// * `object_store` - Object store to write files to
/// * `equality_ids` - Optional list of field IDs for equality deletes
///
/// # Returns
/// * `Result<Vec<DataFile>, ArrowError>` - List of metadata for the written files
///
/// # Errors
/// Returns an error if:
/// * File creation fails
/// * Writing record batches fails
/// * Object store operations fail
/// * Metadata collection fails
#[allow(clippy::too_many_arguments)]
async fn write_parquet_files(
    data_location: &str,
    schema: &Schema,
    arrow_schema: &ArrowSchema,
    partition_fields: &[BoundPartitionField<'_>],
    partition_path: Option<String>,
    batches: impl Stream<Item = Result<RecordBatch, ArrowError>> + Send,
    object_store: Arc<dyn ObjectStore>,
    equality_ids: Option<&[i32]>,
) -> Result<Vec<DataFile>, ArrowError> {
    let bucket = Bucket::from_path(data_location)?;
    let (mut writer_sender, writer_receiver): (ArrowSender, ArrowReceiver) = channel(0);

    // Create initial writer
    let initial_writer = create_arrow_writer(
        data_location,
        partition_path.clone(),
        arrow_schema,
        object_store.clone(),
    )
    .await?;

    // Structure to hold writer state
    struct WriterState {
        writer: (String, AsyncArrowWriter<BufWriter>),
        bytes_written: usize,
    }

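    // Fold over the batch stream: track the open writer and an estimate of the bytes
    // written to it. When the estimate crosses the rolling threshold, close the current
    // file, hand its path and Parquet footer metadata to the collector channel, and
    // start a new file before writing the batch.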
    let final_state = batches
        .try_fold(
            WriterState {
                writer: initial_writer,
                bytes_written: 0,
            },
            |mut state, batch| {
                let object_store = object_store.clone();
                let data_location = data_location.to_owned();
                let partition_path = partition_path.clone();
                let arrow_schema = arrow_schema.clone();
                let mut writer_sender = writer_sender.clone();

                async move {
                    let batch_size = record_batch_size(&batch);
                    let new_size = state.bytes_written + batch_size;

                    if new_size > COMPRESSION_FACTOR * MAX_PARQUET_SIZE {
                        // Send current writer to channel
                        let finished_writer = state.writer;
                        let file = finished_writer.1.close().await?;
                        writer_sender
                            .try_send((finished_writer.0, file))
                            .map_err(|err| ArrowError::ComputeError(err.to_string()))?;

                        // Create new writer
                        let new_writer = create_arrow_writer(
                            &data_location,
                            partition_path,
                            &arrow_schema,
                            object_store,
                        )
                        .await?;

                        state.writer = new_writer;
                        state.bytes_written = batch_size;
                    } else {
                        state.bytes_written = new_size;
                    }

                    state.writer.1.write(&batch).await?;
                    Ok(state)
                }
            },
        )
        .await?;

    // Handle the last writer
    let file = final_state.writer.1.close().await?;
    writer_sender
        .try_send((final_state.writer.0, file))
        .map_err(|err| ArrowError::ComputeError(err.to_string()))?;
    writer_sender.close_channel();

    if final_state.bytes_written == 0 {
        return Ok(Vec::new());
    }

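    // Drain the collector channel: for every finished file, stat the object to get its
    // size on disk and convert the Parquet footer metadata into an Iceberg `DataFile`.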
413 .then(|writer| {
414 let object_store = object_store.clone();
415 let bucket = bucket.to_string();
416 async move {
417 let metadata = writer.1;
418 let size = object_store
419 .head(&writer.0.as_str().into())
420 .await
421 .map_err(|err| ArrowError::from_external_error(err.into()))?
422 .size;
423 Ok(parquet_to_datafile(
424 &(bucket + &writer.0),
425 size,
426 &metadata,
427 schema,
428 partition_fields,
429 equality_ids,
430 )?)
431 }
432 })
433 .try_collect::<Vec<_>>()
434 .await
435}
436
/// Generates a partition path string from partition fields and their values.
///
/// Creates a path string in the format `field1=value1/field2=value2/` by pairing each
/// partition field with its corresponding value.
///
/// # Arguments
/// * `partition_fields` - List of bound partition fields defining the partitioning
/// * `partition_values` - List of values for each partition field
///
/// # Returns
/// * `Result<String, ArrowError>` - The generated partition path string
///
/// # Errors
/// Returns an error if:
/// * The partition field name cannot be processed
/// * The partition value cannot be converted to a string
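///
/// # Example
///
/// A minimal sketch mirroring the unit test in this module (the field and transform
/// definitions are illustrative):
///
/// ```
/// use iceberg_rust::arrow::write::generate_partition_path;
/// use iceberg_rust::spec::partition::{PartitionField, Transform};
/// use iceberg_rust::spec::values::Value;
/// use iceberg_rust_spec::partition::BoundPartitionField;
/// use iceberg_rust_spec::types::{PrimitiveType, StructField, Type};
///
/// // A source column and a month-transform partition field bound to it.
/// let field = StructField {
///     id: 0,
///     name: "date".to_owned(),
///     required: false,
///     field_type: Type::Primitive(PrimitiveType::Date),
///     doc: None,
/// };
/// let partition_field = PartitionField::new(1, 1001, "month", Transform::Month);
/// let bound = vec![BoundPartitionField::new(&partition_field, &field)];
///
/// let path = generate_partition_path(&bound, &[Value::Int(10)]).unwrap();
/// assert_eq!(path, "month=10/");
/// ```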
#[inline]
pub fn generate_partition_path(
    partition_fields: &[BoundPartitionField<'_>],
    partition_values: &[Value],
) -> Result<String, ArrowError> {
    partition_fields
        .iter()
        .zip(partition_values.iter())
        .map(|(field, value)| {
            let name = field.name().to_owned();
            Ok(name + "=" + &value.to_string() + "/")
        })
        .collect::<Result<String, ArrowError>>()
}

#[instrument(skip(schema, object_store), fields(data_location))]
/// Creates a new Arrow writer for writing record batches to a Parquet file.
///
/// This internal function creates a new buffered writer and configures it with
/// appropriate Parquet compression settings.
///
/// # Arguments
/// * `data_location` - Base path where data files should be written
/// * `partition_path` - Optional partition path component
/// * `schema` - Arrow schema for the record batches
/// * `object_store` - Object store to write files to
///
/// # Returns
/// * `Result<(String, AsyncArrowWriter<BufWriter>), ArrowError>` - The file path and configured writer
///
/// # Errors
/// Returns an error if:
/// * The writer properties cannot be configured
/// * The Arrow writer cannot be created
async fn create_arrow_writer(
    data_location: &str,
    partition_path: Option<String>,
    schema: &arrow::datatypes::Schema,
    object_store: Arc<dyn ObjectStore>,
) -> Result<(String, AsyncArrowWriter<BufWriter>), ArrowError> {
    let parquet_path = generate_file_path(data_location, partition_path);

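    // `BufWriter` buffers the encoded Parquet bytes and streams them to the object store
    // (switching to a multipart upload for large files), so a whole file never has to be
    // held in memory.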
    let writer = BufWriter::new(object_store.clone(), parquet_path.clone().into());

    Ok((
        parquet_path,
        AsyncArrowWriter::try_new(
            writer,
            Arc::new(schema.clone()),
            Some(
                WriterProperties::builder()
                    .set_compression(Compression::ZSTD(ZstdLevel::try_new(1)?))
                    .build(),
            ),
        )?,
    ))
}

/// Generates a unique file path for a Parquet data file.
///
/// This function creates a unique file path by combining the data location, partition path,
/// and a UUID-based filename. If no partition path is provided, it generates a random
/// directory path using hex-encoded random bytes.
///
/// # Arguments
/// * `data_location` - Base directory where data files should be stored (expected to end with "/")
/// * `partition_path` - Optional partition path component (e.g., "year=2024/month=01/")
///
/// # Returns
/// * `String` - Complete file path ending with ".parquet"
///
/// # File Path Structure
/// The generated path follows this pattern:
/// * With partition: `{data_location}{partition_path}{uuid}.parquet`
/// * Without partition: `{data_location}{random_hex}/{uuid}.parquet`
///
/// # Examples
/// ```
/// use iceberg_rust::arrow::write::generate_file_path;
///
/// // With partition path
/// let path1 = generate_file_path("/data/", Some("year=2024/month=01/".to_string()));
/// // Result: "/data/year=2024/month=01/01234567-89ab-cdef-0123-456789abcdef.parquet"
///
/// // Without partition path (generates a random directory)
/// let path2 = generate_file_path("/data/", None);
/// // Result: "/data/a1b/01234567-89ab-cdef-0123-456789abcdef.parquet"
/// ```
///
/// # Implementation Details
/// * Uses cryptographically secure random bytes for UUID generation
/// * Creates a UUID v1 timestamp-based identifier for uniqueness
/// * Random directory names use 3 bytes of entropy (up to 6 hex characters)
/// * Automatically strips path prefixes using `strip_prefix()`
pub fn generate_file_path(data_location: &str, partition_path: Option<String>) -> String {
    let mut rand = [0u8; 6];
    getrandom::fill(&mut rand)
        .map_err(|err| ArrowError::ExternalError(Box::new(err)))
        .unwrap();

    let path = partition_path.unwrap_or_else(|| {
        rand[0..3]
            .iter()
            .fold(String::with_capacity(8), |mut acc, x| {
                write!(&mut acc, "{x:x}").unwrap();
                acc
            })
            + "/"
    });

    strip_prefix(data_location) + &path + &Uuid::now_v1(&rand).to_string() + ".parquet"
}

/// Calculates the approximate size in bytes of an Arrow record batch.
///
/// This function estimates the memory footprint of a record batch by multiplying
/// the combined size of the schema's fields by the number of rows. The result is a
/// rough heuristic, used only to decide when to roll over to a new Parquet file.
///
/// # Arguments
/// * `batch` - The record batch to calculate size for
///
/// # Returns
/// * `usize` - Estimated size of the record batch in bytes
#[inline]
fn record_batch_size(batch: &RecordBatch) -> usize {
    batch
        .schema()
        .fields()
        .iter()
        .fold(0, |acc, x| acc + x.size())
        * batch.num_rows()
}

#[cfg(test)]
mod tests {
    use iceberg_rust_spec::{
        partition::BoundPartitionField,
        types::{StructField, Type},
    };

    use crate::spec::{
        partition::{PartitionField, Transform},
        values::Value,
    };

    #[test]
    fn test_generate_partition_path_success() {
        let field = StructField {
            id: 0,
            name: "date".to_owned(),
            required: false,
            field_type: Type::Primitive(iceberg_rust_spec::types::PrimitiveType::Date),
            doc: None,
        };
        let partfield = PartitionField::new(1, 1001, "month", Transform::Month);
        let partition_fields = vec![BoundPartitionField::new(&partfield, &field)];
        let partition_values = vec![Value::Int(10)];

        let result = super::generate_partition_path(&partition_fields, &partition_values);

        assert!(result.is_ok());
        assert_eq!(result.unwrap(), "month=10/");
    }
}
617}