vortex_array/stats/
array.rs1use std::sync::Arc;
7
8use parking_lot::RwLock;
9use vortex_error::VortexError;
10use vortex_error::VortexResult;
11use vortex_error::vortex_panic;
12
13use super::MutTypedStatsSetRef;
14use super::StatsSet;
15use super::StatsSetIntoIter;
16use super::TypedStatsSetRef;
17use crate::DynArray;
18use crate::LEGACY_SESSION;
19use crate::VortexSessionExecute;
20use crate::aggregate_fn::fns::is_constant::is_constant;
21use crate::aggregate_fn::fns::is_sorted::is_sorted;
22use crate::aggregate_fn::fns::is_sorted::is_strict_sorted;
23use crate::aggregate_fn::fns::min_max::MinMaxResult;
24use crate::aggregate_fn::fns::min_max::min_max;
25use crate::aggregate_fn::fns::nan_count::nan_count;
26use crate::aggregate_fn::fns::sum::sum;
27use crate::builders::builder_with_capacity;
28use crate::expr::stats::Precision;
29use crate::expr::stats::Stat;
30use crate::expr::stats::StatsProvider;
31use crate::scalar::Scalar;
32use crate::scalar::ScalarValue;
33
34#[derive(Clone, Default, Debug)]
37pub struct ArrayStats {
38 inner: Arc<RwLock<StatsSet>>,
39}
40
41pub struct StatsSetRef<'a> {
45 dyn_array_ref: &'a dyn DynArray,
47 array_stats: &'a ArrayStats,
48}
49
50impl ArrayStats {
51 pub fn to_ref<'a>(&'a self, array: &'a dyn DynArray) -> StatsSetRef<'a> {
52 StatsSetRef {
53 dyn_array_ref: array,
54 array_stats: self,
55 }
56 }
57
58 pub fn set(&self, stat: Stat, value: Precision<ScalarValue>) {
59 self.inner.write().set(stat, value);
60 }
61
62 pub fn clear(&self, stat: Stat) {
63 self.inner.write().clear(stat);
64 }
65
66 pub fn retain(&self, stats: &[Stat]) {
67 self.inner.write().retain_only(stats);
68 }
69}
70
71impl From<StatsSet> for ArrayStats {
72 fn from(value: StatsSet) -> Self {
73 Self {
74 inner: Arc::new(RwLock::new(value)),
75 }
76 }
77}
78
79impl From<ArrayStats> for StatsSet {
80 fn from(value: ArrayStats) -> Self {
81 value.inner.read().clone()
82 }
83}
84
85impl StatsSetRef<'_> {
86 pub fn set_iter(&self, iter: StatsSetIntoIter) {
87 let mut guard = self.array_stats.inner.write();
88 for (stat, value) in iter {
89 guard.set(stat, value);
90 }
91 }
92
93 pub fn inherit_from(&self, stats: StatsSetRef<'_>) {
94 if !Arc::ptr_eq(&self.array_stats.inner, &stats.array_stats.inner) {
96 stats.with_iter(|iter| self.inherit(iter));
97 }
98 }
99
100 pub fn inherit<'a>(&self, iter: impl Iterator<Item = &'a (Stat, Precision<ScalarValue>)>) {
101 let mut guard = self.array_stats.inner.write();
102 for (stat, value) in iter {
103 if !value.is_exact() {
104 if !guard.get(*stat).is_some_and(|v| v.is_exact()) {
105 guard.set(*stat, value.clone());
106 }
107 } else {
108 guard.set(*stat, value.clone());
109 }
110 }
111 }
112
113 pub fn with_typed_stats_set<U, F: FnOnce(TypedStatsSetRef) -> U>(&self, apply: F) -> U {
114 apply(
115 self.array_stats
116 .inner
117 .read()
118 .as_typed_ref(self.dyn_array_ref.dtype()),
119 )
120 }
121
122 pub fn with_mut_typed_stats_set<U, F: FnOnce(MutTypedStatsSetRef) -> U>(&self, apply: F) -> U {
123 apply(
124 self.array_stats
125 .inner
126 .write()
127 .as_mut_typed_ref(self.dyn_array_ref.dtype()),
128 )
129 }
130
131 pub fn to_owned(&self) -> StatsSet {
132 self.array_stats.inner.read().clone()
133 }
134
135 pub fn to_array_stats(&self) -> ArrayStats {
139 self.array_stats.clone()
140 }
141
142 pub fn with_iter<
143 F: for<'a> FnOnce(&mut dyn Iterator<Item = &'a (Stat, Precision<ScalarValue>)>) -> R,
144 R,
145 >(
146 &self,
147 f: F,
148 ) -> R {
149 let lock = self.array_stats.inner.read();
150 f(&mut lock.iter())
151 }
152
153 pub fn compute_stat(&self, stat: Stat) -> VortexResult<Option<Scalar>> {
154 let mut ctx = LEGACY_SESSION.create_execution_ctx();
155
156 if let Some(Precision::Exact(s)) = self.get(stat) {
158 return Ok(Some(s));
159 }
160
161 let array_ref = self.dyn_array_ref.to_array();
162 Ok(match stat {
163 Stat::Min => min_max(&array_ref, &mut ctx)?.map(|MinMaxResult { min, max: _ }| min),
164 Stat::Max => min_max(&array_ref, &mut ctx)?.map(|MinMaxResult { min: _, max }| max),
165 Stat::Sum => {
166 Stat::Sum
167 .dtype(self.dyn_array_ref.dtype())
168 .is_some()
169 .then(|| {
170 sum(&array_ref, &mut ctx)
172 })
173 .transpose()?
174 }
175 Stat::NullCount => self.dyn_array_ref.invalid_count().ok().map(Into::into),
176 Stat::IsConstant => {
177 if self.dyn_array_ref.is_empty() {
178 None
179 } else {
180 Some(is_constant(&array_ref, &mut ctx)?.into())
181 }
182 }
183 Stat::IsSorted => Some(is_sorted(&array_ref, &mut ctx)?.into()),
184 Stat::IsStrictSorted => Some(is_strict_sorted(&array_ref, &mut ctx)?.into()),
185 Stat::UncompressedSizeInBytes => {
186 let mut builder =
187 builder_with_capacity(self.dyn_array_ref.dtype(), self.dyn_array_ref.len());
188 unsafe {
189 builder.extend_from_array_unchecked(&array_ref);
190 }
191 let nbytes = builder.finish().nbytes();
192 self.set(stat, Precision::exact(nbytes));
193 Some(nbytes.into())
194 }
195 Stat::NaNCount => {
196 Stat::NaNCount
197 .dtype(self.dyn_array_ref.dtype())
198 .is_some()
199 .then(|| {
200 nan_count(&array_ref, &mut ctx)
202 })
203 .transpose()?
204 .map(|s| s.into())
205 }
206 })
207 }
208
209 pub fn compute_all(&self, stats: &[Stat]) -> VortexResult<StatsSet> {
210 let mut stats_set = StatsSet::default();
211 for &stat in stats {
212 if let Some(s) = self.compute_stat(stat)?
213 && let Some(value) = s.into_value()
214 {
215 stats_set.set(stat, Precision::exact(value));
216 }
217 }
218 Ok(stats_set)
219 }
220}
221
222impl StatsSetRef<'_> {
223 pub fn compute_as<U: for<'a> TryFrom<&'a Scalar, Error = VortexError>>(
224 &self,
225 stat: Stat,
226 ) -> Option<U> {
227 self.compute_stat(stat)
228 .inspect_err(|e| tracing::warn!("Failed to compute stat {stat}: {e}"))
229 .ok()
230 .flatten()
231 .map(|s| U::try_from(&s))
232 .transpose()
233 .unwrap_or_else(|err| {
234 vortex_panic!(
235 err,
236 "Failed to compute stat {} as {}",
237 stat,
238 std::any::type_name::<U>()
239 )
240 })
241 }
242
243 pub fn set(&self, stat: Stat, value: Precision<ScalarValue>) {
244 self.array_stats.set(stat, value);
245 }
246
247 pub fn clear(&self, stat: Stat) {
248 self.array_stats.clear(stat);
249 }
250
251 pub fn compute_min<U: for<'a> TryFrom<&'a Scalar, Error = VortexError>>(&self) -> Option<U> {
252 self.compute_as(Stat::Min)
253 }
254
255 pub fn compute_max<U: for<'a> TryFrom<&'a Scalar, Error = VortexError>>(&self) -> Option<U> {
256 self.compute_as(Stat::Max)
257 }
258
259 pub fn compute_is_sorted(&self) -> Option<bool> {
260 self.compute_as(Stat::IsSorted)
261 }
262
263 pub fn compute_is_strict_sorted(&self) -> Option<bool> {
264 self.compute_as(Stat::IsStrictSorted)
265 }
266
267 pub fn compute_is_constant(&self) -> Option<bool> {
268 self.compute_as(Stat::IsConstant)
269 }
270
271 pub fn compute_null_count(&self) -> Option<usize> {
272 self.compute_as(Stat::NullCount)
273 }
274
275 pub fn compute_uncompressed_size_in_bytes(&self) -> Option<usize> {
276 self.compute_as(Stat::UncompressedSizeInBytes)
277 }
278}
279
280impl StatsProvider for StatsSetRef<'_> {
281 fn get(&self, stat: Stat) -> Option<Precision<Scalar>> {
282 self.array_stats
283 .inner
284 .read()
285 .as_typed_ref(self.dyn_array_ref.dtype())
286 .get(stat)
287 }
288
289 fn len(&self) -> usize {
290 self.array_stats.inner.read().len()
291 }
292}