vortex_layout/layouts/
file_stats.rs1use std::future;
5use std::sync::Arc;
6
7use futures::StreamExt;
8use itertools::Itertools;
9use parking_lot::Mutex;
10use vortex_array::ArrayRef;
11use vortex_array::LEGACY_SESSION;
12use vortex_array::ToCanonical as _;
13use vortex_array::VortexSessionExecute;
14use vortex_array::arrays::struct_::StructArrayExt;
15use vortex_array::dtype::DType;
16use vortex_array::dtype::Nullability;
17use vortex_array::expr::stats::Stat;
18use vortex_array::stats::StatsSet;
19use vortex_error::VortexExpect;
20use vortex_error::VortexResult;
21use vortex_error::vortex_panic;
22
23use crate::layouts::zoned::zone_map::StatsAccumulator;
24use crate::sequence::SendableSequentialStream;
25use crate::sequence::SequenceId;
26use crate::sequence::SequentialStreamAdapter;
27use crate::sequence::SequentialStreamExt;
28
29pub fn accumulate_stats(
30 stream: SendableSequentialStream,
31 stats: Arc<[Stat]>,
32 max_variable_length_statistics_size: usize,
33) -> (FileStatsAccumulator, SendableSequentialStream) {
34 let accumulator =
35 FileStatsAccumulator::new(stream.dtype(), stats, max_variable_length_statistics_size);
36 let stream = SequentialStreamAdapter::new(
37 stream.dtype().clone(),
38 stream.scan(accumulator.clone(), |acc, item| {
39 future::ready(Some(acc.process(item)))
40 }),
41 )
42 .sendable();
43 (accumulator, stream)
44}
45
46#[derive(Clone)]
50pub struct FileStatsAccumulator {
51 stats: Arc<[Stat]>,
52 accumulators: Arc<Mutex<Vec<StatsAccumulator>>>,
53}
54
55impl FileStatsAccumulator {
56 fn new(dtype: &DType, stats: Arc<[Stat]>, max_variable_length_statistics_size: usize) -> Self {
57 let accumulators = Arc::new(Mutex::new(match dtype.as_struct_fields_opt() {
58 Some(struct_dtype) => {
59 if dtype.nullability() == Nullability::Nullable {
60 vortex_panic!(
62 "FileStatsAccumulator temporarily does not support nullable top-level structs, got: {}. Use Validity::NonNullable",
63 dtype
64 );
65 }
66
67 struct_dtype
68 .fields()
69 .map(|field_dtype| {
70 StatsAccumulator::new(
71 &field_dtype,
72 &stats,
73 max_variable_length_statistics_size,
74 )
75 })
76 .collect()
77 }
78 None => [StatsAccumulator::new(
79 dtype,
80 &stats,
81 max_variable_length_statistics_size,
82 )]
83 .into(),
84 }));
85
86 Self {
87 stats,
88 accumulators,
89 }
90 }
91
92 fn process(
93 &self,
94 chunk: VortexResult<(SequenceId, ArrayRef)>,
95 ) -> VortexResult<(SequenceId, ArrayRef)> {
96 let (sequence_id, chunk) = chunk?;
97 if chunk.dtype().is_struct() {
98 let chunk = chunk.to_struct();
99 for (acc, field) in self
100 .accumulators
101 .lock()
102 .iter_mut()
103 .zip_eq(chunk.iter_unmasked_fields())
104 {
105 acc.push_chunk(field)?;
106 }
107 } else {
108 self.accumulators.lock()[0].push_chunk(&chunk)?;
109 }
110 Ok((sequence_id, chunk))
111 }
112
113 pub fn stats_sets(&self) -> Vec<StatsSet> {
114 let mut ctx = LEGACY_SESSION.create_execution_ctx();
115 self.accumulators
116 .lock()
117 .iter_mut()
118 .map(|acc| {
119 acc.as_stats_table()
120 .vortex_expect("as_stats_table should not fail")
121 .map(|table| {
122 table
123 .to_stats_set(&self.stats, &mut ctx)
124 .vortex_expect("shouldn't fail to convert table we just created")
125 })
126 .unwrap_or_default()
127 })
128 .collect()
129 }
130}