vortex_layout/layouts/
file_stats.rs1use std::future;
5use std::sync::Arc;
6
7use futures::StreamExt;
8use itertools::Itertools;
9use parking_lot::Mutex;
10use vortex_array::ArrayRef;
11use vortex_array::LEGACY_SESSION;
12use vortex_array::ToCanonical as _;
13use vortex_array::VortexSessionExecute;
14use vortex_array::dtype::DType;
15use vortex_array::dtype::Nullability;
16use vortex_array::expr::stats::Stat;
17use vortex_array::stats::StatsSet;
18use vortex_error::VortexExpect;
19use vortex_error::VortexResult;
20use vortex_error::vortex_panic;
21
22use crate::layouts::zoned::zone_map::StatsAccumulator;
23use crate::sequence::SendableSequentialStream;
24use crate::sequence::SequenceId;
25use crate::sequence::SequentialStreamAdapter;
26use crate::sequence::SequentialStreamExt;
27
28pub fn accumulate_stats(
29 stream: SendableSequentialStream,
30 stats: Arc<[Stat]>,
31 max_variable_length_statistics_size: usize,
32) -> (FileStatsAccumulator, SendableSequentialStream) {
33 let accumulator =
34 FileStatsAccumulator::new(stream.dtype(), stats, max_variable_length_statistics_size);
35 let stream = SequentialStreamAdapter::new(
36 stream.dtype().clone(),
37 stream.scan(accumulator.clone(), |acc, item| {
38 future::ready(Some(acc.process(item)))
39 }),
40 )
41 .sendable();
42 (accumulator, stream)
43}
44
45#[derive(Clone)]
49pub struct FileStatsAccumulator {
50 stats: Arc<[Stat]>,
51 accumulators: Arc<Mutex<Vec<StatsAccumulator>>>,
52}
53
54impl FileStatsAccumulator {
55 fn new(dtype: &DType, stats: Arc<[Stat]>, max_variable_length_statistics_size: usize) -> Self {
56 let accumulators = Arc::new(Mutex::new(match dtype.as_struct_fields_opt() {
57 Some(struct_dtype) => {
58 if dtype.nullability() == Nullability::Nullable {
59 vortex_panic!(
61 "FileStatsAccumulator temporarily does not support nullable top-level structs, got: {}. Use Validity::NonNullable",
62 dtype
63 );
64 }
65
66 struct_dtype
67 .fields()
68 .map(|field_dtype| {
69 StatsAccumulator::new(
70 &field_dtype,
71 &stats,
72 max_variable_length_statistics_size,
73 )
74 })
75 .collect()
76 }
77 None => [StatsAccumulator::new(
78 dtype,
79 &stats,
80 max_variable_length_statistics_size,
81 )]
82 .into(),
83 }));
84
85 Self {
86 stats,
87 accumulators,
88 }
89 }
90
91 fn process(
92 &self,
93 chunk: VortexResult<(SequenceId, ArrayRef)>,
94 ) -> VortexResult<(SequenceId, ArrayRef)> {
95 let (sequence_id, chunk) = chunk?;
96 if chunk.dtype().is_struct() {
97 let chunk = chunk.to_struct();
98 for (acc, field) in self
99 .accumulators
100 .lock()
101 .iter_mut()
102 .zip_eq(chunk.unmasked_fields().iter())
103 {
104 acc.push_chunk(field)?;
105 }
106 } else {
107 self.accumulators.lock()[0].push_chunk(&chunk)?;
108 }
109 Ok((sequence_id, chunk))
110 }
111
112 pub fn stats_sets(&self) -> Vec<StatsSet> {
113 let mut ctx = LEGACY_SESSION.create_execution_ctx();
114 self.accumulators
115 .lock()
116 .iter_mut()
117 .map(|acc| {
118 acc.as_stats_table()
119 .vortex_expect("as_stats_table should not fail")
120 .map(|table| {
121 table
122 .to_stats_set(&self.stats, &mut ctx)
123 .vortex_expect("shouldn't fail to convert table we just created")
124 })
125 .unwrap_or_default()
126 })
127 .collect()
128 }
129}