1use std::sync::Arc;
15
16use arrow::{
17 array::{as_primitive_array, downcast_array, Array, ArrayRef, PrimitiveArray, StringArray},
18 buffer::ScalarBuffer,
19 compute::{binary, cast, date_part, unary, DatePart},
20 datatypes::{
21 DataType, Date32Type, Int16Type, Int32Type, Int64Type, TimeUnit, TimestampMicrosecondType,
22 },
23 error::ArrowError,
24};
25
26use iceberg_rust_spec::{spec::partition::Transform, values::YEARS_BEFORE_UNIX_EPOCH};
27
/// Microseconds in one hour (60 min × 60 s × 1_000_000 µs).
static MICROS_IN_HOUR: i64 = 60 * 60 * 1_000_000;
/// Microseconds in one day (24 h × 60 min × 60 s × 1_000_000 µs).
static MICROS_IN_DAY: i64 = 24 * 60 * 60 * 1_000_000;
30
31pub fn transform_arrow(array: ArrayRef, transform: &Transform) -> Result<ArrayRef, ArrowError> {
56 match (array.data_type(), transform) {
57 (_, Transform::Identity) => Ok(array),
58 (DataType::Date32, Transform::Day) => cast(&array, &DataType::Int32),
59 (DataType::Date32, Transform::Month) => {
60 let year = date_part(as_primitive_array::<Date32Type>(&array), DatePart::Year)?;
61 let month = date_part(as_primitive_array::<Date32Type>(&array), DatePart::Month)?;
62 Ok(Arc::new(binary::<_, _, _, Int32Type>(
63 as_primitive_array::<Int32Type>(&year),
64 as_primitive_array::<Int32Type>(&month),
65 datepart_to_months,
66 )?))
67 }
68 (DataType::Date32, Transform::Year) => Ok(Arc::new(unary::<_, _, Int32Type>(
69 as_primitive_array::<Int32Type>(&date_part(
70 as_primitive_array::<Date32Type>(&array),
71 DatePart::Year,
72 )?),
73 datepart_to_years,
74 ))),
75 (DataType::Timestamp(TimeUnit::Microsecond, None), Transform::Hour) => {
76 Ok(Arc::new(unary::<_, _, Int32Type>(
77 as_primitive_array::<Int64Type>(&cast(&array, &DataType::Int64)?),
78 micros_to_hours,
79 )) as Arc<dyn Array>)
80 }
81 (DataType::Timestamp(TimeUnit::Microsecond, None), Transform::Day) => {
82 Ok(Arc::new(unary::<_, _, Int32Type>(
83 as_primitive_array::<Int64Type>(&cast(&array, &DataType::Int64)?),
84 micros_to_days,
85 )) as Arc<dyn Array>)
86 }
87 (DataType::Timestamp(TimeUnit::Microsecond, None), Transform::Month) => {
88 let year = date_part(
89 as_primitive_array::<TimestampMicrosecondType>(&array),
90 DatePart::Year,
91 )?;
92 let month = date_part(
93 as_primitive_array::<TimestampMicrosecondType>(&array),
94 DatePart::Month,
95 )?;
96 Ok(Arc::new(binary::<_, _, _, Int32Type>(
97 as_primitive_array::<Int32Type>(&year),
98 as_primitive_array::<Int32Type>(&month),
99 datepart_to_months,
100 )?))
101 }
102 (DataType::Timestamp(TimeUnit::Microsecond, None), Transform::Year) => {
103 Ok(Arc::new(unary::<_, _, Int32Type>(
104 as_primitive_array::<Int32Type>(&date_part(
105 as_primitive_array::<TimestampMicrosecondType>(&array),
106 DatePart::Year,
107 )?),
108 datepart_to_years,
109 )))
110 }
111 (DataType::Int16, Transform::Truncate(m)) => Ok(Arc::<PrimitiveArray<Int16Type>>::new(
112 unary(as_primitive_array::<Int16Type>(&array), |i| {
113 i - i.rem_euclid(*m as i16)
114 }),
115 )),
116 (DataType::Int32, Transform::Truncate(m)) => Ok(Arc::<PrimitiveArray<Int32Type>>::new(
117 unary(as_primitive_array::<Int32Type>(&array), |i| {
118 i - i.rem_euclid(*m as i32)
119 }),
120 )),
121 (DataType::Int64, Transform::Truncate(m)) => Ok(Arc::<PrimitiveArray<Int64Type>>::new(
122 unary(as_primitive_array::<Int64Type>(&array), |i| {
123 i - i.rem_euclid(*m as i64)
124 }),
125 )),
126 (DataType::Int32, Transform::Bucket(m)) => Ok(Arc::<PrimitiveArray<Int32Type>>::new(
127 unary(as_primitive_array::<Int32Type>(&array), |i| {
128 let mut buffer = std::io::Cursor::new((i as i64).to_le_bytes());
129 (murmur3::murmur3_32(&mut buffer, 0).expect("murmur3 hash failled for some reason")
130 as i32)
131 .rem_euclid(*m as i32)
132 }),
133 )),
134 (DataType::Int64, Transform::Bucket(m)) => Ok(Arc::<PrimitiveArray<Int32Type>>::new(
135 unary(as_primitive_array::<Int64Type>(&array), |i| {
136 let mut buffer = std::io::Cursor::new((i).to_le_bytes());
137 (murmur3::murmur3_32(&mut buffer, 0).expect("murmur3 hash failled for some reason")
138 as i32)
139 .rem_euclid(*m as i32)
140 }),
141 )),
142 (DataType::Date32, Transform::Bucket(m)) => {
143 let temp = cast(&array, &DataType::Int32)?;
144
145 Ok(Arc::<PrimitiveArray<Int32Type>>::new(unary(
146 as_primitive_array::<Int32Type>(&temp),
147 |i| {
148 let mut buffer = std::io::Cursor::new((i as i64).to_le_bytes());
149 (murmur3::murmur3_32(&mut buffer, 0)
150 .expect("murmur3 hash failled for some reason") as i32)
151 .rem_euclid(*m as i32)
152 },
153 )))
154 }
155 (DataType::Time32(TimeUnit::Millisecond), Transform::Bucket(m)) => {
156 let temp = cast(&array, &DataType::Int32)?;
157
158 Ok(Arc::<PrimitiveArray<Int32Type>>::new(unary(
159 as_primitive_array::<Int32Type>(&temp),
160 |i: i32| {
161 let mut buffer = std::io::Cursor::new((i as i64).to_le_bytes());
162 (murmur3::murmur3_32(&mut buffer, 0)
163 .expect("murmur3 hash failled for some reason") as i32)
164 .rem_euclid(*m as i32)
165 },
166 )))
167 }
168 (DataType::Utf8, Transform::Bucket(m)) => {
169 let nulls = array.nulls();
170 let local_array: StringArray = downcast_array::<StringArray>(&array);
171
172 Ok(Arc::new(PrimitiveArray::<Int32Type>::new(
173 ScalarBuffer::from_iter(local_array.iter().map(|a| {
174 if let Some(value) = a {
175 murmur3::murmur3_32(&mut value.as_bytes(), 0)
176 .expect("murmur3 hash failled for some reason")
177 as i32
178 } else {
179 0
180 }
181 .rem_euclid(*m as i32)
182 })),
183 nulls.cloned(),
184 )))
185 }
186 _ => Err(ArrowError::ComputeError(
187 "Failed to perform transform for datatype".to_string(),
188 )),
189 }
190}
191
192#[inline]
193fn micros_to_days(a: i64) -> i32 {
194 (a / MICROS_IN_DAY) as i32
195}
196
197#[inline]
198fn micros_to_hours(a: i64) -> i32 {
199 (a / MICROS_IN_HOUR) as i32
200}
201
202#[inline]
203fn datepart_to_years(year: i32) -> i32 {
204 year - YEARS_BEFORE_UNIX_EPOCH
205}
206
207#[inline]
208fn datepart_to_months(year: i32, month: i32) -> i32 {
209 12 * (year - YEARS_BEFORE_UNIX_EPOCH) + month
210}
211
212#[cfg(test)]
213mod tests {
214
215 use super::*;
216 use arrow::array::{ArrayRef, Date32Array, TimestampMicrosecondArray};
217
218 fn create_date32_array() -> ArrayRef {
219 Arc::new(Date32Array::from(vec![
220 Some(19478), Some(19523), Some(19723), None,
224 ])) as ArrayRef
225 }
226
227 fn create_timestamp_micro_array() -> ArrayRef {
228 Arc::new(TimestampMicrosecondArray::from(vec![
229 Some(1682937000000000),
230 Some(1686840330000000),
231 Some(1704067200000000),
232 None,
233 ])) as ArrayRef
234 }
235
236 #[test]
237 fn test_identity_transform() {
238 let array = create_date32_array();
239 let result = transform_arrow(array.clone(), &Transform::Identity).unwrap();
240 assert_eq!(&array, &result);
241 }
242
243 #[test]
244 fn test_date32_day_transform() {
245 let array = create_date32_array();
246 let result = transform_arrow(array, &Transform::Day).unwrap();
247 let expected = Arc::new(arrow::array::Int32Array::from(vec![
248 Some(19478),
249 Some(19523),
250 Some(19723),
251 None,
252 ])) as ArrayRef;
253 assert_eq!(&expected, &result);
254 }
255
256 #[test]
257 fn test_date32_month_transform() {
258 let array = create_date32_array();
259 let result = transform_arrow(array, &Transform::Month).unwrap();
260 let expected = Arc::new(arrow::array::Int32Array::from(vec![
261 Some(641),
262 Some(642),
263 Some(649),
264 None,
265 ])) as ArrayRef;
266 assert_eq!(&expected, &result);
267 }
268
269 #[test]
270 fn test_date32_year_transform() {
271 let array = create_date32_array();
272 let result = transform_arrow(array, &Transform::Year).unwrap();
273 let expected = Arc::new(arrow::array::Int32Array::from(vec![
274 Some(53),
275 Some(53),
276 Some(54),
277 None,
278 ])) as ArrayRef;
279 assert_eq!(&expected, &result);
280 }
281
282 #[test]
283 fn test_timestamp_micro_hour_transform() {
284 let array = create_timestamp_micro_array();
285 let result = transform_arrow(array, &Transform::Hour).unwrap();
286 let expected = Arc::new(arrow::array::Int32Array::from(vec![
287 Some(467482),
288 Some(468566),
289 Some(473352),
290 None,
291 ])) as ArrayRef;
292 assert_eq!(&expected, &result);
293 }
294
295 #[test]
296 fn test_timestamp_micro_day_transform() {
297 let array = create_timestamp_micro_array();
298 let result = transform_arrow(array, &Transform::Day).unwrap();
299 let expected = Arc::new(arrow::array::Int32Array::from(vec![
300 Some(19478),
301 Some(19523),
302 Some(19723),
303 None,
304 ])) as ArrayRef;
305 assert_eq!(&expected, &result);
306 }
307
308 #[test]
309 fn test_timestamp_micro_month_transform() {
310 let array = create_timestamp_micro_array();
311 let result = transform_arrow(array, &Transform::Month).unwrap();
312 let expected = Arc::new(arrow::array::Int32Array::from(vec![
313 Some(641),
314 Some(642),
315 Some(649),
316 None,
317 ])) as ArrayRef;
318 assert_eq!(&expected, &result);
319 }
320
321 #[test]
322 fn test_timestamp_micro_year_transform() {
323 let array = create_timestamp_micro_array();
324 let result = transform_arrow(array, &Transform::Year).unwrap();
325 let expected = Arc::new(arrow::array::Int32Array::from(vec![
326 Some(53),
327 Some(53),
328 Some(54),
329 None,
330 ])) as ArrayRef;
331 assert_eq!(&expected, &result);
332 }
333
334 #[test]
335 fn test_int16_truncate_transform() {
336 let array = Arc::new(arrow::array::Int16Array::from(vec![
337 Some(17),
338 Some(23),
339 Some(-15),
340 Some(5),
341 None,
342 ])) as ArrayRef;
343 let result = transform_arrow(array, &Transform::Truncate(10)).unwrap();
344 let expected = Arc::new(arrow::array::Int16Array::from(vec![
345 Some(10), Some(20), Some(-20), Some(0), None,
350 ])) as ArrayRef;
351 assert_eq!(&expected, &result);
352 }
353
354 #[test]
355 fn test_int32_truncate_transform() {
356 let array = Arc::new(arrow::array::Int32Array::from(vec![
357 Some(127),
358 Some(234),
359 Some(-156),
360 Some(50),
361 None,
362 ])) as ArrayRef;
363 let result = transform_arrow(array, &Transform::Truncate(100)).unwrap();
364 let expected = Arc::new(arrow::array::Int32Array::from(vec![
365 Some(100), Some(200), Some(-200), Some(0), None,
370 ])) as ArrayRef;
371 assert_eq!(&expected, &result);
372 }
373
374 #[test]
375 fn test_int64_truncate_transform() {
376 let array = Arc::new(arrow::array::Int64Array::from(vec![
377 Some(1275),
378 Some(2348),
379 Some(-1567),
380 Some(500),
381 None,
382 ])) as ArrayRef;
383 let result = transform_arrow(array, &Transform::Truncate(1000)).unwrap();
384 let expected = Arc::new(arrow::array::Int64Array::from(vec![
385 Some(1000), Some(2000), Some(-2000), Some(0), None,
390 ])) as ArrayRef;
391 assert_eq!(&expected, &result);
392 }
393
394 #[test]
395 fn test_bucket_hash_value() {
396 let mut buffer = std::io::Cursor::new((34i32 as i64).to_le_bytes());
400 assert_eq!(murmur3::murmur3_32(&mut buffer, 0).unwrap(), 2017239379);
401
402 let mut buffer = std::io::Cursor::new((34i64).to_le_bytes());
404 assert_eq!(murmur3::murmur3_32(&mut buffer, 0).unwrap(), 2017239379);
405
406 let mut buffer = std::io::Cursor::new((17_486i32 as i64).to_le_bytes());
408 assert_eq!(
409 murmur3::murmur3_32(&mut buffer, 0).unwrap() as i32,
410 -653330422
411 );
412
413 let mut buffer = std::io::Cursor::new((81_068_000_000i64).to_le_bytes());
415 assert_eq!(
416 murmur3::murmur3_32(&mut buffer, 0).unwrap() as i32,
417 -662762989
418 );
419
420 assert_eq!(
422 murmur3::murmur3_32(&mut "iceberg".as_bytes(), 0).unwrap() as i32,
423 1210000089
424 );
425 }
426
427 #[test]
428 fn test_int32_bucket_transform() {
429 let array = Arc::new(arrow::array::Int32Array::from(vec![
430 Some(34), Some(17_486), Some(84668000), Some(-2000),
434 Some(0),
435 None,
436 ])) as ArrayRef;
437 let result = transform_arrow(array, &Transform::Bucket(1000)).unwrap();
438 let expected = Arc::new(arrow::array::Int32Array::from(vec![
439 Some(2017239379i32.rem_euclid(1000)),
440 Some(578), Some(988822981i32.rem_euclid(1000)),
442 Some(964620854i32.rem_euclid(1000)),
443 Some(1669671676i32.rem_euclid(1000)),
444 None,
445 ])) as ArrayRef;
446 assert_eq!(&expected, &result);
447 }
448
449 #[test]
450 fn test_int64_bucket_transform() {
451 let array = Arc::new(arrow::array::Int64Array::from(vec![
452 Some(34), Some(17_486), Some(2000),
455 Some(-2000),
456 Some(0),
457 None,
458 ])) as ArrayRef;
459 let result = transform_arrow(array, &Transform::Bucket(1000)).unwrap();
460 let expected = Arc::new(arrow::array::Int32Array::from(vec![
461 Some(2017239379i32.rem_euclid(1000)),
462 Some(578), Some(117), Some(964_620_854i32.rem_euclid(1000)),
465 Some(1669671676i32.rem_euclid(1000)),
466 None,
467 ])) as ArrayRef;
468 assert_eq!(&expected, &result);
469 }
470
471 #[test]
472 fn test_date32_bucket_transform() {
473 let array = Arc::new(arrow::array::Date32Array::from(vec![
474 Some(17_486), None,
476 ])) as ArrayRef;
477 let result = transform_arrow(array, &Transform::Bucket(1000)).unwrap();
478
479 let expected = Arc::new(arrow::array::Int32Array::from(vec![
480 Some(578), None,
482 ])) as ArrayRef;
483
484 assert_eq!(&expected, &result);
485 }
486
487 #[test]
488 fn test_time32_bucket_transform() {
489 let array = Arc::new(arrow::array::Time32MillisecondArray::from(vec![
490 Some(81_068_000), None,
492 ])) as ArrayRef;
493 let result = transform_arrow(array, &Transform::Bucket(1000)).unwrap();
494 let expected = Arc::new(arrow::array::Int32Array::from(vec![
495 Some(693), None,
497 ])) as ArrayRef;
498 assert_eq!(&expected, &result);
499 }
500
501 #[test]
502 fn test_utf8_bucket_transform() {
503 let array =
504 Arc::new(arrow::array::StringArray::from(vec![Some("iceberg"), None])) as ArrayRef;
505 let result = transform_arrow(array, &Transform::Bucket(1000)).unwrap();
506 let expected = Arc::new(arrow::array::Int32Array::from(vec![
507 Some(1_210_000_089i32.rem_euclid(1000)),
508 None,
509 ])) as ArrayRef;
510 assert_eq!(&expected, &result);
511 }
512
513 #[test]
514 fn test_unsupported_transform() {
515 let array = Arc::new(arrow::array::StringArray::from(vec!["a", "b", "c"])) as ArrayRef;
516 let result = transform_arrow(array, &Transform::Day);
517 assert!(result.is_err());
518 assert_eq!(
519 result.unwrap_err().to_string(),
520 "Compute error: Failed to perform transform for datatype"
521 );
522 }
523}