// vortex_array/stats/array.rs

use std::sync::Arc;

use parking_lot::RwLock;
use vortex_error::{VortexError, VortexResult, vortex_panic};
use vortex_scalar::{Scalar, ScalarValue};

use super::{Precision, Stat, StatsProvider, StatsSet, StatsSetIntoIter, TypedStatsSetRef};
use crate::Array;
use crate::compute::{
    MinMaxResult, is_constant, is_sorted, is_strict_sorted, min_max, nan_count, sum,
};

/// Shared, lazily populated statistics for an array.
///
/// The inner [`StatsSet`] is wrapped in an `Arc<RwLock<_>>` so that clones of the
/// array share the same cache and can update it concurrently.
#[derive(Clone, Default, Debug)]
pub struct ArrayStats {
    inner: Arc<RwLock<StatsSet>>,
}

/// A borrowed view that pairs an [`ArrayStats`] cache with the array it describes,
/// allowing missing statistics to be computed on demand.
pub struct StatsSetRef<'a> {
    /// The array the statistics describe.
    dyn_array_ref: &'a dyn Array,
    /// The shared statistics cache.
    array_stats: &'a ArrayStats,
}

impl ArrayStats {
    /// Bind these statistics to `array`, producing a [`StatsSetRef`] that can compute
    /// missing statistics on demand.
    pub fn to_ref<'a>(&'a self, array: &'a dyn Array) -> StatsSetRef<'a> {
        StatsSetRef {
            dyn_array_ref: array,
            array_stats: self,
        }
    }

    /// Record `value` for `stat`, replacing any previously cached value.
    pub fn set(&self, stat: Stat, value: Precision<ScalarValue>) {
        self.inner.write().set(stat, value);
    }

    /// Drop the cached value for `stat`, if any.
    pub fn clear(&self, stat: Stat) {
        self.inner.write().clear(stat);
    }

    /// Keep only the listed `stats`, clearing everything else.
    pub fn retain(&self, stats: &[Stat]) {
        self.inner.write().retain_only(stats);
    }
}
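
// Usage sketch (illustrative only): `array` stands for any `&dyn Array` supplied by the
// caller, and the `i64` conversion assumes the array's dtype converts to a signed integer;
// neither is defined in this file.
//
//     let stats = ArrayStats::default();
//     let stats_ref = stats.to_ref(array);
//     let min: Option<i64> = stats_ref.compute_min();
//     stats.retain(&[Stat::Min, Stat::Max]);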

impl From<StatsSet> for ArrayStats {
    fn from(value: StatsSet) -> Self {
        Self {
            inner: Arc::new(RwLock::new(value)),
        }
    }
}

impl From<ArrayStats> for StatsSet {
    fn from(value: ArrayStats) -> Self {
        value.inner.read().clone()
    }
}

impl StatsSetRef<'_> {
    /// Store every `(stat, value)` pair from `iter` under a single write lock.
    pub fn set_iter(&self, iter: StatsSetIntoIter) {
        let mut guard = self.array_stats.inner.write();
        for (stat, value) in iter {
            guard.set(stat, value);
        }
    }

    /// Copy statistics from `stats` into this set, unless both refer to the same
    /// underlying storage.
    pub fn inherit_from(&self, stats: StatsSetRef<'_>) {
        if !Arc::ptr_eq(&self.array_stats.inner, &stats.array_stats.inner) {
            stats.with_iter(|iter| self.inherit(iter));
        }
    }

    /// Merge statistics from `iter` into this set. An exact incoming value always
    /// overwrites the cached one; an inexact (bound) value is only stored when the
    /// cache holds no exact value for that statistic.
    pub fn inherit<'a>(&self, iter: impl Iterator<Item = &'a (Stat, Precision<ScalarValue>)>) {
        let mut guard = self.array_stats.inner.write();
        for (stat, value) in iter {
            if !value.is_exact() {
                // Inexact values must not clobber an exact value we already have.
                if !guard.get(*stat).is_some_and(|v| v.is_exact()) {
                    guard.set(*stat, value.clone());
                }
            } else {
                guard.set(*stat, value.clone());
            }
        }
    }
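
    // Merge sketch (illustrative only; `parent_ref` and `child_ref` are hypothetical
    // `StatsSetRef` values, not defined in this file):
    //
    //     child_ref.inherit_from(parent_ref);
    //
    // After the call, every exact parent statistic is present on the child, while
    // inexact parent statistics are only copied where the child had no exact value.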

    /// Run `apply` against a dtype-aware view of the statistics, held under the read lock.
    pub fn with_typed_stats_set<U, F: FnOnce(TypedStatsSetRef) -> U>(&self, apply: F) -> U {
        apply(
            self.array_stats
                .inner
                .read()
                .as_typed_ref(self.dyn_array_ref.dtype()),
        )
    }

    /// Clone the current statistics into an owned [`StatsSet`].
    pub fn to_owned(&self) -> StatsSet {
        self.array_stats.inner.read().clone()
    }

    /// Run `f` over an iterator of the cached `(Stat, Precision<ScalarValue>)` pairs
    /// while holding the read lock.
    pub fn with_iter<
        F: for<'a> FnOnce(&mut dyn Iterator<Item = &'a (Stat, Precision<ScalarValue>)>) -> R,
        R,
    >(
        &self,
        f: F,
    ) -> R {
        let lock = self.array_stats.inner.read();
        f(&mut lock.iter())
    }
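
    // Example sketch (illustrative only; `stats_ref` is a hypothetical `StatsSetRef`):
    // count how many statistics are currently cached with exact precision.
    //
    //     let exact = stats_ref.with_iter(|iter| iter.filter(|(_, v)| v.is_exact()).count());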

    /// Return the value of `stat`, computing it from the array when no exact value is
    /// cached. Returns `Ok(None)` when the statistic is undefined for this array.
    pub fn compute_stat(&self, stat: Stat) -> VortexResult<Option<Scalar>> {
        // Fast path: an exact value is already cached.
        if let Some(Precision::Exact(s)) = self.get(stat) {
            return Ok(Some(s));
        }

        Ok(match stat {
            Stat::Min => min_max(self.dyn_array_ref)?.map(|MinMaxResult { min, max: _ }| min),
            Stat::Max => min_max(self.dyn_array_ref)?.map(|MinMaxResult { min: _, max }| max),
            Stat::Sum => {
                // Only computed when the dtype has a defined sum type.
                Stat::Sum
                    .dtype(self.dyn_array_ref.dtype())
                    .is_some()
                    .then(|| sum(self.dyn_array_ref))
                    .transpose()?
            }
            Stat::NullCount => Some(self.dyn_array_ref.invalid_count()?.into()),
            Stat::IsConstant => {
                if self.dyn_array_ref.is_empty() {
                    None
                } else {
                    is_constant(self.dyn_array_ref)?.map(|v| v.into())
                }
            }
            Stat::IsSorted => Some(is_sorted(self.dyn_array_ref)?.into()),
            Stat::IsStrictSorted => Some(is_strict_sorted(self.dyn_array_ref)?.into()),
            Stat::UncompressedSizeInBytes => {
                // Canonicalizing is expensive, so cache the result before returning it.
                let nbytes = self.dyn_array_ref.to_canonical()?.as_ref().nbytes();
                self.set(stat, Precision::exact(nbytes));
                Some(nbytes.into())
            }
            Stat::NaNCount => {
                // Only computed when the dtype defines a NaN count.
                Stat::NaNCount
                    .dtype(self.dyn_array_ref.dtype())
                    .is_some()
                    .then(|| nan_count(self.dyn_array_ref))
                    .transpose()?
                    .map(|s| s.into())
            }
        })
    }
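
    // Example sketch (illustrative only; `stats_ref` is a hypothetical `StatsSetRef` and
    // the `?` assumes a surrounding function returning `VortexResult<_>`):
    //
    //     let null_count: Option<Scalar> = stats_ref.compute_stat(Stat::NullCount)?;
    //     let min: Option<Scalar> = stats_ref.compute_stat(Stat::Min)?;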

    /// Compute every statistic in `stats`, collecting the ones that are defined for this
    /// array into a new [`StatsSet`] of exact values.
    pub fn compute_all(&self, stats: &[Stat]) -> VortexResult<StatsSet> {
        let mut stats_set = StatsSet::default();
        for &stat in stats {
            if let Some(s) = self.compute_stat(stat)? {
                stats_set.set(stat, Precision::exact(s.into_value()));
            }
        }
        Ok(stats_set)
    }
}

impl StatsSetRef<'_> {
    /// Compute `stat` and convert the result to `U`. Errors from computing the statistic
    /// are logged and yield `None`; a failed conversion to `U` panics via `vortex_panic!`.
    pub fn compute_as<U: for<'a> TryFrom<&'a Scalar, Error = VortexError>>(
        &self,
        stat: Stat,
    ) -> Option<U> {
        self.compute_stat(stat)
            .inspect_err(|e| log::warn!("Failed to compute stat {stat}: {e}"))
            .ok()
            .flatten()
            .map(|s| U::try_from(&s))
            .transpose()
            .unwrap_or_else(|err| {
                vortex_panic!(
                    err,
                    "Failed to compute stat {} as {}",
                    stat,
                    std::any::type_name::<U>()
                )
            })
    }
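
    // Example sketch (illustrative only; `stats_ref` is a hypothetical `StatsSetRef`),
    // mirroring the `compute_null_count` helper below:
    //
    //     let nulls: Option<usize> = stats_ref.compute_as(Stat::NullCount);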

    /// Record `value` for `stat` in the shared cache.
    pub fn set(&self, stat: Stat, value: Precision<ScalarValue>) {
        self.array_stats.set(stat, value);
    }

    /// Drop the cached value for `stat`, if any.
    pub fn clear(&self, stat: Stat) {
        self.array_stats.clear(stat);
    }

    /// Keep only the listed `stats`, clearing everything else.
    pub fn retain(&self, stats: &[Stat]) {
        self.array_stats.retain(stats);
    }

    pub fn compute_min<U: for<'a> TryFrom<&'a Scalar, Error = VortexError>>(&self) -> Option<U> {
        self.compute_as(Stat::Min)
    }

    pub fn compute_max<U: for<'a> TryFrom<&'a Scalar, Error = VortexError>>(&self) -> Option<U> {
        self.compute_as(Stat::Max)
    }

    pub fn compute_is_sorted(&self) -> Option<bool> {
        self.compute_as(Stat::IsSorted)
    }

    pub fn compute_is_strict_sorted(&self) -> Option<bool> {
        self.compute_as(Stat::IsStrictSorted)
    }

    pub fn compute_is_constant(&self) -> Option<bool> {
        self.compute_as(Stat::IsConstant)
    }

    pub fn compute_null_count(&self) -> Option<usize> {
        self.compute_as(Stat::NullCount)
    }

    pub fn compute_uncompressed_size_in_bytes(&self) -> Option<usize> {
        self.compute_as(Stat::UncompressedSizeInBytes)
    }
}
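
// Convenience accessors in use (sketch; `stats_ref` is a hypothetical `StatsSetRef`):
//
//     if stats_ref.compute_is_constant().unwrap_or(false) {
//         // every value in the array is identical, so callers may take a fast path
//     }
//     let sorted = stats_ref.compute_is_sorted().unwrap_or(false);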

impl StatsProvider for StatsSetRef<'_> {
    fn get(&self, stat: Stat) -> Option<Precision<Scalar>> {
        self.array_stats
            .inner
            .read()
            .as_typed_ref(self.dyn_array_ref.dtype())
            .get(stat)
    }

    fn len(&self) -> usize {
        self.array_stats.inner.read().len()
    }
}