1use std::sync::Arc;
7
8use parking_lot::RwLock;
9use vortex_array::ExecutionCtx;
10use vortex_error::VortexError;
11use vortex_error::VortexResult;
12use vortex_error::vortex_panic;
13
14use super::MutTypedStatsSetRef;
15use super::StatsSet;
16use super::StatsSetIntoIter;
17use super::TypedStatsSetRef;
18use crate::ArrayRef;
19use crate::aggregate_fn::fns::is_constant::is_constant;
20use crate::aggregate_fn::fns::is_sorted::is_sorted;
21use crate::aggregate_fn::fns::is_sorted::is_strict_sorted;
22use crate::aggregate_fn::fns::min_max::MinMaxResult;
23use crate::aggregate_fn::fns::min_max::min_max;
24use crate::aggregate_fn::fns::nan_count::nan_count;
25use crate::aggregate_fn::fns::sum::sum;
26use crate::builders::builder_with_capacity;
27use crate::expr::stats::Precision;
28use crate::expr::stats::Stat;
29use crate::expr::stats::StatsProvider;
30use crate::scalar::Scalar;
31use crate::scalar::ScalarValue;
32
33#[derive(Clone, Default, Debug)]
36pub struct ArrayStats {
37 inner: Arc<RwLock<StatsSet>>,
38}
39
40pub struct StatsSetRef<'a> {
44 dyn_array_ref: &'a ArrayRef,
46 array_stats: &'a ArrayStats,
47}
48
49impl ArrayStats {
50 pub fn to_ref<'a>(&'a self, array: &'a ArrayRef) -> StatsSetRef<'a> {
51 StatsSetRef {
52 dyn_array_ref: array,
53 array_stats: self,
54 }
55 }
56
57 pub fn set(&self, stat: Stat, value: Precision<ScalarValue>) {
58 self.inner.write().set(stat, value);
59 }
60
61 pub fn clear(&self, stat: Stat) {
62 self.inner.write().clear(stat);
63 }
64
65 pub fn retain(&self, stats: &[Stat]) {
66 self.inner.write().retain_only(stats);
67 }
68}
69
70impl From<StatsSet> for ArrayStats {
71 fn from(value: StatsSet) -> Self {
72 Self {
73 inner: Arc::new(RwLock::new(value)),
74 }
75 }
76}
77
78impl From<ArrayStats> for StatsSet {
79 fn from(value: ArrayStats) -> Self {
80 value.inner.read().clone()
81 }
82}
83
84impl StatsSetRef<'_> {
85 pub(crate) fn replace(&self, stats: StatsSet) {
86 *self.array_stats.inner.write() = stats;
87 }
88
89 pub fn set_iter(&self, iter: StatsSetIntoIter) {
90 let mut guard = self.array_stats.inner.write();
91 for (stat, value) in iter {
92 guard.set(stat, value);
93 }
94 }
95
96 pub fn inherit_from(&self, stats: StatsSetRef<'_>) {
97 if !Arc::ptr_eq(&self.array_stats.inner, &stats.array_stats.inner) {
99 stats.with_iter(|iter| self.inherit(iter));
100 }
101 }
102
103 pub fn inherit<'a>(&self, iter: impl Iterator<Item = &'a (Stat, Precision<ScalarValue>)>) {
104 let mut guard = self.array_stats.inner.write();
105 for (stat, value) in iter {
106 if !value.is_exact() {
107 if !guard.get(*stat).is_some_and(|v| v.is_exact()) {
108 guard.set(*stat, value.clone());
109 }
110 } else {
111 guard.set(*stat, value.clone());
112 }
113 }
114 }
115
116 pub fn with_typed_stats_set<U, F: FnOnce(TypedStatsSetRef) -> U>(&self, apply: F) -> U {
117 apply(
118 self.array_stats
119 .inner
120 .read()
121 .as_typed_ref(self.dyn_array_ref.dtype()),
122 )
123 }
124
125 pub fn with_mut_typed_stats_set<U, F: FnOnce(MutTypedStatsSetRef) -> U>(&self, apply: F) -> U {
126 apply(
127 self.array_stats
128 .inner
129 .write()
130 .as_mut_typed_ref(self.dyn_array_ref.dtype()),
131 )
132 }
133
134 pub fn to_owned(&self) -> StatsSet {
135 self.array_stats.inner.read().clone()
136 }
137
138 pub fn to_array_stats(&self) -> ArrayStats {
142 self.array_stats.clone()
143 }
144
145 pub fn with_iter<
146 F: for<'a> FnOnce(&mut dyn Iterator<Item = &'a (Stat, Precision<ScalarValue>)>) -> R,
147 R,
148 >(
149 &self,
150 f: F,
151 ) -> R {
152 let lock = self.array_stats.inner.read();
153 f(&mut lock.iter())
154 }
155
156 pub fn compute_stat(&self, stat: Stat, ctx: &mut ExecutionCtx) -> VortexResult<Option<Scalar>> {
157 if let Some(Precision::Exact(s)) = self.get(stat) {
159 return Ok(Some(s));
160 }
161
162 Ok(match stat {
163 Stat::Min => min_max(self.dyn_array_ref, ctx)?.map(|MinMaxResult { min, max: _ }| min),
164 Stat::Max => min_max(self.dyn_array_ref, ctx)?.map(|MinMaxResult { min: _, max }| max),
165 Stat::Sum => {
166 Stat::Sum
167 .dtype(self.dyn_array_ref.dtype())
168 .is_some()
169 .then(|| {
170 sum(self.dyn_array_ref, ctx)
172 })
173 .transpose()?
174 }
175 Stat::NullCount => self.dyn_array_ref.invalid_count(ctx).ok().map(Into::into),
176 Stat::IsConstant => {
177 if self.dyn_array_ref.is_empty() {
178 None
179 } else {
180 Some(is_constant(self.dyn_array_ref, ctx)?.into())
181 }
182 }
183 Stat::IsSorted => Some(is_sorted(self.dyn_array_ref, ctx)?.into()),
184 Stat::IsStrictSorted => Some(is_strict_sorted(self.dyn_array_ref, ctx)?.into()),
185 Stat::UncompressedSizeInBytes => {
186 let mut builder =
187 builder_with_capacity(self.dyn_array_ref.dtype(), self.dyn_array_ref.len());
188 unsafe {
189 builder.extend_from_array_unchecked(self.dyn_array_ref);
190 }
191 let nbytes = builder.finish().nbytes();
192 self.set(stat, Precision::exact(nbytes));
193 Some(nbytes.into())
194 }
195 Stat::NaNCount => {
196 Stat::NaNCount
197 .dtype(self.dyn_array_ref.dtype())
198 .is_some()
199 .then(|| {
200 nan_count(self.dyn_array_ref, ctx)
202 })
203 .transpose()?
204 .map(|s| s.into())
205 }
206 })
207 }
208
209 pub fn compute_all(&self, stats: &[Stat], ctx: &mut ExecutionCtx) -> VortexResult<StatsSet> {
210 let mut stats_set = StatsSet::default();
211 for &stat in stats {
212 if let Some(s) = self.compute_stat(stat, ctx)?
213 && let Some(value) = s.into_value()
214 {
215 stats_set.set(stat, Precision::exact(value));
216 }
217 }
218 Ok(stats_set)
219 }
220}
221
222impl StatsSetRef<'_> {
223 pub fn compute_as<U: for<'a> TryFrom<&'a Scalar, Error = VortexError>>(
224 &self,
225 stat: Stat,
226 ctx: &mut ExecutionCtx,
227 ) -> Option<U> {
228 self.compute_stat(stat, ctx)
229 .inspect_err(|e| tracing::warn!("Failed to compute stat {stat}: {e}"))
230 .ok()
231 .flatten()
232 .map(|s| U::try_from(&s))
233 .transpose()
234 .unwrap_or_else(|err| {
235 vortex_panic!(
236 err,
237 "Failed to compute stat {} as {}",
238 stat,
239 std::any::type_name::<U>()
240 )
241 })
242 }
243
244 pub fn set(&self, stat: Stat, value: Precision<ScalarValue>) {
245 self.array_stats.set(stat, value);
246 }
247
248 pub fn clear(&self, stat: Stat) {
249 self.array_stats.clear(stat);
250 }
251
252 pub fn compute_min<U: for<'a> TryFrom<&'a Scalar, Error = VortexError>>(
253 &self,
254 ctx: &mut ExecutionCtx,
255 ) -> Option<U> {
256 self.compute_as(Stat::Min, ctx)
257 }
258
259 pub fn compute_max<U: for<'a> TryFrom<&'a Scalar, Error = VortexError>>(
260 &self,
261 ctx: &mut ExecutionCtx,
262 ) -> Option<U> {
263 self.compute_as(Stat::Max, ctx)
264 }
265
266 pub fn compute_is_sorted(&self, ctx: &mut ExecutionCtx) -> Option<bool> {
267 self.compute_as(Stat::IsSorted, ctx)
268 }
269
270 pub fn compute_is_strict_sorted(&self, ctx: &mut ExecutionCtx) -> Option<bool> {
271 self.compute_as(Stat::IsStrictSorted, ctx)
272 }
273
274 pub fn compute_is_constant(&self, ctx: &mut ExecutionCtx) -> Option<bool> {
275 self.compute_as(Stat::IsConstant, ctx)
276 }
277
278 pub fn compute_null_count(&self, ctx: &mut ExecutionCtx) -> Option<usize> {
279 self.compute_as(Stat::NullCount, ctx)
280 }
281
282 pub fn compute_uncompressed_size_in_bytes(&self, ctx: &mut ExecutionCtx) -> Option<usize> {
283 self.compute_as(Stat::UncompressedSizeInBytes, ctx)
284 }
285}
286
287impl StatsProvider for StatsSetRef<'_> {
288 fn get(&self, stat: Stat) -> Option<Precision<Scalar>> {
289 self.array_stats
290 .inner
291 .read()
292 .as_typed_ref(self.dyn_array_ref.dtype())
293 .get(stat)
294 }
295
296 fn len(&self) -> usize {
297 self.array_stats.inner.read().len()
298 }
299}