vortex_array/stats/
array.rs1use std::sync::Arc;
7
8use parking_lot::RwLock;
9use vortex_error::{VortexError, VortexResult, vortex_panic};
10use vortex_scalar::{Scalar, ScalarValue};
11
12use super::{
13 MutTypedStatsSetRef, Precision, Stat, StatsProvider, StatsSet, StatsSetIntoIter,
14 TypedStatsSetRef,
15};
16use crate::Array;
17use crate::builders::builder_with_capacity;
18use crate::compute::{
19 MinMaxResult, is_constant, is_sorted, is_strict_sorted, min_max, nan_count, sum,
20};
21
/// A shareable, lock-protected set of statistics.
///
/// The statistics live in a [`StatsSet`] behind an `Arc<RwLock<_>>`, so cloning an
/// `ArrayStats` produces another handle to the *same* underlying set rather than a
/// copy of its contents.
#[derive(Clone, Default, Debug)]
pub struct ArrayStats {
    /// Shared storage for the statistics; every read or write goes through this lock.
    inner: Arc<RwLock<StatsSet>>,
}
28
/// A borrowed view that pairs an array with the statistics stored for it.
///
/// Created by [`ArrayStats::to_ref`]. The array supplies the dtype used to interpret
/// stored values and is the input to the compute functions when a statistic has to
/// be calculated.
pub struct StatsSetRef<'a> {
    /// The array the statistics describe.
    dyn_array_ref: &'a dyn Array,
    /// The shared statistics storage that is read and updated through this view.
    array_stats: &'a ArrayStats,
}
37
38impl ArrayStats {
39 pub fn to_ref<'a>(&'a self, array: &'a dyn Array) -> StatsSetRef<'a> {
40 StatsSetRef {
41 dyn_array_ref: array,
42 array_stats: self,
43 }
44 }
45
46 pub fn set(&self, stat: Stat, value: Precision<ScalarValue>) {
47 self.inner.write().set(stat, value);
48 }
49
50 pub fn clear(&self, stat: Stat) {
51 self.inner.write().clear(stat);
52 }
53
54 pub fn retain(&self, stats: &[Stat]) {
55 self.inner.write().retain_only(stats);
56 }
57}
58
59impl From<StatsSet> for ArrayStats {
60 fn from(value: StatsSet) -> Self {
61 Self {
62 inner: Arc::new(RwLock::new(value)),
63 }
64 }
65}
66
67impl From<ArrayStats> for StatsSet {
68 fn from(value: ArrayStats) -> Self {
69 value.inner.read().clone()
70 }
71}
72
73impl StatsSetRef<'_> {
74 pub fn set_iter(&self, iter: StatsSetIntoIter) {
75 let mut guard = self.array_stats.inner.write();
76 for (stat, value) in iter {
77 guard.set(stat, value);
78 }
79 }
80
81 pub fn inherit_from(&self, stats: StatsSetRef<'_>) {
82 if !Arc::ptr_eq(&self.array_stats.inner, &stats.array_stats.inner) {
84 stats.with_iter(|iter| self.inherit(iter));
85 }
86 }
87
88 pub fn inherit<'a>(&self, iter: impl Iterator<Item = &'a (Stat, Precision<ScalarValue>)>) {
89 let mut guard = self.array_stats.inner.write();
90 for (stat, value) in iter {
91 if !value.is_exact() {
92 if !guard.get(*stat).is_some_and(|v| v.is_exact()) {
93 guard.set(*stat, value.clone());
94 }
95 } else {
96 guard.set(*stat, value.clone());
97 }
98 }
99 }
100
101 pub fn with_typed_stats_set<U, F: FnOnce(TypedStatsSetRef) -> U>(&self, apply: F) -> U {
102 apply(
103 self.array_stats
104 .inner
105 .read()
106 .as_typed_ref(self.dyn_array_ref.dtype()),
107 )
108 }
109
110 pub fn with_mut_typed_stats_set<U, F: FnOnce(MutTypedStatsSetRef) -> U>(&self, apply: F) -> U {
111 apply(
112 self.array_stats
113 .inner
114 .write()
115 .as_mut_typed_ref(self.dyn_array_ref.dtype()),
116 )
117 }
118
119 pub fn to_owned(&self) -> StatsSet {
120 self.array_stats.inner.read().clone()
121 }
122
123 pub fn with_iter<
124 F: for<'a> FnOnce(&mut dyn Iterator<Item = &'a (Stat, Precision<ScalarValue>)>) -> R,
125 R,
126 >(
127 &self,
128 f: F,
129 ) -> R {
130 let lock = self.array_stats.inner.read();
131 f(&mut lock.iter())
132 }
133
134 pub fn compute_stat(&self, stat: Stat) -> VortexResult<Option<Scalar>> {
135 if let Some(Precision::Exact(s)) = self.get(stat) {
137 return Ok(Some(s));
138 }
139
140 Ok(match stat {
141 Stat::Min => min_max(self.dyn_array_ref)?.map(|MinMaxResult { min, max: _ }| min),
142 Stat::Max => min_max(self.dyn_array_ref)?.map(|MinMaxResult { min: _, max }| max),
143 Stat::Sum => {
144 Stat::Sum
145 .dtype(self.dyn_array_ref.dtype())
146 .is_some()
147 .then(|| {
148 sum(self.dyn_array_ref)
150 })
151 .transpose()?
152 }
153 Stat::NullCount => Some(self.dyn_array_ref.invalid_count().into()),
154 Stat::IsConstant => {
155 if self.dyn_array_ref.is_empty() {
156 None
157 } else {
158 is_constant(self.dyn_array_ref)?.map(|v| v.into())
159 }
160 }
161 Stat::IsSorted => is_sorted(self.dyn_array_ref)?.map(|v| v.into()),
162 Stat::IsStrictSorted => is_strict_sorted(self.dyn_array_ref)?.map(|v| v.into()),
163 Stat::UncompressedSizeInBytes => {
164 let mut builder =
165 builder_with_capacity(self.dyn_array_ref.dtype(), self.dyn_array_ref.len());
166 unsafe {
167 builder.extend_from_array_unchecked(self.dyn_array_ref);
168 }
169 let nbytes = builder.finish().nbytes();
170 self.set(stat, Precision::exact(nbytes));
171 Some(nbytes.into())
172 }
173 Stat::NaNCount => {
174 Stat::NaNCount
175 .dtype(self.dyn_array_ref.dtype())
176 .is_some()
177 .then(|| {
178 nan_count(self.dyn_array_ref)
180 })
181 .transpose()?
182 .map(|s| s.into())
183 }
184 })
185 }
186
187 pub fn compute_all(&self, stats: &[Stat]) -> VortexResult<StatsSet> {
188 let mut stats_set = StatsSet::default();
189 for &stat in stats {
190 if let Some(s) = self.compute_stat(stat)? {
191 stats_set.set(stat, Precision::exact(s.into_value()))
192 }
193 }
194 Ok(stats_set)
195 }
196}
197
198impl StatsSetRef<'_> {
199 pub fn compute_as<U: for<'a> TryFrom<&'a Scalar, Error = VortexError>>(
200 &self,
201 stat: Stat,
202 ) -> Option<U> {
203 self.compute_stat(stat)
204 .inspect_err(|e| log::warn!("Failed to compute stat {stat}: {e}"))
205 .ok()
206 .flatten()
207 .map(|s| U::try_from(&s))
208 .transpose()
209 .unwrap_or_else(|err| {
210 vortex_panic!(
211 err,
212 "Failed to compute stat {} as {}",
213 stat,
214 std::any::type_name::<U>()
215 )
216 })
217 }
218
219 pub fn set(&self, stat: Stat, value: Precision<ScalarValue>) {
220 self.array_stats.set(stat, value);
221 }
222
223 pub fn clear(&self, stat: Stat) {
224 self.array_stats.clear(stat);
225 }
226
227 pub fn compute_min<U: for<'a> TryFrom<&'a Scalar, Error = VortexError>>(&self) -> Option<U> {
228 self.compute_as(Stat::Min)
229 }
230
231 pub fn compute_max<U: for<'a> TryFrom<&'a Scalar, Error = VortexError>>(&self) -> Option<U> {
232 self.compute_as(Stat::Max)
233 }
234
235 pub fn compute_is_sorted(&self) -> Option<bool> {
236 self.compute_as(Stat::IsSorted)
237 }
238
239 pub fn compute_is_strict_sorted(&self) -> Option<bool> {
240 self.compute_as(Stat::IsStrictSorted)
241 }
242
243 pub fn compute_is_constant(&self) -> Option<bool> {
244 self.compute_as(Stat::IsConstant)
245 }
246
247 pub fn compute_null_count(&self) -> Option<usize> {
248 self.compute_as(Stat::NullCount)
249 }
250
251 pub fn compute_uncompressed_size_in_bytes(&self) -> Option<usize> {
252 self.compute_as(Stat::UncompressedSizeInBytes)
253 }
254}
255
256impl StatsProvider for StatsSetRef<'_> {
257 fn get(&self, stat: Stat) -> Option<Precision<Scalar>> {
258 self.array_stats
259 .inner
260 .read()
261 .as_typed_ref(self.dyn_array_ref.dtype())
262 .get(stat)
263 }
264
265 fn len(&self) -> usize {
266 self.array_stats.inner.read().len()
267 }
268}