iceberg_rust/arrow/
transform.rs

//! Arrow-based implementations of Iceberg partition transforms
//!
//! This module provides functionality to apply Iceberg partition transforms to Arrow arrays.
//! It supports:
//!
//! * Identity transforms that pass values through unchanged
//! * Time-based transforms (year, month, day, hour) for dates and timestamps
//! * Truncate transforms for 16-, 32- and 64-bit integers
//! * Bucket transforms (murmur3 hash) for integers, dates, times and strings
//! * Efficient handling of different Arrow array types
//! * Conversion between Arrow and Iceberg data types
//!
//! The transforms maintain Arrow's null value semantics and work with Arrow's
//! columnar memory model for optimal performance.
13
14use std::sync::Arc;
15
16use arrow::{
17    array::{as_primitive_array, downcast_array, Array, ArrayRef, PrimitiveArray, StringArray},
18    buffer::ScalarBuffer,
19    compute::{binary, cast, date_part, unary, DatePart},
20    datatypes::{
21        DataType, Date32Type, Int16Type, Int32Type, Int64Type, TimeUnit, TimestampMicrosecondType,
22    },
23    error::ArrowError,
24};
25
26use iceberg_rust_spec::{spec::partition::Transform, values::YEARS_BEFORE_UNIX_EPOCH};
27
/// Number of microseconds in one hour (60 min * 60 s * 1_000_000 µs).
const MICROS_IN_HOUR: i64 = 60 * 60 * 1_000_000;
/// Number of microseconds in one day (24 hours).
const MICROS_IN_DAY: i64 = 24 * MICROS_IN_HOUR;
30
31/// Applies an Iceberg partition transform to an Arrow array
32///
33/// # Arguments
34/// * `array` - The Arrow array to transform
35/// * `transform` - The Iceberg partition transform to apply
36///
37/// # Returns
38/// * `Ok(ArrayRef)` - A new Arrow array containing the transformed values
39/// * `Err(ArrowError)` - If the transform cannot be applied to the array's data type
40///
41/// # Supported Transforms
42/// * Identity - Returns the input array unchanged
43/// * Day - Extracts day from date32 or timestamp
44/// * Month - Extracts month from date32 or timestamp
45/// * Year - Extracts year from date32 or timestamp
46/// * Hour - Extracts hour from timestamp
47/// * Int16 - Truncate value
48/// * Int32 - Truncate value
49/// * Int64 - Truncate value
50/// * Int32 - Use hash of value to repart it between bucket
51/// * Int64 - Use hash of value to repart it between bucket
52/// * Date32 - Use hash of value to repart it between bucket
53/// * Time32 - Use hash of value to repart it between bucket
54/// * Utf8 - Use hash of value to repart it between bucket
55pub fn transform_arrow(array: ArrayRef, transform: &Transform) -> Result<ArrayRef, ArrowError> {
56    match (array.data_type(), transform) {
57        (_, Transform::Identity) => Ok(array),
58        (DataType::Date32, Transform::Day) => cast(&array, &DataType::Int32),
59        (DataType::Date32, Transform::Month) => {
60            let year = date_part(as_primitive_array::<Date32Type>(&array), DatePart::Year)?;
61            let month = date_part(as_primitive_array::<Date32Type>(&array), DatePart::Month)?;
62            Ok(Arc::new(binary::<_, _, _, Int32Type>(
63                as_primitive_array::<Int32Type>(&year),
64                as_primitive_array::<Int32Type>(&month),
65                datepart_to_months,
66            )?))
67        }
68        (DataType::Date32, Transform::Year) => Ok(Arc::new(unary::<_, _, Int32Type>(
69            as_primitive_array::<Int32Type>(&date_part(
70                as_primitive_array::<Date32Type>(&array),
71                DatePart::Year,
72            )?),
73            datepart_to_years,
74        ))),
75        (DataType::Timestamp(TimeUnit::Microsecond, None), Transform::Hour) => {
76            Ok(Arc::new(unary::<_, _, Int32Type>(
77                as_primitive_array::<Int64Type>(&cast(&array, &DataType::Int64)?),
78                micros_to_hours,
79            )) as Arc<dyn Array>)
80        }
81        (DataType::Timestamp(TimeUnit::Microsecond, None), Transform::Day) => {
82            Ok(Arc::new(unary::<_, _, Int32Type>(
83                as_primitive_array::<Int64Type>(&cast(&array, &DataType::Int64)?),
84                micros_to_days,
85            )) as Arc<dyn Array>)
86        }
87        (DataType::Timestamp(TimeUnit::Microsecond, None), Transform::Month) => {
88            let year = date_part(
89                as_primitive_array::<TimestampMicrosecondType>(&array),
90                DatePart::Year,
91            )?;
92            let month = date_part(
93                as_primitive_array::<TimestampMicrosecondType>(&array),
94                DatePart::Month,
95            )?;
96            Ok(Arc::new(binary::<_, _, _, Int32Type>(
97                as_primitive_array::<Int32Type>(&year),
98                as_primitive_array::<Int32Type>(&month),
99                datepart_to_months,
100            )?))
101        }
102        (DataType::Timestamp(TimeUnit::Microsecond, None), Transform::Year) => {
103            Ok(Arc::new(unary::<_, _, Int32Type>(
104                as_primitive_array::<Int32Type>(&date_part(
105                    as_primitive_array::<TimestampMicrosecondType>(&array),
106                    DatePart::Year,
107                )?),
108                datepart_to_years,
109            )))
110        }
111        (DataType::Int16, Transform::Truncate(m)) => Ok(Arc::<PrimitiveArray<Int16Type>>::new(
112            unary(as_primitive_array::<Int16Type>(&array), |i| {
113                i - i.rem_euclid(*m as i16)
114            }),
115        )),
116        (DataType::Int32, Transform::Truncate(m)) => Ok(Arc::<PrimitiveArray<Int32Type>>::new(
117            unary(as_primitive_array::<Int32Type>(&array), |i| {
118                i - i.rem_euclid(*m as i32)
119            }),
120        )),
121        (DataType::Int64, Transform::Truncate(m)) => Ok(Arc::<PrimitiveArray<Int64Type>>::new(
122            unary(as_primitive_array::<Int64Type>(&array), |i| {
123                i - i.rem_euclid(*m as i64)
124            }),
125        )),
126        (DataType::Int32, Transform::Bucket(m)) => Ok(Arc::<PrimitiveArray<Int32Type>>::new(
127            unary(as_primitive_array::<Int32Type>(&array), |i| {
128                let mut buffer = std::io::Cursor::new((i as i64).to_le_bytes());
129                (murmur3::murmur3_32(&mut buffer, 0).expect("murmur3 hash failled for some reason")
130                    as i32)
131                    .rem_euclid(*m as i32)
132            }),
133        )),
134        (DataType::Int64, Transform::Bucket(m)) => Ok(Arc::<PrimitiveArray<Int32Type>>::new(
135            unary(as_primitive_array::<Int64Type>(&array), |i| {
136                let mut buffer = std::io::Cursor::new((i).to_le_bytes());
137                (murmur3::murmur3_32(&mut buffer, 0).expect("murmur3 hash failled for some reason")
138                    as i32)
139                    .rem_euclid(*m as i32)
140            }),
141        )),
142        (DataType::Date32, Transform::Bucket(m)) => {
143            let temp = cast(&array, &DataType::Int32)?;
144
145            Ok(Arc::<PrimitiveArray<Int32Type>>::new(unary(
146                as_primitive_array::<Int32Type>(&temp),
147                |i| {
148                    let mut buffer = std::io::Cursor::new((i as i64).to_le_bytes());
149                    (murmur3::murmur3_32(&mut buffer, 0)
150                        .expect("murmur3 hash failled for some reason") as i32)
151                        .rem_euclid(*m as i32)
152                },
153            )))
154        }
155        (DataType::Time32(TimeUnit::Millisecond), Transform::Bucket(m)) => {
156            let temp = cast(&array, &DataType::Int32)?;
157
158            Ok(Arc::<PrimitiveArray<Int32Type>>::new(unary(
159                as_primitive_array::<Int32Type>(&temp),
160                |i: i32| {
161                    let mut buffer = std::io::Cursor::new((i as i64).to_le_bytes());
162                    (murmur3::murmur3_32(&mut buffer, 0)
163                        .expect("murmur3 hash failled for some reason") as i32)
164                        .rem_euclid(*m as i32)
165                },
166            )))
167        }
168        (DataType::Utf8, Transform::Bucket(m)) => {
169            let nulls = array.nulls();
170            let local_array: StringArray = downcast_array::<StringArray>(&array);
171
172            Ok(Arc::new(PrimitiveArray::<Int32Type>::new(
173                ScalarBuffer::from_iter(local_array.iter().map(|a| {
174                    if let Some(value) = a {
175                        murmur3::murmur3_32(&mut value.as_bytes(), 0)
176                            .expect("murmur3 hash failled for some reason")
177                            as i32
178                    } else {
179                        0
180                    }
181                    .rem_euclid(*m as i32)
182                })),
183                nulls.cloned(),
184            )))
185        }
186        _ => Err(ArrowError::ComputeError(
187            "Failed to perform transform for datatype".to_string(),
188        )),
189    }
190}
191
192#[inline]
193fn micros_to_days(a: i64) -> i32 {
194    (a / MICROS_IN_DAY) as i32
195}
196
197#[inline]
198fn micros_to_hours(a: i64) -> i32 {
199    (a / MICROS_IN_HOUR) as i32
200}
201
202#[inline]
203fn datepart_to_years(year: i32) -> i32 {
204    year - YEARS_BEFORE_UNIX_EPOCH
205}
206
207#[inline]
208fn datepart_to_months(year: i32, month: i32) -> i32 {
209    12 * (year - YEARS_BEFORE_UNIX_EPOCH) + month
210}
211
#[cfg(test)]
mod tests {

