1use std::any::Any;
2use std::sync::LazyLock;
3
4use arcref::ArcRef;
5use vortex_dtype::{DType, Nullability};
6use vortex_error::{VortexError, VortexExpect, VortexResult, vortex_bail, vortex_err};
7use vortex_scalar::Scalar;
8
9use crate::Array;
10use crate::arrays::{ConstantVTable, NullVTable};
11use crate::compute::{ComputeFn, ComputeFnVTable, InvocationArgs, Kernel, Options, Output};
12use crate::stats::{Precision, Stat};
13use crate::vtable::VTable;
14
15pub fn is_sorted(array: &dyn Array) -> VortexResult<bool> {
16    is_sorted_opts(array, false)
17}
18
19pub fn is_strict_sorted(array: &dyn Array) -> VortexResult<bool> {
20    is_sorted_opts(array, true)
21}
22
23pub fn is_sorted_opts(array: &dyn Array, strict: bool) -> VortexResult<bool> {
24    Ok(IS_SORTED_FN
25        .invoke(&InvocationArgs {
26            inputs: &[array.into()],
27            options: &IsSortedOptions { strict },
28        })?
29        .unwrap_scalar()?
30        .as_bool()
31        .value()
32        .vortex_expect("non-nullable"))
33}
34
35struct IsSorted;
36impl ComputeFnVTable for IsSorted {
37    fn invoke(
38        &self,
39        args: &InvocationArgs,
40        kernels: &[ArcRef<dyn Kernel>],
41    ) -> VortexResult<Output> {
42        let IsSortedArgs { array, strict } = IsSortedArgs::try_from(args)?;
43
44        if array.dtype().is_struct() {
46            return Ok(Scalar::from(false).into());
47        }
48
49        let is_sorted = if strict {
50            if let Some(Precision::Exact(value)) =
51                array.statistics().get_as::<bool>(Stat::IsStrictSorted)
52            {
53                return Ok(Scalar::from(value).into());
54            }
55
56            let is_strict_sorted = is_sorted_impl(array, kernels, true)?;
57            let array_stats = array.statistics();
58
59            if is_strict_sorted {
60                array_stats.set(Stat::IsSorted, Precision::Exact(true.into()));
61                array_stats.set(Stat::IsStrictSorted, Precision::Exact(true.into()));
62            } else {
63                array_stats.set(Stat::IsStrictSorted, Precision::Exact(false.into()));
64            }
65
66            is_strict_sorted
67        } else {
68            if let Some(Precision::Exact(value)) = array.statistics().get_as::<bool>(Stat::IsSorted)
69            {
70                return Ok(Scalar::from(value).into());
71            }
72
73            let is_sorted = is_sorted_impl(array, kernels, false)?;
74            let array_stats = array.statistics();
75
76            if is_sorted {
77                array_stats.set(Stat::IsSorted, Precision::Exact(true.into()));
78            } else {
79                array_stats.set(Stat::IsSorted, Precision::Exact(false.into()));
80                array_stats.set(Stat::IsStrictSorted, Precision::Exact(false.into()));
81            }
82
83            is_sorted
84        };
85
86        Ok(Scalar::from(is_sorted).into())
87    }
88
89    fn return_dtype(&self, _args: &InvocationArgs) -> VortexResult<DType> {
90        Ok(DType::Bool(Nullability::NonNullable))
91    }
92
93    fn return_len(&self, _args: &InvocationArgs) -> VortexResult<usize> {
94        Ok(1)
95    }
96
97    fn is_elementwise(&self) -> bool {
98        true
99    }
100}
101
102struct IsSortedArgs<'a> {
103    array: &'a dyn Array,
104    strict: bool,
105}
106
107impl<'a> TryFrom<&InvocationArgs<'a>> for IsSortedArgs<'a> {
108    type Error = VortexError;
109
110    fn try_from(value: &InvocationArgs<'a>) -> Result<Self, Self::Error> {
111        if value.inputs.len() != 1 {
112            vortex_bail!(
113                "IsSorted function requires exactly one argument, got {}",
114                value.inputs.len()
115            );
116        }
117        let array = value.inputs[0]
118            .array()
119            .ok_or_else(|| vortex_err!("Invalid argument type for is sorted function"))?;
120        let options = *value
121            .options
122            .as_any()
123            .downcast_ref::<IsSortedOptions>()
124            .ok_or_else(|| vortex_err!("Invalid options type for is sorted function"))?;
125
126        Ok(IsSortedArgs {
127            array,
128            strict: options.strict,
129        })
130    }
131}
132
133#[derive(Clone, Copy)]
134struct IsSortedOptions {
135    strict: bool,
136}
137
138impl Options for IsSortedOptions {
139    fn as_any(&self) -> &dyn Any {
140        self
141    }
142}
143
144pub static IS_SORTED_FN: LazyLock<ComputeFn> = LazyLock::new(|| {
145    let compute = ComputeFn::new("is_sorted".into(), ArcRef::new_ref(&IsSorted));
146    for kernel in inventory::iter::<IsSortedKernelRef> {
147        compute.register_kernel(kernel.0.clone());
148    }
149    compute
150});
151
152pub struct IsSortedKernelRef(ArcRef<dyn Kernel>);
153inventory::collect!(IsSortedKernelRef);
154
155#[derive(Debug)]
156pub struct IsSortedKernelAdapter<V: VTable>(pub V);
157
158impl<V: VTable + IsSortedKernel> IsSortedKernelAdapter<V> {
159    pub const fn lift(&'static self) -> IsSortedKernelRef {
160        IsSortedKernelRef(ArcRef::new_ref(self))
161    }
162}
163
164impl<V: VTable + IsSortedKernel> Kernel for IsSortedKernelAdapter<V> {
165    fn invoke(&self, args: &InvocationArgs) -> VortexResult<Option<Output>> {
166        let IsSortedArgs { array, strict } = IsSortedArgs::try_from(args)?;
167        let Some(array) = array.as_opt::<V>() else {
168            return Ok(None);
169        };
170
171        let is_sorted = if strict {
172            V::is_strict_sorted(&self.0, array)?
173        } else {
174            V::is_sorted(&self.0, array)?
175        };
176
177        Ok(Some(Scalar::from(is_sorted).into()))
178    }
179}
180
181pub trait IsSortedKernel: VTable {
182    fn is_sorted(&self, array: &Self::Array) -> VortexResult<bool>;
188
189    fn is_strict_sorted(&self, array: &Self::Array) -> VortexResult<bool>;
190}
191
/// Iterator extension that adds a strict sortedness check.
// The method consumes the iterator, like the standard `Iterator::is_sorted*` adaptors, so
// clippy's `is_*` self-convention lint is deliberately silenced here.
#[allow(clippy::wrong_self_convention)]
pub trait IsSortedIteratorExt: Iterator
where
    Self::Item: PartialOrd,
{
    /// Returns `true` when every element compares strictly less than its successor.
    ///
    /// Empty and single-element iterators are strictly sorted. Any pair for which `<` is
    /// `false` (equal or incomparable elements included) makes the result `false`.
    fn is_strict_sorted(self) -> bool
    where
        Self: Sized,
    {
        self.is_sorted_by(<Self::Item as PartialOrd>::lt)
    }
}

