use futures::{
    channel::mpsc::{channel, Receiver, Sender},
    Stream, StreamExt, TryStreamExt,
};
use object_store::{buffered::BufWriter, ObjectStore};
use std::fmt::Write;
use std::sync::Arc;
use tokio::task::JoinSet;

use arrow::{datatypes::Schema as ArrowSchema, error::ArrowError, record_batch::RecordBatch};
use iceberg_rust_spec::{
    partition::BoundPartitionField,
    spec::{manifest::DataFile, schema::Schema, values::Value},
    table_metadata::{self, WRITE_DATA_PATH, WRITE_OBJECT_STORAGE_ENABLED},
    util::strip_prefix,
};
use parquet::{
    arrow::AsyncArrowWriter,
    basic::{Compression, ZstdLevel},
    file::properties::WriterProperties,
    format::FileMetaData,
};
use uuid::Uuid;

use crate::{
    error::Error, file_format::parquet::parquet_to_datafile, object_store::Bucket, table::Table,
};

use super::partition::PartitionStream;

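/// Roll over to a new parquet file once the estimated bytes written exceed this threshold.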
const MAX_PARQUET_SIZE: usize = 512_000_000;

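/// Writes a stream of record batches as parquet files into the table's data location,
/// split according to the table's partition fields, and returns the created
/// [`DataFile`]s. The files are only written; nothing is committed to the table.
///
/// A minimal usage sketch, assuming an existing `table` and a fallible stream of
/// `RecordBatch`es named `batches`:
///
/// ```ignore
/// let data_files = write_parquet_partitioned(&table, batches, None).await?;
/// // `data_files` can then be appended to the table in a separate commit step.
/// ```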
#[inline]
pub async fn write_parquet_partitioned(
    table: &Table,
    batches: impl Stream<Item = Result<RecordBatch, ArrowError>> + Send + 'static,
    branch: Option<&str>,
) -> Result<Vec<DataFile>, ArrowError> {
    store_parquet_partitioned(table, batches, branch, None).await
}

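/// Like [`write_parquet_partitioned`], but writes equality delete files: the table
/// schema is projected onto `equality_ids`, the field ids that identify the deleted
/// rows, before the batches are written.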
#[inline]
pub async fn write_equality_deletes_parquet_partitioned(
    table: &Table,
    batches: impl Stream<Item = Result<RecordBatch, ArrowError>> + Send + 'static,
    branch: Option<&str>,
    equality_ids: &[i32],
) -> Result<Vec<DataFile>, ArrowError> {
    store_parquet_partitioned(table, batches, branch, Some(equality_ids)).await
}

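/// Shared implementation behind the two public entry points. Resolves the schema,
/// partition spec and data location from the table metadata, then writes the batches
/// either with a single writer chain (unpartitioned) or with one writer task per
/// partition.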
async fn store_parquet_partitioned(
    table: &Table,
    batches: impl Stream<Item = Result<RecordBatch, ArrowError>> + Send + 'static,
    branch: Option<&str>,
    equality_ids: Option<&[i32]>,
) -> Result<Vec<DataFile>, ArrowError> {
    let metadata = table.metadata();
    let object_store = table.object_store();
    let schema = Arc::new(
        metadata
            .current_schema(branch)
            .map_err(Error::from)?
            .clone(),
    );
    // Equality delete files only contain the columns that identify the deleted rows.
    let schema = if let Some(equality_ids) = equality_ids {
        Arc::new(schema.project(equality_ids))
    } else {
        schema
    };

    let partition_spec = Arc::new(
        metadata
            .default_partition_spec()
            .map_err(Error::from)?
            .clone(),
    );

    let partition_fields = &metadata
        .current_partition_fields(branch)
        .map_err(Error::from)?;

    let data_location = &metadata
        .properties
        .get(WRITE_DATA_PATH)
        .map(ToOwned::to_owned)
        .unwrap_or(metadata.location.clone() + "/data/");

    let arrow_schema: Arc<ArrowSchema> =
        Arc::new((schema.fields()).try_into().map_err(Error::from)?);

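    // Unpartitioned tables use a single writer chain; partitioned tables are split into
    // one stream per partition value, each written by its own task.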
    if partition_fields.is_empty() {
        let partition_path = if metadata
            .properties
            .get(WRITE_OBJECT_STORAGE_ENABLED)
            .is_some_and(|x| x == "true")
        {
            Some("".to_owned())
        } else {
            None
        };
        let files = write_parquet_files(
            data_location,
            &schema,
            &arrow_schema,
            partition_fields,
            partition_path,
            batches,
            object_store.clone(),
            equality_ids,
        )
        .await?;
        Ok(files)
    } else {
        let mut streams = PartitionStream::new(Box::pin(batches), partition_fields);

        let mut set = JoinSet::new();

        // Spawn one writer task per partition produced by the partition stream.
        while let Some(result) = streams.next().await {
            let (partition_values, batches) = result?;
            set.spawn({
                let arrow_schema = arrow_schema.clone();
                let object_store = object_store.clone();
                let data_location = data_location.clone();
                let schema = schema.clone();
                let partition_spec = partition_spec.clone();
                let equality_ids = equality_ids.map(Vec::from);
                let partition_path = if metadata
                    .properties
                    .get(WRITE_OBJECT_STORAGE_ENABLED)
                    .is_some_and(|x| x == "true")
                {
                    None
                } else {
                    Some(generate_partition_path(
                        partition_fields,
                        &partition_values,
                    )?)
                };
                async move {
                    let partition_fields =
                        table_metadata::partition_fields(&partition_spec, &schema)
                            .map_err(Error::from)?;
                    let files = write_parquet_files(
                        &data_location,
                        &schema,
                        &arrow_schema,
                        &partition_fields,
                        partition_path,
                        batches,
                        object_store.clone(),
                        equality_ids.as_deref(),
                    )
                    .await?;
                    Ok::<_, Error>(files)
                }
            });
        }

        let mut files = Vec::new();

        while let Some(handle) = set.join_next().await {
            files.extend(handle.map_err(Error::from)??);
        }

        Ok(files)
    }
}

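// A finished parquet file is reported to the collector as (file path, parquet metadata).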
type ArrowSender = Sender<(String, FileMetaData)>;
type ArrowReceiver = Receiver<(String, FileMetaData)>;

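/// Writes a stream of record batches (all belonging to the same partition, if any) to
/// one or more parquet files under `data_location`, starting a new file whenever the
/// estimated size of the current one would exceed [`MAX_PARQUET_SIZE`], and converts
/// every finished file into a [`DataFile`].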
#[allow(clippy::too_many_arguments)]
async fn write_parquet_files(
    data_location: &str,
    schema: &Schema,
    arrow_schema: &ArrowSchema,
    partition_fields: &[BoundPartitionField<'_>],
    partition_path: Option<String>,
    batches: impl Stream<Item = Result<RecordBatch, ArrowError>> + Send,
    object_store: Arc<dyn ObjectStore>,
    equality_ids: Option<&[i32]>,
) -> Result<Vec<DataFile>, ArrowError> {
    let bucket = Bucket::from_path(data_location)?;
    let (mut writer_sender, writer_receiver): (ArrowSender, ArrowReceiver) = channel(1);

    let initial_writer = create_arrow_writer(
        data_location,
        partition_path.clone(),
        arrow_schema,
        object_store.clone(),
    )
    .await?;

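    // The currently open file and the estimated number of bytes already written to it.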
    struct WriterState {
        writer: (String, AsyncArrowWriter<BufWriter>),
        bytes_written: usize,
    }

    let final_state = batches
        .try_fold(
            WriterState {
                writer: initial_writer,
                bytes_written: 0,
            },
            |mut state, batch| {
                let object_store = object_store.clone();
                let data_location = data_location.to_owned();
                let partition_path = partition_path.clone();
                let arrow_schema = arrow_schema.clone();
                let mut writer_sender = writer_sender.clone();

                async move {
                    let batch_size = record_batch_size(&batch);
                    let new_size = state.bytes_written + batch_size;

                    if new_size > MAX_PARQUET_SIZE {
                        // Roll over: close the full file, hand it to the collector and
                        // start a new file for this batch.
                        let finished_writer = state.writer;
                        let file = finished_writer.1.close().await?;
                        writer_sender
                            .try_send((finished_writer.0, file))
                            .map_err(|err| ArrowError::ComputeError(err.to_string()))?;

                        let new_writer = create_arrow_writer(
                            &data_location,
                            partition_path,
                            &arrow_schema,
                            object_store,
                        )
                        .await?;

                        state.writer = new_writer;
                        state.bytes_written = batch_size;
                    } else {
                        state.bytes_written = new_size;
                        // Flush when the running total falls in the upper half of a
                        // 64 MB window, to bound the amount of buffered data.
                        if new_size % 64_000_000 >= 32_000_000 {
                            state.writer.1.flush().await?;
                        }
                    }

                    state.writer.1.write(&batch).await?;
                    Ok(state)
                }
            },
        )
        .await?;


    // Close the last writer and report its file before shutting down the channel.
    let file = final_state.writer.1.close().await?;
    writer_sender
        .try_send((final_state.writer.0, file))
        .map_err(|err| ArrowError::ComputeError(err.to_string()))?;
    writer_sender.close_channel();

    // If the stream contained no rows, don't register any data files.
    if final_state.bytes_written == 0 {
        return Ok(Vec::new());
    }

    // Turn every finished file into a `DataFile`, fetching its actual size from the
    // object store.
    writer_receiver
        .then(|writer| {
            let object_store = object_store.clone();
            let bucket = bucket.to_string();
            async move {
                let metadata = writer.1;
                let size = object_store
                    .head(&writer.0.as_str().into())
                    .await
                    .map_err(|err| ArrowError::from_external_error(err.into()))?
                    .size;
                Ok(parquet_to_datafile(
                    &(bucket + &writer.0),
                    size,
                    &metadata,
                    schema,
                    partition_fields,
                    equality_ids,
                )?)
            }
        })
        .try_collect::<Vec<_>>()
        .await
}

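/// Builds a Hive-style partition path, e.g. `month=10/`, by joining each partition
/// field name with its value.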
#[inline]
fn generate_partition_path(
    partition_fields: &[BoundPartitionField<'_>],
    partition_values: &[Value],
) -> Result<String, ArrowError> {
    partition_fields
        .iter()
        .zip(partition_values.iter())
        .map(|(field, value)| {
            let name = field.name().to_owned();
            Ok(name + "=" + &value.to_string() + "/")
        })
        .collect::<Result<String, ArrowError>>()
}

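/// Creates an [`AsyncArrowWriter`] for a new, uniquely named parquet file and returns it
/// together with the file's path. Output is zstd-compressed at level 1.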
async fn create_arrow_writer(
    data_location: &str,
    partition_path: Option<String>,
    schema: &arrow::datatypes::Schema,
    object_store: Arc<dyn ObjectStore>,
) -> Result<(String, AsyncArrowWriter<BufWriter>), ArrowError> {
    let mut rand = [0u8; 6];
    getrandom::fill(&mut rand).map_err(|err| ArrowError::ExternalError(Box::new(err)))?;

    // Without an explicit partition path, bucket files under a short random hex prefix.
    let path = partition_path.unwrap_or_else(|| {
        rand[0..3]
            .iter()
            .fold(String::with_capacity(8), |mut acc, x| {
                write!(&mut acc, "{x:x}").unwrap();
                acc
            })
            + "/"
    });

    let parquet_path =
        strip_prefix(data_location) + &path + &Uuid::now_v1(&rand).to_string() + ".parquet";

    let writer = BufWriter::new(object_store.clone(), parquet_path.clone().into());

    Ok((
        parquet_path,
        AsyncArrowWriter::try_new(
            writer,
            Arc::new(schema.clone()),
            Some(
                WriterProperties::builder()
                    .set_compression(Compression::ZSTD(ZstdLevel::try_new(1)?))
                    .build(),
            ),
        )?,
    ))
}

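/// Estimates the in-memory size of a record batch by multiplying the summed field sizes
/// by the row count; used only as a heuristic for file roll-over and flushing.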
#[inline]
fn record_batch_size(batch: &RecordBatch) -> usize {
    batch
        .schema()
        .fields()
        .iter()
        .fold(0, |acc, x| acc + x.size())
        * batch.num_rows()
}

#[cfg(test)]
mod tests {
    use iceberg_rust_spec::{
        partition::BoundPartitionField,
        types::{StructField, Type},
    };

    use crate::spec::{
        partition::{PartitionField, Transform},
        values::Value,
    };

    #[test]
    fn test_generate_partition_path_success() {
        let field = StructField {
            id: 0,
            name: "date".to_owned(),
            required: false,
            field_type: Type::Primitive(iceberg_rust_spec::types::PrimitiveType::Date),
            doc: None,
        };
        let partfield = PartitionField::new(1, 1001, "month", Transform::Month);
        let partition_fields = vec![BoundPartitionField::new(&partfield, &field)];
        let partition_values = vec![Value::Int(10)];

        let result = super::generate_partition_path(&partition_fields, &partition_values);

        assert!(result.is_ok());
        assert_eq!(result.unwrap(), "month=10/");
    }
}