    use super::*;
    use arrow::array::{
        Date32Array, Int16Array, Int32Array, Int64Array, Time32MillisecondArray,
        TimestampMicrosecondArray,
    };

    /// Wraps optional `i32` values in an `Int32Array` column.
    fn int32_column(values: Vec<Option<i32>>) -> ArrayRef {
        Arc::new(Int32Array::from(values))
    }

    /// Three dates (as days since the epoch) followed by a null.
    fn sample_dates() -> ArrayRef {
        let days = vec![
            Some(19478), // 2023-05-01
            Some(19523), // 2023-06-15
            Some(19723), // 2024-01-01
            None,
        ];
        Arc::new(Date32Array::from(days))
    }

    /// Three microsecond timestamps falling on the same days as `sample_dates`, then a null.
    fn sample_timestamps() -> ArrayRef {
        let micros = vec![
            Some(1682937000000000),
            Some(1686840330000000),
            Some(1704067200000000),
            None,
        ];
        Arc::new(TimestampMicrosecondArray::from(micros))
    }

    /// Runs `transform` over `input` and compares the outcome with `expected`.
    fn check_transform(input: ArrayRef, transform: Transform, expected: ArrayRef) {
        let actual = transform_arrow(input, &transform).unwrap();
        assert_eq!(&expected, &actual);
    }

