vortex_layout/layouts/
file_stats.rs1use std::future;
5use std::sync::Arc;
6
7use futures::StreamExt;
8use itertools::Itertools;
9use parking_lot::Mutex;
10use vortex_array::ArrayRef;
11use vortex_array::ToCanonical as _;
12use vortex_array::expr::stats::Stat;
13use vortex_array::stats::StatsSet;
14use vortex_dtype::DType;
15use vortex_dtype::Nullability;
16use vortex_error::VortexExpect;
17use vortex_error::VortexResult;
18use vortex_error::vortex_panic;
19
20use crate::layouts::zoned::zone_map::StatsAccumulator;
21use crate::sequence::SendableSequentialStream;
22use crate::sequence::SequenceId;
23use crate::sequence::SequentialStreamAdapter;
24use crate::sequence::SequentialStreamExt;
25
26pub fn accumulate_stats(
27 stream: SendableSequentialStream,
28 stats: Arc<[Stat]>,
29 max_variable_length_statistics_size: usize,
30) -> (FileStatsAccumulator, SendableSequentialStream) {
31 let accumulator =
32 FileStatsAccumulator::new(stream.dtype(), stats, max_variable_length_statistics_size);
33 let stream = SequentialStreamAdapter::new(
34 stream.dtype().clone(),
35 stream.scan(accumulator.clone(), |acc, item| {
36 future::ready(Some(acc.process(item)))
37 }),
38 )
39 .sendable();
40 (accumulator, stream)
41}
42
43#[derive(Clone)]
47pub struct FileStatsAccumulator {
48 stats: Arc<[Stat]>,
49 accumulators: Arc<Mutex<Vec<StatsAccumulator>>>,
50}
51
52impl FileStatsAccumulator {
53 fn new(dtype: &DType, stats: Arc<[Stat]>, max_variable_length_statistics_size: usize) -> Self {
54 let accumulators = Arc::new(Mutex::new(match dtype.as_struct_fields_opt() {
55 Some(struct_dtype) => {
56 if dtype.nullability() == Nullability::Nullable {
57 vortex_panic!(
59 "FileStatsAccumulator temporarily does not support nullable top-level structs, got: {}. Use Validity::NonNullable",
60 dtype
61 );
62 }
63
64 struct_dtype
65 .fields()
66 .map(|field_dtype| {
67 StatsAccumulator::new(
68 &field_dtype,
69 &stats,
70 max_variable_length_statistics_size,
71 )
72 })
73 .collect()
74 }
75 None => [StatsAccumulator::new(
76 dtype,
77 &stats,
78 max_variable_length_statistics_size,
79 )]
80 .into(),
81 }));
82
83 Self {
84 stats,
85 accumulators,
86 }
87 }
88
89 fn process(
90 &self,
91 chunk: VortexResult<(SequenceId, ArrayRef)>,
92 ) -> VortexResult<(SequenceId, ArrayRef)> {
93 let (sequence_id, chunk) = chunk?;
94 if chunk.dtype().is_struct() {
95 let chunk = chunk.to_struct();
96 for (acc, field) in self
97 .accumulators
98 .lock()
99 .iter_mut()
100 .zip_eq(chunk.fields().iter())
101 {
102 acc.push_chunk(field)?;
103 }
104 } else {
105 self.accumulators.lock()[0].push_chunk(&chunk)?;
106 }
107 Ok((sequence_id, chunk))
108 }
109
110 pub fn stats_sets(&self) -> Vec<StatsSet> {
111 self.accumulators
112 .lock()
113 .iter_mut()
114 .map(|acc| {
115 acc.as_stats_table()
116 .map(|table| {
117 table
118 .to_stats_set(&self.stats)
119 .vortex_expect("shouldn't fail to convert table we just created")
120 })
121 .unwrap_or_default()
122 })
123 .collect()
124 }
125}