vortex_array/stats/
array.rs

1//! Stats as they are stored on arrays.
2
3use std::sync::Arc;
4
5use parking_lot::RwLock;
6use vortex_error::{VortexError, VortexResult, vortex_panic};
7use vortex_scalar::ScalarValue;
8
9use super::{
10    Precision, Stat, StatType, StatsProvider, StatsProviderExt, StatsSet, StatsSetIntoIter,
11};
12use crate::Array;
13use crate::compute::{MinMaxResult, is_constant, min_max, sum, uncompressed_size};
14
15/// A shared [`StatsSet`] stored in an array. Can be shared by copies of the array and can also be mutated in place.
16// TODO(adamg): This is a very bad name.
17#[derive(Clone, Default, Debug)]
18pub struct ArrayStats {
19    inner: Arc<RwLock<StatsSet>>,
20}
21
22/// Reference to an array's [`StatsSet`]. Can be used to get and mutate the underlying stats.
23pub struct StatsSetRef<'a> {
24    // We need to reference back to the array
25    dyn_array_ref: &'a dyn Array,
26    parent_stats: ArrayStats,
27}
28
29impl ArrayStats {
30    pub fn to_ref<'a>(&self, array: &'a dyn Array) -> StatsSetRef<'a> {
31        StatsSetRef {
32            dyn_array_ref: array,
33            parent_stats: self.clone(),
34        }
35    }
36
37    pub fn set(&self, stat: Stat, value: Precision<ScalarValue>) {
38        self.inner.write().set(stat, value);
39    }
40
41    pub fn clear(&self, stat: Stat) {
42        self.inner.write().clear(stat);
43    }
44
45    pub fn retain(&self, stats: &[Stat]) {
46        self.inner.write().retain_only(stats);
47    }
48}
49
50impl From<StatsSet> for ArrayStats {
51    fn from(value: StatsSet) -> Self {
52        Self {
53            inner: Arc::new(RwLock::new(value)),
54        }
55    }
56}
57
58impl From<ArrayStats> for StatsSet {
59    fn from(value: ArrayStats) -> Self {
60        value.inner.read().clone()
61    }
62}
63
impl StatsProvider for ArrayStats {
    /// Looks up `stat` in the shared set while holding the read lock.
    fn get(&self, stat: Stat) -> Option<Precision<ScalarValue>> {
        self.inner.read().get(stat)
    }