/// Blanket implementation: every iterator over `PartialOrd` items gets the extension.
impl<I> IsSortedIteratorExt for I
where
    I: Iterator + ?Sized,
    I::Item: PartialOrd,
{
}
213
214fn is_sorted_impl(
215    array: &dyn Array,
216    kernels: &[ArcRef<dyn Kernel>],
217    strict: bool,
218) -> VortexResult<bool> {
219    if array.len() <= 1 {
221        return Ok(true);
222    }
223
224    if array.is::<ConstantVTable>() || array.is::<NullVTable>() {
226        return Ok(!strict);
227    }
228
229    let invalid_count = array.invalid_count()?;
230
231    if strict {
233        match invalid_count {
234            0 => {}
236            1 => {
238                if !array.is_invalid(0)? {
239                    return Ok(false);
240                }
241            }
242            _ => return Ok(false),
243        }
244    }
245
246    let args = InvocationArgs {
247        inputs: &[array.into()],
248        options: &IsSortedOptions { strict },
249    };
250
251    for kernel in kernels {
252        if let Some(output) = kernel.invoke(&args)? {
253            return Ok(output
254                .unwrap_scalar()?
255                .as_bool()
256                .value()
257                .vortex_expect("non-nullable"));
258        }
259    }
260    if let Some(output) = array.invoke(&IS_SORTED_FN, &args)? {
261        return Ok(output
262            .unwrap_scalar()?
263            .as_bool()
264            .value()
265            .vortex_expect("non-nullable"));
266    }
267
268    if !array.is_canonical() {
269        log::debug!(
270            "No is_sorted implementation found for {}",
271            array.encoding_id()
272        );
273
274        let array = array.to_canonical()?;
276
277        return if strict {
278            is_strict_sorted(array.as_ref())
279        } else {
280            is_sorted(array.as_ref())
281        };
282    }
283
284    vortex_bail!(
285        "No is_sorted function for canonical array: {}",
286        array.encoding_id(),
287    )
288}
289
#[cfg(test)]
mod tests {
    use vortex_buffer::buffer;

    use crate::IntoArray;
    use crate::arrays::{BoolArray, PrimitiveArray};
    use crate::compute::{is_sorted, is_strict_sorted};
    use crate::validity::Validity;

    /// Builds a validity mask from four per-element flags (`true` means valid).
    fn mask(bits: [bool; 4]) -> Validity {
        Validity::Array(BoolArray::from_iter(bits).into_array())
    }

    /// Values `0, 1, 2, 3` with the given validity.
    fn ascending(validity: Validity) -> PrimitiveArray {
        PrimitiveArray::new(buffer!(0, 1, 2, 3), validity)
    }

    /// Values `0, 1, 3, 2` with the given validity.
    fn out_of_order(validity: Validity) -> PrimitiveArray {
        PrimitiveArray::new(buffer!(0, 1, 3, 2), validity)
    }

    #[test]
    fn test_is_sorted() {
        // Ascending values: sorted when fully valid or when only the first element is null,
        // but not when the null sits in the middle.
        assert!(is_sorted(ascending(Validity::AllValid).as_ref()).unwrap());
        assert!(is_sorted(ascending(mask([false, true, true, true])).as_ref()).unwrap());
        assert!(!is_sorted(ascending(mask([true, false, true, true])).as_ref()).unwrap());

        // Out-of-order values are never sorted.
        assert!(!is_sorted(out_of_order(Validity::AllValid).as_ref()).unwrap());
        assert!(!is_sorted(out_of_order(mask([false, true, true, true])).as_ref()).unwrap());
    }

    #[test]
    fn test_is_strict_sorted() {
        // Ascending, duplicate-free values: strictly sorted when fully valid or when only the
        // first element is null, but not when the null sits in the middle.
        assert!(is_strict_sorted(ascending(Validity::AllValid).as_ref()).unwrap());
        assert!(is_strict_sorted(ascending(mask([false, true, true, true])).as_ref()).unwrap());
        assert!(!is_strict_sorted(ascending(mask([true, false, true, true])).as_ref()).unwrap());

        // Out-of-order values are never strictly sorted.
        assert!(
            !is_strict_sorted(out_of_order(mask([false, true, true, true])).as_ref()).unwrap()
        );
    }
}