datafusion_comet_spark_expr/nondetermenistic_funcs/
monotonically_increasing_id.rs1use arrow::array::{Int64Array, RecordBatch};
19use arrow::datatypes::{DataType, Schema};
20use datafusion::common::Result;
21use datafusion::logical_expr::ColumnarValue;
22use datafusion::physical_expr::PhysicalExpr;
23use std::any::Any;
24use std::fmt::{Debug, Display, Formatter};
25use std::hash::{Hash, Hasher};
26use std::sync::atomic::{AtomicI64, Ordering};
27use std::sync::Arc;
28
/// Spark-compatible `monotonically_increasing_id()` expression.
///
/// Holds a running `i64` counter: each evaluated batch claims the next
/// `num_rows` values of it. Only `initial_offset` takes part in equality and
/// hashing; the live counter does not.
#[derive(Debug)]
pub struct MonotonicallyIncreasingId {
    /// Offset the counter was created with.
    initial_offset: i64,
    /// Next id to hand out.
    current_offset: AtomicI64,
}

impl MonotonicallyIncreasingId {
    /// Creates an expression whose first generated id is `offset`.
    pub fn from_offset(offset: i64) -> Self {
        let current_offset = AtomicI64::new(offset);
        Self {
            initial_offset: offset,
            current_offset,
        }
    }

    /// Creates an expression for the given Spark partition.
    ///
    /// The first id is `partition << 33`. As in Spark, the partition id lands in the
    /// upper 31 bits and the lower 33 bits are left for the per-partition row number.
    pub fn from_partition_id(partition: i32) -> Self {
        const ROW_NUMBER_BITS: u32 = 33;
        Self::from_offset(i64::from(partition) << ROW_NUMBER_BITS)
    }
}
47
48impl Display for MonotonicallyIncreasingId {
49 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
50 write!(f, "monotonically_increasing_id()")
51 }
52}
53
54impl PartialEq for MonotonicallyIncreasingId {
55 fn eq(&self, other: &Self) -> bool {
56 self.initial_offset == other.initial_offset
57 }
58}
59
60impl Eq for MonotonicallyIncreasingId {}
61
62impl Hash for MonotonicallyIncreasingId {
63 fn hash<H: Hasher>(&self, state: &mut H) {
64 self.initial_offset.hash(state);
65 }
66}
67
68impl PhysicalExpr for MonotonicallyIncreasingId {
69 fn as_any(&self) -> &dyn Any {
70 self
71 }
72
73 fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
74 let start = self
75 .current_offset
76 .fetch_add(batch.num_rows() as i64, Ordering::Relaxed);
77 let end = start + batch.num_rows() as i64;
78 let array_ref = Arc::new(Int64Array::from_iter_values(start..end));
79 Ok(ColumnarValue::Array(array_ref))
80 }
81
82 fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
83 vec![]
84 }
85
86 fn with_new_children(
87 self: Arc<Self>,
88 _: Vec<Arc<dyn PhysicalExpr>>,
89 ) -> Result<Arc<dyn PhysicalExpr>> {
90 Ok(self)
91 }
92
93 fn fmt_sql(&self, _: &mut Formatter<'_>) -> std::fmt::Result {
94 unimplemented!()
95 }
96
97 fn data_type(&self, _input_schema: &Schema) -> Result<DataType> {
98 Ok(DataType::Int64)
99 }
100
101 fn nullable(&self, _input_schema: &Schema) -> Result<bool> {
102 Ok(false)
103 }
104}
105
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{Array, ArrayRef, Int64Array, StringArray};
    use arrow::compute::concat;
    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion::common::cast::as_int64_array;

    /// Evaluates `expr` against `batch` and materialises the ids as an array.
    fn ids_for(expr: &MonotonicallyIncreasingId, batch: &RecordBatch) -> Result<ArrayRef> {
        expr.evaluate(batch)?.into_array(batch.num_rows())
    }

    /// A single batch evaluated from offset 0 yields `0..num_rows`.
    #[test]
    fn test_monotonically_increasing_id_single_batch() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, true)]));
        let column = StringArray::from(vec![Some("foo"), None, None, Some("bar"), None]);
        let batch = RecordBatch::try_new(schema, vec![Arc::new(column)])?;

        let expr = MonotonicallyIncreasingId::from_offset(0);
        let ids = ids_for(&expr, &batch)?;

        let expected = Int64Array::from_iter_values(0..batch.num_rows() as i64);
        assert_eq!(as_int64_array(&ids)?, &expected);
        Ok(())
    }

    /// Consecutive batches continue the sequence where the previous batch stopped.
    #[test]
    fn test_monotonically_increasing_id_multi_batch() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)]));
        let batches = [
            RecordBatch::try_new(
                Arc::clone(&schema),
                vec![Arc::new(Int64Array::from(vec![Some(42), None]))],
            )?,
            RecordBatch::try_new(
                Arc::clone(&schema),
                vec![Arc::new(Int64Array::from(vec![None, Some(-42), None]))],
            )?,
        ];
        let starting_offset: i64 = 100;
        let expr = MonotonicallyIncreasingId::from_offset(starting_offset);

        // Evaluated strictly in order, so the second batch picks up after the first.
        let outputs = batches
            .iter()
            .map(|batch| ids_for(&expr, batch))
            .collect::<Result<Vec<_>>>()?;
        let pieces: Vec<&dyn Array> = outputs.iter().map(|array| array.as_ref()).collect();
        let combined = concat(&pieces)?;

        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        let expected =
            Int64Array::from_iter_values(starting_offset..starting_offset + total_rows as i64);
        assert_eq!(as_int64_array(&combined)?, &expected);
        Ok(())
    }
}