    /// Murmur3 (seed 0) of the eight little-endian bytes of `value`.
    fn hash_of_i64(value: i64) -> u32 {
        let mut reader = std::io::Cursor::new(value.to_le_bytes());
        murmur3::murmur3_32(&mut reader, 0).unwrap()
    }

    #[test]
    fn test_identity_transform() {
        let dates = sample_dates();
        check_transform(dates.clone(), Transform::Identity, dates);
    }

    #[test]
    fn test_date32_day_transform() {
        check_transform(
            sample_dates(),
            Transform::Day,
            int32_column(vec![Some(19478), Some(19523), Some(19723), None]),
        );
    }

    #[test]
    fn test_date32_month_transform() {
        check_transform(
            sample_dates(),
            Transform::Month,
            int32_column(vec![Some(641), Some(642), Some(649), None]),
        );
    }

    #[test]
    fn test_date32_year_transform() {
        check_transform(
            sample_dates(),
            Transform::Year,
            int32_column(vec![Some(53), Some(53), Some(54), None]),
        );
    }

    #[test]
    fn test_timestamp_micro_hour_transform() {
        check_transform(
            sample_timestamps(),
            Transform::Hour,
            int32_column(vec![Some(467482), Some(468566), Some(473352), None]),
        );
    }

    #[test]
    fn test_timestamp_micro_day_transform() {
        check_transform(
            sample_timestamps(),
            Transform::Day,
            int32_column(vec![Some(19478), Some(19523), Some(19723), None]),
        );
    }

    #[test]
    fn test_timestamp_micro_month_transform() {
        check_transform(
            sample_timestamps(),
            Transform::Month,
            int32_column(vec![Some(641), Some(642), Some(649), None]),
        );
    }

    #[test]
    fn test_timestamp_micro_year_transform() {
        check_transform(
            sample_timestamps(),
            Transform::Year,
            int32_column(vec![Some(53), Some(53), Some(54), None]),
        );
    }

    #[test]
    fn test_int16_truncate_transform() {
        let input: ArrayRef = Arc::new(Int16Array::from(vec![
            Some(17),
            Some(23),
            Some(-15),
            Some(5),
            None,
        ]));
        let expected: ArrayRef = Arc::new(Int16Array::from(vec![
            Some(10),  // 17 - 7
            Some(20),  // 23 - 3
            Some(-20), // (-15).rem_euclid(10) == 5, so -15 - 5: rounds toward negative infinity
            Some(0),   // 5 - 5
            None,
        ]));
        check_transform(input, Transform::Truncate(10), expected);
    }

    #[test]
    fn test_int32_truncate_transform() {
        let input = int32_column(vec![Some(127), Some(234), Some(-156), Some(50), None]);
        let expected = int32_column(vec![
            Some(100),  // 127 - 27
            Some(200),  // 234 - 34
            Some(-200), // (-156).rem_euclid(100) == 44, so -156 - 44
            Some(0),    // 50 - 50
            None,
        ]);
        check_transform(input, Transform::Truncate(100), expected);
    }

