vortex_array/stats/
array.rs1use std::sync::Arc;
7
8use parking_lot::RwLock;
9use vortex_error::{VortexError, VortexResult, vortex_panic};
10use vortex_scalar::ScalarValue;
11
12use super::{
13 Precision, Stat, StatType, StatsProvider, StatsProviderExt, StatsSet, StatsSetIntoIter,
14};
15use crate::Array;
16use crate::compute::{
17 MinMaxResult, is_constant, is_sorted, is_strict_sorted, min_max, nan_count, sum,
18};
19
20#[derive(Clone, Default, Debug)]
23pub struct ArrayStats {
24 inner: Arc<RwLock<StatsSet>>,
25}
26
27pub struct StatsSetRef<'a> {
31 dyn_array_ref: &'a dyn Array,
33 array_stats: &'a ArrayStats,
34}
35
36impl ArrayStats {
37 pub fn to_ref<'a>(&'a self, array: &'a dyn Array) -> StatsSetRef<'a> {
38 StatsSetRef {
39 dyn_array_ref: array,
40 array_stats: self,
41 }
42 }
43
44 pub fn set(&self, stat: Stat, value: Precision<ScalarValue>) {
45 self.inner.write().set(stat, value);
46 }
47
48 pub fn clear(&self, stat: Stat) {
49 self.inner.write().clear(stat);
50 }
51
52 pub fn retain(&self, stats: &[Stat]) {
53 self.inner.write().retain_only(stats);
54 }
55}
56
57impl From<StatsSet> for ArrayStats {
58 fn from(value: StatsSet) -> Self {
59 Self {
60 inner: Arc::new(RwLock::new(value)),
61 }
62 }
63}
64
65impl From<ArrayStats> for StatsSet {
66 fn from(value: ArrayStats) -> Self {
67 value.inner.read().clone()
68 }
69}
70
71impl StatsProvider for ArrayStats {
72 fn get(&self, stat: Stat) -> Option<Precision<ScalarValue>> {
73 let guard = self.inner.read();
74 guard.get(stat)
75 }
76
77 fn len(&self) -> usize {
78 let guard = self.inner.read();
79 guard.len()
80 }
81}
82
83impl StatsSetRef<'_> {
84 pub fn set_iter(&self, iter: StatsSetIntoIter) {
85 let mut guard = self.array_stats.inner.write();
86 for (stat, value) in iter {
87 guard.set(stat, value);
88 }
89 }
90
91 pub fn inherit_from(&self, stats: StatsSetRef<'_>) {
92 stats.with_iter(|iter| self.inherit(iter));
93 }
94
95 pub fn inherit<'a>(&self, iter: impl Iterator<Item = &'a (Stat, Precision<ScalarValue>)>) {
96 let mut guard = self.array_stats.inner.write();
98 for (stat, value) in iter {
99 guard.set(*stat, value.clone());
100 }
101 }
102
103 pub fn replace(&self, stats: StatsSet) {
104 *self.array_stats.inner.write() = stats;
105 }
106
107 pub fn to_owned(&self) -> StatsSet {
108 self.array_stats.inner.read().clone()
109 }
110
111 pub fn with_iter<
112 F: for<'a> FnOnce(&mut dyn Iterator<Item = &'a (Stat, Precision<ScalarValue>)>) -> R,
113 R,
114 >(
115 &self,
116 f: F,
117 ) -> R {
118 let lock = self.array_stats.inner.read();
119 f(&mut lock.iter())
120 }
121
122 pub fn compute_stat(&self, stat: Stat) -> VortexResult<Option<ScalarValue>> {
123 if let Some(Precision::Exact(stat)) = self.get(stat) {
125 return Ok(Some(stat));
126 }
127
128 Ok(match stat {
129 Stat::Min => {
130 min_max(self.dyn_array_ref)?.map(|MinMaxResult { min, max: _ }| min.into_value())
131 }
132 Stat::Max => {
133 min_max(self.dyn_array_ref)?.map(|MinMaxResult { min: _, max }| max.into_value())
134 }
135 Stat::Sum => {
136 Stat::Sum
137 .dtype(self.dyn_array_ref.dtype())
138 .is_some()
139 .then(|| {
140 sum(self.dyn_array_ref)
142 })
143 .transpose()?
144 .map(|s| s.into_value())
145 }
146 Stat::NullCount => Some(self.dyn_array_ref.invalid_count()?.into()),
147 Stat::IsConstant => {
148 if self.dyn_array_ref.is_empty() {
149 None
150 } else {
151 is_constant(self.dyn_array_ref)?.map(ScalarValue::from)
152 }
153 }
154 Stat::IsSorted => Some(is_sorted(self.dyn_array_ref)?.into()),
155 Stat::IsStrictSorted => Some(is_strict_sorted(self.dyn_array_ref)?.into()),
156 Stat::UncompressedSizeInBytes => {
157 let nbytes: ScalarValue =
158 self.dyn_array_ref.to_canonical()?.as_ref().nbytes().into();
159 self.set(stat, Precision::exact(nbytes.clone()));
160 Some(nbytes)
161 }
162 Stat::NaNCount => {
163 Stat::NaNCount
164 .dtype(self.dyn_array_ref.dtype())
165 .is_some()
166 .then(|| {
167 nan_count(self.dyn_array_ref)
169 })
170 .transpose()?
171 .map(|s| s.into())
172 }
173 })
174 }
175
176 pub fn compute_all(&self, stats: &[Stat]) -> VortexResult<StatsSet> {
177 let mut stats_set = StatsSet::default();
178 for &stat in stats {
179 if let Some(s) = self.compute_stat(stat)? {
180 stats_set.set(stat, Precision::exact(s))
181 }
182 }
183 Ok(stats_set)
184 }
185}
186
187impl StatsSetRef<'_> {
188 pub fn get_as<U: for<'a> TryFrom<&'a ScalarValue, Error = VortexError>>(
189 &self,
190 stat: Stat,
191 ) -> Option<Precision<U>> {
192 StatsProviderExt::get_as::<U>(self, stat)
193 }
194
195 pub fn get_as_bound<S, U>(&self) -> Option<S::Bound>
196 where
197 S: StatType<U>,
198 U: for<'a> TryFrom<&'a ScalarValue, Error = VortexError>,
199 {
200 StatsProviderExt::get_as_bound::<S, U>(self)
201 }
202
203 pub fn compute_as<U: for<'a> TryFrom<&'a ScalarValue, Error = VortexError>>(
204 &self,
205 stat: Stat,
206 ) -> Option<U> {
207 self.compute_stat(stat)
208 .inspect_err(|e| log::warn!("Failed to compute stat {stat}: {e}"))
209 .ok()
210 .flatten()
211 .map(|s| U::try_from(&s))
212 .transpose()
213 .unwrap_or_else(|err| {
214 vortex_panic!(
215 err,
216 "Failed to compute stat {} as {}",
217 stat,
218 std::any::type_name::<U>()
219 )
220 })
221 }
222
223 pub fn set(&self, stat: Stat, value: Precision<ScalarValue>) {
224 self.array_stats.set(stat, value);
225 }
226
227 pub fn clear(&self, stat: Stat) {
228 self.array_stats.clear(stat);
229 }
230
231 pub fn retain(&self, stats: &[Stat]) {
232 self.array_stats.retain(stats);
233 }
234
235 pub fn compute_min<U: for<'a> TryFrom<&'a ScalarValue, Error = VortexError>>(
236 &self,
237 ) -> Option<U> {
238 self.compute_as(Stat::Min)
239 }
240
241 pub fn compute_max<U: for<'a> TryFrom<&'a ScalarValue, Error = VortexError>>(
242 &self,
243 ) -> Option<U> {
244 self.compute_as(Stat::Max)
245 }
246
247 pub fn compute_is_sorted(&self) -> Option<bool> {
248 self.compute_as(Stat::IsSorted)
249 }
250
251 pub fn compute_is_strict_sorted(&self) -> Option<bool> {
252 self.compute_as(Stat::IsStrictSorted)
253 }
254
255 pub fn compute_is_constant(&self) -> Option<bool> {
256 self.compute_as(Stat::IsConstant)
257 }
258
259 pub fn compute_null_count(&self) -> Option<usize> {
260 self.compute_as(Stat::NullCount)
261 }
262
263 pub fn compute_uncompressed_size_in_bytes(&self) -> Option<usize> {
264 self.compute_as(Stat::UncompressedSizeInBytes)
265 }
266}
267
268impl StatsProvider for StatsSetRef<'_> {
269 fn get(&self, stat: Stat) -> Option<Precision<ScalarValue>> {
270 self.array_stats.get(stat)
271 }
272
273 fn len(&self) -> usize {
274 self.array_stats.len()
275 }
276}