vortex_compressor/stats/
cache.rs1use std::any::Any;
7use std::any::TypeId;
8use std::sync::Arc;
9
10use parking_lot::Mutex;
11use vortex_array::ArrayRef;
12use vortex_array::ArrayView;
13use vortex_array::ExecutionCtx;
14use vortex_array::arrays::Bool;
15use vortex_array::arrays::Primitive;
16use vortex_array::arrays::VarBinView;
17use vortex_error::VortexExpect;
18
19use super::BoolStats;
20use super::FloatStats;
21use super::GenerateStatsOptions;
22use super::IntegerStats;
23use super::StringStats;
24use crate::trace;
25
/// A single cache slot: the [`TypeId`] a value was stored under, paired with the type-erased,
/// reference-counted value itself.
///
/// The `Send + Sync` bounds on the erased value allow the slot to be shared across threads.
type StatsEntry = (TypeId, Arc<dyn Any + Send + Sync>);
28
29struct StatsCache {
35 entries: Arc<Mutex<Vec<StatsEntry>>>,
41}
42
43impl StatsCache {
44 fn new() -> Self {
46 Self {
47 entries: Arc::new(Mutex::new(Vec::new())),
48 }
49 }
50
51 fn get_or_insert_with<T: Send + Sync + 'static>(&self, f: impl FnOnce() -> T) -> Arc<T> {
53 let type_id = TypeId::of::<T>();
54 let mut guard = self.entries.lock();
55
56 if let Some(pos) = guard.iter().position(|(id, _)| *id == type_id) {
57 Arc::clone(&guard[pos].1)
58 .downcast::<T>()
59 .ok()
60 .vortex_expect("we just checked the TypeID")
61 } else {
62 let new_arc: Arc<T> = {
63 let _span = trace::generate_stats_span(std::any::type_name::<T>()).entered();
64 Arc::new(f())
65 };
66 guard.push((type_id, Arc::clone(&new_arc) as Arc<dyn Any + Send + Sync>));
67 new_arc
68 }
69 }
70}
71
72pub struct ArrayAndStats {
89 array: ArrayRef,
91 cache: StatsCache,
93 opts: GenerateStatsOptions,
95}
96
97impl ArrayAndStats {
98 pub fn new(array: ArrayRef, opts: GenerateStatsOptions) -> Self {
106 assert!(
107 array.is_canonical(),
108 "ArrayAndStats should only be created with canonical arrays"
109 );
110
111 Self {
112 array,
113 cache: StatsCache::new(),
114 opts,
115 }
116 }
117
118 pub fn array(&self) -> &ArrayRef {
120 &self.array
121 }
122
123 pub fn array_as_primitive(&self) -> ArrayView<'_, Primitive> {
129 self.array
130 .as_opt::<Primitive>()
131 .vortex_expect("the array is guaranteed to already be canonical by construction")
132 }
133
134 pub fn array_as_utf8(&self) -> ArrayView<'_, VarBinView> {
140 self.array
141 .as_opt::<VarBinView>()
142 .vortex_expect("the array is guaranteed to already be canonical by construction")
143 }
144
145 pub fn into_array(self) -> ArrayRef {
147 self.array
148 }
149
150 pub fn array_len(&self) -> usize {
152 self.array.len()
153 }
154
155 pub fn bool_stats(&self, ctx: &mut ExecutionCtx) -> Arc<BoolStats> {
157 let array = self.array.clone();
158 self.cache.get_or_insert_with::<BoolStats>(|| {
159 let bool_array = array
160 .as_opt::<Bool>()
161 .vortex_expect("the array is guaranteed to already be canonical by construction")
162 .into_owned();
163 BoolStats::generate(&bool_array, ctx).vortex_expect("BoolStats shouldn't fail")
164 })
165 }
166
167 pub fn integer_stats(&self, ctx: &mut ExecutionCtx) -> Arc<IntegerStats> {
169 let array = self.array.clone();
170 let opts = self.opts;
171 self.cache.get_or_insert_with::<IntegerStats>(|| {
172 let primitive = array
173 .as_opt::<Primitive>()
174 .vortex_expect("the array is guaranteed to already be canonical by construction")
175 .into_owned();
176 IntegerStats::generate_opts(&primitive, opts, ctx)
177 })
178 }
179
180 pub fn float_stats(&self, ctx: &mut ExecutionCtx) -> Arc<FloatStats> {
182 let array = self.array.clone();
183 let opts = self.opts;
184 self.cache.get_or_insert_with::<FloatStats>(|| {
185 let primitive = array
186 .as_opt::<Primitive>()
187 .vortex_expect("the array is guaranteed to already be canonical by construction")
188 .into_owned();
189 FloatStats::generate_opts(&primitive, opts, ctx)
190 })
191 }
192
193 pub fn string_stats(&self, ctx: &mut ExecutionCtx) -> Arc<StringStats> {
195 let array = self.array.clone();
196 let opts = self.opts;
197 self.cache.get_or_insert_with::<StringStats>(|| {
198 let varbinview = array
199 .as_opt::<VarBinView>()
200 .vortex_expect("the array is guaranteed to already be canonical by construction")
201 .into_owned();
202 StringStats::generate_opts(&varbinview, opts, ctx)
203 })
204 }
205
206 pub fn get_or_insert_with<T: Send + Sync + 'static>(&self, f: impl FnOnce() -> T) -> Arc<T> {
208 self.cache.get_or_insert_with::<T>(f)
209 }
210}