vortex_array/compute/
fill_null.rs1use std::sync::LazyLock;
5
6use arcref::ArcRef;
7use vortex_dtype::DType;
8use vortex_error::VortexError;
9use vortex_error::VortexResult;
10use vortex_error::vortex_bail;
11use vortex_error::vortex_err;
12use vortex_scalar::Scalar;
13
14use crate::Array;
15use crate::ArrayRef;
16use crate::IntoArray;
17use crate::arrays::ConstantArray;
18use crate::compute::ComputeFn;
19use crate::compute::ComputeFnVTable;
20use crate::compute::InvocationArgs;
21use crate::compute::Kernel;
22use crate::compute::Output;
23use crate::compute::cast;
24use crate::vtable::VTable;
25
26static FILL_NULL_FN: LazyLock<ComputeFn> = LazyLock::new(|| {
27 let compute = ComputeFn::new("fill_null".into(), ArcRef::new_ref(&FillNull));
28 for kernel in inventory::iter::<FillNullKernelRef> {
29 compute.register_kernel(kernel.0.clone());
30 }
31 compute
32});
33
34pub(crate) fn warm_up_vtable() -> usize {
35 FILL_NULL_FN.kernels().len()
36}
37
38pub fn fill_null(array: &dyn Array, fill_value: &Scalar) -> VortexResult<ArrayRef> {
53 FILL_NULL_FN
54 .invoke(&InvocationArgs {
55 inputs: &[array.into(), fill_value.into()],
56 options: &(),
57 })?
58 .unwrap_array()
59}
60
61pub trait FillNullKernel: VTable {
62 fn fill_null(&self, array: &Self::Array, fill_value: &Scalar) -> VortexResult<ArrayRef>;
63}
64
65pub struct FillNullKernelRef(ArcRef<dyn Kernel>);
66inventory::collect!(FillNullKernelRef);
67
68#[derive(Debug)]
69pub struct FillNullKernelAdapter<V: VTable>(pub V);
70
71impl<V: VTable + FillNullKernel> FillNullKernelAdapter<V> {
72 pub const fn lift(&'static self) -> FillNullKernelRef {
73 FillNullKernelRef(ArcRef::new_ref(self))
74 }
75}
76
77impl<V: VTable + FillNullKernel> Kernel for FillNullKernelAdapter<V> {
78 fn invoke(&self, args: &InvocationArgs) -> VortexResult<Option<Output>> {
79 let inputs = FillNullArgs::try_from(args)?;
80 let Some(array) = inputs.array.as_opt::<V>() else {
81 return Ok(None);
82 };
83 Ok(Some(
84 V::fill_null(&self.0, array, inputs.fill_value)?.into(),
85 ))
86 }
87}
88
89struct FillNull;
90
91impl ComputeFnVTable for FillNull {
92 fn invoke(
93 &self,
94 args: &InvocationArgs,
95 kernels: &[ArcRef<dyn Kernel>],
96 ) -> VortexResult<Output> {
97 let FillNullArgs { array, fill_value } = FillNullArgs::try_from(args)?;
98
99 if !array.dtype().is_nullable() || array.all_valid() {
100 return Ok(cast(array, fill_value.dtype())?.into());
101 }
102
103 if array.all_invalid() {
104 return Ok(ConstantArray::new(fill_value.clone(), array.len())
105 .into_array()
106 .into());
107 }
108
109 if fill_value.is_null() {
110 vortex_bail!("Cannot fill_null with a null value")
111 }
112
113 for kernel in kernels {
114 if let Some(output) = kernel.invoke(args)? {
115 return Ok(output);
116 }
117 }
118 if let Some(output) = array.invoke(&FILL_NULL_FN, args)? {
119 return Ok(output);
120 }
121
122 log::debug!("FillNullFn not implemented for {}", array.encoding_id());
123 if !array.is_canonical() {
124 let canonical_arr = array.to_canonical().into_array();
125 return Ok(fill_null(canonical_arr.as_ref(), fill_value)?.into());
126 }
127
128 vortex_bail!("fill null not implemented for DType {}", array.dtype())
130 }
131
132 fn return_dtype(&self, args: &InvocationArgs) -> VortexResult<DType> {
133 let FillNullArgs { array, fill_value } = FillNullArgs::try_from(args)?;
134 if !array.dtype().eq_ignore_nullability(fill_value.dtype()) {
135 vortex_bail!("FillNull value must match array type (ignoring nullability)");
136 }
137 Ok(fill_value.dtype().clone())
138 }
139
140 fn return_len(&self, args: &InvocationArgs) -> VortexResult<usize> {
141 let FillNullArgs { array, .. } = FillNullArgs::try_from(args)?;
142 Ok(array.len())
143 }
144
145 fn is_elementwise(&self) -> bool {
146 true
147 }
148}
149
150struct FillNullArgs<'a> {
151 array: &'a dyn Array,
152 fill_value: &'a Scalar,
153}
154
155impl<'a> TryFrom<&InvocationArgs<'a>> for FillNullArgs<'a> {
156 type Error = VortexError;
157
158 fn try_from(value: &InvocationArgs<'a>) -> Result<Self, Self::Error> {
159 if value.inputs.len() != 2 {
160 vortex_bail!("FillNull requires 2 arguments");
161 }
162
163 let array = value.inputs[0]
164 .array()
165 .ok_or_else(|| vortex_err!("FillNull requires an array"))?;
166 let fill_value = value.inputs[1]
167 .scalar()
168 .ok_or_else(|| vortex_err!("FillNull requires a scalar"))?;
169
170 Ok(FillNullArgs { array, fill_value })
171 }
172}