vortex_layout/layouts/
file_stats.rs1use std::future;
5use std::sync::Arc;
6
7use futures::StreamExt;
8use itertools::Itertools;
9use parking_lot::Mutex;
10use vortex_array::ArrayRef;
11use vortex_array::ExecutionCtx;
12use vortex_array::VortexSessionExecute;
13use vortex_array::arrays::StructArray;
14use vortex_array::arrays::struct_::StructArrayExt;
15use vortex_array::dtype::DType;
16use vortex_array::dtype::Nullability;
17use vortex_array::expr::stats::Stat;
18use vortex_array::stats::StatsSet;
19use vortex_error::VortexExpect;
20use vortex_error::VortexResult;
21use vortex_error::vortex_panic;
22use vortex_session::VortexSession;
23
24use crate::layouts::zoned::StatsAccumulator;
25use crate::sequence::SendableSequentialStream;
26use crate::sequence::SequenceId;
27use crate::sequence::SequentialStreamAdapter;
28use crate::sequence::SequentialStreamExt;
29
30pub fn accumulate_stats(
31 stream: SendableSequentialStream,
32 stats: Arc<[Stat]>,
33 max_variable_length_statistics_size: usize,
34 session: &VortexSession,
35) -> (FileStatsAccumulator, SendableSequentialStream) {
36 let accumulator = FileStatsAccumulator::new(
37 stream.dtype(),
38 stats,
39 max_variable_length_statistics_size,
40 session,
41 );
42 let stream = SequentialStreamAdapter::new(
43 stream.dtype().clone(),
44 stream.scan(accumulator.clone(), |acc, item| {
45 future::ready(Some(acc.process(item)))
46 }),
47 )
48 .sendable();
49 (accumulator, stream)
50}
51
52#[derive(Clone)]
56pub struct FileStatsAccumulator {
57 stats: Arc<[Stat]>,
58 accumulators: Arc<Mutex<Vec<StatsAccumulator>>>,
59 ctx: Arc<Mutex<ExecutionCtx>>,
60}
61
62impl FileStatsAccumulator {
63 fn new(
64 dtype: &DType,
65 stats: Arc<[Stat]>,
66 max_variable_length_statistics_size: usize,
67 session: &VortexSession,
68 ) -> Self {
69 let accumulators = Arc::new(Mutex::new(match dtype.as_struct_fields_opt() {
70 Some(struct_dtype) => {
71 if dtype.nullability() == Nullability::Nullable {
72 vortex_panic!(
74 "FileStatsAccumulator temporarily does not support nullable top-level structs, got: {}. Use Validity::NonNullable",
75 dtype
76 );
77 }
78
79 struct_dtype
80 .fields()
81 .map(|field_dtype| {
82 StatsAccumulator::new(
83 &field_dtype,
84 &stats,
85 max_variable_length_statistics_size,
86 )
87 })
88 .collect()
89 }
90 None => [StatsAccumulator::new(
91 dtype,
92 &stats,
93 max_variable_length_statistics_size,
94 )]
95 .into(),
96 }));
97
98 Self {
99 stats,
100 accumulators,
101 ctx: Arc::new(Mutex::new(session.create_execution_ctx())),
102 }
103 }
104
105 fn process(
106 &self,
107 chunk: VortexResult<(SequenceId, ArrayRef)>,
108 ) -> VortexResult<(SequenceId, ArrayRef)> {
109 let (sequence_id, chunk) = chunk?;
110 let mut ctx = self.ctx.lock();
111 if chunk.dtype().is_struct() {
112 let struct_chunk = chunk.clone().execute::<StructArray>(&mut ctx)?;
113 for (acc, field) in self
114 .accumulators
115 .lock()
116 .iter_mut()
117 .zip_eq(struct_chunk.iter_unmasked_fields())
118 {
119 acc.push_chunk(field, &mut ctx)?;
120 }
121 } else {
122 self.accumulators.lock()[0].push_chunk(&chunk, &mut ctx)?;
123 }
124 Ok((sequence_id, chunk))
125 }
126
127 pub fn stats_sets(&self) -> Vec<StatsSet> {
128 let mut ctx = self.ctx.lock();
129 self.accumulators
130 .lock()
131 .iter_mut()
132 .map(|acc| {
133 acc.as_stats_set(&self.stats, &mut ctx)
134 .vortex_expect("as_stats_table should not fail")
135 })
136 .collect()
137 }
138}