1use std::any::Any;
2use std::sync::LazyLock;
3
4use arcref::ArcRef;
5use vortex_dtype::DType;
6use vortex_error::{VortexError, VortexExpect, VortexResult, vortex_bail, vortex_err};
7
8use crate::arrow::{Datum, from_arrow_array_with_len};
9use crate::compute::{ComputeFn, ComputeFnVTable, InvocationArgs, Kernel, Options, Output};
10use crate::vtable::VTable;
11use crate::{Array, ArrayRef};
12
13pub fn like(
19    array: &dyn Array,
20    pattern: &dyn Array,
21    options: LikeOptions,
22) -> VortexResult<ArrayRef> {
23    LIKE_FN
24        .invoke(&InvocationArgs {
25            inputs: &[array.into(), pattern.into()],
26            options: &options,
27        })?
28        .unwrap_array()
29}
30
31pub struct LikeKernelRef(ArcRef<dyn Kernel>);
32inventory::collect!(LikeKernelRef);
33
34pub trait LikeKernel: VTable {
35    fn like(
36        &self,
37        array: &Self::Array,
38        pattern: &dyn Array,
39        options: LikeOptions,
40    ) -> VortexResult<Option<ArrayRef>>;
41}
42
43#[derive(Debug)]
44pub struct LikeKernelAdapter<V: VTable>(pub V);
45
46impl<V: VTable + LikeKernel> LikeKernelAdapter<V> {
47    pub const fn lift(&'static self) -> LikeKernelRef {
48        LikeKernelRef(ArcRef::new_ref(self))
49    }
50}
51
52impl<V: VTable + LikeKernel> Kernel for LikeKernelAdapter<V> {
53    fn invoke(&self, args: &InvocationArgs) -> VortexResult<Option<Output>> {
54        let inputs = LikeArgs::try_from(args)?;
55        let Some(array) = inputs.array.as_opt::<V>() else {
56            return Ok(None);
57        };
58        Ok(V::like(&self.0, array, inputs.pattern, inputs.options)?.map(|array| array.into()))
59    }
60}
61
62pub static LIKE_FN: LazyLock<ComputeFn> = LazyLock::new(|| {
63    let compute = ComputeFn::new("like".into(), ArcRef::new_ref(&Like));
64    for kernel in inventory::iter::<LikeKernelRef> {
65        compute.register_kernel(kernel.0.clone());
66    }
67    compute
68});
69
70struct Like;
71
72impl ComputeFnVTable for Like {
73    fn invoke(
74        &self,
75        args: &InvocationArgs,
76        kernels: &[ArcRef<dyn Kernel>],
77    ) -> VortexResult<Output> {
78        let LikeArgs {
79            array,
80            pattern,
81            options,
82        } = LikeArgs::try_from(args)?;
83
84        for kernel in kernels {
85            if let Some(output) = kernel.invoke(args)? {
86                return Ok(output);
87            }
88        }
89        if let Some(output) = array.invoke(&LIKE_FN, args)? {
90            return Ok(output);
91        }
92
93        Ok(arrow_like(array, pattern, options)?.into())
95    }
96
97    fn return_dtype(&self, args: &InvocationArgs) -> VortexResult<DType> {
98        let LikeArgs { array, pattern, .. } = LikeArgs::try_from(args)?;
99        if !matches!(array.dtype(), DType::Utf8(..)) {
100            vortex_bail!("Expected utf8 array, got {}", array.dtype());
101        }
102        if !matches!(pattern.dtype(), DType::Utf8(..)) {
103            vortex_bail!("Expected utf8 pattern, got {}", array.dtype());
104        }
105        let nullability = array.dtype().is_nullable() || pattern.dtype().is_nullable();
106        Ok(DType::Bool(nullability.into()))
107    }
108
109    fn return_len(&self, args: &InvocationArgs) -> VortexResult<usize> {
110        let LikeArgs { array, pattern, .. } = LikeArgs::try_from(args)?;
111        if array.len() != pattern.len() {
112            vortex_bail!(
113                "Length mismatch lhs len {} ({}) != rhs len {} ({})",
114                array.len(),
115                array.encoding_id(),
116                pattern.len(),
117                pattern.encoding_id()
118            );
119        }
120        Ok(array.len())
121    }
122
123    fn is_elementwise(&self) -> bool {
124        true
125    }
126}
127
128#[derive(Default, Debug, Clone, Copy)]
130pub struct LikeOptions {
131    pub negated: bool,
132    pub case_insensitive: bool,
133}
134
135impl Options for LikeOptions {
136    fn as_any(&self) -> &dyn Any {
137        self
138    }
139}
140
141struct LikeArgs<'a> {
142    array: &'a dyn Array,
143    pattern: &'a dyn Array,
144    options: LikeOptions,
145}
146
147impl<'a> TryFrom<&InvocationArgs<'a>> for LikeArgs<'a> {
148    type Error = VortexError;
149
150    fn try_from(value: &InvocationArgs<'a>) -> Result<Self, Self::Error> {
151        if value.inputs.len() != 2 {
152            vortex_bail!("Expected 2 inputs, found {}", value.inputs.len());
153        }
154        let array = value.inputs[0]
155            .array()
156            .ok_or_else(|| vortex_err!("Expected first input to be an array"))?;
157        let pattern = value.inputs[1]
158            .array()
159            .ok_or_else(|| vortex_err!("Expected second input to be an array"))?;
160        let options = *value
161            .options
162            .as_any()
163            .downcast_ref::<LikeOptions>()
164            .vortex_expect("Expected options to be LikeOptions");
165
166        Ok(LikeArgs {
167            array,
168            pattern,
169            options,
170        })
171    }
172}
173
174pub(crate) fn arrow_like(
176    array: &dyn Array,
177    pattern: &dyn Array,
178    options: LikeOptions,
179) -> VortexResult<ArrayRef> {
180    let nullable = array.dtype().is_nullable() | pattern.dtype().is_nullable();
181    let len = array.len();
182    assert_eq!(
183        array.len(),
184        pattern.len(),
185        "Arrow Like: length mismatch for {}",
186        array.encoding_id()
187    );
188    let lhs = Datum::try_new(array)?;
189    let rhs = Datum::try_new(pattern)?;
190
191    let result = match (options.negated, options.case_insensitive) {
192        (false, false) => arrow_string::like::like(&lhs, &rhs)?,
193        (true, false) => arrow_string::like::nlike(&lhs, &rhs)?,
194        (false, true) => arrow_string::like::ilike(&lhs, &rhs)?,
195        (true, true) => arrow_string::like::nilike(&lhs, &rhs)?,
196    };
197
198    from_arrow_array_with_len(&result, len, nullable)
199}