    /// Returns the length reported by the shared set, read under the read lock.
    fn len(&self) -> usize {
        self.inner.read().len()
    }
}
75
76impl StatsSetRef<'_> {
77    pub fn set_iter(&self, iter: StatsSetIntoIter) {
78        let mut guard = self.parent_stats.inner.write();
79
80        for (stat, value) in iter {
81            guard.set(stat, value);
82        }
83    }
84
85    pub fn inherit(&self, parent_stats: StatsSetRef<'_>) {
86        // TODO(ngates): depending on statistic, this should choose the more precise one
87        self.set_iter(parent_stats.into_iter());
88    }
89
90    // TODO(adamg): potentially problematic name
91    pub fn to_owned(&self) -> StatsSet {
92        self.parent_stats.inner.read().clone()
93    }
94
95    pub fn into_iter(&self) -> StatsSetIntoIter {
96        self.to_owned().into_iter()
97    }
98
99    pub fn compute_stat(&self, stat: Stat) -> VortexResult<Option<ScalarValue>> {
100        // If it's already computed and exact, we can return it.
101        if let Some(Precision::Exact(stat)) = self.get(stat) {
102            return Ok(Some(stat));
103        }
104
105        // NOTE(ngates): this is the beginning of the stats refactor that pushes stats compute into
106        //  regular compute functions.
107        Ok(match stat {
108            Stat::Min => {
109                min_max(self.dyn_array_ref)?.map(|MinMaxResult { min, max: _ }| min.into_value())
110            }
111            Stat::Max => {
112                min_max(self.dyn_array_ref)?.map(|MinMaxResult { min: _, max }| max.into_value())
113            }
114            Stat::Sum => {
115                Stat::Sum
116                    .dtype(self.dyn_array_ref.dtype())
117                    .is_some()
118                    .then(|| {
119                        // Sum is supported for this dtype.
120                        sum(self.dyn_array_ref)
121                    })
122                    .transpose()?
123                    .map(|s| s.into_value())
124            }
125            Stat::NullCount => Some(self.dyn_array_ref.invalid_count()?.into()),
126            Stat::IsConstant => {
127                if self.dyn_array_ref.is_empty() {
128                    None
129                } else {
130                    Some(is_constant(self.dyn_array_ref)?.into())
131                }
132            }
133            Stat::UncompressedSizeInBytes => Some(uncompressed_size(self.dyn_array_ref)?.into()),
134            _ => {
135                let vtable = self.dyn_array_ref.vtable();
136                let stats_set = vtable.compute_statistics(self.dyn_array_ref, stat)?;
137                // Update the stats set with all the computed stats.
138                for (stat, value) in stats_set.into_iter() {
139                    self.set(stat, value);
140                }
141                self.get(stat).and_then(|p| p.as_exact())
142            }
143        })
144    }
145
146    pub fn compute_all(&self, stats: &[Stat]) -> VortexResult<StatsSet> {
147        let mut stats_set = StatsSet::default();
148        for stat in stats {
149            if let Some(s) = self.compute_stat(*stat)? {
150                stats_set.set(*stat, Precision::exact(s))
151            }
152        }
153        Ok(stats_set)
154    }
155}
156
157impl StatsSetRef<'_> {
158    pub fn get_as<U: for<'a> TryFrom<&'a ScalarValue, Error = VortexError>>(
159        &self,
160        stat: Stat,
161    ) -> Option<Precision<U>> {
162        StatsProviderExt::get_as::<U>(self, stat)
163    }
164
165    pub fn get_as_bound<S, U>(&self) -> Option<S::Bound>
166    where
167        S: StatType<U>,
168        U: for<'a> TryFrom<&'a ScalarValue, Error = VortexError>,
169    {
170        StatsProviderExt::get_as_bound::<S, U>(self)
171    }
172
173    pub fn compute_as<U: for<'a> TryFrom<&'a ScalarValue, Error = VortexError>>(
174        &self,
175        stat: Stat,
176    ) -> Option<U> {
177        self.compute_stat(stat)
178            .inspect_err(|e| log::warn!("Failed to compute stat {}: {}", stat, e))
179            .ok()
180            .flatten()
181            .map(|s| U::try_from(&s))
182            .transpose()
183            .unwrap_or_else(|err| {
184                vortex_panic!(
185                    err,
186                    "Failed to compute stat {} as {}",
187                    stat,
188                    std::any::type_name::<U>()
189                )
190            })
191    }
192
193    pub fn set(&self, stat: Stat, value: Precision<ScalarValue>) {
194        self.parent_stats.set(stat, value);
195    }
196
197    pub fn clear(&self, stat: Stat) {
198        self.parent_stats.clear(stat);
199    }
200
201    pub fn retain(&self, stats: &[Stat]) {
202        self.parent_stats.retain(stats);
203    }
204
205    pub fn compute_min<U: for<'a> TryFrom<&'a ScalarValue, Error = VortexError>>(
206        &self,
207    ) -> Option<U> {
208        self.compute_as(Stat::Min)
209    }
210
211    pub fn compute_max<U: for<'a> TryFrom<&'a ScalarValue, Error = VortexError>>(
212        &self,
213    ) -> Option<U> {
214        self.compute_as(Stat::Max)
215    }
216
217    pub fn compute_is_sorted(&self) -> Option<bool> {
218        self.compute_as(Stat::IsSorted)
219    }
220
221    pub fn compute_is_strict_sorted(&self) -> Option<bool> {
222        self.compute_as(Stat::IsStrictSorted)
223    }
224
225    pub fn compute_is_constant(&self) -> Option<bool> {
226        self.compute_as(Stat::IsConstant)
227    }
228
229    pub fn compute_null_count(&self) -> Option<usize> {
230        self.compute_as(Stat::NullCount)
231    }
232
233    pub fn compute_uncompressed_size_in_bytes(&self) -> Option<usize> {
234        self.compute_as(Stat::UncompressedSizeInBytes)
235    }
236}
237
238impl StatsProvider for StatsSetRef<'_> {
239    fn get(&self, stat: Stat) -> Option<Precision<ScalarValue>> {
240        self.parent_stats.get(stat)
241    }
242
243    fn len(&self) -> usize {
244        self.parent_stats.len()
245    }
246}