// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//! Iceberg writer module.
//!
//! This module contains the generic writer trait and the specific writer implementations. Writers fall into two categories:
//! 1. FileWriter: writes a physical file format (such as Parquet or ORC).
//! 2. IcebergWriter: writes a logical format provided by the Iceberg table (such as data files, equality delete files,
//!    and position delete files) or provides other functionality (such as partition writers and delta writers).
//!
//! An IcebergWriter uses an inner FileWriter to write the physical files.
//!
//! The writer interface is designed to be extensible and flexible. Writers can be independently configured
//! and composed to support complex write logic. For example, by combining `FanoutWriter`, `DataFileWriter`, and `ParquetWriter`,
//! you can build a writer that automatically partitions the data and writes it in the Parquet format, as sketched below.
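//!
//! A condensed sketch of that composition (the variable names `props`, `schema`, `io`,
//! `loc_gen`, and `name_gen` are placeholders; full runnable examples appear below):
//!
//! ```rust, ignore
//! // Physical format writer for parquet files.
//! let parquet_writer_builder = ParquetWriterBuilder::new(props, schema);
//! // Rolls to a new file once the current one reaches the default target size.
//! let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
//!     parquet_writer_builder,
//!     io,
//!     loc_gen,
//!     name_gen,
//! );
//! // Produces iceberg data files from record batches.
//! let data_file_writer_builder = DataFileWriterBuilder::new(rolling_writer_builder);
//! // Adds partitioning on top of the data file writer.
//! let mut fanout_writer = FanoutWriter::new(data_file_writer_builder);
//! ```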
//!
//! For this purpose, there are four traits corresponding to these writers (a simplified sketch follows the list):
//! - IcebergWriterBuilder
//! - IcebergWriter
//! - FileWriterBuilder
//! - FileWriter
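//!
//! The two Iceberg-level traits have roughly the following shape; this is an
//! illustrative sketch inferred from the examples below, not the exact
//! definitions (see the trait declarations at the end of this module):
//!
//! ```rust, ignore
//! /// Builds writers, optionally scoped to a single partition.
//! #[async_trait::async_trait]
//! trait IcebergWriterBuilder: Send + Clone + 'static {
//!     /// The writer type this builder produces.
//!     type R;
//!     async fn build(&self, partition_key: Option<PartitionKey>) -> Result<Self::R>;
//! }
//!
//! /// Writes record batches; on close, returns the data files written so far.
//! #[async_trait::async_trait]
//! trait IcebergWriter: Send + 'static {
//!     async fn write(&mut self, input: RecordBatch) -> Result<()>;
//!     async fn close(&mut self) -> Result<Vec<DataFile>>;
//! }
//! ```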
//!
//! Users can create specific writer builders, combine them, and build the final writer.
//! They can also define custom writers by implementing the `IcebergWriter` trait,
//! allowing seamless integration with existing writers. (See the example below.)
//!
//! # Simple example for a data file writer using the parquet physical format:
//! ```rust, no_run
//! use std::collections::HashMap;
//! use std::sync::Arc;
//!
//! use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, StringArray};
//! use async_trait::async_trait;
//! use iceberg::io::{FileIO, FileIOBuilder};
//! use iceberg::spec::DataFile;
//! use iceberg::transaction::Transaction;
//! use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
//! use iceberg::writer::file_writer::ParquetWriterBuilder;
//! use iceberg::writer::file_writer::location_generator::{
//!     DefaultFileNameGenerator, DefaultLocationGenerator,
//! };
//! use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
//! use iceberg::{Catalog, CatalogBuilder, MemoryCatalog, Result, TableIdent};
//! use parquet::file::properties::WriterProperties;
//! #[tokio::main]
//! async fn main() -> Result<()> {
//!     // Connect to a catalog.
//!     use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder};
//!     use iceberg::writer::file_writer::rolling_writer::{
//!         RollingFileWriter, RollingFileWriterBuilder,
//!     };
//!     let catalog = MemoryCatalogBuilder::default()
//!         .load(
//!             "memory",
//!             HashMap::from([(
//!                 MEMORY_CATALOG_WAREHOUSE.to_string(),
//!                 "file:///path/to/warehouse".to_string(),
//!             )]),
//!         )
//!         .await?;
//!     // Add customized code to create a table first.
//!
//!     // Load the table from the catalog.
//!     let table = catalog
//!         .load_table(&TableIdent::from_strs(["hello", "world"])?)
//!         .await?;
//!     let location_generator = DefaultLocationGenerator::new(table.metadata().clone()).unwrap();
//!     let file_name_generator = DefaultFileNameGenerator::new(
//!         "test".to_string(),
//!         None,
//!         iceberg::spec::DataFileFormat::Parquet,
//!     );
//!
//!     // Create a parquet file writer builder. The parameters can be retrieved from the table.
//!     let parquet_writer_builder = ParquetWriterBuilder::new(
//!         WriterProperties::default(),
//!         table.metadata().current_schema().clone(),
//!     );
//!
//!     // Create a rolling file writer builder using the parquet file writer builder.
//!     let rolling_file_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
//!         parquet_writer_builder,
//!         table.file_io().clone(),
//!         location_generator.clone(),
//!         file_name_generator.clone(),
//!     );
//!
//!     // Create a data file writer builder using the rolling file writer builder.
//!     let data_file_writer_builder = DataFileWriterBuilder::new(rolling_file_writer_builder);
//!     // Build the data file writer.
//!     let mut data_file_writer = data_file_writer_builder.build(None).await?;
//!
//!     // Write data using data_file_writer...
//!
//!     // Close the writer; it returns the written data files.
//!     let data_files = data_file_writer.close().await.unwrap();
//!
//!     Ok(())
//! }
//! ```
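//!
//! The `RecordBatch` passed to `write` must line up with the table's schema. As a
//! minimal sketch, assuming a hypothetical single-column `id: int` table: Iceberg
//! matches Arrow fields to Iceberg fields through the Parquet field-id metadata
//! key, so the Arrow schema carries the field id as metadata:
//!
//! ```rust, ignore
//! use std::collections::HashMap;
//! use std::sync::Arc;
//!
//! use arrow_array::{Int32Array, RecordBatch};
//! use arrow_schema::{DataType, Field, Schema};
//! use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
//!
//! // Arrow schema mirroring the assumed `id: int` Iceberg schema, tagging the
//! // field with its Iceberg field id ("1").
//! let arrow_schema = Arc::new(Schema::new(vec![
//!     Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
//!         PARQUET_FIELD_ID_META_KEY.to_string(),
//!         "1".to_string(),
//!     )])),
//! ]));
//! let batch =
//!     RecordBatch::try_new(arrow_schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))])?;
//! // data_file_writer.write(batch).await?;
//! ```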
//!
//! # Custom writer to record latency
//! ```rust, no_run
//! use std::collections::HashMap;
//! use std::time::Instant;
//!
//! use arrow_array::RecordBatch;
//! use iceberg::io::FileIOBuilder;
//! use iceberg::memory::MemoryCatalogBuilder;
//! use iceberg::spec::{DataFile, PartitionKey};
//! use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
//! use iceberg::writer::file_writer::ParquetWriterBuilder;
//! use iceberg::writer::file_writer::location_generator::{
//!     DefaultFileNameGenerator, DefaultLocationGenerator,
//! };
//! use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
//! use iceberg::{Catalog, CatalogBuilder, MemoryCatalog, Result, TableIdent};
//! use parquet::file::properties::WriterProperties;
//!
//! #[derive(Clone)]
//! struct LatencyRecordWriterBuilder<B> {
//!     inner_writer_builder: B,
//! }
//!
//! impl<B: IcebergWriterBuilder> LatencyRecordWriterBuilder<B> {
//!     pub fn new(inner_writer_builder: B) -> Self {
//!         Self {
//!             inner_writer_builder,
//!         }
//!     }
//! }
//!
//! #[async_trait::async_trait]
//! impl<B: IcebergWriterBuilder> IcebergWriterBuilder for LatencyRecordWriterBuilder<B> {
//!     type R = LatencyRecordWriter<B::R>;
//!
//!     async fn build(&self, partition_key: Option<PartitionKey>) -> Result<Self::R> {
//!         Ok(LatencyRecordWriter {
//!             inner_writer: self.inner_writer_builder.build(partition_key).await?,
//!         })
//!     }
//! }
//!
//! struct LatencyRecordWriter<W> {
//!     inner_writer: W,
//! }
//!
//! #[async_trait::async_trait]
//! impl<W: IcebergWriter> IcebergWriter for LatencyRecordWriter<W> {
//!     async fn write(&mut self, input: RecordBatch) -> Result<()> {
//!         let start = Instant::now();
//!         self.inner_writer.write(input).await?;
//!         let _latency = start.elapsed();
//!         // record latency...
//!         Ok(())
//!     }
//!
//!     async fn close(&mut self) -> Result<Vec<DataFile>> {
//!         let start = Instant::now();
//!         let res = self.inner_writer.close().await?;
//!         let _latency = start.elapsed();
//!         // record latency...
//!         Ok(res)
//!     }
//! }
//!
//! #[tokio::main]
//! async fn main() -> Result<()> {
//!     // Connect to a catalog.
//!     use iceberg::memory::MEMORY_CATALOG_WAREHOUSE;
//!     use iceberg::spec::{Literal, PartitionKey, Struct};
//!     use iceberg::writer::file_writer::rolling_writer::{
//!         RollingFileWriter, RollingFileWriterBuilder,
//!     };
//!
//!     let catalog = MemoryCatalogBuilder::default()
//!         .load(
//!             "memory",
//!             HashMap::from([(
//!                 MEMORY_CATALOG_WAREHOUSE.to_string(),
//!                 "file:///path/to/warehouse".to_string(),
//!             )]),
//!         )
//!         .await?;
//!
//!     // Add customized code to create a table first.
//!
//!     // Load the table from the catalog.
//!     let table = catalog
//!         .load_table(&TableIdent::from_strs(["hello", "world"])?)
//!         .await?;
//!     let partition_key = PartitionKey::new(
//!         table.metadata().default_partition_spec().as_ref().clone(),
//!         table.metadata().current_schema().clone(),
//!         Struct::from_iter(vec![Some(Literal::string("Seattle"))]),
//!     );
//!     let location_generator = DefaultLocationGenerator::new(table.metadata().clone()).unwrap();
//!     let file_name_generator = DefaultFileNameGenerator::new(
//!         "test".to_string(),
//!         None,
//!         iceberg::spec::DataFileFormat::Parquet,
//!     );
//!
//!     // Create a parquet file writer builder. The parameters can be retrieved from the table.
//!     let parquet_writer_builder = ParquetWriterBuilder::new(
//!         WriterProperties::default(),
//!         table.metadata().current_schema().clone(),
//!     );
//!
//!     // Create a rolling file writer builder with a 512 MiB target file size.
//!     let rolling_file_writer_builder = RollingFileWriterBuilder::new(
//!         parquet_writer_builder,
//!         512 * 1024 * 1024,
//!         table.file_io().clone(),
//!         location_generator.clone(),
//!         file_name_generator.clone(),
//!     );
//!
//!     // Create a data file writer builder using the rolling file writer builder.
//!     let data_file_writer_builder = DataFileWriterBuilder::new(rolling_file_writer_builder);
//!     // Create a latency record writer builder using the data file writer builder.
//!     let latency_record_builder = LatencyRecordWriterBuilder::new(data_file_writer_builder);
//!     // Build the final writer.
//!     let mut latency_record_data_file_writer = latency_record_builder
//!         .build(Some(partition_key))
//!         .await
//!         .unwrap();
//!
//!     // Write data using latency_record_data_file_writer...
//!
//!     Ok(())
//! }
//! ```
//!
//! # Adding Partitioning to Data File Writers
//!
//! You can wrap a `DataFileWriter` with partitioning writers to handle partitioned tables.
//! Iceberg provides two partitioning strategies:
//!
//! ## FanoutWriter - For Unsorted Data
//!
//! Wraps the data file writer to handle unsorted data by maintaining multiple active writers.
//! Use this when your data is not pre-sorted by partition key. Writes to different partitions
//! can happen in any order, even interleaved.
//!
//! ```rust, no_run
//! # // Same setup as the simple example above...
//! # use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder};
//! # use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
//! # use iceberg::{Catalog, CatalogBuilder, Result, TableIdent};
//! # use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
//! # use iceberg::writer::file_writer::ParquetWriterBuilder;
//! # use iceberg::writer::file_writer::location_generator::{
//! # DefaultFileNameGenerator, DefaultLocationGenerator,
//! # };
//! # use parquet::file::properties::WriterProperties;
//! # use std::collections::HashMap;
//! # #[tokio::main]
//! # async fn main() -> Result<()> {
//! # let catalog = MemoryCatalogBuilder::default()
//! #     .load("memory", HashMap::from([(MEMORY_CATALOG_WAREHOUSE.to_string(), "file:///path/to/warehouse".to_string())]))
//! #     .await?;
//! # let table = catalog.load_table(&TableIdent::from_strs(["hello", "world"])?).await?;
//! # let location_generator = DefaultLocationGenerator::new(table.metadata().clone()).unwrap();
//! # let file_name_generator = DefaultFileNameGenerator::new("test".to_string(), None, iceberg::spec::DataFileFormat::Parquet);
//! # let parquet_writer_builder = ParquetWriterBuilder::new(WriterProperties::default(), table.metadata().current_schema().clone());
//! # let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
//! #     parquet_writer_builder, table.file_io().clone(), location_generator, file_name_generator);
//! # let data_file_writer_builder = DataFileWriterBuilder::new(rolling_writer_builder);
//!
//! // Wrap the data file writer with FanoutWriter for partitioning
//! use iceberg::writer::partitioning::fanout_writer::FanoutWriter;
//! use iceberg::writer::partitioning::PartitioningWriter;
//! use iceberg::spec::{Literal, PartitionKey, Struct};
//!
//! let mut fanout_writer = FanoutWriter::new(data_file_writer_builder);
//!
//! // Create partition keys for different regions
//! let schema = table.metadata().current_schema().clone();
//! let partition_spec = table.metadata().default_partition_spec().as_ref().clone();
//!
//! let partition_key_us = PartitionKey::new(
//!     partition_spec.clone(),
//!     schema.clone(),
//!     Struct::from_iter([Some(Literal::string("US"))]),
//! );
//!
//! let partition_key_eu = PartitionKey::new(
//!     partition_spec.clone(),
//!     schema.clone(),
//!     Struct::from_iter([Some(Literal::string("EU"))]),
//! );
//!
//! // Write to different partitions in any order - can interleave partition writes
//! // fanout_writer.write(partition_key_us.clone(), batch_us1).await?;
//! // fanout_writer.write(partition_key_eu.clone(), batch_eu1).await?;
//! // fanout_writer.write(partition_key_us.clone(), batch_us2).await?; // Back to US - OK!
//! // fanout_writer.write(partition_key_eu.clone(), batch_eu2).await?; // Back to EU - OK!
//!
//! let data_files = fanout_writer.close().await?;
//! # Ok(())
//! # }
//! ```
//!
//! ## ClusteredWriter - For Sorted Data
//!
//! Wraps the data file writer for pre-sorted data. More memory efficient as it maintains
//! only one active writer at a time, but requires input sorted by partition key.
//!
//! ```rust, no_run
//! # // Same setup as the simple example above...
//! # use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder};
//! # use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
//! # use iceberg::{Catalog, CatalogBuilder, Result, TableIdent};
//! # use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
//! # use iceberg::writer::file_writer::ParquetWriterBuilder;
//! # use iceberg::writer::file_writer::location_generator::{
//! # DefaultFileNameGenerator, DefaultLocationGenerator,
//! # };
//! # use parquet::file::properties::WriterProperties;
//! # use std::collections::HashMap;
//! # #[tokio::main]
//! # async fn main() -> Result<()> {
//! # let catalog = MemoryCatalogBuilder::default()
//! #     .load("memory", HashMap::from([(MEMORY_CATALOG_WAREHOUSE.to_string(), "file:///path/to/warehouse".to_string())]))
//! #     .await?;
//! # let table = catalog.load_table(&TableIdent::from_strs(["hello", "world"])?).await?;
//! # let location_generator = DefaultLocationGenerator::new(table.metadata().clone()).unwrap();
//! # let file_name_generator = DefaultFileNameGenerator::new("test".to_string(), None, iceberg::spec::DataFileFormat::Parquet);
//! # let parquet_writer_builder = ParquetWriterBuilder::new(WriterProperties::default(), table.metadata().current_schema().clone());
//! # let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
//! #     parquet_writer_builder, table.file_io().clone(), location_generator, file_name_generator);
//! # let data_file_writer_builder = DataFileWriterBuilder::new(rolling_writer_builder);
//!
//! // Wrap the data file writer with ClusteredWriter for sorted partitioning
//! use iceberg::writer::partitioning::clustered_writer::ClusteredWriter;
//! use iceberg::writer::partitioning::PartitioningWriter;
//! use iceberg::spec::{Literal, PartitionKey, Struct};
//!
//! let mut clustered_writer = ClusteredWriter::new(data_file_writer_builder);
//!
//! // Create partition keys (must write in sorted order)
//! let schema = table.metadata().current_schema().clone();
//! let partition_spec = table.metadata().default_partition_spec().as_ref().clone();
//!
//! let partition_key_asia = PartitionKey::new(
//!     partition_spec.clone(),
//!     schema.clone(),
//!     Struct::from_iter([Some(Literal::string("ASIA"))]),
//! );
//!
//! let partition_key_eu = PartitionKey::new(
//!     partition_spec.clone(),
//!     schema.clone(),
//!     Struct::from_iter([Some(Literal::string("EU"))]),
//! );
//!
//! let partition_key_us = PartitionKey::new(
//!     partition_spec.clone(),
//!     schema.clone(),
//!     Struct::from_iter([Some(Literal::string("US"))]),
//! );
//!
//! // Write to partitions in sorted order (ASIA -> EU -> US)
//! // clustered_writer.write(partition_key_asia, batch_asia).await?;
//! // clustered_writer.write(partition_key_eu, batch_eu).await?;
//! // clustered_writer.write(partition_key_us, batch_us).await?;
//! // Writing back to ASIA would fail since data must be sorted!
//!
//! let data_files = clustered_writer.close().await?;
//!
//! # Ok(())
//! # }
//! ```
pub mod base_writer;
pub mod file_writer;
pub mod partitioning;

use arrow_array::RecordBatch;

use crate::Result;
use crate::spec::{DataFile, PartitionKey};

type DefaultInput = RecordBatch;
type DefaultOutput = Vec<DataFile>;

/// The builder for iceberg writer.
#[async_trait::async_trait]
pub trait IcebergWriterBuilder<I = DefaultInput, O = DefaultOutput>: Send + Clone + 'static {
    /// The associated writer type produced by this builder.
    type R: IcebergWriter<I, O>;
    /// Build the iceberg writer, optionally scoped to a single partition.
    async fn build(&self, partition_key: Option<PartitionKey>) -> Result<Self::R>;
}

/// The iceberg writer used to write data to iceberg table.
#[async_trait::async_trait]
pub trait IcebergWriter<I = DefaultInput, O = DefaultOutput>: Send + 'static {
    /// Write data to the iceberg table.
    async fn write(&mut self, input: I) -> Result<()>;
    /// Close the writer and return the written output (data files by default).
    async fn close(&mut self) -> Result<O>;
}

/// The current file status of the Iceberg writer.
/// This is implemented for writers that write a single file at a time.
pub trait CurrentFileStatus {
    /// Get the path of the file currently being written.
    fn current_file_path(&self) -> String;
    /// Get the number of rows written to the current file.
    fn current_row_num(&self) -> usize;
    /// Get the number of bytes written to the current file so far.
    fn current_written_size(&self) -> usize;
}