Skip to main content

vortex_array/stats/
array.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

//! Stats as they are stored on arrays.
5
6use std::sync::Arc;
7
8use parking_lot::RwLock;
9use vortex_error::VortexError;
10use vortex_error::VortexResult;
11use vortex_error::vortex_panic;
12
13use super::MutTypedStatsSetRef;
14use super::StatsSet;
15use super::StatsSetIntoIter;
16use super::TypedStatsSetRef;
17use crate::DynArray;
18use crate::LEGACY_SESSION;
19use crate::VortexSessionExecute;
20use crate::aggregate_fn::fns::is_constant::is_constant;
21use crate::aggregate_fn::fns::is_sorted::is_sorted;
22use crate::aggregate_fn::fns::is_sorted::is_strict_sorted;
23use crate::aggregate_fn::fns::min_max::MinMaxResult;
24use crate::aggregate_fn::fns::min_max::min_max;
25use crate::aggregate_fn::fns::nan_count::nan_count;
26use crate::aggregate_fn::fns::sum::sum;
27use crate::builders::builder_with_capacity;
28use crate::expr::stats::Precision;
29use crate::expr::stats::Stat;
30use crate::expr::stats::StatsProvider;
31use crate::scalar::Scalar;
32use crate::scalar::ScalarValue;
33
34/// A shared [`StatsSet`] stored in an array. Can be shared by copies of the array and can also be mutated in place.
35// TODO(adamg): This is a very bad name.
36#[derive(Clone, Default, Debug)]
37pub struct ArrayStats {
38    inner: Arc<RwLock<StatsSet>>,
39}
40
41/// Reference to an array's [`StatsSet`]. Can be used to get and mutate the underlying stats.
42///
43/// Constructed by calling [`ArrayStats::to_ref`].
44pub struct StatsSetRef<'a> {
45    // We need to reference back to the array
46    dyn_array_ref: &'a dyn DynArray,
47    array_stats: &'a ArrayStats,
48}
49
50impl ArrayStats {
51    pub fn to_ref<'a>(&'a self, array: &'a dyn DynArray) -> StatsSetRef<'a> {
52        StatsSetRef {
53            dyn_array_ref: array,
54            array_stats: self,
55        }
56    }
57
58    pub fn set(&self, stat: Stat, value: Precision<ScalarValue>) {
59        self.inner.write().set(stat, value);
60    }
61
62    pub fn clear(&self, stat: Stat) {
63        self.inner.write().clear(stat);
64    }
65
66    pub fn retain(&self, stats: &[Stat]) {
67        self.inner.write().retain_only(stats);
68    }
69}
70
71impl From<StatsSet> for ArrayStats {
72    fn from(value: StatsSet) -> Self {
73        Self {
74            inner: Arc::new(RwLock::new(value)),
75        }
76    }
77}
78
79impl From<ArrayStats> for StatsSet {
80    fn from(value: ArrayStats) -> Self {
81        value.inner.read().clone()
82    }
83}
84
85impl StatsSetRef<'_> {
86    pub fn set_iter(&self, iter: StatsSetIntoIter) {
87        let mut guard = self.array_stats.inner.write();
88        for (stat, value) in iter {
89            guard.set(stat, value);
90        }
91    }
92
93    pub fn inherit_from(&self, stats: StatsSetRef<'_>) {
94        // Only inherit if the underlying stats are different
95        if !Arc::ptr_eq(&self.array_stats.inner, &stats.array_stats.inner) {
96            stats.with_iter(|iter| self.inherit(iter));
97        }
98    }
99
100    pub fn inherit<'a>(&self, iter: impl Iterator<Item = &'a (Stat, Precision<ScalarValue>)>) {
101        let mut guard = self.array_stats.inner.write();
102        for (stat, value) in iter {
103            if !value.is_exact() {
104                if !guard.get(*stat).is_some_and(|v| v.is_exact()) {
105                    guard.set(*stat, value.clone());
106                }
107            } else {
108                guard.set(*stat, value.clone());
109            }
110        }
111    }
112
113    pub fn with_typed_stats_set<U, F: FnOnce(TypedStatsSetRef) -> U>(&self, apply: F) -> U {
114        apply(
115            self.array_stats
116                .inner
117                .read()
118                .as_typed_ref(self.dyn_array_ref.dtype()),
119        )
120    }
121
122    pub fn with_mut_typed_stats_set<U, F: FnOnce(MutTypedStatsSetRef) -> U>(&self, apply: F) -> U {
123        apply(
124            self.array_stats
125                .inner
126                .write()
127                .as_mut_typed_ref(self.dyn_array_ref.dtype()),
128        )
129    }
130
131    pub fn to_owned(&self) -> StatsSet {
132        self.array_stats.inner.read().clone()
133    }
134
135    pub fn with_iter<
136        F: for<'a> FnOnce(&mut dyn Iterator<Item = &'a (Stat, Precision<ScalarValue>)>) -> R,
137        R,
138    >(
139        &self,
140        f: F,
141    ) -> R {
142        let lock = self.array_stats.inner.read();
143        f(&mut lock.iter())
144    }
145
146    pub fn compute_stat(&self, stat: Stat) -> VortexResult<Option<Scalar>> {
147        let mut ctx = LEGACY_SESSION.create_execution_ctx();
148
149        // If it's already computed and exact, we can return it.
150        if let Some(Precision::Exact(s)) = self.get(stat) {
151            return Ok(Some(s));
152        }
153
154        let array_ref = self.dyn_array_ref.to_array();
155        Ok(match stat {
156            Stat::Min => min_max(&array_ref, &mut ctx)?.map(|MinMaxResult { min, max: _ }| min),
157            Stat::Max => min_max(&array_ref, &mut ctx)?.map(|MinMaxResult { min: _, max }| max),
158            Stat::Sum => {
159                Stat::Sum
160                    .dtype(self.dyn_array_ref.dtype())
161                    .is_some()
162                    .then(|| {
163                        // Sum is supported for this dtype.
164                        sum(&array_ref, &mut ctx)
165                    })
166                    .transpose()?
167            }
168            Stat::NullCount => self.dyn_array_ref.invalid_count().ok().map(Into::into),
169            Stat::IsConstant => {
170                if self.dyn_array_ref.is_empty() {
171                    None
172                } else {
173                    Some(is_constant(&array_ref, &mut ctx)?.into())
174                }
175            }
176            Stat::IsSorted => Some(is_sorted(&array_ref, &mut ctx)?.into()),
177            Stat::IsStrictSorted => Some(is_strict_sorted(&array_ref, &mut ctx)?.into()),
178            Stat::UncompressedSizeInBytes => {
179                let mut builder =
180                    builder_with_capacity(self.dyn_array_ref.dtype(), self.dyn_array_ref.len());
181                unsafe {
182                    builder.extend_from_array_unchecked(&array_ref);
183                }
184                let nbytes = builder.finish().nbytes();
185                self.set(stat, Precision::exact(nbytes));
186                Some(nbytes.into())
187            }
188            Stat::NaNCount => {
189                Stat::NaNCount
190                    .dtype(self.dyn_array_ref.dtype())
191                    .is_some()
192                    .then(|| {
193                        // NaNCount is supported for this dtype.
194                        nan_count(&array_ref, &mut ctx)
195                    })
196                    .transpose()?
197                    .map(|s| s.into())
198            }
199        })
200    }
201
202    pub fn compute_all(&self, stats: &[Stat]) -> VortexResult<StatsSet> {
203        let mut stats_set = StatsSet::default();
204        for &stat in stats {
205            if let Some(s) = self.compute_stat(stat)?
206                && let Some(value) = s.into_value()
207            {
208                stats_set.set(stat, Precision::exact(value));
209            }
210        }
211        Ok(stats_set)
212    }
213}
214
215impl StatsSetRef<'_> {
216    pub fn compute_as<U: for<'a> TryFrom<&'a Scalar, Error = VortexError>>(
217        &self,
218        stat: Stat,
219    ) -> Option<U> {
220        self.compute_stat(stat)
221            .inspect_err(|e| tracing::warn!("Failed to compute stat {stat}: {e}"))
222            .ok()
223            .flatten()
224            .map(|s| U::try_from(&s))
225            .transpose()
226            .unwrap_or_else(|err| {
227                vortex_panic!(
228                    err,
229                    "Failed to compute stat {} as {}",
230                    stat,
231                    std::any::type_name::<U>()
232                )
233            })
234    }
235
236    pub fn set(&self, stat: Stat, value: Precision<ScalarValue>) {
237        self.array_stats.set(stat, value);
238    }
239
240    pub fn clear(&self, stat: Stat) {
241        self.array_stats.clear(stat);
242    }
243
244    pub fn compute_min<U: for<'a> TryFrom<&'a Scalar, Error = VortexError>>(&self) -> Option<U> {
245        self.compute_as(Stat::Min)
246    }
247
248    pub fn compute_max<U: for<'a> TryFrom<&'a Scalar, Error = VortexError>>(&self) -> Option<U> {
249        self.compute_as(Stat::Max)
250    }
251
252    pub fn compute_is_sorted(&self) -> Option<bool> {
253        self.compute_as(Stat::IsSorted)
254    }
255
256    pub fn compute_is_strict_sorted(&self) -> Option<bool> {
257        self.compute_as(Stat::IsStrictSorted)
258    }
259
260    pub fn compute_is_constant(&self) -> Option<bool> {
261        self.compute_as(Stat::IsConstant)
262    }
263
264    pub fn compute_null_count(&self) -> Option<usize> {
265        self.compute_as(Stat::NullCount)
266    }
267
268    pub fn compute_uncompressed_size_in_bytes(&self) -> Option<usize> {
269        self.compute_as(Stat::UncompressedSizeInBytes)
270    }
271}
272
273impl StatsProvider for StatsSetRef<'_> {
274    fn get(&self, stat: Stat) -> Option<Precision<Scalar>> {
275        self.array_stats
276            .inner
277            .read()
278            .as_typed_ref(self.dyn_array_ref.dtype())
279            .get(stat)
280    }
281
282    fn len(&self) -> usize {
283        self.array_stats.inner.read().len()
284    }
285}