datafusion_comet_spark_expr/nondetermenistic_funcs/monotonically_increasing_id.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow::array::{Int64Array, RecordBatch};
use arrow::datatypes::{DataType, Schema};
use datafusion::common::Result;
use datafusion::logical_expr::ColumnarValue;
use datafusion::physical_expr::PhysicalExpr;
use std::any::Any;
use std::fmt::{Debug, Display, Formatter};
use std::hash::{Hash, Hasher};
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Arc;

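/// Physical expression implementing Spark's `monotonically_increasing_id()`.
///
/// Spark packs the partition ID into the upper bits of the generated 64-bit
/// value and keeps the lower 33 bits for a per-partition record counter, so
/// every partition hands out IDs from its own contiguous range. The counter
/// is an `AtomicI64` that advances by the batch size on each `evaluate` call,
/// while `initial_offset` records the starting point so that equality and
/// hashing stay stable across evaluations.
///
/// A minimal usage sketch (the batch construction is illustrative, not part
/// of this file):
///
/// ```ignore
/// let expr = MonotonicallyIncreasingId::from_partition_id(1);
/// // A 3-row batch yields [8589934592, 8589934593, 8589934594] (1 << 33, ...).
/// let ids = expr.evaluate(&batch)?;
/// ```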
#[derive(Debug)]
pub struct MonotonicallyIncreasingId {
    initial_offset: i64,
    current_offset: AtomicI64,
}

impl MonotonicallyIncreasingId {
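    /// Creates the expression with an explicit starting offset; IDs are handed
    /// out sequentially from this value.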
    pub fn from_offset(offset: i64) -> Self {
        Self {
            initial_offset: offset,
            current_offset: AtomicI64::new(offset),
        }
    }

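    /// Creates the expression for a Spark partition: the partition ID is shifted
    /// into the upper bits, leaving the lower 33 bits for the per-partition row
    /// counter (partition 1 therefore starts at `1 << 33`).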
    pub fn from_partition_id(partition: i32) -> Self {
        Self::from_offset((partition as i64) << 33)
    }
}

impl Display for MonotonicallyIncreasingId {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "monotonically_increasing_id()")
    }
}

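// Equality and hashing consider only the initial offset; the atomic runtime
// counter is not part of the expression's identity.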
impl PartialEq for MonotonicallyIncreasingId {
    fn eq(&self, other: &Self) -> bool {
        self.initial_offset == other.initial_offset
    }
}

impl Eq for MonotonicallyIncreasingId {}

impl Hash for MonotonicallyIncreasingId {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.initial_offset.hash(state);
    }
}

impl PhysicalExpr for MonotonicallyIncreasingId {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
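        // Atomically reserve a contiguous block of `num_rows` IDs for this batch.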
        let start = self
            .current_offset
            .fetch_add(batch.num_rows() as i64, Ordering::Relaxed);
        let end = start + batch.num_rows() as i64;
        let array_ref = Arc::new(Int64Array::from_iter_values(start..end));
        Ok(ColumnarValue::Array(array_ref))
    }

    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
        vec![]
    }

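    // Leaf expression: there are no children, so rewriting returns the same instance.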
    fn with_new_children(
        self: Arc<Self>,
        _: Vec<Arc<dyn PhysicalExpr>>,
    ) -> Result<Arc<dyn PhysicalExpr>> {
        Ok(self)
    }

    fn fmt_sql(&self, _: &mut Formatter<'_>) -> std::fmt::Result {
        unimplemented!()
    }

    fn data_type(&self, _input_schema: &Schema) -> Result<DataType> {
        Ok(DataType::Int64)
    }

    fn nullable(&self, _input_schema: &Schema) -> Result<bool> {
        Ok(false)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{Array, Int64Array};
    use arrow::compute::concat;
    use arrow::{array::StringArray, datatypes::*};
    use datafusion::common::cast::as_int64_array;

    #[test]
    fn test_monotonically_increasing_id_single_batch() -> Result<()> {
        let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]);
        let data = StringArray::from(vec![Some("foo"), None, None, Some("bar"), None]);
        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(data)])?;
        let mid_expr = MonotonicallyIncreasingId::from_offset(0);
        let result = mid_expr.evaluate(&batch)?.into_array(batch.num_rows())?;
        let result = as_int64_array(&result)?;
        let expected = &Int64Array::from_iter_values(0..batch.num_rows() as i64);
        assert_eq!(result, expected);
        Ok(())
    }

    #[test]
    fn test_monotonically_increasing_id_multi_batch() -> Result<()> {
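        // Both batches are evaluated against the same expression instance, so the
        // IDs of the second batch must continue where the first batch stopped.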
        let first_batch_schema = Schema::new(vec![Field::new("a", DataType::Int64, true)]);
        let first_batch_data = Int64Array::from(vec![Some(42), None]);
        let second_batch_schema = first_batch_schema.clone();
        let second_batch_data = Int64Array::from(vec![None, Some(-42), None]);
        let starting_offset: i64 = 100;
        let mid_expr = MonotonicallyIncreasingId::from_offset(starting_offset);
        let first_batch = RecordBatch::try_new(
            Arc::new(first_batch_schema),
            vec![Arc::new(first_batch_data)],
        )?;
        let first_batch_result = mid_expr
            .evaluate(&first_batch)?
            .into_array(first_batch.num_rows())?;
        let second_batch = RecordBatch::try_new(
            Arc::new(second_batch_schema),
            vec![Arc::new(second_batch_data)],
        )?;
        let second_batch_result = mid_expr
            .evaluate(&second_batch)?
            .into_array(second_batch.num_rows())?;
        let result_arrays: Vec<&dyn Array> = vec![
            as_int64_array(&first_batch_result)?,
            as_int64_array(&second_batch_result)?,
        ];
        let result_arrays = &concat(&result_arrays)?;
        let final_result = as_int64_array(result_arrays)?;
        let range_start = starting_offset;
        let range_end =
            starting_offset + first_batch.num_rows() as i64 + second_batch.num_rows() as i64;
        let expected = &Int64Array::from_iter_values(range_start..range_end);
        assert_eq!(final_result, expected);
        Ok(())
    }
}