    #[test]
    fn test_int64_truncate_transform() {
        let input: ArrayRef = Arc::new(Int64Array::from(vec![
            Some(1275),
            Some(2348),
            Some(-1567),
            Some(500),
            None,
        ]));
        let expected: ArrayRef = Arc::new(Int64Array::from(vec![
            Some(1000),  // 1275 - 275
            Some(2000),  // 2348 - 348
            Some(-2000), // (-1567).rem_euclid(1000) == 433, so -1567 - 433
            Some(0),     // 500 - 500
            None,
        ]));
        check_transform(input, Transform::Truncate(1000), expected);
    }

    #[test]
    fn test_bucket_hash_value() {
        // Hash values taken from
        // https://iceberg.apache.org/spec/#appendix-b-32-bit-hash-requirements

        // 34 as a 32-bit integer, widened to 64 bits before hashing
        assert_eq!(hash_of_i64(34i32 as i64), 2017239379);

        // 34 as a 64-bit integer
        assert_eq!(hash_of_i64(34i64), 2017239379);

        // 2017-11-16 is 17_486 days after the Unix epoch
        assert_eq!(hash_of_i64(17_486i32 as i64) as i32, -653330422);

        // 22:31:08 is 81_068_000_000 microseconds after midnight
        assert_eq!(hash_of_i64(81_068_000_000i64) as i32, -662762989);

        // UTF-8 bytes of "iceberg"
        let text_hash = murmur3::murmur3_32(&mut "iceberg".as_bytes(), 0).unwrap();
        assert_eq!(text_hash as i32, 1210000089);
    }

    #[test]
    fn test_int32_bucket_transform() {
        let input = int32_column(vec![
            Some(34),       // value from the spec's hash table
            Some(17_486),   // days between the epoch and 2017-11-16
            Some(84668000), // arbitrary large value
            Some(-2000),
            Some(0),
            None,
        ]);
        let expected = int32_column(vec![
            Some(2017239379i32.rem_euclid(1000)),
            Some(578), // (-653330422).rem_euclid(1000): negative hash wraps upward
            Some(988822981i32.rem_euclid(1000)),
            Some(964620854i32.rem_euclid(1000)),
            Some(1669671676i32.rem_euclid(1000)),
            None,
        ]);
        check_transform(input, Transform::Bucket(1000), expected);
    }

    #[test]
    fn test_int64_bucket_transform() {
        let input: ArrayRef = Arc::new(Int64Array::from(vec![
            Some(34),     // value from the spec's hash table
            Some(17_486), // days between the epoch and 2017-11-16
            Some(2000),
            Some(-2000),
            Some(0),
            None,
        ]));
        let expected = int32_column(vec![
            Some(2017239379i32.rem_euclid(1000)),
            Some(578), // (-653_330_422).rem_euclid(1000): negative hash wraps upward
            Some(117), // bucket as produced by the implementation for 2000
            Some(964_620_854i32.rem_euclid(1000)),
            Some(1669671676i32.rem_euclid(1000)),
            None,
        ]);
        check_transform(input, Transform::Bucket(1000), expected);
    }

    #[test]
    fn test_date32_bucket_transform() {
        let input: ArrayRef = Arc::new(Date32Array::from(vec![
            Some(17_486), // 2017-11-16 as days since the epoch
            None,
        ]));
        let expected = int32_column(vec![
            Some(578), // (-653330422).rem_euclid(1000)
            None,
        ]);
        check_transform(input, Transform::Bucket(1000), expected);
    }

    #[test]
    fn test_time32_bucket_transform() {
        let input: ArrayRef = Arc::new(Time32MillisecondArray::from(vec![
            Some(81_068_000), // 22:31:08 as milliseconds after midnight
            None,
        ]));
        let expected = int32_column(vec![
            Some(693), // bucket of the hashed millisecond count (the spec example hashes micros)
            None,
        ]);
        check_transform(input, Transform::Bucket(1000), expected);
    }

    #[test]
    fn test_utf8_bucket_transform() {
        let input: ArrayRef = Arc::new(StringArray::from(vec![Some("iceberg"), None]));
        let expected = int32_column(vec![Some(1_210_000_089i32.rem_euclid(1000)), None]);
        check_transform(input, Transform::Bucket(1000), expected);
    }

    #[test]
    fn test_unsupported_transform() {
        // Day is not defined for string columns, so the catch-all error must be returned.
        let input: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "c"]));
        let outcome = transform_arrow(input, &Transform::Day);
        assert!(outcome.is_err());
        assert_eq!(
            outcome.unwrap_err().to_string(),
            "Compute error: Failed to perform transform for datatype"
        );
    }
}
523}